1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat(replica): enable multi cmd sync on flag (#656)

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2023-01-09 16:04:05 +02:00 committed by GitHub
parent 53f3d860db
commit 60b3dd305a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 37 deletions

View file

@ -8,6 +8,7 @@ extern "C" {
}
#include <absl/cleanup/cleanup.h>
#include <absl/flags/flag.h>
#include <absl/functional/bind_front.h>
#include <absl/strings/escaping.h>
#include <absl/strings/str_cat.h>
@ -25,12 +26,16 @@ extern "C" {
#include "server/rdb_load.h"
#include "util/proactor_base.h"
ABSL_FLAG(bool, enable_multi_shard_sync, true,
"Execute multi shards commands on replica syncrhonized");
namespace dfly {
using namespace std;
using namespace util;
using namespace boost::asio;
using namespace facade;
using absl::GetFlag;
using absl::StrCat;
namespace {
@ -103,6 +108,8 @@ Replica::Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* s
: service_(*service), master_context_(context) {
master_context_.dfly_flow_id = dfly_flow_id;
multi_shard_exe_ = shared_exe_data;
use_multi_shard_exe_sync_ = GetFlag(FLAGS_enable_multi_shard_sync);
executor_.reset(new JournalExecutor(service));
}
Replica::~Replica() {
@ -710,9 +717,9 @@ error_code Replica::StartStableSyncFlow(Context* cntx) {
// sock_.reset(mythread->CreateSocket());
// RETURN_ON_ERR(sock_->Connect(master_context_.master_ep));
sync_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyReadFb, this, cntx);
// TODO(ADI) : add flag for skipping the flow of blocking on multi shard entries
// and compare performance against the blocking on multi shard entries flow.
execution_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyExecFb, this, cntx);
if (use_multi_shard_exe_sync_) {
execution_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyExecFb, this, cntx);
}
return std::error_code{};
}
@ -795,37 +802,53 @@ void Replica::StableSyncDflyReadFb(Context* cntx) {
break;
}
}
InsertTxDataToShardResource(std::move(tx_data), cntx);
if (use_multi_shard_exe_sync_) {
InsertTxDataToShardResource(std::move(tx_data));
} else {
ExecuteTxWithNoShardSync(std::move(tx_data), cntx);
}
waker_.notify();
}
return;
}
void Replica::InsertTxDataToShardResource(TranactionData&& tx_data, Context* cntx) {
bool me_insert = false;
// Executes a transaction by calling ExecuteTx directly instead of pushing it onto
// trans_data_queue_. Returns without doing anything if the context was cancelled.
// Only global commands are registered in the shared multi-shard map; for every other
// transaction ExecuteTx is invoked with inserted_by_me == false.
void Replica::ExecuteTxWithNoShardSync(TranactionData&& tx_data, Context* cntx) {
  if (cntx->IsCancelled())
    return;

  // Short-circuit evaluation: InsertTxToSharedMap runs only when the command is global.
  const bool inserted_by_me = tx_data.IsGlobalCmd() && InsertTxToSharedMap(tx_data);
  ExecuteTx(std::move(tx_data), inserted_by_me, cntx);
}
// Registers tx_data.txid in the shared multi-shard execution map, constructing the entry from
// tx_data.shard_cnt when the txid is not present yet, and decrements the entry's blocking
// counter. The whole operation runs while holding map_mu.
// Returns true if this call created the map entry, false if the entry already existed.
bool Replica::InsertTxToSharedMap(const TranactionData& tx_data) {
  std::lock_guard guard{multi_shard_exe_->map_mu};

  auto emplace_res = multi_shard_exe_->tx_sync_execution.emplace(tx_data.txid, tx_data.shard_cnt);
  const bool created = emplace_res.second;
  VLOG(2) << "txid: " << tx_data.txid << " unique_shard_cnt_: " << tx_data.shard_cnt
          << " was_insert: " << created;

  emplace_res.first->second.block.Dec();
  return created;
}
void Replica::InsertTxDataToShardResource(TranactionData&& tx_data) {
bool was_insert = false;
if (tx_data.shard_cnt > 1) {
multi_shard_exe_->map_mu.lock();
bool global_cmd =
(tx_data.commands.size() == 1 && tx_data.commands.front().cmd_args.size() == 1);
auto [it, was_insert] = multi_shard_exe_->tx_sync_execution.emplace(
std::piecewise_construct, std::forward_as_tuple(tx_data.txid),
std::forward_as_tuple(tx_data.shard_cnt, global_cmd));
VLOG(2) << "txid: " << tx_data.txid << " unique_shard_cnt_: " << tx_data.shard_cnt
<< " was_insert: " << was_insert;
me_insert = was_insert;
it->second.block.Dec();
multi_shard_exe_->map_mu.unlock();
was_insert = InsertTxToSharedMap(tx_data);
}
VLOG(2) << "txid: " << tx_data.txid << " pushed to queue";
trans_data_queue_.push(std::make_pair(std::move(tx_data), me_insert));
trans_data_queue_.push(std::make_pair(std::move(tx_data), was_insert));
}
void Replica::StableSyncDflyExecFb(Context* cntx) {
JournalExecutor executor{&service_};
while (!cntx->IsCancelled()) {
waker_.await([&]() { return (!trans_data_queue_.empty() || cntx->IsCancelled()); });
if (cntx->IsCancelled()) {
@ -833,7 +856,7 @@ void Replica::StableSyncDflyExecFb(Context* cntx) {
}
DCHECK(!trans_data_queue_.empty());
auto& data = trans_data_queue_.front();
ExecuteTx(&executor, std::move(data.first), data.second, cntx);
ExecuteTx(std::move(data.first), data.second, cntx);
trans_data_queue_.pop();
waker_.notify();
}
@ -841,14 +864,13 @@ void Replica::StableSyncDflyExecFb(Context* cntx) {
return;
}
void Replica::ExecuteTx(JournalExecutor* executor, TranactionData&& tx_data, bool inserted_by_me,
Context* cntx) {
void Replica::ExecuteTx(TranactionData&& tx_data, bool inserted_by_me, Context* cntx) {
if (cntx->IsCancelled()) {
return;
}
if (tx_data.shard_cnt <= 1) { // not multi shard cmd
VLOG(2) << "Execute single shard cmd";
executor->Execute(tx_data.dbid, tx_data.commands.front());
if (tx_data.shard_cnt <= 1 || (!use_multi_shard_exe_sync_ && !tx_data.IsGlobalCmd())) {
VLOG(2) << "Execute cmd without sync between shards. shard_cnt: " << tx_data.shard_cnt;
executor_->Execute(tx_data.dbid, tx_data.commands.front());
return;
}
@ -869,7 +891,7 @@ void Replica::ExecuteTx(JournalExecutor* executor, TranactionData&& tx_data, boo
return;
VLOG(2) << "Execute txid: " << tx_data.txid << " block wait finished";
if (multi_shard_data.is_global_cmd) {
if (tx_data.IsGlobalCmd()) {
VLOG(2) << "Execute txid: " << tx_data.txid << " global command execution";
// Wait until all shards flows get to execution step of this transaction.
multi_shard_data.barrier.Wait();
@ -879,7 +901,7 @@ void Replica::ExecuteTx(JournalExecutor* executor, TranactionData&& tx_data, boo
// Global command will be executed only from one flow fiber. This ensures correctness of data in
// replica.
if (inserted_by_me) {
executor->Execute(tx_data.dbid, tx_data.commands);
executor_->Execute(tx_data.dbid, tx_data.commands);
}
// Wait until execution is done, to make sure we don't execute the next commands while the global
// command is executed.
@ -889,7 +911,7 @@ void Replica::ExecuteTx(JournalExecutor* executor, TranactionData&& tx_data, boo
return;
} else { // Non gloabl command will be executed by each the flow fiber
VLOG(2) << "Execute txid: " << tx_data.txid << " executing shard transaction commands";
executor->Execute(tx_data.dbid, tx_data.commands);
executor_->Execute(tx_data.dbid, tx_data.commands);
}
// Erase from map can be done only after all flow fibers executed the transaction commands.
@ -1151,4 +1173,8 @@ bool Replica::TranactionData::UpdateFromParsedEntry(journal::ParsedEntry&& entry
return false;
}
// Reports whether this transaction is treated as a global command: it holds exactly one
// command, and that command has exactly one argument.
bool Replica::TranactionData::IsGlobalCmd() const {
  if (commands.size() != 1)
    return false;
  return commands.front().cmd_args.size() == 1;
}
} // namespace dfly

View file

@ -60,6 +60,7 @@ class Replica {
std::vector<journal::ParsedEntry::CmdData> commands;
// Update the data from ParsedEntry and return whether all shard transaction commands were received.
bool UpdateFromParsedEntry(journal::ParsedEntry&& entry);
bool IsGlobalCmd() const;
};
struct MultiShardExecution {
@ -69,9 +70,8 @@ class Replica {
util::fibers_ext::Barrier barrier;
std::atomic_uint32_t counter;
util::fibers_ext::BlockingCounter block;
bool is_global_cmd;
TxExecutionSync(uint32_t counter, bool is_global)
: barrier(counter), counter(counter), block(counter), is_global_cmd(is_global) {
TxExecutionSync(uint32_t counter) : barrier(counter), counter(counter), block(counter) {
}
};
@ -158,9 +158,10 @@ class Replica {
// Send command, update last_io_time, return error.
std::error_code SendCommand(std::string_view command, facade::ReqSerializer* serializer);
void ExecuteTx(JournalExecutor* executor, TranactionData&& tx_data, bool inserted_by_me,
Context* cntx);
void InsertTxDataToShardResource(TranactionData&& tx_data, Context* cntx);
void ExecuteTx(TranactionData&& tx_data, bool inserted_by_me, Context* cntx);
void InsertTxDataToShardResource(TranactionData&& tx_data);
void ExecuteTxWithNoShardSync(TranactionData&& tx_data, Context* cntx);
bool InsertTxToSharedMap(const TranactionData& tx_data);
public: /* Utility */
struct Info {
@ -199,6 +200,8 @@ class Replica {
std::queue<std::pair<TranactionData, bool>> trans_data_queue_;
static constexpr size_t kYieldAfterItemsInQueue = 50;
::util::fibers_ext::EventCount waker_;
bool use_multi_shard_exe_sync_;
std::unique_ptr<JournalExecutor> executor_;
// MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode.
::boost::fibers::fiber sync_fb_;