1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-15 17:51:06 +00:00

feat(server): add dfly replica offset command (#780)

* feat(server): add dfly replica offset command

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2023-02-13 11:11:33 +02:00 committed by GitHub
parent e7a5d583d0
commit 9c9ae84493
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 136 additions and 77 deletions

View file

@ -21,7 +21,7 @@ add_library(dragonfly_lib channel_slice.cc command_registry.cc
snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc
set_family.cc stream_family.cc string_family.cc
zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc)
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc)
cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib http_client_lib
absl::random_random TRDP::jsoncons zstd TRDP::lz4)

View file

@ -12,9 +12,8 @@
#include "facade/dragonfly_connection.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/io_utils.h"
#include "server/journal/journal.h"
#include "server/journal/serializer.h"
#include "server/journal/streamer.h"
#include "server/rdb_save.h"
#include "server/script_mgr.h"
#include "server/server_family.h"
@ -29,7 +28,6 @@ namespace dfly {
using namespace facade;
using namespace std;
using namespace util::fibers_ext;
using util::ProactorBase;
namespace {
@ -60,63 +58,6 @@ struct TransactionGuard {
Transaction* t;
};
// Buffered single-shard journal streamer that listens for journal changes with a
// journal listener and writes them to a destination sink in a separate fiber.
class JournalStreamer : protected BufferedStreamerBase {
public:
JournalStreamer(journal::Journal* journal, Context* cntx)
: BufferedStreamerBase{cntx->GetCancellation()}, cntx_{cntx},
journal_cb_id_{0}, journal_{journal}, write_fb_{}, writer_{this} {
}
// Self referential.
JournalStreamer(const JournalStreamer& other) = delete;
JournalStreamer(JournalStreamer&& other) = delete;
// Register journal listener and start writer in fiber.
void Start(io::Sink* dest);
// Must be called on context cancellation for unblocking
// and manual cleanup.
void Cancel();
private:
// Writer fiber that steals buffer contents and writes them to dest.
void WriterFb(io::Sink* dest);
private:
Context* cntx_;
uint32_t journal_cb_id_;
journal::Journal* journal_;
Fiber write_fb_;
JournalWriter writer_;
};
void JournalStreamer::Start(io::Sink* dest) {
write_fb_ = Fiber(&JournalStreamer::WriterFb, this, dest);
journal_cb_id_ = journal_->RegisterOnChange([this](const journal::Entry& entry) {
writer_.Write(entry);
NotifyWritten();
});
}
void JournalStreamer::Cancel() {
journal_->UnregisterOnChange(journal_cb_id_);
Finalize();
if (write_fb_.IsJoinable())
write_fb_.Join();
}
void JournalStreamer::WriterFb(io::Sink* dest) {
if (auto ec = ConsumeIntoSink(dest); ec) {
cntx_->ReportError(ec);
}
}
} // namespace
DflyCmd::ReplicaRoleInfo::ReplicaRoleInfo(std::string address, uint32_t listening_port,
@ -175,6 +116,10 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
return Expire(args, cntx);
}
if (sub_cmd == "REPLICAOFFSET" && args.size() == 3) {
return ReplicaOffset(args, cntx);
}
rb->SendError(kSyntaxErr);
}
@ -303,7 +248,8 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
absl::InsecureBitGen gen;
string eof_token = GetRandomHex(gen, 40);
replica_ptr->flows[flow_id] = FlowInfo{cntx->owner(), eof_token};
replica_ptr->flows[flow_id].conn = cntx->owner();
replica_ptr->flows[flow_id].eof_token = eof_token;
listener_->Migrate(cntx->owner(), shard_set->pool()->at(flow_id));
rb->StartArray(2);
@ -392,6 +338,28 @@ void DflyCmd::Expire(CmdArgList args, ConnectionContext* cntx) {
return rb->SendOk();
}
void DflyCmd::ReplicaOffset(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
string_view sync_id_str = ArgS(args, 2);
VLOG(1) << "Got DFLY REPLICAOFFSET " << sync_id_str;
auto [sync_id, replica_ptr] = GetReplicaInfoOrReply(sync_id_str, rb);
if (!sync_id)
return;
string result;
unique_lock lk(replica_ptr->mu);
rb->StartArray(replica_ptr->flows.size());
for (size_t flow_id = 0; flow_id < replica_ptr->flows.size(); ++flow_id) {
JournalStreamer* streamer = replica_ptr->flows[flow_id].streamer.get();
if (streamer) {
rb->SendLong(streamer->GetRecordCount());
} else {
rb->SendLong(0);
}
}
}
OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
DCHECK(!flow->full_sync_fb.IsJoinable());
@ -431,18 +399,17 @@ void DflyCmd::StopFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
// Create streamer for shard flows.
JournalStreamer* streamer = nullptr;
if (shard != nullptr) {
streamer = new JournalStreamer{sf_->journal(), cntx};
streamer->Start(flow->conn->socket());
flow->streamer.reset(new JournalStreamer(sf_->journal(), cntx));
flow->streamer->Start(flow->conn->socket());
}
// Register cleanup.
flow->cleanup = [this, streamer, flow]() {
flow->cleanup = [this, flow]() {
flow->TryShutdownSocket();
if (streamer) {
streamer->Cancel();
delete streamer;
if (flow->streamer) {
flow->streamer->Cancel();
}
};
@ -654,4 +621,10 @@ void DflyCmd::FlowInfo::TryShutdownSocket() {
}
}
DflyCmd::FlowInfo::~FlowInfo() {
}
DflyCmd::FlowInfo::FlowInfo() {
}
} // namespace dfly

View file

@ -27,10 +27,7 @@ namespace dfly {
class EngineShardSet;
class ServerFamily;
class RdbSaver;
namespace journal {
class Journal;
} // namespace journal
class JournalStreamer;
// DflyCmd is responsible for managing replication. A master instance can be connected
// to many replica instances, what is more, each of them can open multiple connections.
@ -83,10 +80,8 @@ class DflyCmd {
// Stores information related to a single flow.
struct FlowInfo {
FlowInfo() = default;
FlowInfo(facade::Connection* conn, const std::string& eof_token)
: conn{conn}, eof_token{eof_token} {};
FlowInfo();
~FlowInfo();
// Shutdown associated socket if its still open.
void TryShutdownSocket();
@ -94,6 +89,7 @@ class DflyCmd {
util::fibers_ext::Fiber full_sync_fb; // Full sync fiber.
std::unique_ptr<RdbSaver> saver; // Saver used by the full sync phase.
std::unique_ptr<JournalStreamer> streamer;
std::string eof_token;
std::function<void()> cleanup; // Optional cleanup for cancellation.
@ -164,6 +160,10 @@ class DflyCmd {
// Check all keys for expiry.
void Expire(CmdArgList args, ConnectionContext* cntx);
// REPLICAOFFSET
// Return journal records num sent for each flow of replication.
void ReplicaOffset(CmdArgList args, ConnectionContext* cntx);
// Start full sync in thread. Start FullSyncFb. Called for each flow.
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);

View file

@ -0,0 +1,36 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/journal/streamer.h"
namespace dfly {
void JournalStreamer::Start(io::Sink* dest) {
write_fb_ = util::fibers_ext::Fiber(&JournalStreamer::WriterFb, this, dest);
journal_cb_id_ = journal_->RegisterOnChange([this](const journal::Entry& entry) {
writer_.Write(entry);
record_cnt_.fetch_add(1, std::memory_order_relaxed);
NotifyWritten();
});
}
uint64_t JournalStreamer::GetRecordCount() const {
return record_cnt_.load(std::memory_order_relaxed);
}
void JournalStreamer::Cancel() {
journal_->UnregisterOnChange(journal_cb_id_);
Finalize();
if (write_fb_.IsJoinable())
write_fb_.Join();
}
void JournalStreamer::WriterFb(io::Sink* dest) {
if (auto ec = ConsumeIntoSink(dest); ec) {
cntx_->ReportError(ec);
}
}
} // namespace dfly

View file

@ -0,0 +1,50 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include "server/io_utils.h"
#include "server/journal/journal.h"
#include "server/journal/serializer.h"
#include "util/fibers/fiber.h"
namespace dfly {
// Buffered single-shard journal streamer that listens for journal changes with a
// journal listener and writes them to a destination sink in a separate fiber.
class JournalStreamer : protected BufferedStreamerBase {
public:
JournalStreamer(journal::Journal* journal, Context* cntx)
: BufferedStreamerBase{cntx->GetCancellation()}, cntx_{cntx},
journal_cb_id_{0}, journal_{journal}, write_fb_{}, writer_{this} {
}
// Self referential.
JournalStreamer(const JournalStreamer& other) = delete;
JournalStreamer(JournalStreamer&& other) = delete;
// Register journal listener and start writer in fiber.
void Start(io::Sink* dest);
// Must be called on context cancellation for unblocking
// and manual cleanup.
void Cancel();
uint64_t GetRecordCount() const;
private:
// Writer fiber that steals buffer contents and writes them to dest.
void WriterFb(io::Sink* dest);
private:
Context* cntx_;
uint32_t journal_cb_id_;
journal::Journal* journal_;
util::fibers_ext::Fiber write_fb_;
JournalWriter writer_;
std::atomic_uint64_t record_cnt_;
};
} // namespace dfly