From 7788600c9b694d051e92b68fe54e91bea39c1fc6 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Fri, 30 Dec 2022 16:18:37 +0300 Subject: [PATCH] feat(server): Buffered journal serializers (#619) --- src/server/dflycmd.cc | 17 ++- src/server/dflycmd.h | 2 +- src/server/journal/serializer.cc | 182 ++++++++++++++++------------ src/server/journal/serializer.h | 54 +++++---- src/server/journal_test.cc | 10 +- src/server/rdb_load.cc | 45 +++---- src/server/rdb_load.h | 2 +- src/server/rdb_save.cc | 20 +-- src/server/rdb_save.h | 4 + src/server/replica.cc | 4 +- src/server/serializer_commons.cc | 66 +++++----- src/server/serializer_commons.h | 19 ++- tests/dragonfly/replication_test.py | 4 +- 13 files changed, 240 insertions(+), 189 deletions(-) diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index bceb81b3c..555979a50 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -290,7 +290,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) { FlowInfo* flow = &replica_ptr->flows[index]; StopFullSyncInThread(flow, shard); - status = StartStableSyncInThread(flow, shard); + status = StartStableSyncInThread(flow, &replica_ptr->cntx, shard); return OpStatus::OK; }; shard_set->pool()->AwaitFiberOnAll(std::move(cb)); @@ -350,18 +350,23 @@ void DflyCmd::StopFullSyncInThread(FlowInfo* flow, EngineShard* shard) { flow->saver.reset(); } -OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, EngineShard* shard) { +OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { // Register journal listener and cleanup. uint32_t cb_id = 0; + JournalWriter* writer = nullptr; if (shard != nullptr) { - JournalWriter writer{flow->conn->socket()}; - auto journal_cb = [flow, writer = std::move(writer)](const journal::Entry& je) mutable { - writer.Write(je); + writer = new JournalWriter{}; + auto journal_cb = [flow, cntx, writer](const journal::Entry& je) mutable { + writer->Write(je); + if (auto ec = writer->Flush(flow->conn->socket()); ec) + cntx->ReportError(ec); }; cb_id = sf_->journal()->RegisterOnChange(std::move(journal_cb)); } - flow->cleanup = [flow, this, cb_id]() { + flow->cleanup = [this, cb_id, writer, flow]() { + if (writer) + delete writer; if (cb_id) sf_->journal()->UnregisterOnChange(cb_id); flow->TryShutdownSocket(); diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index 3603f9b60..cc6d1a5f1 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -159,7 +159,7 @@ class DflyCmd { void StopFullSyncInThread(FlowInfo* flow, EngineShard* shard); // Start stable sync in thread. Called for each flow. - facade::OpStatus StartStableSyncInThread(FlowInfo* flow, EngineShard* shard); + facade::OpStatus StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard); // Fiber that runs full sync for each flow. void FullSyncFb(FlowInfo* flow, Context* cntx); diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index ae337100b..789d6dcd8 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -18,154 +18,180 @@ using namespace std; namespace dfly { -JournalWriter::JournalWriter(io::Sink* sink, std::optional dbid) - : sink_{sink}, cur_dbid_{dbid} { +std::error_code JournalWriter::Flush(io::Sink* sink) { + if (auto ec = sink->Write(buf_.InputBuffer()); ec) + return ec; + buf_.Clear(); + return {}; } -error_code JournalWriter::Write(uint64_t v) { +base::IoBuf& JournalWriter::Accumulated() { + return buf_; +} + +void JournalWriter::Write(uint64_t v) { uint8_t buf[10]; unsigned len = WritePackedUInt(v, buf); - return sink_->Write(io::Bytes{buf, len}); + buf_.EnsureCapacity(sizeof(buf)); + memcpy(buf_.AppendBuffer().data(), buf, len); + buf_.CommitWrite(len); } -error_code JournalWriter::Write(std::string_view sv) { - RETURN_ON_ERR(Write(sv.size())); - return sink_->Write(io::Buffer(sv)); +void JournalWriter::Write(std::string_view sv) { + Write(sv.size()); + buf_.EnsureCapacity(sv.size()); + memcpy(buf_.AppendBuffer().data(), sv.data(), sv.size()); + buf_.CommitWrite(sv.size()); } -error_code JournalWriter::Write(CmdArgList args) { - RETURN_ON_ERR(Write(args.size())); +void JournalWriter::Write(CmdArgList args) { + Write(args.size()); for (auto v : args) - RETURN_ON_ERR(Write(facade::ToSV(v))); - - return std::error_code{}; + Write(facade::ToSV(v)); } -error_code JournalWriter::Write(std::pair args) { +void JournalWriter::Write(std::pair args) { auto [cmd, tail_args] = args; - RETURN_ON_ERR(Write(1 + tail_args.size())); - RETURN_ON_ERR(Write(cmd)); + Write(1 + tail_args.size()); + Write(cmd); for (auto v : tail_args) - RETURN_ON_ERR(Write(v)); - - return std::error_code{}; + Write(v); } -error_code JournalWriter::Write(std::monostate) { - return std::error_code{}; +void JournalWriter::Write(std::monostate) { } -error_code JournalWriter::Write(const journal::Entry& entry) { +void JournalWriter::Write(const journal::Entry& entry) { // Check if entry has a new db index and we need to emit a SELECT entry. if (entry.opcode != journal::Op::SELECT && (!cur_dbid_ || entry.dbid != *cur_dbid_)) { - RETURN_ON_ERR(Write(journal::Entry{journal::Op::SELECT, entry.dbid})); + Write(journal::Entry{journal::Op::SELECT, entry.dbid}); cur_dbid_ = entry.dbid; } - RETURN_ON_ERR(Write(uint8_t(entry.opcode))); + Write(uint8_t(entry.opcode)); switch (entry.opcode) { case journal::Op::SELECT: return Write(entry.dbid); case journal::Op::COMMAND: - RETURN_ON_ERR(Write(entry.txid)); - RETURN_ON_ERR(Write(entry.shard_cnt)); + Write(entry.txid); + Write(entry.shard_cnt); return std::visit([this](const auto& payload) { return Write(payload); }, entry.payload); default: break; }; - return std::error_code{}; } -JournalReader::JournalReader(DbIndex dbid) : buf_{}, dbid_{dbid} { +JournalReader::JournalReader(io::Source* source, DbIndex dbid) + : str_buf_{}, source_{source}, buf_{4096}, dbid_{dbid} { } void JournalReader::SetDb(DbIndex dbid) { dbid_ = dbid; } -template io::Result ReadPackedUIntTyped(io::Source* source) { - uint64_t v; - SET_OR_UNEXPECT(ReadPackedUInt(source), v); - if (v > std::numeric_limits::max()) - return make_unexpected(make_error_code(errc::result_out_of_range)); - return static_cast(v); +void JournalReader::SetSource(io::Source* source) { + CHECK_EQ(buf_.InputLen(), 0ULL); + source_ = source; } -io::Result JournalReader::ReadU8(io::Source* source) { - return ReadPackedUIntTyped(source); -} +std::error_code JournalReader::EnsureRead(size_t num) { + // Check if we already have enough. + if (buf_.InputLen() >= num) + return {}; -io::Result JournalReader::ReadU16(io::Source* source) { - return ReadPackedUIntTyped(source); -} + uint64_t remainder = num - buf_.InputLen(); + buf_.EnsureCapacity(remainder); -io::Result JournalReader::ReadU32(io::Source* source) { - return ReadPackedUIntTyped(source); -} - -io::Result JournalReader::ReadU64(io::Source* source) { - return ReadPackedUIntTyped(source); -} - -io::Result JournalReader::ReadString(io::Source* source) { - size_t size = 0; - SET_OR_UNEXPECT(ReadU64(source), size); - - buf_.EnsureCapacity(size); - auto dest = buf_.AppendBuffer().first(size); - uint64_t read = 0; - SET_OR_UNEXPECT(source->Read(dest), read); + // Try reading at least how much we need, but possibly more + uint64_t read; + SET_OR_RETURN(source_->ReadAtLeast(buf_.AppendBuffer(), remainder), read); + CHECK(read >= remainder); buf_.CommitWrite(read); - if (read != size) - return make_unexpected(std::make_error_code(std::errc::message_size)); + return {}; +} + +template io::Result JournalReader::ReadUInt() { + // Determine type and number of following bytes. + if (auto ec = EnsureRead(1); ec) + return make_unexpected(ec); + PackedUIntMeta meta{buf_.InputBuffer()[0]}; + buf_.ConsumeInput(1); + + if (auto ec = EnsureRead(meta.ByteSize()); ec) + return make_unexpected(ec); + + // Read and check intenger. + uint64_t res; + SET_OR_UNEXPECT(ReadPackedUInt(meta, buf_.InputBuffer()), res); + buf_.ConsumeInput(meta.ByteSize()); + + if (res > std::numeric_limits::max()) + return make_unexpected(make_error_code(errc::result_out_of_range)); + return static_cast(res); +} + +template io::Result JournalReader::ReadUInt(); +template io::Result JournalReader::ReadUInt(); +template io::Result JournalReader::ReadUInt(); +template io::Result JournalReader::ReadUInt(); + +io::Result JournalReader::ReadString() { + size_t size = 0; + SET_OR_UNEXPECT(ReadUInt(), size); + + if (auto ec = EnsureRead(size); ec) + return make_unexpected(ec); + + unsigned offset = str_buf_.size(); + str_buf_.resize(offset + size); + buf_.ReadAndConsume(size, str_buf_.data() + offset); return size; } -std::error_code JournalReader::Read(io::Source* source, CmdArgVec* vec) { - buf_.ConsumeInput(buf_.InputBuffer().size()); +std::error_code JournalReader::Read(CmdArgVec* vec) { + size_t num_strings = 0; + SET_OR_RETURN(ReadUInt(), num_strings); + vec->resize(num_strings); - size_t size = 0; - SET_OR_RETURN(ReadU64(source), size); - - vec->resize(size); + // Read all strings consecutively. + str_buf_.clear(); for (auto& span : *vec) { - size_t len; - SET_OR_RETURN(ReadString(source), len); - span = MutableSlice{nullptr, len}; + size_t size; + SET_OR_RETURN(ReadString(), size); + span = MutableSlice{nullptr, size}; } - size_t offset = 0; + // Set span pointers, now that string buffer won't reallocate. + char* ptr = str_buf_.data(); for (auto& span : *vec) { - size_t len = span.size(); - auto ptr = buf_.InputBuffer().subspan(offset).data(); - span = MutableSlice{reinterpret_cast(ptr), len}; - offset += len; + span = {ptr, span.size()}; + ptr += span.size(); } return std::error_code{}; } -io::Result JournalReader::ReadEntry(io::Source* source) { +io::Result JournalReader::ReadEntry() { uint8_t opcode; - SET_OR_UNEXPECT(ReadU8(source), opcode); + SET_OR_UNEXPECT(ReadUInt(), opcode); journal::ParsedEntry entry{static_cast(opcode), dbid_}; switch (entry.opcode) { case journal::Op::COMMAND: - SET_OR_UNEXPECT(ReadU64(source), entry.txid); - SET_OR_UNEXPECT(ReadU32(source), entry.shard_cnt); + SET_OR_UNEXPECT(ReadUInt(), entry.txid); + SET_OR_UNEXPECT(ReadUInt(), entry.shard_cnt); entry.payload = CmdArgVec{}; - if (auto ec = Read(source, &*entry.payload); ec) + if (auto ec = Read(&*entry.payload); ec) return make_unexpected(ec); break; case journal::Op::SELECT: - SET_OR_UNEXPECT(ReadU16(source), dbid_); - return ReadEntry(source); + SET_OR_UNEXPECT(ReadUInt(), dbid_); + return ReadEntry(); default: break; }; diff --git a/src/server/journal/serializer.h b/src/server/journal/serializer.h index 06a443af4..574322cb3 100644 --- a/src/server/journal/serializer.h +++ b/src/server/journal/serializer.h @@ -18,24 +18,26 @@ namespace dfly { // It automatically keeps track of the current database index. class JournalWriter { public: - // Initialize with sink and optional start database index. If no start index is set, - // a SELECT will be issued before the first entry. - JournalWriter(io::Sink* sink, std::optional dbid = std::nullopt); + // Write single entry to internal buffer. + void Write(const journal::Entry& entry); - // Write single entry. - std::error_code Write(const journal::Entry& entry); + // Flush internal buffer to sink. + std::error_code Flush(io::Sink* sink_); + + // Return reference to internal buffer. + base::IoBuf& Accumulated(); private: - std::error_code Write(uint64_t v); // Write packed unsigned integer. - std::error_code Write(std::string_view sv); // Write string. - std::error_code Write(CmdArgList args); - std::error_code Write(std::pair args); + void Write(uint64_t v); // Write packed unsigned integer. + void Write(std::string_view sv); // Write string. + void Write(CmdArgList args); + void Write(std::pair args); - std::error_code Write(std::monostate); // Overload for empty std::variant + void Write(std::monostate); // Overload for empty std::variant private: - io::Sink* sink_; - std::optional cur_dbid_; + base::IoBuf buf_{}; + std::optional cur_dbid_{}; }; // JournalReader allows deserializing journal entries from a source. @@ -43,29 +45,33 @@ class JournalWriter { struct JournalReader { public: // Initialize start database index. - JournalReader(DbIndex dbid); + JournalReader(io::Source* source, DbIndex dbid); // Overwrite current db index. void SetDb(DbIndex dbid); + // Overwrite current source and ensure there is no leftover from previous. + void SetSource(io::Source* source); + // Try reading entry from source. - io::Result ReadEntry(io::Source* source); + io::Result ReadEntry(); private: - // TODO: Templated endian encoding to not repeat...? - io::Result ReadU8(io::Source* source); - io::Result ReadU16(io::Source* source); - io::Result ReadU32(io::Source* source); - io::Result ReadU64(io::Source* source); + // Read from source until buffer contains at least num bytes. + std::error_code EnsureRead(size_t num); - // Read string into internal buffer and return size. - io::Result ReadString(io::Source* source); + // Read unsigned integer in packed encoding. + template io::Result ReadUInt(); - // Read argument array into internal buffer and build slice. - // TODO: Inline store span data inside buffer to avoid alloaction - std::error_code Read(io::Source* source, CmdArgVec* vec); + // Read and append string to string buffer, return size. + io::Result ReadString(); + + // Read argument array into string buffer. + std::error_code Read(CmdArgVec* vec); private: + std::string str_buf_; // last parsed entry points here + io::Source* source_; base::IoBuf buf_; DbIndex dbid_; }; diff --git a/src/server/journal_test.cc b/src/server/journal_test.cc index c14f96898..6c6bf8b89 100644 --- a/src/server/journal_test.cc +++ b/src/server/journal_test.cc @@ -4,6 +4,7 @@ #include "base/logging.h" #include "server/journal/serializer.h" #include "server/journal/types.h" +#include "server/serializer_commons.h" using namespace testing; using namespace std; @@ -105,20 +106,19 @@ TEST(Journal, WriteRead) { {5, 2, list("SET", "E", "2"), 1}}; // Write all entries to string file. - io::StringSink ss; - JournalWriter writer{&ss}; + JournalWriter writer{}; for (const auto& entry : test_entries) { writer.Write(entry); } // Read them back. - io::BytesSource bs{io::Buffer(ss.str())}; - JournalReader reader{0}; + io::BytesSource bs{writer.Accumulated().InputBuffer()}; + JournalReader reader{&bs, 0}; for (unsigned i = 0; i < test_entries.size(); i++) { auto& expected = test_entries[i]; - auto res = reader.ReadEntry(&bs); + auto res = reader.ReadEntry(); ASSERT_TRUE(res.has_value()); ASSERT_EQ(expected.opcode, res->opcode); diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index c777864e4..20f0ff19b 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1875,7 +1875,7 @@ error_code RdbLoaderBase::EnsureReadInternal(size_t min_sz) { return kOk; } -auto RdbLoaderBase::LoadLen(bool* is_encoded) -> io::Result { +io::Result RdbLoaderBase::LoadLen(bool* is_encoded) { if (is_encoded) *is_encoded = false; @@ -1885,33 +1885,19 @@ auto RdbLoaderBase::LoadLen(bool* is_encoded) -> io::Result { if (ec) return make_unexpected(ec); - uint64_t res = 0; - uint8_t first = mem_buf_->InputBuffer()[0]; - int type = (first & 0xC0) >> 6; - mem_buf_->ConsumeInput(1); - if (type == RDB_ENCVAL) { - /* Read a 6 bit encoding type. */ - if (is_encoded) - *is_encoded = true; - res = first & 0x3F; - } else if (type == RDB_6BITLEN) { - /* Read a 6 bit len. */ - res = first & 0x3F; - } else if (type == RDB_14BITLEN) { - res = ((first & 0x3F) << 8) | mem_buf_->InputBuffer()[0]; - mem_buf_->ConsumeInput(1); - } else if (first == RDB_32BITLEN) { - /* Read a 32 bit len. */ - res = absl::big_endian::Load32(mem_buf_->InputBuffer().data()); - mem_buf_->ConsumeInput(4); - } else if (first == RDB_64BITLEN) { - /* Read a 64 bit len. */ - res = absl::big_endian::Load64(mem_buf_->InputBuffer().data()); - mem_buf_->ConsumeInput(8); - } else { - LOG(ERROR) << "Bad length encoding " << type << " in rdbLoadLen()"; - return Unexpected(errc::rdb_file_corrupted); - } + // Read integer meta info. + auto bytes = mem_buf_->InputBuffer(); + PackedUIntMeta meta{bytes[0]}; + bytes.remove_prefix(1); + + // Read integer. + uint64_t res; + SET_OR_UNEXPECT(ReadPackedUInt(meta, bytes), res); + + if (meta.Type() == RDB_ENCVAL && is_encoded) + *is_encoded = true; + + mem_buf_->ConsumeInput(1 + meta.ByteSize()); return res; } @@ -1963,13 +1949,14 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service, DbIndex dbid) { io::BytesSource bs{io::Buffer(journal_blob)}; journal_reader_.SetDb(dbid); + journal_reader_.SetSource(&bs); // Parse and exectue in loop. size_t done = 0; JournalExecutor ex{service}; while (done < num_entries) { journal::ParsedEntry entry{}; - SET_OR_RETURN(journal_reader_.ReadEntry(&bs), entry); + SET_OR_RETURN(journal_reader_.ReadEntry(), entry); ex.Execute(entry); done++; } diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 3b67e4ad7..3b784e156 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -145,7 +145,7 @@ class RdbLoaderBase { size_t source_limit_ = SIZE_MAX; base::PODArray compr_buf_; std::unique_ptr decompress_impl_; - JournalReader journal_reader_{0}; + JournalReader journal_reader_{nullptr, 0}; }; class RdbLoader : protected RdbLoaderBase { diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index be61f401b..84521ab8b 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -27,7 +27,6 @@ extern "C" { #include "core/string_set.h" #include "server/engine_shard_set.h" #include "server/error.h" -#include "server/journal/serializer.h" #include "server/rdb_extensions.h" #include "server/serializer_commons.h" #include "server/snapshot.h" @@ -253,7 +252,7 @@ std::error_code RdbSerializer::SaveValue(const PrimeValue& pv) { error_code RdbSerializer::SelectDb(uint32_t dbid) { uint8_t buf[16]; buf[0] = RDB_OPCODE_SELECTDB; - unsigned enclen = WritePackedUInt(dbid, buf + 1); + unsigned enclen = WritePackedUInt(dbid, io::MutableBytes{buf}.subspan(1)); return WriteRaw(Bytes{buf, enclen + 1}); } @@ -683,16 +682,19 @@ size_t RdbSerializer::SerializedLen() const { } error_code RdbSerializer::WriteJournalEntries(absl::Span entries) { - // Write journal blob to string file. - io::StringSink ss{}; - JournalWriter writer{&ss}; for (const auto& entry : entries) { - RETURN_ON_ERR(writer.Write(entry)); + journal_writer_.Write(entry); } RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_BLOB)); RETURN_ON_ERR(SaveLen(entries.size())); - return SaveString(ss.str()); + + auto& buf = journal_writer_.Accumulated(); + auto bytes = buf.InputBuffer(); + RETURN_ON_ERR(SaveString(string_view{reinterpret_cast(bytes.data()), bytes.size()})); + buf.Clear(); + + return error_code{}; } error_code RdbSerializer::SaveString(string_view val) { @@ -902,7 +904,7 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { continue; if (record.db_index != last_db_index) { - unsigned enclen = WritePackedUInt(record.db_index, buf + 1); + unsigned enclen = WritePackedUInt(record.db_index, io::MutableBytes{buf}.subspan(1)); string_view str{(char*)buf, enclen + 1}; io_error = sink_->Write(io::Buffer(str)); @@ -1171,7 +1173,7 @@ void RdbSerializer::CompressBlob() { // Write encoded compressed blob len dest = mem_buf_.AppendBuffer(); - unsigned enclen = WritePackedUInt(compressed_blob.length(), dest.data()); + unsigned enclen = WritePackedUInt(compressed_blob.length(), dest); mem_buf_.CommitWrite(enclen); // Write compressed blob diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index e6e71a615..2fc62282b 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -16,6 +16,7 @@ extern "C" { #include "base/pod_array.h" #include "io/io.h" #include "server/common.h" +#include "server/journal/serializer.h" #include "server/journal/types.h" #include "server/table.h" @@ -168,10 +169,13 @@ class RdbSerializer { void CompressBlob(); void AllocateCompressorOnce(); + JournalWriter journal_writer_; + std::unique_ptr lzf_; base::IoBuf mem_buf_; base::PODArray tmp_buf_; std::string tmp_str_; + CompressionMode compression_mode_; // TODO : This compressor impl should support different compression algorithms zstd/lz4 etc. std::unique_ptr compressor_impl_; diff --git a/src/server/replica.cc b/src/server/replica.cc index b510acbaf..c85dd04ff 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -751,10 +751,10 @@ void Replica::StableSyncDflyFb(Context* cntx) { SocketSource ss{sock_.get()}; io::PrefixSource ps{prefix, &ss}; - JournalReader reader{0}; + JournalReader reader{&ps, 0}; JournalExecutor executor{&service_}; while (!cntx->IsCancelled()) { - auto res = reader.ReadEntry(&ps); + auto res = reader.ReadEntry(); if (!res) { cntx->ReportError(res.error(), "Journal format error"); return; diff --git a/src/server/serializer_commons.cc b/src/server/serializer_commons.cc index 15f4e8b91..8be70472e 100644 --- a/src/server/serializer_commons.cc +++ b/src/server/serializer_commons.cc @@ -14,11 +14,32 @@ using namespace std; namespace dfly { +int PackedUIntMeta::Type() const { + return (first_byte & 0xC0) >> 6; +} + +unsigned PackedUIntMeta::ByteSize() const { + switch (Type()) { + case RDB_ENCVAL: + case RDB_6BITLEN: + return 0; + case RDB_14BITLEN: + return 1; + }; + switch (first_byte) { + case RDB_32BITLEN: + return 4; + case RDB_64BITLEN: + return 8; + }; + return 0; +} + /* Saves an encoded unsigned integer. The first two bits in the first byte are used to * hold the encoding type. See the RDB_* definitions for more information * on the types of encoding. buf must be at least 9 bytes. * */ -unsigned WritePackedUInt(uint64_t value, uint8_t* buf) { +unsigned WritePackedUInt(uint64_t value, io::MutableBytes buf) { if (value < (1 << 6)) { /* Save a 6 bit value */ buf[0] = (value & 0xFF) | (RDB_6BITLEN << 6); @@ -35,47 +56,32 @@ unsigned WritePackedUInt(uint64_t value, uint8_t* buf) { if (value <= UINT32_MAX) { /* Save a 32 bit value */ buf[0] = RDB_32BITLEN; - absl::big_endian::Store32(buf + 1, value); + absl::big_endian::Store32(buf.data() + 1, value); return 1 + 4; } /* Save a 64 bit value */ buf[0] = RDB_64BITLEN; - absl::big_endian::Store64(buf + 1, value); + absl::big_endian::Store64(buf.data() + 1, value); return 1 + 8; } -io::Result ReadPackedUInt(io::Source* source) { - uint8_t buf[10]; - size_t read = 0; - - uint8_t first = 0; - SET_OR_UNEXPECT(source->Read(io::MutableBytes{&first, 1}), read); - if (read != 1) - return make_unexpected(make_error_code(errc::bad_message)); - - int type = (first & 0xC0) >> 6; - switch (type) { +io::Result ReadPackedUInt(PackedUIntMeta meta, io::Bytes bytes) { + DCHECK(meta.ByteSize() <= bytes.size()); + switch (meta.Type()) { + case RDB_ENCVAL: case RDB_6BITLEN: - return first & 0x3F; + return meta.first_byte & 0x3F; case RDB_14BITLEN: - SET_OR_UNEXPECT(source->Read(io::MutableBytes{buf, 1}), read); - if (read != 1) - return make_unexpected(make_error_code(errc::bad_message)); - return ((first & 0x3F) << 8) | buf[0]; + return ((meta.first_byte & 0x3F) << 8) | bytes[0]; + }; + switch (meta.first_byte) { case RDB_32BITLEN: - SET_OR_UNEXPECT(source->Read(io::MutableBytes{buf, 4}), read); - if (read != 4) - return make_unexpected(make_error_code(errc::bad_message)); - return absl::big_endian::Load32(buf); + return absl::big_endian::Load32(bytes.data()); case RDB_64BITLEN: - SET_OR_UNEXPECT(source->Read(io::MutableBytes{buf, 8}), read); - if (read != 8) - return make_unexpected(make_error_code(errc::bad_message)); - return absl::big_endian::Load64(buf); - default: - return make_unexpected(make_error_code(errc::illegal_byte_sequence)); - } + return absl::big_endian::Load64(bytes.data()); + }; + return make_unexpected(make_error_code(errc::illegal_byte_sequence)); } } // namespace dfly diff --git a/src/server/serializer_commons.h b/src/server/serializer_commons.h index 849035406..b2b987035 100644 --- a/src/server/serializer_commons.h +++ b/src/server/serializer_commons.h @@ -30,12 +30,27 @@ using nonstd::make_unexpected; dest = std::move(exp_res.value()); \ } +// Represents meta information for an encoded packed unsigned integer. +struct PackedUIntMeta { + // Initialize by first byte in sequence. + PackedUIntMeta(uint8_t first_byte) : first_byte{first_byte} { + } + + // Get underlying RDB type. + int Type() const; + + // Get additional size in bytes (excluding first one). + unsigned ByteSize() const; + + uint8_t first_byte; +}; + // Saves an packed unsigned integer. The first two bits in the first byte are used to // hold the encoding type. See the RDB_* definitions for more information // on the types of encoding. buf must be at least 9 bytes. -unsigned WritePackedUInt(uint64_t value, uint8_t* buf); +unsigned WritePackedUInt(uint64_t value, io::MutableBytes dest); // Deserialize packed unsigned integer. -io::Result ReadPackedUInt(io::Source* source); +io::Result ReadPackedUInt(PackedUIntMeta meta, io::Bytes source); } // namespace dfly diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index d439b2a3d..305679a1e 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -67,7 +67,7 @@ async def test_replication_all(df_local_factory, t_master, t_replicas, n_keys, n # Check range [n_stream_keys, n_keys] is of seed 1 await batch_check_data_async(c_replica, gen_test_data(n_keys, start=n_stream_keys, seed=1)) # Check range [0, n_stream_keys] is of seed 2 - await asyncio.sleep(0.2) + await asyncio.sleep(1.0) await batch_check_data_async(c_replica, gen_test_data(n_stream_keys, seed=2)) # Start streaming data and run REPLICAOF in parallel @@ -85,7 +85,7 @@ async def test_replication_all(df_local_factory, t_master, t_replicas, n_keys, n # Check stable state streaming await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=3)) - await asyncio.sleep(0.5) + await asyncio.sleep(1.0) await asyncio.gather(*(batch_check_data_async(c, gen_test_data(n_keys, seed=3)) for c in c_replicas))