mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
test: add test for replication deadlock on replication timeout (#3691)
* test: add test for replication deadlock on replication timeout Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
efa4efd2bf
commit
409c2a3beb
3 changed files with 41 additions and 2 deletions
|
@ -208,9 +208,9 @@ void Replica::MainReplicationFb() {
|
|||
}
|
||||
|
||||
// Give a lower timeout for connect, because we're
|
||||
reconnect_count_++;
|
||||
ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_reconnect_timeout_ms) * 1ms, &cntx_);
|
||||
if (ec) {
|
||||
reconnect_count_++;
|
||||
LOG(WARNING) << "Error connecting to " << server().Description() << " " << ec;
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -326,7 +326,7 @@ class DflyInstance:
|
|||
async def metrics(self):
|
||||
session = aiohttp.ClientSession()
|
||||
resp = await session.get(f"http://localhost:{self.port}/metrics")
|
||||
data = await resp.text()
|
||||
data = await resp.text(encoding="utf-8")
|
||||
await session.close()
|
||||
return {
|
||||
metric_family.name: metric_family
|
||||
|
|
|
@ -2338,6 +2338,45 @@ async def test_announce_ip_port(df_factory):
|
|||
assert port == "1337"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory, df_seeder_factory):
|
||||
# setting replication_timeout to a very small value to force the replica to timeout
|
||||
master = df_factory.create(replication_timeout=100, vmodule="replica=2,dflycmd=2")
|
||||
replica = df_factory.create()
|
||||
|
||||
df_factory.start_all([master, replica])
|
||||
|
||||
c_master = master.client()
|
||||
c_replica = replica.client()
|
||||
|
||||
await c_master.execute_command("debug", "populate", "200000", "foo", "5000")
|
||||
seeder = df_seeder_factory.create(port=master.port)
|
||||
seeder_task = asyncio.create_task(seeder.run())
|
||||
|
||||
await asyncio.sleep(0.5) # wait for seeder running
|
||||
|
||||
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
||||
|
||||
# wait for full sync
|
||||
async with async_timeout.timeout(3):
|
||||
await wait_for_replicas_state(c_replica, state="full_sync", timeout=0.05)
|
||||
|
||||
await c_replica.execute_command(
|
||||
"debug replica pause"
|
||||
) # puase replica to trigger reconnect on master
|
||||
|
||||
await asyncio.sleep(1)
|
||||
|
||||
await c_replica.execute_command("debug replica resume") # resume replication
|
||||
|
||||
await asyncio.sleep(1) # replica will start resync
|
||||
seeder.stop()
|
||||
await seeder_task
|
||||
|
||||
await check_all_replicas_finished([c_replica], c_master)
|
||||
await assert_replica_reconnections(replica, 0)
|
||||
|
||||
|
||||
async def test_master_stalled_disconnect(df_factory: DflyInstanceFactory):
|
||||
# disconnect after 1 second of being blocked
|
||||
master = df_factory.create(replication_timeout=1000)
|
||||
|
|
Loading…
Reference in a new issue