mirror of https://github.com/dragonflydb/dragonfly.git

feat(replication): Use a ring buffer with messages to serve replication. (#1835)

* feat(replication): Use a ring buffer with messages to serve replication.

* Fix libraries dep graph

* Address PR feedback

* nits

* add a comment

* Lower the default log length
Roy Jacobson authored 2023-09-18 13:59:41 +03:00; committed by GitHub
parent 1e61ec8114
commit db21b735f6
16 changed files with 127 additions and 65 deletions
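
In short, the change makes each shard's JournalSlice keep a fixed-capacity ring of already-serialized journal messages (JournalItem { lsn, opcode, data }), sized by the new --shard_repl_backlog_len flag (default 1 << 10 per shard), so replication consumers can read pre-serialized bytes instead of serializing entries again. The following is a self-contained sketch of that idea using std::deque; ReplBacklogSketch and its members are illustrative names only, not the base::RingBuffer<JournalItem> the patch actually uses.

#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <string_view>
#include <utility>

using LSN = uint64_t;

// Sketch: a bounded, per-shard backlog of serialized journal records keyed by
// consecutive LSNs. Appending to a full backlog drops the oldest record.
class ReplBacklogSketch {
 public:
  explicit ReplBacklogSketch(size_t capacity) : capacity_(capacity) {}

  // Store the already-serialized bytes of the entry with the next LSN.
  LSN Append(std::string serialized) {
    if (items_.size() == capacity_)
      items_.pop_front();  // drop the oldest record
    items_.push_back({next_lsn_, std::move(serialized)});
    return next_lsn_++;
  }

  // Same check as JournalSlice::IsLSNInBuffer: LSNs are consecutive, so a
  // range test against the first and last buffered LSN is enough.
  bool Contains(LSN lsn) const {
    return !items_.empty() && items_.front().lsn <= lsn && lsn <= items_.back().lsn;
  }

  // Same arithmetic as JournalSlice::GetEntry: the slot index of an LSN is
  // simply (lsn - first buffered LSN).
  std::string_view Get(LSN lsn) const { return items_[lsn - items_.front().lsn].data; }

 private:
  struct Item {
    LSN lsn;
    std::string data;  // pre-serialized journal entry
  };

  size_t capacity_;
  LSN next_lsn_ = 1;
  std::deque<Item> items_;
};

For example, with the default capacity of 1024, after 2000 appends the sketch covers LSNs 977..2000, and Get(1000) reads slot 1000 - 977 = 23; the real IsLSNInBuffer/GetEntry below rely on the same consecutive-LSN invariant.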


@@ -17,6 +17,7 @@ endif()
add_library(dfly_transaction db_slice.cc malloc_stats.cc engine_shard_set.cc blocking_controller.cc
common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc
server_state.cc table.cc top_keys.cc transaction.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
${TX_LINUX_SRCS}
)
cxx_link(dfly_transaction dfly_core strings_lib)
@@ -38,7 +39,6 @@ add_library(dragonfly_lib channel_store.cc command_registry.cc
detail/snapshot_storage.cc
set_family.cc stream_family.cc string_family.cc
zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc
cluster/cluster_family.cc acl/user.cc acl/user_registry.cc acl/acl_family.cc
acl/validator.cc acl/helpers.cc)


@@ -20,7 +20,7 @@ namespace dfly {
namespace {
// Build a CmdData from parts passed to absl::StrCat.
template <typename... Ts> journal::ParsedEntry::CmdData BuildFromParts(Ts... parts) {
vector<string> raw_parts{absl::StrCat(forward<Ts>(parts))...};
vector<string> raw_parts{absl::StrCat(std::forward<Ts>(parts))...};
auto cmd_str = accumulate(raw_parts.begin(), raw_parts.end(), std::string{});
auto buf = make_unique<char[]>(cmd_str.size());
@@ -28,8 +28,8 @@ template <typename... Ts> journal::ParsedEntry::CmdData BuildFromParts(Ts... par
CmdArgVec slice_parts{};
size_t start = 0;
for (auto part : raw_parts) {
slice_parts.push_back(MutableSlice{buf.get() + start, part.size()});
for (const auto& part : raw_parts) {
slice_parts.emplace_back(buf.get() + start, part.size());
start += part.size();
}
@@ -38,8 +38,9 @@ template <typename... Ts> journal::ParsedEntry::CmdData BuildFromParts(Ts... par
} // namespace
JournalExecutor::JournalExecutor(Service* service)
: service_{service}, reply_builder_{facade::ReplyMode::NONE}, conn_context_{nullptr, nullptr,
&reply_builder_} {
: service_{service},
reply_builder_{facade::ReplyMode::NONE},
conn_context_{nullptr, nullptr, &reply_builder_} {
conn_context_.is_replicating = true;
conn_context_.journal_emulated = true;
}


@@ -16,7 +16,7 @@ class Service;
// JournalExecutor allows executing journal entries.
class JournalExecutor {
public:
JournalExecutor(Service* service);
explicit JournalExecutor(Service* service);
~JournalExecutor();
JournalExecutor(JournalExecutor&&) = delete;


@@ -96,6 +96,14 @@ bool Journal::HasRegisteredCallbacks() const {
return journal_slice.HasRegisteredCallbacks();
}
bool Journal::IsLSNInBuffer(LSN lsn) const {
return journal_slice.IsLSNInBuffer(lsn);
}
std::string_view Journal::GetEntry(LSN lsn) const {
return journal_slice.GetEntry(lsn);
}
LSN Journal::GetLsn() const {
return journal_slice.cur_lsn();
}


@@ -39,6 +39,9 @@ class Journal {
void UnregisterOnChange(uint32_t id);
bool HasRegisteredCallbacks() const;
bool IsLSNInBuffer(LSN lsn) const;
std::string_view GetEntry(LSN lsn) const;
/*
void AddCmd(TxId txid, Op opcode, Span args) {
OpArgs(txid, opcode, args);


@@ -5,12 +5,19 @@
#include "server/journal/journal_slice.h"
#include <absl/container/inlined_vector.h>
#include <absl/flags/flag.h>
#include <absl/strings/escaping.h>
#include <absl/strings/str_cat.h>
#include <fcntl.h>
#include <filesystem>
#include "base/function2.hpp"
#include "base/logging.h"
#include "server/journal/serializer.h"
ABSL_FLAG(int, shard_repl_backlog_len, 1 << 10,
"The length of the circular replication log per shard");
namespace dfly {
namespace journal {
@@ -34,12 +41,6 @@ string ShardName(std::string_view base, unsigned index) {
CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \
} while (false)
struct JournalSlice::RingItem {
LSN lsn;
TxId txid;
Op opcode;
};
JournalSlice::JournalSlice() {
}
@@ -48,11 +49,11 @@ JournalSlice::~JournalSlice() {
}
void JournalSlice::Init(unsigned index) {
// if (ring_buffer_) // calling this function multiple times is allowed and it's a no-op.
// return;
if (ring_buffer_) // calling this function multiple times is allowed and it's a no-op.
return;
slice_index_ = index;
// ring_buffer_.emplace(128); // TODO: to make it configurable
ring_buffer_.emplace(absl::GetFlag(FLAGS_shard_repl_backlog_len));
}
#if 0
@@ -116,20 +117,49 @@ error_code JournalSlice::Close() {
}
#endif
bool JournalSlice::IsLSNInBuffer(LSN lsn) const {
DCHECK(ring_buffer_);
if (ring_buffer_->empty()) {
return false;
}
return (*ring_buffer_)[0].lsn <= lsn && lsn <= ((*ring_buffer_)[ring_buffer_->size() - 1].lsn);
}
std::string_view JournalSlice::GetEntry(LSN lsn) const {
DCHECK(ring_buffer_ && IsLSNInBuffer(lsn));
auto start = (*ring_buffer_)[0].lsn;
DCHECK((*ring_buffer_)[lsn - start].lsn == lsn);
return (*ring_buffer_)[lsn - start].data;
}
void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
// DCHECK(ring_buffer_);
if (entry.opcode != Op::NOOP) {
lsn_++;
// TODO: This is preparation for AOC style journaling, currently unused.
DCHECK(ring_buffer_);
JournalItem dummy;
JournalItem* item;
if (entry.opcode == Op::NOOP) {
item = &dummy;
item->lsn = -1;
item->opcode = entry.opcode;
item->data = "";
} else {
// GetTail gives a pointer to a new tail entry in the buffer, possibly overriding the last entry
// if the buffer is full.
item = ring_buffer_->GetTail(true);
item->opcode = entry.opcode;
item->lsn = lsn_++;
io::BufSink buf_sink{&ring_serialize_buf_};
JournalWriter writer{&buf_sink};
writer.Write(entry);
item->data = io::View(ring_serialize_buf_.InputBuffer());
ring_serialize_buf_.Clear();
VLOG(2) << "Writing item [" << item->lsn << "]: " << entry.ToString();
}
#if 0
RingItem item;
item.lsn = prev_lsn;
item.opcode = entry.opcode;
item.txid = entry.txid;
VLOG(1) << "Writing item [" << item.lsn << "]: " << entry.ToString();
ring_buffer_->EmplaceOrOverride(move(item));
if (shard_file_) {
string line = absl::StrCat(item.lsn, " ", entry.txid, " ", entry.opcode, "\n");
error_code ec = shard_file_->Write(io::Buffer(line), file_offset_, 0);
@@ -137,15 +167,15 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
file_offset_ += line.size();
}
#endif
}
// TODO: Remove the callbacks, replace with notifiers
{
std::shared_lock lk(cb_mu_);
DVLOG(2) << "AddLogRecord: run callbacks for " << entry.ToString()
<< " num callbacks: " << change_cb_arr_.size();
for (const auto& k_v : change_cb_arr_) {
k_v.second(entry, await);
k_v.second(*item, await);
}
}
}
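
A note on the new AddLogRecord above: a non-NOOP entry is serialized exactly once (JournalWriter into the scratch ring_serialize_buf_), the bytes are copied into the ring item's data field, and the change callbacks now receive the JournalItem, so downstream consumers reuse those bytes instead of serializing the entry again. A rough sketch of that serialize-once / fan-out pattern; EntrySketch, ItemSketch and SerializeOnce are made-up stand-ins for the real types:

#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Illustrative stand-ins; the real Entry/JournalItem live in journal/types.h.
struct EntrySketch {
  std::string cmd;  // e.g. "SET k v"
};

struct ItemSketch {
  uint64_t lsn;
  std::string data;  // wire bytes, produced once
};

// Stand-in for JournalWriter::Write into a BufSink.
std::string SerializeOnce(const EntrySketch& e) {
  return "<framed:" + e.cmd + ">";
}

int main() {
  using Callback = std::function<void(const ItemSketch&)>;
  std::vector<Callback> consumers = {
      [](const ItemSketch& it) { std::cout << "streamer writes " << it.data << "\n"; },
      [](const ItemSketch& it) { std::cout << "snapshot embeds " << it.data << "\n"; },
  };

  uint64_t lsn = 1;
  EntrySketch entry{"SET k v"};
  ItemSketch item{lsn++, SerializeOnce(entry)};  // serialized exactly once
  for (auto& cb : consumers)
    cb(item);  // every consumer reuses the same bytes
}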


@@ -45,19 +45,28 @@ class JournalSlice {
void AddLogRecord(const Entry& entry, bool await);
// Register a callback that will be called every time a new entry is
// added to the journal.
// The callback receives the entry and a boolean that indicates whether
// awaiting (to apply backpressure) is allowed.
uint32_t RegisterOnChange(ChangeCallback cb);
void UnregisterOnChange(uint32_t);
bool HasRegisteredCallbacks() const {
std::shared_lock lk(cb_mu_);
return !change_cb_arr_.empty();
}
private:
struct RingItem;
/// Returns whether the journal entry with this LSN is available
/// from the buffer.
bool IsLSNInBuffer(LSN lsn) const;
std::string_view GetEntry(LSN lsn) const;
private:
// std::string shard_path_;
// std::unique_ptr<LinuxFile> shard_file_;
// std::optional<base::RingBuffer<RingItem>> ring_buffer_;
std::optional<base::RingBuffer<JournalItem>> ring_buffer_;
base::IoBuf ring_serialize_buf_;
mutable util::SharedMutex cb_mu_;
std::vector<std::pair<uint32_t, ChangeCallback>> change_cb_arr_ ABSL_GUARDED_BY(cb_mu_);
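
The IsLSNInBuffer / GetEntry accessors published here (and forwarded by Journal in journal.h above) are building blocks for serving a reconnecting replica from memory; the hand-off itself is not part of this commit. The sketch below is hypothetical usage only, assuming cur_lsn() returns the next LSN to be assigned; ResumeReplica and the callbacks passed to it are made-up names:

#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <string_view>
#include <vector>

using LSN = uint64_t;

// Hypothetical resume path: replay from the in-memory backlog when it still
// covers the replica's position, otherwise fall back to a full snapshot.
void ResumeReplica(LSN replica_next_lsn, LSN cur_lsn,
                   const std::function<bool(LSN)>& is_lsn_in_buffer,       // cf. JournalSlice::IsLSNInBuffer
                   const std::function<std::string_view(LSN)>& get_entry,  // cf. JournalSlice::GetEntry
                   const std::function<void(std::string_view)>& send_to_replica) {
  if (!is_lsn_in_buffer(replica_next_lsn)) {
    std::cout << "backlog too short -> full sync\n";
    return;
  }
  for (LSN lsn = replica_next_lsn; lsn < cur_lsn; ++lsn)
    send_to_replica(get_entry(lsn));  // records are already serialized
}

int main() {
  // Toy backlog holding LSNs 100..102.
  std::vector<std::string> backlog = {"SET k1 v1", "SET k2 v2", "DEL k1"};
  const LSN first = 100, cur = 103;

  ResumeReplica(
      101, cur,
      [&](LSN lsn) { return lsn >= first && lsn < cur; },
      [&](LSN lsn) -> std::string_view { return backlog[lsn - first]; },
      [](std::string_view bytes) { std::cout << "send " << bytes << "\n"; });
}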


@@ -4,8 +4,11 @@
#include "server/journal/serializer.h"
#include <system_error>
#include "base/io_buf.h"
#include "base/logging.h"
#include "glog/logging.h"
#include "io/io.h"
#include "server/common.h"
#include "server/error.h"


@@ -10,12 +10,12 @@ using namespace util;
void JournalStreamer::Start(io::Sink* dest) {
using namespace journal;
write_fb_ = fb2::Fiber("journal_stream", &JournalStreamer::WriterFb, this, dest);
journal_cb_id_ = journal_->RegisterOnChange([this](const Entry& entry, bool allow_await) {
if (entry.opcode == Op::NOOP) {
// No recode to write, just await if data was written so consumer will read the data.
journal_cb_id_ = journal_->RegisterOnChange([this](const JournalItem& item, bool allow_await) {
if (item.opcode == Op::NOOP) {
// No record to write, just await if data was written so consumer will read the data.
return AwaitIfWritten();
}
writer_.Write(entry);
Write(io::Buffer(item.data));
NotifyWritten(allow_await);
});
}


@@ -40,7 +40,6 @@ class JournalStreamer : protected BufferedStreamerBase {
journal::Journal* journal_;
Fiber write_fb_{};
JournalWriter writer_{this};
};
} // namespace dfly


@@ -69,7 +69,13 @@ struct ParsedEntry : public EntryBase {
std::string ToString() const;
};
using ChangeCallback = std::function<void(const Entry&, bool await)>;
struct JournalItem {
LSN lsn;
Op opcode;
std::string data;
};
using ChangeCallback = std::function<void(const JournalItem&, bool await)>;
} // namespace journal
} // namespace dfly
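
Since ChangeCallback now delivers a JournalItem, listeners see the LSN, opcode and serialized bytes directly; NOOP entries still reach the callbacks (AddLogRecord passes a dummy item with empty data), so a listener should treat them as a flush/ping rather than a record, as JournalStreamer does above. A minimal illustrative listener follows; BufferingListener and the Op/JournalItemSketch mirrors are sketch-only names, not the real types:

#include <cstdint>
#include <iostream>
#include <string>

// Illustrative mirrors of the types in journal/types.h (subset, sketch only).
enum class Op : uint8_t { NOOP, COMMAND, EXEC };

struct JournalItemSketch {
  int64_t lsn;       // -1 for the NOOP dummy item
  Op opcode;
  std::string data;  // serialized record, empty for NOOP
};

// A toy listener with the new callback shape (const JournalItem&, bool await):
// it buffers record bytes and only flushes when backpressure is allowed.
class BufferingListener {
 public:
  void OnJournalItem(const JournalItemSketch& item, bool allow_await) {
    if (item.opcode == Op::NOOP) {  // nothing to write; maybe flush
      if (allow_await) Flush();
      return;
    }
    pending_ += item.data;          // reuse the pre-serialized bytes
    if (allow_await) Flush();
  }

 private:
  void Flush() {
    if (pending_.empty()) return;
    std::cout << "flushing " << pending_.size() << " bytes\n";
    pending_.clear();
  }
  std::string pending_;
};

int main() {
  BufferingListener listener;
  listener.OnJournalItem({1, Op::COMMAND, "<SET k v>"}, false);
  listener.OnJournalItem({-1, Op::NOOP, ""}, true);  // NOOP acts as a flush point
}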


@@ -4,6 +4,8 @@
#include "server/rdb_load.h"
#include "absl/strings/escaping.h"
extern "C" {
#include "redis/intset.h"
@@ -2147,20 +2149,28 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service) {
while (done < num_entries) {
journal::ParsedEntry entry{};
SET_OR_RETURN(journal_reader_.ReadEntry(), entry);
done++;
if (!entry.cmd.cmd_args.empty()) {
if (absl::EqualsIgnoreCase(facade::ToSV(entry.cmd.cmd_args[0]), "FLUSHALL") ||
absl::EqualsIgnoreCase(facade::ToSV(entry.cmd.cmd_args[0]), "FLUSHDB")) {
// Applying a flush* operation in the middle of a load can cause out-of-sync deletions of
// data that should not be deleted, see https://github.com/dragonflydb/dragonfly/issues/1231
// By returning an error we are effectively restarting the replication.
return RdbError(errc::unsupported_operation);
}
// EXEC entries are just for preserving atomicity of transactions. We don't create
// transactions and we don't care about atomicity when we're loading an RDB, so skip them.
// Currently rdb_save also filters those records out, but we filter them additionally here
// for better forward compatibility if we decide to change that.
if (entry.opcode == journal::Op::EXEC)
continue;
if (entry.cmd.cmd_args.empty())
return RdbError(errc::rdb_file_corrupted);
if (absl::EqualsIgnoreCase(facade::ToSV(entry.cmd.cmd_args[0]), "FLUSHALL") ||
absl::EqualsIgnoreCase(facade::ToSV(entry.cmd.cmd_args[0]), "FLUSHDB")) {
// Applying a flush* operation in the middle of a load can cause out-of-sync deletions of
// data that should not be deleted, see https://github.com/dragonflydb/dragonfly/issues/1231
// By returning an error we are effectively restarting the replication.
return RdbError(errc::unsupported_operation);
}
VLOG(1) << "Executing item: " << entry.ToString();
ex.Execute(entry.dbid, entry.cmd);
VLOG(1) << "Reading item: " << entry.ToString();
done++;
}
return std::error_code{};


@@ -733,15 +733,10 @@ io::Bytes RdbSerializer::PrepareFlush() {
return mem_buf_.InputBuffer();
}
error_code RdbSerializer::WriteJournalEntry(const journal::Entry& entry) {
io::BufSink buf_sink{&journal_mem_buf_};
JournalWriter writer{&buf_sink};
writer.Write(entry);
error_code RdbSerializer::WriteJournalEntry(std::string_view serialized_entry) {
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_BLOB));
RETURN_ON_ERR(SaveLen(1));
RETURN_ON_ERR(SaveString(io::View(journal_mem_buf_.InputBuffer())));
journal_mem_buf_.Clear();
RETURN_ON_ERR(SaveString(serialized_entry));
return error_code{};
}


@@ -158,7 +158,7 @@ class RdbSerializer {
}
// Write journal entry as an embedded journal blob.
std::error_code WriteJournalEntry(const journal::Entry& entry);
std::error_code WriteJournalEntry(std::string_view entry);
std::error_code SendJournalOffset(uint64_t journal_offset);
@@ -188,7 +188,6 @@
void AllocateCompressorOnce();
base::IoBuf mem_buf_;
base::IoBuf journal_mem_buf_;
std::string tmp_str_;
base::PODArray<uint8_t> tmp_buf_;
DbIndex last_entry_db_index_ = kInvalidDbId;


@@ -260,14 +260,13 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
// transaction.
// OnJournalEntry registers for changes in journal, the journal change function signature is
// (const journal::Entry& entry, bool await). In snapshot flow we don't use the await argument.
void SliceSnapshot::OnJournalEntry(const journal::Entry& entry, bool unused_await_arg) {
// We ignore non payload entries like EXEC because we have no transactional ordering during
// LOAD phase on replica.
if (!entry.HasPayload()) {
void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool unused_await_arg) {
// We ignore EXEC and NOOP entries because they have no meaning during
// the LOAD phase on replica.
if (item.opcode == journal::Op::NOOP || item.opcode == journal::Op::EXEC)
return;
}
serializer_->WriteJournalEntry(entry);
serializer_->WriteJournalEntry(item.data);
// This is the only place that flushes in streaming mode
// once the iterate buckets fiber finished.


@@ -92,7 +92,7 @@ class SliceSnapshot {
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
// Journal listener
void OnJournalEntry(const journal::Entry& entry, bool unused_await_arg);
void OnJournalEntry(const journal::JournalItem& item, bool unused_await_arg);
// Close dest channel if not closed yet.
void CloseRecordChannel();