Mirror of https://github.com/dragonflydb/dragonfly.git, synced 2024-12-14 11:58:02 +00:00
fix: Fix test_take_over_seeder (#3385)
* fix: Fix `test_take_over_seeder`

  There are a few issues with the test:
  1. Not using the admin port, which could cause pause to deadlock
  2. Not waiting for some of the `task`s (although that won't cause a failure)

  But also in the product code:
  1. We used to `std::move()` the same pointer multiple times
  2. We assigned to the same status object from multiple threads

  Hopefully this fixes the test. It used to fail every ~100 attempts on my machine; now it's been >1,000 attempts and they all passed.

* add comments
* remove shard_ptr param
This commit is contained in: parent 181d356341, commit fb4222d01e
3 changed files with 43 additions and 38 deletions
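The two product-code bugs called out in the commit message — `std::move()`-ing the same pointer from a callback that runs once per shard, and assigning to one shared status object from several threads — can be sketched outside Dragonfly. The snippet below is only an illustration under assumed names (`RunOnAllShards`, `ReplicaInfo`, and the callback body are hypothetical, not Dragonfly APIs): it shows the racy shape in a comment and the shape of the fix, where the callback keeps the `shared_ptr` captured, callees get a raw pointer, and per-shard failures are folded into an `atomic_bool`.

```cpp
// Minimal, self-contained sketch (not Dragonfly code) of the bug pattern and the fix.
#include <atomic>
#include <cassert>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical stand-in for "run this callback once per shard, each on its own thread".
template <typename F>
void RunOnAllShards(int num_shards, F cb) {
  std::vector<std::thread> threads;
  for (int i = 0; i < num_shards; ++i)
    threads.emplace_back(cb, i);
  for (auto& t : threads)
    t.join();
}

struct ReplicaInfo {};  // placeholder for the real per-replica state

int main() {
  auto replica = std::make_shared<ReplicaInfo>();

  // Buggy shape (as described in the commit message), kept as a comment only:
  //   auto cb = [replica = std::move(replica), &status](int shard) {
  //     status = DoWork(std::move(replica), shard);  // moves the same ptr once per shard,
  //   };                                             // and races on the shared `status`
  //
  // Fixed shape: keep the shared_ptr alive in the capture, pass a raw pointer
  // to the work, and aggregate per-shard failures through an atomic flag.
  std::atomic_bool success{true};
  auto cb = [replica, &success](int shard) {
    bool ok = (replica.get() != nullptr) && shard >= 0;  // placeholder "work"
    if (!ok)
      success.store(false);
  };
  RunOnAllShards(4, cb);
  assert(success.load());
  return 0;
}
```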
@@ -71,8 +71,8 @@ std::string_view SyncStateName(DflyCmd::SyncState sync_state) {
   return "unsupported";
 }
 
-OpStatus WaitReplicaFlowToCatchup(absl::Time end_time, shared_ptr<DflyCmd::ReplicaInfo> replica,
-                                  EngineShard* shard) {
+bool WaitReplicaFlowToCatchup(absl::Time end_time, DflyCmd::ReplicaInfo* replica,
+                              EngineShard* shard) {
   // We don't want any writes to the journal after we send the `PING`,
   // and expirations could ruin that.
   namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(false);
@@ -84,17 +84,17 @@ OpStatus WaitReplicaFlowToCatchup(absl::Time end_time, shared_ptr<DflyCmd::Repli
       LOG(WARNING) << "Couldn't synchronize with replica for takeover in time: " << replica->address
                    << ":" << replica->listening_port << ", last acked: " << flow->last_acked_lsn
                    << ", expecting " << shard->journal()->GetLsn();
-      return OpStatus::TIMED_OUT;
+      return false;
     }
     if (replica->cntx.IsCancelled()) {
-      return OpStatus::CANCELLED;
+      return false;
     }
     VLOG(1) << "Replica lsn:" << flow->last_acked_lsn
             << " master lsn:" << shard->journal()->GetLsn();
     ThisFiber::SleepFor(1ms);
   }
 
-  return OpStatus::OK;
+  return true;
 }
 
 } // namespace
@@ -395,24 +395,31 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {
 
   VLOG(1) << "AwaitCurrentDispatches done";
 
-  absl::Cleanup([] {
+  absl::Cleanup cleanup([] {
+    VLOG(2) << "Enabling expiration";
     shard_set->RunBriefInParallel([](EngineShard* shard) {
       namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(true);
     });
-    VLOG(2) << "Enable expiration";
   });
 
+  atomic_bool catchup_success = true;
   if (*status == OpStatus::OK) {
-    auto cb = [replica_ptr = std::move(replica_ptr), end_time, &status](EngineShard* shard) {
-      status = WaitReplicaFlowToCatchup(end_time, std::move(replica_ptr), shard);
+    auto cb = [replica_ptr = std::move(replica_ptr), end_time,
+               &catchup_success](EngineShard* shard) {
+      if (!WaitReplicaFlowToCatchup(end_time, replica_ptr.get(), shard)) {
+        catchup_success.store(false);
+      }
     };
     shard_set->RunBlockingInParallel(std::move(cb));
   }
 
-  if (*status != OpStatus::OK) {
+  VLOG(1) << "WaitReplicaFlowToCatchup done";
+
+  if (*status != OpStatus::OK || !catchup_success.load()) {
     sf_->service().SwitchState(GlobalState::TAKEN_OVER, GlobalState::ACTIVE);
     return cntx->SendError("Takeover failed!");
   }
 
   cntx->SendOk();
 
   if (save_flag) {
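A side note on the `absl::Cleanup` change in the hunk above: the old form constructs an unnamed temporary, which is destroyed at the end of that statement, so its callback would fire immediately rather than at scope exit; binding it to a named variable (`absl::Cleanup cleanup(...)`) defers it as intended. A minimal standalone sketch of that difference, using a tiny scope-guard stand-in rather than the real `absl::Cleanup`:

```cpp
// Illustrates why the cleanup object must be named; ScopeGuard is a toy stand-in.
#include <cstdio>
#include <utility>

template <typename F>
class ScopeGuard {
 public:
  explicit ScopeGuard(F f) : f_(std::move(f)) {}
  ~ScopeGuard() { f_(); }  // runs the callback when the guard is destroyed

 private:
  F f_;
};

int main() {
  {
    // Unnamed temporary: destroyed at the end of this statement, so the
    // callback runs right away, not at the end of the scope.
    ScopeGuard([] { std::puts("temporary: runs immediately"); });
    std::puts("still inside the scope");
  }

  {
    // Named guard: destroyed at the closing brace, so the callback is deferred.
    ScopeGuard guard([] { std::puts("named: runs at scope exit"); });
    std::puts("inside the scope, cleanup still pending");
  }
  return 0;
}
```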
@@ -96,29 +96,25 @@ TransactionData TransactionData::FromEntry(journal::ParsedEntry&& entry) {
 
 std::optional<TransactionData> TransactionReader::NextTxData(JournalReader* reader, Context* cntx) {
   io::Result<journal::ParsedEntry> res;
-  while (true) {
-    if (res = reader->ReadEntry(); !res) {
-      cntx->ReportError(res.error());
-      return std::nullopt;
-    }
-
-    // When LSN opcode is sent master does not increase journal lsn.
-    if (lsn_.has_value() && res->opcode != journal::Op::LSN) {
-      ++*lsn_;
-      VLOG(2) << "read lsn: " << *lsn_;
-    }
-
-    TransactionData tx_data = TransactionData::FromEntry(std::move(res.value()));
-    if (lsn_.has_value() && tx_data.opcode == journal::Op::LSN) {
-      DCHECK_NE(tx_data.lsn, 0u);
-      LOG_IF_EVERY_N(WARNING, tx_data.lsn != *lsn_, 10000)
-          << "master lsn:" << tx_data.lsn << " replica lsn" << *lsn_;
-      DCHECK_EQ(tx_data.lsn, *lsn_);
-    }
-    return tx_data;
-  }
-
-  return std::nullopt;
+  if (res = reader->ReadEntry(); !res) {
+    cntx->ReportError(res.error());
+    return std::nullopt;
+  }
+
+  // When LSN opcode is sent master does not increase journal lsn.
+  if (lsn_.has_value() && res->opcode != journal::Op::LSN) {
+    ++*lsn_;
+    VLOG(2) << "read lsn: " << *lsn_;
+  }
+
+  TransactionData tx_data = TransactionData::FromEntry(std::move(res.value()));
+  if (lsn_.has_value() && tx_data.opcode == journal::Op::LSN) {
+    DCHECK_NE(tx_data.lsn, 0u);
+    LOG_IF_EVERY_N(WARNING, tx_data.lsn != *lsn_, 10000)
+        << "master lsn:" << tx_data.lsn << " replica lsn" << *lsn_;
+    DCHECK_EQ(tx_data.lsn, *lsn_);
+  }
+  return tx_data;
 }
 
 } // namespace dfly
@@ -1227,7 +1227,7 @@ async def test_take_over_seeder(
     request, df_factory, df_seeder_factory, master_threads, replica_threads
 ):
     master = df_factory.create(
-        proactor_threads=master_threads, dbfilename=f"dump_{tmp_file_name()}"
+        proactor_threads=master_threads, dbfilename=f"dump_{tmp_file_name()}", admin_port=ADMIN_PORT
     )
     replica = df_factory.create(proactor_threads=replica_threads)
     df_factory.start_all([master, replica])
@@ -1236,30 +1236,32 @@ async def test_take_over_seeder(
 
     c_replica = replica.client()
 
-    await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
+    await c_replica.execute_command(f"REPLICAOF localhost {master.admin_port}")
     await wait_available_async(c_replica)
 
     fill_task = asyncio.create_task(seeder.run())
 
     stop_info = False
 
-    async def info_task():
+    async def info_replication():
         my_client = replica.client()
         while not stop_info:
-            info = await my_client.info("replication")
-            asyncio.sleep(0.5)
+            await my_client.info("replication")
+            await asyncio.sleep(0.5)
 
-    info_task = asyncio.create_task(info_task())
+    info_task = asyncio.create_task(info_replication())
 
     # Give the seeder a bit of time.
     await asyncio.sleep(3)
     logging.debug("running repltakover")
-    await c_replica.execute_command(f"REPLTAKEOVER 5 SAVE")
+    await c_replica.execute_command(f"REPLTAKEOVER 10 SAVE")
     logging.debug("after running repltakover")
     seeder.stop()
     await fill_task
 
     assert await c_replica.execute_command("role") == ["master", []]
+    stop_info = True
+    await info_task
 
     # Need to wait a bit to give time to write the shutdown snapshot
     await asyncio.sleep(1)