1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

refactor: remove FULL-SYNC-CUT cmd #2687 (#2688)

* refactor: remove FULL-SYNC-CUT cmd #2687
This commit is contained in:
Borys 2024-03-06 14:26:35 +02:00 committed by GitHub
parent 66b87e16c2
commit dfedaf7e6e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 61 additions and 139 deletions

View file

@ -701,10 +701,6 @@ void ClusterFamily::DflyClusterMigrationFinalize(CmdArgList args, ConnectionCont
if (!migration)
return cntx->SendError(kIdNotFound);
if (migration->GetState() != MigrationState::C_STABLE_SYNC) {
return cntx->SendError("Migration process is not in STABLE_SYNC state");
}
// TODO implement blocking on migrated slots only
bool is_block_active = true;
@ -742,8 +738,6 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
MigrationConf(args, cntx);
} else if (sub_cmd == "FLOW") {
DflyMigrateFlow(args, cntx);
} else if (sub_cmd == "FULL-SYNC-CUT") {
DflyMigrateFullSyncCut(args, cntx);
} else if (sub_cmd == "ACK") {
DflyMigrateAck(args, cntx);
} else {
@ -864,36 +858,6 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
info->StartFlow(&shard->db_slice(), sync_id, server_family_->journal(), cntx->conn()->socket());
}
void ClusterFamily::DflyMigrateFullSyncCut(CmdArgList args, ConnectionContext* cntx) {
CHECK(cntx->slot_migration_id != 0);
CmdArgParser parser{args};
auto [sync_id, shard_id] = parser.Next<uint32_t, uint32_t>();
if (auto err = parser.Error(); err) {
return cntx->SendError(err->MakeReply());
}
VLOG(1) << "Full sync cut "
<< " sync_id: " << sync_id << " shard_id: " << shard_id << " shard";
std::lock_guard lck(migration_mu_);
auto migration_it = std::find_if(
incoming_migrations_jobs_.begin(), incoming_migrations_jobs_.end(),
[cntx](const auto& el) { return cntx->slot_migration_id == el->GetLocalSyncId(); });
if (migration_it == incoming_migrations_jobs_.end()) {
LOG(WARNING) << "Couldn't find migration id";
return cntx->SendError(kIdNotFound);
}
(*migration_it)->SetStableSyncForFlow(shard_id);
if ((*migration_it)->GetState() == MigrationState::C_STABLE_SYNC) {
LOG(INFO) << "STABLE-SYNC state is set for sync_id " << sync_id;
}
cntx->SendOk();
}
void ClusterFamily::FinalizeIncomingMigration(uint32_t local_sync_id) {
lock_guard lk(migration_mu_);
auto it =

View file

@ -71,8 +71,6 @@ class ClusterFamily {
// source for migration
void DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx);
void DflyMigrateFullSyncCut(CmdArgList args, ConnectionContext* cntx);
void DflyMigrateAck(CmdArgList args, ConnectionContext* cntx);
// create a ClusterSlotMigration entity which will execute migration

View file

@ -110,16 +110,6 @@ ClusterSlotMigration::Info ClusterSlotMigration::GetInfo() const {
return {ctx.host, ctx.port};
}
void ClusterSlotMigration::SetStableSyncForFlow(uint32_t flow) {
DCHECK(shard_flows_.size() > flow);
shard_flows_[flow]->SetStableSync();
if (std::all_of(shard_flows_.begin(), shard_flows_.end(),
[](const auto& el) { return el->IsStableSync(); })) {
state_ = MigrationState::C_STABLE_SYNC;
}
}
bool ClusterSlotMigration::IsFinalized() const {
return std::all_of(shard_flows_.begin(), shard_flows_.end(),
[](const auto& el) { return el->IsFinalized(); });

View file

@ -42,8 +42,6 @@ class ClusterSlotMigration : private ProtocolClient {
return state_;
}
void SetStableSyncForFlow(uint32_t flow);
void Stop();
const SlotRanges& GetSlots() const {

View file

@ -6,6 +6,7 @@
#include <atomic>
#include "base/logging.h"
#include "server/db_slice.h"
#include "server/journal/streamer.h"
@ -17,30 +18,32 @@ class OutgoingMigration::SliceSlotMigration {
SliceSlotMigration(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal,
Context* cntx, io::Sink* dest)
: streamer_(slice, std::move(slots), sync_id, journal, cntx) {
streamer_.Start(dest);
state_.store(MigrationState::C_FULL_SYNC, memory_order_relaxed);
sync_fb_ = Fiber("slot-snapshot", [this, dest] { streamer_.Start(dest); });
}
void Cancel() {
streamer_.Cancel();
}
void WaitForSnapshotFinished() {
sync_fb_.JoinIfNeeded();
}
void Finalize() {
streamer_.SendFinalize();
state_.store(MigrationState::C_FINISHED, memory_order_relaxed);
}
MigrationState GetState() const {
auto state = state_.load(memory_order_relaxed);
return state == MigrationState::C_FULL_SYNC && streamer_.IsSnapshotFinished()
? MigrationState::C_STABLE_SYNC
: state;
return state_.load(memory_order_relaxed);
}
private:
RestoreStreamer streamer_;
// Atomic only for simple read operation, writes - from the same thread, reads - from any thread
atomic<MigrationState> state_ = MigrationState::C_CONNECTING;
Fiber sync_fb_;
};
OutgoingMigration::OutgoingMigration(std::uint32_t flows_num, std::string ip, uint16_t port,
@ -48,15 +51,25 @@ OutgoingMigration::OutgoingMigration(std::uint32_t flows_num, std::string ip, ui
: host_ip_(ip), port_(port), slots_(slots), cntx_(err_handler), slot_migrations_(flows_num) {
}
OutgoingMigration::~OutgoingMigration() = default;
OutgoingMigration::~OutgoingMigration() {
main_sync_fb_.JoinIfNeeded();
}
void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal,
io::Sink* dest) {
const auto shard_id = slice->shard_id();
std::lock_guard lck(flows_mu_);
slot_migrations_[shard_id] =
std::make_unique<SliceSlotMigration>(slice, slots_, sync_id, journal, &cntx_, dest);
MigrationState state = MigrationState::C_NO_STATE;
{
std::lock_guard lck(flows_mu_);
slot_migrations_[shard_id] =
std::make_unique<SliceSlotMigration>(slice, slots_, sync_id, journal, &cntx_, dest);
state = GetStateImpl();
}
if (state == MigrationState::C_FULL_SYNC) {
main_sync_fb_ = Fiber("outgoing_migration", &OutgoingMigration::SyncFb, this);
}
}
void OutgoingMigration::Finalize(uint32_t shard_id) {
@ -75,10 +88,20 @@ MigrationState OutgoingMigration::GetState() const {
MigrationState OutgoingMigration::GetStateImpl() const {
MigrationState min_state = MigrationState::C_MAX_INVALID;
for (const auto& slot_migration : slot_migrations_) {
if (slot_migration)
if (slot_migration) {
min_state = std::min(min_state, slot_migration->GetState());
} else {
min_state = MigrationState::C_NO_STATE;
}
}
return min_state;
}
void OutgoingMigration::SyncFb() {
for (auto& migration : slot_migrations_) {
migration->WaitForSnapshotFinished();
}
VLOG(1) << "Migrations snapshot is finihed";
}
} // namespace dfly

View file

@ -47,6 +47,8 @@ class OutgoingMigration {
// SliceSlotMigration manages state and data transfering for the corresponding shard
class SliceSlotMigration;
void SyncFb();
private:
std::string host_ip_;
uint16_t port_;
@ -54,6 +56,8 @@ class OutgoingMigration {
Context cntx_;
mutable Mutex flows_mu_;
std::vector<std::unique_ptr<SliceSlotMigration>> slot_migrations_ ABSL_GUARDED_BY(flows_mu_);
Fiber main_sync_fb_;
};
} // namespace dfly

View file

@ -65,39 +65,30 @@ void RestoreStreamer::Start(io::Sink* dest) {
JournalStreamer::Start(dest);
DCHECK(!snapshot_fb_.IsJoinable());
snapshot_fb_ = fb2::Fiber("slot-snapshot", [this] {
PrimeTable::Cursor cursor;
uint64_t last_yield = 0;
PrimeTable* pt = &db_slice_->databases()[0]->prime;
PrimeTable::Cursor cursor;
uint64_t last_yield = 0;
PrimeTable* pt = &db_slice_->databases()[0]->prime;
do {
if (fiber_cancellation_.IsCancelled())
return;
do {
if (fiber_cancellation_.IsCancelled())
return;
bool written = false;
cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
if (WriteBucket(it)) {
written = true;
}
});
if (written) {
NotifyWritten(true);
bool written = false;
cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
if (WriteBucket(it)) {
written = true;
}
++last_yield;
});
if (written) {
NotifyWritten(true);
}
++last_yield;
if (last_yield >= 100) {
ThisFiber::Yield();
last_yield = 0;
}
} while (cursor);
VLOG(2) << "FULL-SYNC-CUT for " << sync_id_ << " : " << db_slice_->shard_id();
WriteCommand(make_pair("DFLYMIGRATE", ArgSlice{"FULL-SYNC-CUT", absl::StrCat(sync_id_),
absl::StrCat(db_slice_->shard_id())}));
NotifyWritten(true);
snapshot_finished_ = true;
});
if (last_yield >= 100) {
ThisFiber::Yield();
last_yield = 0;
}
} while (cursor);
}
void RestoreStreamer::SendFinalize() {
@ -110,12 +101,10 @@ void RestoreStreamer::SendFinalize() {
}
RestoreStreamer::~RestoreStreamer() {
CHECK(!snapshot_fb_.IsJoinable());
}
void RestoreStreamer::Cancel() {
fiber_cancellation_.Cancel();
snapshot_fb_.JoinIfNeeded();
db_slice_->UnregisterOnChange(snapshot_version_);
JournalStreamer::Cancel();
}

View file

@ -82,7 +82,6 @@ class RestoreStreamer : public JournalStreamer {
uint64_t snapshot_version_ = 0;
SlotSet my_slots_;
uint32_t sync_id_;
Fiber snapshot_fb_;
Cancellation fiber_cancellation_;
bool snapshot_finished_ = false;
};

View file

@ -816,22 +816,7 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
c_nodes_admin,
)
while (
await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port)
)
!= "STABLE_SYNC"
):
await asyncio.sleep(0.05)
status = await c_nodes_admin[0].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[1].port)
)
assert "STABLE_SYNC" == status
status = await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
assert ["out 127.0.0.1:30002 STABLE_SYNC"] == status
await asyncio.sleep(0.5)
try:
await c_nodes_admin[1].execute_command(
"DFLYCLUSTER",
@ -850,12 +835,6 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
c_nodes_admin,
)
status = await c_nodes_admin[0].execute_command("DFLYCLUSTER SLOT-MIGRATION-STATUS")
assert ["out 127.0.0.1:30002 STABLE_SYNC"] == status
status = await c_nodes_admin[1].execute_command("DFLYCLUSTER SLOT-MIGRATION-STATUS")
assert ["in 127.0.0.1:31001 STABLE_SYNC"] == status
await close_clients(*c_nodes, *c_nodes_admin)
@ -920,13 +899,7 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
assert await c_nodes[0].set("KEY0", "value")
assert await c_nodes[0].set("KEY1", "value")
while (
await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port)
)
!= "STABLE_SYNC"
):
await asyncio.sleep(0.05)
await asyncio.sleep(0.5)
assert await c_nodes[0].set("KEY4", "value")
assert await c_nodes[0].set("KEY5", "value")
@ -1090,28 +1063,12 @@ async def test_cluster_fuzzymigration(
keeping = node.slots[num_outgoing:]
node.next_slots.extend(keeping)
# Busy loop for migrations to finish - all in stable state
iterations = 0
while True:
for node in nodes:
states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
print(states)
if not all(s.endswith("STABLE_SYNC") for s in states) and not states == "NO_STATE":
break
else:
break
iterations += 1
assert iterations < 100
await asyncio.sleep(0.1)
# Give seeder one more second
# some more time fo seeder
await asyncio.sleep(1.0)
# Stop seeder
seeder.stop()
await fill_task
await asyncio.sleep(1.0)
# Counter that pushes values to a list
async def list_counter(key, client: aioredis.RedisCluster):