1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: disable persistent journaling feature (#1549)

1. We have not worked on it for many months
2. It's not on the short term roadmap
3. It complicates the code around the replication.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-07-17 11:05:15 +03:00 committed by GitHub
parent 2c04311cc3
commit 9448220607
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 49 additions and 28 deletions

View file

@ -104,9 +104,9 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[0]); ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0); string_view sub_cmd = ArgS(args, 0);
if (sub_cmd == "JOURNAL" && args.size() >= 2) { /*if (sub_cmd == "JOURNAL" && args.size() >= 2) {
return Journal(args, cntx); return Journal(args, cntx);
} }*/
if (sub_cmd == "THREAD") { if (sub_cmd == "THREAD") {
return Thread(args, cntx); return Thread(args, cntx);
@ -139,6 +139,7 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
rb->SendError(kSyntaxErr); rb->SendError(kSyntaxErr);
} }
#if 0
void DflyCmd::Journal(CmdArgList args, ConnectionContext* cntx) { void DflyCmd::Journal(CmdArgList args, ConnectionContext* cntx) {
DCHECK_GE(args.size(), 2u); DCHECK_GE(args.size(), 2u);
ToUpper(&args[1]); ToUpper(&args[1]);
@ -201,6 +202,8 @@ void DflyCmd::Journal(CmdArgList args, ConnectionContext* cntx) {
return rb->SendError(reply, kSyntaxErrType); return rb->SendError(reply, kSyntaxErrType);
} }
#endif
void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) { void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
util::ProactorPool* pool = shard_set->pool(); util::ProactorPool* pool = shard_set->pool();
@ -472,8 +475,9 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
flow->saver.reset(); flow->saver.reset();
}; };
sf_->journal()->StartInThread();
// Shard can be null for io thread. // Shard can be null for io thread.
CHECK(!sf_->journal()->OpenInThread(false, ""sv)); // can only happen in persistent mode.
if (shard != nullptr) { if (shard != nullptr) {
flow->saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard); flow->saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard);
} }

View file

@ -133,7 +133,7 @@ class DflyCmd {
private: private:
// JOURNAL [START/STOP] // JOURNAL [START/STOP]
// Start or stop journaling. // Start or stop journaling.
void Journal(CmdArgList args, ConnectionContext* cntx); // void Journal(CmdArgList args, ConnectionContext* cntx);
// THREAD [to_thread] // THREAD [to_thread]
// Return connection thread index or migrate to another thread. // Return connection thread index or migrate to another thread.

View file

@ -28,9 +28,20 @@ thread_local JournalSlice journal_slice;
Journal::Journal() { Journal::Journal() {
} }
error_code Journal::OpenInThread(bool persistent, string_view dir) { void Journal::StartInThread() {
journal_slice.Init(unsigned(ProactorBase::GetIndex())); journal_slice.Init(unsigned(ProactorBase::GetIndex()));
ServerState::tlocal()->set_journal(this);
EngineShard* shard = EngineShard::tlocal();
if (shard) {
shard->set_journal(this);
}
}
#if 0
error_code Journal::OpenInThread(bool persistent, string_view dir) {
error_code ec; error_code ec;
if (persistent) { if (persistent) {
@ -40,14 +51,9 @@ error_code Journal::OpenInThread(bool persistent, string_view dir) {
} }
} }
ServerState::tlocal()->set_journal(this);
EngineShard* shard = EngineShard::tlocal();
if (shard) {
shard->set_journal(this);
}
return ec; return ec;
} }
#endif
error_code Journal::Close() { error_code Journal::Close() {
CHECK(lameduck_.load(memory_order_relaxed)); CHECK(lameduck_.load(memory_order_relaxed));
@ -65,12 +71,12 @@ error_code Journal::Close() {
shard->set_journal(nullptr); shard->set_journal(nullptr);
} }
auto ec = journal_slice.Close(); /*auto ec = journal_slice.Close();
if (ec) { if (ec) {
lock_guard lk2(ec_mu); lock_guard lk2(ec_mu);
res = ec; res = ec;
} }*/
}; };
shard_set->pool()->AwaitFiberOnAll(close_cb); shard_set->pool()->AwaitFiberOnAll(close_cb);

View file

@ -23,11 +23,15 @@ class Journal {
// and false otherwise. // and false otherwise.
bool EnterLameDuck(); // still logs ongoing transactions but refuses to start new ones. bool EnterLameDuck(); // still logs ongoing transactions but refuses to start new ones.
void StartInThread();
// Requires: journal is in lameduck mode. // Requires: journal is in lameduck mode.
std::error_code Close(); std::error_code Close();
#if 0
// Opens journal inside a Dragonfly thread. Must be called in each thread. // Opens journal inside a Dragonfly thread. Must be called in each thread.
std::error_code OpenInThread(bool persistent, std::string_view dir); std::error_code OpenInThread(bool persistent, std::string_view dir);
#endif
//******* The following functions must be called in the context of the owning shard *********// //******* The following functions must be called in the context of the owning shard *********//

View file

@ -20,9 +20,11 @@ namespace fs = std::filesystem;
namespace { namespace {
/*
string ShardName(std::string_view base, unsigned index) { string ShardName(std::string_view base, unsigned index) {
return absl::StrCat(base, "-", absl::Dec(index, absl::kZeroPad4), ".log"); return absl::StrCat(base, "-", absl::Dec(index, absl::kZeroPad4), ".log");
} }
*/
} // namespace } // namespace
@ -42,17 +44,18 @@ JournalSlice::JournalSlice() {
} }
JournalSlice::~JournalSlice() { JournalSlice::~JournalSlice() {
CHECK(!shard_file_); // CHECK(!shard_file_);
} }
void JournalSlice::Init(unsigned index) { void JournalSlice::Init(unsigned index) {
if (ring_buffer_) // calling this function multiple times is allowed and it's a no-op. // if (ring_buffer_) // calling this function multiple times is allowed and it's a no-op.
return; // return;
slice_index_ = index; slice_index_ = index;
ring_buffer_.emplace(128); // TODO: to make it configurable // ring_buffer_.emplace(128); // TODO: to make it configurable
} }
#if 0
std::error_code JournalSlice::Open(std::string_view dir) { std::error_code JournalSlice::Open(std::string_view dir) {
CHECK(!shard_file_); CHECK(!shard_file_);
DCHECK_NE(slice_index_, UINT32_MAX); DCHECK_NE(slice_index_, UINT32_MAX);
@ -111,15 +114,17 @@ error_code JournalSlice::Close() {
return ec; return ec;
} }
#endif
void JournalSlice::AddLogRecord(const Entry& entry, bool await) { void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
DCHECK(ring_buffer_); // DCHECK(ring_buffer_);
if (entry.opcode != Op::NOOP) { if (entry.opcode != Op::NOOP) {
// TODO: This is preparation for AOC style journaling, currently unused.
RingItem item;
item.lsn = lsn_;
lsn_++; lsn_++;
// TODO: This is preparation for AOC style journaling, currently unused.
#if 0
RingItem item;
item.lsn = prev_lsn;
item.opcode = entry.opcode; item.opcode = entry.opcode;
item.txid = entry.txid; item.txid = entry.txid;
VLOG(1) << "Writing item [" << item.lsn << "]: " << entry.ToString(); VLOG(1) << "Writing item [" << item.lsn << "]: " << entry.ToString();
@ -131,6 +136,7 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
CHECK_EC(ec); CHECK_EC(ec);
file_offset_ += line.size(); file_offset_ += line.size();
} }
#endif
} }
{ {

View file

@ -8,7 +8,6 @@
#include <string_view> #include <string_view>
#include "base/ring_buffer.h" #include "base/ring_buffer.h"
#include "core/uring.h"
#include "server/common.h" #include "server/common.h"
#include "server/journal/types.h" #include "server/journal/types.h"
@ -23,9 +22,11 @@ class JournalSlice {
void Init(unsigned index); void Init(unsigned index);
#if 0
std::error_code Open(std::string_view dir); std::error_code Open(std::string_view dir);
std::error_code Close(); std::error_code Close();
#endif
// This is always the LSN of the *next* journal entry. // This is always the LSN of the *next* journal entry.
LSN cur_lsn() const { LSN cur_lsn() const {
@ -36,9 +37,9 @@ class JournalSlice {
return status_ec_; return status_ec_;
} }
// Whether the file-based journaling is open. // Whether the journaling is open.
bool IsOpen() const { bool IsOpen() const {
return bool(shard_file_); return slice_index_ != UINT32_MAX;
} }
void AddLogRecord(const Entry& entry, bool await); void AddLogRecord(const Entry& entry, bool await);
@ -53,9 +54,9 @@ class JournalSlice {
private: private:
struct RingItem; struct RingItem;
std::string shard_path_; // std::string shard_path_;
std::unique_ptr<LinuxFile> shard_file_; // std::unique_ptr<LinuxFile> shard_file_;
std::optional<base::RingBuffer<RingItem>> ring_buffer_; // std::optional<base::RingBuffer<RingItem>> ring_buffer_;
mutable util::SharedMutex cb_mu_; mutable util::SharedMutex cb_mu_;
std::vector<std::pair<uint32_t, ChangeCallback>> change_cb_arr_ ABSL_GUARDED_BY(cb_mu_); std::vector<std::pair<uint32_t, ChangeCallback>> change_cb_arr_ ABSL_GUARDED_BY(cb_mu_);