
feat(replica): support multi shard commands new flow (#645)

adiholden 2023-01-08 12:55:25 +02:00 committed by GitHub
parent 5bc216776d
commit 77bd5f5a35
3 changed files with 137 additions and 73 deletions


@@ -16,21 +16,10 @@ JournalExecutor::JournalExecutor(Service* service)
}
void JournalExecutor::Execute(DbIndex dbid, std::vector<journal::ParsedEntry::CmdData>& cmds) {
DCHECK_GT(cmds.size(), 1U);
conn_context_.conn_state.db_index = dbid;
std::string multi_cmd = {"MULTI"};
auto ms = MutableSlice{&multi_cmd[0], multi_cmd.size()};
auto span = CmdArgList{&ms, 1};
service_->DispatchCommand(span, &conn_context_);
for (auto& cmd : cmds) {
Execute(cmd);
}
std::string exec_cmd = {"EXEC"};
ms = {&exec_cmd[0], exec_cmd.size()};
span = {&ms, 1};
service_->DispatchCommand(span, &conn_context_);
}
void JournalExecutor::Execute(DbIndex dbid, journal::ParsedEntry::CmdData& cmd) {
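For context: before this change, the vector overload of Execute() wrapped the replayed commands in MULTI/EXEC, hand-building one-token commands and pushing them through the service, as the removed lines above show. A minimal sketch of that dispatch pattern (the helper name and the ConnectionContext* parameter type are assumptions for illustration; the MutableSlice/CmdArgList construction is taken from the removed lines):

// Hypothetical helper showing how the removed code dispatched a one-token
// command such as "MULTI" or "EXEC" through the service.
static void DispatchToken(Service* service, ConnectionContext* cntx, std::string token) {
  auto ms = MutableSlice{&token[0], token.size()};  // wrap the token bytes
  auto span = CmdArgList{&ms, 1};                   // argument list of length 1
  service->DispatchCommand(span, cntx);
}

With this commit the wrapper is gone: atomicity of multi shard transactions is instead coordinated by the barrier/blocking-counter flow added in replica.cc below.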


@@ -106,8 +106,12 @@ Replica::Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* s
}
Replica::~Replica() {
if (sync_fb_.joinable())
if (sync_fb_.joinable()) {
sync_fb_.join();
}
if (execution_fb_.joinable()) {
execution_fb_.join();
}
if (sock_) {
auto ec = sock_->Close();
@@ -160,7 +164,8 @@ void Replica::Stop() {
// Make sure the replica fully stopped and did all cleanup,
// so we can freely release resources (connections).
sync_fb_.join();
if (sync_fb_.joinable())
sync_fb_.join();
}
void Replica::Pause(bool pause) {
@@ -561,8 +566,19 @@ error_code Replica::ConsumeDflyStream() {
// Make sure the flows are not in a state transition
lock_guard lk{flows_op_mu_};
DefaultErrorHandler(ge);
for (auto& flow : shard_flows_)
for (auto& flow : shard_flows_) {
flow->CloseSocket();
flow->waker_.notifyAll();
}
// Iterate over the map and cancel all blocking entities
{
lock_guard lk{multi_shard_exe_->map_mu};
for (auto& tx_data : multi_shard_exe_->tx_sync_execution) {
tx_data.second.barrier.Cancel();
tx_data.second.block.Cancel();
}
}
};
RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler)));
@@ -605,6 +621,9 @@ void Replica::JoinAllFlows() {
if (flow->sync_fb_.joinable()) {
flow->sync_fb_.join();
}
if (flow->execution_fb_.joinable()) {
flow->execution_fb_.join();
}
}
}
@@ -688,7 +707,10 @@ error_code Replica::StartStableSyncFlow(Context* cntx) {
CHECK(sock_->IsOpen());
// sock_.reset(mythread->CreateSocket());
// RETURN_ON_ERR(sock_->Connect(master_context_.master_ep));
sync_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyFb, this, cntx);
sync_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyReadFb, this, cntx);
// TODO(ADI): add a flag for skipping the flow of blocking on multi shard entries
// and compare performance against the blocking flow.
execution_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyExecFb, this, cntx);
return std::error_code{};
}
@@ -739,7 +761,7 @@ void Replica::FullSyncDflyFb(string eof_token, fibers_ext::BlockingCounter bc, C
VLOG(1) << "FullSyncDflyFb finished after reading " << loader.bytes_read() << " bytes";
}
void Replica::StableSyncDflyFb(Context* cntx) {
void Replica::StableSyncDflyReadFb(Context* cntx) {
// Check leftover from full sync.
io::Bytes prefix{};
if (leftover_buf_ && leftover_buf_->InputLen() > 0) {
@@ -750,91 +772,131 @@ void Replica::StableSyncDflyFb(Context* cntx) {
io::PrefixSource ps{prefix, &ss};
JournalReader reader{&ps, 0};
JournalExecutor executor{&service_};
while (!cntx->IsCancelled()) {
TranactionData tx_data;
while (!cntx->IsCancelled()) {
waker_.await([&]() {
return ((trans_data_queue_.size() < kYieldAfterItemsInQueue) || cntx->IsCancelled());
});
if (cntx->IsCancelled()) {
return;
}
auto res = reader.ReadEntry();
if (!res) {
cntx->ReportError(res.error(), "Journal format error");
return;
}
bool should_execute = tx_data.UpdateFromParsedEntry(std::move(*res));
if (should_execute == true) {
last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();
bool tx_data_full = tx_data.UpdateFromParsedEntry(std::move(*res));
if (tx_data_full == true) {
break;
}
}
ExecuteCmd(&executor, std::move(tx_data), cntx);
last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();
InsertTxDataToShardResource(std::move(tx_data), cntx);
waker_.notify();
}
return;
}
void Replica::ExecuteCmd(JournalExecutor* executor, TranactionData&& tx_data, Context* cntx) {
void Replica::InsertTxDataToShardResource(TranactionData&& tx_data, Context* cntx) {
bool me_insert = false;
if (tx_data.shard_cnt > 1) {
multi_shard_exe_->map_mu.lock();
bool global_cmd =
(tx_data.commands.size() == 1 && tx_data.commands.front().cmd_args.size() == 1);
auto [it, was_insert] = multi_shard_exe_->tx_sync_execution.emplace(
std::piecewise_construct, std::forward_as_tuple(tx_data.txid),
std::forward_as_tuple(tx_data.shard_cnt, global_cmd));
VLOG(2) << "txid: " << tx_data.txid << " unique_shard_cnt_: " << tx_data.shard_cnt
<< " was_insert: " << was_insert;
me_insert = was_insert;
it->second.block.Dec();
multi_shard_exe_->map_mu.unlock();
}
VLOG(2) << "txid: " << tx_data.txid << " pushed to queue";
trans_data_queue_.push(std::make_pair(std::move(tx_data), me_insert));
}
void Replica::StableSyncDflyExecFb(Context* cntx) {
JournalExecutor executor{&service_};
while (!cntx->IsCancelled()) {
waker_.await([&]() { return (!trans_data_queue_.empty() || cntx->IsCancelled()); });
if (cntx->IsCancelled()) {
return;
}
DCHECK(!trans_data_queue_.empty());
auto& data = trans_data_queue_.front();
ExecuteTx(&executor, std::move(data.first), data.second, cntx);
trans_data_queue_.pop();
waker_.notify();
}
return;
}
void Replica::ExecuteTx(JournalExecutor* executor, TranactionData&& tx_data, bool inserted_by_me,
Context* cntx) {
if (cntx->IsCancelled()) {
return;
}
if (tx_data.shard_cnt <= 1) { // not multi shard cmd
VLOG(2) << "Execute single shard cmd";
executor->Execute(tx_data.dbid, tx_data.commands.front());
return;
}
// Multi shard command flow:
// step 1: Fiber waits until all the fibers that should execute this transaction have
// reached the journal entry of the transaction. This step enforces that the replica executes
// only multi shard commands that finished on master.
// step 2: Execute the commands from one fiber. This step ensures atomicity on the replica.
// step 3: Fiber waits until all fibers have finished the execution. This step ensures atomicity
// of operations on the replica.
// TODO: support error handler in this flow
// Only the first fiber to reach the transaction will create the transaction data in the map
VLOG(2) << "Execute txid: " << tx_data.txid;
multi_shard_exe_->map_mu.lock();
auto [it, was_insert] =
multi_shard_exe_->tx_sync_execution.emplace(tx_data.txid, tx_data.shard_cnt);
VLOG(2) << "txid: " << tx_data.txid << " unique_shard_cnt_: " << tx_data.shard_cnt
<< " was_insert: " << was_insert;
TxId txid = tx_data.txid;
// entries_vec will store all entries of the transaction and will be executed by the fiber that
// inserted the txid into the map. In case of a global command the inserting fiber will execute
// its entry.
bool global_cmd = (tx_data.commands.size() == 1 && tx_data.commands.front().cmd_args.size() == 1);
if (!global_cmd) {
for (auto& cmd : tx_data.commands) {
it->second.commands.push_back(std::move(cmd));
}
}
auto& tx_sync = it->second;
// Note: we must release the mutex before calling wait on the barrier
auto it = multi_shard_exe_->tx_sync_execution.find(tx_data.txid);
DCHECK(it != multi_shard_exe_->tx_sync_execution.end());
auto& multi_shard_data = it->second;
multi_shard_exe_->map_mu.unlock();
// step 1
tx_sync.barrier.wait();
VLOG(2) << "Execute txid: " << tx_data.txid << " waiting for data in all shards";
// Wait until all shard flows have received the transaction data and inserted it into the map.
// This step enforces that the replica executes only multi shard commands that finished on master
// and for which it received all the commands from all shards.
multi_shard_data.block.Wait();
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;
VLOG(2) << "Execute txid: " << tx_data.txid << " block wait finished";
// step 2
if (was_insert) {
if (global_cmd) {
executor->Execute(tx_data.dbid, tx_data.commands.front());
} else {
executor->Execute(tx_data.dbid, tx_sync.commands);
if (multi_shard_data.is_global_cmd) {
VLOG(2) << "Execute txid: " << tx_data.txid << " global command execution";
// Wait until all shard flows reach the execution step of this transaction.
multi_shard_data.barrier.Wait();
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;
// A global command is executed from only one flow fiber. This ensures correctness of the data
// on the replica.
if (inserted_by_me) {
executor->Execute(tx_data.dbid, tx_data.commands);
}
// Wait until execution is done, to make sure we don't execute the next commands while the
// global command is executing.
multi_shard_data.barrier.Wait();
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;
} else {  // A non-global command is executed by each flow fiber
VLOG(2) << "Execute txid: " << tx_data.txid << " executing shard transaction commands";
executor->Execute(tx_data.dbid, tx_data.commands);
}
// step 3
tx_sync.barrier.wait();
// Note: erase from map can be done only after all fibers returned from wait.
// Erasing from the map can be done only after all flow fibers have executed the transaction
// commands. The last fiber, the one that decreases the counter to 0, erases the data from the map
auto val = tx_sync.counter.fetch_sub(1, std::memory_order_relaxed);
VLOG(2) << "txid: " << txid << " counter: " << val;
auto val = multi_shard_data.counter.fetch_sub(1, std::memory_order_relaxed);
VLOG(2) << "txid: " << tx_data.txid << " counter: " << val;
if (val == 1) {
std::lock_guard lg{multi_shard_exe_->map_mu};
multi_shard_exe_->tx_sync_execution.erase(txid);
multi_shard_exe_->tx_sync_execution.erase(tx_data.txid);
}
}
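The three-step flow above is easier to see in isolation. Below is a self-contained sketch of the same synchronization written directly against Boost.Fiber primitives rather than the util::fibers_ext helpers (the fiber setup and names are illustrative assumptions, not code from this commit; step 1 in the commit actually uses the BlockingCounter): all shard fibers meet at a barrier, exactly one of them executes the global command while the rest wait at a second barrier, and the last fiber to decrement the counter performs the cleanup.

#include <boost/fiber/all.hpp>

#include <atomic>
#include <cstdio>
#include <vector>

int main() {
  constexpr unsigned kShards = 3;           // stand-in for tx_data.shard_cnt
  boost::fibers::barrier barrier{kShards};  // reusable, like TxExecutionSync::barrier
  std::atomic_uint32_t counter{kShards};    // like TxExecutionSync::counter

  std::vector<boost::fibers::fiber> fibers;
  for (unsigned i = 0; i < kShards; ++i) {
    fibers.emplace_back([&, i] {
      barrier.wait();                       // step 1: all shard flows reached the tx
      if (i == 0)                           // step 2: one fiber runs the global command
        std::puts("executing global command");
      barrier.wait();                       // ...the others wait until it is done
      if (counter.fetch_sub(1, std::memory_order_relaxed) == 1)
        std::puts("last fiber erases the tx entry");  // step 3: last one cleans up
    });
  }
  for (auto& f : fibers)
    f.join();
  return 0;
}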


@@ -6,6 +6,7 @@
#include <boost/fiber/barrier.hpp>
#include <boost/fiber/fiber.hpp>
#include <boost/fiber/mutex.hpp>
#include <queue>
#include <variant>
#include "base/io_buf.h"
@@ -57,7 +58,7 @@ class Replica {
uint32_t shard_cnt;
DbIndex dbid;
std::vector<journal::ParsedEntry::CmdData> commands;
// Update the data from ParsedEntry and return whether it is ready for execution.
// Update the data from ParsedEntry and return whether all shard transaction commands were received.
bool UpdateFromParsedEntry(journal::ParsedEntry&& entry);
};
@@ -65,11 +66,13 @@ class Replica {
boost::fibers::mutex map_mu;
struct TxExecutionSync {
boost::fibers::barrier barrier;
util::fibers_ext::Barrier barrier;
std::atomic_uint32_t counter;
TxExecutionSync(uint32_t counter) : barrier(counter), counter(counter) {
util::fibers_ext::BlockingCounter block;
bool is_global_cmd;
TxExecutionSync(uint32_t counter, bool is_global)
: barrier(counter), counter(counter), block(counter), is_global_cmd(is_global) {
}
std::vector<journal::ParsedEntry::CmdData> commands;
};
std::unordered_map<TxId, TxExecutionSync> tx_sync_execution;
@@ -125,7 +128,9 @@ class Replica {
Context* cntx);
// Single flow stable state sync fiber spawned by StartStableSyncFlow.
void StableSyncDflyFb(Context* cntx);
void StableSyncDflyReadFb(Context* cntx);
void StableSyncDflyExecFb(Context* cntx);
private: /* Utility */
struct PSyncResponse {
@@ -153,7 +158,9 @@ class Replica {
// Send command, update last_io_time, return error.
std::error_code SendCommand(std::string_view command, facade::ReqSerializer* serializer);
void ExecuteCmd(JournalExecutor* executor, TranactionData&& tx_data, Context* cntx);
void ExecuteTx(JournalExecutor* executor, TranactionData&& tx_data, bool inserted_by_me,
Context* cntx);
void InsertTxDataToShardResource(TranactionData&& tx_data, Context* cntx);
public: /* Utility */
struct Info {
@@ -189,8 +196,14 @@ class Replica {
std::shared_ptr<MultiShardExecution> multi_shard_exe_;
std::queue<std::pair<TranactionData, bool>> trans_data_queue_;
static constexpr size_t kYieldAfterItemsInQueue = 50;
::util::fibers_ext::EventCount waker_;
// MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode.
::boost::fibers::fiber sync_fb_;
::boost::fibers::fiber execution_fb_;
std::vector<std::unique_ptr<Replica>> shard_flows_;
// Guard operations where flows might be in a mixed state (transition/setup)
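The header changes above show what TxExecutionSync now carries per transaction: a util::fibers_ext::Barrier that the flow fibers wait on around execution, a counter that picks the last fiber to erase the map entry, and a BlockingCounter that each flow decrements once it has queued its part of the transaction (block.Dec() in InsertTxDataToShardResource, block.Wait() in ExecuteTx, Cancel() from the error handler). As a rough sketch of the semantics those call sites rely on, here is a minimal cancellable blocking counter built on Boost.Fiber; this illustrates the expected behavior and is not the util::fibers_ext implementation:

#include <boost/fiber/condition_variable.hpp>
#include <boost/fiber/mutex.hpp>

#include <mutex>

class BlockingCounterSketch {
 public:
  explicit BlockingCounterSketch(unsigned count) : count_(count) {}

  void Dec() {  // a shard flow queued its part of the transaction
    std::unique_lock lk{mu_};
    if (count_ > 0 && --count_ == 0)
      cv_.notify_all();
  }

  void Wait() {  // block until every flow arrived, or Cancel() was called
    std::unique_lock lk{mu_};
    cv_.wait(lk, [this] { return count_ == 0 || cancelled_; });
  }

  void Cancel() {  // wake all waiters on error/shutdown
    std::unique_lock lk{mu_};
    cancelled_ = true;
    cv_.notify_all();
  }

 private:
  boost::fibers::mutex mu_;
  boost::fibers::condition_variable cv_;
  unsigned count_;
  bool cancelled_ = false;
};

A waiter woken by Cancel() must then check whether the context was cancelled before touching the transaction data, which is exactly what the "Check if we woke up due to cancellation" guards in ExecuteTx do.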