diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 2ea721b00..0860a0b3a 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -104,9 +104,9 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) { ToUpper(&args[0]); string_view sub_cmd = ArgS(args, 0); - if (sub_cmd == "JOURNAL" && args.size() >= 2) { + /*if (sub_cmd == "JOURNAL" && args.size() >= 2) { return Journal(args, cntx); - } + }*/ if (sub_cmd == "THREAD") { return Thread(args, cntx); @@ -139,6 +139,7 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) { rb->SendError(kSyntaxErr); } +#if 0 void DflyCmd::Journal(CmdArgList args, ConnectionContext* cntx) { DCHECK_GE(args.size(), 2u); ToUpper(&args[1]); @@ -201,6 +202,8 @@ void DflyCmd::Journal(CmdArgList args, ConnectionContext* cntx) { return rb->SendError(reply, kSyntaxErrType); } +#endif + void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) { RedisReplyBuilder* rb = static_cast(cntx->reply_builder()); util::ProactorPool* pool = shard_set->pool(); @@ -472,8 +475,9 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha flow->saver.reset(); }; + sf_->journal()->StartInThread(); + // Shard can be null for io thread. - CHECK(!sf_->journal()->OpenInThread(false, ""sv)); // can only happen in persistent mode. if (shard != nullptr) { flow->saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard); } diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index 2f0212cbb..0f6e7db80 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -133,7 +133,7 @@ class DflyCmd { private: // JOURNAL [START/STOP] // Start or stop journaling. - void Journal(CmdArgList args, ConnectionContext* cntx); + // void Journal(CmdArgList args, ConnectionContext* cntx); // THREAD [to_thread] // Return connection thread index or migrate to another thread. diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 55883b35d..80cf91c75 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -28,9 +28,20 @@ thread_local JournalSlice journal_slice; Journal::Journal() { } -error_code Journal::OpenInThread(bool persistent, string_view dir) { +void Journal::StartInThread() { journal_slice.Init(unsigned(ProactorBase::GetIndex())); + ServerState::tlocal()->set_journal(this); + EngineShard* shard = EngineShard::tlocal(); + if (shard) { + shard->set_journal(this); + } +} + +#if 0 +error_code Journal::OpenInThread(bool persistent, string_view dir) { + + error_code ec; if (persistent) { @@ -40,14 +51,9 @@ error_code Journal::OpenInThread(bool persistent, string_view dir) { } } - ServerState::tlocal()->set_journal(this); - EngineShard* shard = EngineShard::tlocal(); - if (shard) { - shard->set_journal(this); - } - return ec; } +#endif error_code Journal::Close() { CHECK(lameduck_.load(memory_order_relaxed)); @@ -65,12 +71,12 @@ error_code Journal::Close() { shard->set_journal(nullptr); } - auto ec = journal_slice.Close(); + /*auto ec = journal_slice.Close(); if (ec) { lock_guard lk2(ec_mu); res = ec; - } + }*/ }; shard_set->pool()->AwaitFiberOnAll(close_cb); diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index bd25a7bf1..fb26a1c7a 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -23,11 +23,15 @@ class Journal { // and false otherwise. bool EnterLameDuck(); // still logs ongoing transactions but refuses to start new ones. + void StartInThread(); + // Requires: journal is in lameduck mode. std::error_code Close(); +#if 0 // Opens journal inside a Dragonfly thread. Must be called in each thread. std::error_code OpenInThread(bool persistent, std::string_view dir); +#endif //******* The following functions must be called in the context of the owning shard *********// diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index 852006794..7b6051488 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -20,9 +20,11 @@ namespace fs = std::filesystem; namespace { +/* string ShardName(std::string_view base, unsigned index) { return absl::StrCat(base, "-", absl::Dec(index, absl::kZeroPad4), ".log"); } +*/ } // namespace @@ -42,17 +44,18 @@ JournalSlice::JournalSlice() { } JournalSlice::~JournalSlice() { - CHECK(!shard_file_); + // CHECK(!shard_file_); } void JournalSlice::Init(unsigned index) { - if (ring_buffer_) // calling this function multiple times is allowed and it's a no-op. - return; + // if (ring_buffer_) // calling this function multiple times is allowed and it's a no-op. + // return; slice_index_ = index; - ring_buffer_.emplace(128); // TODO: to make it configurable + // ring_buffer_.emplace(128); // TODO: to make it configurable } +#if 0 std::error_code JournalSlice::Open(std::string_view dir) { CHECK(!shard_file_); DCHECK_NE(slice_index_, UINT32_MAX); @@ -111,15 +114,17 @@ error_code JournalSlice::Close() { return ec; } +#endif void JournalSlice::AddLogRecord(const Entry& entry, bool await) { - DCHECK(ring_buffer_); - + // DCHECK(ring_buffer_); if (entry.opcode != Op::NOOP) { - // TODO: This is preparation for AOC style journaling, currently unused. - RingItem item; - item.lsn = lsn_; lsn_++; +// TODO: This is preparation for AOC style journaling, currently unused. +#if 0 + RingItem item; + item.lsn = prev_lsn; + item.opcode = entry.opcode; item.txid = entry.txid; VLOG(1) << "Writing item [" << item.lsn << "]: " << entry.ToString(); @@ -131,6 +136,7 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) { CHECK_EC(ec); file_offset_ += line.size(); } +#endif } { diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index 7e918cfef..cc1f5b3f2 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -8,7 +8,6 @@ #include #include "base/ring_buffer.h" -#include "core/uring.h" #include "server/common.h" #include "server/journal/types.h" @@ -23,9 +22,11 @@ class JournalSlice { void Init(unsigned index); +#if 0 std::error_code Open(std::string_view dir); std::error_code Close(); +#endif // This is always the LSN of the *next* journal entry. LSN cur_lsn() const { @@ -36,9 +37,9 @@ class JournalSlice { return status_ec_; } - // Whether the file-based journaling is open. + // Whether the journaling is open. bool IsOpen() const { - return bool(shard_file_); + return slice_index_ != UINT32_MAX; } void AddLogRecord(const Entry& entry, bool await); @@ -53,9 +54,9 @@ class JournalSlice { private: struct RingItem; - std::string shard_path_; - std::unique_ptr shard_file_; - std::optional> ring_buffer_; + // std::string shard_path_; + // std::unique_ptr shard_file_; + // std::optional> ring_buffer_; mutable util::SharedMutex cb_mu_; std::vector> change_cb_arr_ ABSL_GUARDED_BY(cb_mu_);