Mirror of https://github.com/dragonflydb/dragonfly.git

Implement snapshot consistency under write load

Roman Gershman 2022-01-25 07:51:15 +02:00
parent a8a05949b0
commit 7ee6bd8471
3 changed files with 83 additions and 20 deletions


@@ -298,6 +298,17 @@ class DashTable : public detail::DashTableBase {
     return const_bucket_iterator{it.owner_, it.seg_id_, it.bucket_id_, 0};
   }
 
+  // Capture Version Change. Runs cb(it) on every bucket! (not entry) in the table whose version
+  // would potentially change upon insertion of 'key'.
+  // In practice traversal is limited to a single segment. The operation is read-only and
+  // simulates the insertion process. 'cb' must accept a const_iterator. Note: the interface is a
+  // bit hacky, since the iterator may point to a non-existing slot. In practice it can only be
+  // passed to the TraverseBucket function.
+  // The callback receives any bucket that is touched by the insertion flow. In practice, we
+  // could tighten the estimates by taking the new version into account.
+  // Not sure how important this is though.
+  template <typename U, typename Cb> void CVCUponInsert(const U& key, Cb&& cb) const;
+
   void Clear();
 
  private:
@@ -349,6 +360,32 @@ DashTable<_Key, _Value, Policy>::~DashTable() {
   });
 }
 
+template <typename _Key, typename _Value, typename Policy>
+template <typename U, typename Cb>
+void DashTable<_Key, _Value, Policy>::CVCUponInsert(const U& key, Cb&& cb) const {
+  uint64_t key_hash = DoHash(key);
+  uint32_t seg_id = SegmentId(key_hash);
+  assert(seg_id < segment_.size());
+  const SegmentType* target = segment_[seg_id];
+
+  uint8_t bids[2];
+  unsigned num_touched = target->CVCOnInsert(key_hash, bids);
+  if (num_touched > 0) {
+    for (unsigned i = 0; i < num_touched; ++i) {
+      cb(const_iterator{this, seg_id, bids[i], 0});
+    }
+    return;
+  }
+
+  static_assert(kPhysicalBucketNum < 0xFF, "");
+
+  // Segment is full, we need to return the whole segment, because it can be split
+  // and its entries can be reshuffled into different buckets.
+  for (uint8_t i = 0; i < kPhysicalBucketNum; ++i) {
+    cb(const_iterator{this, seg_id, i, 0});
+  }
+}
+
 template <typename _Key, typename _Value, typename Policy>
 void DashTable<_Key, _Value, Policy>::Clear() {
   auto cb = [this](SegmentType* seg) {
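The new CVCUponInsert ("capture version change upon insert") lets a caller learn, before a write happens, which buckets that write could bump: normally only the buckets inside the target segment, and the whole segment when it is full and may split. The sketch below is a self-contained toy model of that idea, not the real DashTable/Segment API; the two-bucket "home plus neighbor" heuristic and all type names are illustrative assumptions.

#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

// Toy model of "capture version change upon insert" (hypothetical types, not
// the actual DashTable code). Each bucket carries a version; before an insert
// we report every bucket the insert might touch so the caller can act first.
struct Bucket {
  uint64_t version = 0;
  bool full = false;
};

struct ToySegment {
  std::vector<Bucket> buckets = std::vector<Bucket>(8);

  // Read-only simulation of the insert path: here only the home bucket and its
  // neighbor can change; if both are full the segment may split and reshuffle
  // its entries, so every bucket is reported.
  void CVCUponInsert(size_t home, const std::function<void(size_t)>& cb) const {
    size_t neighbor = (home + 1) % buckets.size();
    if (!buckets[home].full || !buckets[neighbor].full) {
      cb(home);
      cb(neighbor);
      return;
    }
    for (size_t i = 0; i < buckets.size(); ++i)
      cb(i);
  }
};

int main() {
  ToySegment seg;
  const uint64_t snapshot_version = 5;
  seg.buckets[0].version = 3;  // predates the snapshot cut => must be saved first

  seg.CVCUponInsert(0, [&](size_t bid) {
    if (seg.buckets[bid].version < snapshot_version)
      std::cout << "serialize bucket " << bid << " before applying the write\n";
  });
  return 0;
}

A snapshotter can therefore hook this call to flush any reported bucket whose version is still below the snapshot cut, which is exactly how the change in slice_snapshot.cc below uses it.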


@@ -33,9 +33,26 @@ SliceSnapshot::~SliceSnapshot() {
 void SliceSnapshot::Start(DbSlice* slice) {
   DCHECK(!fb_.joinable());
 
   db_slice_ = slice;
-  snapshot_version_ =
-      slice->RegisterOnChange([this](DbIndex index, const DbSlice::ChangeReq& req) {});
+
+  auto on_change = [this, slice](DbIndex db_index, const DbSlice::ChangeReq& req) {
+    PrimeTable* table = slice->GetTables(db_index).first;
+
+    if (const MainIterator* it = get_if<MainIterator>(&req)) {
+      if (it->GetVersion() < snapshot_version_) {
+        side_saved_ += SerializePhysicalBucket(table, *it);
+      }
+    } else {
+      string_view key = get<string_view>(req);
+      table->CVCUponInsert(key, [this, table](PrimeTable::const_iterator it) {
+        if (it.MinVersion() < snapshot_version_) {
+          side_saved_ += SerializePhysicalBucket(table, it);
+        }
+      });
+    }
+  };
+
+  snapshot_version_ = slice->RegisterOnChange(move(on_change));
 
   VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_;
 
   sfile_.reset(new io::StringFile);
@@ -53,7 +70,7 @@ void SliceSnapshot::Join() {
 
 static_assert(sizeof(PrimeTable::const_iterator) == 16);
 
-void SliceSnapshot::SerializeCb(MainIterator it) {
+void SliceSnapshot::SerializeSingleEntry(MainIterator it) {
   error_code ec;
   string tmp;
@@ -163,23 +180,28 @@ bool SliceSnapshot::SaveCb(MainIterator it) {
   }
 
   physical_mask_.set(it.bucket_id());
-
-  // Both traversals below execute atomically.
-  // traverse physical bucket and write into string file.
-  prime_table_->TraverseBucket(it, [this](auto entry_it) { this->SerializeCb(move(entry_it)); });
-
-  // Theoretically we could merge version_cb into the traversal above, but then we would need
-  // to give up on the DCHECK.
-  auto version_cb = [this](MainIterator entry_it) {
-    DCHECK_LE(entry_it.GetVersion(), snapshot_version_);
-    DVLOG(3) << "Bumping up version " << entry_it.bucket_id() << ":" << entry_it.slot_id();
-    entry_it.SetVersion(snapshot_version_);
-  };
-  prime_table_->TraverseBucket(it, version_cb);
+  SerializePhysicalBucket(prime_table_, it);
 
   return false;
 }
 
+unsigned SliceSnapshot::SerializePhysicalBucket(PrimeTable* table, PrimeTable::const_iterator it) {
+  // Both traversals below execute atomically.
+  // traverse physical bucket and write into string file.
+  unsigned result = 0;
+  table->TraverseBucket(it, [&](auto entry_it) {
+    ++result;
+    SerializeSingleEntry(move(entry_it));
+  });
+
+  table->TraverseBucket(it, [this](MainIterator entry_it) {
+    DCHECK_LE(entry_it.GetVersion(), snapshot_version_);
+    DVLOG(3) << "Bumping up version " << entry_it.bucket_id() << ":" << entry_it.slot_id();
+    entry_it.SetVersion(snapshot_version_);
+  });
+
+  return result;
+}
+
 }  // namespace dfly
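The consistency invariant lives in SerializePhysicalBucket: it writes the whole physical bucket and then stamps every entry with snapshot_version_, so any later change notification for the same bucket sees versions at or above the cut and skips it, and side_saved_ counts entries flushed this way ahead of a write. Below is a minimal, self-contained sketch of that serialize-then-bump behavior; Entry and SerializeBucket are illustrative stand-ins, not the actual dragonfly types.

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Illustrative stand-ins (not the real classes): a bucket of versioned entries
// and a serializer that bumps versions so nothing is ever saved twice.
struct Entry {
  uint64_t version;
  std::string key, value;
};

unsigned SerializeBucket(std::vector<Entry>& bucket, uint64_t snapshot_version,
                         std::vector<std::string>& out) {
  unsigned saved = 0;
  for (Entry& e : bucket) {
    out.push_back(e.key + ":" + e.value);  // stand-in for the real serialization
    e.version = snapshot_version;          // bump: marks the entry as already saved
    ++saved;
  }
  return saved;
}

int main() {
  std::vector<std::string> stream;
  std::vector<Entry> bucket = {{1, "a", "old-a"}, {2, "b", "old-b"}};
  const uint64_t snapshot_version = 10;
  unsigned side_saved = 0;

  // First pre-write notification: the bucket predates the cut, so save it first.
  if (bucket.front().version < snapshot_version)
    side_saved += SerializeBucket(bucket, snapshot_version, stream);

  // The write lands, then a second notification arrives: it is now a no-op,
  // so the snapshot keeps exactly the pre-write values.
  bucket.front().value = "new-a";
  if (bucket.front().version < snapshot_version)
    side_saved += SerializeBucket(bucket, snapshot_version, stream);

  std::cout << "side_saved=" << side_saved << " entries=" << stream.size() << "\n";
  return 0;
}

The same serialize-then-bump ordering is what makes the main fiber's SaveCb and the out-of-band on_change path compose: whichever reaches a bucket first serializes it, and the version stamp prevents the other from duplicating it.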


@@ -33,9 +33,12 @@ class SliceSnapshot {
  private:
   void FiberFunc();
   bool FlushSfile(bool force);
-  void SerializeCb(MainIterator it);
+  void SerializeSingleEntry(MainIterator it);
   bool SaveCb(MainIterator it);
 
+  // Returns number of entries serialized.
+  unsigned SerializePhysicalBucket(PrimeTable* table, PrimeTable::const_iterator it);
+
   ::boost::fibers::fiber fb_;
 
   enum {PHYSICAL_LEN = 128};
@@ -47,9 +50,10 @@ class SliceSnapshot {
 
   // version upper bound for entries that should be saved (not included).
   uint64_t snapshot_version_ = 0;
   PrimeTable* prime_table_;
   DbSlice* db_slice_ = nullptr;
 
   StringChannel* dest_;
-  size_t serialized_ = 0, skipped_ = 0;
+  size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0;
 };
 
 }  // namespace dfly