diff --git a/src/server/journal/executor.cc b/src/server/journal/executor.cc index 193f001f4..c4f4466dc 100644 --- a/src/server/journal/executor.cc +++ b/src/server/journal/executor.cc @@ -9,20 +9,42 @@ namespace dfly { -JournalExecutor::JournalExecutor(Service* service) : service_{service} { +JournalExecutor::JournalExecutor(Service* service) + : service_{service}, conn_context_{&null_sink_, nullptr} { + conn_context_.is_replicating = true; + conn_context_.journal_emulated = true; +} + +void JournalExecutor::Execute(std::vector& entries) { + DCHECK_GT(entries.size(), 1U); + conn_context_.conn_state.db_index = entries.front().dbid; + + std::string multi_cmd = {"MULTI"}; + auto ms = MutableSlice{&multi_cmd[0], multi_cmd.size()}; + auto span = CmdArgList{&ms, 1}; + service_->DispatchCommand(span, &conn_context_); + + for (auto& entry : entries) { + if (entry.payload) { + DCHECK_EQ(entry.dbid, conn_context_.conn_state.db_index); + span = CmdArgList{entry.payload->data(), entry.payload->size()}; + + service_->DispatchCommand(span, &conn_context_); + } + } + + std::string exec_cmd = {"EXEC"}; + ms = {&exec_cmd[0], exec_cmd.size()}; + span = {&ms, 1}; + service_->DispatchCommand(span, &conn_context_); } void JournalExecutor::Execute(journal::ParsedEntry& entry) { - if (entry.payload) { - io::NullSink null_sink; - ConnectionContext conn_context{&null_sink, nullptr}; - conn_context.is_replicating = true; - conn_context.journal_emulated = true; - conn_context.conn_state.db_index = entry.dbid; - + conn_context_.conn_state.db_index = entry.dbid; + if (entry.payload) { // TODO - when this is false? auto span = CmdArgList{entry.payload->data(), entry.payload->size()}; - service_->DispatchCommand(span, &conn_context); + service_->DispatchCommand(span, &conn_context_); } } diff --git a/src/server/journal/executor.h b/src/server/journal/executor.h index 5cf25c91a..3a1085006 100644 --- a/src/server/journal/executor.h +++ b/src/server/journal/executor.h @@ -14,10 +14,13 @@ class Service; class JournalExecutor { public: JournalExecutor(Service* service); + void Execute(std::vector& entries); void Execute(journal::ParsedEntry& entry); private: Service* service_; + ConnectionContext conn_context_; + io::NullSink null_sink_; }; } // namespace dfly diff --git a/src/server/replica.cc b/src/server/replica.cc index 485bb93d4..b510acbaf 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -759,13 +759,13 @@ void Replica::StableSyncDflyFb(Context* cntx) { cntx->ReportError(res.error(), "Journal format error"); return; } - ExecuteEntry(&executor, res.value()); + ExecuteEntry(&executor, std::move(res.value())); last_io_time_ = sock_->proactor()->GetMonotonicTimeNs(); } return; } -void Replica::ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry& entry) { +void Replica::ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry&& entry) { if (entry.shard_cnt <= 1) { // not multi shard cmd executor->Execute(entry); return; @@ -773,46 +773,53 @@ void Replica::ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry& entr // Multi shard command flow: // step 1: Fiber wait until all the fibers that should execute this tranaction got - // to the journal entry of the transaction. - // step 2: execute the command (All fibers) - // step 3: Fiber wait until all fibers finished the execution - // By step 1 we enforce that replica will execute multi shard commands that finished on master - // By step 3 we ensures the correctness of flushall/flushdb commands - - // TODO: this implemantaion does not support atomicity in replica - // Although multi shard transaction happen in step 2 very close to each other, - // user can query replica between executions. - // To support atomicity we should have one fiber in step 2 which will excute all the entries of - // the transaction together. In case of global comand such as flushdb the command can be executed - // by only one fiber. + // to the journal entry of the transaction. This step enforces that replica will execute multi + // shard commands that finished on master. + // step 2: Execute the commands from one fiber. This step ensures atomicity of replica. + // step 3: Fiber wait until all fibers finished the execution. This step ensures atomicity of + // operations on replica. // TODO: support error handler in this flow // Only the first fiber to reach the transaction will create data for transaction in map multi_shard_exe_->map_mu.lock(); auto [it, was_insert] = multi_shard_exe_->tx_sync_execution.emplace(entry.txid, entry.shard_cnt); + VLOG(2) << "txid: " << entry.txid << " unique_shard_cnt_: " << entry.shard_cnt + << " was_insert: " << was_insert; + + TxId txid = entry.txid; + // entries_vec will store all entries of trasaction and will be executed by the fiber that + // inserted the txid to map. In case of global command the inserting fiber will executed his + // entry. + bool global_cmd = (entry.payload.value().size() == 1); + if (!global_cmd) { + it->second.entries_vec.push_back(std::move(entry)); + } + auto& tx_sync = it->second; // Note: we must release the mutex befor calling wait on barrier multi_shard_exe_->map_mu.unlock(); - VLOG(2) << "txid: " << entry.txid << " unique_shard_cnt_: " << entry.shard_cnt - << " was_insert: " << was_insert; - // step 1 - it->second.barrier.wait(); + tx_sync.barrier.wait(); // step 2 - executor->Execute(entry); + if (was_insert) { + if (global_cmd) { + executor->Execute(entry); + } else { + executor->Execute(tx_sync.entries_vec); + } + } // step 3 - it->second.barrier.wait(); + tx_sync.barrier.wait(); // Note: erase from map can be done only after all fibers returned from wait. // The last fiber which will decrease the counter to 0 will be the one to erase the data from map - auto val = it->second.counter.fetch_sub(1, std::memory_order_relaxed); - VLOG(2) << "txid: " << entry.txid << " unique_shard_cnt_: " << entry.shard_cnt - << " counter: " << val; + auto val = tx_sync.counter.fetch_sub(1, std::memory_order_relaxed); + VLOG(2) << "txid: " << txid << " counter: " << val; if (val == 1) { std::lock_guard lg{multi_shard_exe_->map_mu}; - multi_shard_exe_->tx_sync_execution.erase(entry.txid); + multi_shard_exe_->tx_sync_execution.erase(txid); } } diff --git a/src/server/replica.h b/src/server/replica.h index 317f18d17..3b12fd803 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -58,6 +58,7 @@ class Replica { std::atomic_uint32_t counter; TxExecutionSync(uint32_t counter) : barrier(counter), counter(counter) { } + std::vector entries_vec; }; std::unordered_map tx_sync_execution; @@ -141,7 +142,7 @@ class Replica { // Send command, update last_io_time, return error. std::error_code SendCommand(std::string_view command, facade::ReqSerializer* serializer); - void ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry& entry); + void ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry&& entry); public: /* Utility */ struct Info {