mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
fix(server): Wrong replication state (#1150)
* fix(server): Read replication state better * Add basic test for ROLE command and replication
This commit is contained in:
parent
46093e33ff
commit
300ccf3218
4 changed files with 42 additions and 8 deletions
|
@ -1229,7 +1229,8 @@ Replica::Info Replica::GetInfo() const {
|
|||
res.host = master_context_.host;
|
||||
res.port = master_context_.port;
|
||||
res.master_link_established = (state_mask_.load() & R_TCP_CONNECTED);
|
||||
res.sync_in_progress = (state_mask_.load() & R_SYNCING);
|
||||
res.full_sync_in_progress = (state_mask_.load() & R_SYNCING);
|
||||
res.full_sync_done = (state_mask_.load() & R_SYNC_OK);
|
||||
res.master_last_io_sec = (ProactorBase::GetMonotonicTimeNs() - last_io_time) / 1000000000UL;
|
||||
return res;
|
||||
});
|
||||
|
|
|
@ -192,7 +192,8 @@ class Replica {
|
|||
std::string host;
|
||||
uint16_t port;
|
||||
bool master_link_established;
|
||||
bool sync_in_progress; // snapshot sync.
|
||||
bool full_sync_in_progress;
|
||||
bool full_sync_done;
|
||||
time_t master_last_io_sec; // monotonic clock.
|
||||
};
|
||||
|
||||
|
|
|
@ -1685,7 +1685,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
|
|||
const char* link = rinfo.master_link_established ? "up" : "down";
|
||||
append("master_link_status", link);
|
||||
append("master_last_io_seconds_ago", rinfo.master_last_io_sec);
|
||||
append("master_sync_in_progress", rinfo.sync_in_progress);
|
||||
append("master_sync_in_progress", rinfo.full_sync_in_progress);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2033,17 +2033,20 @@ void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
} else {
|
||||
auto replica_ptr = replica_;
|
||||
CHECK(replica_ptr);
|
||||
Replica::Info rinfo = replica_ptr->GetInfo();
|
||||
(*cntx)->StartArray(4);
|
||||
(*cntx)->SendBulkString("replica");
|
||||
(*cntx)->SendBulkString(rinfo.host);
|
||||
(*cntx)->SendBulkString(absl::StrCat(rinfo.port));
|
||||
if (rinfo.sync_in_progress) {
|
||||
(*cntx)->SendBulkString("full_sync");
|
||||
} else if (!rinfo.master_link_established) {
|
||||
(*cntx)->SendBulkString("connecting");
|
||||
} else {
|
||||
if (rinfo.full_sync_done) {
|
||||
(*cntx)->SendBulkString("stable_sync");
|
||||
} else if (rinfo.full_sync_in_progress) {
|
||||
(*cntx)->SendBulkString("full_sync");
|
||||
} else if (rinfo.master_link_established) {
|
||||
(*cntx)->SendBulkString("preparation");
|
||||
} else {
|
||||
(*cntx)->SendBulkString("connecting");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -871,3 +871,32 @@ async def test_script_transfer(df_local_factory):
|
|||
assert await c_replica.evalsha(sha, 0) == i
|
||||
await c_master.connection_pool.disconnect()
|
||||
await c_replica.connection_pool.disconnect()
|
||||
|
||||
|
||||
@dfly_args({"proactor_threads": 4})
|
||||
@pytest.mark.asyncio
|
||||
async def test_role_command(df_local_factory, n_keys=20):
|
||||
master = df_local_factory.create(port=BASE_PORT)
|
||||
replica = df_local_factory.create(port=BASE_PORT+1, logtostdout=True)
|
||||
|
||||
df_local_factory.start_all([master, replica])
|
||||
|
||||
c_master = aioredis.Redis(port=master.port)
|
||||
c_replica = aioredis.Redis(port=replica.port)
|
||||
|
||||
assert await c_master.execute_command("role") == [b'master', []]
|
||||
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
||||
await wait_available_async(c_replica)
|
||||
|
||||
assert await c_master.execute_command("role") == [
|
||||
b'master', [[b'127.0.0.1', bytes(str(replica.port), 'ascii'), b'stable_sync']]]
|
||||
assert await c_replica.execute_command("role") == [
|
||||
b'replica', b'localhost', bytes(str(master.port), 'ascii'), b'stable_sync']
|
||||
|
||||
master.stop()
|
||||
await asyncio.sleep(0.1)
|
||||
assert await c_replica.execute_command("role") == [
|
||||
b'replica', b'localhost', bytes(str(master.port), 'ascii'), b'connecting']
|
||||
|
||||
await c_master.connection_pool.disconnect()
|
||||
await c_replica.connection_pool.disconnect()
|
||||
|
|
Loading…
Reference in a new issue