From 54c9633cb8cd9999aa0870afdcff290b29be1486 Mon Sep 17 00:00:00 2001 From: Shahar Mike Date: Tue, 9 Apr 2024 12:00:52 +0300 Subject: [PATCH] feat(dbslice): Add self-laundering iterator in `DbSlice` (#2815) A self-laundering iterator will enable us to, eventually, yield from fibers while holding an iterator. For example: ```cpp auto it1 = db_slice.Find(...); Yield(); // Until now - this could have invalidated `it1` auto it2 = db_slice.Find(...); ``` Why is this a good idea? Because it will enable yielding inside PreUpdate() which will allow breaking down of writing huge entries in small quantities to disk/network, eliminating the need to allocate huge chunks of memory just for serialization. Also, it'll probably unlock future developments as well, as yielding can be useful in other contexts. --- src/core/dash.h | 6 +- src/core/string_or_view.h | 72 ++++++++++++ src/server/bitops_family.cc | 11 +- src/server/db_slice.cc | 198 ++++++++++++++++----------------- src/server/db_slice.h | 178 +++++++++++++++++++++++------ src/server/detail/table.h | 8 ++ src/server/engine_shard_set.cc | 2 +- src/server/generic_family.cc | 10 +- src/server/hll_family.cc | 5 +- src/server/hset_family.cc | 7 +- src/server/json_family.cc | 11 +- src/server/list_family.cc | 15 ++- src/server/set_family.cc | 36 +++--- src/server/snapshot.cc | 3 +- src/server/stream_family.cc | 14 +-- src/server/string_family.cc | 26 ++--- src/server/string_family.h | 3 +- src/server/table.cc | 15 +-- src/server/table.h | 22 +--- src/server/tiered_storage.cc | 2 +- src/server/zset_family.cc | 38 +++---- 21 files changed, 409 insertions(+), 273 deletions(-) create mode 100644 src/core/string_or_view.h diff --git a/src/core/dash.h b/src/core/dash.h index 7dd94873e..3c34b16e5 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -408,6 +408,10 @@ class DashTable<_Key, _Value, Policy>::Iterator { ((owner_->segment_[seg_id_]->IsBusy(bucket_id_, slot_id_))); } + Owner& owner() const { + return *owner_; + } + template std::enable_if_t GetVersion() const { assert(owner_ && seg_id_ < owner_->segment_.size()); return owner_->segment_[seg_id_]->GetVersion(bucket_id_); @@ -664,7 +668,7 @@ template template auto DashTable<_Key, _Value, Policy>::Find(U&& key) const -> const_iterator { uint64_t key_hash = DoHash(key); - size_t seg_id = SegmentId(key_hash); // seg_id takes up global_depth_ high bits. + uint32_t seg_id = SegmentId(key_hash); // seg_id takes up global_depth_ high bits. const auto* target = segment_[seg_id]; // Hash structure is like this: [SSUUUUBF], where S is segment id, U - unused, diff --git a/src/core/string_or_view.h b/src/core/string_or_view.h new file mode 100644 index 000000000..a12406344 --- /dev/null +++ b/src/core/string_or_view.h @@ -0,0 +1,72 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include +#include +#include + +namespace dfly { + +class StringOrView { + public: + static StringOrView FromString(std::string s) { + StringOrView sov; + sov.val_ = std::move(s); + return sov; + } + + static StringOrView FromView(std::string_view sv) { + StringOrView sov; + sov.val_ = sv; + return sov; + } + + StringOrView() = default; + StringOrView(const StringOrView& o) = default; + StringOrView(StringOrView&& o) = default; + StringOrView& operator=(const StringOrView& o) = default; + StringOrView& operator=(StringOrView&& o) = default; + + bool operator==(const StringOrView& o) const { + return *this == o.view(); + } + + bool operator==(std::string_view o) const { + return view() == o; + } + + bool operator!=(const StringOrView& o) const { + return *this != o.view(); + } + + bool operator!=(std::string_view o) const { + return !(*this == o); + } + + std::string_view view() const { + return visit([](const auto& s) -> std::string_view { return s; }, val_); + } + + friend std::ostream& operator<<(std::ostream& o, const StringOrView& key) { + return o << key.view(); + } + + // Make hashable + template friend H AbslHashValue(H h, const StringOrView& c) { + return H::combine(std::move(h), c.view()); + } + + // If the key is backed by a string_view, replace it with a string with the same value + void MakeOwned() { + if (std::holds_alternative(val_)) + val_ = std::string{std::get(val_)}; + } + + private: + std::variant val_; +}; + +} // namespace dfly diff --git a/src/server/bitops_family.cc b/src/server/bitops_family.cc index f08b6a6d5..cd7d1fda7 100644 --- a/src/server/bitops_family.cc +++ b/src/server/bitops_family.cc @@ -279,7 +279,7 @@ bool SetBitValue(uint32_t offset, bool bit_value, std::string* entry) { class ElementAccess { bool added_ = false; - PrimeIterator element_iter_; + DbSlice::Iterator element_iter_; std::string_view key_; DbContext context_; EngineShard* shard_ = nullptr; @@ -450,8 +450,7 @@ OpResult RunBitOpNot(const OpArgs& op_args, ArgSlice keys) { EngineShard* es = op_args.shard; // if we found the value, just return, if not found then skip, otherwise report an error auto key = keys.front(); - OpResult find_res = - es->db_slice().FindAndFetchReadOnly(op_args.db_cntx, key, OBJ_STRING); + auto find_res = es->db_slice().FindAndFetchReadOnly(op_args.db_cntx, key, OBJ_STRING); if (find_res) { return GetString(find_res.value()->second); } else { @@ -472,8 +471,7 @@ OpResult RunBitOpOnShard(std::string_view op, const OpArgs& op_args // collect all the value for this shard for (auto& key : keys) { - OpResult find_res = - es->db_slice().FindAndFetchReadOnly(op_args.db_cntx, key, OBJ_STRING); + auto find_res = es->db_slice().FindAndFetchReadOnly(op_args.db_cntx, key, OBJ_STRING); if (find_res) { values.emplace_back(GetString(find_res.value()->second)); } else { @@ -1250,8 +1248,7 @@ OpResult ReadValueBitsetAt(const OpArgs& op_args, std::string_view key, ui OpResult ReadValue(const DbContext& context, std::string_view key, EngineShard* shard) { - OpResult it_res = - shard->db_slice().FindAndFetchReadOnly(context, key, OBJ_STRING); + auto it_res = shard->db_slice().FindAndFetchReadOnly(context, key, OBJ_STRING); if (!it_res.ok()) { return it_res.status(); } diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 8305d632b..b061e5946 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -157,14 +157,17 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e // based on tests - it's more efficient to pass regular buckets to gc. // stash buckets are filled last so much smaller change they have expired items. + string scratch; unsigned num_buckets = std::min(PrimeTable::HotspotBuckets::kRegularBuckets, eb.num_buckets); for (unsigned i = 0; i < num_buckets; ++i) { auto bucket_it = eb.at(i); for (; !bucket_it.is_done(); ++bucket_it) { if (bucket_it->second.HasExpire()) { + string_view key = bucket_it->first.GetSlice(&scratch); ++checked_; - auto [prime_it, exp_it] = db_slice_->ExpireIfNeeded(cntx_, bucket_it); + auto [prime_it, exp_it] = db_slice_->ExpireIfNeeded( + cntx_, DbSlice::Iterator(bucket_it, StringOrView::FromView(key))); if (prime_it.is_done()) ++res; } @@ -192,8 +195,8 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT DbTable* table = db_slice_->GetDBTable(cntx_.db_index); auto& lt = table->trans_locks; - string tmp; - string_view key = last_slot_it->first.GetSlice(&tmp); + string scratch; + string_view key = last_slot_it->first.GetSlice(&scratch); // do not evict locked keys if (lt.Find(KeyLockArgs::GetLockKey(key)).has_value()) return 0; @@ -205,7 +208,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT make_pair("DEL", delete_args), false); } - db_slice_->PerformDeletion(last_slot_it, table); + db_slice_->PerformDeletion(DbSlice::Iterator(last_slot_it, StringOrView::FromView(key)), table); ++evicted_; } @@ -338,26 +341,11 @@ void DbSlice::AutoUpdater::Run() { // Check that AutoUpdater does not run after a key was removed. // If this CHECK() failed for you, it probably means that you deleted a key while having an auto // updater in scope. You'll probably want to call Run() (or Cancel() - but be careful). - DCHECK(IsValid(fields_.db_slice->db_arr_[fields_.db_ind]->prime.Find(fields_.key))) - << "Key was removed before PostUpdate() - this is a bug!"; + DCHECK(IsValid(fields_.db_slice->db_arr_[fields_.db_ind]->prime.Find(fields_.key))); DCHECK(fields_.action == DestructorAction::kRun); CHECK_NE(fields_.db_slice, nullptr); - if (shard_set->IsTieringEnabled()) { - // When triering is enabled we can preempt on write to disk, therefore it can be invalidated - // until we run the post updated. - fields_.it = fields_.db_slice->db_arr_[fields_.db_ind]->Launder(fields_.it, fields_.key); - } else { - // Make sure that the DB has not changed in size since this object was created. - // Adding or removing elements from the DB may invalidate iterators. - CHECK_EQ(fields_.db_size, fields_.db_slice->DbSize(fields_.db_ind)) - << "Attempting to run post-update after DB was modified"; - - CHECK_EQ(fields_.deletion_count, fields_.db_slice->deletion_count_) - << "Attempting to run post-update after a deletion was issued"; - } - fields_.db_slice->PostUpdate(fields_.db_ind, fields_.it, fields_.key, fields_.orig_heap_size); Cancel(); // Reset to not run again } @@ -369,8 +357,6 @@ void DbSlice::AutoUpdater::Cancel() { DbSlice::AutoUpdater::AutoUpdater(const Fields& fields) : fields_(fields) { DCHECK(fields_.action == DestructorAction::kRun); DCHECK(IsValid(fields.it)); - fields_.db_size = fields_.db_slice->DbSize(fields_.db_ind); - fields_.deletion_count = fields_.db_slice->deletion_count_; fields_.orig_heap_size = fields.it->second.MallocUsed(); } @@ -409,14 +395,16 @@ OpResult DbSlice::FindMutableInternal(const Context& cntx return res.status(); } - PreUpdate(cntx.db_index, res->it); + auto it = Iterator(res->it, StringOrView::FromView(key)); + auto exp_it = ExpIterator(res->exp_it, StringOrView::FromView(key)); + PreUpdate(cntx.db_index, it); // PreUpdate() might have caused a deletion of `it` if (res->it.IsOccupied()) { - return {{res->it, res->exp_it, + return {{it, exp_it, AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, .db_slice = this, .db_ind = cntx.db_index, - .it = res->it, + .it = it, .key = key})}}; } else { return OpStatus::KEY_NOTFOUND; @@ -426,39 +414,40 @@ OpResult DbSlice::FindMutableInternal(const Context& cntx DbSlice::ItAndExpConst DbSlice::FindReadOnly(const Context& cntx, std::string_view key) { auto res = FindInternal(cntx, key, std::nullopt, UpdateStatsMode::kReadStats, LoadExternalMode::kDontLoad); - return {res->it, res->exp_it}; + return {ConstIterator(res->it, StringOrView::FromView(key)), + ExpConstIterator(res->exp_it, StringOrView::FromView(key))}; } -OpResult DbSlice::FindReadOnly(const Context& cntx, string_view key, - unsigned req_obj_type) { +OpResult DbSlice::FindReadOnly(const Context& cntx, string_view key, + unsigned req_obj_type) { auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kReadStats, LoadExternalMode::kDontLoad); if (res.ok()) { - return {res->it}; + return ConstIterator(res->it, StringOrView::FromView(key)); } return res.status(); } -OpResult DbSlice::FindAndFetchReadOnly(const Context& cntx, - std::string_view key, - unsigned req_obj_type) { +OpResult DbSlice::FindAndFetchReadOnly(const Context& cntx, + std::string_view key, + unsigned req_obj_type) { auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kReadStats, LoadExternalMode::kLoad); if (res.ok()) { - return {res->it}; + return ConstIterator(res->it, StringOrView::FromView(key)); } return res.status(); } -OpResult DbSlice::FindInternal(const Context& cntx, std::string_view key, - std::optional req_obj_type, - UpdateStatsMode stats_mode, - LoadExternalMode load_mode) { +OpResult DbSlice::FindInternal(const Context& cntx, std::string_view key, + std::optional req_obj_type, + UpdateStatsMode stats_mode, + LoadExternalMode load_mode) { if (!IsDbValid(cntx.db_index)) { return OpStatus::KEY_NOTFOUND; } - DbSlice::ItAndExp res; + DbSlice::PrimeItAndExp res; auto& db = *db_arr_[cntx.db_index]; res.it = db.prime.Find(key); @@ -547,14 +536,14 @@ OpResult DbSlice::FindInternal(const Context& cntx, std::stri return res; } -OpResult> DbSlice::FindFirstReadOnly(const Context& cntx, - ArgSlice args, - int req_obj_type) { +OpResult> DbSlice::FindFirstReadOnly(const Context& cntx, + ArgSlice args, + int req_obj_type) { DCHECK(!args.empty()); for (unsigned i = 0; i < args.size(); ++i) { string_view s = args[i]; - OpResult res = FindReadOnly(cntx, s, req_obj_type); + OpResult res = FindReadOnly(cntx, s, req_obj_type); if (res) return make_pair(res.value(), i); if (res.status() != OpStatus::KEY_NOTFOUND) @@ -582,17 +571,19 @@ OpResult DbSlice::AddOrFindInternal(const Context& cnt auto res = FindInternal(cntx, key, std::nullopt, UpdateStatsMode::kMutableStats, load_mode); if (res.ok()) { - PreUpdate(cntx.db_index, res->it); + Iterator it(res->it, StringOrView::FromView(key)); + ExpIterator exp_it(res->exp_it, StringOrView::FromView(key)); + PreUpdate(cntx.db_index, it); // PreUpdate() might have caused a deletion of `it` if (res->it.IsOccupied()) { return DbSlice::AddOrFindResult{ - .it = res->it, - .exp_it = res->exp_it, + .it = it, + .exp_it = exp_it, .is_new = false, .post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, .db_slice = this, .db_ind = cntx.db_index, - .it = res->it, + .it = it, .key = key})}; } else { res = OpStatus::KEY_NOTFOUND; @@ -676,13 +667,13 @@ OpResult DbSlice::AddOrFindInternal(const Context& cnt } return DbSlice::AddOrFindResult{ - .it = it, - .exp_it = ExpireIterator{}, + .it = Iterator(it, StringOrView::FromView(key)), + .exp_it = ExpIterator{}, .is_new = true, .post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, .db_slice = this, .db_ind = cntx.db_index, - .it = it, + .it = Iterator(it, StringOrView::FromView(key)), .key = key})}; } @@ -692,7 +683,7 @@ void DbSlice::ActivateDb(DbIndex db_ind) { CreateDb(db_ind); } -bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) { +bool DbSlice::Del(DbIndex db_ind, Iterator it) { if (!IsValid(it)) { return false; } @@ -708,7 +699,6 @@ bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) { } fetched_items_.erase(it->first.AsRef()); PerformDeletion(it, db.get()); - deletion_count_++; return true; } @@ -724,7 +714,7 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) { std::string_view key = it->first.GetSlice(&tmp); SlotId sid = ClusterConfig::KeySlot(key); if (slot_ids.Contains(sid) && it.GetVersion() < next_version) { - PerformDeletion(it, db_arr_[0].get()); + PerformDeletion(Iterator::FromPrime(it), db_arr_[0].get()); } return true; }; @@ -804,7 +794,7 @@ void DbSlice::FlushDbIndexes(const std::vector& indexes) { if (db_ptr && db_ptr->stats.tiered_entries > 0) { for (auto it = db_ptr->prime.begin(); it != db_ptr->prime.end(); ++it) { if (it->second.IsExternal()) - PerformDeletion(it, db_ptr.get()); + PerformDeletion(Iterator::FromPrime(it), db_ptr.get()); } DCHECK_EQ(0u, db_ptr->stats.tiered_entries); @@ -840,13 +830,13 @@ void DbSlice::FlushDb(DbIndex db_ind) { FlushDbIndexes(indexes); } -void DbSlice::AddExpire(DbIndex db_ind, PrimeIterator main_it, uint64_t at) { +void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) { uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates. CHECK(db_arr_[db_ind]->expire.Insert(main_it->first.AsRef(), ExpirePeriod(delta)).second); main_it->second.SetExpire(true); } -bool DbSlice::RemoveExpire(DbIndex db_ind, PrimeIterator main_it) { +bool DbSlice::RemoveExpire(DbIndex db_ind, Iterator main_it) { if (main_it->second.HasExpire()) { CHECK_EQ(1u, db_arr_[db_ind]->expire.Erase(main_it->first)); main_it->second.SetExpire(false); @@ -856,7 +846,7 @@ bool DbSlice::RemoveExpire(DbIndex db_ind, PrimeIterator main_it) { } // Returns true if a state has changed, false otherwise. -bool DbSlice::UpdateExpire(DbIndex db_ind, PrimeIterator it, uint64_t at) { +bool DbSlice::UpdateExpire(DbIndex db_ind, Iterator it, uint64_t at) { if (at == 0) { return RemoveExpire(db_ind, it); } @@ -910,8 +900,8 @@ pair DbSlice::ExpireParams::Calculate(int64_t now_ms) const { return make_pair(rel_msec, now_msec + rel_msec); } -OpResult DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it, - ExpireIterator expire_it, const ExpireParams& params) { +OpResult DbSlice::UpdateExpire(const Context& cntx, Iterator prime_it, + ExpIterator expire_it, const ExpireParams& params) { constexpr uint64_t kPersistValue = 0; DCHECK(params.IsDefined()); DCHECK(IsValid(prime_it)); @@ -976,7 +966,8 @@ OpResult DbSlice::AddOrUpdateInternal(const Context& c if (IsValid(res.exp_it) && force_update) { res.exp_it->second = ExpirePeriod(delta); } else { - res.exp_it = db.expire.InsertNew(it->first.AsRef(), ExpirePeriod(delta)); + auto exp_it = db.expire.InsertNew(it->first.AsRef(), ExpirePeriod(delta)); + res.exp_it = ExpIterator(exp_it, StringOrView::FromView(key)); } } @@ -1076,18 +1067,18 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, const KeyLockArgs& lock_args) con return true; } -void DbSlice::PreUpdate(DbIndex db_ind, PrimeIterator it) { +void DbSlice::PreUpdate(DbIndex db_ind, Iterator it) { FiberAtomicGuard fg; DVLOG(2) << "Running callbacks in dbid " << db_ind; for (const auto& ccb : change_cb_) { - ccb.second(db_ind, ChangeReq{it}); + ccb.second(db_ind, ChangeReq{it.GetInnerIt()}); } - it.SetVersion(NextVersion()); + it.GetInnerIt().SetVersion(NextVersion()); } -void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, size_t orig_size) { +void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size) { int64_t delta = static_cast(it->second.MallocUsed()) - static_cast(orig_size); AccountObjectMemory(key, it->second.ObjType(), delta, GetDBTable(db_ind)); @@ -1113,7 +1104,12 @@ void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, SendInvalidationTrackingMessage(key); } -DbSlice::ItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) { +DbSlice::ItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, Iterator it) { + auto res = ExpireIfNeeded(cntx, it.GetInnerIt()); + return {.it = Iterator::FromPrime(res.it), .exp_it = ExpIterator::FromPrime(res.exp_it)}; +} + +DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) { if (!it->second.HasExpire()) { LOG(ERROR) << "Invalid call to ExpireIfNeeded"; return {it, ExpireIterator{}}; @@ -1123,6 +1119,10 @@ DbSlice::ItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) auto expire_it = db->expire.Find(it->first); + // TODO: Accept Iterator instead of PrimeIterator, as this might save an allocation below. + string scratch; + string_view key = it->first.GetSlice(&scratch); + if (IsValid(expire_it)) { // TODO: to employ multi-generation update of expire-base and the underlying values. time_t expire_time = ExpireTime(expire_it); @@ -1136,23 +1136,18 @@ DbSlice::ItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) << ", expire table size: " << db->expire.size() << ", prime table size: " << db->prime.size() << util::fb2::GetStacktrace(); } - string tmp_key_buf; - string_view tmp_key; - // Replicate expiry if (auto journal = owner_->journal(); journal) { - tmp_key = it->first.GetSlice(&tmp_key_buf); - RecordExpiry(cntx.db_index, tmp_key); + RecordExpiry(cntx.db_index, key); } auto obj_type = it->second.ObjType(); if (doc_del_cb_ && (obj_type == OBJ_JSON || obj_type == OBJ_HASH)) { - if (tmp_key.empty()) - tmp_key = it->first.GetSlice(&tmp_key_buf); - doc_del_cb_(tmp_key, cntx, it->second); + doc_del_cb_(key, cntx, it->second); } - PerformDeletion(it, expire_it, db.get()); + PerformDeletion(Iterator(it, StringOrView::FromView(key)), + ExpIterator(expire_it, StringOrView::FromView(key)), db.get()); ++events_.expired_keys; return {PrimeIterator{}, ExpireIterator{}}; @@ -1186,8 +1181,7 @@ uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) { return ver; } -void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, PrimeIterator it, - uint64_t upper_bound) { +void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) { FiberAtomicGuard fg; uint64_t bucket_version = it.GetVersion(); // change_cb_ is ordered by version. @@ -1200,7 +1194,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, PrimeIterator it, return; } if (bucket_version < cb_version) { - ccb.second(db_ind, ChangeReq{it}); + ccb.second(db_ind, ChangeReq{it.GetInnerIt()}); } } } @@ -1342,7 +1336,7 @@ void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes keys_to_journal.push_back(string(key)); } - PerformDeletion(evict_it, db_table.get()); + PerformDeletion(Iterator(evict_it, StringOrView::FromView(key)), db_table.get()); ++evicted; used_memory_after = owner_->UsedMemory(); @@ -1384,17 +1378,17 @@ void DbSlice::CreateDb(DbIndex db_ind) { // "it" is the iterator that we just added/updated and it should not be deleted. // "table" is the instance where we should delete the objects from. -size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* table) { +size_t DbSlice::EvictObjects(size_t memory_to_free, Iterator it, DbTable* table) { if (owner_->IsReplica()) { return 0; } - PrimeTable::Segment_t* segment = table->prime.GetSegment(it.segment_id()); + PrimeTable::Segment_t* segment = table->prime.GetSegment(it.GetInnerIt().segment_id()); DCHECK(segment); constexpr unsigned kNumStashBuckets = PrimeTable::Segment_t::kStashBucketNum; constexpr unsigned kNumRegularBuckets = PrimeTable::Segment_t::kBucketNum; - PrimeTable::bucket_iterator it2(it); + PrimeTable::bucket_iterator it2(it.GetInnerIt()); unsigned evicted = 0; bool evict_succeeded = false; @@ -1416,9 +1410,9 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t if (!bucket.IsBusy(slot_id)) continue; - auto evict_it = table->prime.GetIterator(it.segment_id(), stash_bid, slot_id); + auto evict_it = table->prime.GetIterator(it.GetInnerIt().segment_id(), stash_bid, slot_id); // skip the iterator that we must keep or the sticky items. - if (evict_it == it || evict_it->first.IsSticky()) + if (evict_it == it.GetInnerIt() || evict_it->first.IsSticky()) continue; PerformDeletion(evict_it, table); @@ -1439,13 +1433,13 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t for (int slot_id = PrimeTable::Segment_t::kSlotNum - 1; !evict_succeeded && slot_id >= 0; --slot_id) { for (unsigned i = 0; i < kNumRegularBuckets; ++i) { - unsigned bid = (it.bucket_id() + i) % kNumRegularBuckets; + unsigned bid = (it.GetInnerIt().bucket_id() + i) % kNumRegularBuckets; const auto& bucket = segment->GetBucket(bid); if (!bucket.IsBusy(slot_id)) continue; - auto evict_it = table->prime.GetIterator(it.segment_id(), bid, slot_id); - if (evict_it == it || evict_it->first.IsSticky()) + auto evict_it = table->prime.GetIterator(it.GetInnerIt().segment_id(), bid, slot_id); + if (evict_it == it.GetInnerIt() || evict_it->first.IsSticky()) continue; PerformDeletion(evict_it, table); @@ -1558,30 +1552,31 @@ void DbSlice::SendInvalidationTrackingMessage(std::string_view key) { } } -void DbSlice::RemoveFromTiered(PrimeIterator it, DbIndex index) { +void DbSlice::RemoveFromTiered(Iterator it, DbIndex index) { DbTable* table = GetDBTable(index); RemoveFromTiered(it, table); } -void DbSlice::RemoveFromTiered(PrimeIterator it, DbTable* table) { +void DbSlice::RemoveFromTiered(Iterator it, DbTable* table) { DbTableStats& stats = table->stats; PrimeValue& pv = it->second; if (pv.IsExternal()) { TieredStorage* tiered = shard_owner()->tiered_storage(); - tiered->Free(it, &stats); + tiered->Free(it.GetInnerIt(), &stats); } if (pv.HasIoPending()) { TieredStorage* tiered = shard_owner()->tiered_storage(); - tiered->CancelIo(table->index, it); + tiered->CancelIo(table->index, it.GetInnerIt()); } } -void DbSlice::PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, DbTable* table) { - std::string tmp; - std::string_view key = del_it->first.GetSlice(&tmp); +void DbSlice::PerformDeletion(PrimeIterator del_it, DbTable* table) { + return PerformDeletion(Iterator::FromPrime(del_it), table); +} +void DbSlice::PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* table) { if (!exp_it.is_done()) { - table->expire.Erase(exp_it); + table->expire.Erase(exp_it.GetInnerIt()); } if (del_it->second.HasFlag()) { @@ -1597,8 +1592,9 @@ void DbSlice::PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, DbTab size_t value_heap_size = pv.MallocUsed(); stats.inline_keys -= del_it->first.IsInline(); - AccountObjectMemory(key, del_it->first.ObjType(), -del_it->first.MallocUsed(), table); // Key - AccountObjectMemory(key, pv.ObjType(), -value_heap_size, table); // Value + AccountObjectMemory(del_it.key(), del_it->first.ObjType(), -del_it->first.MallocUsed(), + table); // Key + AccountObjectMemory(del_it.key(), pv.ObjType(), -value_heap_size, table); // Value if (pv.ObjType() == OBJ_HASH && pv.Encoding() == kEncodingListPack) { --stats.listpack_blob_cnt; } else if (pv.ObjType() == OBJ_ZSET && pv.Encoding() == OBJ_ENCODING_LISTPACK) { @@ -1606,18 +1602,18 @@ void DbSlice::PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, DbTab } if (ClusterConfig::IsEnabled()) { - SlotId sid = ClusterConfig::KeySlot(key); + SlotId sid = ClusterConfig::KeySlot(del_it.key()); table->slots_stats[sid].key_count -= 1; } - table->prime.Erase(del_it); - SendInvalidationTrackingMessage(key); + table->prime.Erase(del_it.GetInnerIt()); + SendInvalidationTrackingMessage(del_it.key()); } -void DbSlice::PerformDeletion(PrimeIterator del_it, DbTable* table) { - ExpireIterator exp_it; +void DbSlice::PerformDeletion(Iterator del_it, DbTable* table) { + ExpIterator exp_it; if (del_it->second.HasExpire()) { - exp_it = table->expire.Find(del_it->first); + exp_it = ExpIterator::FromPrime(table->expire.Find(del_it->first)); DCHECK(!exp_it.is_done()); } diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 07efe4004..5295ba3e3 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -5,11 +5,13 @@ #pragma once #include "core/mi_memory_resource.h" +#include "core/string_or_view.h" #include "facade/dragonfly_connection.h" #include "facade/op_status.h" #include "server/common.h" #include "server/conn_context.h" #include "server/table.h" +#include "util/fibers/fibers.h" namespace dfly { @@ -69,6 +71,70 @@ class DbSlice { void operator=(const DbSlice&) = delete; public: + // Auto-laundering iterator wrapper. Laundering means re-finding keys if they moved between + // buckets. + template class IteratorT { + public: + IteratorT() = default; + + IteratorT(T it, StringOrView key) + : it_(it), fiber_epoch_(util::fb2::FiberSwitchEpoch()), key_(std::move(key)) { + } + + static IteratorT FromPrime(T it) { + if (!IsValid(it)) { + return IteratorT(); + } + + std::string key; + it->first.GetString(&key); + return IteratorT(it, StringOrView::FromString(std::move(key))); + } + + IteratorT(const IteratorT& o) = default; + IteratorT(IteratorT&& o) = default; + IteratorT& operator=(const IteratorT& o) = default; + IteratorT& operator=(IteratorT&& o) = default; + + // Do NOT store this iterator in a variable, as it will not be laundered automatically. + T GetInnerIt() const { + LaunderIfNeeded(); + return it_; + } + + auto operator->() const { + return GetInnerIt().operator->(); + } + + auto is_done() const { + return GetInnerIt().is_done(); + } + + std::string_view key() const { + return key_.view(); + } + + auto IsOccupied() const { + return GetInnerIt().IsOccupied(); + } + + auto GetVersion() const { + return GetInnerIt().GetVersion(); + } + + private: + void LaunderIfNeeded() const; // const is a lie + + mutable T it_; + mutable uint64_t fiber_epoch_ = 0; + StringOrView key_; + }; + + using Iterator = IteratorT; + using ConstIterator = IteratorT; + using ExpIterator = IteratorT; + using ExpConstIterator = IteratorT; + class AutoUpdater { public: AutoUpdater(); @@ -93,12 +159,10 @@ class DbSlice { DbSlice* db_slice = nullptr; DbIndex db_ind = 0; - PrimeIterator it; + Iterator it; std::string_view key; // The following fields are calculated at init time - size_t db_size = 0; - size_t deletion_count = 0; size_t orig_heap_size = 0; }; @@ -186,6 +250,12 @@ class DbSlice { } // returns absolute time of the expiration. + time_t ExpireTime(ExpConstIterator it) const { + return ExpireTime(it.GetInnerIt()); + } + time_t ExpireTime(ExpIterator it) const { + return ExpireTime(it.GetInnerIt()); + } time_t ExpireTime(ExpireConstIterator it) const { return it.is_done() ? 0 : expire_base_[0] + it->second.duration_ms(); } @@ -195,8 +265,8 @@ class DbSlice { } struct ItAndUpdater { - PrimeIterator it; - ExpireIterator exp_it; + Iterator it; + ExpIterator exp_it; AutoUpdater post_updater; }; ItAndUpdater FindMutable(const Context& cntx, std::string_view key); @@ -207,24 +277,23 @@ class DbSlice { unsigned req_obj_type); struct ItAndExpConst { - PrimeConstIterator it; - ExpireConstIterator exp_it; + ConstIterator it; + ExpConstIterator exp_it; }; ItAndExpConst FindReadOnly(const Context& cntx, std::string_view key); - OpResult FindReadOnly(const Context& cntx, std::string_view key, - unsigned req_obj_type); - OpResult FindAndFetchReadOnly(const Context& cntx, std::string_view key, - unsigned req_obj_type); + OpResult FindReadOnly(const Context& cntx, std::string_view key, + unsigned req_obj_type); + OpResult FindAndFetchReadOnly(const Context& cntx, std::string_view key, + unsigned req_obj_type); // Returns (iterator, args-index) if found, KEY_NOTFOUND otherwise. // If multiple keys are found, returns the first index in the ArgSlice. - OpResult> FindFirstReadOnly(const Context& cntx, - ArgSlice args, - int req_obj_type); + OpResult> FindFirstReadOnly(const Context& cntx, ArgSlice args, + int req_obj_type); struct AddOrFindResult { - PrimeIterator it; - ExpireIterator exp_it; + Iterator it; + ExpIterator exp_it; bool is_new = false; AutoUpdater post_updater; @@ -246,19 +315,19 @@ class DbSlice { // Update entry expiration. Return epxiration timepoint in abs milliseconds, or -1 if the entry // already expired and was deleted; - facade::OpResult UpdateExpire(const Context& cntx, PrimeIterator prime_it, - ExpireIterator exp_it, const ExpireParams& params); + facade::OpResult UpdateExpire(const Context& cntx, Iterator prime_it, ExpIterator exp_it, + const ExpireParams& params); // Adds expiry information. - void AddExpire(DbIndex db_ind, PrimeIterator main_it, uint64_t at); + void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at); // Removes the corresponing expiry information if exists. // Returns true if expiry existed (and removed). - bool RemoveExpire(DbIndex db_ind, PrimeIterator main_it); + bool RemoveExpire(DbIndex db_ind, Iterator main_it); // Either adds or removes (if at == 0) expiry. Returns true if a change was made. // Does not change expiry if at != 0 and expiry already exists. - bool UpdateExpire(DbIndex db_ind, PrimeIterator main_it, uint64_t at); + bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at); void SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag); uint32_t GetMCFlag(DbIndex db_ind, const PrimeKey& key) const; @@ -266,8 +335,8 @@ class DbSlice { // Creates a database with index `db_ind`. If such database exists does nothing. void ActivateDb(DbIndex db_ind); - bool Del(DbIndex db_ind, PrimeIterator it); - void RemoveFromTiered(PrimeIterator it, DbIndex index); + bool Del(DbIndex db_ind, Iterator it); + void RemoveFromTiered(Iterator it, DbIndex index); constexpr static DbIndex kDbAll = 0xFFFF; @@ -330,12 +399,12 @@ class DbSlice { } // Check whether 'it' has not expired. Returns it if it's still valid. Otherwise, erases it - // from both tables and return PrimeIterator{}. + // from both tables and return Iterator{}. struct ItAndExp { - PrimeIterator it; - ExpireIterator exp_it; + Iterator it; + ExpIterator exp_it; }; - ItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it); + ItAndExp ExpireIfNeeded(const Context& cntx, Iterator it); // Iterate over all expire table entries and delete expired. void ExpireAllIfNeeded(); @@ -354,7 +423,7 @@ class DbSlice { uint64_t RegisterOnChange(ChangeCallback cb); // Call registered callbacks with version less than upper_bound. - void FlushChangeToEarlierCallbacks(DbIndex db_ind, PrimeIterator it, uint64_t upper_bound); + void FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound); //! Unregisters the callback. void UnregisterOnChange(uint64_t id); @@ -407,14 +476,15 @@ class DbSlice { void TrackKeys(const facade::Connection::WeakRef&, const ArgSlice&); // Delete a key referred by its iterator. + void PerformDeletion(Iterator del_it, DbTable* table); void PerformDeletion(PrimeIterator del_it, DbTable* table); // Releases a single key. `key` must have been normalized by GetLockKey(). void ReleaseNormalized(IntentLock::Mode m, DbIndex db_index, std::string_view key); private: - void PreUpdate(DbIndex db_ind, PrimeIterator it); - void PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, size_t orig_size); + void PreUpdate(DbIndex db_ind, Iterator it); + void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size); OpResult AddOrUpdateInternal(const Context& cntx, std::string_view key, PrimeValue obj, uint64_t expire_at_ms, @@ -429,13 +499,13 @@ class DbSlice { // Invalidate all watched keys for given slots. Used on FlushSlots. void InvalidateSlotWatches(const SlotSet& slot_ids); - void PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, DbTable* table); + void PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* table); // Send invalidation message to the clients that are tracking the change to a key. void SendInvalidationTrackingMessage(std::string_view key); void CreateDb(DbIndex index); - size_t EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* table); + size_t EvictObjects(size_t memory_to_free, Iterator it, DbTable* table); enum class UpdateStatsMode { kReadStats, @@ -446,9 +516,14 @@ class DbSlice { kLoad, kDontLoad, }; - OpResult FindInternal(const Context& cntx, std::string_view key, - std::optional req_obj_type, UpdateStatsMode stats_mode, - LoadExternalMode load_mode); + struct PrimeItAndExp { + PrimeIterator it; + ExpireIterator exp_it; + }; + PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it); + OpResult FindInternal(const Context& cntx, std::string_view key, + std::optional req_obj_type, + UpdateStatsMode stats_mode, LoadExternalMode load_mode); OpResult AddOrFindInternal(const Context& cntx, std::string_view key, LoadExternalMode load_mode); OpResult FindMutableInternal(const Context& cntx, std::string_view key, @@ -458,7 +533,7 @@ class DbSlice { uint64_t NextVersion() { return version_++; } - void RemoveFromTiered(PrimeIterator it, DbTable* table); + void RemoveFromTiered(Iterator it, DbTable* table); private: ShardId shard_id_; @@ -473,7 +548,6 @@ class DbSlice { ssize_t memory_budget_ = SSIZE_MAX; size_t bytes_per_object_ = 0; size_t soft_budget_limit_ = 0; - size_t deletion_count_ = 0; mutable SliceEvents events_; // we may change this even for const operations. @@ -518,4 +592,34 @@ class DbSlice { client_tracking_map_; }; +inline bool IsValid(DbSlice::Iterator it) { + return dfly::IsValid(it.GetInnerIt()); +} + +inline bool IsValid(DbSlice::ConstIterator it) { + return dfly::IsValid(it.GetInnerIt()); +} + +inline bool IsValid(DbSlice::ExpIterator it) { + return dfly::IsValid(it.GetInnerIt()); +} + +inline bool IsValid(DbSlice::ExpConstIterator it) { + return dfly::IsValid(it.GetInnerIt()); +} + +template void DbSlice::IteratorT::LaunderIfNeeded() const { + if (!dfly::IsValid(it_)) { + return; + } + + uint64_t current_epoch = util::fb2::FiberSwitchEpoch(); + if (current_epoch != fiber_epoch_) { + if (!it_.IsOccupied() || it_->first != key_.view()) { + it_ = it_.owner().Find(key_.view()); + } + fiber_epoch_ = current_epoch; + } +} + } // namespace dfly diff --git a/src/server/detail/table.h b/src/server/detail/table.h index 14dc271f8..11e6c96e8 100644 --- a/src/server/detail/table.h +++ b/src/server/detail/table.h @@ -53,6 +53,10 @@ struct ExpireTablePolicy { return s.HashCode(); } + static uint64_t HashFn(std::string_view u) { + return CompactObj::HashCode(u); + } + static void DestroyKey(PrimeKey& cs) { cs.Reset(); } @@ -63,6 +67,10 @@ struct ExpireTablePolicy { static void DestroyValue(uint32_t val) { } + static bool Equal(const PrimeKey& s1, std::string_view s2) { + return s1 == s2; + } + static bool Equal(const PrimeKey& s1, const PrimeKey& s2) { return s1 == s2; } diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 649a6be7f..2c56a2e54 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -758,7 +758,7 @@ auto EngineShard::AnalyzeTxQueue() const -> TxQueueInfo { info.contended_locks++; if (lock.ContentionScore() > info.max_contention_score) { info.max_contention_score = lock.ContentionScore(); - info.max_contention_lock_name = string_view{key}; + info.max_contention_lock_name = key.view(); } } } diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index b46fa6ad4..d2550eb17 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -290,7 +290,7 @@ void Renamer::Find(Transaction* t) { auto [it, exp_it] = db_slice.FindReadOnly(t->GetDbContext(), res->key); res->found = IsValid(it); - if (IsValid(it)) { + if (res->found) { res->ref_val = it->second.AsRef(); res->expire_ts = db_slice.ExpireTime(exp_it); res->sticky = it->first.IsSticky(); @@ -472,9 +472,11 @@ OpResult OnRestore(const OpArgs& op_args, std::string_view key, std::strin restore_args.ExpirationTime()); } -bool ScanCb(const OpArgs& op_args, PrimeIterator it, const ScanOpts& opts, StringVec* res) { +bool ScanCb(const OpArgs& op_args, PrimeIterator prime_it, const ScanOpts& opts, StringVec* res) { auto& db_slice = op_args.shard->db_slice(); - if (it->second.HasExpire()) { + + DbSlice::Iterator it = DbSlice::Iterator::FromPrime(prime_it); + if (prime_it->second.HasExpire()) { it = db_slice.ExpireIfNeeded(op_args.db_cntx, it).it; } @@ -486,7 +488,7 @@ bool ScanCb(const OpArgs& op_args, PrimeIterator it, const ScanOpts& opts, Strin if (!matches) return false; - if (opts.bucket_id != UINT_MAX && opts.bucket_id != it.bucket_id()) { + if (opts.bucket_id != UINT_MAX && opts.bucket_id != it.GetInnerIt().bucket_id()) { return false; } diff --git a/src/server/hll_family.cc b/src/server/hll_family.cc index 0c1f045e2..5526a72c2 100644 --- a/src/server/hll_family.cc +++ b/src/server/hll_family.cc @@ -114,7 +114,7 @@ void PFAdd(CmdArgList args, ConnectionContext* cntx) { OpResult CountHllsSingle(const OpArgs& op_args, string_view key) { auto& db_slice = op_args.shard->db_slice(); - OpResult it = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STRING); + auto it = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STRING); if (it.ok()) { string hll; string_view hll_view = it.value()->second.GetSlice(&hll); @@ -147,8 +147,7 @@ OpResult> ReadValues(const OpArgs& op_args, ArgSlice keys) { try { vector values; for (size_t i = 0; i < keys.size(); ++i) { - OpResult it = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, keys[i], OBJ_STRING); + auto it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, keys[i], OBJ_STRING); if (it.ok()) { string hll; it.value()->second.GetString(&hll); diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 6abef4c50..e785f3488 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -280,15 +280,14 @@ OpResult OpScan(const OpArgs& op_args, std::string_view key, uint64_t * of returning no or very few elements. (taken from redis code at db.c line 904 */ constexpr size_t INTERATION_FACTOR = 10; - OpResult find_res = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_HASH); + auto find_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_HASH); if (!find_res) { DVLOG(1) << "ScanOp: find failed: " << find_res << ", baling out"; return find_res.status(); } - PrimeConstIterator it = find_res.value(); + auto it = find_res.value(); StringVec res; uint32_t count = scan_op.limit * HASH_TABLE_ENTRIES_FACTOR; const PrimeValue& pv = it->second; @@ -626,7 +625,7 @@ OpResult OpSet(const OpArgs& op_args, string_view key, CmdArgList valu DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index); uint8_t* lp = nullptr; - PrimeIterator& it = add_res.it; + auto& it = add_res.it; PrimeValue& pv = it->second; if (add_res.is_new) { diff --git a/src/server/json_family.cc b/src/server/json_family.cc index e2fe1fa0c..8626dc212 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -196,7 +196,7 @@ OpStatus UpdateEntry(const OpArgs& op_args, std::string_view key, std::string_vi return it_res.status(); } - PrimeConstIterator entry_it = it_res->it; + auto entry_it = it_res->it; JsonType* json_val = entry_it->second.GetJson(); DCHECK(json_val) << "should have a valid JSON object for key '" << key << "' the type for it is '" << entry_it->second.ObjType() << "'"; @@ -241,8 +241,7 @@ OpStatus UpdateEntry(const OpArgs& op_args, string_view key, const json::Path& p } OpResult GetJson(const OpArgs& op_args, string_view key) { - OpResult it_res = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_JSON); + auto it_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_JSON); if (!it_res.ok()) return it_res.status(); @@ -1136,8 +1135,7 @@ vector OpJsonMGet(JsonPathV2 expression, const Transaction* t, Engine auto& db_slice = shard->db_slice(); for (size_t i = 0; i < args.size(); ++i) { - OpResult it_res = - db_slice.FindReadOnly(t->GetDbContext(), args[i], OBJ_JSON); + auto it_res = db_slice.FindReadOnly(t->GetDbContext(), args[i], OBJ_JSON); if (!it_res.ok()) continue; @@ -1221,8 +1219,7 @@ OpResult OpSet(const OpArgs& op_args, string_view key, string_view path, // and its not JSON, it would return an error. if (path == "." || path == "$") { if (is_nx_condition || is_xx_condition) { - OpResult it_res = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_JSON); + auto it_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_JSON); bool key_exists = (it_res.status() != OpStatus::KEY_NOTFOUND); if (is_nx_condition && key_exists) { return false; diff --git a/src/server/list_family.cc b/src/server/list_family.cc index ca5bec84d..9b1354ca3 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -198,7 +198,7 @@ std::string OpBPop(Transaction* t, EngineShard* shard, std::string_view key, Lis CHECK(it_res) << t->DebugId() << " " << key; // must exist and must be ok. - PrimeIterator it = it_res->it; + auto it = it_res->it; quicklist* ql = GetQL(it->second); absl::StrAppend(debugMessages.Next(), "OpBPop: ", key, " by ", t->DebugId()); @@ -228,7 +228,7 @@ OpResult OpMoveSingleShard(const OpArgs& op_args, string_view src, strin if (!src_res) return src_res.status(); - PrimeIterator src_it = src_res->it; + auto src_it = src_res->it; quicklist* src_ql = GetQL(src_it->second); if (src == dest) { // simple case. @@ -368,7 +368,7 @@ OpResult OpPop(const OpArgs& op_args, string_view key, ListDir dir, u if (!it_res) return it_res.status(); - PrimeIterator it = it_res->it; + auto it = it_res->it; quicklist* ql = GetQL(it->second); StringVec res; @@ -499,8 +499,7 @@ OpResult OpIndex(const OpArgs& op_args, std::string_view key, long index OpResult> OpPos(const OpArgs& op_args, std::string_view key, std::string_view element, int rank, int count, int max_len) { - OpResult it_res = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST); + auto it_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST); if (!it_res.ok()) return it_res.status(); @@ -580,7 +579,7 @@ OpResult OpRem(const OpArgs& op_args, string_view key, string_view ele if (!it_res) return it_res.status(); - PrimeIterator it = it_res->it; + auto it = it_res->it; quicklist* ql = GetQL(it->second); int iter_direction = AL_START_HEAD; @@ -622,7 +621,7 @@ OpStatus OpSet(const OpArgs& op_args, string_view key, string_view elem, long in if (!it_res) return it_res.status(); - PrimeIterator it = it_res->it; + auto it = it_res->it; quicklist* ql = GetQL(it->second); int replaced = quicklistReplaceAtIndex(ql, index, elem.data(), elem.size()); @@ -639,7 +638,7 @@ OpStatus OpTrim(const OpArgs& op_args, string_view key, long start, long end) { if (!it_res) return it_res.status(); - PrimeIterator it = it_res->it; + auto it = it_res->it; quicklist* ql = GetQL(it->second); long llen = quicklistCount(ql); diff --git a/src/server/set_family.cc b/src/server/set_family.cc index acc3fbdf2..7240c022a 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -594,7 +594,7 @@ OpStatus Mover::OpFind(Transaction* t, EngineShard* es) { for (auto k : largs) { unsigned index = (k == src_) ? 0 : 1; - OpResult res = es->db_slice().FindReadOnly(t->GetDbContext(), k, OBJ_SET); + auto res = es->db_slice().FindReadOnly(t->GetDbContext(), k, OBJ_SET); if (res && index == 0) { // successful src find. DCHECK(!res->is_done()); const CompactObj& val = res.value()->second; @@ -660,8 +660,7 @@ OpResult OpUnion(const OpArgs& op_args, ArgSlice keys) { absl::flat_hash_set uniques; for (string_view key : keys) { - OpResult find_res = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_SET); + auto find_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_SET); if (find_res) { const PrimeValue& pv = find_res.value()->second; if (IsDenseEncoding(pv)) { @@ -688,8 +687,7 @@ OpResult OpDiff(const OpArgs& op_args, ArgSlice keys) { DCHECK(!keys.empty()); DVLOG(1) << "OpDiff from " << keys.front(); EngineShard* es = op_args.shard; - OpResult find_res = - es->db_slice().FindReadOnly(op_args.db_cntx, keys.front(), OBJ_SET); + auto find_res = es->db_slice().FindReadOnly(op_args.db_cntx, keys.front(), OBJ_SET); if (!find_res) { return find_res.status(); @@ -710,8 +708,7 @@ OpResult OpDiff(const OpArgs& op_args, ArgSlice keys) { DCHECK(!uniques.empty()); // otherwise the key would not exist. for (size_t i = 1; i < keys.size(); ++i) { - OpResult diff_res = - es->db_slice().FindReadOnly(op_args.db_cntx, keys[i], OBJ_SET); + auto diff_res = es->db_slice().FindReadOnly(op_args.db_cntx, keys[i], OBJ_SET); if (!diff_res) { if (diff_res.status() == OpStatus::WRONG_TYPE) { return OpStatus::WRONG_TYPE; @@ -748,8 +745,7 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f StringVec result; if (keys.size() == 1) { - OpResult find_res = - es->db_slice().FindReadOnly(t->GetDbContext(), keys.front(), OBJ_SET); + auto find_res = es->db_slice().FindReadOnly(t->GetDbContext(), keys.front(), OBJ_SET); if (!find_res) return find_res.status(); @@ -772,8 +768,7 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f OpStatus status = OpStatus::OK; for (size_t i = 0; i < keys.size(); ++i) { - OpResult find_res = - es->db_slice().FindReadOnly(t->GetDbContext(), keys[i], OBJ_SET); + auto find_res = es->db_slice().FindReadOnly(t->GetDbContext(), keys[i], OBJ_SET); if (!find_res) { if (status == OpStatus::OK || status == OpStatus::KEY_NOTFOUND || find_res.status() != OpStatus::KEY_NOTFOUND) { @@ -831,7 +826,7 @@ OpResult OpPop(const OpArgs& op_args, string_view key, unsigned count if (count == 0) return result; - PrimeIterator it = find_res->it; + auto it = find_res->it; size_t slen = it->second.Size(); /* CASE 1: @@ -888,13 +883,12 @@ OpResult OpPop(const OpArgs& op_args, string_view key, unsigned count OpResult OpScan(const OpArgs& op_args, string_view key, uint64_t* cursor, const ScanOpts& scan_op) { - OpResult find_res = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_SET); + auto find_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_SET); if (!find_res) return find_res.status(); - PrimeConstIterator it = find_res.value(); + auto it = find_res.value(); StringVec res; if (it->second.Encoding() == kEncodingIntSet) { @@ -940,8 +934,7 @@ void SIsMember(CmdArgList args, ConnectionContext* cntx) { string_view val = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { - OpResult find_res = - shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_SET); + auto find_res = shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_SET); if (find_res) { SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()}; @@ -972,8 +965,7 @@ void SMIsMember(CmdArgList args, ConnectionContext* cntx) { memberships.reserve(vals.size()); auto cb = [&](Transaction* t, EngineShard* shard) { - OpResult find_res = - shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_SET); + auto find_res = shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_SET); if (find_res) { SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()}; FindInSet(memberships, t->GetDbContext(), st, vals); @@ -1039,8 +1031,7 @@ void SCard(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - OpResult find_res = - shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_SET); + auto find_res = shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_SET); if (!find_res) { return find_res.status(); } @@ -1215,8 +1206,7 @@ void SRandMember(CmdArgList args, ConnectionContext* cntx) { const auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { StringVec result; - OpResult find_res = - shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_SET); + auto find_res = shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_SET); if (!find_res) { return find_res.status(); } diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 81460971f..0f011e555 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -242,7 +242,8 @@ bool SliceSnapshot::BucketSaveCb(PrimeIterator it) { ++stats_.skipped; return false; } - db_slice_->FlushChangeToEarlierCallbacks(current_db_, it, snapshot_version_); + db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it), + snapshot_version_); stats_.loop_serialized += SerializeBucket(current_db_, it); return false; diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index d700fb502..30c3a9d18 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -620,7 +620,7 @@ OpResult OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgL add_res = std::move(*op_res); } - PrimeIterator& it = add_res.it; + auto& it = add_res.it; if (add_res.is_new) { stream* s = streamNew(); @@ -658,7 +658,7 @@ OpResult OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgL OpResult OpRange(const OpArgs& op_args, string_view key, const RangeOpts& opts) { auto& db_slice = op_args.shard->db_slice(); - OpResult res_it = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STREAM); + auto res_it = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -805,7 +805,7 @@ OpResult>> OpLastIDs(const OpArgs& op_args, vector> last_ids; for (string_view key : args) { - OpResult res_it = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STREAM); + auto res_it = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) { if (res_it.status() == OpStatus::KEY_NOTFOUND) { continue; @@ -867,7 +867,7 @@ vector OpRead(const OpArgs& op_args, const ArgSlice& args, const Read OpResult OpLen(const OpArgs& op_args, string_view key) { auto& db_slice = op_args.shard->db_slice(); - OpResult res_it = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STREAM); + auto res_it = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); const CompactObj& cobj = (*res_it)->second; @@ -878,7 +878,7 @@ OpResult OpLen(const OpArgs& op_args, string_view key) { OpResult> OpListGroups(const DbContext& db_cntx, string_view key, EngineShard* shard) { auto& db_slice = shard->db_slice(); - OpResult res_it = db_slice.FindReadOnly(db_cntx, key, OBJ_STREAM); + auto res_it = db_slice.FindReadOnly(db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -1010,7 +1010,7 @@ void GetConsumers(stream* s, streamCG* cg, long long count, GroupInfo* ginfo) { OpResult OpStreams(const DbContext& db_cntx, string_view key, EngineShard* shard, int full, size_t count) { auto& db_slice = shard->db_slice(); - OpResult res_it = db_slice.FindReadOnly(db_cntx, key, OBJ_STREAM); + auto res_it = db_slice.FindReadOnly(db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -1073,7 +1073,7 @@ OpResult OpStreams(const DbContext& db_cntx, string_view key, Engine OpResult> OpConsumers(const DbContext& db_cntx, EngineShard* shard, string_view stream_name, string_view group_name) { auto& db_slice = shard->db_slice(); - OpResult res_it = db_slice.FindReadOnly(db_cntx, stream_name, OBJ_STREAM); + auto res_it = db_slice.FindReadOnly(db_cntx, stream_name, OBJ_STREAM); if (!res_it) return res_it.status(); diff --git a/src/server/string_family.cc b/src/server/string_family.cc index c930d1004..308d523e6 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -98,8 +98,7 @@ OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t sta OpResult OpGetRange(const OpArgs& op_args, string_view key, int32_t start, int32_t end) { auto& db_slice = op_args.shard->db_slice(); - OpResult it_res = - db_slice.FindAndFetchReadOnly(op_args.db_cntx, key, OBJ_STRING); + auto it_res = db_slice.FindAndFetchReadOnly(op_args.db_cntx, key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); @@ -129,7 +128,7 @@ OpResult OpGetRange(const OpArgs& op_args, string_view key, int32_t star return string(slice.substr(start, end - start + 1)); }; -size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key, string_view val, +size_t ExtendExisting(const OpArgs& op_args, DbSlice::Iterator it, string_view key, string_view val, bool prepend) { string tmp, new_val; string_view slice = it->second.GetSlice(&tmp); @@ -499,12 +498,11 @@ SinkReplyBuilder::MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const auto& db_slice = shard->db_slice(); SinkReplyBuilder::MGetResponse response(keys.size()); - absl::InlinedVector iters(keys.size()); + absl::InlinedVector iters(keys.size()); size_t total_size = 0; for (size_t i = 0; i < keys.size(); ++i) { - OpResult it_res = - db_slice.FindAndFetchReadOnly(t->GetDbContext(), keys[i], OBJ_STRING); + auto it_res = db_slice.FindAndFetchReadOnly(t->GetDbContext(), keys[i], OBJ_STRING); if (!it_res) continue; iters[i] = *it_res; @@ -515,7 +513,7 @@ SinkReplyBuilder::MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const char* next = response.storage_list->data; for (size_t i = 0; i < keys.size(); ++i) { - PrimeConstIterator it = iters[i]; + auto it = iters[i]; if (it.is_done()) continue; @@ -585,7 +583,7 @@ OpResult> SetCmd::Set(const SetParams& params, string_view key, RETURN_ON_BAD_STATUS(op_res); auto& add_res = *op_res; - PrimeIterator it = add_res.it; + auto it = add_res.it; if (!add_res.is_new) { result_builder.CachePrevValueIfNeeded(it->second); return std::move(result_builder).Return(SetExisting(params, it, add_res.exp_it, key, value)); @@ -610,7 +608,8 @@ OpResult> SetCmd::Set(const SetParams& params, string_view key, if (shard->tiered_storage() && TieredStorage::EligibleForOffload(value.size())) { // external storage enabled. - shard->tiered_storage()->ScheduleOffloadWithThrottle(op_args_.db_cntx.db_index, it, key); + shard->tiered_storage()->ScheduleOffloadWithThrottle(op_args_.db_cntx.db_index, it.GetInnerIt(), + key); } if (manual_journal_ && op_args_.shard->journal()) { @@ -620,8 +619,8 @@ OpResult> SetCmd::Set(const SetParams& params, string_view key, return std::move(result_builder).Return(OpStatus::OK); } -OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIterator e_it, - string_view key, string_view value) { +OpStatus SetCmd::SetExisting(const SetParams& params, DbSlice::Iterator it, + DbSlice::ExpIterator e_it, string_view key, string_view value) { if (params.flags & SET_IF_NOTEXIST) return OpStatus::SKIPPED; @@ -838,7 +837,7 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) { auto op_args = t->GetOpArgs(shard); DbSlice& db_slice = op_args.shard->db_slice(); - OpResult res; + OpResult res; // A temporary code that allows running dragonfly without filling up memory store // when reading data from disk. @@ -1314,8 +1313,7 @@ void StringFamily::StrLen(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - OpResult it_res = - shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_STRING); + auto it_res = shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); diff --git a/src/server/string_family.h b/src/server/string_family.h index c9ca583a6..2ebf05b6f 100644 --- a/src/server/string_family.h +++ b/src/server/string_family.h @@ -5,6 +5,7 @@ #pragma once #include "server/common.h" +#include "server/db_slice.h" #include "server/engine_shard_set.h" #include "util/proactor_pool.h" @@ -53,7 +54,7 @@ class SetCmd { std::string_view value); private: - OpStatus SetExisting(const SetParams& params, PrimeIterator it, ExpireIterator e_it, + OpStatus SetExisting(const SetParams& params, DbSlice::Iterator it, DbSlice::ExpIterator e_it, std::string_view key, std::string_view value); void RecordJournal(const SetParams& params, std::string_view key, std::string_view value); diff --git a/src/server/table.cc b/src/server/table.cc index b5ede72b8..d93164a54 100644 --- a/src/server/table.cc +++ b/src/server/table.cc @@ -58,11 +58,6 @@ SlotStats& SlotStats::operator+=(const SlotStats& o) { return *this; } -void LockTable::Key::MakeOwned() const { - if (std::holds_alternative(val_)) - val_ = std::string{std::get(val_)}; -} - size_t LockTable::Size() const { return locks_.size(); } @@ -70,7 +65,7 @@ size_t LockTable::Size() const { std::optional LockTable::Find(std::string_view key) const { DCHECK_EQ(KeyLockArgs::GetLockKey(key), key); - if (auto it = locks_.find(Key{key}); it != locks_.end()) + if (auto it = locks_.find(Key::FromView(key)); it != locks_.end()) return it->second; return std::nullopt; } @@ -78,9 +73,9 @@ std::optional LockTable::Find(std::string_view key) const { bool LockTable::Acquire(std::string_view key, IntentLock::Mode mode) { DCHECK_EQ(KeyLockArgs::GetLockKey(key), key); - auto [it, inserted] = locks_.try_emplace(Key{key}); - if (!inserted) // If more than one transaction refers to a key - it->first.MakeOwned(); // we must fall back to using a self-contained string + auto [it, inserted] = locks_.try_emplace(Key::FromView(key)); + if (!inserted) // If more than one transaction refers to a key + const_cast(it->first).MakeOwned(); // we must fall back to using a self-contained string return it->second.Acquire(mode); } @@ -88,7 +83,7 @@ bool LockTable::Acquire(std::string_view key, IntentLock::Mode mode) { void LockTable::Release(std::string_view key, IntentLock::Mode mode) { DCHECK_EQ(KeyLockArgs::GetLockKey(key), key); - auto it = locks_.find(Key{key}); + auto it = locks_.find(Key::FromView(key)); CHECK(it != locks_.end()) << key; it->second.Release(mode); diff --git a/src/server/table.h b/src/server/table.h index 696feb1b7..aa4cc4f64 100644 --- a/src/server/table.h +++ b/src/server/table.h @@ -11,6 +11,8 @@ #include "core/expire_period.h" #include "core/intent_lock.h" +#include "core/string_or_view.h" +#include "server/cluster/cluster_config.h" #include "server/cluster/slot_set.h" #include "server/conn_context.h" #include "server/detail/table.h" @@ -99,25 +101,7 @@ class LockTable { } private: - struct Key { - operator std::string_view() const { - return visit([](const auto& s) -> std::string_view { return s; }, val_); - } - - bool operator==(const Key& o) const { - return *this == std::string_view(o); - } - - friend std::ostream& operator<<(std::ostream& o, const Key& key) { - return o << std::string_view(key); - } - - // If the key is backed by a string_view, replace it with a string with the same value - void MakeOwned() const; - - mutable std::variant val_; - }; - + using Key = StringOrView; absl::flat_hash_map locks_; }; diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 858cd9cbf..c63b6ce1d 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -736,7 +736,7 @@ bool TieredStorage::FlushPending(DbIndex db_index, unsigned bin_index) { DCHECK(IsValid(it)); if (it->second.HasExpire()) { - auto [pit, exp_it] = db_slice_.ExpireIfNeeded(db_context, it); + auto [pit, exp_it] = db_slice_.ExpireIfNeeded(db_context, DbSlice::Iterator::FromPrime(it)); CHECK(!pit.is_done()) << "TBD: should abort in case of expired keys"; } diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index a27ddff04..3418d2b55 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -197,7 +197,7 @@ OpResult FindZEntry(const ZParams& zparams, const OpArgs& RETURN_ON_BAD_STATUS(op_res); auto& add_res = *op_res; - PrimeIterator& it = add_res.it; + auto& it = add_res.it; PrimeValue& pv = it->second; DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index); if (add_res.is_new || zparams.override) { @@ -790,7 +790,7 @@ void InterScoredMap(ScoredMap* dest, ScoredMap* src, AggType agg_type) { dest->swap(*src); } -using KeyIterWeightVec = vector>; +using KeyIterWeightVec = vector>; ScoredMap UnionShardKeysWithScore(const KeyIterWeightVec& key_iter_weight_vec, AggType agg_type) { ScoredMap result; @@ -1265,7 +1265,7 @@ ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_view key, bo auto& db_slice = shard->db_slice(); auto it_res = db_slice.FindMutable(t->GetDbContext(), key, OBJ_ZSET); CHECK(it_res) << t->DebugId() << " " << key; // must exist and must be ok. - PrimeIterator it = it_res->it; + auto it = it_res->it; ZSetFamily::RangeParams range_params; range_params.reverse = is_max; @@ -1391,8 +1391,7 @@ auto OpPopCount(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, auto OpRange(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, string_view key) -> OpResult { - OpResult res_it = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1407,8 +1406,7 @@ auto OpRange(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, st auto OpRanges(const std::vector& range_specs, const OpArgs& op_args, string_view key) -> OpResult> { - OpResult res_it = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1447,8 +1445,7 @@ OpResult OpRemRange(const OpArgs& op_args, string_view key, OpResult OpRank(const OpArgs& op_args, string_view key, string_view member, bool reverse) { - OpResult res_it = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1494,8 +1491,7 @@ OpResult OpRank(const OpArgs& op_args, string_view key, string_view me OpResult OpCount(const OpArgs& op_args, std::string_view key, const ZSetFamily::ScoreInterval& interval) { - OpResult res_it = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1545,8 +1541,7 @@ OpResult OpCount(const OpArgs& op_args, std::string_view key, OpResult OpLexCount(const OpArgs& op_args, string_view key, const ZSetFamily::LexInterval& interval) { - OpResult res_it = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1611,14 +1606,12 @@ OpResult OpRem(const OpArgs& op_args, string_view key, ArgSlice member } OpResult OpKeyExisted(const OpArgs& op_args, string_view key) { - OpResult res_it = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); return res_it.status(); } OpResult OpScore(const OpArgs& op_args, string_view key, string_view member) { - OpResult res_it = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1635,8 +1628,7 @@ OpResult OpScore(const OpArgs& op_args, string_view key, string_view mem } OpResult OpMScore(const OpArgs& op_args, string_view key, ArgSlice members) { - OpResult res_it = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1657,13 +1649,12 @@ OpResult OpMScore(const OpArgs& op_args, string_view key, ArgSli OpResult OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor, const ScanOpts& scan_op) { - OpResult find_res = - op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto find_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!find_res) return find_res.status(); - PrimeConstIterator it = find_res.value(); + auto it = find_res.value(); const PrimeValue& pv = it->second; StringVec res; char buf[128]; @@ -1866,8 +1857,7 @@ void ZSetFamily::ZCard(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - OpResult find_res = - shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_ZSET); + auto find_res = shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_ZSET); if (!find_res) { return find_res.status(); }