mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
bug(snapshot): Fix unwriten entries in multiple snapshotting
* bug(snapshot): Multiple snapshots at the same time skips serializing some entries --------- Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
1f82f9af73
commit
d738d8d976
3 changed files with 20 additions and 0 deletions
|
@ -852,6 +852,22 @@ uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
|
|||
return ver;
|
||||
}
|
||||
|
||||
void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, PrimeIterator it,
|
||||
uint64_t upper_bound) {
|
||||
uint64_t bucket_version = it.GetVersion();
|
||||
// change_cb_ is ordered by vesion.
|
||||
for (const auto& ccb : change_cb_) {
|
||||
uint64_t cb_vesrion = ccb.first;
|
||||
DCHECK_LE(cb_vesrion, upper_bound);
|
||||
if (cb_vesrion == upper_bound) {
|
||||
return;
|
||||
}
|
||||
if (bucket_version < cb_vesrion) {
|
||||
ccb.second(db_ind, ChangeReq{it});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//! Unregisters the callback.
|
||||
void DbSlice::UnregisterOnChange(uint64_t id) {
|
||||
for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) {
|
||||
|
|
|
@ -278,6 +278,9 @@ class DbSlice {
|
|||
//! at a time of the call.
|
||||
uint64_t RegisterOnChange(ChangeCallback cb);
|
||||
|
||||
// Call registered callbacks with vesrion less than upper_bound.
|
||||
void FlushChangeToEarlierCallbacks(DbIndex db_ind, PrimeIterator it, uint64_t upper_bound);
|
||||
|
||||
//! Unregisters the callback.
|
||||
void UnregisterOnChange(uint64_t id);
|
||||
|
||||
|
|
|
@ -178,6 +178,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
|
|||
++stats_.skipped;
|
||||
return false;
|
||||
}
|
||||
db_slice_->FlushChangeToEarlierCallbacks(current_db_, it, snapshot_version_);
|
||||
|
||||
stats_.loop_serialized += SerializeBucket(current_db_, it);
|
||||
return false;
|
||||
|
|
Loading…
Reference in a new issue