From 5c11beb91922c499186da16cab94c1ade2c5cf28 Mon Sep 17 00:00:00 2001 From: Shahar Mike Date: Wed, 28 Jun 2023 22:04:51 +0300 Subject: [PATCH] feat(cluster): Send flush slots cmd from masters to replicas. (#1484) This fixes potential data diffs between master and replica upon slot moving Fixes #1320 --- src/server/cluster/cluster_family.cc | 93 ++++++++++++++--- src/server/cluster/cluster_family.h | 1 + src/server/cluster/cluster_family_test.cc | 73 +++++++++---- src/server/replica.cc | 21 +++- tests/dragonfly/cluster_test.py | 119 +++++++++++++++++++++- 5 files changed, 269 insertions(+), 38 deletions(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 6ccb056d4..c1c6732ec 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -18,6 +18,7 @@ #include "server/conn_context.h" #include "server/dflycmd.h" #include "server/error.h" +#include "server/journal/journal.h" #include "server/main_service.h" #include "server/replica.h" #include "server/server_family.h" @@ -392,7 +393,7 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) { return (*cntx)->SendError(kClusterDisabled); } - if (!cntx->owner()->IsAdmin()) { + if (cntx->owner() && !cntx->owner()->IsAdmin()) { return (*cntx)->SendError(kDflyClusterCmdPort); } @@ -404,6 +405,8 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) { return DflyClusterConfig(args, cntx); } else if (sub_cmd == "MYID") { return DflyClusterMyId(args, cntx); + } else if (sub_cmd == "FLUSHSLOTS") { + return DflyClusterFlushSlots(args, cntx); } return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType); @@ -429,6 +432,59 @@ SlotSet GetDeletedSlots(bool is_first_config, const SlotSet& before, const SlotS // Guards set configuration, so that we won't handle 2 in parallel. Mutex set_config_mu; + +void DeleteSlots(const SlotSet& slots) { + if (slots.empty()) { + return; + } + + auto cb = [&](auto*) { + EngineShard* shard = EngineShard::tlocal(); + if (shard == nullptr) + return; + + shard->db_slice().FlushSlots(slots); + }; + shard_set->pool()->AwaitFiberOnAll(std::move(cb)); +} + +void WriteFlushSlotsToJournal(const SlotSet& slots) { + if (slots.empty()) { + return; + } + + // Build args + vector args; + args.reserve(slots.size() + 1); + args.push_back("FLUSHSLOTS"); + for (const SlotId slot : slots) { + args.push_back(absl::StrCat(slot)); + } + + // Build view + vector args_view(args.size()); + for (size_t i = 0; i < args.size(); ++i) { + args_view[i] = args[i]; + } + + auto cb = [&](auto*) { + EngineShard* shard = EngineShard::tlocal(); + if (shard == nullptr) { + return; + } + + auto journal = EngineShard::tlocal()->journal(); + if (journal == nullptr) { + return; + } + + // Send journal entry + journal->RecordEntry(/* txid= */ 0, journal::Op::COMMAND, /* dbid= */ 0, + /* shard_cnt= */ shard_set->size(), make_pair("DFLYCLUSTER", args_view), + false); + }; + shard_set->pool()->AwaitFiberOnAll(std::move(cb)); +} } // namespace void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) { @@ -468,17 +524,10 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) SlotSet after = tl_cluster_config->GetOwnedSlots(); - // Delete old slots data. - SlotSet deleted_slot_ids = GetDeletedSlots(is_first_config, before, after); - if (!deleted_slot_ids.empty()) { - auto cb = [&](auto*) { - EngineShard* shard = EngineShard::tlocal(); - if (shard == nullptr) - return; - - shard->db_slice().FlushSlots(deleted_slot_ids); - }; - shard_set->pool()->AwaitFiberOnAll(std::move(cb)); + if (ServerState::tlocal()->is_master) { + auto deleted_slots = GetDeletedSlots(is_first_config, before, after); + DeleteSlots(deleted_slots); + WriteFlushSlotsToJournal(deleted_slots); } return rb->SendOk(); @@ -537,6 +586,26 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* c } } +void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx) { + SinkReplyBuilder* rb = cntx->reply_builder(); + + args.remove_prefix(1); // Removes "FLUSHSLOTS" subcmd string + + SlotSet slots; + slots.reserve(args.size()); + for (size_t i = 0; i < args.size(); ++i) { + unsigned slot; + if (!absl::SimpleAtoi(ArgS(args, i), &slot) || (slot > ClusterConfig::kMaxSlotNum)) { + return rb->SendError(kSyntaxErrType); + } + slots.insert(static_cast(slot)); + } + + DeleteSlots(slots); + + return rb->SendOk(); +} + using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx); inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) { diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 49538379a..a8b9d4816 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -46,6 +46,7 @@ class ClusterFamily { void DflyClusterConfig(CmdArgList args, ConnectionContext* cntx); void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx); void DflyClusterMyId(CmdArgList args, ConnectionContext* cntx); + void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx); ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const; diff --git a/src/server/cluster/cluster_family_test.cc b/src/server/cluster/cluster_family_test.cc index 92c886e76..11aaf10ae 100644 --- a/src/server/cluster/cluster_family_test.cc +++ b/src/server/cluster/cluster_family_test.cc @@ -47,6 +47,10 @@ class ClusterFamilyTest : public BaseFamilyTest { EXPECT_LE(absl::Now(), deadline) << "Timeout of " << timeout << " reached when expecting condition"; } + + string GetMyId() { + return RunAdmin({"dflycluster", "myid"}).GetString(); + } }; TEST_F(ClusterFamilyTest, DflyClusterOnlyOnAdminPort) { @@ -474,7 +478,7 @@ TEST_F(ClusterFamilyTest, ClusterGetSlotInfo) { "replicas": [] } ])json"; - string config = absl::Substitute(config_template, RunAdmin({"dflycluster", "myid"}).GetString()); + string config = absl::Substitute(config_template, GetMyId()); EXPECT_EQ(RunAdmin({"dflycluster", "config", config}), "OK"); @@ -528,7 +532,7 @@ TEST_F(ClusterFamilyTest, ClusterSlotsPopulate) { "replicas": [] } ])json"; - string config = absl::Substitute(config_template, RunAdmin({"dflycluster", "myid"}).GetString()); + string config = absl::Substitute(config_template, GetMyId()); EXPECT_EQ(RunAdmin({"dflycluster", "config", config}), "OK"); @@ -563,7 +567,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigDeleteSlots) { "replicas": [] } ])json"; - string config = absl::Substitute(config_template, RunAdmin({"dflycluster", "myid"}).GetString()); + string config = absl::Substitute(config_template, GetMyId()); EXPECT_EQ(RunAdmin({"dflycluster", "config", config}), "OK"); @@ -608,7 +612,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigDeleteSlotsNoCrashOnShutdown) { "replicas": [] } ])json"; - string config = absl::Substitute(config_template, RunAdmin({"dflycluster", "myid"}).GetString()); + string config = absl::Substitute(config_template, GetMyId()); EXPECT_EQ(RunAdmin({"dflycluster", "config", config}), "OK"); @@ -659,7 +663,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigDeleteSomeSlots) { "replicas": [] } ])json"; - string config = absl::Substitute(config_template, RunAdmin({"dflycluster", "myid"}).GetString()); + string config = absl::Substitute(config_template, GetMyId()); EXPECT_EQ(RunAdmin({"dflycluster", "config", config}), "OK"); @@ -733,6 +737,34 @@ TEST_F(ClusterFamilyTest, Keyslot) { CheckedInt({"cluster", "keyslot", "123{def}456"})); } +TEST_F(ClusterFamilyTest, FlushSlots) { + EXPECT_EQ(Run({"debug", "populate", "100", "key", "4", "slots", "0", "1"}), "OK"); + + EXPECT_THAT(RunAdmin({"dflycluster", "getslotinfo", "slots", "0", "1"}), + RespArray(ElementsAre(RespArray(ElementsAre(IntArg(0), "key_count", Not(IntArg(0)), + "total_reads", _, "total_writes", _)), + RespArray(ElementsAre(IntArg(1), "key_count", Not(IntArg(0)), + "total_reads", _, "total_writes", _))))); + + ExpectConditionWithinTimeout([&]() { + return RunAdmin({"dflycluster", "flushslots", "0"}) == "OK"; + }); + + EXPECT_THAT(RunAdmin({"dflycluster", "getslotinfo", "slots", "0", "1"}), + RespArray(ElementsAre(RespArray(ElementsAre(IntArg(0), "key_count", IntArg(0), + "total_reads", _, "total_writes", _)), + RespArray(ElementsAre(IntArg(1), "key_count", Not(IntArg(0)), + "total_reads", _, "total_writes", _))))); + + EXPECT_EQ(RunAdmin({"dflycluster", "flushslots", "0", "1"}), "OK"); + + EXPECT_THAT(RunAdmin({"dflycluster", "getslotinfo", "slots", "0", "1"}), + RespArray(ElementsAre(RespArray(ElementsAre(IntArg(0), "key_count", IntArg(0), + "total_reads", _, "total_writes", _)), + RespArray(ElementsAre(IntArg(1), "key_count", IntArg(0), + "total_reads", _, "total_writes", _))))); +} + TEST_F(ClusterFamilyTest, ClusterCrossSlot) { string config_template = R"json( [ @@ -751,7 +783,7 @@ TEST_F(ClusterFamilyTest, ClusterCrossSlot) { "replicas": [] } ])json"; - string config = absl::Substitute(config_template, RunAdmin({"dflycluster", "myid"}).GetString()); + string config = absl::Substitute(config_template, GetMyId()); EXPECT_EQ(RunAdmin({"dflycluster", "config", config}), "OK"); EXPECT_EQ(Run({"SET", "key", "value"}), "OK"); @@ -767,7 +799,7 @@ TEST_F(ClusterFamilyTest, ClusterCrossSlot) { EXPECT_THAT(Run({"MGET", "key{tag}", "key2{tag}"}), RespArray(ElementsAre("value", "value2"))); } -class ClusterFamilyEmulatedTest : public BaseFamilyTest { +class ClusterFamilyEmulatedTest : public ClusterFamilyTest { public: ClusterFamilyEmulatedTest() { SetTestFlag("cluster_mode", "emulated"); @@ -786,17 +818,17 @@ TEST_F(ClusterFamilyEmulatedTest, ClusterInfo) { TEST_F(ClusterFamilyEmulatedTest, ClusterShards) { EXPECT_THAT(Run({"cluster", "shards"}), - RespArray(ElementsAre("slots", // - RespArray(ElementsAre(IntArg(0), IntArg(16383))), // - "nodes", // - RespArray(ElementsAre( // - RespArray(ElementsAre( // - "id", RunAdmin({"dflycluster", "myid"}).GetString(), // - "endpoint", "fake-host", // - "ip", "fake-host", // - "port", IntArg(6379), // - "role", "master", // - "replication-offset", IntArg(0), // + RespArray(ElementsAre("slots", // + RespArray(ElementsAre(IntArg(0), IntArg(16383))), // + "nodes", // + RespArray(ElementsAre( // + RespArray(ElementsAre( // + "id", GetMyId(), // + "endpoint", "fake-host", // + "ip", "fake-host", // + "port", IntArg(6379), // + "role", "master", // + "replication-offset", IntArg(0), // "health", "online"))))))); } @@ -807,13 +839,12 @@ TEST_F(ClusterFamilyEmulatedTest, ClusterSlots) { RespArray(ElementsAre( // "fake-host", // IntArg(6379), // - RunAdmin({"dflycluster", "myid"}).GetString()))))); + GetMyId()))))); } TEST_F(ClusterFamilyEmulatedTest, ClusterNodes) { EXPECT_THAT(Run({"cluster", "nodes"}), - RunAdmin({"dflycluster", "myid"}).GetString() + - " fake-host:6379@6379 myself,master - 0 0 0 connected 0-16383\r\n"); + GetMyId() + " fake-host:6379@6379 myself,master - 0 0 0 connected 0-16383\r\n"); } } // namespace diff --git a/src/server/replica.cc b/src/server/replica.cc index 5923f9d68..10ed2a9e7 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -3,6 +3,8 @@ // #include "server/replica.h" +#include "absl/strings/match.h" + extern "C" { #include "redis/rdb.h" } @@ -1348,7 +1350,24 @@ bool Replica::TransactionData::AddEntry(journal::ParsedEntry&& entry) { } bool Replica::TransactionData::IsGlobalCmd() const { - return commands.size() == 1 && commands.front().cmd_args.size() == 1; + if (commands.size() > 1) { + return false; + } + + auto& command = commands.front(); + if (command.cmd_args.empty()) { + return false; + } + + auto& args = command.cmd_args; + if (absl::EqualsIgnoreCase(ToSV(args[0]), "FLUSHDB"sv) || + absl::EqualsIgnoreCase(ToSV(args[0]), "FLUSHALL"sv) || + (absl::EqualsIgnoreCase(ToSV(args[0]), "DFLYCLUSTER"sv) && + absl::EqualsIgnoreCase(ToSV(args[1]), "FLUSHSLOTS"sv))) { + return true; + } + + return false; } Replica::TransactionData Replica::TransactionData::FromSingle(journal::ParsedEntry&& entry) { diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 5d49e1238..dfbae7be0 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -378,9 +378,8 @@ async def test_cluster_replica_sets_non_owned_keys(df_local_factory): await push_config(replica_config, [c_replica_admin]) - # The replica should have deleted the key. - # Note: this is not the long-term intended behavior. It will change after we fix #1320. - assert await c_replica.execute_command("dbsize") == 0 + # The replica should *not* have deleted the key. + assert await c_replica.execute_command("dbsize") == 1 # Set another key on the master, which it owns but the replica does not own. await c_master.set("key2", "value"); @@ -388,7 +387,7 @@ async def test_cluster_replica_sets_non_owned_keys(df_local_factory): # See that the key exists in both replica and master assert await c_master.execute_command("dbsize") == 2 - assert await c_replica.execute_command("dbsize") == 1 + assert await c_replica.execute_command("dbsize") == 2 # The replica should still reply with MOVED, despite having that key. try: @@ -397,6 +396,118 @@ async def test_cluster_replica_sets_non_owned_keys(df_local_factory): except redis.exceptions.ResponseError as e: assert re.match(r"MOVED \d+ localhost:1111", e.args[0]) + await push_config(replica_config, [c_master_admin]) + await asyncio.sleep(0.5) + assert await c_master.execute_command("dbsize") == 0 + assert await c_replica.execute_command("dbsize") == 0 + + +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +async def test_cluster_flush_slots_after_config_change(df_local_factory): + # Start and configure cluster with 1 master and 1 replica, both own all slots + master = df_local_factory.create(port=BASE_PORT, admin_port=BASE_PORT+1000) + replica = df_local_factory.create(port=BASE_PORT+1, admin_port=BASE_PORT+1001) + df_local_factory.start_all([master, replica]) + + c_master = aioredis.Redis(port=master.port) + c_master_admin = aioredis.Redis(port=master.admin_port) + master_id = await get_node_id(c_master_admin) + + c_replica = aioredis.Redis(port=replica.port) + c_replica_admin = aioredis.Redis(port=replica.admin_port) + replica_id = await get_node_id(c_replica_admin) + + config = f""" + [ + {{ + "slot_ranges": [ + {{ + "start": 0, + "end": 16383 + }} + ], + "master": {{ + "id": "{master_id}", + "ip": "localhost", + "port": {master.port} + }}, + "replicas": [ + {{ + "id": "{replica_id}", + "ip": "localhost", + "port": {replica.port} + }} + ] + }} + ] + """ + await push_config(config, [c_master_admin, c_replica_admin]) + + await c_master.execute_command("debug", "populate", "100000"); + assert await c_master.execute_command("dbsize") == 100_000 + + # Setup replication and make sure that it works properly. + await c_replica.execute_command("REPLICAOF", "localhost", master.port) + await check_all_replicas_finished([c_replica], c_master) + assert await c_replica.execute_command("dbsize") == 100_000 + + resp = await c_master_admin.execute_command("dflycluster", "getslotinfo", "slots", "0") + assert resp[0][0] == 0 + slot_0_size = resp[0][2] + print(f'Slot 0 size = {slot_0_size}') + assert slot_0_size > 0 + + config = f""" + [ + {{ + "slot_ranges": [ + {{ + "start": 1, + "end": 16383 + }} + ], + "master": {{ + "id": "{master_id}", + "ip": "localhost", + "port": {master.port} + }}, + "replicas": [ + {{ + "id": "{replica_id}", + "ip": "localhost", + "port": {replica.port} + }} + ] + }}, + {{ + "slot_ranges": [ + {{ + "start": 0, + "end": 0 + }} + ], + "master": {{ + "id": "other-master", + "ip": "localhost", + "port": 9000 + }}, + "replicas": [ + {{ + "id": "other-replica", + "ip": "localhost", + "port": 9001 + }} + ] + }} + ] + """ + await push_config(config, [c_master_admin, c_replica_admin]) + + await asyncio.sleep(0.5) + + assert await c_master.execute_command("dbsize") == (100_000 - slot_0_size) + assert await c_replica.execute_command("dbsize") == (100_000 - slot_0_size) + @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_native_client(df_local_factory):