1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat(cluster): add tx execution in cluster_shard_migration (#2385)

* feat(cluster): add tx execution in cluster_shard_migration
refactor(replication): move code that is common for cluster and
replica into a separate file, add full-sync-cut cmd
This commit is contained in:
Borys 2024-01-22 21:19:39 +02:00 committed by GitHub
parent 7debe3c685
commit a16b940a65
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 538 additions and 274 deletions

View file

@ -17,6 +17,7 @@ endif()
add_library(dfly_transaction db_slice.cc malloc_stats.cc blocking_controller.cc
command_registry.cc cluster/unique_slot_checker.cc
journal/tx_executor.cc
common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc
server_state.cc table.cc top_keys.cc transaction.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
@ -42,7 +43,7 @@ add_library(dragonfly_lib engine_shard_set.cc channel_store.cc
zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc
cluster/cluster_family.cc cluster/cluster_slot_migration.cc
cluster/cluster_shard_migration.cc
cluster/cluster_shard_migration.cc cluster/outgoing_slot_migration.cc
acl/user.cc acl/user_registry.cc acl/acl_family.cc
acl/validator.cc acl/helpers.cc)

View file

@ -19,8 +19,12 @@
namespace dfly {
using SlotId = uint16_t;
// TODO consider to use bit set or some more compact way to store SlotId
using SlotSet = absl::flat_hash_set<SlotId>;
// MigrationState constants are ordered in state changing order
enum class MigrationState : uint8_t { C_NO_STATE, C_CONNECTING, C_FULL_SYNC, C_STABLE_SYNC };
class ClusterConfig {
public:
static constexpr SlotId kMaxSlotNum = 0x3FFF;

View file

@ -22,7 +22,6 @@
#include "server/error.h"
#include "server/journal/journal.h"
#include "server/main_service.h"
#include "server/replica.h"
#include "server/server_family.h"
#include "server/server_state.h"
@ -633,18 +632,18 @@ void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionCon
return cntx->SendOk();
}
static std::string_view state_to_str(ClusterSlotMigration::State state) {
static std::string_view state_to_str(MigrationState state) {
switch (state) {
case ClusterSlotMigration::C_NO_STATE:
case MigrationState::C_NO_STATE:
return "NO_STATE"sv;
case ClusterSlotMigration::C_CONNECTING:
case MigrationState::C_CONNECTING:
return "CONNECTING"sv;
case ClusterSlotMigration::C_FULL_SYNC:
case MigrationState::C_FULL_SYNC:
return "FULL_SYNC"sv;
case ClusterSlotMigration::C_STABLE_SYNC:
case MigrationState::C_STABLE_SYNC:
return "STABLE_SYNC"sv;
}
DCHECK(false) << "Unknown State value " << state;
DCHECK(false) << "Unknown State value " << static_cast<underlying_type_t<MigrationState>>(state);
return "UNDEFINED_STATE"sv;
}
@ -662,14 +661,14 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext*
for (const auto& m : incoming_migrations_jobs_) {
const auto& info = m->GetInfo();
if (info.host == host_ip && info.port == port)
return rb->SendSimpleString(state_to_str(info.state));
return rb->SendSimpleString(state_to_str(m->GetState()));
}
// find outgoing slot migration
for (const auto& [_, info] : outgoing_migration_infos_) {
if (info->host_ip == host_ip && info->port == port)
return rb->SendSimpleString(state_to_str(info->state));
for (const auto& [_, info] : outgoing_migration_jobs_) {
if (info->GetHostIp() == host_ip && info->GetPort() == port)
return rb->SendSimpleString(state_to_str(info->GetState()));
}
} else if (auto arr_size = incoming_migrations_jobs_.size() + outgoing_migration_infos_.size();
} else if (auto arr_size = incoming_migrations_jobs_.size() + outgoing_migration_jobs_.size();
arr_size != 0) {
rb->StartArray(arr_size);
const auto& send_answer = [rb](std::string_view direction, std::string_view host, uint16_t port,
@ -680,14 +679,14 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext*
lock_guard lk(migration_mu_);
for (const auto& m : incoming_migrations_jobs_) {
const auto& info = m->GetInfo();
send_answer("in", info.host, info.port, info.state);
send_answer("in", info.host, info.port, m->GetState());
}
for (const auto& [_, info] : outgoing_migration_infos_) {
send_answer("out", info->host_ip, info->port, info->state);
for (const auto& [_, info] : outgoing_migration_jobs_) {
send_answer("out", info->GetHostIp(), info->GetPort(), info->GetState());
}
return;
}
return rb->SendSimpleString(state_to_str(ClusterSlotMigration::C_NO_STATE));
return rb->SendSimpleString(state_to_str(MigrationState::C_NO_STATE));
}
void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
@ -697,7 +696,9 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
if (sub_cmd == "CONF") {
MigrationConf(args, cntx);
} else if (sub_cmd == "FLOW") {
Flow(args, cntx);
DflyMigrateFlow(args, cntx);
} else if (sub_cmd == "FULL-SYNC-CUT") {
DflyMigrateFullSyncCut(args, cntx);
} else {
cntx->SendError(facade::UnknownSubCmd(sub_cmd, "DFLYMIGRATE"), facade::kSyntaxErrType);
}
@ -712,7 +713,8 @@ ClusterSlotMigration* ClusterFamily::AddMigration(std::string host_ip, uint16_t
}
}
return incoming_migrations_jobs_
.emplace_back(make_unique<ClusterSlotMigration>(std::string(host_ip), port, std::move(slots)))
.emplace_back(make_unique<ClusterSlotMigration>(std::string(host_ip), port,
&server_family_->service(), std::move(slots)))
.get();
}
@ -746,7 +748,7 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
}
}
auto sync_id = CreateMigrationSession(cntx, port, std::move(slots));
auto sync_id = CreateOutgoingMigration(cntx, port, std::move(slots));
cntx->conn()->SetName("slot_migration_ctrl");
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
@ -756,18 +758,25 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
return;
}
uint32_t ClusterFamily::CreateMigrationSession(ConnectionContext* cntx, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots) {
uint32_t ClusterFamily::CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots) {
std::lock_guard lk(migration_mu_);
auto sync_id = next_sync_id_++;
auto info = make_shared<MigrationInfo>(shard_set->size(), cntx->conn()->RemoteEndpointAddress(),
sync_id, port, std::move(slots));
auto [it, inserted] = outgoing_migration_infos_.emplace(sync_id, info);
auto err_handler = [this, sync_id](const GenericError& err) {
LOG(INFO) << "Slot migration error: " << err.Format();
// Todo add error processing, stop migration process
// fb2::Fiber("stop_Migration", &ClusterFamily::StopMigration, this, sync_id).Detach();
};
auto info =
make_shared<OutgoingMigration>(shard_set->size(), cntx->conn()->RemoteEndpointAddress(), port,
std::move(slots), err_handler);
auto [it, inserted] = outgoing_migration_jobs_.emplace(sync_id, std::move(info));
CHECK(inserted);
return sync_id;
}
void ClusterFamily::Flow(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
CmdArgParser parser{args};
auto [sync_id, shard_id] = parser.Next<uint32_t, uint32_t>();
@ -775,32 +784,58 @@ void ClusterFamily::Flow(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendError(err->MakeReply());
}
VLOG(1) << "Create flow "
<< " sync_id: " << sync_id << " shard_id: " << shard_id << " shard";
VLOG(1) << "Create flow sync_id: " << sync_id << " shard_id: " << shard_id;
cntx->conn()->SetName(absl::StrCat("migration_flow_", sync_id));
auto info = GetMigrationInfo(sync_id);
auto info = GetOutgoingMigration(sync_id);
if (!info)
cntx->SendError(kIdNotFound);
info->flows[shard_id].conn = cntx->conn();
return cntx->SendError(kIdNotFound);
cntx->conn()->Migrate(shard_set->pool()->at(shard_id));
info->state = ClusterSlotMigration::State::C_FULL_SYNC;
cntx->SendOk();
info->flows[shard_id].conn->socket()->Write(io::Buffer("SYNC"));
EngineShard* shard = EngineShard::tlocal();
DCHECK(shard->shard_id() == shard_id);
LOG(INFO) << "Started migation with target node " << info->host_ip << ":" << info->port;
info->StartFlow(&shard->db_slice(), sync_id, server_family_->journal(), cntx->conn()->socket());
}
shared_ptr<ClusterFamily::MigrationInfo> ClusterFamily::GetMigrationInfo(uint32_t sync_id) {
void ClusterFamily::DflyMigrateFullSyncCut(CmdArgList args, ConnectionContext* cntx) {
CmdArgParser parser{args};
auto [sync_id, shard_id] = parser.Next<uint32_t, uint32_t>();
if (auto err = parser.Error(); err) {
return cntx->SendError(err->MakeReply());
}
VLOG(1) << "Full sync cut "
<< " sync_id: " << sync_id << " shard_id: " << shard_id << " shard";
std::lock_guard lck(migration_mu_);
auto migration_it =
std::find_if(incoming_migrations_jobs_.begin(), incoming_migrations_jobs_.end(),
[sync_id](const auto& el) { return el->GetSyncId() == sync_id; });
if (migration_it == incoming_migrations_jobs_.end()) {
LOG(WARNING) << "Couldn't find migration id";
return cntx->SendError(kIdNotFound);
}
(*migration_it)->SetStableSyncForFlow(shard_id);
if ((*migration_it)->GetState() == MigrationState::C_STABLE_SYNC) {
(*migration_it)->Stop();
LOG(INFO) << "STABLE-SYNC state is set for sync_id " << sync_id;
}
cntx->SendOk();
}
shared_ptr<OutgoingMigration> ClusterFamily::GetOutgoingMigration(uint32_t sync_id) {
unique_lock lk(migration_mu_);
auto sync_it = outgoing_migration_infos_.find(sync_id);
return sync_it != outgoing_migration_infos_.end() ? sync_it->second : nullptr;
auto sync_it = outgoing_migration_jobs_.find(sync_id);
return sync_it != outgoing_migration_jobs_.end() ? sync_it->second : nullptr;
}
using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx);

View file

@ -11,13 +11,13 @@
#include "facade/conn_context.h"
#include "server/cluster/cluster_config.h"
#include "server/cluster/cluster_slot_migration.h"
#include "server/cluster/outgoing_slot_migration.h"
#include "server/common.h"
namespace dfly {
class CommandRegistry;
class ConnectionContext;
class ServerFamily;
class DflyCmd;
class ClusterFamily {
public:
@ -58,7 +58,7 @@ class ClusterFamily {
// DFLYMIGRATE CONF initiate first step in slots migration procedure
// MigrationConf process this request and saving slots range and
// target node port in outgoing_migration_infos_.
// target node port in outgoing_migration_jobs_.
// return sync_id and shard number to the target node
void MigrationConf(CmdArgList args, ConnectionContext* cntx);
@ -66,43 +66,19 @@ class ClusterFamily {
// this request should be done for every shard on the target node
// this method assocciate connection and shard that will be the data
// source for migration
void Flow(CmdArgList args, ConnectionContext* cntx);
void DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx);
void DflyMigrateFullSyncCut(CmdArgList args, ConnectionContext* cntx);
// create a ClusterSlotMigration entity which will execute migration
ClusterSlotMigration* AddMigration(std::string host_ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots);
// store info about migration and create unique session id
uint32_t CreateMigrationSession(ConnectionContext* cntx, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots);
uint32_t CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots);
// FlowInfo is used to store state, connection, and all auxiliary data
// that is needed for correct slots (per shard) data transfer
struct FlowInfo {
facade::Connection* conn = nullptr;
};
// Whole slots migration process information
struct MigrationInfo {
MigrationInfo() = default;
MigrationInfo(std::uint32_t flows_num, std::string ip, uint32_t sync_id, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots)
: host_ip(ip),
flows(flows_num),
slots(slots),
sync_id(sync_id),
port(port),
state(ClusterSlotMigration::State::C_CONNECTING) {
}
std::string host_ip;
std::vector<FlowInfo> flows;
std::vector<ClusterConfig::SlotRange> slots;
uint32_t sync_id;
uint16_t port;
ClusterSlotMigration::State state = ClusterSlotMigration::State::C_NO_STATE;
};
std::shared_ptr<MigrationInfo> GetMigrationInfo(uint32_t sync_id);
std::shared_ptr<OutgoingMigration> GetOutgoingMigration(uint32_t sync_id);
mutable Mutex migration_mu_; // guard migrations operations
// holds all incoming slots migrations that are currently in progress.
@ -111,8 +87,8 @@ class ClusterFamily {
uint32_t next_sync_id_ = 1;
// holds all outgoing slots migrations that are currently in progress
using MigrationInfoMap = absl::btree_map<uint32_t, std::shared_ptr<MigrationInfo>>;
MigrationInfoMap outgoing_migration_infos_;
using OutgoingMigrationMap = absl::btree_map<uint32_t, std::shared_ptr<OutgoingMigration>>;
OutgoingMigrationMap outgoing_migration_jobs_;
private:
ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const;

View file

@ -4,10 +4,13 @@
#include "server/cluster/cluster_shard_migration.h"
#include <absl/flags/flag.h>
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>
#include "base/logging.h"
#include "server/error.h"
#include "server/journal/serializer.h"
#include "server/journal/tx_executor.h"
ABSL_DECLARE_FLAG(int, source_connect_timeout_ms);
@ -19,8 +22,9 @@ using namespace util;
using absl::GetFlag;
ClusterShardMigration::ClusterShardMigration(ServerContext server_context, uint32_t shard_id,
uint32_t sync_id)
uint32_t sync_id, Service* service)
: ProtocolClient(server_context), source_shard_id_(shard_id), sync_id_(sync_id) {
executor_ = std::make_unique<JournalExecutor>(service);
}
ClusterShardMigration::~ClusterShardMigration() {
@ -57,16 +61,40 @@ void ClusterShardMigration::FullSyncShardFb(Context* cntx) {
DCHECK(leftover_buf_);
io::PrefixSource ps{leftover_buf_->InputBuffer(), Sock()};
uint8_t ok_buf[4];
ps.ReadAtLeast(io::MutableBytes{ok_buf, 4}, 4);
JournalReader reader{&ps, 0};
TransactionReader tx_reader{};
if (string_view(reinterpret_cast<char*>(ok_buf), 4) != "SYNC") {
VLOG(1) << "FullSyncShardFb incorrect data transfer";
cntx->ReportError(std::make_error_code(errc::protocol_error),
"Incorrect FullSync data, only for tets");
while (!cntx->IsCancelled()) {
if (cntx->IsCancelled())
break;
auto tx_data = tx_reader.NextTxData(&reader, cntx);
if (!tx_data)
break;
TouchIoTime();
if (!tx_data->is_ping) {
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
} else {
// TODO check about ping logic
}
}
}
VLOG(1) << "FullSyncShardFb finished after reading 4 bytes";
void ClusterShardMigration::ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) {
if (cntx->IsCancelled()) {
return;
}
CHECK(tx_data.shard_cnt <= 1); // we don't support sync for multishard execution
if (!tx_data.IsGlobalCmd()) {
VLOG(2) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands));
} else {
// TODO check which global commands should be supported
CHECK(false) << "We don't support command: " << ToSV(tx_data.commands.front().cmd_args[0])
<< "in cluster migration process.";
}
}
void ClusterShardMigration::Cancel() {

View file

@ -4,30 +4,47 @@
#pragma once
#include "base/io_buf.h"
#include "server/journal/executor.h"
#include "server/protocol_client.h"
namespace dfly {
class Service;
class TransactionData;
class MultiShardExecution;
// ClusterShardMigration manage data receiving in slots migration process.
// It is created per shard on the target node to initiate FLOW step.
class ClusterShardMigration : public ProtocolClient {
public:
ClusterShardMigration(ServerContext server_context, uint32_t shard_id, uint32_t sync_id);
ClusterShardMigration(ServerContext server_context, uint32_t shard_id, uint32_t sync_id,
Service* service);
~ClusterShardMigration();
std::error_code StartSyncFlow(Context* cntx);
void Cancel();
void SetStableSync() {
is_stable_sync_.store(true);
}
bool IsStableSync() {
return is_stable_sync_.load();
}
private:
void FullSyncShardFb(Context* cntx);
void JoinFlow();
void ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx);
void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx);
private:
uint32_t source_shard_id_;
uint32_t sync_id_;
std::optional<base::IoBuf> leftover_buf_;
std::unique_ptr<JournalExecutor> executor_;
Fiber sync_fb_;
std::atomic_bool is_stable_sync_ = false;
};
} // namespace dfly

View file

@ -1,6 +1,7 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/cluster/cluster_slot_migration.h"
#include <absl/cleanup/cleanup.h>
@ -9,6 +10,7 @@
#include "base/logging.h"
#include "server/cluster/cluster_shard_migration.h"
#include "server/error.h"
#include "server/journal/tx_executor.h"
#include "server/main_service.h"
ABSL_FLAG(int, source_connect_timeout_ms, 20000,
@ -35,9 +37,9 @@ vector<vector<unsigned>> Partition(unsigned num_flows) {
} // namespace
ClusterSlotMigration::ClusterSlotMigration(string host_ip, uint16_t port,
ClusterSlotMigration::ClusterSlotMigration(string host_ip, uint16_t port, Service* se,
std::vector<ClusterConfig::SlotRange> slots)
: ProtocolClient(move(host_ip), port), slots_(std::move(slots)) {
: ProtocolClient(move(host_ip), port), service_(*se), slots_(std::move(slots)) {
}
ClusterSlotMigration::~ClusterSlotMigration() {
@ -62,7 +64,7 @@ error_code ClusterSlotMigration::Start(ConnectionContext* cntx) {
ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_);
RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to source"));
state_ = ClusterSlotMigration::C_CONNECTING;
state_ = MigrationState::C_CONNECTING;
VLOG(1) << "Greeting";
ec = Greet();
@ -98,13 +100,29 @@ error_code ClusterSlotMigration::Greet() {
ClusterSlotMigration::Info ClusterSlotMigration::GetInfo() const {
const auto& ctx = server();
return {ctx.host, ctx.port, state_};
return {ctx.host, ctx.port};
}
void ClusterSlotMigration::SetStableSyncForFlow(uint32_t flow) {
DCHECK(shard_flows_.size() > flow);
shard_flows_[flow]->SetStableSync();
if (std::all_of(shard_flows_.begin(), shard_flows_.end(),
[](const auto& el) { return el->IsStableSync(); })) {
state_ = MigrationState::C_STABLE_SYNC;
}
}
void ClusterSlotMigration::Stop() {
for (auto& flow : shard_flows_) {
flow->Cancel();
}
}
void ClusterSlotMigration::MainMigrationFb() {
VLOG(1) << "Main migration fiber started";
state_ = ClusterSlotMigration::C_FULL_SYNC;
state_ = MigrationState::C_FULL_SYNC;
// TODO add reconnection code
if (auto ec = InitiateSlotsMigration(); ec) {
@ -116,7 +134,7 @@ void ClusterSlotMigration::MainMigrationFb() {
std::error_code ClusterSlotMigration::InitiateSlotsMigration() {
shard_flows_.resize(source_shards_num_);
for (unsigned i = 0; i < source_shards_num_; ++i) {
shard_flows_[i].reset(new ClusterShardMigration(server(), i, sync_id_));
shard_flows_[i].reset(new ClusterShardMigration(server(), i, sync_id_, &service_));
}
// Switch to new error handler that closes flow sockets.

View file

@ -3,31 +3,40 @@
//
#pragma once
#include "server/cluster/cluster_shard_migration.h"
#include "server/protocol_client.h"
namespace dfly {
class ClusterShardMigration;
class Service;
// The main entity on the target side that manage slots migration process
// Creates initial connection between the target and source node,
// manage migration process state and data
class ClusterSlotMigration : ProtocolClient {
public:
enum State : uint8_t { C_NO_STATE, C_CONNECTING, C_FULL_SYNC, C_STABLE_SYNC };
struct Info {
std::string host;
uint16_t port;
State state;
};
ClusterSlotMigration(std::string host_ip, uint16_t port,
ClusterSlotMigration(std::string host_ip, uint16_t port, Service* se,
std::vector<ClusterConfig::SlotRange> slots);
~ClusterSlotMigration();
// Initiate connection with source node and create migration fiber
std::error_code Start(ConnectionContext* cntx);
Info GetInfo() const;
uint32_t GetSyncId() const {
return sync_id_;
}
MigrationState GetState() const {
return state_;
}
void SetStableSyncForFlow(uint32_t flow);
void Stop();
private:
// Send DFLYMIGRATE CONF to the source and get info about migration process
@ -37,12 +46,13 @@ class ClusterSlotMigration : ProtocolClient {
std::error_code InitiateSlotsMigration();
private:
Service& service_;
Mutex flows_op_mu_;
std::vector<std::unique_ptr<ClusterShardMigration>> shard_flows_;
std::vector<ClusterConfig::SlotRange> slots_;
uint32_t source_shards_num_ = 0;
uint32_t sync_id_ = 0;
State state_ = C_NO_STATE;
MigrationState state_ = MigrationState::C_NO_STATE;
Fiber sync_fb_;
};

View file

@ -0,0 +1,69 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/cluster/outgoing_slot_migration.h"
#include "server/db_slice.h"
#include "server/journal/streamer.h"
namespace dfly {
class OutgoingMigration::SliceSlotMigration {
public:
SliceSlotMigration(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal,
Context* cntx)
: streamer_(slice, std::move(slots), sync_id, journal, cntx) {
}
void Start(io::Sink* dest) {
streamer_.Start(dest);
state_ = MigrationState::C_FULL_SYNC;
}
MigrationState GetState() const {
return state_ == MigrationState::C_FULL_SYNC && streamer_.IsSnapshotFinished()
? MigrationState::C_STABLE_SYNC
: state_;
}
private:
RestoreStreamer streamer_;
MigrationState state_ = MigrationState::C_CONNECTING;
};
OutgoingMigration::OutgoingMigration(std::uint32_t flows_num, std::string ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots,
Context::ErrHandler err_handler)
: host_ip_(ip), port_(port), slots_(slots), cntx_(err_handler), slot_migrations_(flows_num) {
}
OutgoingMigration::~OutgoingMigration() = default;
void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal,
io::Sink* dest) {
SlotSet sset;
for (const auto& slot_range : slots_) {
for (auto i = slot_range.start; i <= slot_range.end; ++i)
sset.insert(i);
}
const auto shard_id = slice->shard_id();
std::lock_guard lck(flows_mu_);
slot_migrations_[shard_id] =
std::make_unique<SliceSlotMigration>(slice, std::move(sset), sync_id, journal, &cntx_);
slot_migrations_[shard_id]->Start(dest);
}
MigrationState OutgoingMigration::GetState() const {
std::lock_guard lck(flows_mu_);
MigrationState min_state = MigrationState::C_STABLE_SYNC;
for (const auto& slot_migration : slot_migrations_) {
if (slot_migration)
min_state = std::min(min_state, slot_migration->GetState());
}
return min_state;
}
} // namespace dfly

View file

@ -0,0 +1,50 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include "io/io.h"
#include "server/cluster/cluster_config.h"
#include "server/common.h"
namespace dfly {
namespace journal {
class Journal;
}
class DbSlice;
// Whole slots migration process information
class OutgoingMigration {
public:
OutgoingMigration() = default;
~OutgoingMigration();
OutgoingMigration(std::uint32_t flows_num, std::string ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots, Context::ErrHandler err_handler);
void StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal, io::Sink* dest);
MigrationState GetState() const;
const std::string& GetHostIp() const {
return host_ip_;
};
uint16_t GetPort() const {
return port_;
};
private:
// SliceSlotMigration manages state and data transfering for the corresponding shard
class SliceSlotMigration;
private:
std::string host_ip_;
uint16_t port_;
std::vector<ClusterConfig::SlotRange> slots_;
Context cntx_;
mutable Mutex flows_mu_;
std::vector<std::unique_ptr<SliceSlotMigration>> slot_migrations_ ABSL_GUARDED_BY(flows_mu_);
};
} // namespace dfly

View file

@ -49,17 +49,17 @@ void JournalStreamer::WriterFb(io::Sink* dest) {
}
}
RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, uint32_t flow_id,
RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id,
journal::Journal* journal, Context* cntx)
: JournalStreamer(journal, cntx),
db_slice_(slice),
my_slots_(std::move(slots)),
sync_id_(sync_id),
flow_id_(flow_id) {
sync_id_(sync_id) {
DCHECK(slice != nullptr);
}
void RestoreStreamer::Start(io::Sink* dest) {
VLOG(2) << "RestoreStreamer start";
auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this);
snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb));
@ -84,8 +84,11 @@ void RestoreStreamer::Start(io::Sink* dest) {
}
} while (cursor);
WriteCommand(make_pair(
"DFLYMIGRATE", ArgSlice{"FULL-SYNC-CUT", absl::StrCat(sync_id_), absl::StrCat(flow_id_)}));
VLOG(2) << "FULL-SYNC-CUT for " << sync_id_ << " : " << db_slice_->shard_id();
WriteCommand(make_pair("DFLYMIGRATE", ArgSlice{"FULL-SYNC-CUT", absl::StrCat(sync_id_),
absl::StrCat(db_slice_->shard_id())}));
NotifyWritten(true);
snapshot_finished_ = true;
});
}
@ -96,6 +99,12 @@ void RestoreStreamer::Cancel() {
JournalStreamer::Cancel();
}
RestoreStreamer::~RestoreStreamer() {
fiber_cancellation_.Cancel();
snapshot_fb_.JoinIfNeeded();
db_slice_->UnregisterOnChange(snapshot_version_);
}
bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const {
if (!item.slot.has_value()) {
return false;

View file

@ -53,12 +53,18 @@ class JournalStreamer : protected BufferedStreamerBase {
// Only handles relevant slots, while ignoring all others.
class RestoreStreamer : public JournalStreamer {
public:
RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, uint32_t flow_id,
journal::Journal* journal, Context* cntx);
RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal,
Context* cntx);
void Start(io::Sink* dest) override;
void Cancel() override;
bool IsSnapshotFinished() const {
return snapshot_finished_;
}
~RestoreStreamer();
private:
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
bool ShouldWrite(const journal::JournalItem& item) const override;
@ -72,9 +78,9 @@ class RestoreStreamer : public JournalStreamer {
uint64_t snapshot_version_ = 0;
SlotSet my_slots_;
uint32_t sync_id_;
uint32_t flow_id_;
Fiber snapshot_fb_;
Cancellation fiber_cancellation_;
bool snapshot_finished_ = false;
};
} // namespace dfly

View file

@ -0,0 +1,134 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "tx_executor.h"
#include <absl/strings/match.h>
#include "base/logging.h"
#include "server/journal/serializer.h"
using namespace std;
using namespace facade;
namespace dfly {
bool MultiShardExecution::InsertTxToSharedMap(TxId txid, uint32_t shard_cnt) {
std::unique_lock lk(map_mu);
auto [it, was_insert] = tx_sync_execution.emplace(txid, shard_cnt);
lk.unlock();
VLOG(2) << "txid: " << txid << " unique_shard_cnt_: " << shard_cnt
<< " was_insert: " << was_insert;
it->second.block.Dec();
return was_insert;
}
MultiShardExecution::TxExecutionSync& MultiShardExecution::Find(TxId txid) {
std::lock_guard lk(map_mu);
VLOG(2) << "Execute txid: " << txid;
auto it = tx_sync_execution.find(txid);
DCHECK(it != tx_sync_execution.end());
return it->second;
}
void MultiShardExecution::Erase(TxId txid) {
std::lock_guard lg{map_mu};
tx_sync_execution.erase(txid);
}
void MultiShardExecution::CancelAllBlockingEntities() {
lock_guard lk{map_mu};
for (auto& tx_data : tx_sync_execution) {
tx_data.second.barrier.Cancel();
tx_data.second.block.Cancel();
}
}
bool TransactionData::AddEntry(journal::ParsedEntry&& entry) {
++journal_rec_count;
switch (entry.opcode) {
case journal::Op::PING:
is_ping = true;
return true;
case journal::Op::EXPIRED:
case journal::Op::COMMAND:
commands.push_back(std::move(entry.cmd));
[[fallthrough]];
case journal::Op::EXEC:
shard_cnt = entry.shard_cnt;
dbid = entry.dbid;
txid = entry.txid;
return true;
case journal::Op::MULTI_COMMAND:
commands.push_back(std::move(entry.cmd));
dbid = entry.dbid;
return false;
default:
DCHECK(false) << "Unsupported opcode";
}
return false;
}
bool TransactionData::IsGlobalCmd() const {
if (commands.size() > 1) {
return false;
}
auto& command = commands.front();
if (command.cmd_args.empty()) {
return false;
}
auto& args = command.cmd_args;
if (absl::EqualsIgnoreCase(ToSV(args[0]), "FLUSHDB"sv) ||
absl::EqualsIgnoreCase(ToSV(args[0]), "FLUSHALL"sv) ||
(absl::EqualsIgnoreCase(ToSV(args[0]), "DFLYCLUSTER"sv) &&
absl::EqualsIgnoreCase(ToSV(args[1]), "FLUSHSLOTS"sv))) {
return true;
}
return false;
}
TransactionData TransactionData::FromSingle(journal::ParsedEntry&& entry) {
TransactionData data;
bool res = data.AddEntry(std::move(entry));
DCHECK(res);
return data;
}
std::optional<TransactionData> TransactionReader::NextTxData(JournalReader* reader, Context* cntx) {
io::Result<journal::ParsedEntry> res;
while (true) {
if (res = reader->ReadEntry(); !res) {
cntx->ReportError(res.error());
return std::nullopt;
}
// Check if journal command can be executed right away.
// Expiration checks lock on master, so it never conflicts with running multi transactions.
if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND ||
res->opcode == journal::Op::PING)
return TransactionData::FromSingle(std::move(res.value()));
// Otherwise, continue building multi command.
DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC);
DCHECK(res->txid > 0);
auto txid = res->txid;
auto& txdata = current_[txid];
if (txdata.AddEntry(std::move(res.value()))) {
auto out = std::move(txdata);
current_.erase(txid);
return out;
}
}
return std::nullopt;
}
} // namespace dfly

View file

@ -0,0 +1,68 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <unordered_map>
#include "server/common.h"
#include "server/journal/types.h"
namespace dfly {
class JournalReader;
// Coordinator for multi shard execution.
class MultiShardExecution {
public:
struct TxExecutionSync {
Barrier barrier;
std::atomic_uint32_t counter;
BlockingCounter block;
explicit TxExecutionSync(uint32_t counter)
: barrier(counter), counter(counter), block(counter) {
}
};
bool InsertTxToSharedMap(TxId txid, uint32_t shard_cnt);
TxExecutionSync& Find(TxId txid);
void Erase(TxId txid);
void CancelAllBlockingEntities();
private:
Mutex map_mu;
std::unordered_map<TxId, TxExecutionSync> tx_sync_execution;
};
// This class holds the commands of transaction in single shard.
// Once all commands were received, the transaction can be executed.
struct TransactionData {
// Update the data from ParsedEntry and return true if all shard transaction commands were
// received.
bool AddEntry(journal::ParsedEntry&& entry);
bool IsGlobalCmd() const;
static TransactionData FromSingle(journal::ParsedEntry&& entry);
TxId txid{0};
DbIndex dbid{0};
uint32_t shard_cnt{0};
absl::InlinedVector<journal::ParsedEntry::CmdData, 1> commands{0};
uint32_t journal_rec_count{0}; // Count number of source entries to check offset.
bool is_ping = false; // For Op::PING entries.
};
// Utility for reading TransactionData from a journal reader.
// The journal stream can contain interleaved data for multiple multi transactions,
// expiries and out of order executed transactions that need to be grouped on the replica side.
struct TransactionReader {
std::optional<TransactionData> NextTxData(JournalReader* reader, Context* cntx);
private:
// Stores ongoing multi transaction data.
absl::flat_hash_map<TxId, TransactionData> current_;
};
} // namespace dfly

View file

@ -598,14 +598,7 @@ error_code Replica::ConsumeDflyStream() {
flow->Cancel();
}
// Iterate over map and cancel all blocking entities
{
lock_guard lk{multi_shard_exe_->map_mu};
for (auto& tx_data : multi_shard_exe_->tx_sync_execution) {
tx_data.second.barrier.Cancel();
tx_data.second.block.Cancel();
}
}
multi_shard_exe_->CancelAllBlockingEntities();
};
RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler)));
@ -901,31 +894,16 @@ void DflyShardReplica::ExecuteTxWithNoShardSync(TransactionData&& tx_data, Conte
return;
}
bool was_insert = false;
if (tx_data.IsGlobalCmd()) {
was_insert = InsertTxToSharedMap(tx_data);
}
bool was_insert = tx_data.IsGlobalCmd() &&
multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, tx_data.shard_cnt);
ExecuteTx(std::move(tx_data), was_insert, cntx);
}
bool DflyShardReplica::InsertTxToSharedMap(const TransactionData& tx_data) {
std::unique_lock lk(multi_shard_exe_->map_mu);
auto [it, was_insert] =
multi_shard_exe_->tx_sync_execution.emplace(tx_data.txid, tx_data.shard_cnt);
lk.unlock();
VLOG(2) << "txid: " << tx_data.txid << " unique_shard_cnt_: " << tx_data.shard_cnt
<< " was_insert: " << was_insert;
it->second.block.Dec();
return was_insert;
}
void DflyShardReplica::InsertTxDataToShardResource(TransactionData&& tx_data) {
bool was_insert = false;
if (tx_data.shard_cnt > 1) {
was_insert = InsertTxToSharedMap(tx_data);
was_insert = multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, tx_data.shard_cnt);
}
VLOG(2) << "txid: " << tx_data.txid << " pushed to queue";
@ -957,12 +935,7 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me,
return;
}
VLOG(2) << "Execute txid: " << tx_data.txid;
std::unique_lock lk(multi_shard_exe_->map_mu);
auto it = multi_shard_exe_->tx_sync_execution.find(tx_data.txid);
DCHECK(it != multi_shard_exe_->tx_sync_execution.end());
auto& multi_shard_data = it->second;
lk.unlock();
auto& multi_shard_data = multi_shard_exe_->Find(tx_data.txid);
VLOG(2) << "Execute txid: " << tx_data.txid << " waiting for data in all shards";
// Wait until shards flows got transaction data and inserted to map.
@ -1004,8 +977,7 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me,
auto val = multi_shard_data.counter.fetch_sub(1, std::memory_order_relaxed);
VLOG(2) << "txid: " << tx_data.txid << " counter: " << val;
if (val == 1) {
std::lock_guard lg{multi_shard_exe_->map_mu};
multi_shard_exe_->tx_sync_execution.erase(tx_data.txid);
multi_shard_exe_->Erase(tx_data.txid);
}
}
@ -1131,92 +1103,6 @@ std::string Replica::GetSyncId() const {
return master_context_.dfly_session_id;
}
bool DflyShardReplica::TransactionData::AddEntry(journal::ParsedEntry&& entry) {
++journal_rec_count;
switch (entry.opcode) {
case journal::Op::PING:
is_ping = true;
return true;
case journal::Op::EXPIRED:
case journal::Op::COMMAND:
commands.push_back(std::move(entry.cmd));
[[fallthrough]];
case journal::Op::EXEC:
shard_cnt = entry.shard_cnt;
dbid = entry.dbid;
txid = entry.txid;
return true;
case journal::Op::MULTI_COMMAND:
commands.push_back(std::move(entry.cmd));
dbid = entry.dbid;
return false;
default:
DCHECK(false) << "Unsupported opcode";
}
return false;
}
bool DflyShardReplica::TransactionData::IsGlobalCmd() const {
if (commands.size() > 1) {
return false;
}
auto& command = commands.front();
if (command.cmd_args.empty()) {
return false;
}
auto& args = command.cmd_args;
if (absl::EqualsIgnoreCase(ToSV(args[0]), "FLUSHDB"sv) ||
absl::EqualsIgnoreCase(ToSV(args[0]), "FLUSHALL"sv) ||
(absl::EqualsIgnoreCase(ToSV(args[0]), "DFLYCLUSTER"sv) &&
absl::EqualsIgnoreCase(ToSV(args[1]), "FLUSHSLOTS"sv))) {
return true;
}
return false;
}
DflyShardReplica::TransactionData DflyShardReplica::TransactionData::FromSingle(
journal::ParsedEntry&& entry) {
TransactionData data;
bool res = data.AddEntry(std::move(entry));
DCHECK(res);
return data;
}
auto DflyShardReplica::TransactionReader::NextTxData(JournalReader* reader, Context* cntx)
-> optional<TransactionData> {
io::Result<journal::ParsedEntry> res;
while (true) {
if (res = reader->ReadEntry(); !res) {
cntx->ReportError(res.error());
return std::nullopt;
}
// Check if journal command can be executed right away.
// Expiration checks lock on master, so it never conflicts with running multi transactions.
if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND ||
res->opcode == journal::Op::PING)
return TransactionData::FromSingle(std::move(res.value()));
// Otherwise, continue building multi command.
DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC);
DCHECK(res->txid > 0);
auto txid = res->txid;
auto& txdata = current_[txid];
if (txdata.AddEntry(std::move(res.value()))) {
auto out = std::move(txdata);
current_.erase(txid);
return out;
}
}
return std::nullopt;
}
uint32_t DflyShardReplica::FlowId() const {
return flow_id_;
}

View file

@ -13,6 +13,7 @@
#include "facade/facade_types.h"
#include "facade/redis_parser.h"
#include "server/common.h"
#include "server/journal/tx_executor.h"
#include "server/journal/types.h"
#include "server/protocol_client.h"
#include "server/version.h"
@ -30,23 +31,6 @@ class JournalExecutor;
struct JournalReader;
class DflyShardReplica;
// Coordinator for multi shard execution.
struct MultiShardExecution {
Mutex map_mu;
struct TxExecutionSync {
Barrier barrier;
std::atomic_uint32_t counter;
BlockingCounter block;
explicit TxExecutionSync(uint32_t counter)
: barrier(counter), counter(counter), block(counter) {
}
};
std::unordered_map<TxId, TxExecutionSync> tx_sync_execution;
};
// The attributes of the master we are connecting to.
struct MasterContext {
std::string master_repl_id;
@ -193,36 +177,6 @@ class DflyShardReplica : public ProtocolClient {
Service* service, std::shared_ptr<MultiShardExecution> multi_shard_exe);
~DflyShardReplica();
// This class holds the commands of transaction in single shard.
// Once all commands were received, the transaction can be executed.
struct TransactionData {
// Update the data from ParsedEntry and return true if all shard transaction commands were
// received.
bool AddEntry(journal::ParsedEntry&& entry);
bool IsGlobalCmd() const;
static TransactionData FromSingle(journal::ParsedEntry&& entry);
TxId txid{0};
DbIndex dbid{0};
uint32_t shard_cnt{0};
absl::InlinedVector<journal::ParsedEntry::CmdData, 1> commands{0};
uint32_t journal_rec_count{0}; // Count number of source entries to check offset.
bool is_ping = false; // For Op::PING entries.
};
// Utility for reading TransactionData from a journal reader.
// The journal stream can contain interleaved data for multiple multi transactions,
// expiries and out of order executed transactions that need to be grouped on the replica side.
struct TransactionReader {
std::optional<TransactionData> NextTxData(JournalReader* reader, Context* cntx);
private:
// Stores ongoing multi transaction data.
absl::flat_hash_map<TxId, TransactionData> current_;
};
void Cancel();
void JoinFlow();
@ -246,7 +200,6 @@ class DflyShardReplica : public ProtocolClient {
void ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx);
void InsertTxDataToShardResource(TransactionData&& tx_data);
void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx);
bool InsertTxToSharedMap(const TransactionData& tx_data);
uint32_t FlowId() const;

View file

@ -806,15 +806,15 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
status = await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port)
)
assert "FULL_SYNC" == status
assert "STABLE_SYNC" == status
status = await c_nodes_admin[0].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[1].port)
)
assert "FULL_SYNC" == status
assert "STABLE_SYNC" == status
status = await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
assert ["out 127.0.0.1:30002 FULL_SYNC"] == status
assert ["out 127.0.0.1:30002 STABLE_SYNC"] == status
try:
await c_nodes_admin[1].execute_command(