mirror of https://github.com/dragonflydb/dragonfly.git

chore(replication-tests): add cache_mode on test replication all (#2685)

* add cache_mode cases on test_replication_all
* fix CVCOnBumpUp to not skip some of the modified buckets
Kostas Kyrimis 2024-03-27 14:28:52 +02:00 committed by GitHub
parent 6e32139ada
commit cd20c4003d
3 changed files with 50 additions and 45 deletions
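Below is a minimal, self-contained sketch of the invariant this fix protects. The names here (ToyBucket, BumpWithCapture) are illustrative stand-ins rather than Dragonfly's real Segment API: before a bump rearranges slots, every bucket it may modify whose version is still below the snapshot threshold must be reported and serialized first, otherwise the full-sync snapshot misses that bucket's pre-image and master and replica diverge.

#include <cstdint>
#include <cstdio>
#include <initializer_list>
#include <utility>
#include <vector>

struct ToyBucket {
  uint64_t version = 0;  // last snapshot epoch that serialized this bucket
  int value = 0;         // stand-in for the bucket's slots
};

// Caller-side contract, mirroring how CVCOnBump and BumpUp are meant to be paired:
// capture-before-modify for every bucket still below the version threshold.
void BumpWithCapture(std::vector<ToyBucket>& buckets, unsigned src, unsigned dst,
                     uint64_t ver_threshold) {
  // 1. Report (and serialize) every bucket the move will touch that has not yet
  //    crossed the threshold; serialization bumps its version.
  for (unsigned b : {src, dst}) {
    if (buckets[b].version < ver_threshold) {
      std::printf("serialize bucket %u before modifying it\n", b);
      buckets[b].version = ver_threshold;
    }
  }
  // 2. Only now is it safe to move/swap the entries between the two buckets.
  std::swap(buckets[src].value, buckets[dst].value);
}

int main() {
  std::vector<ToyBucket> buckets(4);
  buckets[3].value = 42;  // pretend bucket 3 is a stash bucket holding a hot entry
  // The pre-fix bug corresponds to step 1 skipping one of the two buckets: a snapshot
  // taken at ver_threshold would then never see the data that moved into it.
  BumpWithCapture(buckets, /*src=*/3, /*dst=*/0, /*ver_threshold=*/5);
  return 0;
}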


@@ -534,7 +534,7 @@ template <typename _Key, typename _Value, typename Policy = DefaultSegmentPolicy
// Can return up to 3 buckets.
template <bool UV = Policy::USE_VERSION>
std::enable_if_t<UV, unsigned> CVCOnBump(uint64_t ver_threshold, unsigned bid, unsigned slot,
Hash_t hash, uint8_t result_bid[2]) const;
Hash_t hash, uint8_t result_bid[3]) const;
// Finds a valid entry going from specified indices up.
Iterator FindValidStartingFrom(unsigned bid, unsigned slot) const;
@@ -1506,46 +1506,41 @@ template <bool UV>
std::enable_if_t<UV, unsigned> Segment<Key, Value, Policy>::CVCOnBump(uint64_t ver_threshold,
unsigned bid, unsigned slot,
Hash_t hash,
uint8_t result_bid[2]) const {
uint8_t result_bid[3]) const {
if (bid < kRegularBucketCnt) {
// right now we do not migrate entries from nid to bid, only from stash to normal buckets.
// Right now we do not migrate entries from nid to bid, only from stash to normal buckets.
// The reason for this is that CVCBumpUp implementation swaps the slots of the same bucket
// so there is no further action needed.
return 0;
}
// Stash case.
if (bucket_[bid].GetVersion() < ver_threshold) {
// if we move an entry from a bucket that has lower version
// then the destination bucket whatever it is - won't cross ver_threshold,
// but we may need to save that entry.
result_bid[0] = bid;
return 1;
}
// At this point we know that the source (stash) bucket has version >= ver_threshold.
// There are three actors (interesting buckets). The stash bucket, the target bucket and its
// adjacent bucket (probe). To understand the code below consider the cases in CVCBumpUp:
// 1. If the bid is not a stash bucket, then just swap the slots of the target.
// 2. If there is empty space in the target or probe bucket, insert the slot there and remove
// it from the stash bucket.
// 3. If there is no empty space, then we need to swap slots with either the target or the probe
// bucket. Furthermore, if the target or the probe bucket has one of its stash bits referencing
// the stash, then that stash bit entry is cleared. In total 2 buckets are modified.
// Case 1 is handled by the if statement above and cases 2 and 3 below. We should return via
// result_bid all the buckets (with version less than threshold) that CVCBumpUp will modify.
// Note that for cases 2 & 3 we might return an extra bucket id even though that bucket was not
// changed. An example of that is TryMoveFromStash, which will first try to insert into the target
// bucket and, if that fails, retry with the probe bucket. Since we don't really know which of the
// two we insert into, we are pessimistic and assume that both of them got modified. I suspect we
// could optimize this out by looking at the fingerprints, but for now I care about correctness and
// returning the correct modified buckets. Besides, we are on a path of updating the version anyway,
// which ensures that the bucket won't be sent again during snapshotting.
unsigned result = 0;
if (bucket_[bid].GetVersion() < ver_threshold) {
result_bid[result++] = bid;
}
const uint8_t target_bid = BucketIndex(hash);
const auto& target = bucket_[target_bid];
if (!target.IsFull()) {
if (target.GetVersion() < ver_threshold)
result_bid[result++] = target_bid;
return result;
}
const uint8_t probing_bid = NextBid(target_bid);
const auto& probing = bucket_[probing_bid];
if (!probing.IsFull()) {
if (probing.GetVersion() < ver_threshold)
result_bid[result++] = probing_bid;
return result;
}
assert(result == 0);
unsigned stash_pos = bid - kRegularBucketCnt;
uint8_t fp_hash = hash & kFpMask;
@@ -1706,6 +1701,7 @@ auto Segment<Key, Value, Policy>::BumpUp(uint8_t bid, SlotId slot, Hash_t key_ha
// bucket_offs - 0 if exact bucket, 1 if neighbour
unsigned bucket_offs = target.UnsetStashPtr(fp_hash, stash_pos, &next);
uint8_t swap_bid = (target_bid + bucket_offs) % kRegularBucketCnt;
// TODO exit early when target_bid == swap_bid
auto& swapb = bucket_[swap_bid];
constexpr unsigned kLastSlot = kNumSlots - 1;
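
The comment added in CVCOnBump above enumerates three buckets that a bump out of the stash may touch, which is why result_bid grew from two entries to three. The following stand-alone sketch illustrates that worst case; ToyBucketIndex, ToyNextBid and the bucket count are simplified assumptions, not the real dash internals:

#include <cstdint>

// Illustrative constant and helpers only; the real BucketIndex/NextBid are Segment
// members and kRegularBucketCnt has its own implementation-defined value.
constexpr unsigned kToyRegularBucketCnt = 56;

unsigned ToyBucketIndex(uint64_t hash) {
  return static_cast<unsigned>(hash % kToyRegularBucketCnt);
}

unsigned ToyNextBid(unsigned bid) { return (bid + 1) % kToyRegularBucketCnt; }

// Worst case (case 3 in the comment): target and probe are both full, so the bump
// swaps the stash entry with a slot in one of them and may clear a stash pointer.
// Up to three buckets can therefore be modified: the stash bucket itself, the
// hash-derived target bucket, and the target's probing neighbour. Each is reported
// only if its version is still below ver_threshold, hence uint8_t result_bid[3].
unsigned WorstCaseCandidates(uint64_t hash, unsigned stash_bid, unsigned out[3]) {
  out[0] = stash_bid;             // source stash bucket (loses or swaps a slot)
  out[1] = ToyBucketIndex(hash);  // preferred destination
  out[2] = ToyNextBid(out[1]);    // probe bucket, tried when the target is full
  return 3;
}

int main() {
  unsigned bids[3];
  WorstCaseCandidates(/*hash=*/0x9e3779b9u, /*stash_bid=*/57, bids);
  return 0;
}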


@@ -368,8 +368,9 @@ TEST_F(DashTest, BumpUp) {
key = segment_.Key(kFirstStashId, 5);
hash = dt_.DoHash(key);
EXPECT_EQ(1, segment_.CVCOnBump(1, kFirstStashId, 5, hash, touched_bid));
EXPECT_EQ(touched_bid[0], 1);
EXPECT_EQ(2, segment_.CVCOnBump(1, kFirstStashId, 5, hash, touched_bid));
EXPECT_EQ(touched_bid[0], 0);
EXPECT_EQ(touched_bid[1], 1);
// Bump up
segment_.BumpUp(kFirstStashId, 5, hash, RelaxedBumpPolicy{});
@@ -384,8 +385,13 @@ TEST_F(DashTest, BumpUp) {
key = segment_.Key(kSecondStashId, 9);
hash = dt_.DoHash(key);
EXPECT_EQ(1, segment_.CVCOnBump(2, kSecondStashId, 9, hash, touched_bid));
EXPECT_EQ(2, segment_.CVCOnBump(2, kSecondStashId, 9, hash, touched_bid));
EXPECT_EQ(touched_bid[0], kSecondStashId);
// There used to be a bug here because two bucket slots are swapped within BumpUp
// and both of them have version < version_threshold. If we don't return them both,
// the full sync phase during replication might fail to capture the changes, leading
// to data inconsistencies between master and replica.
EXPECT_EQ(touched_bid[1], 0);
segment_.BumpUp(kSecondStashId, 9, hash, RelaxedBumpPolicy{});
ASSERT_TRUE(key == segment_.Key(0, kNumSlots - 1) || key == segment_.Key(1, kNumSlots - 1));


@@ -53,14 +53,17 @@ Test full replication pipeline. Test full sync with streaming changes and stable
pytest.param(8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, marks=M_STRESS),
],
)
@pytest.mark.parametrize("mode", [({}), ({"cache_mode": "true"})])
async def test_replication_all(
df_local_factory: DflyInstanceFactory,
t_master,
t_replicas,
seeder_config,
stream_target,
df_local_factory: DflyInstanceFactory, t_master, t_replicas, seeder_config, stream_target, mode
):
master = df_local_factory.create(admin_port=ADMIN_PORT, proactor_threads=t_master)
if seeder_config["key_target"] == 1_000_000:
    pytest.skip()
if mode:
    mode["maxmemory"] = str(t_master * 256) + "mb"
master = df_local_factory.create(admin_port=ADMIN_PORT, proactor_threads=t_master, **mode)
replicas = [
df_local_factory.create(admin_port=ADMIN_PORT + i + 1, proactor_threads=t)
for i, t in enumerate(t_replicas)
@@ -99,17 +102,17 @@ async def test_replication_all(
await stream_task
# Check data after full sync
async def check():
    await check_all_replicas_finished(c_replicas, c_master)
    hashes = await asyncio.gather(*(SeederV2.capture(c) for c in [c_master] + c_replicas))
    assert len(set(hashes)) == 1
await check()
# Stream more data in stable state
await seeder.run(c_master, target_ops=stream_target)
# Check data after stable state stream
await check_all_replicas_finished(c_replicas, c_master)
hashes = await asyncio.gather(*(SeederV2.capture(c) for c in [c_master] + c_replicas))
assert len(set(hashes)) == 1
await check()
await disconnect_clients(c_master, *c_replicas)