mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
feat(replica): atomicity for multi shard commands (#598)
This commit is contained in:
parent
e6721d8160
commit
aacab66434
4 changed files with 67 additions and 34 deletions
|
@ -9,20 +9,42 @@
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
|
||||||
JournalExecutor::JournalExecutor(Service* service) : service_{service} {
|
JournalExecutor::JournalExecutor(Service* service)
|
||||||
|
: service_{service}, conn_context_{&null_sink_, nullptr} {
|
||||||
|
conn_context_.is_replicating = true;
|
||||||
|
conn_context_.journal_emulated = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void JournalExecutor::Execute(std::vector<journal::ParsedEntry>& entries) {
|
||||||
|
DCHECK_GT(entries.size(), 1U);
|
||||||
|
conn_context_.conn_state.db_index = entries.front().dbid;
|
||||||
|
|
||||||
|
std::string multi_cmd = {"MULTI"};
|
||||||
|
auto ms = MutableSlice{&multi_cmd[0], multi_cmd.size()};
|
||||||
|
auto span = CmdArgList{&ms, 1};
|
||||||
|
service_->DispatchCommand(span, &conn_context_);
|
||||||
|
|
||||||
|
for (auto& entry : entries) {
|
||||||
|
if (entry.payload) {
|
||||||
|
DCHECK_EQ(entry.dbid, conn_context_.conn_state.db_index);
|
||||||
|
span = CmdArgList{entry.payload->data(), entry.payload->size()};
|
||||||
|
|
||||||
|
service_->DispatchCommand(span, &conn_context_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string exec_cmd = {"EXEC"};
|
||||||
|
ms = {&exec_cmd[0], exec_cmd.size()};
|
||||||
|
span = {&ms, 1};
|
||||||
|
service_->DispatchCommand(span, &conn_context_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void JournalExecutor::Execute(journal::ParsedEntry& entry) {
|
void JournalExecutor::Execute(journal::ParsedEntry& entry) {
|
||||||
if (entry.payload) {
|
conn_context_.conn_state.db_index = entry.dbid;
|
||||||
io::NullSink null_sink;
|
if (entry.payload) { // TODO - when this is false?
|
||||||
ConnectionContext conn_context{&null_sink, nullptr};
|
|
||||||
conn_context.is_replicating = true;
|
|
||||||
conn_context.journal_emulated = true;
|
|
||||||
conn_context.conn_state.db_index = entry.dbid;
|
|
||||||
|
|
||||||
auto span = CmdArgList{entry.payload->data(), entry.payload->size()};
|
auto span = CmdArgList{entry.payload->data(), entry.payload->size()};
|
||||||
|
|
||||||
service_->DispatchCommand(span, &conn_context);
|
service_->DispatchCommand(span, &conn_context_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,10 +14,13 @@ class Service;
|
||||||
class JournalExecutor {
|
class JournalExecutor {
|
||||||
public:
|
public:
|
||||||
JournalExecutor(Service* service);
|
JournalExecutor(Service* service);
|
||||||
|
void Execute(std::vector<journal::ParsedEntry>& entries);
|
||||||
void Execute(journal::ParsedEntry& entry);
|
void Execute(journal::ParsedEntry& entry);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Service* service_;
|
Service* service_;
|
||||||
|
ConnectionContext conn_context_;
|
||||||
|
io::NullSink null_sink_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
|
@ -759,13 +759,13 @@ void Replica::StableSyncDflyFb(Context* cntx) {
|
||||||
cntx->ReportError(res.error(), "Journal format error");
|
cntx->ReportError(res.error(), "Journal format error");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ExecuteEntry(&executor, res.value());
|
ExecuteEntry(&executor, std::move(res.value()));
|
||||||
last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();
|
last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Replica::ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry& entry) {
|
void Replica::ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry&& entry) {
|
||||||
if (entry.shard_cnt <= 1) { // not multi shard cmd
|
if (entry.shard_cnt <= 1) { // not multi shard cmd
|
||||||
executor->Execute(entry);
|
executor->Execute(entry);
|
||||||
return;
|
return;
|
||||||
|
@ -773,46 +773,53 @@ void Replica::ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry& entr
|
||||||
|
|
||||||
// Multi shard command flow:
|
// Multi shard command flow:
|
||||||
// step 1: Fiber wait until all the fibers that should execute this transaction got
|
// step 1: Fiber wait until all the fibers that should execute this transaction got
|
||||||
// to the journal entry of the transaction.
|
// to the journal entry of the transaction. This step enforces that replica will execute multi
|
||||||
// step 2: execute the command (All fibers)
|
// shard commands that finished on master.
|
||||||
// step 3: Fiber wait until all fibers finished the execution
|
// step 2: Execute the commands from one fiber. This step ensures atomicity of replica.
|
||||||
// By step 1 we enforce that replica will execute multi shard commands that finished on master
|
// step 3: Fiber wait until all fibers finished the execution. This step ensures atomicity of
|
||||||
// By step 3 we ensure the correctness of flushall/flushdb commands
|
// operations on replica.
|
||||||
|
|
||||||
// TODO: this implementation does not support atomicity in replica
|
|
||||||
// Although multi shard transaction happen in step 2 very close to each other,
|
|
||||||
// user can query replica between executions.
|
|
||||||
// To support atomicity we should have one fiber in step 2 which will execute all the entries of
|
|
||||||
// the transaction together. In case of global command such as flushdb the command can be executed
|
|
||||||
// by only one fiber.
|
|
||||||
|
|
||||||
// TODO: support error handler in this flow
|
// TODO: support error handler in this flow
|
||||||
|
|
||||||
// Only the first fiber to reach the transaction will create data for transaction in map
|
// Only the first fiber to reach the transaction will create data for transaction in map
|
||||||
multi_shard_exe_->map_mu.lock();
|
multi_shard_exe_->map_mu.lock();
|
||||||
auto [it, was_insert] = multi_shard_exe_->tx_sync_execution.emplace(entry.txid, entry.shard_cnt);
|
auto [it, was_insert] = multi_shard_exe_->tx_sync_execution.emplace(entry.txid, entry.shard_cnt);
|
||||||
|
VLOG(2) << "txid: " << entry.txid << " unique_shard_cnt_: " << entry.shard_cnt
|
||||||
|
<< " was_insert: " << was_insert;
|
||||||
|
|
||||||
|
TxId txid = entry.txid;
|
||||||
|
// entries_vec will store all entries of the transaction and will be executed by the fiber that
|
||||||
|
// inserted the txid to map. In case of global command the inserting fiber will execute its
|
||||||
|
// entry.
|
||||||
|
bool global_cmd = (entry.payload.value().size() == 1);
|
||||||
|
if (!global_cmd) {
|
||||||
|
it->second.entries_vec.push_back(std::move(entry));
|
||||||
|
}
|
||||||
|
auto& tx_sync = it->second;
|
||||||
|
|
||||||
// Note: we must release the mutex before calling wait on barrier
|
// Note: we must release the mutex before calling wait on barrier
|
||||||
multi_shard_exe_->map_mu.unlock();
|
multi_shard_exe_->map_mu.unlock();
|
||||||
|
|
||||||
VLOG(2) << "txid: " << entry.txid << " unique_shard_cnt_: " << entry.shard_cnt
|
|
||||||
<< " was_insert: " << was_insert;
|
|
||||||
|
|
||||||
// step 1
|
// step 1
|
||||||
it->second.barrier.wait();
|
tx_sync.barrier.wait();
|
||||||
// step 2
|
// step 2
|
||||||
|
if (was_insert) {
|
||||||
|
if (global_cmd) {
|
||||||
executor->Execute(entry);
|
executor->Execute(entry);
|
||||||
|
} else {
|
||||||
|
executor->Execute(tx_sync.entries_vec);
|
||||||
|
}
|
||||||
|
}
|
||||||
// step 3
|
// step 3
|
||||||
it->second.barrier.wait();
|
tx_sync.barrier.wait();
|
||||||
|
|
||||||
// Note: erase from map can be done only after all fibers returned from wait.
|
// Note: erase from map can be done only after all fibers returned from wait.
|
||||||
// The last fiber which will decrease the counter to 0 will be the one to erase the data from map
|
// The last fiber which will decrease the counter to 0 will be the one to erase the data from map
|
||||||
auto val = it->second.counter.fetch_sub(1, std::memory_order_relaxed);
|
auto val = tx_sync.counter.fetch_sub(1, std::memory_order_relaxed);
|
||||||
VLOG(2) << "txid: " << entry.txid << " unique_shard_cnt_: " << entry.shard_cnt
|
VLOG(2) << "txid: " << txid << " counter: " << val;
|
||||||
<< " counter: " << val;
|
|
||||||
if (val == 1) {
|
if (val == 1) {
|
||||||
std::lock_guard lg{multi_shard_exe_->map_mu};
|
std::lock_guard lg{multi_shard_exe_->map_mu};
|
||||||
multi_shard_exe_->tx_sync_execution.erase(entry.txid);
|
multi_shard_exe_->tx_sync_execution.erase(txid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,6 +58,7 @@ class Replica {
|
||||||
std::atomic_uint32_t counter;
|
std::atomic_uint32_t counter;
|
||||||
TxExecutionSync(uint32_t counter) : barrier(counter), counter(counter) {
|
TxExecutionSync(uint32_t counter) : barrier(counter), counter(counter) {
|
||||||
}
|
}
|
||||||
|
std::vector<journal::ParsedEntry> entries_vec;
|
||||||
};
|
};
|
||||||
|
|
||||||
std::unordered_map<TxId, TxExecutionSync> tx_sync_execution;
|
std::unordered_map<TxId, TxExecutionSync> tx_sync_execution;
|
||||||
|
@ -141,7 +142,7 @@ class Replica {
|
||||||
// Send command, update last_io_time, return error.
|
// Send command, update last_io_time, return error.
|
||||||
std::error_code SendCommand(std::string_view command, facade::ReqSerializer* serializer);
|
std::error_code SendCommand(std::string_view command, facade::ReqSerializer* serializer);
|
||||||
|
|
||||||
void ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry& entry);
|
void ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry&& entry);
|
||||||
|
|
||||||
public: /* Utility */
|
public: /* Utility */
|
||||||
struct Info {
|
struct Info {
|
||||||
|
|
Loading…
Reference in a new issue