Skip to content

Commit

Permalink
Support identify-binlog-type option for lower version pika upgrade us…
Browse files Browse the repository at this point in the history
…age (#812)
  • Loading branch information
whoiami authored Nov 25, 2019
1 parent 8adee08 commit 8d83b99
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 3 deletions.
5 changes: 5 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ double-master-server-id :
write-binlog : yes
# binlog file size: default is 100M, limited in [1K, 2G]
binlog-file-size : 104857600
# When it becomes slave, the type of binlog it receives from the master
# if this option is set to 'new', that means I will be a slave to Pika who's version 3.0
# if this opsion is set to 'old', that means I will be a slave to Pika who's version 2.3.3 ~ 2.3.8
# identify-binlog-type [new | old]
identify-binlog-type : new
# Automatically triggers a small compaction according statistics
# Use the cache to store up to 'max-cache-statistic-keys' keys
# if 'max-cache-statistic-keys' set to '0', that means turn off the statistics function
Expand Down
10 changes: 8 additions & 2 deletions include/pika_binlog_receiver_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "slash/include/env.h"
#include "include/pika_define.h"
#include "include/pika_binlog_receiver_conn.h"
#include "include/pika_master_conn.h"
#include "include/pika_command.h"
#include "include/pika_conf.h"

Expand Down Expand Up @@ -49,8 +50,13 @@ class PikaBinlogReceiverThread {
pink::Thread *thread,
void* worker_specific_data,
pink::PinkEpoll* pink_epoll) const override {
LOG(INFO) << "Master conn factory creat pika binlog conn ip_port" << ip_port;
return std::make_shared<PikaBinlogReceiverConn>(connfd, ip_port, binlog_receiver_);
if (g_pika_conf->identify_binlog_type() == "old") {
LOG(INFO) << "Master conn factory create pika master conn";
return std::make_shared<PikaMasterConn>(connfd, ip_port, binlog_receiver_);
} else {
LOG(INFO) << "Master conn factory creat pika binlog conn ip_port" << ip_port;
return std::make_shared<PikaBinlogReceiverConn>(connfd, ip_port, binlog_receiver_);
}
}

private:
Expand Down
7 changes: 7 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class PikaConf : public slash::BaseConf {
std::string slaveof() {RWLock l(&rwlock_, false); return slaveof_;}
int slave_priority() {RWLock l(&rwlock_, false); return slave_priority_;}
bool write_binlog() {RWLock l(&rwlock_, false); return write_binlog_;}
std::string identify_binlog_type() {RWLock l(&rwlock_, false); return identify_binlog_type_;}
int thread_num() { RWLock l(&rwlock_, false); return thread_num_; }
int thread_pool_size() { RWLock l(&rwlock_, false); return thread_pool_size_; }
int sync_thread_num() { RWLock l(&rwlock_, false); return sync_thread_num_; }
Expand Down Expand Up @@ -120,6 +121,11 @@ class PikaConf : public slash::BaseConf {
TryPushDiffCommands("write-binlog", value);
write_binlog_ = (value == "yes") ? true : false;
}
void SetIdentifyBinlogType(const std::string& value) {
RWLock l(&rwlock_, true);
TryPushDiffCommands("identify-binlog-type", value);
identify_binlog_type_ = value;
}
void SetMaxCacheStatisticKeys(const int value) {
RWLock l(&rwlock_, true);
TryPushDiffCommands("max-cache-statistic-keys", std::to_string(value));
Expand Down Expand Up @@ -264,6 +270,7 @@ class PikaConf : public slash::BaseConf {
std::string bgsave_path_;
std::string bgsave_prefix_;
std::string pidfile_;
std::string identify_binlog_type_;

//char pidfile_[PIKA_WORD_SIZE];
std::string compression_;
Expand Down
23 changes: 23 additions & 0 deletions include/pika_master_conn.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_MASTER_CONN_H_
#define PIKA_MASTER_CONN_H_

#include "pink/include/redis_conn.h"
#include "include/pika_command.h"

class PikaBinlogReceiverThread;

class PikaMasterConn: public pink::RedisConn {
public:
PikaMasterConn(int fd, std::string ip_port, void* worker_specific_data);
virtual int DealMessage(const PikaCmdArgsType& argv, std::string* response);
private:
bool is_first_send_;
PikaBinlogReceiverThread* binlog_receiver_;
};

#endif
21 changes: 20 additions & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,12 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeInt32(&config_body, g_pika_conf->binlog_file_size());
}

if (slash::stringmatch(pattern.data(), "identify-binlog-type", 1)) {
elements += 2;
EncodeString(&config_body, "identify-binlog-type");
EncodeString(&config_body, g_pika_conf->identify_binlog_type());
}

if (slash::stringmatch(pattern.data(), "compression", 1)) {
elements += 2;
EncodeString(&config_body, "compression");
Expand Down Expand Up @@ -1309,7 +1315,7 @@ void ConfigCmd::ConfigGet(std::string &ret) {
void ConfigCmd::ConfigSet(std::string& ret) {
std::string set_item = config_args_v_[1];
if (set_item == "*") {
ret = "*23\r\n";
ret = "*24\r\n";
EncodeString(&ret, "loglevel");
EncodeString(&ret, "timeout");
EncodeString(&ret, "requirepass");
Expand All @@ -1327,6 +1333,7 @@ void ConfigCmd::ConfigSet(std::string& ret) {
EncodeString(&ret, "slowlog-max-len");
EncodeString(&ret, "slave-read-only");
EncodeString(&ret, "write-binlog");
EncodeString(&ret, "identify-binlog-type");
EncodeString(&ret, "max-cache-statistic-keys");
EncodeString(&ret, "small-compaction-threshold");
EncodeString(&ret, "db-sync-speed");
Expand Down Expand Up @@ -1455,6 +1462,18 @@ void ConfigCmd::ConfigSet(std::string& ret) {
}
g_pika_conf->SetSlaveReadOnly(is_readonly);
ret = "+OK\r\n";
} else if (set_item == "identify-binlog-type") {
int role = g_pika_server->role();
if (role == PIKA_ROLE_SLAVE || role == PIKA_ROLE_DOUBLE_MASTER) {
ret = "-ERR need to close master-slave or double-master mode first\r\n";
return;
} else if (value != "new" && value != "old") {
ret = "-ERR invalid identify-binlog-type (new or old)\r\n";
return;
} else {
g_pika_conf->SetIdentifyBinlogType(value);
ret = "+OK\r\n";
}
} else if (set_item == "max-cache-statistic-keys") {
if (!slash::string2l(value.data(), value.size(), &ival) || ival < 0) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-cache-statistic-keys'\r\n";
Expand Down
5 changes: 5 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ int PikaConf::Load()
std::string wb;
GetConfStr("write-binlog", &wb);
write_binlog_ = (wb == "no") ? false : true;
GetConfStr("identify-binlog-type", &identify_binlog_type_);
if (identify_binlog_type_ != "new" && identify_binlog_type_ != "old") {
identify_binlog_type_ = "new";
}
GetConfInt("binlog-file-size", &binlog_file_size_);
if (binlog_file_size_ < 1024
|| static_cast<int64_t>(binlog_file_size_) > (1024LL * 1024 * 1024)) {
Expand Down Expand Up @@ -360,6 +364,7 @@ int PikaConf::ConfigRewrite() {

SetConfStr("write-binlog", write_binlog_ ? "yes" : "no");
SetConfInt("binlog-file-size", binlog_file_size_);
SetConfStr("identify-binlog-type", identify_binlog_type_);
SetConfStr("compression", compression_);
SetConfInt("max-cache-statistic-keys", max_cache_statistic_keys_);
SetConfInt("small-compaction-threshold", small_compaction_threshold_);
Expand Down
120 changes: 120 additions & 0 deletions src/pika_master_conn.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "include/pika_master_conn.h"

#include <glog/logging.h>

#include "slash/include/slash_string.h"
#include "slash/include/slash_coding.h"
#include "include/pika_server.h"
#include "include/pika_conf.h"
#include "include/pika_binlog_receiver_thread.h"

extern PikaServer* g_pika_server;
extern PikaConf* g_pika_conf;

PikaMasterConn::PikaMasterConn(int fd, std::string ip_port,
void* worker_specific_data)
: RedisConn(fd, ip_port, NULL) {
is_first_send_ = true;
binlog_receiver_ =
reinterpret_cast<PikaBinlogReceiverThread*>(worker_specific_data);
}

int PikaMasterConn::DealMessage(
const PikaCmdArgsType& _argv, std::string* response) {
PikaCmdArgsType argv = _argv;
// no reply
// eq set_is_reply(false);
if (argv.empty()) {
return -2;
}

g_pika_server->UpdateQueryNumAndExecCountTable(argv[0]);
// Auth
if (is_first_send_) {
if (argv.size() == 2 && argv[0] == "auth") {
if (argv[1] == std::to_string(g_pika_server->sid())) {
is_first_send_ = false;
LOG(INFO) << "BinlogReceiverThread AccessHandle succeeded, My server id: " << g_pika_server->sid() << " Master auth server id: " << argv[1];
return 0;
}
LOG(INFO) << "BinlogReceiverThread AccessHandle failed, My server id: " << g_pika_server->sid() << " Master auth server id: " << argv[1];
}
return -2;
}

// TODO(shq) maybe monitor do not need these infomation
BinlogItem binlog_item;
std::string server_id;
std::string binlog_info;
if (!g_pika_server->DoubleMasterMode()) {
if (argv.size() > 4 &&
*(argv.end() - 4) == kPikaBinlogMagic) {
// Record new binlog format
argv.pop_back(); // send_to_hub flag

binlog_info = argv.back(); // binlog_info
argv.pop_back();
uint32_t exec_time = 0;
uint32_t filenum = 0;
uint64_t offset = 0;
slash::GetFixed32(&binlog_info, &exec_time);
slash::GetFixed32(&binlog_info, &filenum);
slash::GetFixed64(&binlog_info, &offset);
binlog_item.set_exec_time(exec_time);
binlog_item.set_filenum(filenum);
binlog_item.set_offset(offset);

server_id = argv.back(); // server_id
argv.pop_back();
binlog_item.set_server_id(std::atoi(server_id.c_str()));

argv.pop_back(); // kPikaBinlogMagic
}
}

// Monitor related
std::string monitor_message;
if (g_pika_server->HasMonitorClients()) {
std::string monitor_message = std::to_string(1.0*slash::NowMicros()/1000000)
+ " [" + this->ip_port() + "]";
for (PikaCmdArgsType::iterator iter = argv.begin(); iter != argv.end(); iter++) {
monitor_message += " " + slash::ToRead(*iter);
}
g_pika_server->AddMonitorMessage(monitor_message);
}

bool is_readonly = g_pika_server->readonly();

// Here, the binlog dispatch thread, instead of the binlog bgthread takes on the task to write binlog
// Only when the server is readonly
uint64_t serial = binlog_receiver_->GetnPlusSerial();
if (is_readonly) {
if (!g_pika_server->WaitTillBinlogBGSerial(serial)) {
return -2;
}
std::string opt = slash::StringToLower(argv[0]);
Cmd* c_ptr = binlog_receiver_->GetCmd(opt);

g_pika_server->logger_->Lock();
g_pika_server->logger_->Put(c_ptr->ToBinlog(argv,
binlog_item.exec_time(),
std::to_string(binlog_item.server_id()),
binlog_item.logic_id(),
binlog_item.filenum(),
binlog_item.offset()));
g_pika_server->logger_->Unlock();
g_pika_server->SignalNextBinlogBGSerial();
}

PikaCmdArgsType *v = new PikaCmdArgsType(argv);
BinlogItem *b = new BinlogItem(binlog_item);
std::string dispatch_key = argv.size() >= 2 ? argv[1] : argv[0];
g_pika_server->DispatchBinlogBG(dispatch_key, v, b, serial, is_readonly);

return 0;
}

0 comments on commit 8d83b99

Please sign in to comment.