1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-15 17:51:06 +00:00

chore(rdb): refactor and some clean-ups of rdb code

This commit is contained in:
Roman Gershman 2022-08-29 23:41:43 +03:00
parent ae65c489e5
commit 439d79f03c
5 changed files with 137 additions and 94 deletions

View file

@ -678,6 +678,15 @@ error_code RdbSerializer::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_
return error_code{};
}
io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
io::Result<size_t> res = lf_->WriteSome(v, len, offset_, 0);
if (res) {
offset_ += *res;
}
return res;
}
AlignedBuffer::AlignedBuffer(size_t cap, ::io::Sink* upstream)
: capacity_(cap), upstream_(upstream) {
aligned_buf_ = (char*)mi_malloc_aligned(kBufLen, 4_KB);
@ -718,7 +727,7 @@ std::error_code AlignedBuffer::Write(io::Bytes record) {
return error_code{};
}
std::error_code AlignedBuffer::Flush() {
error_code AlignedBuffer::Flush() {
size_t len = (buf_offs_ + kAmask) & (~kAmask);
iovec ivec{.iov_base = aligned_buf_, .iov_len = len};
buf_offs_ = 0;
@ -726,58 +735,73 @@ std::error_code AlignedBuffer::Flush() {
return upstream_->Write(&ivec, 1);
}
struct RdbSaver::Impl {
// used for serializing non-body components in the calling fiber.
RdbSerializer serializer;
SliceSnapshot::RecordChannel channel;
vector<unique_ptr<SliceSnapshot>> shard_snapshots;
class RdbSaver::Impl {
public:
// We pass K=sz to say how many producers are pushing data in order to maintain
// correct closing semantics - channel is closing when K producers marked it as closed.
Impl(unsigned producers_len, AlignedBuffer* aligned_buf)
: serializer(aligned_buf), channel{128, producers_len}, shard_snapshots(producers_len) {
Impl(unsigned producers_len, io::Sink* sink);
error_code SaveAuxFieldStrStr(string_view key, string_view val);
RdbSerializer* serializer() {
return &meta_serializer_;
}
error_code ConsumeChannel();
void StartSnapshotting(EngineShard* shard);
error_code Flush() {
return aligned_buf_.Flush();
}
size_t Size() const {
return shard_snapshots_.size();
}
void FillFreqMap(RdbTypeFreqMap* dest) const;
private:
AlignedBuffer aligned_buf_;
// used for serializing non-body components in the calling fiber.
RdbSerializer meta_serializer_;
vector<unique_ptr<SliceSnapshot>> shard_snapshots_;
// it may not present if we write sharded rdb.
optional<SliceSnapshot::RecordChannel> channel_;
};
RdbSaver::RdbSaver(::io::Sink* sink) : aligned_buf_(kBufLen, sink) {
CHECK_NOTNULL(sink);
impl_.reset(new Impl(shard_set->size(), &aligned_buf_));
// impl_->serializer.set_sink(sink_);
// We pass K=sz to say how many producers are pushing data in order to maintain
// correct closing semantics - channel is closing when K producers marked it as closed.
RdbSaver::Impl::Impl(unsigned producers_len, io::Sink* sink)
: aligned_buf_(kBufLen, sink), meta_serializer_(&aligned_buf_),
shard_snapshots_(producers_len) {
channel_.emplace(128, producers_len);
}
RdbSaver::~RdbSaver() {
}
std::error_code RdbSaver::SaveHeader(const StringVec& lua_scripts) {
char magic[16];
size_t sz = absl::SNPrintF(magic, sizeof(magic), "REDIS%04d", RDB_VERSION);
CHECK_EQ(9u, sz);
RETURN_ON_ERR(impl_->serializer.WriteRaw(Bytes{reinterpret_cast<uint8_t*>(magic), sz}));
RETURN_ON_ERR(SaveAux(lua_scripts));
error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val) {
auto& ser = meta_serializer_;
RETURN_ON_ERR(ser.WriteOpcode(RDB_OPCODE_AUX));
RETURN_ON_ERR(ser.SaveString(key));
RETURN_ON_ERR(ser.SaveString(val));
return error_code{};
}
error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
RETURN_ON_ERR(impl_->serializer.FlushMem());
VLOG(1) << "SaveBody , snapshots count: " << impl_->shard_snapshots.size();
size_t num_written = 0;
SliceSnapshot::DbRecord record;
uint8_t buf[16];
DbIndex last_db_index = kInvalidDbId;
buf[0] = RDB_OPCODE_SELECTDB;
auto& channel = impl_->channel;
error_code RdbSaver::Impl::ConsumeChannel() {
error_code io_error;
// we can not exit on io-error since we spawn fibers that push data.
// TODO: we may signal them to stop processing and exit asap in case of the error.
uint8_t buf[16];
size_t channel_bytes = 0;
SliceSnapshot::DbRecord record;
DbIndex last_db_index = kInvalidDbId;
buf[0] = RDB_OPCODE_SELECTDB;
auto& channel = *channel_;
while (channel.Pop(record)) {
if (io_error)
continue;
@ -798,18 +822,62 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
io_error = aligned_buf_.Write(record.value);
record.value.clear();
} while (!io_error && channel.TryPop(record));
} // while (channel.pop)
size_t pushed_bytes = 0;
for (auto& ptr : impl_->shard_snapshots) {
for (auto& ptr : shard_snapshots_) {
ptr->Join();
pushed_bytes += ptr->channel_bytes();
}
DCHECK(!channel.TryPop(record));
VLOG(1) << "Blobs written " << num_written << " pulled bytes: " << channel_bytes
<< " pushed bytes: " << pushed_bytes;
VLOG(1) << "Channel pulled bytes: " << channel_bytes << " pushed bytes: " << pushed_bytes;
return io_error;
}
void RdbSaver::Impl::StartSnapshotting(EngineShard* shard) {
auto s = make_unique<SliceSnapshot>(&shard->db_slice(), &channel_.value());
s->Start();
shard_snapshots_[shard->shard_id()] = move(s);
}
void RdbSaver::Impl::FillFreqMap(RdbTypeFreqMap* dest) const {
for (auto& ptr : shard_snapshots_) {
const RdbTypeFreqMap& src_map = ptr->freq_map();
for (const auto& k_v : src_map)
(*dest)[k_v.first] += k_v.second;
}
}
RdbSaver::RdbSaver(::io::Sink* sink) {
CHECK_NOTNULL(sink);
impl_.reset(new Impl(shard_set->size(), sink));
}
RdbSaver::~RdbSaver() {
}
error_code RdbSaver::SaveHeader(const StringVec& lua_scripts) {
char magic[16];
size_t sz = absl::SNPrintF(magic, sizeof(magic), "REDIS%04d", RDB_VERSION);
CHECK_EQ(9u, sz);
RETURN_ON_ERR(impl_->serializer()->WriteRaw(Bytes{reinterpret_cast<uint8_t*>(magic), sz}));
RETURN_ON_ERR(SaveAux(lua_scripts));
return error_code{};
}
error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
RETURN_ON_ERR(impl_->serializer()->FlushMem());
VLOG(1) << "SaveBody , snapshots count: " << impl_->Size();
error_code io_error = impl_->ConsumeChannel();
if (io_error) {
VLOG(1) << "io error " << io_error;
return io_error;
@ -819,22 +887,14 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
if (freq_map) {
freq_map->clear();
for (auto& ptr : impl_->shard_snapshots) {
const RdbTypeFreqMap& src_map = ptr->freq_map();
for (const auto& k_v : src_map)
(*freq_map)[k_v.first] += k_v.second;
}
impl_->FillFreqMap(freq_map);
}
return error_code{};
}
void RdbSaver::StartSnapshotInShard(EngineShard* shard) {
DbTableArray databases = shard->db_slice().databases();
auto s = make_unique<SliceSnapshot>(std::move(databases), &shard->db_slice(), &impl_->channel);
s->Start();
impl_->shard_snapshots[shard->shard_id()] = move(s);
impl_->StartSnapshotting(shard);
}
error_code RdbSaver::SaveAux(const StringVec& lua_scripts) {
@ -844,7 +904,7 @@ error_code RdbSaver::SaveAux(const StringVec& lua_scripts) {
error_code ec;
/* Add a few fields about the state when the RDB was created. */
RETURN_ON_ERR(SaveAuxFieldStrStr("redis-ver", REDIS_VERSION));
RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("redis-ver", REDIS_VERSION));
RETURN_ON_ERR(SaveAuxFieldStrInt("redis-bits", 64));
RETURN_ON_ERR(SaveAuxFieldStrInt("ctime", time(NULL)));
@ -854,7 +914,7 @@ error_code RdbSaver::SaveAux(const StringVec& lua_scripts) {
RETURN_ON_ERR(SaveAuxFieldStrInt("aof-preamble", aof_preamble));
for (const string& s : lua_scripts) {
RETURN_ON_ERR(SaveAuxFieldStrStr("lua", s));
RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("lua", s));
}
// TODO: "repl-stream-db", "repl-id", "repl-offset"
@ -865,7 +925,7 @@ error_code RdbSaver::SaveEpilog() {
uint8_t buf[8];
uint64_t chksum;
auto& ser = impl_->serializer;
auto& ser = *impl_->serializer();
/* EOF opcode */
RETURN_ON_ERR(ser.WriteOpcode(RDB_OPCODE_EOF));
@ -879,22 +939,13 @@ error_code RdbSaver::SaveEpilog() {
RETURN_ON_ERR(ser.FlushMem());
return aligned_buf_.Flush();
}
error_code RdbSaver::SaveAuxFieldStrStr(string_view key, string_view val) {
auto& ser = impl_->serializer;
RETURN_ON_ERR(ser.WriteOpcode(RDB_OPCODE_AUX));
RETURN_ON_ERR(ser.SaveString(key));
RETURN_ON_ERR(ser.SaveString(val));
return error_code{};
return impl_->Flush();
}
error_code RdbSaver::SaveAuxFieldStrInt(string_view key, int64_t val) {
char buf[LONG_STR_SIZE];
int vlen = ll2string(buf, sizeof(buf), val);
return SaveAuxFieldStrStr(key, string_view(buf, vlen));
return impl_->SaveAuxFieldStrStr(key, string_view(buf, vlen));
}
} // namespace dfly

View file

@ -15,7 +15,7 @@ extern "C" {
#include "io/io.h"
#include "server/common.h"
#include "server/table.h"
#include "util/uring/uring_file.h"
typedef struct rax rax;
typedef struct streamCG streamCG;
@ -24,6 +24,22 @@ namespace dfly {
class EngineShard;
class LinuxWriteWrapper : public io::Sink {
public:
LinuxWriteWrapper(util::uring::LinuxFile* lf) : lf_(lf) {
}
io::Result<size_t> WriteSome(const iovec* v, uint32_t len) final;
std::error_code Close() {
return lf_->Close();
}
private:
util::uring::LinuxFile* lf_;
off_t offset_ = 0;
};
class AlignedBuffer {
public:
AlignedBuffer(size_t cap, ::io::Sink* upstream);
@ -64,15 +80,13 @@ class RdbSaver {
void StartSnapshotInShard(EngineShard* shard);
private:
struct Impl;
class Impl;
std::error_code SaveEpilog();
std::error_code SaveAux(const StringVec& lua_scripts);
std::error_code SaveAuxFieldStrStr(std::string_view key, std::string_view val);
std::error_code SaveAuxFieldStrInt(std::string_view key, int64_t val);
AlignedBuffer aligned_buf_;
std::unique_ptr<Impl> impl_;
};

View file

@ -126,29 +126,6 @@ string InferLoadFile(fs::path data_dir) {
return string{};
}
class LinuxWriteWrapper : public io::WriteFile {
public:
LinuxWriteWrapper(uring::LinuxFile* lf) : WriteFile("wrapper"), lf_(lf) {
}
io::Result<size_t> WriteSome(const iovec* v, uint32_t len) final {
io::Result<size_t> res = lf_->WriteSome(v, len, offset_, 0);
if (res) {
offset_ += *res;
}
return res;
}
std::error_code Close() final {
return lf_->Close();
}
private:
uring::LinuxFile* lf_;
off_t offset_ = 0;
};
} // namespace
bool IsValidSaveScheduleNibble(string_view time, unsigned int max) {

View file

@ -25,8 +25,9 @@ using namespace chrono_literals;
namespace this_fiber = ::boost::this_fiber;
using boost::fibers::fiber;
SliceSnapshot::SliceSnapshot(DbTableArray db_array, DbSlice* slice, RecordChannel* dest)
: db_array_(db_array), db_slice_(slice), dest_(dest) {
SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest)
: db_slice_(slice), dest_(dest) {
db_array_ = slice->databases();
}
SliceSnapshot::~SliceSnapshot() {

View file

@ -29,7 +29,7 @@ class SliceSnapshot {
using RecordChannel =
::util::fibers_ext::SimpleChannel<DbRecord, base::mpmc_bounded_queue<DbRecord>>;
SliceSnapshot(DbTableArray db_array, DbSlice* slice, RecordChannel* dest);
SliceSnapshot(DbSlice* slice, RecordChannel* dest);
~SliceSnapshot();
void Start();