diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 94d661bd2..46690dbcc 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -17,6 +17,7 @@ endif() add_library(dfly_transaction db_slice.cc malloc_stats.cc blocking_controller.cc command_registry.cc cluster/unique_slot_checker.cc + journal/tx_executor.cc common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc server_state.cc table.cc top_keys.cc transaction.cc serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc @@ -42,7 +43,7 @@ add_library(dragonfly_lib engine_shard_set.cc channel_store.cc zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc cluster/cluster_family.cc cluster/cluster_slot_migration.cc - cluster/cluster_shard_migration.cc + cluster/cluster_shard_migration.cc cluster/outgoing_slot_migration.cc acl/user.cc acl/user_registry.cc acl/acl_family.cc acl/validator.cc acl/helpers.cc) diff --git a/src/server/cluster/cluster_config.h b/src/server/cluster/cluster_config.h index 82ae89fea..1a501a209 100644 --- a/src/server/cluster/cluster_config.h +++ b/src/server/cluster/cluster_config.h @@ -19,8 +19,12 @@ namespace dfly { using SlotId = uint16_t; +// TODO consider to use bit set or some more compact way to store SlotId using SlotSet = absl::flat_hash_set; +// MigrationState constants are ordered in state changing order +enum class MigrationState : uint8_t { C_NO_STATE, C_CONNECTING, C_FULL_SYNC, C_STABLE_SYNC }; + class ClusterConfig { public: static constexpr SlotId kMaxSlotNum = 0x3FFF; diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index ed68bd2d2..a1c6e2bb5 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -22,7 +22,6 @@ #include "server/error.h" #include "server/journal/journal.h" #include "server/main_service.h" -#include 
"server/replica.h" #include "server/server_family.h" #include "server/server_state.h" @@ -633,18 +632,18 @@ void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionCon return cntx->SendOk(); } -static std::string_view state_to_str(ClusterSlotMigration::State state) { +static std::string_view state_to_str(MigrationState state) { switch (state) { - case ClusterSlotMigration::C_NO_STATE: + case MigrationState::C_NO_STATE: return "NO_STATE"sv; - case ClusterSlotMigration::C_CONNECTING: + case MigrationState::C_CONNECTING: return "CONNECTING"sv; - case ClusterSlotMigration::C_FULL_SYNC: + case MigrationState::C_FULL_SYNC: return "FULL_SYNC"sv; - case ClusterSlotMigration::C_STABLE_SYNC: + case MigrationState::C_STABLE_SYNC: return "STABLE_SYNC"sv; } - DCHECK(false) << "Unknown State value " << state; + DCHECK(false) << "Unknown State value " << static_cast>(state); return "UNDEFINED_STATE"sv; } @@ -662,14 +661,14 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* for (const auto& m : incoming_migrations_jobs_) { const auto& info = m->GetInfo(); if (info.host == host_ip && info.port == port) - return rb->SendSimpleString(state_to_str(info.state)); + return rb->SendSimpleString(state_to_str(m->GetState())); } // find outgoing slot migration - for (const auto& [_, info] : outgoing_migration_infos_) { - if (info->host_ip == host_ip && info->port == port) - return rb->SendSimpleString(state_to_str(info->state)); + for (const auto& [_, info] : outgoing_migration_jobs_) { + if (info->GetHostIp() == host_ip && info->GetPort() == port) + return rb->SendSimpleString(state_to_str(info->GetState())); } - } else if (auto arr_size = incoming_migrations_jobs_.size() + outgoing_migration_infos_.size(); + } else if (auto arr_size = incoming_migrations_jobs_.size() + outgoing_migration_jobs_.size(); arr_size != 0) { rb->StartArray(arr_size); const auto& send_answer = [rb](std::string_view direction, std::string_view host, uint16_t 
port, @@ -680,14 +679,14 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* lock_guard lk(migration_mu_); for (const auto& m : incoming_migrations_jobs_) { const auto& info = m->GetInfo(); - send_answer("in", info.host, info.port, info.state); + send_answer("in", info.host, info.port, m->GetState()); } - for (const auto& [_, info] : outgoing_migration_infos_) { - send_answer("out", info->host_ip, info->port, info->state); + for (const auto& [_, info] : outgoing_migration_jobs_) { + send_answer("out", info->GetHostIp(), info->GetPort(), info->GetState()); } return; } - return rb->SendSimpleString(state_to_str(ClusterSlotMigration::C_NO_STATE)); + return rb->SendSimpleString(state_to_str(MigrationState::C_NO_STATE)); } void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) { @@ -697,7 +696,9 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) { if (sub_cmd == "CONF") { MigrationConf(args, cntx); } else if (sub_cmd == "FLOW") { - Flow(args, cntx); + DflyMigrateFlow(args, cntx); + } else if (sub_cmd == "FULL-SYNC-CUT") { + DflyMigrateFullSyncCut(args, cntx); } else { cntx->SendError(facade::UnknownSubCmd(sub_cmd, "DFLYMIGRATE"), facade::kSyntaxErrType); } @@ -712,7 +713,8 @@ ClusterSlotMigration* ClusterFamily::AddMigration(std::string host_ip, uint16_t } } return incoming_migrations_jobs_ - .emplace_back(make_unique(std::string(host_ip), port, std::move(slots))) + .emplace_back(make_unique(std::string(host_ip), port, + &server_family_->service(), std::move(slots))) .get(); } @@ -746,7 +748,7 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) { } } - auto sync_id = CreateMigrationSession(cntx, port, std::move(slots)); + auto sync_id = CreateOutgoingMigration(cntx, port, std::move(slots)); cntx->conn()->SetName("slot_migration_ctrl"); auto* rb = static_cast(cntx->reply_builder()); @@ -756,18 +758,25 @@ void ClusterFamily::MigrationConf(CmdArgList args, 
ConnectionContext* cntx) { return; } -uint32_t ClusterFamily::CreateMigrationSession(ConnectionContext* cntx, uint16_t port, - std::vector slots) { +uint32_t ClusterFamily::CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port, + std::vector slots) { std::lock_guard lk(migration_mu_); auto sync_id = next_sync_id_++; - auto info = make_shared(shard_set->size(), cntx->conn()->RemoteEndpointAddress(), - sync_id, port, std::move(slots)); - auto [it, inserted] = outgoing_migration_infos_.emplace(sync_id, info); + auto err_handler = [this, sync_id](const GenericError& err) { + LOG(INFO) << "Slot migration error: " << err.Format(); + + // Todo add error processing, stop migration process + // fb2::Fiber("stop_Migration", &ClusterFamily::StopMigration, this, sync_id).Detach(); + }; + auto info = + make_shared(shard_set->size(), cntx->conn()->RemoteEndpointAddress(), port, + std::move(slots), err_handler); + auto [it, inserted] = outgoing_migration_jobs_.emplace(sync_id, std::move(info)); CHECK(inserted); return sync_id; } -void ClusterFamily::Flow(CmdArgList args, ConnectionContext* cntx) { +void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) { CmdArgParser parser{args}; auto [sync_id, shard_id] = parser.Next(); @@ -775,32 +784,58 @@ void ClusterFamily::Flow(CmdArgList args, ConnectionContext* cntx) { return cntx->SendError(err->MakeReply()); } - VLOG(1) << "Create flow " - << " sync_id: " << sync_id << " shard_id: " << shard_id << " shard"; + VLOG(1) << "Create flow sync_id: " << sync_id << " shard_id: " << shard_id; cntx->conn()->SetName(absl::StrCat("migration_flow_", sync_id)); - auto info = GetMigrationInfo(sync_id); + auto info = GetOutgoingMigration(sync_id); if (!info) - cntx->SendError(kIdNotFound); - - info->flows[shard_id].conn = cntx->conn(); + return cntx->SendError(kIdNotFound); cntx->conn()->Migrate(shard_set->pool()->at(shard_id)); - info->state = ClusterSlotMigration::State::C_FULL_SYNC; - cntx->SendOk(); - 
info->flows[shard_id].conn->socket()->Write(io::Buffer("SYNC")); + EngineShard* shard = EngineShard::tlocal(); + DCHECK(shard->shard_id() == shard_id); - LOG(INFO) << "Started migation with target node " << info->host_ip << ":" << info->port; + info->StartFlow(&shard->db_slice(), sync_id, server_family_->journal(), cntx->conn()->socket()); } -shared_ptr ClusterFamily::GetMigrationInfo(uint32_t sync_id) { +void ClusterFamily::DflyMigrateFullSyncCut(CmdArgList args, ConnectionContext* cntx) { + CmdArgParser parser{args}; + auto [sync_id, shard_id] = parser.Next(); + + if (auto err = parser.Error(); err) { + return cntx->SendError(err->MakeReply()); + } + + VLOG(1) << "Full sync cut " + << " sync_id: " << sync_id << " shard_id: " << shard_id << " shard"; + + std::lock_guard lck(migration_mu_); + auto migration_it = + std::find_if(incoming_migrations_jobs_.begin(), incoming_migrations_jobs_.end(), + [sync_id](const auto& el) { return el->GetSyncId() == sync_id; }); + + if (migration_it == incoming_migrations_jobs_.end()) { + LOG(WARNING) << "Couldn't find migration id"; + return cntx->SendError(kIdNotFound); + } + + (*migration_it)->SetStableSyncForFlow(shard_id); + if ((*migration_it)->GetState() == MigrationState::C_STABLE_SYNC) { + (*migration_it)->Stop(); + LOG(INFO) << "STABLE-SYNC state is set for sync_id " << sync_id; + } + + cntx->SendOk(); +} + +shared_ptr ClusterFamily::GetOutgoingMigration(uint32_t sync_id) { unique_lock lk(migration_mu_); - auto sync_it = outgoing_migration_infos_.find(sync_id); - return sync_it != outgoing_migration_infos_.end() ? sync_it->second : nullptr; + auto sync_it = outgoing_migration_jobs_.find(sync_id); + return sync_it != outgoing_migration_jobs_.end() ? 
sync_it->second : nullptr; } using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx); diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 835e23b5f..615b932b4 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -11,13 +11,13 @@ #include "facade/conn_context.h" #include "server/cluster/cluster_config.h" #include "server/cluster/cluster_slot_migration.h" +#include "server/cluster/outgoing_slot_migration.h" #include "server/common.h" namespace dfly { class CommandRegistry; class ConnectionContext; class ServerFamily; -class DflyCmd; class ClusterFamily { public: @@ -58,7 +58,7 @@ class ClusterFamily { // DFLYMIGRATE CONF initiate first step in slots migration procedure // MigrationConf process this request and saving slots range and - // target node port in outgoing_migration_infos_. + // target node port in outgoing_migration_jobs_. // return sync_id and shard number to the target node void MigrationConf(CmdArgList args, ConnectionContext* cntx); @@ -66,43 +66,19 @@ class ClusterFamily { // this request should be done for every shard on the target node // this method assocciate connection and shard that will be the data // source for migration - void Flow(CmdArgList args, ConnectionContext* cntx); + void DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx); + + void DflyMigrateFullSyncCut(CmdArgList args, ConnectionContext* cntx); // create a ClusterSlotMigration entity which will execute migration ClusterSlotMigration* AddMigration(std::string host_ip, uint16_t port, std::vector slots); // store info about migration and create unique session id - uint32_t CreateMigrationSession(ConnectionContext* cntx, uint16_t port, - std::vector slots); + uint32_t CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port, + std::vector slots); - // FlowInfo is used to store state, connection, and all auxiliary data - // that is needed for correct slots (per 
shard) data transfer - struct FlowInfo { - facade::Connection* conn = nullptr; - }; - - // Whole slots migration process information - struct MigrationInfo { - MigrationInfo() = default; - MigrationInfo(std::uint32_t flows_num, std::string ip, uint32_t sync_id, uint16_t port, - std::vector slots) - : host_ip(ip), - flows(flows_num), - slots(slots), - sync_id(sync_id), - port(port), - state(ClusterSlotMigration::State::C_CONNECTING) { - } - std::string host_ip; - std::vector flows; - std::vector slots; - uint32_t sync_id; - uint16_t port; - ClusterSlotMigration::State state = ClusterSlotMigration::State::C_NO_STATE; - }; - - std::shared_ptr GetMigrationInfo(uint32_t sync_id); + std::shared_ptr GetOutgoingMigration(uint32_t sync_id); mutable Mutex migration_mu_; // guard migrations operations // holds all incoming slots migrations that are currently in progress. @@ -111,8 +87,8 @@ class ClusterFamily { uint32_t next_sync_id_ = 1; // holds all outgoing slots migrations that are currently in progress - using MigrationInfoMap = absl::btree_map>; - MigrationInfoMap outgoing_migration_infos_; + using OutgoingMigrationMap = absl::btree_map>; + OutgoingMigrationMap outgoing_migration_jobs_; private: ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const; diff --git a/src/server/cluster/cluster_shard_migration.cc b/src/server/cluster/cluster_shard_migration.cc index 119e02f3d..90ec26fb6 100644 --- a/src/server/cluster/cluster_shard_migration.cc +++ b/src/server/cluster/cluster_shard_migration.cc @@ -4,10 +4,13 @@ #include "server/cluster/cluster_shard_migration.h" #include +#include #include #include "base/logging.h" #include "server/error.h" +#include "server/journal/serializer.h" +#include "server/journal/tx_executor.h" ABSL_DECLARE_FLAG(int, source_connect_timeout_ms); @@ -19,8 +22,9 @@ using namespace util; using absl::GetFlag; ClusterShardMigration::ClusterShardMigration(ServerContext server_context, uint32_t shard_id, - uint32_t sync_id) + 
uint32_t sync_id, Service* service) : ProtocolClient(server_context), source_shard_id_(shard_id), sync_id_(sync_id) { + executor_ = std::make_unique(service); } ClusterShardMigration::~ClusterShardMigration() { @@ -57,16 +61,40 @@ void ClusterShardMigration::FullSyncShardFb(Context* cntx) { DCHECK(leftover_buf_); io::PrefixSource ps{leftover_buf_->InputBuffer(), Sock()}; - uint8_t ok_buf[4]; - ps.ReadAtLeast(io::MutableBytes{ok_buf, 4}, 4); + JournalReader reader{&ps, 0}; + TransactionReader tx_reader{}; - if (string_view(reinterpret_cast(ok_buf), 4) != "SYNC") { - VLOG(1) << "FullSyncShardFb incorrect data transfer"; - cntx->ReportError(std::make_error_code(errc::protocol_error), - "Incorrect FullSync data, only for tets"); + while (!cntx->IsCancelled()) { + if (cntx->IsCancelled()) + break; + + auto tx_data = tx_reader.NextTxData(&reader, cntx); + if (!tx_data) + break; + + TouchIoTime(); + + if (!tx_data->is_ping) { + ExecuteTxWithNoShardSync(std::move(*tx_data), cntx); + } else { + // TODO check about ping logic + } } +} - VLOG(1) << "FullSyncShardFb finished after reading 4 bytes"; +void ClusterShardMigration::ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) { + if (cntx->IsCancelled()) { + return; + } + CHECK(tx_data.shard_cnt <= 1); // we don't support sync for multishard execution + if (!tx_data.IsGlobalCmd()) { + VLOG(2) << "Execute cmd without sync between shards. 
txid: " << tx_data.txid; + executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands)); + } else { + // TODO check which global commands should be supported + CHECK(false) << "We don't support command: " << ToSV(tx_data.commands.front().cmd_args[0]) + << "in cluster migration process."; + } } void ClusterShardMigration::Cancel() { diff --git a/src/server/cluster/cluster_shard_migration.h b/src/server/cluster/cluster_shard_migration.h index e5d31484f..50be78182 100644 --- a/src/server/cluster/cluster_shard_migration.h +++ b/src/server/cluster/cluster_shard_migration.h @@ -4,30 +4,47 @@ #pragma once #include "base/io_buf.h" +#include "server/journal/executor.h" #include "server/protocol_client.h" namespace dfly { +class Service; +class TransactionData; +class MultiShardExecution; + // ClusterShardMigration manage data receiving in slots migration process. // It is created per shard on the target node to initiate FLOW step. class ClusterShardMigration : public ProtocolClient { public: - ClusterShardMigration(ServerContext server_context, uint32_t shard_id, uint32_t sync_id); + ClusterShardMigration(ServerContext server_context, uint32_t shard_id, uint32_t sync_id, + Service* service); ~ClusterShardMigration(); std::error_code StartSyncFlow(Context* cntx); void Cancel(); + void SetStableSync() { + is_stable_sync_.store(true); + } + bool IsStableSync() { + return is_stable_sync_.load(); + } + private: void FullSyncShardFb(Context* cntx); void JoinFlow(); + void ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx); + void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx); + private: uint32_t source_shard_id_; uint32_t sync_id_; std::optional leftover_buf_; - + std::unique_ptr executor_; Fiber sync_fb_; + std::atomic_bool is_stable_sync_ = false; }; } // namespace dfly diff --git a/src/server/cluster/cluster_slot_migration.cc b/src/server/cluster/cluster_slot_migration.cc index 41b9b293b..6ed08eac6 100644 --- 
a/src/server/cluster/cluster_slot_migration.cc +++ b/src/server/cluster/cluster_slot_migration.cc @@ -1,6 +1,7 @@ // Copyright 2023, DragonflyDB authors. All rights reserved. // See LICENSE for licensing terms. // + #include "server/cluster/cluster_slot_migration.h" #include @@ -9,6 +10,7 @@ #include "base/logging.h" #include "server/cluster/cluster_shard_migration.h" #include "server/error.h" +#include "server/journal/tx_executor.h" #include "server/main_service.h" ABSL_FLAG(int, source_connect_timeout_ms, 20000, @@ -35,9 +37,9 @@ vector> Partition(unsigned num_flows) { } // namespace -ClusterSlotMigration::ClusterSlotMigration(string host_ip, uint16_t port, +ClusterSlotMigration::ClusterSlotMigration(string host_ip, uint16_t port, Service* se, std::vector slots) - : ProtocolClient(move(host_ip), port), slots_(std::move(slots)) { + : ProtocolClient(move(host_ip), port), service_(*se), slots_(std::move(slots)) { } ClusterSlotMigration::~ClusterSlotMigration() { @@ -62,7 +64,7 @@ error_code ClusterSlotMigration::Start(ConnectionContext* cntx) { ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_); RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to source")); - state_ = ClusterSlotMigration::C_CONNECTING; + state_ = MigrationState::C_CONNECTING; VLOG(1) << "Greeting"; ec = Greet(); @@ -98,13 +100,29 @@ error_code ClusterSlotMigration::Greet() { ClusterSlotMigration::Info ClusterSlotMigration::GetInfo() const { const auto& ctx = server(); - return {ctx.host, ctx.port, state_}; + return {ctx.host, ctx.port}; +} + +void ClusterSlotMigration::SetStableSyncForFlow(uint32_t flow) { + DCHECK(shard_flows_.size() > flow); + shard_flows_[flow]->SetStableSync(); + + if (std::all_of(shard_flows_.begin(), shard_flows_.end(), + [](const auto& el) { return el->IsStableSync(); })) { + state_ = MigrationState::C_STABLE_SYNC; + } +} + +void ClusterSlotMigration::Stop() { + for (auto& flow : shard_flows_) { + flow->Cancel(); + } } void 
ClusterSlotMigration::MainMigrationFb() { VLOG(1) << "Main migration fiber started"; - state_ = ClusterSlotMigration::C_FULL_SYNC; + state_ = MigrationState::C_FULL_SYNC; // TODO add reconnection code if (auto ec = InitiateSlotsMigration(); ec) { @@ -116,7 +134,7 @@ void ClusterSlotMigration::MainMigrationFb() { std::error_code ClusterSlotMigration::InitiateSlotsMigration() { shard_flows_.resize(source_shards_num_); for (unsigned i = 0; i < source_shards_num_; ++i) { - shard_flows_[i].reset(new ClusterShardMigration(server(), i, sync_id_)); + shard_flows_[i].reset(new ClusterShardMigration(server(), i, sync_id_, &service_)); } // Switch to new error handler that closes flow sockets. diff --git a/src/server/cluster/cluster_slot_migration.h b/src/server/cluster/cluster_slot_migration.h index 506fa0316..a6605963b 100644 --- a/src/server/cluster/cluster_slot_migration.h +++ b/src/server/cluster/cluster_slot_migration.h @@ -3,31 +3,40 @@ // #pragma once -#include "server/cluster/cluster_shard_migration.h" #include "server/protocol_client.h" namespace dfly { +class ClusterShardMigration; + +class Service; // The main entity on the target side that manage slots migration process // Creates initial connection between the target and source node, // manage migration process state and data class ClusterSlotMigration : ProtocolClient { public: - enum State : uint8_t { C_NO_STATE, C_CONNECTING, C_FULL_SYNC, C_STABLE_SYNC }; - struct Info { std::string host; uint16_t port; - State state; }; - ClusterSlotMigration(std::string host_ip, uint16_t port, + ClusterSlotMigration(std::string host_ip, uint16_t port, Service* se, std::vector slots); ~ClusterSlotMigration(); // Initiate connection with source node and create migration fiber std::error_code Start(ConnectionContext* cntx); Info GetInfo() const; + uint32_t GetSyncId() const { + return sync_id_; + } + + MigrationState GetState() const { + return state_; + } + + void SetStableSyncForFlow(uint32_t flow); + void Stop(); private: 
// Send DFLYMIGRATE CONF to the source and get info about migration process @@ -37,12 +46,13 @@ class ClusterSlotMigration : ProtocolClient { std::error_code InitiateSlotsMigration(); private: + Service& service_; Mutex flows_op_mu_; std::vector> shard_flows_; std::vector slots_; uint32_t source_shards_num_ = 0; uint32_t sync_id_ = 0; - State state_ = C_NO_STATE; + MigrationState state_ = MigrationState::C_NO_STATE; Fiber sync_fb_; }; diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc new file mode 100644 index 000000000..2440b8c0a --- /dev/null +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -0,0 +1,69 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/cluster/outgoing_slot_migration.h" + +#include "server/db_slice.h" +#include "server/journal/streamer.h" + +namespace dfly { + +class OutgoingMigration::SliceSlotMigration { + public: + SliceSlotMigration(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal, + Context* cntx) + : streamer_(slice, std::move(slots), sync_id, journal, cntx) { + } + + void Start(io::Sink* dest) { + streamer_.Start(dest); + state_ = MigrationState::C_FULL_SYNC; + } + + MigrationState GetState() const { + return state_ == MigrationState::C_FULL_SYNC && streamer_.IsSnapshotFinished() + ? 
MigrationState::C_STABLE_SYNC + : state_; + } + + private: + RestoreStreamer streamer_; + MigrationState state_ = MigrationState::C_CONNECTING; +}; + +OutgoingMigration::OutgoingMigration(std::uint32_t flows_num, std::string ip, uint16_t port, + std::vector slots, + Context::ErrHandler err_handler) + : host_ip_(ip), port_(port), slots_(slots), cntx_(err_handler), slot_migrations_(flows_num) { +} + +OutgoingMigration::~OutgoingMigration() = default; + +void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal, + io::Sink* dest) { + SlotSet sset; + for (const auto& slot_range : slots_) { + for (auto i = slot_range.start; i <= slot_range.end; ++i) + sset.insert(i); + } + + const auto shard_id = slice->shard_id(); + + std::lock_guard lck(flows_mu_); + slot_migrations_[shard_id] = + std::make_unique(slice, std::move(sset), sync_id, journal, &cntx_); + slot_migrations_[shard_id]->Start(dest); +} + +MigrationState OutgoingMigration::GetState() const { + std::lock_guard lck(flows_mu_); + MigrationState min_state = MigrationState::C_STABLE_SYNC; + for (const auto& slot_migration : slot_migrations_) { + if (slot_migration) + min_state = std::min(min_state, slot_migration->GetState()); + } + return min_state; +} + +} // namespace dfly diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h new file mode 100644 index 000000000..ca812682c --- /dev/null +++ b/src/server/cluster/outgoing_slot_migration.h @@ -0,0 +1,50 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. 
+// +#pragma once + +#include "io/io.h" +#include "server/cluster/cluster_config.h" +#include "server/common.h" + +namespace dfly { + +namespace journal { +class Journal; +} + +class DbSlice; + +// Whole slots migration process information +class OutgoingMigration { + public: + OutgoingMigration() = default; + ~OutgoingMigration(); + OutgoingMigration(std::uint32_t flows_num, std::string ip, uint16_t port, + std::vector slots, Context::ErrHandler err_handler); + + void StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal, io::Sink* dest); + + MigrationState GetState() const; + + const std::string& GetHostIp() const { + return host_ip_; + }; + uint16_t GetPort() const { + return port_; + }; + + private: + // SliceSlotMigration manages state and data transfering for the corresponding shard + class SliceSlotMigration; + + private: + std::string host_ip_; + uint16_t port_; + std::vector slots_; + Context cntx_; + mutable Mutex flows_mu_; + std::vector> slot_migrations_ ABSL_GUARDED_BY(flows_mu_); +}; + +} // namespace dfly diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 20ad8b224..5acec527f 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -49,17 +49,17 @@ void JournalStreamer::WriterFb(io::Sink* dest) { } } -RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, uint32_t flow_id, +RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal, Context* cntx) : JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)), - sync_id_(sync_id), - flow_id_(flow_id) { + sync_id_(sync_id) { DCHECK(slice != nullptr); } void RestoreStreamer::Start(io::Sink* dest) { + VLOG(2) << "RestoreStreamer start"; auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this); snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb)); @@ -84,8 +84,11 @@ void RestoreStreamer::Start(io::Sink* dest) { } } 
while (cursor); - WriteCommand(make_pair( - "DFLYMIGRATE", ArgSlice{"FULL-SYNC-CUT", absl::StrCat(sync_id_), absl::StrCat(flow_id_)})); + VLOG(2) << "FULL-SYNC-CUT for " << sync_id_ << " : " << db_slice_->shard_id(); + WriteCommand(make_pair("DFLYMIGRATE", ArgSlice{"FULL-SYNC-CUT", absl::StrCat(sync_id_), + absl::StrCat(db_slice_->shard_id())})); + NotifyWritten(true); + snapshot_finished_ = true; }); } @@ -96,6 +99,12 @@ void RestoreStreamer::Cancel() { JournalStreamer::Cancel(); } +RestoreStreamer::~RestoreStreamer() { + fiber_cancellation_.Cancel(); + snapshot_fb_.JoinIfNeeded(); + db_slice_->UnregisterOnChange(snapshot_version_); +} + bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const { if (!item.slot.has_value()) { return false; diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 9b0799af3..91a95f937 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -53,12 +53,18 @@ class JournalStreamer : protected BufferedStreamerBase { // Only handles relevant slots, while ignoring all others. 
class RestoreStreamer : public JournalStreamer { public: - RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, uint32_t flow_id, - journal::Journal* journal, Context* cntx); + RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal, + Context* cntx); void Start(io::Sink* dest) override; void Cancel() override; + bool IsSnapshotFinished() const { + return snapshot_finished_; + } + + ~RestoreStreamer(); + private: void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req); bool ShouldWrite(const journal::JournalItem& item) const override; @@ -72,9 +78,9 @@ class RestoreStreamer : public JournalStreamer { uint64_t snapshot_version_ = 0; SlotSet my_slots_; uint32_t sync_id_; - uint32_t flow_id_; Fiber snapshot_fb_; Cancellation fiber_cancellation_; + bool snapshot_finished_ = false; }; } // namespace dfly diff --git a/src/server/journal/tx_executor.cc b/src/server/journal/tx_executor.cc new file mode 100644 index 000000000..c9edd53a6 --- /dev/null +++ b/src/server/journal/tx_executor.cc @@ -0,0 +1,134 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. 
+// + +#include "tx_executor.h" + +#include + +#include "base/logging.h" +#include "server/journal/serializer.h" + +using namespace std; +using namespace facade; + +namespace dfly { + +bool MultiShardExecution::InsertTxToSharedMap(TxId txid, uint32_t shard_cnt) { + std::unique_lock lk(map_mu); + auto [it, was_insert] = tx_sync_execution.emplace(txid, shard_cnt); + lk.unlock(); + + VLOG(2) << "txid: " << txid << " unique_shard_cnt_: " << shard_cnt + << " was_insert: " << was_insert; + it->second.block.Dec(); + + return was_insert; +} + +MultiShardExecution::TxExecutionSync& MultiShardExecution::Find(TxId txid) { + std::lock_guard lk(map_mu); + VLOG(2) << "Execute txid: " << txid; + auto it = tx_sync_execution.find(txid); + DCHECK(it != tx_sync_execution.end()); + return it->second; +} + +void MultiShardExecution::Erase(TxId txid) { + std::lock_guard lg{map_mu}; + tx_sync_execution.erase(txid); +} + +void MultiShardExecution::CancelAllBlockingEntities() { + lock_guard lk{map_mu}; + for (auto& tx_data : tx_sync_execution) { + tx_data.second.barrier.Cancel(); + tx_data.second.block.Cancel(); + } +} + +bool TransactionData::AddEntry(journal::ParsedEntry&& entry) { + ++journal_rec_count; + + switch (entry.opcode) { + case journal::Op::PING: + is_ping = true; + return true; + case journal::Op::EXPIRED: + case journal::Op::COMMAND: + commands.push_back(std::move(entry.cmd)); + [[fallthrough]]; + case journal::Op::EXEC: + shard_cnt = entry.shard_cnt; + dbid = entry.dbid; + txid = entry.txid; + return true; + case journal::Op::MULTI_COMMAND: + commands.push_back(std::move(entry.cmd)); + dbid = entry.dbid; + return false; + default: + DCHECK(false) << "Unsupported opcode"; + } + return false; +} + +bool TransactionData::IsGlobalCmd() const { + if (commands.size() > 1) { + return false; + } + + auto& command = commands.front(); + if (command.cmd_args.empty()) { + return false; + } + + auto& args = command.cmd_args; + if (absl::EqualsIgnoreCase(ToSV(args[0]), "FLUSHDB"sv) || + 
absl::EqualsIgnoreCase(ToSV(args[0]), "FLUSHALL"sv) || + (absl::EqualsIgnoreCase(ToSV(args[0]), "DFLYCLUSTER"sv) && + absl::EqualsIgnoreCase(ToSV(args[1]), "FLUSHSLOTS"sv))) { + return true; + } + + return false; +} + +TransactionData TransactionData::FromSingle(journal::ParsedEntry&& entry) { + TransactionData data; + bool res = data.AddEntry(std::move(entry)); + DCHECK(res); + return data; +} + +std::optional TransactionReader::NextTxData(JournalReader* reader, Context* cntx) { + io::Result res; + while (true) { + if (res = reader->ReadEntry(); !res) { + cntx->ReportError(res.error()); + return std::nullopt; + } + + // Check if journal command can be executed right away. + // Expiration checks lock on master, so it never conflicts with running multi transactions. + if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND || + res->opcode == journal::Op::PING) + return TransactionData::FromSingle(std::move(res.value())); + + // Otherwise, continue building multi command. + DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC); + DCHECK(res->txid > 0); + + auto txid = res->txid; + auto& txdata = current_[txid]; + if (txdata.AddEntry(std::move(res.value()))) { + auto out = std::move(txdata); + current_.erase(txid); + return out; + } + } + + return std::nullopt; +} + +} // namespace dfly diff --git a/src/server/journal/tx_executor.h b/src/server/journal/tx_executor.h new file mode 100644 index 000000000..459ca65c0 --- /dev/null +++ b/src/server/journal/tx_executor.h @@ -0,0 +1,68 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// +#pragma once + +#include + +#include "server/common.h" +#include "server/journal/types.h" + +namespace dfly { + +class JournalReader; + +// Coordinator for multi shard execution. 
+class MultiShardExecution { + public: + struct TxExecutionSync { + Barrier barrier; + std::atomic_uint32_t counter; + BlockingCounter block; + + explicit TxExecutionSync(uint32_t counter) + : barrier(counter), counter(counter), block(counter) { + } + }; + + bool InsertTxToSharedMap(TxId txid, uint32_t shard_cnt); + TxExecutionSync& Find(TxId txid); + void Erase(TxId txid); + void CancelAllBlockingEntities(); + + private: + Mutex map_mu; + std::unordered_map tx_sync_execution; +}; + +// This class holds the commands of transaction in single shard. +// Once all commands were received, the transaction can be executed. +struct TransactionData { + // Update the data from ParsedEntry and return true if all shard transaction commands were + // received. + bool AddEntry(journal::ParsedEntry&& entry); + + bool IsGlobalCmd() const; + + static TransactionData FromSingle(journal::ParsedEntry&& entry); + + TxId txid{0}; + DbIndex dbid{0}; + uint32_t shard_cnt{0}; + absl::InlinedVector commands{0}; + uint32_t journal_rec_count{0}; // Count number of source entries to check offset. + bool is_ping = false; // For Op::PING entries. +}; + +// Utility for reading TransactionData from a journal reader. +// The journal stream can contain interleaved data for multiple multi transactions, +// expiries and out of order executed transactions that need to be grouped on the replica side. +struct TransactionReader { + std::optional NextTxData(JournalReader* reader, Context* cntx); + + private: + // Stores ongoing multi transaction data. 
+ absl::flat_hash_map current_; +}; + +} // namespace dfly diff --git a/src/server/replica.cc b/src/server/replica.cc index 3f4d525ea..2bf547392 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -598,14 +598,7 @@ error_code Replica::ConsumeDflyStream() { flow->Cancel(); } - // Iterate over map and cancel all blocking entities - { - lock_guard lk{multi_shard_exe_->map_mu}; - for (auto& tx_data : multi_shard_exe_->tx_sync_execution) { - tx_data.second.barrier.Cancel(); - tx_data.second.block.Cancel(); - } - } + multi_shard_exe_->CancelAllBlockingEntities(); }; RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler))); @@ -901,31 +894,16 @@ void DflyShardReplica::ExecuteTxWithNoShardSync(TransactionData&& tx_data, Conte return; } - bool was_insert = false; - if (tx_data.IsGlobalCmd()) { - was_insert = InsertTxToSharedMap(tx_data); - } + bool was_insert = tx_data.IsGlobalCmd() && + multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, tx_data.shard_cnt); ExecuteTx(std::move(tx_data), was_insert, cntx); } -bool DflyShardReplica::InsertTxToSharedMap(const TransactionData& tx_data) { - std::unique_lock lk(multi_shard_exe_->map_mu); - auto [it, was_insert] = - multi_shard_exe_->tx_sync_execution.emplace(tx_data.txid, tx_data.shard_cnt); - lk.unlock(); - - VLOG(2) << "txid: " << tx_data.txid << " unique_shard_cnt_: " << tx_data.shard_cnt - << " was_insert: " << was_insert; - it->second.block.Dec(); - - return was_insert; -} - void DflyShardReplica::InsertTxDataToShardResource(TransactionData&& tx_data) { bool was_insert = false; if (tx_data.shard_cnt > 1) { - was_insert = InsertTxToSharedMap(tx_data); + was_insert = multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, tx_data.shard_cnt); } VLOG(2) << "txid: " << tx_data.txid << " pushed to queue"; @@ -957,12 +935,7 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, return; } - VLOG(2) << "Execute txid: " << tx_data.txid; - std::unique_lock lk(multi_shard_exe_->map_mu); - 
auto it = multi_shard_exe_->tx_sync_execution.find(tx_data.txid); - DCHECK(it != multi_shard_exe_->tx_sync_execution.end()); - auto& multi_shard_data = it->second; - lk.unlock(); + auto& multi_shard_data = multi_shard_exe_->Find(tx_data.txid); VLOG(2) << "Execute txid: " << tx_data.txid << " waiting for data in all shards"; // Wait until shards flows got transaction data and inserted to map. @@ -1004,8 +977,7 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, auto val = multi_shard_data.counter.fetch_sub(1, std::memory_order_relaxed); VLOG(2) << "txid: " << tx_data.txid << " counter: " << val; if (val == 1) { - std::lock_guard lg{multi_shard_exe_->map_mu}; - multi_shard_exe_->tx_sync_execution.erase(tx_data.txid); + multi_shard_exe_->Erase(tx_data.txid); } } @@ -1131,92 +1103,6 @@ std::string Replica::GetSyncId() const { return master_context_.dfly_session_id; } -bool DflyShardReplica::TransactionData::AddEntry(journal::ParsedEntry&& entry) { - ++journal_rec_count; - - switch (entry.opcode) { - case journal::Op::PING: - is_ping = true; - return true; - case journal::Op::EXPIRED: - case journal::Op::COMMAND: - commands.push_back(std::move(entry.cmd)); - [[fallthrough]]; - case journal::Op::EXEC: - shard_cnt = entry.shard_cnt; - dbid = entry.dbid; - txid = entry.txid; - return true; - case journal::Op::MULTI_COMMAND: - commands.push_back(std::move(entry.cmd)); - dbid = entry.dbid; - return false; - default: - DCHECK(false) << "Unsupported opcode"; - } - return false; -} - -bool DflyShardReplica::TransactionData::IsGlobalCmd() const { - if (commands.size() > 1) { - return false; - } - - auto& command = commands.front(); - if (command.cmd_args.empty()) { - return false; - } - - auto& args = command.cmd_args; - if (absl::EqualsIgnoreCase(ToSV(args[0]), "FLUSHDB"sv) || - absl::EqualsIgnoreCase(ToSV(args[0]), "FLUSHALL"sv) || - (absl::EqualsIgnoreCase(ToSV(args[0]), "DFLYCLUSTER"sv) && - absl::EqualsIgnoreCase(ToSV(args[1]), 
"FLUSHSLOTS"sv))) { - return true; - } - - return false; -} - -DflyShardReplica::TransactionData DflyShardReplica::TransactionData::FromSingle( - journal::ParsedEntry&& entry) { - TransactionData data; - bool res = data.AddEntry(std::move(entry)); - DCHECK(res); - return data; -} - -auto DflyShardReplica::TransactionReader::NextTxData(JournalReader* reader, Context* cntx) - -> optional<TransactionData> { - io::Result<journal::ParsedEntry> res; - while (true) { - if (res = reader->ReadEntry(); !res) { - cntx->ReportError(res.error()); - return std::nullopt; - } - - // Check if journal command can be executed right away. - // Expiration checks lock on master, so it never conflicts with running multi transactions. - if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND || - res->opcode == journal::Op::PING) - return TransactionData::FromSingle(std::move(res.value())); - - // Otherwise, continue building multi command. - DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC); - DCHECK(res->txid > 0); - - auto txid = res->txid; - auto& txdata = current_[txid]; - if (txdata.AddEntry(std::move(res.value()))) { - auto out = std::move(txdata); - current_.erase(txid); - return out; - } - } - - return std::nullopt; -} - uint32_t DflyShardReplica::FlowId() const { return flow_id_; } diff --git a/src/server/replica.h b/src/server/replica.h index 61b86cd71..7c0226444 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -13,6 +13,7 @@ #include "facade/facade_types.h" #include "facade/redis_parser.h" #include "server/common.h" +#include "server/journal/tx_executor.h" #include "server/journal/types.h" #include "server/protocol_client.h" #include "server/version.h" @@ -30,23 +31,6 @@ class JournalExecutor; struct JournalReader; class DflyShardReplica; -// Coordinator for multi shard execution. 
-struct MultiShardExecution { - Mutex map_mu; - - struct TxExecutionSync { - Barrier barrier; - std::atomic_uint32_t counter; - BlockingCounter block; - - explicit TxExecutionSync(uint32_t counter) - : barrier(counter), counter(counter), block(counter) { - } - }; - - std::unordered_map<TxId, TxExecutionSync> tx_sync_execution; -}; - // The attributes of the master we are connecting to. struct MasterContext { std::string master_repl_id; @@ -193,36 +177,6 @@ class DflyShardReplica : public ProtocolClient { Service* service, std::shared_ptr<MultiShardExecution> multi_shard_exe); ~DflyShardReplica(); - // This class holds the commands of transaction in single shard. - // Once all commands were received, the transaction can be executed. - struct TransactionData { - // Update the data from ParsedEntry and return true if all shard transaction commands were - // received. - bool AddEntry(journal::ParsedEntry&& entry); - - bool IsGlobalCmd() const; - - static TransactionData FromSingle(journal::ParsedEntry&& entry); - - TxId txid{0}; - DbIndex dbid{0}; - uint32_t shard_cnt{0}; - absl::InlinedVector<journal::ParsedCommand, 1> commands{0}; - uint32_t journal_rec_count{0}; // Count number of source entries to check offset. - bool is_ping = false; // For Op::PING entries. - }; - - // Utility for reading TransactionData from a journal reader. - // The journal stream can contain interleaved data for multiple multi transactions, - // expiries and out of order executed transactions that need to be grouped on the replica side. - struct TransactionReader { - std::optional<TransactionData> NextTxData(JournalReader* reader, Context* cntx); - - private: - // Stores ongoing multi transaction data. 
- absl::flat_hash_map<TxId, TransactionData> current_; - }; - void Cancel(); void JoinFlow(); @@ -246,7 +200,6 @@ void ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx); void InsertTxDataToShardResource(TransactionData&& tx_data); void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx); - bool InsertTxToSharedMap(const TransactionData& tx_data); uint32_t FlowId() const; diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 00efc523b..35e4224b7 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -806,15 +806,15 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory): status = await c_nodes_admin[1].execute_command( "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port) ) - assert "FULL_SYNC" == status + assert "STABLE_SYNC" == status status = await c_nodes_admin[0].execute_command( "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[1].port) ) - assert "FULL_SYNC" == status + assert "STABLE_SYNC" == status status = await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") - assert ["out 127.0.0.1:30002 FULL_SYNC"] == status + assert ["out 127.0.0.1:30002 STABLE_SYNC"] == status try: await c_nodes_admin[1].execute_command(