From 300ccf32181e2e8de62ec55dd63360ad7e877eae Mon Sep 17 00:00:00 2001 From: Roy Jacobson Date: Sun, 30 Apr 2023 14:02:47 +0300 Subject: [PATCH] fix(server): Wrong replication state (#1150) * fix(server): Read replication state better * Add basic test for ROLE command and replication --- src/server/replica.cc | 3 ++- src/server/replica.h | 3 ++- src/server/server_family.cc | 15 +++++++++------ tests/dragonfly/replication_test.py | 29 +++++++++++++++++++++++++++++ 4 files changed, 42 insertions(+), 8 deletions(-) diff --git a/src/server/replica.cc b/src/server/replica.cc index 94220da46..508415878 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -1229,7 +1229,8 @@ Replica::Info Replica::GetInfo() const { res.host = master_context_.host; res.port = master_context_.port; res.master_link_established = (state_mask_.load() & R_TCP_CONNECTED); - res.sync_in_progress = (state_mask_.load() & R_SYNCING); + res.full_sync_in_progress = (state_mask_.load() & R_SYNCING); + res.full_sync_done = (state_mask_.load() & R_SYNC_OK); res.master_last_io_sec = (ProactorBase::GetMonotonicTimeNs() - last_io_time) / 1000000000UL; return res; }); diff --git a/src/server/replica.h b/src/server/replica.h index 019b2a73f..9baebc519 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -192,7 +192,8 @@ class Replica { std::string host; uint16_t port; bool master_link_established; - bool sync_in_progress; // snapshot sync. + bool full_sync_in_progress; + bool full_sync_done; time_t master_last_io_sec; // monotonic clock. }; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index e9c2931b0..9d57e7efc 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1685,7 +1685,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { const char* link = rinfo.master_link_established ? "up" : "down"; append("master_link_status", link); append("master_last_io_seconds_ago", rinfo.master_last_io_sec); - append("master_sync_in_progress", rinfo.sync_in_progress); + append("master_sync_in_progress", rinfo.full_sync_in_progress); } } @@ -2033,17 +2033,20 @@ void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) { } else { auto replica_ptr = replica_; + CHECK(replica_ptr); Replica::Info rinfo = replica_ptr->GetInfo(); (*cntx)->StartArray(4); (*cntx)->SendBulkString("replica"); (*cntx)->SendBulkString(rinfo.host); (*cntx)->SendBulkString(absl::StrCat(rinfo.port)); - if (rinfo.sync_in_progress) { - (*cntx)->SendBulkString("full_sync"); - } else if (!rinfo.master_link_established) { - (*cntx)->SendBulkString("connecting"); - } else { + if (rinfo.full_sync_done) { (*cntx)->SendBulkString("stable_sync"); + } else if (rinfo.full_sync_in_progress) { + (*cntx)->SendBulkString("full_sync"); + } else if (rinfo.master_link_established) { + (*cntx)->SendBulkString("preparation"); + } else { + (*cntx)->SendBulkString("connecting"); } } } diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 635ce02cd..7ff6f390f 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -871,3 +871,32 @@ async def test_script_transfer(df_local_factory): assert await c_replica.evalsha(sha, 0) == i await c_master.connection_pool.disconnect() await c_replica.connection_pool.disconnect() + + +@dfly_args({"proactor_threads": 4}) +@pytest.mark.asyncio +async def test_role_command(df_local_factory, n_keys=20): + master = df_local_factory.create(port=BASE_PORT) + replica = df_local_factory.create(port=BASE_PORT+1, logtostdout=True) + + df_local_factory.start_all([master, replica]) + + c_master = aioredis.Redis(port=master.port) + c_replica = aioredis.Redis(port=replica.port) + + assert await c_master.execute_command("role") == [b'master', []] + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_replica) + + assert await c_master.execute_command("role") == [ + b'master', [[b'127.0.0.1', bytes(str(replica.port), 'ascii'), b'stable_sync']]] + assert await c_replica.execute_command("role") == [ + b'replica', b'localhost', bytes(str(master.port), 'ascii'), b'stable_sync'] + + master.stop() + await asyncio.sleep(0.1) + assert await c_replica.execute_command("role") == [ + b'replica', b'localhost', bytes(str(master.port), 'ascii'), b'connecting'] + + await c_master.connection_pool.disconnect() + await c_replica.connection_pool.disconnect()