1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore(server): rdb save can now save into tcp socket directly. (#317)

In more detail, RdbSaver uses AlignedBuffer that writes into io::Sink in chunks of 4KB.
It's great for the direct file I/O, but bad for sockets that receive blocks of 4KB with garbage
at the end. I improved the code around this and actually simplified the logic, so now AlignedBuffer
is just another Sink that is passed into serializer when writing into files. When sending to
sockets a socket sink is passed instead.

Also many other unrelated changes grouped into this pretty big cr.
1. dashtable readability improvements.
2. Move methods from facade::ConnectionContext - into facade::Service,
   make ConnectionContext a dumb object.
3. Optionally allow journal to be memory only (not backed up by a disk)
   by using a ring buffer to store last k entries in each journal slice. Also renamed
   journal_shard into journal_slice because journal has presence in each DF thread and not
   only in its shards.
4. Introduce journal::Entry that will consolidate any store change that happens in the thread.
5. Introduce GetRandomHex utility function.
6. Introduce two hooks: ServerFamily::OnClose that is called when a connection is closed,
   and ServerFamily::BreakOnShutdown that is called when the process exits and any background fibers need to
   break early.
7. Pull some noisy info logs out of rdb_load class.
8. Snapshot class now has the ability to subscribe to journal changes, thus it can include concurrent changes into the snapshot.
   Currently only journal::Op::VAL is supported (it's part of RDB format anyway).

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-09-20 01:09:03 -07:00 committed by GitHub
parent 1733af4cf6
commit 0a1b5eb297
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 573 additions and 318 deletions

View file

@ -716,12 +716,12 @@ template <typename U, typename V, typename EvictionPolicy>
auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, EvictionPolicy& ev) auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, EvictionPolicy& ev)
-> std::pair<iterator, bool> { -> std::pair<iterator, bool> {
uint64_t key_hash = DoHash(key); uint64_t key_hash = DoHash(key);
uint32_t seg_id = SegmentId(key_hash); uint32_t target_seg_id = SegmentId(key_hash);
while (true) { while (true) {
// Keep last global_depth_ msb bits of the hash. // Keep last global_depth_ msb bits of the hash.
assert(seg_id < segment_.size()); assert(target_seg_id < segment_.size());
SegmentType* target = segment_[seg_id]; SegmentType* target = segment_[target_seg_id];
// Load heap allocated segment data - to avoid TLB miss when accessing the bucket. // Load heap allocated segment data - to avoid TLB miss when accessing the bucket.
__builtin_prefetch(target, 0, 1); __builtin_prefetch(target, 0, 1);
@ -731,12 +731,12 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
if (res) { // success if (res) { // success
++size_; ++size_;
return std::make_pair(iterator{this, seg_id, it.index, it.slot}, true); return std::make_pair(iterator{this, target_seg_id, it.index, it.slot}, true);
} }
/*duplicate insert, insertion failure*/ /*duplicate insert, insertion failure*/
if (it.found()) { if (it.found()) {
return std::make_pair(iterator{this, seg_id, it.index, it.slot}, false); return std::make_pair(iterator{this, target_seg_id, it.index, it.slot}, false);
} }
// At this point we must split the segment. // At this point we must split the segment.
@ -749,12 +749,12 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
hotspot.key_hash = key_hash; hotspot.key_hash = key_hash;
for (unsigned j = 0; j < HotspotBuckets::kRegularBuckets; ++j) { for (unsigned j = 0; j < HotspotBuckets::kRegularBuckets; ++j) {
hotspot.probes.by_type.regular_buckets[j] = bucket_iterator{this, seg_id, bid[j]}; hotspot.probes.by_type.regular_buckets[j] = bucket_iterator{this, target_seg_id, bid[j]};
} }
for (unsigned i = 0; i < Policy::kStashBucketNum; ++i) { for (unsigned i = 0; i < Policy::kStashBucketNum; ++i) {
hotspot.probes.by_type.stash_buckets[i] = hotspot.probes.by_type.stash_buckets[i] =
bucket_iterator{this, seg_id, uint8_t(kLogicalBucketNum + i), 0}; bucket_iterator{this, target_seg_id, uint8_t(kLogicalBucketNum + i), 0};
} }
hotspot.num_buckets = HotspotBuckets::kNumBuckets; hotspot.num_buckets = HotspotBuckets::kNumBuckets;
@ -770,7 +770,7 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
/*unsigned start = (bid[HotspotBuckets::kNumBuckets - 1] + 1) % kLogicalBucketNum; /*unsigned start = (bid[HotspotBuckets::kNumBuckets - 1] + 1) % kLogicalBucketNum;
for (unsigned i = 0; i < HotspotBuckets::kNumBuckets; ++i) { for (unsigned i = 0; i < HotspotBuckets::kNumBuckets; ++i) {
uint8_t id = (start + i) % kLogicalBucketNum; uint8_t id = (start + i) % kLogicalBucketNum;
buckets.probes.arr[i] = bucket_iterator{this, seg_id, id}; buckets.probes.arr[i] = bucket_iterator{this, target_seg_id, id};
} }
garbage_collected_ += ev.GarbageCollect(buckets, this); garbage_collected_ += ev.GarbageCollect(buckets, this);
*/ */
@ -804,12 +804,12 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
if (target->local_depth() == global_depth_) { if (target->local_depth() == global_depth_) {
IncreaseDepth(global_depth_ + 1); IncreaseDepth(global_depth_ + 1);
seg_id = SegmentId(key_hash); target_seg_id = SegmentId(key_hash);
assert(seg_id < segment_.size() && segment_[seg_id] == target); assert(target_seg_id < segment_.size() && segment_[target_seg_id] == target);
} }
ev.RecordSplit(target); ev.RecordSplit(target);
Split(seg_id); Split(target_seg_id);
} }
return std::make_pair(iterator{}, false); return std::make_pair(iterator{}, false);

View file

@ -1220,6 +1220,8 @@ void Segment<Key, Value, Policy>::Split(HFunc&& hfn, Segment* dest_right) {
auto it = dest_right->InsertUniq(std::forward<Key_t>(Key(bid, slot)), auto it = dest_right->InsertUniq(std::forward<Key_t>(Key(bid, slot)),
std::forward<Value_t>(Value(bid, slot)), hash); std::forward<Value_t>(Value(bid, slot)), hash);
(void)it; (void)it;
assert(it.index != kNanBid);
if constexpr (USE_VERSION) { if constexpr (USE_VERSION) {
// Update the version in the destination bucket. // Update the version in the destination bucket.
uint64_t ver = stash.GetVersion(); uint64_t ver = stash.GetVersion();

View file

@ -17,8 +17,8 @@ class ConnectionContext {
public: public:
ConnectionContext(::io::Sink* stream, Connection* owner); ConnectionContext(::io::Sink* stream, Connection* owner);
// We won't have any virtual methods, probably. However, since we allocate derived class, // We won't have any virtual methods, probably. However, since we allocate a derived class,
// we need to declare a virtual d-tor so we could delete them inside Connection. // we need to declare a virtual d-tor, so we could properly delete it from Connection code.
virtual ~ConnectionContext() {} virtual ~ConnectionContext() {}
Connection* owner() { Connection* owner() {
@ -51,10 +51,6 @@ class ConnectionContext {
bool authenticated: 1; bool authenticated: 1;
bool force_dispatch: 1; // whether we should route all requests to the dispatch fiber. bool force_dispatch: 1; // whether we should route all requests to the dispatch fiber.
virtual void OnClose() {}
virtual std::string GetContextInfo() const { return std::string{}; }
private: private:
Connection* owner_; Connection* owner_;
Protocol protocol_ = Protocol::REDIS; Protocol protocol_ = Protocol::REDIS;

View file

@ -300,7 +300,7 @@ string Connection::GetClientInfo() const {
absl::StrAppend(&res, " age=", now - creation_time_, " idle=", now - last_interaction_); absl::StrAppend(&res, " age=", now - creation_time_, " idle=", now - last_interaction_);
absl::StrAppend(&res, " phase=", phase_, " "); absl::StrAppend(&res, " phase=", phase_, " ");
if (cc_) { if (cc_) {
absl::StrAppend(&res, cc_->GetContextInfo()); absl::StrAppend(&res, service_->GetContextInfo(cc_.get()));
} }
return res; return res;
@ -374,7 +374,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
VLOG(1) << "Before dispatch_fb.join()"; VLOG(1) << "Before dispatch_fb.join()";
dispatch_fb.join(); dispatch_fb.join();
VLOG(1) << "After dispatch_fb.join()"; VLOG(1) << "After dispatch_fb.join()";
cc_->OnClose(); service_->OnClose(cc_.get());
stats->read_buf_capacity -= io_buf_.Capacity(); stats->read_buf_capacity -= io_buf_.Capacity();

View file

@ -32,6 +32,13 @@ class ServiceInterface {
virtual void ConfigureHttpHandlers(util::HttpListenerBase* base) { virtual void ConfigureHttpHandlers(util::HttpListenerBase* base) {
} }
virtual void OnClose(ConnectionContext* cntx) {
}
virtual std::string GetContextInfo(ConnectionContext* cntx) {
return {};
}
}; };
} // namespace facade } // namespace facade

View file

@ -27,7 +27,7 @@
#define MAXMEMORY_NO_EVICTION (7<<8) #define MAXMEMORY_NO_EVICTION (7<<8)
#define CONFIG_RUN_ID_SIZE 40 #define CONFIG_RUN_ID_SIZE 40U
#define EVPOOL_CACHED_SDS_SIZE 255 #define EVPOOL_CACHED_SDS_SIZE 255
#define EVPOOL_SIZE 16 #define EVPOOL_SIZE 16

View file

@ -2,7 +2,7 @@ add_executable(dragonfly dfly_main.cc)
cxx_link(dragonfly base dragonfly_lib) cxx_link(dragonfly base dragonfly_lib)
add_library(dfly_transaction db_slice.cc engine_shard_set.cc blocking_controller.cc common.cc add_library(dfly_transaction db_slice.cc engine_shard_set.cc blocking_controller.cc common.cc
io_mgr.cc journal/journal.cc journal/journal_shard.cc table.cc io_mgr.cc journal/journal.cc journal/journal_slice.cc table.cc
tiered_storage.cc transaction.cc) tiered_storage.cc transaction.cc)
cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib) cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib)

View file

@ -5,6 +5,7 @@
#pragma once #pragma once
#include <absl/strings/ascii.h> #include <absl/strings/ascii.h>
#include <absl/strings/str_cat.h>
#include <absl/types/span.h> #include <absl/types/span.h>
#include <string_view> #include <string_view>
@ -131,4 +132,28 @@ extern unsigned kernel_version;
const char* GlobalStateName(GlobalState gs); const char* GlobalStateName(GlobalState gs);
template <typename RandGen> std::string GetRandomHex(RandGen& gen, size_t len) {
static_assert(std::is_same<uint64_t, decltype(gen())>::value);
std::string res(len, '\0');
size_t indx = 0;
for (size_t i = 0; i < len / 16; ++i) { // 2 chars per byte
absl::AlphaNum an(absl::Hex(gen(), absl::kZeroPad16));
for (unsigned j = 0; j < 16; ++j) {
res[indx++] = an.Piece()[j];
}
}
if (indx < res.size()) {
absl::AlphaNum an(absl::Hex(gen(), absl::kZeroPad16));
for (unsigned j = 0; indx < res.size(); indx++, j++) {
res[indx] = an.Piece()[j];
}
}
return res;
}
} // namespace dfly } // namespace dfly

View file

@ -211,45 +211,6 @@ void ConnectionContext::SendSubscriptionChangedResponse(string_view action,
(*this)->SendLong(count); (*this)->SendLong(count);
} }
void ConnectionContext::OnClose() {
if (!conn_state.exec_info.watched_keys.empty()) {
shard_set->RunBriefInParallel([this](EngineShard* shard) {
return shard->db_slice().UnregisterConnectionWatches(&conn_state.exec_info);
});
}
if (!conn_state.subscribe_info)
return;
if (!conn_state.subscribe_info->channels.empty()) {
auto token = conn_state.subscribe_info->borrow_token;
UnsubscribeAll(false);
// Check that all borrowers finished processing
token.Wait();
}
if (conn_state.subscribe_info) {
DCHECK(!conn_state.subscribe_info->patterns.empty());
auto token = conn_state.subscribe_info->borrow_token;
PUnsubscribeAll(false);
// Check that all borrowers finished processing
token.Wait();
DCHECK(!conn_state.subscribe_info);
}
}
string ConnectionContext::GetContextInfo() const {
char buf[16] = {0};
unsigned index = 0;
if (async_dispatch)
buf[index++] = 'a';
if (conn_closing)
buf[index++] = 't';
return index ? absl::StrCat("flags:", buf) : string();
}
void ConnectionState::ExecInfo::Clear() { void ConnectionState::ExecInfo::Clear() {
state = EXEC_INACTIVE; state = EXEC_INACTIVE;
body.clear(); body.clear();

View file

@ -83,8 +83,11 @@ struct ConnectionState {
// For set op - it's the flag value we are storing along with the value. // For set op - it's the flag value we are storing along with the value.
// For get op - we use it as a mask of MCGetMask values. // For get op - we use it as a mask of MCGetMask values.
uint32_t memcache_flag = 0; uint32_t memcache_flag = 0;
// If it's a replication client - then it holds positive sync session id.
uint32_t sync_session_id = 0; // If this server is master, and this connection is from a secondary replica,
// then it holds positive sync session id.
uint32_t repl_session_id = 0;
uint32_t repl_threadid = kuint32max;
ExecInfo exec_info; ExecInfo exec_info;
std::optional<ScriptInfo> script_info; std::optional<ScriptInfo> script_info;
@ -97,8 +100,6 @@ class ConnectionContext : public facade::ConnectionContext {
: facade::ConnectionContext(stream, owner) { : facade::ConnectionContext(stream, owner) {
} }
void OnClose() override;
struct DebugInfo { struct DebugInfo {
uint32_t shards_count = 0; uint32_t shards_count = 0;
TxClock clock = 0; TxClock clock = 0;
@ -123,8 +124,6 @@ class ConnectionContext : public facade::ConnectionContext {
bool is_replicating = false; bool is_replicating = false;
std::string GetContextInfo() const override;
private: private:
void SendSubscriptionChangedResponse(std::string_view action, void SendSubscriptionChangedResponse(std::string_view action,
std::optional<std::string_view> topic, std::optional<std::string_view> topic,

View file

@ -67,6 +67,7 @@ class PrimeEvictionPolicy {
can_evict_(can_evict) { can_evict_(can_evict) {
} }
// A hook function that is called every time a segment is full and requires splitting.
void RecordSplit(PrimeTable::Segment_t* segment) { void RecordSplit(PrimeTable::Segment_t* segment) {
mem_budget_ -= PrimeTable::kSegBytes; mem_budget_ -= PrimeTable::kSegBytes;
DVLOG(1) << "split: " << segment->SlowSize() << "/" << segment->capacity(); DVLOG(1) << "split: " << segment->SlowSize() << "/" << segment->capacity();

View file

@ -113,6 +113,23 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
rb->SendError(kSyntaxErr); rb->SendError(kSyntaxErr);
} }
void DflyCmd::OnClose(ConnectionContext* cntx) {
boost::fibers::fiber repl_fb;
if (cntx->conn_state.repl_session_id > 0 && cntx->conn_state.repl_threadid != kuint32max) {
unique_lock lk(mu_);
auto it = sync_info_.find(cntx->conn_state.repl_session_id);
if (it != sync_info_.end()) {
VLOG(1) << "Found tbd: " << cntx->conn_state.repl_session_id;
}
}
if (repl_fb.joinable()) {
repl_fb.join();
}
}
void DflyCmd::HandleJournal(CmdArgList args, ConnectionContext* cntx) { void DflyCmd::HandleJournal(CmdArgList args, ConnectionContext* cntx) {
DCHECK_GE(args.size(), 3u); DCHECK_GE(args.size(), 3u);
ToUpper(&args[2]); ToUpper(&args[2]);
@ -127,7 +144,26 @@ void DflyCmd::HandleJournal(CmdArgList args, ConnectionContext* cntx) {
journal::Journal* journal = ServerState::tlocal()->journal(); journal::Journal* journal = ServerState::tlocal()->journal();
if (!journal) { if (!journal) {
string dir = absl::GetFlag(FLAGS_dir); string dir = absl::GetFlag(FLAGS_dir);
sf_->journal()->StartLogging(dir);
atomic_uint32_t created{0};
auto* pool = shard_set->pool();
auto open_cb = [&](auto* pb) {
auto ec = sf_->journal()->OpenInThread(true, dir);
if (ec) {
LOG(ERROR) << "Could not create journal " << ec;
} else {
created.fetch_add(1, memory_order_relaxed);
}
};
pool->AwaitFiberOnAll(open_cb);
if (created.load(memory_order_acquire) != pool->size()) {
LOG(FATAL) << "TBD / revert";
}
// We can not use transaction distribution mechanism because we must open journal for all
// threads and not only for shards.
trans->Schedule(); trans->Schedule();
auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
trans->Execute(barrier_cb, true); trans->Execute(barrier_cb, true);
@ -165,4 +201,8 @@ uint32_t DflyCmd::AllocateSyncSession() {
return it->first; return it->first;
} }
void DflyCmd::BreakOnShutdown() {
VLOG(1) << "BreakOnShutdown";
}
} // namespace dfly } // namespace dfly

View file

@ -29,6 +29,11 @@ class DflyCmd {
uint32_t AllocateSyncSession(); uint32_t AllocateSyncSession();
void OnClose(ConnectionContext* cntx);
// stops all background processes so we could exit in orderly manner.
void BreakOnShutdown();
private: private:
void HandleJournal(CmdArgList args, ConnectionContext* cntx); void HandleJournal(CmdArgList args, ConnectionContext* cntx);

View file

@ -8,7 +8,7 @@
#include "base/logging.h" #include "base/logging.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "server/journal/journal_shard.h" #include "server/journal/journal_slice.h"
#include "server/server_state.h" #include "server/server_state.h"
namespace dfly { namespace dfly {
@ -21,43 +21,33 @@ namespace fibers = boost::fibers;
namespace { namespace {
thread_local JournalShard journal_shard; // Present in all threads (not only in shard threads).
thread_local JournalSlice journal_slice;
} // namespace } // namespace
Journal::Journal() { Journal::Journal() {
} }
error_code Journal::StartLogging(std::string_view dir) { error_code Journal::OpenInThread(bool persistent, string_view dir) {
if (journal_shard.IsOpen()) { journal_slice.Init(unsigned(ProactorBase::GetIndex()));
return error_code{};
error_code ec;
if (persistent) {
ec = journal_slice.Open(dir);
if (ec) {
return ec;
}
} }
auto* pool = shard_set->pool();
atomic_uint32_t created{0};
lock_guard lk(state_mu_);
auto open_cb = [&](auto* pb) {
auto ec = journal_shard.Open(dir, unsigned(ProactorBase::GetIndex()));
if (ec) {
LOG(FATAL) << "Could not create journal " << ec; // TODO
} else {
created.fetch_add(1, memory_order_relaxed);
ServerState::tlocal()->set_journal(this); ServerState::tlocal()->set_journal(this);
EngineShard* shard = EngineShard::tlocal(); EngineShard* shard = EngineShard::tlocal();
if (shard) { if (shard) {
shard->set_journal(this); shard->set_journal(this);
} }
}
};
pool->AwaitFiberOnAll(open_cb); return ec;
if (created.load(memory_order_acquire) != pool->size()) {
LOG(FATAL) << "TBD / revert";
}
return error_code{};
} }
error_code Journal::Close() { error_code Journal::Close() {
@ -76,7 +66,7 @@ error_code Journal::Close() {
shard->set_journal(nullptr); shard->set_journal(nullptr);
} }
auto ec = journal_shard.Close(); auto ec = journal_slice.Close();
if (ec) { if (ec) {
lock_guard lk2(ec_mu); lock_guard lk2(ec_mu);
@ -89,21 +79,30 @@ error_code Journal::Close() {
return res; return res;
} }
uint32_t Journal::RegisterOnChange(ChangeCallback cb) {
return journal_slice.RegisterOnChange(cb);
}
void Journal::Unregister(uint32_t id) {
journal_slice.Unregister(id);
}
bool Journal::SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards) { bool Journal::SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards) {
if (!journal_shard.IsOpen() || lameduck_.load(memory_order_relaxed)) if (!journal_slice.IsOpen() || lameduck_.load(memory_order_relaxed))
return false; return false;
journal_shard.AddLogRecord(txid, unsigned(Op::SCHED)); // TODO: to complete the metadata.
journal_slice.AddLogRecord(Entry::Sched(txid));
return true; return true;
} }
LSN Journal::GetLsn() const { LSN Journal::GetLsn() const {
return journal_shard.cur_lsn(); return journal_slice.cur_lsn();
} }
bool Journal::EnterLameDuck() { bool Journal::EnterLameDuck() {
if (!journal_shard.IsOpen()) { if (!journal_slice.IsOpen()) {
return false; return false;
} }
@ -112,15 +111,17 @@ bool Journal::EnterLameDuck() {
return res; return res;
} }
void Journal::RecordEntry(const Entry& entry) {
journal_slice.AddLogRecord(entry);
}
/*
void Journal::OpArgs(TxId txid, Op opcode, Span keys) { void Journal::OpArgs(TxId txid, Op opcode, Span keys) {
DCHECK(journal_shard.IsOpen()); DCHECK(journal_slice.IsOpen());
journal_shard.AddLogRecord(txid, unsigned(opcode)); journal_slice.AddLogRecord(txid, opcode);
}
void Journal::RecordEntry(TxId txid, const PrimeKey& key, const PrimeValue& pval) {
journal_shard.AddLogRecord(txid, unsigned(Op::VAL));
} }
*/
} // namespace journal } // namespace journal
} // namespace dfly } // namespace dfly

View file

@ -4,8 +4,7 @@
#pragma once #pragma once
#include "server/common.h" #include "server/journal/types.h"
#include "server/table.h"
#include "util/proactor_pool.h" #include "util/proactor_pool.h"
namespace dfly { namespace dfly {
@ -14,17 +13,6 @@ class Transaction;
namespace journal { namespace journal {
enum class Op : uint8_t {
NOOP = 0,
LOCK = 1,
UNLOCK = 2,
LOCK_SHARD = 3,
UNLOCK_SHARD = 4,
SCHED = 5,
VAL = 10,
DEL,
MSET,
};
class Journal { class Journal {
public: public:
@ -32,8 +20,6 @@ class Journal {
Journal(); Journal();
std::error_code StartLogging(std::string_view dir);
// Returns true if journal has been active and changed its state to lameduck mode // Returns true if journal has been active and changed its state to lameduck mode
// and false otherwise. // and false otherwise.
bool EnterLameDuck(); // still logs ongoing transactions but refuses to start new ones. bool EnterLameDuck(); // still logs ongoing transactions but refuses to start new ones.
@ -41,10 +27,20 @@ class Journal {
// Requires: journal is in lameduck mode. // Requires: journal is in lameduck mode.
std::error_code Close(); std::error_code Close();
// Opens journal inside a Dragonfly thread. Must be called in each thread.
std::error_code OpenInThread(bool persistent, std::string_view dir);
//******* The following functions must be called in the context of the owning shard *********//
uint32_t RegisterOnChange(ChangeCallback cb);
void Unregister(uint32_t id);
// Returns true if transaction was scheduled, false if journal is inactive // Returns true if transaction was scheduled, false if journal is inactive
// or in lameduck mode and does not log new transactions. // or in lameduck mode and does not log new transactions.
bool SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards); bool SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards);
/*
void AddCmd(TxId txid, Op opcode, Span args) { void AddCmd(TxId txid, Op opcode, Span args) {
OpArgs(txid, opcode, args); OpArgs(txid, opcode, args);
} }
@ -56,13 +52,12 @@ class Journal {
void Unlock(TxId txid, Span keys) { void Unlock(TxId txid, Span keys) {
OpArgs(txid, Op::UNLOCK, keys); OpArgs(txid, Op::UNLOCK, keys);
} }
*/
LSN GetLsn() const; LSN GetLsn() const;
void RecordEntry(TxId txid, const PrimeKey& key, const PrimeValue& pval); void RecordEntry(const Entry& entry);
private: private:
void OpArgs(TxId id, Op opcode, Span keys);
mutable boost::fibers::mutex state_mu_; mutable boost::fibers::mutex state_mu_;

View file

@ -2,12 +2,11 @@
// See LICENSE for licensing terms. // See LICENSE for licensing terms.
// //
#include "server/journal/journal_shard.h" #include "server/journal/journal_slice.h"
#include <fcntl.h>
#include <absl/container/inlined_vector.h> #include <absl/container/inlined_vector.h>
#include <absl/strings/str_cat.h> #include <absl/strings/str_cat.h>
#include <fcntl.h>
#include <filesystem> #include <filesystem>
@ -35,17 +34,30 @@ string ShardName(std::string_view base, unsigned index) {
CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \ CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \
} while (false) } while (false)
struct JournalSlice::RingItem {
LSN lsn;
TxId txid;
Op opcode;
};
JournalSlice::JournalSlice() {
JournalShard::JournalShard() {
} }
JournalShard::~JournalShard() { JournalSlice::~JournalSlice() {
CHECK(!shard_file_); CHECK(!shard_file_);
} }
std::error_code JournalShard::Open(const std::string_view dir, unsigned index) { void JournalSlice::Init(unsigned index) {
if (ring_buffer_) // calling this function multiple times is allowed and it's a no-op.
return;
slice_index_ = index;
ring_buffer_.emplace(128); // TODO: to make it configurable
}
std::error_code JournalSlice::Open(std::string_view dir) {
CHECK(!shard_file_); CHECK(!shard_file_);
DCHECK_NE(slice_index_, UINT32_MAX);
fs::path dir_path; fs::path dir_path;
@ -65,7 +77,8 @@ std::error_code JournalShard::Open(const std::string_view dir, unsigned index) {
} }
// LOG(INFO) << int(dir_status.type()); // LOG(INFO) << int(dir_status.type());
} }
dir_path.append(ShardName("journal", index));
dir_path.append(ShardName("journal", slice_index_));
shard_path_ = dir_path; shard_path_ = dir_path;
// For file integrity guidelines see: // For file integrity guidelines see:
@ -81,15 +94,14 @@ std::error_code JournalShard::Open(const std::string_view dir, unsigned index) {
DVLOG(1) << "Opened journal " << shard_path_; DVLOG(1) << "Opened journal " << shard_path_;
shard_file_ = std::move(res).value(); shard_file_ = std::move(res).value();
shard_index_ = index;
file_offset_ = 0; file_offset_ = 0;
status_ec_.clear(); status_ec_.clear();
return error_code{}; return error_code{};
} }
error_code JournalShard::Close() { error_code JournalSlice::Close() {
VLOG(1) << "JournalShard::Close"; VLOG(1) << "JournalSlice::Close";
CHECK(shard_file_); CHECK(shard_file_);
lameduck_ = true; lameduck_ = true;
@ -103,13 +115,44 @@ error_code JournalShard::Close() {
return ec; return ec;
} }
void JournalShard::AddLogRecord(TxId txid, unsigned opcode) { void JournalSlice::AddLogRecord(const Entry& entry) {
string line = absl::StrCat(lsn_, " ", txid, " ", opcode, "\n"); DCHECK(ring_buffer_);
error_code ec = shard_file_->Write(io::Buffer(line), file_offset_, 0);
CHECK_EC(ec); for (const auto& k_v : change_cb_arr_) {
file_offset_ += line.size(); k_v.second(entry);
}
RingItem item;
item.lsn = lsn_;
item.opcode = entry.opcode;
item.txid = entry.txid;
VLOG(1) << "Writing item " << item.lsn;
ring_buffer_->EmplaceOrOverride(move(item));
if (shard_file_) {
string line = absl::StrCat(lsn_, " ", entry.txid, " ", entry.opcode, "\n");
error_code ec = shard_file_->Write(io::Buffer(line), file_offset_, 0);
CHECK_EC(ec);
file_offset_ += line.size();
}
++lsn_; ++lsn_;
} }
uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) {
uint32_t id = next_cb_id_++;
change_cb_arr_.emplace_back(id, std::move(cb));
return id;
}
void JournalSlice::Unregister(uint32_t id) {
for (auto it = change_cb_arr_.begin(); it != change_cb_arr_.end(); ++it) {
if (it->first == id) {
change_cb_arr_.erase(it);
break;
}
}
}
} // namespace journal } // namespace journal
} // namespace dfly } // namespace dfly

View file

@ -9,18 +9,23 @@
#include <optional> #include <optional>
#include <string_view> #include <string_view>
#include "base/ring_buffer.h"
#include "server/common.h" #include "server/common.h"
#include "server/journal/types.h"
#include "util/uring/uring_file.h" #include "util/uring/uring_file.h"
namespace dfly { namespace dfly {
namespace journal { namespace journal {
class JournalShard { // Journal slice is present for both shards and io threads.
class JournalSlice {
public: public:
JournalShard(); JournalSlice();
~JournalShard(); ~JournalSlice();
std::error_code Open(const std::string_view dir, unsigned index); void Init(unsigned index);
std::error_code Open(std::string_view dir);
std::error_code Close(); std::error_code Close();
@ -32,20 +37,30 @@ class JournalShard {
return status_ec_; return status_ec_;
} }
// Whether the file-based journaling is open.
bool IsOpen() const { bool IsOpen() const {
return bool(shard_file_); return bool(shard_file_);
} }
void AddLogRecord(TxId txid, unsigned opcode); void AddLogRecord(const Entry& entry);
uint32_t RegisterOnChange(ChangeCallback cb);
void Unregister(uint32_t);
private: private:
struct RingItem;
std::string shard_path_; std::string shard_path_;
std::unique_ptr<util::uring::LinuxFile> shard_file_; std::unique_ptr<util::uring::LinuxFile> shard_file_;
std::optional<base::RingBuffer<RingItem>> ring_buffer_;
std::vector<std::pair<uint32_t, ChangeCallback>> change_cb_arr_;
size_t file_offset_ = 0; size_t file_offset_ = 0;
LSN lsn_ = 1; LSN lsn_ = 1;
unsigned shard_index_ = -1; uint32_t slice_index_ = UINT32_MAX;
uint32_t next_cb_id_ = 1;
std::error_code status_ec_; std::error_code status_ec_;

View file

@ -0,0 +1,50 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include "server/common.h"
#include "server/table.h"
namespace dfly {
namespace journal {
enum class Op : uint8_t {
NOOP = 0,
LOCK = 1,
UNLOCK = 2,
LOCK_SHARD = 3,
UNLOCK_SHARD = 4,
SCHED = 5,
VAL = 10,
DEL,
MSET,
};
// TODO: to pass all the attributes like ttl, stickiness etc.
struct Entry {
Entry(Op op, DbIndex did, TxId tid, std::string_view skey)
: opcode(op), db_ind(did), txid(tid), key(skey) {
}
Entry(DbIndex did, TxId tid, std::string_view skey, const PrimeValue& pval)
: Entry(Op::VAL, did, tid, skey) {
pval_ptr = &pval;
}
static Entry Sched(TxId tid) {
return Entry{Op::SCHED, 0, tid, {}};
}
Op opcode;
DbIndex db_ind;
TxId txid;
std::string_view key;
const PrimeValue* pval_ptr = nullptr;
uint64_t expire_ms = 0; // 0 means no expiry.
};
using ChangeCallback = std::function<void(const Entry&)>;
} // namespace journal
} // namespace dfly

View file

@ -51,9 +51,10 @@ string GetString(EngineShard* shard, const PrimeValue& pv) {
return res; return res;
} }
inline void RecordJournal(const OpArgs& op_args, const PrimeKey& pkey, const PrimeKey& pvalue) { inline void RecordJournal(const OpArgs& op_args, string_view key, const PrimeKey& pvalue) {
if (op_args.shard->journal()) { if (op_args.shard->journal()) {
op_args.shard->journal()->RecordEntry(op_args.txid, pkey, pvalue); journal::Entry entry{op_args.db_ind, op_args.txid, key, pvalue};
op_args.shard->journal()->RecordEntry(entry);
} }
} }
@ -63,7 +64,7 @@ void SetString(const OpArgs& op_args, string_view key, const string& value) {
db_slice.PreUpdate(op_args.db_ind, it_output); db_slice.PreUpdate(op_args.db_ind, it_output);
it_output->second.SetString(value); it_output->second.SetString(value);
db_slice.PostUpdate(op_args.db_ind, it_output, key); db_slice.PostUpdate(op_args.db_ind, it_output, key);
RecordJournal(op_args, it_output->first, it_output->second); RecordJournal(op_args, key, it_output->second);
} }
string JsonType(const json& val) { string JsonType(const json& val) {

View file

@ -675,10 +675,11 @@ facade::ConnectionContext* Service::CreateContext(util::FiberSocketBase* peer,
// a bit of a hack. I set up breaker callback here for the owner. // a bit of a hack. I set up breaker callback here for the owner.
// Should work though it's confusing to have it here. // Should work though it's confusing to have it here.
owner->RegisterOnBreak([res](uint32_t) { owner->RegisterOnBreak([res, this](uint32_t) {
if (res->transaction) { if (res->transaction) {
res->transaction->BreakOnClose(); res->transaction->BreakOnShutdown();
} }
this->server_family().BreakOnShutdown();
}); });
return res; return res;
@ -1060,7 +1061,7 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
// How do we know that subsribers did not disappear after we fetched them? // How do we know that subsribers did not disappear after we fetched them?
// Each subscriber object hold a borrow_token. // Each subscriber object hold a borrow_token.
// ConnectionContext::OnClose does not reset subscribe_info before all tokens are returned. // OnClose does not reset subscribe_info before all tokens are returned.
vector<ChannelSlice::Subscriber> subscriber_arr = shard_set->Await(sid, std::move(cb)); vector<ChannelSlice::Subscriber> subscriber_arr = shard_set->Await(sid, std::move(cb));
atomic_uint32_t published{0}; atomic_uint32_t published{0};
@ -1249,6 +1250,45 @@ void Service::ConfigureHttpHandlers(util::HttpListenerBase* base) {
base->RegisterCb("/txz", TxTable); base->RegisterCb("/txz", TxTable);
} }
void Service::OnClose(facade::ConnectionContext* cntx) {
ConnectionContext* server_cntx = static_cast<ConnectionContext*>(cntx);
ConnectionState& conn_state = server_cntx->conn_state;
if (conn_state.subscribe_info) { // Clean-ups related to PUBSUB
if (!conn_state.subscribe_info->channels.empty()) {
auto token = conn_state.subscribe_info->borrow_token;
server_cntx->UnsubscribeAll(false);
// Check that all borrowers finished processing.
// token is increased in channel_slice (the publisher side).
token.Wait();
}
if (conn_state.subscribe_info) {
DCHECK(!conn_state.subscribe_info->patterns.empty());
auto token = conn_state.subscribe_info->borrow_token;
server_cntx->PUnsubscribeAll(false);
// Check that all borrowers finished processing
token.Wait();
DCHECK(!conn_state.subscribe_info);
}
}
server_family_.OnClose(server_cntx);
}
string Service::GetContextInfo(facade::ConnectionContext* cntx) {
char buf[16] = {0};
unsigned index = 0;
if (cntx->async_dispatch)
buf[index++] = 'a';
if (cntx->conn_closing)
buf[index++] = 't';
return index ? absl::StrCat("flags:", buf) : string();
}
using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx); using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx);
#define HFUNC(x) SetHandler(&Service::x) #define HFUNC(x) SetHandler(&Service::x)

View file

@ -82,6 +82,8 @@ class Service : public facade::ServiceInterface {
GlobalState SwitchState(GlobalState from , GlobalState to); GlobalState SwitchState(GlobalState from , GlobalState to);
void ConfigureHttpHandlers(util::HttpListenerBase* base) final; void ConfigureHttpHandlers(util::HttpListenerBase* base) final;
void OnClose(facade::ConnectionContext* cntx) final;
std::string GetContextInfo(facade::ConnectionContext* cntx) final;
private: private:
static void Quit(CmdArgList args, ConnectionContext* cntx); static void Quit(CmdArgList args, ConnectionContext* cntx);

View file

@ -1004,9 +1004,8 @@ error_code RdbLoader::Load(io::Source* src) {
bc.Wait(); // wait for sentinels to report. bc.Wait(); // wait for sentinels to report.
absl::Duration dur = absl::Now() - start; absl::Duration dur = absl::Now() - start;
double seconds = double(absl::ToInt64Milliseconds(dur)) / 1000; load_time_ = double(absl::ToInt64Milliseconds(dur)) / 1000;
LOG(INFO) << "Done loading RDB, keys loaded: " << keys_loaded; keys_loaded_ = keys_loaded;
LOG(INFO) << "Loading finished after " << strings::HumanReadableElapsedTime(seconds);
return kOk; return kOk;
} }

View file

@ -34,10 +34,20 @@ class RdbLoader {
::io::Bytes Leftover() const { ::io::Bytes Leftover() const {
return mem_buf_.InputBuffer(); return mem_buf_.InputBuffer();
} }
size_t bytes_read() const { size_t bytes_read() const {
return bytes_read_; return bytes_read_;
} }
size_t keys_loaded() const {
return keys_loaded_;
}
// returns time in seconds.
double load_time() const {
return load_time_;
}
private: private:
using MutableBytes = ::io::MutableBytes; using MutableBytes = ::io::MutableBytes;
struct ObjSettings; struct ObjSettings;
@ -49,8 +59,8 @@ class RdbLoader {
struct LoadTrace; struct LoadTrace;
using RdbVariant = std::variant<long long, base::PODArray<char>, LzfString, using RdbVariant =
std::unique_ptr<LoadTrace>>; std::variant<long long, base::PODArray<char>, LzfString, std::unique_ptr<LoadTrace>>;
struct OpaqueObj { struct OpaqueObj {
RdbVariant obj; RdbVariant obj;
int rdb_type; int rdb_type;
@ -164,6 +174,9 @@ class RdbLoader {
::io::Source* src_ = nullptr; ::io::Source* src_ = nullptr;
size_t bytes_read_ = 0; size_t bytes_read_ = 0;
size_t source_limit_ = SIZE_MAX; size_t source_limit_ = SIZE_MAX;
size_t keys_loaded_ = 0;
double load_time_ = 0;
DbIndex cur_db_index_ = 0; DbIndex cur_db_index_ = 0;
::boost::fibers::mutex mu_; ::boost::fibers::mutex mu_;

View file

@ -2,13 +2,14 @@
// See LICENSE for licensing terms. // See LICENSE for licensing terms.
// //
#include "core/string_set.h"
#include "server/rdb_save.h" #include "server/rdb_save.h"
#include <absl/cleanup/cleanup.h> #include <absl/cleanup/cleanup.h>
#include <absl/strings/str_cat.h> #include <absl/strings/str_cat.h>
#include <absl/strings/str_format.h> #include <absl/strings/str_format.h>
#include "core/string_set.h"
extern "C" { extern "C" {
#include "redis/intset.h" #include "redis/intset.h"
#include "redis/listpack.h" #include "redis/listpack.h"
@ -159,10 +160,6 @@ constexpr size_t kAmask = 4_KB - 1;
RdbSerializer::RdbSerializer(io::Sink* s) : sink_(s), mem_buf_{4_KB}, tmp_buf_(nullptr) { RdbSerializer::RdbSerializer(io::Sink* s) : sink_(s), mem_buf_{4_KB}, tmp_buf_(nullptr) {
} }
RdbSerializer::RdbSerializer(AlignedBuffer* aligned_buf) : RdbSerializer((io::Sink*)nullptr) {
aligned_buf_ = aligned_buf;
}
RdbSerializer::~RdbSerializer() { RdbSerializer::~RdbSerializer() {
} }
@ -311,7 +308,7 @@ error_code RdbSerializer::SaveSetObject(const PrimeValue& obj) {
RETURN_ON_ERR(SaveString(string_view{ele, sdslen(ele)})); RETURN_ON_ERR(SaveString(string_view{ele, sdslen(ele)}));
} }
} else if (obj.Encoding() == kEncodingStrMap2) { } else if (obj.Encoding() == kEncodingStrMap2) {
StringSet *set = (StringSet*)obj.RObjPtr(); StringSet* set = (StringSet*)obj.RObjPtr();
RETURN_ON_ERR(SaveLen(set->Size())); RETURN_ON_ERR(SaveLen(set->Size()));
@ -593,22 +590,14 @@ error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
io::Bytes ib = mem_buf_.InputBuffer(); io::Bytes ib = mem_buf_.InputBuffer();
if (ib.empty()) { if (ib.empty()) {
if (sink_) { return sink_->Write(buf);
return sink_->Write(buf);
} else {
return aligned_buf_->Write(buf);
}
} else {
if (sink_) {
iovec v[2] = {{.iov_base = const_cast<uint8_t*>(ib.data()), .iov_len = ib.size()},
{.iov_base = const_cast<uint8_t*>(buf.data()), .iov_len = buf.size()}};
RETURN_ON_ERR(sink_->Write(v, ABSL_ARRAYSIZE(v)));
} else {
RETURN_ON_ERR(aligned_buf_->Write(ib));
RETURN_ON_ERR(aligned_buf_->Write(buf));
}
mem_buf_.ConsumeInput(ib.size());
} }
// else
iovec v[2] = {{.iov_base = const_cast<uint8_t*>(ib.data()), .iov_len = ib.size()},
{.iov_base = const_cast<uint8_t*>(buf.data()), .iov_len = buf.size()}};
RETURN_ON_ERR(sink_->Write(v, ABSL_ARRAYSIZE(v)));
mem_buf_.ConsumeInput(ib.size());
return error_code{}; return error_code{};
} }
@ -620,11 +609,7 @@ error_code RdbSerializer::FlushMem() {
DVLOG(2) << "FlushMem " << sz << " bytes"; DVLOG(2) << "FlushMem " << sz << " bytes";
// interrupt point. // interrupt point.
if (sink_) { RETURN_ON_ERR(sink_->Write(mem_buf_.InputBuffer()));
RETURN_ON_ERR(sink_->Write(mem_buf_.InputBuffer()));
} else {
RETURN_ON_ERR(aligned_buf_->Write(mem_buf_.InputBuffer()));
}
mem_buf_.ConsumeInput(sz); mem_buf_.ConsumeInput(sz);
return error_code{}; return error_code{};
@ -705,37 +690,37 @@ AlignedBuffer::~AlignedBuffer() {
mi_free(aligned_buf_); mi_free(aligned_buf_);
} }
// TODO: maybe to derive AlignedBuffer from Sink? io::Result<size_t> AlignedBuffer::WriteSome(const iovec* v, uint32_t len) {
std::error_code AlignedBuffer::Write(io::Bytes record) { size_t total_len = 0;
if (buf_offs_ + record.size() < capacity_) { uint32_t vindx = 0;
memcpy(aligned_buf_ + buf_offs_, record.data(), record.size());
buf_offs_ += record.size(); for (; vindx < len; ++vindx) {
return error_code{}; auto item = v[vindx];
total_len += item.iov_len;
while (buf_offs_ + item.iov_len > capacity_) {
size_t to_write = capacity_ - buf_offs_;
memcpy(aligned_buf_ + buf_offs_, item.iov_base, to_write);
iovec ivec{.iov_base = aligned_buf_, .iov_len = capacity_};
error_code ec = upstream_->Write(&ivec, 1);
if (ec)
return nonstd::make_unexpected(ec);
item.iov_len -= to_write;
item.iov_base = reinterpret_cast<char*>(item.iov_base) + to_write;
buf_offs_ = 0;
}
DCHECK_GT(item.iov_len, 0u);
memcpy(aligned_buf_ + buf_offs_, item.iov_base, item.iov_len);
buf_offs_ += item.iov_len;
} }
memcpy(aligned_buf_ + buf_offs_, record.data(), capacity_ - buf_offs_); return total_len;
size_t record_offs = capacity_ - buf_offs_;
buf_offs_ = 0;
size_t needed;
do {
iovec ivec{.iov_base = aligned_buf_, .iov_len = capacity_};
RETURN_ON_ERR(upstream_->Write(&ivec, 1));
needed = record.size() - record_offs;
if (needed < capacity_)
break;
memcpy(aligned_buf_, record.data() + record_offs, capacity_);
record_offs += capacity_;
} while (true);
if (needed) {
memcpy(aligned_buf_, record.data() + record_offs, needed);
buf_offs_ = needed;
}
return error_code{};
} }
// Note that it may write more than AlignedBuffer has at this point since it rounds up the length
// to the nearest page boundary.
error_code AlignedBuffer::Flush() { error_code AlignedBuffer::Flush() {
size_t len = (buf_offs_ + kAmask) & (~kAmask); size_t len = (buf_offs_ + kAmask) & (~kAmask);
iovec ivec{.iov_base = aligned_buf_, .iov_len = len}; iovec ivec{.iov_base = aligned_buf_, .iov_len = len};
@ -748,7 +733,7 @@ class RdbSaver::Impl {
public: public:
// We pass K=sz to say how many producers are pushing data in order to maintain // We pass K=sz to say how many producers are pushing data in order to maintain
// correct closing semantics - channel is closing when K producers marked it as closed. // correct closing semantics - channel is closing when K producers marked it as closed.
Impl(unsigned producers_len, io::Sink* sink); Impl(bool align_writes, unsigned producers_len, io::Sink* sink);
error_code SaveAuxFieldStrStr(string_view key, string_view val); error_code SaveAuxFieldStrStr(string_view key, string_view val);
@ -758,10 +743,13 @@ class RdbSaver::Impl {
error_code ConsumeChannel(); error_code ConsumeChannel();
void StartSnapshotting(EngineShard* shard); void StartSnapshotting(bool include_journal_changes, EngineShard* shard);
error_code Flush() { error_code Flush() {
return aligned_buf_.Flush(); if (aligned_buf_)
return aligned_buf_->Flush();
return error_code{};
} }
size_t Size() const { size_t Size() const {
@ -771,20 +759,23 @@ class RdbSaver::Impl {
void FillFreqMap(RdbTypeFreqMap* dest) const; void FillFreqMap(RdbTypeFreqMap* dest) const;
private: private:
AlignedBuffer aligned_buf_; io::Sink* sink_;
// used for serializing non-body components in the calling fiber. // used for serializing non-body components in the calling fiber.
RdbSerializer meta_serializer_; RdbSerializer meta_serializer_;
vector<unique_ptr<SliceSnapshot>> shard_snapshots_; vector<unique_ptr<SliceSnapshot>> shard_snapshots_;
SliceSnapshot::RecordChannel channel_; SliceSnapshot::RecordChannel channel_;
std::optional<AlignedBuffer> aligned_buf_;
}; };
// We pass K=sz to say how many producers are pushing data in order to maintain // We pass K=sz to say how many producers are pushing data in order to maintain
// correct closing semantics - channel is closing when K producers marked it as closed. // correct closing semantics - channel is closing when K producers marked it as closed.
RdbSaver::Impl::Impl(unsigned producers_len, io::Sink* sink) RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, io::Sink* sink)
: aligned_buf_(kBufLen, sink), meta_serializer_(&aligned_buf_), : sink_(sink), meta_serializer_(sink),
shard_snapshots_(producers_len), channel_{128, producers_len} { shard_snapshots_(producers_len), channel_{128, producers_len} {
if (align_writes) {
aligned_buf_.emplace(kBufLen, sink);
meta_serializer_.set_sink(&aligned_buf_.value());
}
} }
error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val) { error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val) {
@ -799,8 +790,6 @@ error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val)
error_code RdbSaver::Impl::ConsumeChannel() { error_code RdbSaver::Impl::ConsumeChannel() {
error_code io_error; error_code io_error;
// we can not exit on io-error since we spawn fibers that push data.
// TODO: we may signal them to stop processing and exit asap in case of the error.
uint8_t buf[16]; uint8_t buf[16];
size_t channel_bytes = 0; size_t channel_bytes = 0;
SliceSnapshot::DbRecord record; SliceSnapshot::DbRecord record;
@ -808,6 +797,9 @@ error_code RdbSaver::Impl::ConsumeChannel() {
buf[0] = RDB_OPCODE_SELECTDB; buf[0] = RDB_OPCODE_SELECTDB;
// we can not exit on io-error since we spawn fibers that push data.
// TODO: we may signal them to stop processing and exit asap in case of the error.
auto& channel = channel_; auto& channel = channel_;
while (channel.Pop(record)) { while (channel.Pop(record)) {
if (io_error) if (io_error)
@ -816,9 +808,13 @@ error_code RdbSaver::Impl::ConsumeChannel() {
do { do {
if (record.db_index != last_db_index) { if (record.db_index != last_db_index) {
unsigned enclen = SerializeLen(record.db_index, buf + 1); unsigned enclen = SerializeLen(record.db_index, buf + 1);
char* str = (char*)buf; string_view str{(char*)buf, enclen + 1};
io_error = aligned_buf_.Write(string_view{str, enclen + 1}); if (aligned_buf_) {
io_error = aligned_buf_->Write(str);
} else {
io_error = sink_->Write(io::Buffer(str));
}
if (io_error) if (io_error)
break; break;
last_db_index = record.db_index; last_db_index = record.db_index;
@ -826,7 +822,12 @@ error_code RdbSaver::Impl::ConsumeChannel() {
DVLOG(2) << "Pulled " << record.id; DVLOG(2) << "Pulled " << record.id;
channel_bytes += record.value.size(); channel_bytes += record.value.size();
io_error = aligned_buf_.Write(record.value);
if (aligned_buf_) {
io_error = aligned_buf_->Write(record.value);
} else {
io_error = sink_->Write(io::Buffer(record.value));
}
record.value.clear(); record.value.clear();
} while (!io_error && channel.TryPop(record)); } while (!io_error && channel.TryPop(record));
} // while (channel.pop) } // while (channel.pop)
@ -844,10 +845,10 @@ error_code RdbSaver::Impl::ConsumeChannel() {
return io_error; return io_error;
} }
void RdbSaver::Impl::StartSnapshotting(EngineShard* shard) { void RdbSaver::Impl::StartSnapshotting(bool include_journal_changes, EngineShard* shard) {
auto s = make_unique<SliceSnapshot>(&shard->db_slice(), &channel_); auto s = make_unique<SliceSnapshot>(&shard->db_slice(), &channel_);
s->Start(); s->Start(include_journal_changes);
// For single shard configuration, we maintain only one snapshot, // For single shard configuration, we maintain only one snapshot,
// so we do not have to map it via shard_id. // so we do not have to map it via shard_id.
@ -863,10 +864,10 @@ void RdbSaver::Impl::FillFreqMap(RdbTypeFreqMap* dest) const {
} }
} }
RdbSaver::RdbSaver(::io::Sink* sink, bool single_shard) { RdbSaver::RdbSaver(::io::Sink* sink, bool single_shard, bool align_writes) {
CHECK_NOTNULL(sink); CHECK_NOTNULL(sink);
impl_.reset(new Impl(single_shard ? 1 : shard_set->size(), sink)); impl_.reset(new Impl(align_writes, single_shard ? 1 : shard_set->size(), sink));
} }
RdbSaver::~RdbSaver() { RdbSaver::~RdbSaver() {
@ -904,8 +905,8 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
return error_code{}; return error_code{};
} }
void RdbSaver::StartSnapshotInShard(EngineShard* shard) { void RdbSaver::StartSnapshotInShard(bool include_journal_changes, EngineShard* shard) {
impl_->StartSnapshotting(shard); impl_->StartSnapshotting(include_journal_changes, shard);
} }
error_code RdbSaver::SaveAux(const StringVec& lua_scripts) { error_code RdbSaver::SaveAux(const StringVec& lua_scripts) {

View file

@ -40,21 +40,24 @@ class LinuxWriteWrapper : public io::Sink {
off_t offset_ = 0; off_t offset_ = 0;
}; };
class AlignedBuffer { class AlignedBuffer : public ::io::Sink {
public: public:
using io::Sink::Write;
AlignedBuffer(size_t cap, ::io::Sink* upstream); AlignedBuffer(size_t cap, ::io::Sink* upstream);
~AlignedBuffer(); ~AlignedBuffer();
// TODO: maybe to derive AlignedBuffer from Sink?
std::error_code Write(std::string_view buf) { std::error_code Write(std::string_view buf) {
return Write(io::Buffer(buf)); return Write(io::Buffer(buf));
} }
std::error_code Write(io::Bytes buf); io::Result<size_t> WriteSome(const iovec* v, uint32_t len) final;
std::error_code Flush(); std::error_code Flush();
::io::Sink* upstream() { return upstream_;} ::io::Sink* upstream() {
return upstream_;
}
private: private:
size_t capacity_; size_t capacity_;
@ -70,7 +73,9 @@ class RdbSaver {
// to snapshot all the datastore shards. // to snapshot all the datastore shards.
// single_shard - false, means we capture all the data using a single RdbSaver instance // single_shard - false, means we capture all the data using a single RdbSaver instance
// (corresponds to legacy, redis compatible mode) // (corresponds to legacy, redis compatible mode)
explicit RdbSaver(::io::Sink* sink, bool single_shard); // if align_writes is true - writes data in aligned chunks of 4KB to fit direct I/O requirements.
explicit RdbSaver(::io::Sink* sink, bool single_shard, bool align_writes);
~RdbSaver(); ~RdbSaver();
std::error_code SaveHeader(const StringVec& lua_scripts); std::error_code SaveHeader(const StringVec& lua_scripts);
@ -81,7 +86,8 @@ class RdbSaver {
std::error_code SaveBody(RdbTypeFreqMap* freq_map); std::error_code SaveBody(RdbTypeFreqMap* freq_map);
// Initiates the serialization in the shard's thread. // Initiates the serialization in the shard's thread.
void StartSnapshotInShard(EngineShard* shard); // TODO: to implement break functionality to allow stopping early.
void StartSnapshotInShard(bool include_journal_changes, EngineShard* shard);
private: private:
class Impl; class Impl;
@ -94,13 +100,12 @@ class RdbSaver {
std::unique_ptr<Impl> impl_; std::unique_ptr<Impl> impl_;
}; };
// TODO: it does not make sense that RdbSerializer will buffer into unaligned
// mem_buf_ and then flush it into the next level. We should probably use AlignedBuffer
// directly.
class RdbSerializer { class RdbSerializer {
public: public:
// TODO: for aligned cased, it does not make sense that RdbSerializer buffers into unaligned
// mem_buf_ and then flush it into the next level. We should probably use AlignedBuffer
// directly.
RdbSerializer(::io::Sink* s); RdbSerializer(::io::Sink* s);
RdbSerializer(AlignedBuffer* aligned_buf);
~RdbSerializer(); ~RdbSerializer();
@ -117,6 +122,7 @@ class RdbSerializer {
// Must be called in the thread to which `it` belongs. // Must be called in the thread to which `it` belongs.
// Returns the serialized rdb_type or the error. // Returns the serialized rdb_type or the error.
// expire_ms = 0 means no expiry.
io::Result<uint8_t> SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms); io::Result<uint8_t> SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms);
std::error_code WriteRaw(const ::io::Bytes& buf); std::error_code WriteRaw(const ::io::Bytes& buf);
std::error_code SaveString(std::string_view val); std::error_code SaveString(std::string_view val);
@ -143,8 +149,7 @@ class RdbSerializer {
std::error_code SaveStreamPEL(rax* pel, bool nacks); std::error_code SaveStreamPEL(rax* pel, bool nacks);
std::error_code SaveStreamConsumers(streamCG* cg); std::error_code SaveStreamConsumers(streamCG* cg);
::io::Sink* sink_ = nullptr; ::io::Sink* sink_;
AlignedBuffer* aligned_buf_ = nullptr;
std::unique_ptr<LZF_HSLOT[]> lzf_; std::unique_ptr<LZF_HSLOT[]> lzf_;
base::IoBuf mem_buf_; base::IoBuf mem_buf_;

View file

@ -8,7 +8,6 @@
#include <absl/random/random.h> // for master_id_ generation. #include <absl/random/random.h> // for master_id_ generation.
#include <absl/strings/match.h> #include <absl/strings/match.h>
#include <absl/strings/str_join.h> #include <absl/strings/str_join.h>
#include <sys/resource.h> #include <sys/resource.h>
#include <chrono> #include <chrono>
@ -71,6 +70,8 @@ using util::http::StringResponse;
namespace { namespace {
const auto kRdbWriteFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT;
using EngineFunc = void (ServerFamily::*)(CmdArgList args, ConnectionContext* cntx); using EngineFunc = void (ServerFamily::*)(CmdArgList args, ConnectionContext* cntx);
inline CommandId::Handler HandlerFunc(ServerFamily* se, EngineFunc f) { inline CommandId::Handler HandlerFunc(ServerFamily* se, EngineFunc f) {
@ -152,7 +153,7 @@ bool IsValidSaveScheduleNibble(string_view time, unsigned int max) {
class RdbSnapshot { class RdbSnapshot {
public: public:
RdbSnapshot(bool single_shard, uring::LinuxFile* fl) RdbSnapshot(bool single_shard, uring::LinuxFile* fl)
: file_(fl), linux_sink_(fl), saver_(&linux_sink_, single_shard) { : file_(fl), linux_sink_(fl), saver_(&linux_sink_, single_shard, kRdbWriteFlags & O_DIRECT) {
} }
error_code Start(const StringVec& lua_scripts); error_code Start(const StringVec& lua_scripts);
@ -191,7 +192,7 @@ error_code RdbSnapshot::Close() {
} }
void RdbSnapshot::StartInShard(EngineShard* shard) { void RdbSnapshot::StartInShard(EngineShard* shard) {
saver_.StartSnapshotInShard(shard); saver_.StartSnapshotInShard(false, shard);
started_ = true; started_ = true;
} }
@ -279,19 +280,9 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) {
journal_.reset(new journal::Journal); journal_.reset(new journal::Journal);
{ {
// TODO: if we start using random generator in more places, we should probably
// refactor this code.
absl::InsecureBitGen eng; absl::InsecureBitGen eng;
absl::uniform_int_distribution<uint32_t> ud; master_id_ = GetRandomHex(eng, CONFIG_RUN_ID_SIZE);
DCHECK_EQ(CONFIG_RUN_ID_SIZE, master_id_.size());
absl::AlphaNum a1(absl::Hex(eng(), absl::kZeroPad16));
absl::AlphaNum a2(absl::Hex(eng(), absl::kZeroPad16));
absl::AlphaNum a3(absl::Hex(ud(eng), absl::kZeroPad8));
absl::StrAppend(&master_id_, a1, a2, a3);
size_t constexpr kConfigRunIdSize = CONFIG_RUN_ID_SIZE;
DCHECK_EQ(kConfigRunIdSize, master_id_.size());
} }
} }
@ -472,6 +463,11 @@ error_code ServerFamily::LoadRdb(const std::string& rdb_file) {
RdbLoader loader(script_mgr()); RdbLoader loader(script_mgr());
ec = loader.Load(&fs); ec = loader.Load(&fs);
if (!ec) {
LOG(INFO) << "Done loading RDB, keys loaded: " << loader.keys_loaded();
LOG(INFO) << "Loading finished after "
<< strings::HumanReadableElapsedTime(loader.load_time());
}
} else { } else {
ec = res.error(); ec = res.error();
} }
@ -556,8 +552,8 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
&resp->body()); &resp->body());
AppendMetricWithoutLabels("memory_used_peak_bytes", "", used_mem_peak.load(memory_order_relaxed), AppendMetricWithoutLabels("memory_used_peak_bytes", "", used_mem_peak.load(memory_order_relaxed),
MetricType::GAUGE, &resp->body()); MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("comitted_memory", "", GetMallocCurrentCommitted(), AppendMetricWithoutLabels("comitted_memory", "", GetMallocCurrentCommitted(), MetricType::GAUGE,
MetricType::GAUGE, &resp->body()); &resp->body());
AppendMetricWithoutLabels("memory_max_bytes", "", max_memory_limit, MetricType::GAUGE, AppendMetricWithoutLabels("memory_max_bytes", "", max_memory_limit, MetricType::GAUGE,
&resp->body()); &resp->body());
@ -618,6 +614,10 @@ void ServerFamily::PauseReplication(bool pause) {
} }
} }
void ServerFamily::OnClose(ConnectionContext* cntx) {
dfly_cmd_->OnClose(cntx);
}
void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext* cntx) { void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext* cntx) {
if (!section.empty()) { if (!section.empty()) {
return cntx->reply_builder()->SendError(""); return cntx->reply_builder()->SendError("");
@ -697,7 +697,6 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
service_.SwitchState(GlobalState::SAVING, GlobalState::ACTIVE); service_.SwitchState(GlobalState::SAVING, GlobalState::ACTIVE);
}; };
const auto kFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT;
auto start = absl::Now(); auto start = absl::Now();
shared_ptr<LastSaveInfo> save_info; shared_ptr<LastSaveInfo> save_info;
StringVec lua_scripts = script_mgr_->GetLuaScripts(); StringVec lua_scripts = script_mgr_->GetLuaScripts();
@ -745,7 +744,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
abs_path += shard_file; abs_path += shard_file;
VLOG(1) << "Saving to " << abs_path; VLOG(1) << "Saving to " << abs_path;
auto res = uring::OpenLinux(abs_path.generic_string(), kFlags, 0666); auto res = uring::OpenLinux(abs_path.generic_string(), kRdbWriteFlags, 0666);
if (res) { if (res) {
snapshots[sid].reset(new RdbSnapshot{true, res.value().release()}); snapshots[sid].reset(new RdbSnapshot{true, res.value().release()});
@ -773,7 +772,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
ExtendFilename(now, -1, &filename); ExtendFilename(now, -1, &filename);
path += filename; path += filename;
auto res = uring::OpenLinux(path.generic_string(), kFlags, 0666); auto res = uring::OpenLinux(path.generic_string(), kRdbWriteFlags, 0666);
if (!res) { if (!res) {
return res.error(); return res.error();
} }
@ -862,6 +861,10 @@ void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendLong(num_keys.load(memory_order_relaxed)); return (*cntx)->SendLong(num_keys.load(memory_order_relaxed));
} }
void ServerFamily::BreakOnShutdown() {
dfly_cmd_->BreakOnShutdown();
}
void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) { void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) {
DCHECK(cntx->transaction); DCHECK(cntx->transaction);
DoFlush(cntx->transaction, cntx->transaction->db_index()); DoFlush(cntx->transaction, cntx->transaction->db_index());
@ -910,7 +913,9 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
if (sub_cmd == "SETNAME" && args.size() == 3) { if (sub_cmd == "SETNAME" && args.size() == 3) {
cntx->owner()->SetName(ArgS(args, 2)); cntx->owner()->SetName(ArgS(args, 2));
return (*cntx)->SendOk(); return (*cntx)->SendOk();
} else if (sub_cmd == "LIST") { }
if (sub_cmd == "LIST") {
vector<string> client_info; vector<string> client_info;
fibers::mutex mu; fibers::mutex mu;
auto cb = [&](util::Connection* conn) { auto cb = [&](util::Connection* conn) {
@ -1377,14 +1382,16 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
if (cmd == "CAPA") { if (cmd == "CAPA") {
if (arg == "dragonfly" && args.size() == 3 && i == 1) { if (arg == "dragonfly" && args.size() == 3 && i == 1) {
uint32_t sid = dfly_cmd_->AllocateSyncSession(); uint32_t sid = dfly_cmd_->AllocateSyncSession();
cntx->owner()->SetName(absl::StrCat("repl_ctrl_", sid));
string sync_id = absl::StrCat("SYNC", sid); string sync_id = absl::StrCat("SYNC", sid);
cntx->conn_state.sync_session_id = sid; cntx->conn_state.repl_session_id = sid;
// The response for 'capa dragonfly' is: <masterid> <syncid> <numthreads> // The response for 'capa dragonfly' is: <masterid> <syncid> <numthreads>
(*cntx)->StartArray(3); (*cntx)->StartArray(3);
(*cntx)->SendSimpleString(master_id_); (*cntx)->SendSimpleString(master_id_);
(*cntx)->SendSimpleString(sync_id); (*cntx)->SendSimpleString(sync_id);
(*cntx)->SendLong(shard_set->size()); (*cntx)->SendLong(shard_set->pool()->size());
return; return;
} }
} else { } else {
@ -1487,8 +1494,10 @@ void ServerFamily::Register(CommandRegistry* registry) {
<< CI{"REPLICAOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf) << CI{"REPLICAOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf)
<< CI{"REPLCONF", CO::ADMIN | CO::LOADING, -1, 0, 0, 0}.HFUNC(ReplConf) << CI{"REPLCONF", CO::ADMIN | CO::LOADING, -1, 0, 0, 0}.HFUNC(ReplConf)
<< CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0}.HFUNC(Role) << CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0}.HFUNC(Role)
<< CI{"SYNC", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Sync) // We won't support DF->REDIS replication for now, hence we do not need to support
<< CI{"PSYNC", CO::ADMIN | CO::GLOBAL_TRANS, 3, 0, 0, 0}.HFUNC(Psync) // these commands.
// << CI{"SYNC", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Sync)
// << CI{"PSYNC", CO::ADMIN | CO::GLOBAL_TRANS, 3, 0, 0, 0}.HFUNC(Psync)
<< CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script) << CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script)
<< CI{"DFLY", CO::ADMIN | CO::GLOBAL_TRANS, -2, 0, 0, 0}.HFUNC(Dfly); << CI{"DFLY", CO::ADMIN | CO::GLOBAL_TRANS, -2, 0, 0, 0}.HFUNC(Dfly);
} }

View file

@ -102,6 +102,10 @@ class ServerFamily {
return journal_.get(); return journal_.get();
} }
void OnClose(ConnectionContext* cntx);
void BreakOnShutdown();
private: private:
uint32_t shard_count() const { uint32_t shard_count() const {
return shard_set->size(); return shard_set->size();

View file

@ -22,8 +22,9 @@ class Journal;
// Present in every server thread. This class differs from EngineShard. The latter manages // Present in every server thread. This class differs from EngineShard. The latter manages
// state around engine shards while the former represents coordinator/connection state. // state around engine shards while the former represents coordinator/connection state.
// There may be threads that handle engine shards but not IO, there may be threads that handle IO // There may be threads that handle engine shards but not IO, there may be threads that handle IO
// but not engine shards and there can be threads that handle both. This class is present only // but not engine shards and there can be threads that handle both.
// for threads that handle IO and manage incoming connections. // Instances of ServerState are present only for threads that handle
// IO and manage incoming connections.
class ServerState { // public struct - to allow initialization. class ServerState { // public struct - to allow initialization.
ServerState(const ServerState&) = delete; ServerState(const ServerState&) = delete;
void operator=(const ServerState&) = delete; void operator=(const ServerState&) = delete;

View file

@ -13,6 +13,8 @@ extern "C" {
#include "base/logging.h" #include "base/logging.h"
#include "server/db_slice.h" #include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "server/journal/journal.h"
#include "server/rdb_save.h" #include "server/rdb_save.h"
#include "util/fiber_sched_algo.h" #include "util/fiber_sched_algo.h"
#include "util/proactor_base.h" #include "util/proactor_base.h"
@ -25,15 +27,14 @@ using namespace chrono_literals;
namespace this_fiber = ::boost::this_fiber; namespace this_fiber = ::boost::this_fiber;
using boost::fibers::fiber; using boost::fibers::fiber;
SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest) SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest) : db_slice_(slice), dest_(dest) {
: db_slice_(slice), dest_(dest) {
db_array_ = slice->databases(); db_array_ = slice->databases();
} }
SliceSnapshot::~SliceSnapshot() { SliceSnapshot::~SliceSnapshot() {
} }
void SliceSnapshot::Start() { void SliceSnapshot::Start(bool include_journal_changes) {
DCHECK(!fb_.joinable()); DCHECK(!fb_.joinable());
auto on_change = [this](DbIndex db_index, const DbSlice::ChangeReq& req) { auto on_change = [this](DbIndex db_index, const DbSlice::ChangeReq& req) {
@ -42,6 +43,14 @@ void SliceSnapshot::Start() {
snapshot_version_ = db_slice_->RegisterOnChange(move(on_change)); snapshot_version_ = db_slice_->RegisterOnChange(move(on_change));
VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_; VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_;
if (include_journal_changes) {
auto* journal = db_slice_->shard_owner()->journal();
DCHECK(journal);
journal_cb_id_ = journal->RegisterOnChange(
[this](const journal::Entry& e) { OnJournalEntry(e); });
}
sfile_.reset(new io::StringFile); sfile_.reset(new io::StringFile);
rdb_serializer_.reset(new RdbSerializer(sfile_.get())); rdb_serializer_.reset(new RdbSerializer(sfile_.get()));
@ -49,6 +58,8 @@ void SliceSnapshot::Start() {
fb_ = fiber([this] { fb_ = fiber([this] {
FiberFunc(); FiberFunc();
db_slice_->UnregisterOnChange(snapshot_version_); db_slice_->UnregisterOnChange(snapshot_version_);
if (journal_cb_id_)
db_slice_->shard_owner()->journal()->Unregister(journal_cb_id_);
}); });
} }
@ -141,15 +152,8 @@ bool SliceSnapshot::FlushSfile(bool force) {
} }
VLOG(2) << "FlushSfile " << sfile_->val.size() << " bytes"; VLOG(2) << "FlushSfile " << sfile_->val.size() << " bytes";
string tmp = std::move(sfile_->val); // important to move before pushing! DbRecord rec = GetDbRecord(savecb_current_db_, std::move(sfile_->val), num_records_in_blob_);
channel_bytes_ += tmp.size(); num_records_in_blob_ = 0; // We can not move this line after the push, because Push is blocking.
DbRecord rec{.db_index = savecb_current_db_,
.id = rec_id_,
.num_records = num_records_in_blob_,
.value = std::move(tmp)};
DVLOG(2) << "Pushed " << rec_id_;
++rec_id_;
num_records_in_blob_ = 0;
dest_->Push(std::move(rec)); dest_->Push(std::move(rec));
return true; return true;
@ -206,6 +210,32 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
} }
} }
void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) {
CHECK(journal::Op::VAL == entry.opcode);
PrimeKey pkey{entry.key};
if (entry.db_ind == savecb_current_db_) {
++num_records_in_blob_;
io::Result<uint8_t> res =
rdb_serializer_->SaveEntry(pkey, *entry.pval_ptr, entry.expire_ms);
CHECK(res); // we write to StringFile.
} else {
io::StringFile sfile;
RdbSerializer tmp_serializer(&sfile);
io::Result<uint8_t> res =
tmp_serializer.SaveEntry(pkey, *entry.pval_ptr, entry.expire_ms);
CHECK(res); // we write to StringFile.
error_code ec = tmp_serializer.FlushMem();
CHECK(!ec && !sfile.val.empty());
DbRecord rec = GetDbRecord(entry.db_ind, std::move(sfile.val), 1);
dest_->Push(std::move(rec));
}
}
unsigned SliceSnapshot::SerializePhysicalBucket(DbIndex db_index, PrimeTable::bucket_iterator it) { unsigned SliceSnapshot::SerializePhysicalBucket(DbIndex db_index, PrimeTable::bucket_iterator it) {
DCHECK_LT(it.GetVersion(), snapshot_version_); DCHECK_LT(it.GetVersion(), snapshot_version_);
@ -234,17 +264,19 @@ unsigned SliceSnapshot::SerializePhysicalBucket(DbIndex db_index, PrimeTable::bu
error_code ec = tmp_serializer.FlushMem(); error_code ec = tmp_serializer.FlushMem();
CHECK(!ec && !sfile.val.empty()); CHECK(!ec && !sfile.val.empty());
string tmp = std::move(sfile.val); dest_->Push(GetDbRecord(db_index, std::move(sfile.val), result));
channel_bytes_ += tmp.size();
DbRecord rec{
.db_index = db_index, .id = rec_id_, .num_records = result, .value = std::move(tmp)};
DVLOG(2) << "Pushed " << rec_id_;
++rec_id_;
dest_->Push(std::move(rec));
} }
return result; return result;
} }
auto SliceSnapshot::GetDbRecord(DbIndex db_index, std::string value, unsigned num_records)
-> DbRecord {
channel_bytes_ += value.size();
auto id = rec_id_++;
DVLOG(2) << "Pushed " << id;
return DbRecord{
.db_index = db_index, .id = id, .num_records = num_records, .value = std::move(value)};
}
} // namespace dfly } // namespace dfly

View file

@ -13,6 +13,10 @@
namespace dfly { namespace dfly {
namespace journal {
struct Entry;
} // namespace journal
class RdbSerializer; class RdbSerializer;
class SliceSnapshot { class SliceSnapshot {
@ -32,7 +36,7 @@ class SliceSnapshot {
SliceSnapshot(DbSlice* slice, RecordChannel* dest); SliceSnapshot(DbSlice* slice, RecordChannel* dest);
~SliceSnapshot(); ~SliceSnapshot();
void Start(); void Start(bool include_journal_changes);
void Join(); void Join();
uint64_t snapshot_version() const { uint64_t snapshot_version() const {
@ -59,10 +63,12 @@ class SliceSnapshot {
bool SaveCb(PrimeIterator it); bool SaveCb(PrimeIterator it);
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req); void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
void OnJournalEntry(const journal::Entry& entry);
// Returns number of entries serialized. // Returns number of entries serialized.
// Updates the version of the bucket to snapshot version. // Updates the version of the bucket to snapshot version.
unsigned SerializePhysicalBucket(DbIndex db_index, PrimeTable::bucket_iterator it); unsigned SerializePhysicalBucket(DbIndex db_index, PrimeTable::bucket_iterator it);
DbRecord GetDbRecord(DbIndex db_index, std::string value, unsigned num_records);
::boost::fibers::fiber fb_; ::boost::fibers::fiber fb_;
@ -82,6 +88,7 @@ class SliceSnapshot {
size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0, savecb_calls_ = 0; size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0, savecb_calls_ = 0;
uint64_t rec_id_ = 0; uint64_t rec_id_ = 0;
uint32_t num_records_in_blob_ = 0; uint32_t num_records_in_blob_ = 0;
uint32_t journal_cb_id_ = 0;
}; };
} // namespace dfly } // namespace dfly

View file

@ -64,9 +64,10 @@ string_view GetSlice(EngineShard* shard, const PrimeValue& pv, string* tmp) {
return pv.GetSlice(tmp); return pv.GetSlice(tmp);
} }
inline void RecordJournal(const OpArgs& op_args, const PrimeKey& pkey, const PrimeKey& pvalue) { inline void RecordJournal(const OpArgs& op_args, string_view key, const PrimeKey& pvalue) {
if (op_args.shard->journal()) { if (op_args.shard->journal()) {
op_args.shard->journal()->RecordEntry(op_args.txid, pkey, pvalue); journal::Entry entry{op_args.db_ind, op_args.txid, key, pvalue};
op_args.shard->journal()->RecordEntry(entry);
} }
} }
@ -104,7 +105,7 @@ OpResult<uint32_t> OpSetRange(const OpArgs& op_args, string_view key, size_t sta
memcpy(s.data() + start, value.data(), value.size()); memcpy(s.data() + start, value.data(), value.size());
it->second.SetString(s); it->second.SetString(s);
db_slice.PostUpdate(op_args.db_ind, it, key, !added); db_slice.PostUpdate(op_args.db_ind, it, key, !added);
RecordJournal(op_args, it->first, it->second); RecordJournal(op_args, key, it->second);
return it->second.Size(); return it->second.Size();
} }
@ -141,8 +142,8 @@ OpResult<string> OpGetRange(const OpArgs& op_args, string_view key, int32_t star
return string(slice.substr(start, end - start + 1)); return string(slice.substr(start, end - start + 1));
}; };
size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key, string_view val, size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key,
bool prepend) { string_view val, bool prepend) {
string tmp, new_val; string tmp, new_val;
auto* shard = op_args.shard; auto* shard = op_args.shard;
string_view slice = GetSlice(shard, it->second, &tmp); string_view slice = GetSlice(shard, it->second, &tmp);
@ -155,7 +156,7 @@ size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key,
db_slice.PreUpdate(op_args.db_ind, it); db_slice.PreUpdate(op_args.db_ind, it);
it->second.SetString(new_val); it->second.SetString(new_val);
db_slice.PostUpdate(op_args.db_ind, it, key, true); db_slice.PostUpdate(op_args.db_ind, it, key, true);
RecordJournal(op_args, it->first, it->second); RecordJournal(op_args, key, it->second);
return new_val.size(); return new_val.size();
} }
@ -169,7 +170,7 @@ OpResult<uint32_t> ExtendOrSet(const OpArgs& op_args, string_view key, string_vi
if (inserted) { if (inserted) {
it->second.SetString(val); it->second.SetString(val);
db_slice.PostUpdate(op_args.db_ind, it, key, false); db_slice.PostUpdate(op_args.db_ind, it, key, false);
RecordJournal(op_args, it->first, it->second); RecordJournal(op_args, key, it->second);
return val.size(); return val.size();
} }
@ -180,7 +181,7 @@ OpResult<uint32_t> ExtendOrSet(const OpArgs& op_args, string_view key, string_vi
return ExtendExisting(op_args, it, key, val, prepend); return ExtendExisting(op_args, it, key, val, prepend);
} }
OpResult<bool> ExtendOrSkip(const OpArgs& op_args, std::string_view key, std::string_view val, OpResult<bool> ExtendOrSkip(const OpArgs& op_args, string_view key, string_view val,
bool prepend) { bool prepend) {
auto& db_slice = op_args.shard->db_slice(); auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING); OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING);
@ -201,7 +202,7 @@ OpResult<string> OpGet(const OpArgs& op_args, string_view key) {
return GetString(op_args.shard, pv); return GetString(op_args.shard, pv);
} }
OpResult<double> OpIncrFloat(const OpArgs& op_args, std::string_view key, double val) { OpResult<double> OpIncrFloat(const OpArgs& op_args, string_view key, double val) {
auto& db_slice = op_args.shard->db_slice(); auto& db_slice = op_args.shard->db_slice();
auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
@ -211,7 +212,7 @@ OpResult<double> OpIncrFloat(const OpArgs& op_args, std::string_view key, double
char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf)); char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf));
it->second.SetString(str); it->second.SetString(str);
db_slice.PostUpdate(op_args.db_ind, it, key, false); db_slice.PostUpdate(op_args.db_ind, it, key, false);
RecordJournal(op_args, it->first, it->second); RecordJournal(op_args, key, it->second);
return val; return val;
} }
@ -243,13 +244,13 @@ OpResult<double> OpIncrFloat(const OpArgs& op_args, std::string_view key, double
db_slice.PreUpdate(op_args.db_ind, it); db_slice.PreUpdate(op_args.db_ind, it);
it->second.SetString(str); it->second.SetString(str);
db_slice.PostUpdate(op_args.db_ind, it, key, true); db_slice.PostUpdate(op_args.db_ind, it, key, true);
RecordJournal(op_args, it->first, it->second); RecordJournal(op_args, key, it->second);
return base; return base;
} }
// if skip_on_missing - returns KEY_NOTFOUND. // if skip_on_missing - returns KEY_NOTFOUND.
OpResult<int64_t> OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t incr, OpResult<int64_t> OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr,
bool skip_on_missing) { bool skip_on_missing) {
auto& db_slice = op_args.shard->db_slice(); auto& db_slice = op_args.shard->db_slice();
@ -270,7 +271,7 @@ OpResult<int64_t> OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t
return OpStatus::OUT_OF_MEMORY; return OpStatus::OUT_OF_MEMORY;
} }
RecordJournal(op_args, it->first, it->second); RecordJournal(op_args, key, it->second);
return incr; return incr;
} }
@ -295,7 +296,7 @@ OpResult<int64_t> OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t
db_slice.PreUpdate(op_args.db_ind, it); db_slice.PreUpdate(op_args.db_ind, it);
it->second.SetInt(new_val); it->second.SetInt(new_val);
db_slice.PostUpdate(op_args.db_ind, it, key); db_slice.PostUpdate(op_args.db_ind, it, key);
RecordJournal(op_args, it->first, it->second); RecordJournal(op_args, key, it->second);
return new_val; return new_val;
} }
@ -393,7 +394,7 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value
} }
} }
RecordJournal(op_args_, it->first, it->second); RecordJournal(op_args_, key, it->second);
return OpStatus::OK; return OpStatus::OK;
} }
@ -447,7 +448,7 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt
} }
db_slice.PostUpdate(params.db_index, it, key); db_slice.PostUpdate(params.db_index, it, key);
RecordJournal(op_args_, it->first, it->second); RecordJournal(op_args_, key, it->second);
return OpStatus::OK; return OpStatus::OK;
} }
@ -572,7 +573,7 @@ void StringFamily::SetNx(CmdArgList args, ConnectionContext* cntx) {
void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) { void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
get_qps.Inc(); get_qps.Inc();
std::string_view key = ArgS(args, 1); string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) { return OpGet(t->GetOpArgs(shard), key); }; auto cb = [&](Transaction* t, EngineShard* shard) { return OpGet(t->GetOpArgs(shard), key); };
@ -596,8 +597,8 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
} }
void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) { void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1); string_view key = ArgS(args, 1);
std::string_view value = ArgS(args, 2); string_view value = ArgS(args, 2);
std::optional<string> prev_val; std::optional<string> prev_val;
SetCmd::SetParams sparams{cntx->db_index()}; SetCmd::SetParams sparams{cntx->db_index()};
@ -624,15 +625,15 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) {
} }
void StringFamily::Incr(CmdArgList args, ConnectionContext* cntx) { void StringFamily::Incr(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1); string_view key = ArgS(args, 1);
return IncrByGeneric(key, 1, cntx); return IncrByGeneric(key, 1, cntx);
} }
void StringFamily::IncrBy(CmdArgList args, ConnectionContext* cntx) { void StringFamily::IncrBy(CmdArgList args, ConnectionContext* cntx) {
DCHECK_EQ(3u, args.size()); DCHECK_EQ(3u, args.size());
std::string_view key = ArgS(args, 1); string_view key = ArgS(args, 1);
std::string_view sval = ArgS(args, 2); string_view sval = ArgS(args, 2);
int64_t val; int64_t val;
if (!absl::SimpleAtoi(sval, &val)) { if (!absl::SimpleAtoi(sval, &val)) {
@ -642,8 +643,8 @@ void StringFamily::IncrBy(CmdArgList args, ConnectionContext* cntx) {
} }
void StringFamily::IncrByFloat(CmdArgList args, ConnectionContext* cntx) { void StringFamily::IncrByFloat(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1); string_view key = ArgS(args, 1);
std::string_view sval = ArgS(args, 2); string_view sval = ArgS(args, 2);
double val; double val;
if (!absl::SimpleAtod(sval, &val)) { if (!absl::SimpleAtod(sval, &val)) {
@ -666,13 +667,13 @@ void StringFamily::IncrByFloat(CmdArgList args, ConnectionContext* cntx) {
} }
void StringFamily::Decr(CmdArgList args, ConnectionContext* cntx) { void StringFamily::Decr(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1); string_view key = ArgS(args, 1);
return IncrByGeneric(key, -1, cntx); return IncrByGeneric(key, -1, cntx);
} }
void StringFamily::DecrBy(CmdArgList args, ConnectionContext* cntx) { void StringFamily::DecrBy(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1); string_view key = ArgS(args, 1);
std::string_view sval = ArgS(args, 2); string_view sval = ArgS(args, 2);
int64_t val; int64_t val;
if (!absl::SimpleAtoi(sval, &val)) { if (!absl::SimpleAtoi(sval, &val)) {
@ -693,7 +694,7 @@ void StringFamily::Prepend(CmdArgList args, ConnectionContext* cntx) {
ExtendGeneric(std::move(args), true, cntx); ExtendGeneric(std::move(args), true, cntx);
} }
void StringFamily::IncrByGeneric(std::string_view key, int64_t val, ConnectionContext* cntx) { void StringFamily::IncrByGeneric(string_view key, int64_t val, ConnectionContext* cntx) {
bool skip_on_missing = cntx->protocol() == Protocol::MEMCACHE; bool skip_on_missing = cntx->protocol() == Protocol::MEMCACHE;
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
@ -725,8 +726,8 @@ void StringFamily::IncrByGeneric(std::string_view key, int64_t val, ConnectionCo
} }
void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContext* cntx) { void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1); string_view key = ArgS(args, 1);
std::string_view sval = ArgS(args, 2); string_view sval = ArgS(args, 2);
if (cntx->protocol() == Protocol::REDIS) { if (cntx->protocol() == Protocol::REDIS) {
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {

View file

@ -1160,7 +1160,7 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
return false; return false;
} }
void Transaction::BreakOnClose() { void Transaction::BreakOnShutdown() {
if (coordinator_state_ & COORD_BLOCKED) { if (coordinator_state_ & COORD_BLOCKED) {
coordinator_state_ |= COORD_CANCELLED; coordinator_state_ |= COORD_CANCELLED;
blocking_ec_.notify(); blocking_ec_.notify();

View file

@ -169,7 +169,7 @@ class Transaction {
// this transaction has been awaked. // this transaction has been awaked.
bool NotifySuspended(TxId committed_ts, ShardId sid); bool NotifySuspended(TxId committed_ts, ShardId sid);
void BreakOnClose(); void BreakOnShutdown();
// Called by EngineShard when performing Execute over the tx queue. // Called by EngineShard when performing Execute over the tx queue.
// Returns true if transaction should be kept in the queue. // Returns true if transaction should be kept in the queue.