mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
fix(server): fix bug in replication on cached mode (#3156)
* fix server: fix replication on cached mode Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
664cba9e10
commit
805c024fc2
3 changed files with 7 additions and 33 deletions
|
@ -1544,31 +1544,9 @@ std::enable_if_t<UV, unsigned> Segment<Key, Value, Policy>::CVCOnBump(uint64_t v
|
|||
result_bid[result++] = bid;
|
||||
}
|
||||
const uint8_t target_bid = BucketIndex(hash);
|
||||
const auto& target = bucket_[target_bid];
|
||||
result_bid[result++] = target_bid;
|
||||
const uint8_t probing_bid = NextBid(target_bid);
|
||||
const auto& probing = bucket_[probing_bid];
|
||||
|
||||
unsigned stash_pos = bid - kBucketNum;
|
||||
uint8_t fp_hash = hash & kFpMask;
|
||||
|
||||
auto find_stash = [&](unsigned, unsigned pos) {
|
||||
return stash_pos == pos ? slot : BucketType::kNanSlot;
|
||||
};
|
||||
|
||||
auto do_fun = [&result, &result_bid, fp_hash, find_stash](auto& target, auto target_bid,
|
||||
bool probe) {
|
||||
if (target.HasStashOverflow()) {
|
||||
result_bid[result++] = target_bid;
|
||||
} else {
|
||||
SlotId slot_id = target.IterateStash(fp_hash, probe, find_stash).second;
|
||||
if (slot_id != BucketType::kNanSlot) {
|
||||
result_bid[result++] = target_bid;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
do_fun(target, target_bid, false);
|
||||
do_fun(probing, probing_bid, true);
|
||||
result_bid[result++] = probing_bid;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -385,13 +385,13 @@ TEST_F(DashTest, BumpUp) {
|
|||
key = segment_.Key(kSecondStashId, 9);
|
||||
hash = dt_.DoHash(key);
|
||||
|
||||
EXPECT_EQ(2, segment_.CVCOnBump(2, kSecondStashId, 9, hash, touched_bid));
|
||||
EXPECT_EQ(3, segment_.CVCOnBump(2, kSecondStashId, 9, hash, touched_bid));
|
||||
EXPECT_EQ(touched_bid[0], kSecondStashId);
|
||||
// There used to be a bug here because two bucket slots are swapped within BumpUp
|
||||
// and both of them have version < version_threshold. If we don't return them both
|
||||
// then full sync phase during replication might fail to capture the changes leading
|
||||
// to data inconsistencies between master and replica.
|
||||
// Bumpup will move the key to either its original bucket or a probing bucket.
|
||||
// Since we can't determine the exact bucket before calling bumpup, CVCOnBump
|
||||
// returns both the original bucket and the probing bucket.
|
||||
EXPECT_EQ(touched_bid[1], 0);
|
||||
EXPECT_EQ(touched_bid[2], 1);
|
||||
|
||||
segment_.BumpUp(kSecondStashId, 9, hash, RelaxedBumpPolicy{});
|
||||
ASSERT_TRUE(key == segment_.Key(0, kSlotNum - 1) || key == segment_.Key(1, kSlotNum - 1));
|
||||
|
|
|
@ -57,10 +57,6 @@ Test full replication pipeline. Test full sync with streaming changes and stable
|
|||
async def test_replication_all(
|
||||
df_local_factory: DflyInstanceFactory, t_master, t_replicas, seeder_config, stream_target, mode
|
||||
):
|
||||
# Temporary disable the test until it passes reliably with cache mode.
|
||||
if mode:
|
||||
pytest.skip()
|
||||
|
||||
if mode:
|
||||
mode["maxmemory"] = str(t_master * 256) + "mb"
|
||||
|
||||
|
|
Loading…
Reference in a new issue