mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix: fix bug in cluster/slot_set (#3143)

* fix: fix bug in cluster/slot_set

* fix: fix slot flushes
Borys 2024-06-07 14:31:11 +03:00 committed by GitHub
parent 3c6d72b93a
commit 39dd73fc71
5 changed files with 87 additions and 28 deletions

View file

@@ -558,4 +558,55 @@ TEST_F(ClusterConfigTest, InvalidConfigMigrationsWithoutIP) {
EXPECT_EQ(config, nullptr);
}
TEST_F(ClusterConfigTest, SlotSetAPI) {
{
SlotSet ss(false);
EXPECT_EQ(ss.ToSlotRanges(), SlotRanges());
EXPECT_FALSE(ss.All());
EXPECT_TRUE(ss.Empty());
}
{
SlotSet ss(true);
EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, SlotRange::kMaxSlotId}}));
EXPECT_TRUE(ss.All());
EXPECT_FALSE(ss.Empty());
}
{
SlotSet ss(SlotRanges({{0, 1000}, {1001, 2000}}));
EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({SlotRange{0, 2000}}));
EXPECT_EQ(ss.Count(), 2001);
for (uint16_t i = 0; i < 2000; ++i) {
EXPECT_TRUE(ss.Contains(i));
}
for (uint16_t i = 2001; i <= SlotRange::kMaxSlotId; ++i) {
EXPECT_FALSE(ss.Contains(i));
}
EXPECT_FALSE(ss.All());
EXPECT_FALSE(ss.Empty());
ss.Set(5010, true);
EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5010, 5010}}));
ss.Set({SlotRange{5000, 5100}}, true);
EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5000, 5100}}));
ss.Set(5050, false);
EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5000, 5049}, {5051, 5100}}));
ss.Set(5500, false);
EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5000, 5049}, {5051, 5100}}));
ss.Set({SlotRange{5090, 5100}}, false);
EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5000, 5049}, {5051, 5089}}));
SlotSet ss1(SlotRanges({{1001, 2000}}));
EXPECT_EQ(ss.GetRemovedSlots(ss1).ToSlotRanges(),
SlotRanges({{0, 1000}, {5000, 5049}, {5051, 5089}}));
EXPECT_EQ(ss1.GetRemovedSlots(ss).ToSlotRanges(), SlotRanges());
}
}
} // namespace dfly::cluster
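
The new SlotSetAPI test pins down the two behaviours the rest of the commit relies on: ToSlotRanges() merges adjacent ranges (so {0, 1000} plus {1001, 2000} reads back as a single {0, 2000}), and a.GetRemovedSlots(b) yields the slots present in a but missing from b. A minimal standalone sketch of those semantics, using std::bitset in place of the real dfly::cluster::SlotSet (the names, sizes and helpers below are illustrative, not the actual API):

#include <bitset>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

constexpr uint16_t kMaxSlot = 16383;  // Redis-cluster slot space is 0..16383
using Bits = std::bitset<kMaxSlot + 1>;
using Range = std::pair<uint16_t, uint16_t>;  // inclusive [start, end]

// Set or clear every slot covered by the given ranges.
void SetRanges(Bits& bits, const std::vector<Range>& ranges, bool value) {
  for (const auto& [start, end] : ranges)
    for (uint32_t i = start; i <= end; ++i) bits.set(i, value);
}

// Slots present in `before` but not in `after`, i.e. the "removed" slots.
Bits RemovedSlots(const Bits& before, const Bits& after) {
  return before & ~after;
}

// Collapse a bitset back into maximal inclusive ranges; adjacent inputs such
// as {0, 1000} and {1001, 2000} naturally come back as the single {0, 2000}.
std::vector<Range> ToRanges(const Bits& bits) {
  std::vector<Range> out;
  for (uint32_t i = 0; i <= kMaxSlot; ++i) {
    if (!bits.test(i)) continue;
    uint32_t j = i;
    while (j < kMaxSlot && bits.test(j + 1)) ++j;
    out.emplace_back(static_cast<uint16_t>(i), static_cast<uint16_t>(j));
    i = j;
  }
  return out;
}

int main() {
  Bits a, b;
  SetRanges(a, {{0, 1000}, {1001, 2000}}, true);
  SetRanges(b, {{1001, 2000}}, true);
  for (const auto& [s, e] : ToRanges(RemovedSlots(a, b)))
    std::cout << s << "-" << e << "\n";  // prints: 0-1000
}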

View file

@@ -508,7 +508,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
lock_guard gu(set_config_mu);
VLOG(1) << "Setting new cluster config: " << json_str;
RemoveOutgoingMigrations(new_config->GetFinishedOutgoingMigrations(tl_cluster_config));
auto out_migrations_slots = RemoveOutgoingMigrations(new_config, tl_cluster_config);
RemoveIncomingMigrations(new_config->GetFinishedIncomingMigrations(tl_cluster_config));
// Prevent simultaneous config update from outgoing migration
@@ -565,6 +565,8 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
SlotSet after = tl_cluster_config->GetOwnedSlots();
if (ServerState::tlocal()->is_master) {
auto deleted_slots = (before.GetRemovedSlots(after)).ToSlotRanges();
deleted_slots.insert(deleted_slots.end(), out_migrations_slots.begin(),
out_migrations_slots.end());
LOG_IF(INFO, !deleted_slots.empty())
<< "Flushing newly unowned slots: " << SlotRange::ToString(deleted_slots);
DeleteSlots(deleted_slots);
@@ -736,8 +738,11 @@ std::shared_ptr<IncomingSlotMigration> ClusterFamily::GetIncomingMigration(
return nullptr;
}
void ClusterFamily::RemoveOutgoingMigrations(const std::vector<MigrationInfo>& migrations) {
SlotRanges ClusterFamily::RemoveOutgoingMigrations(shared_ptr<ClusterConfig> new_config,
shared_ptr<ClusterConfig> old_config) {
auto migrations = new_config->GetFinishedOutgoingMigrations(old_config);
lock_guard lk(migration_mu_);
SlotRanges removed_slots;
for (const auto& m : migrations) {
auto it = std::find_if(outgoing_migration_jobs_.begin(), outgoing_migration_jobs_.end(),
[&m](const auto& om) {
@@ -747,13 +752,20 @@ void ClusterFamily::RemoveOutgoingMigrations(const std::vector<MigrationInfo>& m
DCHECK(it != outgoing_migration_jobs_.end());
DCHECK(it->get() != nullptr);
OutgoingMigration& migration = *it->get();
LOG(INFO) << "Outgoing migration cancelled: slots " << SlotRange::ToString(migration.GetSlots())
<< " to " << migration.GetHostIp() << ":" << migration.GetPort();
const auto& slots = migration.GetSlots();
removed_slots.insert(removed_slots.end(), slots.begin(), slots.end());
LOG(INFO) << "Outgoing migration cancelled: slots " << SlotRange::ToString(slots) << " to "
<< migration.GetHostIp() << ":" << migration.GetPort();
migration.Finish();
outgoing_migration_jobs_.erase(it);
}
// Flush non-owned migrations
SlotSet migration_slots(removed_slots);
SlotSet removed = migration_slots.GetRemovedSlots(new_config->GetOwnedSlots());
// Flushing of removed slots is done outside this function.
return removed.ToSlotRanges();
}
namespace {
@@ -779,6 +791,7 @@ bool RemoveIncomingMigrationImpl(std::vector<std::shared_ptr<IncomingSlotMigrati
// all fibers has migration shared_ptr so we don't need to join it and can erase
jobs.erase(it);
// TODO make it outside in one run with other slots that should be flushed
if (!removed.Empty()) {
auto removed_ranges = make_shared<SlotRanges>(removed.ToSlotRanges());
LOG_IF(WARNING, migration->GetState() == MigrationState::C_FINISHED)

View file

@@ -78,7 +78,8 @@ class ClusterFamily {
std::shared_ptr<IncomingSlotMigration> GetIncomingMigration(std::string_view source_id);
void StartSlotMigrations(std::vector<MigrationInfo> migrations);
void RemoveOutgoingMigrations(const std::vector<MigrationInfo>& migrations);
SlotRanges RemoveOutgoingMigrations(std::shared_ptr<ClusterConfig> new_config,
std::shared_ptr<ClusterConfig> old_config);
void RemoveIncomingMigrations(const std::vector<MigrationInfo>& migrations);
// store info about migration and create unique session id

View file

@@ -41,7 +41,7 @@ class SlotSet {
void Set(const SlotRanges& slot_ranges, bool value) {
for (const auto& slot_range : slot_ranges) {
for (auto i = slot_range.start; i <= slot_range.end; ++i) {
slots_->set(i);
slots_->set(i, value);
}
}
}
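
The one-character change above is the root cause the commit title refers to: a bitset-style set(i) with no second argument always sets the bit to true, so SlotSet::Set(slot_ranges, false) was switching slots on instead of clearing them. Assuming slots_ is a std::bitset-like container (a simplification, not necessarily the exact member type), the difference in behaviour is:

#include <bitset>
#include <cassert>

int main() {
  std::bitset<8> slots;
  slots.set(3);          // one-argument form: the bit is always set to true
  assert(slots.test(3));
  slots.set(3, false);   // two-argument form: honours the requested value
  assert(!slots.test(3));
}

With the old code, any caller passing value = false would leave the slots set, which lines up with the slot-flush fixes in the rest of the commit.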

View file

@@ -144,7 +144,7 @@ async def wait_for_status(admin_client, node_id, status, timeout=10):
return
else:
logging.debug(f"SLOT-MIGRATION-STATUS is {response}, not {status}")
await asyncio.sleep(0.05)
await asyncio.sleep(0.1)
raise RuntimeError("Timeout to achieve migrations status")
@@ -1303,21 +1303,15 @@ async def test_cluster_config_reapply(df_local_factory: DflyInstanceFactory):
logging.debug("Migrating slots 6000-8000")
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
while "FINISHED" not in await nodes[1].admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id
):
logging.debug("SLOT-MIGRATION-STATUS is not FINISHED")
await asyncio.sleep(0.05)
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")
assert await nodes[0].admin_client.dbsize() == SIZE
assert await nodes[1].admin_client.dbsize() == SIZE
assert [SIZE, SIZE] == [await node.client.dbsize() for node in nodes]
logging.debug("Reapply config with migration")
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
await asyncio.sleep(0.1)
assert await nodes[0].admin_client.dbsize() == SIZE
assert await nodes[1].admin_client.dbsize() == SIZE
assert [SIZE, SIZE] == [await node.client.dbsize() for node in nodes]
logging.debug("Finalizing migration")
nodes[0].migrations = []
@@ -1326,15 +1320,15 @@ async def test_cluster_config_reapply(df_local_factory: DflyInstanceFactory):
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
logging.debug("Migration finalized")
await asyncio.sleep(0.1)
assert [0, SIZE] == [await node.admin_client.dbsize() for node in nodes]
await asyncio.sleep(1)
assert [0, SIZE] == [await node.client.dbsize() for node in nodes]
for i in range(SIZE):
assert str(i) == await nodes[1].admin_client.get(f"{{key50}}:{i}")
assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}")
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
@pytest.mark.skip(reason="The test is broken")
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory):
"""Check data migration from one node to another."""
@@ -1352,8 +1346,8 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory):
SIZE = 10_000
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
for i in range(SIZE):
assert await nodes[0].admin_client.set(f"{{key50}}:{i}", i) # key50 belongs to slot 6686
assert [SIZE, 0] == [await node.admin_client.dbsize() for node in nodes]
assert await nodes[0].client.set(f"{{key50}}:{i}", i) # key50 belongs to slot 6686
assert [SIZE, 0] == [await node.client.dbsize() for node in nodes]
nodes[0].migrations = [
MigrationInfo("127.0.0.1", instances[1].admin_port, [(6000, 8000)], nodes[1].id)
@@ -1364,13 +1358,13 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory):
logging.debug("Cancelling migration")
nodes[0].migrations = []
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
assert SIZE == await nodes[0].admin_client.dbsize()
assert SIZE == await nodes[0].client.dbsize()
while True:
db_size = await nodes[1].admin_client.dbsize()
db_size = await nodes[1].client.dbsize()
if 0 == db_size:
break
logging.debug(f"target dbsize is {db_size}")
logging.debug(await nodes[1].admin_client.execute_command("KEYS", "*"))
logging.debug(await nodes[1].client.execute_command("KEYS", "*"))
await asyncio.sleep(0.1)
logging.debug("Reissuing migration")
@@ -1379,7 +1373,7 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory):
)
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")
assert [SIZE, SIZE] == [await node.admin_client.dbsize() for node in nodes]
assert [SIZE, SIZE] == [await node.client.dbsize() for node in nodes]
logging.debug("Finalizing migration")
nodes[0].migrations = []
@@ -1388,12 +1382,12 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory):
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
logging.debug("Migration finalized")
while 0 != await nodes[0].admin_client.dbsize():
while 0 != await nodes[0].client.dbsize():
logging.debug(f"wait until source dbsize is empty")
await asyncio.sleep(0.1)
for i in range(SIZE):
assert str(i) == await nodes[1].admin_client.get(f"{{key50}}:{i}")
assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}")
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])