1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

Introduce snapshot isolation using versioned buckets

This commit is contained in:
Roman Gershman 2022-01-22 21:05:49 +02:00
parent 8d19a6211d
commit ce46ba7cb1
5 changed files with 85 additions and 14 deletions

View file

@ -549,12 +549,17 @@ template <typename _Key, typename _Value, typename Policy>
template <typename Cb>
uint64_t DashTable<_Key, _Value, Policy>::Traverse(uint64_t cursor, Cb&& cb) {
unsigned bid = (cursor >> 8) & 0xFF;
if (bid >= kLogicalBucketNum) // sanity.
return 0;
uint32_t sid = SegmentId(cursor);
auto hash_fun = [this](const auto& k) { return policy_.HashFn(k); };
bool fetched = false;
while (!fetched && bid < kLogicalBucketNum) {
// We fix bid and go over all segments. Once we reach the end we increase bid and repeat.
do {
SegmentType& s = *segment_[sid];
auto dt_cb = [&](const SegmentIterator& it) { cb(iterator{this, sid, it.index, it.slot}); };
@ -563,10 +568,13 @@ uint64_t DashTable<_Key, _Value, Policy>::Traverse(uint64_t cursor, Cb&& cb) {
if (sid >= segment_.size()) {
sid = 0;
++bid;
}
}
return bid >= kLogicalBucketNum ? 0 : (uint64_t(sid) << (64 - global_depth_)) | (bid << 8);
if (bid >= kLogicalBucketNum)
return 0; // "End of traversal" cursor.
}
} while (!fetched);
return (uint64_t(sid) << (64 - global_depth_)) | (bid << 8);
}
template <typename _Key, typename _Value, typename Policy>

View file

@ -1164,7 +1164,7 @@ bool Segment<Key, Value, Policy>::TraverseLogicalBucket(uint8_t bid, HashFn&& hf
const Bucket& b = bucket_[bid];
bool found = false;
if (b.GetProbe(false)) {
if (b.GetProbe(false)) { // Check items that this bucket owns.
b.ForEachSlot([&](SlotId slot, bool probe) {
if (!probe) {
found = true;
@ -1175,6 +1175,7 @@ bool Segment<Key, Value, Policy>::TraverseLogicalBucket(uint8_t bid, HashFn&& hf
uint8_t nid = (bid + 1) % kNumBuckets;
const Bucket& next = bucket_[nid];
// check for probing entries in the next bucket, i.e. those that should reside in b.
if (next.GetProbe(true)) {
next.ForEachSlot([&](SlotId slot, bool probe) {
if (probe) {
@ -1185,6 +1186,7 @@ bool Segment<Key, Value, Policy>::TraverseLogicalBucket(uint8_t bid, HashFn&& hf
});
}
// Finally go over stash buckets and find those entries that belong to b.
if (b.HasStash()) {
// do not bother with overflow fps. Just go over all the stash buckets.
for (uint8_t j = kNumBuckets; j < kTotalBuckets; ++j) {

View file

@ -421,7 +421,7 @@ void RdbSaver::StartSnapshotInShard(EngineShard* shard) {
auto pair = shard->db_slice().GetTables(0);
auto s = make_unique<RdbSnapshot>(pair.first, pair.second, &impl_->channel);
s->Start();
s->Start(666); // TODO: to introduce slice versioning.
impl_->handles[shard->shard_id()] = move(s);
}

View file

@ -27,13 +27,14 @@ RdbSnapshot::RdbSnapshot(PrimeTable* prime, ExpireTable* et, StringChannel* dest
RdbSnapshot::~RdbSnapshot() {
}
void RdbSnapshot::Start() {
void RdbSnapshot::Start(uint64_t version) {
DCHECK(!fb_.joinable());
VLOG(1) << "DbSaver::Start";
sfile_.reset(new io::StringFile);
rdb_serializer_.reset(new RdbSerializer(sfile_.get()));
snapshot_version_ = version;
fb_ = fibers::fiber([this] { FiberFunc(); });
}
@ -60,21 +61,69 @@ void RdbSnapshot::PhysicalCb(MainIterator it) {
++processed_;
}
// Serializes all the entries with version less than top_version.
// Serializes all the entries with version less than snapshot_version_.
void RdbSnapshot::FiberFunc() {
this_fiber::properties<FiberProps>().set_name("RdbSnapshot");
VLOG(1) << "Saving entries with version less than " << snapshot_version_;
uint64_t cursor = 0;
uint64_t skipped = 0;
bitset<128> physical;
vector<MainIterator> physical_list;
// it's important that cb will run uninterrupted.
// so no I/O work inside it.
// We flush our string file to disk in the traverse loop below.
auto save_cb = [&](const MainIterator& it) {
this->PhysicalCb(it);
static_assert(physical.size() > PrimeTable::kPhysicalBucketNum);
// The algorithm is to go over all the buckets and serialize entries that
// have version < snapshot_version_. In order to serialize each entry exactly once we update its
// version to snapshot_version_ once it has been serialized.
// Due to how bucket versions work we can not update individual entries - they may affect their
// neighbours in the bucket. Instead we handle serialization at physical bucket granularity.
// To further complicate things, Table::Traverse covers a logical bucket that may comprise of
// several physical buckets. The reason for this complication is that we need to guarantee
// a stable traversal during prime table mutations. PrimeTable::Traverse guarantees an atomic
// traversal of a single logical bucket, it also guarantees 100% coverage of all items
// that existed when the traversal started and survived until it finished.
//
// It's important that cb will run atomically so we avoid anu I/O work inside it.
// Instead, we flush our string file to disk in the traverse loop below.
auto save_cb = [&](MainIterator it) {
uint64_t v = it.GetVersion();
if (v >= snapshot_version_) {
// either has been already serialized or added after snapshotting started.
DVLOG(2) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
<< " at " << v;
++skipped;
return false;
}
// if we touched that physical bucket - skip it.
// If DashTable interface would introduce TraversePhysicalBuckets - where it
// goes over each bucket once - we would not need to check for uniqueness here .
// But right now we must to make sure we TraverseBucket exactly once for each physical
// bucket.
if (physical.test(it.bucket_id())) {
return false;
}
physical.set(it.bucket_id());
physical_list.push_back(it);
// traverse physical bucket and write into string file.
// TODO: I think we can avoid using physical_list by calling here
// prime_table_->TraverseBucket(it, version_cb);
prime_table_->TraverseBucket(it, [this](auto&& it) { this->PhysicalCb(it); });
return false;
};
auto version_cb = [&](MainIterator it) {
DCHECK_LE(it.GetVersion(), snapshot_version_);
DVLOG(2) << "Bumping up version " << it.bucket_id() << ":" << it.slot_id();
it.SetVersion(snapshot_version_);
};
uint64_t last_yield = 0;
do {
DVLOG(2) << "traverse cusrsor " << cursor;
@ -84,7 +133,13 @@ void RdbSnapshot::FiberFunc() {
// Therefore we save first, and then update version in one atomic swipe.
uint64_t next = prime_table_->Traverse(cursor, save_cb);
// Traverse physical buckets that were touched and update their version.
for (auto it : physical_list) {
prime_table_->TraverseBucket(it, version_cb);
}
cursor = next;
physical.reset();
physical_list.clear();
// Flush if needed.
FlushSfile();

View file

@ -20,9 +20,13 @@ class RdbSnapshot {
RdbSnapshot(PrimeTable* prime, ExpireTable* et, StringChannel* dest);
~RdbSnapshot();
void Start();
void Start(uint64_t version);
void Join();
uint64_t snapshot_version() const {
return snapshot_version_;
}
private:
void FiberFunc();
void FlushSfile();
@ -33,6 +37,8 @@ class RdbSnapshot {
std::unique_ptr<io::StringFile> sfile_;
std::unique_ptr<RdbSerializer> rdb_serializer_;
// version upper bound for entries that should be saved (not included).
uint64_t snapshot_version_ = 0;
PrimeTable* prime_table_;
StringChannel* dest_;