1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat(cluster): Send flush slots cmd from masters to replicas. (#1484)

This fixes potential data diffs between master and replica upon slot moving

Fixes #1320
This commit is contained in:
Shahar Mike 2023-06-28 22:04:51 +03:00 committed by GitHub
parent 3ebfdb1e19
commit 5c11beb919
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 269 additions and 38 deletions

View file

@ -18,6 +18,7 @@
#include "server/conn_context.h"
#include "server/dflycmd.h"
#include "server/error.h"
#include "server/journal/journal.h"
#include "server/main_service.h"
#include "server/replica.h"
#include "server/server_family.h"
@ -392,7 +393,7 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendError(kClusterDisabled);
}
if (!cntx->owner()->IsAdmin()) {
if (cntx->owner() && !cntx->owner()->IsAdmin()) {
return (*cntx)->SendError(kDflyClusterCmdPort);
}
@ -404,6 +405,8 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
return DflyClusterConfig(args, cntx);
} else if (sub_cmd == "MYID") {
return DflyClusterMyId(args, cntx);
} else if (sub_cmd == "FLUSHSLOTS") {
return DflyClusterFlushSlots(args, cntx);
}
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType);
@ -429,6 +432,59 @@ SlotSet GetDeletedSlots(bool is_first_config, const SlotSet& before, const SlotS
// Guards set configuration, so that we won't handle 2 in parallel.
Mutex set_config_mu;
void DeleteSlots(const SlotSet& slots) {
if (slots.empty()) {
return;
}
auto cb = [&](auto*) {
EngineShard* shard = EngineShard::tlocal();
if (shard == nullptr)
return;
shard->db_slice().FlushSlots(slots);
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
}
void WriteFlushSlotsToJournal(const SlotSet& slots) {
if (slots.empty()) {
return;
}
// Build args
vector<string> args;
args.reserve(slots.size() + 1);
args.push_back("FLUSHSLOTS");
for (const SlotId slot : slots) {
args.push_back(absl::StrCat(slot));
}
// Build view
vector<string_view> args_view(args.size());
for (size_t i = 0; i < args.size(); ++i) {
args_view[i] = args[i];
}
auto cb = [&](auto*) {
EngineShard* shard = EngineShard::tlocal();
if (shard == nullptr) {
return;
}
auto journal = EngineShard::tlocal()->journal();
if (journal == nullptr) {
return;
}
// Send journal entry
journal->RecordEntry(/* txid= */ 0, journal::Op::COMMAND, /* dbid= */ 0,
/* shard_cnt= */ shard_set->size(), make_pair("DFLYCLUSTER", args_view),
false);
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
}
} // namespace
void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) {
@ -468,17 +524,10 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
SlotSet after = tl_cluster_config->GetOwnedSlots();
// Delete old slots data.
SlotSet deleted_slot_ids = GetDeletedSlots(is_first_config, before, after);
if (!deleted_slot_ids.empty()) {
auto cb = [&](auto*) {
EngineShard* shard = EngineShard::tlocal();
if (shard == nullptr)
return;
shard->db_slice().FlushSlots(deleted_slot_ids);
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
if (ServerState::tlocal()->is_master) {
auto deleted_slots = GetDeletedSlots(is_first_config, before, after);
DeleteSlots(deleted_slots);
WriteFlushSlotsToJournal(deleted_slots);
}
return rb->SendOk();
@ -537,6 +586,26 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* c
}
}
void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx) {
SinkReplyBuilder* rb = cntx->reply_builder();
args.remove_prefix(1); // Removes "FLUSHSLOTS" subcmd string
SlotSet slots;
slots.reserve(args.size());
for (size_t i = 0; i < args.size(); ++i) {
unsigned slot;
if (!absl::SimpleAtoi(ArgS(args, i), &slot) || (slot > ClusterConfig::kMaxSlotNum)) {
return rb->SendError(kSyntaxErrType);
}
slots.insert(static_cast<SlotId>(slot));
}
DeleteSlots(slots);
return rb->SendOk();
}
using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx);
inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) {

View file

@ -46,6 +46,7 @@ class ClusterFamily {
void DflyClusterConfig(CmdArgList args, ConnectionContext* cntx);
void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx);
void DflyClusterMyId(CmdArgList args, ConnectionContext* cntx);
void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx);
ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const;

View file

@ -47,6 +47,10 @@ class ClusterFamilyTest : public BaseFamilyTest {
EXPECT_LE(absl::Now(), deadline)
<< "Timeout of " << timeout << " reached when expecting condition";
}
string GetMyId() {
return RunAdmin({"dflycluster", "myid"}).GetString();
}
};
TEST_F(ClusterFamilyTest, DflyClusterOnlyOnAdminPort) {
@ -474,7 +478,7 @@ TEST_F(ClusterFamilyTest, ClusterGetSlotInfo) {
"replicas": []
}
])json";
string config = absl::Substitute(config_template, RunAdmin({"dflycluster", "myid"}).GetString());
string config = absl::Substitute(config_template, GetMyId());
EXPECT_EQ(RunAdmin({"dflycluster", "config", config}), "OK");
@ -528,7 +532,7 @@ TEST_F(ClusterFamilyTest, ClusterSlotsPopulate) {
"replicas": []
}
])json";
string config = absl::Substitute(config_template, RunAdmin({"dflycluster", "myid"}).GetString());
string config = absl::Substitute(config_template, GetMyId());
EXPECT_EQ(RunAdmin({"dflycluster", "config", config}), "OK");
@ -563,7 +567,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigDeleteSlots) {
"replicas": []
}
])json";
string config = absl::Substitute(config_template, RunAdmin({"dflycluster", "myid"}).GetString());
string config = absl::Substitute(config_template, GetMyId());
EXPECT_EQ(RunAdmin({"dflycluster", "config", config}), "OK");
@ -608,7 +612,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigDeleteSlotsNoCrashOnShutdown) {
"replicas": []
}
])json";
string config = absl::Substitute(config_template, RunAdmin({"dflycluster", "myid"}).GetString());
string config = absl::Substitute(config_template, GetMyId());
EXPECT_EQ(RunAdmin({"dflycluster", "config", config}), "OK");
@ -659,7 +663,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigDeleteSomeSlots) {
"replicas": []
}
])json";
string config = absl::Substitute(config_template, RunAdmin({"dflycluster", "myid"}).GetString());
string config = absl::Substitute(config_template, GetMyId());
EXPECT_EQ(RunAdmin({"dflycluster", "config", config}), "OK");
@ -733,6 +737,34 @@ TEST_F(ClusterFamilyTest, Keyslot) {
CheckedInt({"cluster", "keyslot", "123{def}456"}));
}
TEST_F(ClusterFamilyTest, FlushSlots) {
EXPECT_EQ(Run({"debug", "populate", "100", "key", "4", "slots", "0", "1"}), "OK");
EXPECT_THAT(RunAdmin({"dflycluster", "getslotinfo", "slots", "0", "1"}),
RespArray(ElementsAre(RespArray(ElementsAre(IntArg(0), "key_count", Not(IntArg(0)),
"total_reads", _, "total_writes", _)),
RespArray(ElementsAre(IntArg(1), "key_count", Not(IntArg(0)),
"total_reads", _, "total_writes", _)))));
ExpectConditionWithinTimeout([&]() {
return RunAdmin({"dflycluster", "flushslots", "0"}) == "OK";
});
EXPECT_THAT(RunAdmin({"dflycluster", "getslotinfo", "slots", "0", "1"}),
RespArray(ElementsAre(RespArray(ElementsAre(IntArg(0), "key_count", IntArg(0),
"total_reads", _, "total_writes", _)),
RespArray(ElementsAre(IntArg(1), "key_count", Not(IntArg(0)),
"total_reads", _, "total_writes", _)))));
EXPECT_EQ(RunAdmin({"dflycluster", "flushslots", "0", "1"}), "OK");
EXPECT_THAT(RunAdmin({"dflycluster", "getslotinfo", "slots", "0", "1"}),
RespArray(ElementsAre(RespArray(ElementsAre(IntArg(0), "key_count", IntArg(0),
"total_reads", _, "total_writes", _)),
RespArray(ElementsAre(IntArg(1), "key_count", IntArg(0),
"total_reads", _, "total_writes", _)))));
}
TEST_F(ClusterFamilyTest, ClusterCrossSlot) {
string config_template = R"json(
[
@ -751,7 +783,7 @@ TEST_F(ClusterFamilyTest, ClusterCrossSlot) {
"replicas": []
}
])json";
string config = absl::Substitute(config_template, RunAdmin({"dflycluster", "myid"}).GetString());
string config = absl::Substitute(config_template, GetMyId());
EXPECT_EQ(RunAdmin({"dflycluster", "config", config}), "OK");
EXPECT_EQ(Run({"SET", "key", "value"}), "OK");
@ -767,7 +799,7 @@ TEST_F(ClusterFamilyTest, ClusterCrossSlot) {
EXPECT_THAT(Run({"MGET", "key{tag}", "key2{tag}"}), RespArray(ElementsAre("value", "value2")));
}
class ClusterFamilyEmulatedTest : public BaseFamilyTest {
class ClusterFamilyEmulatedTest : public ClusterFamilyTest {
public:
ClusterFamilyEmulatedTest() {
SetTestFlag("cluster_mode", "emulated");
@ -786,17 +818,17 @@ TEST_F(ClusterFamilyEmulatedTest, ClusterInfo) {
TEST_F(ClusterFamilyEmulatedTest, ClusterShards) {
EXPECT_THAT(Run({"cluster", "shards"}),
RespArray(ElementsAre("slots", //
RespArray(ElementsAre(IntArg(0), IntArg(16383))), //
"nodes", //
RespArray(ElementsAre( //
RespArray(ElementsAre( //
"id", RunAdmin({"dflycluster", "myid"}).GetString(), //
"endpoint", "fake-host", //
"ip", "fake-host", //
"port", IntArg(6379), //
"role", "master", //
"replication-offset", IntArg(0), //
RespArray(ElementsAre("slots", //
RespArray(ElementsAre(IntArg(0), IntArg(16383))), //
"nodes", //
RespArray(ElementsAre( //
RespArray(ElementsAre( //
"id", GetMyId(), //
"endpoint", "fake-host", //
"ip", "fake-host", //
"port", IntArg(6379), //
"role", "master", //
"replication-offset", IntArg(0), //
"health", "online")))))));
}
@ -807,13 +839,12 @@ TEST_F(ClusterFamilyEmulatedTest, ClusterSlots) {
RespArray(ElementsAre( //
"fake-host", //
IntArg(6379), //
RunAdmin({"dflycluster", "myid"}).GetString())))));
GetMyId())))));
}
TEST_F(ClusterFamilyEmulatedTest, ClusterNodes) {
EXPECT_THAT(Run({"cluster", "nodes"}),
RunAdmin({"dflycluster", "myid"}).GetString() +
" fake-host:6379@6379 myself,master - 0 0 0 connected 0-16383\r\n");
GetMyId() + " fake-host:6379@6379 myself,master - 0 0 0 connected 0-16383\r\n");
}
} // namespace

View file

@ -3,6 +3,8 @@
//
#include "server/replica.h"
#include "absl/strings/match.h"
extern "C" {
#include "redis/rdb.h"
}
@ -1348,7 +1350,24 @@ bool Replica::TransactionData::AddEntry(journal::ParsedEntry&& entry) {
}
bool Replica::TransactionData::IsGlobalCmd() const {
return commands.size() == 1 && commands.front().cmd_args.size() == 1;
if (commands.size() > 1) {
return false;
}
auto& command = commands.front();
if (command.cmd_args.empty()) {
return false;
}
auto& args = command.cmd_args;
if (absl::EqualsIgnoreCase(ToSV(args[0]), "FLUSHDB"sv) ||
absl::EqualsIgnoreCase(ToSV(args[0]), "FLUSHALL"sv) ||
(absl::EqualsIgnoreCase(ToSV(args[0]), "DFLYCLUSTER"sv) &&
absl::EqualsIgnoreCase(ToSV(args[1]), "FLUSHSLOTS"sv))) {
return true;
}
return false;
}
Replica::TransactionData Replica::TransactionData::FromSingle(journal::ParsedEntry&& entry) {

View file

@ -378,9 +378,8 @@ async def test_cluster_replica_sets_non_owned_keys(df_local_factory):
await push_config(replica_config, [c_replica_admin])
# The replica should have deleted the key.
# Note: this is not the long-term intended behavior. It will change after we fix #1320.
assert await c_replica.execute_command("dbsize") == 0
# The replica should *not* have deleted the key.
assert await c_replica.execute_command("dbsize") == 1
# Set another key on the master, which it owns but the replica does not own.
await c_master.set("key2", "value");
@ -388,7 +387,7 @@ async def test_cluster_replica_sets_non_owned_keys(df_local_factory):
# See that the key exists in both replica and master
assert await c_master.execute_command("dbsize") == 2
assert await c_replica.execute_command("dbsize") == 1
assert await c_replica.execute_command("dbsize") == 2
# The replica should still reply with MOVED, despite having that key.
try:
@ -397,6 +396,118 @@ async def test_cluster_replica_sets_non_owned_keys(df_local_factory):
except redis.exceptions.ResponseError as e:
assert re.match(r"MOVED \d+ localhost:1111", e.args[0])
await push_config(replica_config, [c_master_admin])
await asyncio.sleep(0.5)
assert await c_master.execute_command("dbsize") == 0
assert await c_replica.execute_command("dbsize") == 0
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_flush_slots_after_config_change(df_local_factory):
# Start and configure cluster with 1 master and 1 replica, both own all slots
master = df_local_factory.create(port=BASE_PORT, admin_port=BASE_PORT+1000)
replica = df_local_factory.create(port=BASE_PORT+1, admin_port=BASE_PORT+1001)
df_local_factory.start_all([master, replica])
c_master = aioredis.Redis(port=master.port)
c_master_admin = aioredis.Redis(port=master.admin_port)
master_id = await get_node_id(c_master_admin)
c_replica = aioredis.Redis(port=replica.port)
c_replica_admin = aioredis.Redis(port=replica.admin_port)
replica_id = await get_node_id(c_replica_admin)
config = f"""
[
{{
"slot_ranges": [
{{
"start": 0,
"end": 16383
}}
],
"master": {{
"id": "{master_id}",
"ip": "localhost",
"port": {master.port}
}},
"replicas": [
{{
"id": "{replica_id}",
"ip": "localhost",
"port": {replica.port}
}}
]
}}
]
"""
await push_config(config, [c_master_admin, c_replica_admin])
await c_master.execute_command("debug", "populate", "100000");
assert await c_master.execute_command("dbsize") == 100_000
# Setup replication and make sure that it works properly.
await c_replica.execute_command("REPLICAOF", "localhost", master.port)
await check_all_replicas_finished([c_replica], c_master)
assert await c_replica.execute_command("dbsize") == 100_000
resp = await c_master_admin.execute_command("dflycluster", "getslotinfo", "slots", "0")
assert resp[0][0] == 0
slot_0_size = resp[0][2]
print(f'Slot 0 size = {slot_0_size}')
assert slot_0_size > 0
config = f"""
[
{{
"slot_ranges": [
{{
"start": 1,
"end": 16383
}}
],
"master": {{
"id": "{master_id}",
"ip": "localhost",
"port": {master.port}
}},
"replicas": [
{{
"id": "{replica_id}",
"ip": "localhost",
"port": {replica.port}
}}
]
}},
{{
"slot_ranges": [
{{
"start": 0,
"end": 0
}}
],
"master": {{
"id": "other-master",
"ip": "localhost",
"port": 9000
}},
"replicas": [
{{
"id": "other-replica",
"ip": "localhost",
"port": 9001
}}
]
}}
]
"""
await push_config(config, [c_master_admin, c_replica_admin])
await asyncio.sleep(0.5)
assert await c_master.execute_command("dbsize") == (100_000 - slot_0_size)
assert await c_replica.execute_command("dbsize") == (100_000 - slot_0_size)
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_native_client(df_local_factory):