From d7351b315e09a18c03c7b2c071658aaf74765189 Mon Sep 17 00:00:00 2001 From: Shahar Mike Date: Fri, 12 Jul 2024 08:13:16 +0300 Subject: [PATCH] refactor: Use `DbContext`, `OpArgs` and `Transaction` to access `DbSlice` (#3311) This is a refactor that will put us closer to adding namespaces, see included `docs/namespaces.md` --- docs/namespaces.md | 71 +++++++++++++++++++++ src/server/bitops_family.cc | 18 +++--- src/server/bloom_family.cc | 6 +- src/server/container_utils.cc | 12 ++-- src/server/db_slice.cc | 8 +-- src/server/db_slice.h | 2 +- src/server/detail/save_stages_controller.cc | 10 +-- src/server/dflycmd.cc | 6 +- src/server/dragonfly_test.cc | 4 +- src/server/engine_shard_set.cc | 36 +++++++---- src/server/engine_shard_set.h | 3 + src/server/generic_family.cc | 65 +++++++++---------- src/server/hll_family.cc | 8 +-- src/server/hset_family.cc | 24 +++---- src/server/json_family.cc | 18 +++--- src/server/list_family.cc | 45 ++++++------- src/server/main_service.cc | 14 ++-- src/server/memory_cmd.cc | 1 + src/server/search/doc_index.cc | 8 +-- src/server/server_family.cc | 2 +- src/server/set_family.cc | 43 ++++++------- src/server/stream_family.cc | 31 ++++----- src/server/string_family.cc | 38 +++++------ src/server/transaction.cc | 37 +++++++---- src/server/transaction.h | 3 + src/server/tx_base.cc | 11 ++++ src/server/tx_base.h | 14 ++-- src/server/zset_family.cc | 50 +++++++-------- 28 files changed, 355 insertions(+), 233 deletions(-) create mode 100644 docs/namespaces.md diff --git a/docs/namespaces.md b/docs/namespaces.md new file mode 100644 index 000000000..75939d49e --- /dev/null +++ b/docs/namespaces.md @@ -0,0 +1,71 @@ +# Namespaces in Dragonfly + +Dragonfly will soon add an _experimental_ feature, allowing complete separation of data by different users. +We call this feature _namespaces_, and it allows using a single Dragonfly server with multiple +tenants, each using their own data, without being able to mix them together. + +Note that this feature can alternatively be achieved by having each user `SELECT` a different +(numeric) database, or by asking that each user uses a unique prefix for their keys. This approach +has several disadvantages, like users forgetting to `SELECT` / use their prefix, accessing data +logically belonging to other users. + +The advantage of using Namespaces is that data is completely isolated, and users cannot accidentally +use data they do not own. A user must authenticate in order to access the namespace it was assigned. +And as a bonus, each namespace can have multiple databases, switched via `SELECT` like any regular +data store. + +However, before using this feature, please note that it is experimental. This means that: + +* Some features are not supported for non-default namespaces, such as replication and save to RDB +* Some tools are missing, like breakdown of memory / load per namespace +* We do not yet consider this production ready, and it might still have some uncovered bugs + +So kindly use it at your own risk. + +## Usage + +This section describes how, as a Dragonfly user / administrator, you could use namespaces. + +A namespace is identified by a unique string id, defined by the user / admin. Each Dragonfly user +is associated with a single namespace. If not set explicitly, then the default namespace is used, +which is the empty string id. + +Multiple users can use the same namespace if they are all assigned the same namespace id. This can +allow, for example, creating a read-only user as well as a mutating user over the same data. + +To associate user `user1` with the namespace `namespace1`, use the `ACL` command with the +`NAMESPACE:namespace1` flag: + +``` +ACL SETUSER user1 NAMESPACE:namespace1 ON >user_pass +@all ~* +``` + +This sets / creates user `user`, using password `user_pass`, using namespace `namespace1`. + +For more examples check out `tests/dragonfly/acl_family_test.py` - specifically the +`test_namespaces` function. + +## Technical Details + +This section describes how we _implemented_ namespaces in Dragonfly. It is meant to be used by those +who wish to contribute pull requests to Dragonfly. + +Prior to adding namespaces to Dragonfly, each _shard_ had a single `DbSlice` that it owned. They +were thread-local, global-scope instances. + +To support namespaces, we created a `Namespace` class (see `src/server/namespaces.h`) which contains +a `vector`, with a `DbSlice` per shard. When first used, a `Namespace` calls the engine +shard set to initialize the array of `DbSlice`s. + +To access all `Namespace`s, we also added a registry with the original name `Namespaces`. It is a +global, thread safe class that allows accessing all registered namespaces, and registering new ones +on the fly. Note that, while it is thread safe, it shouldn't be a bottle neck because it is supposed +to only be used during the authentication of a connection (or when adding new namespaces). + +When a new connection is authenticated with Dragonfly, we look up (and create, if needed) the +namespace it is associated with. We then save a `Namespace* ns` inside the `dfly::ConnectionContext` +class to associate the user with the namespaces. Because we removed the global `DbSlice` objects, +this is now the only way to access namespaces, which protects users from accessing unowned data. + +Currently, we do not have any support for removing namespaces, so they hang in memory until the +server exits. diff --git a/src/server/bitops_family.cc b/src/server/bitops_family.cc index 4320ea2a2..46c3306e6 100644 --- a/src/server/bitops_family.cc +++ b/src/server/bitops_family.cc @@ -310,7 +310,7 @@ class ElementAccess { }; std::optional ElementAccess::Exists(EngineShard* shard) { - auto res = shard->db_slice().FindReadOnly(context_, key_, OBJ_STRING); + auto res = context_.GetDbSlice(shard->shard_id()).FindReadOnly(context_, key_, OBJ_STRING); if (res.status() == OpStatus::WRONG_TYPE) { return {}; } @@ -318,7 +318,7 @@ std::optional ElementAccess::Exists(EngineShard* shard) { } OpStatus ElementAccess::Find(EngineShard* shard) { - auto op_res = shard->db_slice().AddOrFind(context_, key_); + auto op_res = context_.GetDbSlice(shard->shard_id()).AddOrFind(context_, key_); RETURN_ON_BAD_STATUS(op_res); auto& add_res = *op_res; @@ -355,7 +355,7 @@ OpResult BitNewValue(const OpArgs& args, std::string_view key, uint32_t of bool bit_value) { EngineShard* shard = args.shard; ElementAccess element_access{key, args}; - auto& db_slice = shard->db_slice(); + auto& db_slice = args.GetDbSlice(); DCHECK(db_slice.IsDbValid(element_access.Index())); bool old_value = false; @@ -445,9 +445,9 @@ OpResult CombineResultOp(ShardStringResults result, std::string_vie // For bitop not - we cannot accumulate OpResult RunBitOpNot(const OpArgs& op_args, string_view key) { - EngineShard* es = op_args.shard; // if we found the value, just return, if not found then skip, otherwise report an error - auto find_res = es->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_STRING); + DbSlice& db_slice = op_args.GetDbSlice(); + auto find_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STRING); if (find_res) { return GetString(find_res.value()->second); } else { @@ -463,12 +463,13 @@ OpResult RunBitOpOnShard(std::string_view op, const OpArgs& op_args if (op == NOT_OP_NAME) { return RunBitOpNot(op_args, *start); } - EngineShard* es = op_args.shard; + + DbSlice& db_slice = op_args.GetDbSlice(); BitsStrVec values; // collect all the value for this shard for (; start != end; ++start) { - auto find_res = es->db_slice().FindReadOnly(op_args.db_cntx, *start, OBJ_STRING); + auto find_res = db_slice.FindReadOnly(op_args.db_cntx, *start, OBJ_STRING); if (find_res) { values.emplace_back(GetString(find_res.value()->second)); } else { @@ -1244,7 +1245,8 @@ OpResult ReadValueBitsetAt(const OpArgs& op_args, std::string_view key, ui OpResult ReadValue(const DbContext& context, std::string_view key, EngineShard* shard) { - auto it_res = shard->db_slice().FindReadOnly(context, key, OBJ_STRING); + DbSlice& db_slice = context.GetDbSlice(shard->shard_id()); + auto it_res = db_slice.FindReadOnly(context, key, OBJ_STRING); if (!it_res.ok()) { return it_res.status(); } diff --git a/src/server/bloom_family.cc b/src/server/bloom_family.cc index df854865a..891ae2743 100644 --- a/src/server/bloom_family.cc +++ b/src/server/bloom_family.cc @@ -35,7 +35,7 @@ using AddResult = absl::InlinedVector, 4>; using ExistsResult = absl::InlinedVector; OpStatus OpReserve(const SbfParams& params, const OpArgs& op_args, string_view key) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); OpResult op_res = db_slice.AddOrFind(op_args.db_cntx, key); if (!op_res) return op_res.status(); @@ -50,7 +50,7 @@ OpStatus OpReserve(const SbfParams& params, const OpArgs& op_args, string_view k // Returns true, if item was added, false if it was already "present". OpResult OpAdd(const OpArgs& op_args, string_view key, CmdArgList items) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); OpResult op_res = db_slice.AddOrFind(op_args.db_cntx, key); if (!op_res) @@ -73,7 +73,7 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, CmdArgList ite } OpResult OpExists(const OpArgs& op_args, string_view key, CmdArgList items) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); OpResult op_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_SBF); if (!op_res) return op_res.status(); diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index 4c6b15e40..314cb0191 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -60,8 +60,9 @@ OpResult FindFirstNonEmptySingleShard(Transaction* trans, int req_obj_ty DCHECK_EQ(trans->GetUniqueShardCnt(), 1u); string key; auto cb = [&](Transaction* t, EngineShard* shard) -> Transaction::RunnableResult { - auto args = t->GetShardArgs(shard->shard_id()); - auto ff_res = FindFirstReadOnly(shard->db_slice(), t->GetDbContext(), args, req_obj_type); + ShardId sid = shard->shard_id(); + auto args = t->GetShardArgs(sid); + auto ff_res = FindFirstReadOnly(t->GetDbSlice(sid), t->GetDbContext(), args, req_obj_type); if (ff_res == OpStatus::WRONG_TYPE) return OpStatus::WRONG_TYPE; @@ -96,8 +97,9 @@ OpResult FindFirstNonEmpty(Transaction* trans, int req_obj_type) std::fill(find_res.begin(), find_res.end(), OpStatus::KEY_NOTFOUND); auto cb = [&](Transaction* t, EngineShard* shard) { - auto args = t->GetShardArgs(shard->shard_id()); - auto ff_res = FindFirstReadOnly(shard->db_slice(), t->GetDbContext(), args, req_obj_type); + ShardId sid = shard->shard_id(); + auto args = t->GetShardArgs(sid); + auto ff_res = FindFirstReadOnly(t->GetDbSlice(sid), t->GetDbContext(), args, req_obj_type); if (ff_res) { find_res[shard->shard_id()] = FFResult{ff_res->first->first.AsRef(), ff_res->second, shard->shard_id()}; @@ -339,7 +341,7 @@ OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty auto wcb = [](Transaction* t, EngineShard* shard) { return t->GetShardArgs(shard->shard_id()); }; const auto key_checker = [req_obj_type](EngineShard* owner, const DbContext& context, Transaction*, std::string_view key) -> bool { - return owner->db_slice().FindReadOnly(context, key, req_obj_type).ok(); + return context.GetDbSlice(owner->shard_id()).FindReadOnly(context, key, req_obj_type).ok(); }; auto status = trans->WaitOnWatch(limit_tp, std::move(wcb), key_checker, block_flag, pause_flag); diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 5485df232..cd67ffad7 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -618,18 +618,18 @@ void DbSlice::ActivateDb(DbIndex db_ind) { CreateDb(db_ind); } -bool DbSlice::Del(DbIndex db_ind, Iterator it) { +bool DbSlice::Del(Context cntx, Iterator it) { if (!IsValid(it)) { return false; } - auto& db = db_arr_[db_ind]; + auto& db = db_arr_[cntx.db_index]; auto obj_type = it->second.ObjType(); if (doc_del_cb_ && (obj_type == OBJ_JSON || obj_type == OBJ_HASH)) { string tmp; string_view key = it->first.GetSlice(&tmp); - doc_del_cb_(key, DbContext{db_ind, GetCurrentTimeMs()}, it->second); + doc_del_cb_(key, cntx, it->second); } fetched_items_.erase(it->first.AsRef()); PerformDeletion(it, db.get()); @@ -842,7 +842,7 @@ OpResult DbSlice::UpdateExpire(const Context& cntx, Iterator prime_it, } if (rel_msec <= 0) { // implicit - don't persist - CHECK(Del(cntx.db_index, prime_it)); + CHECK(Del(cntx, prime_it)); return -1; } else if (IsValid(expire_it) && !params.persist) { auto current = ExpireTime(expire_it); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 66b007cec..caf96eff3 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -329,7 +329,7 @@ class DbSlice { // Creates a database with index `db_ind`. If such database exists does nothing. void ActivateDb(DbIndex db_ind); - bool Del(DbIndex db_ind, Iterator it); + bool Del(Context cntx, Iterator it); constexpr static DbIndex kDbAll = 0xFFFF; diff --git a/src/server/detail/save_stages_controller.cc b/src/server/detail/save_stages_controller.cc index 0ef993c4c..2d748144c 100644 --- a/src/server/detail/save_stages_controller.cc +++ b/src/server/detail/save_stages_controller.cc @@ -254,10 +254,11 @@ void SaveStagesController::SaveDfs() { // Save shard files. auto cb = [this](Transaction* t, EngineShard* shard) { + auto& db_slice = shard->db_slice(); // a hack to avoid deadlock in Transaction::RunCallback(...) - shard->db_slice().UnlockChangeCb(); + db_slice.UnlockChangeCb(); SaveDfsSingle(shard); - shard->db_slice().LockChangeCb(); + db_slice.LockChangeCb(); return OpStatus::OK; }; trans_->ScheduleSingleHop(std::move(cb)); @@ -298,9 +299,10 @@ void SaveStagesController::SaveRdb() { auto cb = [snapshot = snapshot.get()](Transaction* t, EngineShard* shard) { // a hack to avoid deadlock in Transaction::RunCallback(...) - shard->db_slice().UnlockChangeCb(); + auto& db_slice = shard->db_slice(); + db_slice.UnlockChangeCb(); snapshot->StartInShard(shard); - shard->db_slice().LockChangeCb(); + db_slice.LockChangeCb(); return OpStatus::OK; }; trans_->ScheduleSingleHop(std::move(cb)); diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 15646766b..0d4d4fc0f 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -73,7 +73,7 @@ std::string_view SyncStateName(DflyCmd::SyncState sync_state) { struct TransactionGuard { static OpStatus ExitGuardCb(Transaction* t, EngineShard* shard) { - shard->db_slice().SetExpireAllowed(true); + t->GetDbSlice(shard->shard_id()).SetExpireAllowed(true); return OpStatus::OK; }; @@ -81,7 +81,7 @@ struct TransactionGuard { t->Execute( [disable_expirations](Transaction* t, EngineShard* shard) { if (disable_expirations) { - shard->db_slice().SetExpireAllowed(!disable_expirations); + t->GetDbSlice(shard->shard_id()).SetExpireAllowed(!disable_expirations); } return OpStatus::OK; }, @@ -455,7 +455,7 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) { void DflyCmd::Expire(CmdArgList args, ConnectionContext* cntx) { RedisReplyBuilder* rb = static_cast(cntx->reply_builder()); cntx->transaction->ScheduleSingleHop([](Transaction* t, EngineShard* shard) { - shard->db_slice().ExpireAllIfNeeded(); + t->GetDbSlice(shard->shard_id()).ExpireAllIfNeeded(); return OpStatus::OK; }); diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index a96be4b28..f036568c8 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -570,12 +570,12 @@ TEST_F(DflyEngineTest, Bug468) { resp = Run({"exec"}); ASSERT_THAT(resp, ErrArg("not an integer")); - ASSERT_FALSE(service_->IsLocked(0, "foo")); + ASSERT_FALSE(IsLocked(0, "foo")); resp = Run({"eval", "return redis.call('set', 'foo', 'bar', 'EX', 'moo')", "1", "foo"}); ASSERT_THAT(resp, ErrArg("not an integer")); - ASSERT_FALSE(service_->IsLocked(0, "foo")); + ASSERT_FALSE(IsLocked(0, "foo")); } TEST_F(DflyEngineTest, Bug496) { diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 1d9d8f1cf..3618b4751 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -286,7 +286,7 @@ void EngineShard::ForceDefrag() { bool EngineShard::DoDefrag() { // -------------------------------------------------------------------------- // NOTE: This task is running with exclusive access to the shard. - // i.e. - Since we are using shared noting access here, and all access + // i.e. - Since we are using shared nothing access here, and all access // are done using fibers, This fiber is run only when no other fiber in the // context of the controlling thread will access this shard! // -------------------------------------------------------------------------- @@ -422,15 +422,6 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time, size_t CompactObj::InitThreadLocal(shard_->memory_resource()); SmallString::InitThreadLocal(data_heap); - if (string backing_prefix = GetFlag(FLAGS_tiered_prefix); !backing_prefix.empty()) { - LOG_IF(FATAL, pb->GetKind() != ProactorBase::IOURING) - << "Only ioring based backing storage is supported. Exiting..."; - - shard_->tiered_storage_ = make_unique(&shard_->db_slice_, max_file_size); - error_code ec = shard_->tiered_storage_->Open(backing_prefix); - CHECK(!ec) << ec.message(); - } - RoundRobinSharder::Init(); shard_->shard_search_indices_.reset(new ShardDocIndices()); @@ -441,11 +432,23 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time, size_t } } +void EngineShard::InitTieredStorage(ProactorBase* pb, size_t max_file_size) { + if (string backing_prefix = GetFlag(FLAGS_tiered_prefix); !backing_prefix.empty()) { + LOG_IF(FATAL, pb->GetKind() != ProactorBase::IOURING) + << "Only ioring based backing storage is supported. Exiting..."; + + auto* shard = EngineShard::tlocal(); + shard->tiered_storage_ = make_unique(&db_slice_, max_file_size); + error_code ec = shard->tiered_storage_->Open(backing_prefix); + CHECK(!ec) << ec.message(); + } +} + void EngineShard::DestroyThreadLocal() { if (!shard_) return; - uint32_t index = shard_->db_slice_.shard_id(); + uint32_t shard_id = shard_->shard_id(); mi_heap_t* tlh = shard_->mi_resource_.heap(); shard_->Shutdown(); @@ -456,7 +459,7 @@ void EngineShard::DestroyThreadLocal() { CompactObj::InitThreadLocal(nullptr); mi_heap_delete(tlh); RoundRobinSharder::Destroy(); - VLOG(1) << "Shard reset " << index; + VLOG(1) << "Shard reset " << shard_id; } // Is called by Transaction::ExecuteAsync in order to run transaction tasks. @@ -643,12 +646,13 @@ void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) { int64_t last_stats_time = time(nullptr); while (true) { - Heartbeat(); if (fiber_periodic_done_.WaitFor(period_ms)) { VLOG(2) << "finished running engine shard periodic task"; return; } + Heartbeat(); + if (runs_global_periodic) { ++global_count; @@ -864,6 +868,12 @@ void EngineShardSet::Init(uint32_t sz, bool update_db_time) { InitThreadLocal(pb, update_db_time, max_shard_file_size); } }); + + pp_->AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) { + if (index < shard_queue_.size()) { + EngineShard::tlocal()->InitTieredStorage(pb, max_shard_file_size); + } + }); } void EngineShardSet::Shutdown() { diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 34b602843..388c9d61d 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -50,6 +50,9 @@ class EngineShard { // If update_db_time is true, initializes periodic time update for its db_slice. static void InitThreadLocal(util::ProactorBase* pb, bool update_db_time, size_t max_file_size); + // Must be called after all InitThreadLocal() have finished + void InitTieredStorage(util::ProactorBase* pb, size_t max_file_size); + static void DestroyThreadLocal(); static EngineShard* tlocal() { diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 145388fb1..1ce3d8115 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -150,7 +150,7 @@ class RdbRestoreValue : protected RdbLoaderBase { } std::optional Add(std::string_view payload, std::string_view key, - DbSlice& db_slice, DbIndex index, + DbSlice& db_slice, const DbContext& cntx, const RestoreArgs& args); private: @@ -177,7 +177,8 @@ std::optional RdbRestoreValue::Parse(std::string_view std::optional RdbRestoreValue::Add(std::string_view data, std::string_view key, DbSlice& db_slice, - DbIndex index, const RestoreArgs& args) { + const DbContext& cntx, + const RestoreArgs& args) { auto opaque_res = Parse(data); if (!opaque_res) { return std::nullopt; @@ -190,8 +191,7 @@ std::optional RdbRestoreValue::Add(std::string_view data, return std::nullopt; } - auto res = db_slice.AddNew(DbContext{index, GetCurrentTimeMs()}, key, std::move(pv), - args.ExpirationTime()); + auto res = db_slice.AddNew(cntx, key, std::move(pv), args.ExpirationTime()); res->it->first.SetSticky(args.Sticky()); if (res) { return std::move(res.value()); @@ -374,13 +374,13 @@ void Renamer::FinalizeRename() { } bool Renamer::KeyExists(Transaction* t, EngineShard* shard, std::string_view key) const { - auto& db_slice = shard->db_slice(); + auto& db_slice = t->GetDbSlice(shard->shard_id()); auto it = db_slice.FindReadOnly(t->GetDbContext(), key).it; return IsValid(it); } void Renamer::SerializeSrc(Transaction* t, EngineShard* shard) { - auto& db_slice = shard->db_slice(); + auto& db_slice = t->GetDbSlice(shard->shard_id()); auto [it, exp_it] = db_slice.FindReadOnly(t->GetDbContext(), src_key_); src_found_ = IsValid(it); @@ -399,7 +399,8 @@ void Renamer::SerializeSrc(Transaction* t, EngineShard* shard) { } OpStatus Renamer::DelSrc(Transaction* t, EngineShard* shard) { - auto res = shard->db_slice().FindMutable(t->GetDbContext(), src_key_); + auto& db_slice = t->GetDbSlice(shard->shard_id()); + auto res = db_slice.FindMutable(t->GetDbContext(), src_key_); auto& it = res.it; CHECK(IsValid(it)); @@ -407,7 +408,7 @@ OpStatus Renamer::DelSrc(Transaction* t, EngineShard* shard) { DVLOG(1) << "Rename: removing the key '" << src_key_; res.post_updater.Run(); - CHECK(shard->db_slice().Del(t->GetDbIndex(), it)); + CHECK(db_slice.Del(t->GetDbContext(), it)); if (shard->journal()) { RecordJournal(t->GetOpArgs(shard), "DEL"sv, ArgSlice{src_key_}, 2); } @@ -423,13 +424,13 @@ OpStatus Renamer::DeserializeDest(Transaction* t, EngineShard* shard) { return OpStatus::OUT_OF_RANGE; } - auto& db_slice = shard->db_slice(); + auto& db_slice = t->GetDbSlice(shard->shard_id()); auto dest_res = db_slice.FindMutable(op_args.db_cntx, dest_key_); if (dest_found_) { DVLOG(1) << "Rename: deleting the destiny key '" << dest_key_; dest_res.post_updater.Run(); - CHECK(db_slice.Del(op_args.db_cntx.db_index, dest_res.it)); + CHECK(db_slice.Del(op_args.db_cntx, dest_res.it)); } if (restore_args.Expired()) { @@ -443,8 +444,8 @@ OpStatus Renamer::DeserializeDest(Transaction* t, EngineShard* shard) { } RdbRestoreValue loader(serialized_value_.version.value()); - auto restored_dest_it = loader.Add(serialized_value_.value, dest_key_, db_slice, - op_args.db_cntx.db_index, restore_args); + auto restored_dest_it = + loader.Add(serialized_value_.value, dest_key_, db_slice, op_args.db_cntx, restore_args); if (restored_dest_it) { auto& dest_it = restored_dest_it->it; @@ -472,7 +473,7 @@ OpStatus Renamer::DeserializeDest(Transaction* t, EngineShard* shard) { } OpStatus OpPersist(const OpArgs& op_args, string_view key) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto res = db_slice.FindMutable(op_args.db_cntx, key); if (!IsValid(res.it)) { @@ -488,7 +489,7 @@ OpStatus OpPersist(const OpArgs& op_args, string_view key) { } OpResult OpDump(const OpArgs& op_args, string_view key) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto [it, expire_it] = db_slice.FindReadOnly(op_args.db_cntx, key); if (IsValid(it)) { @@ -508,7 +509,7 @@ OpResult OnRestore(const OpArgs& op_args, std::string_view key, std::strin return OpStatus::OUT_OF_RANGE; } - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); // The redis impl (see cluster.c function restoreCommand), remove the old key if // the replace option is set, so lets do the same here { @@ -518,7 +519,7 @@ OpResult OnRestore(const OpArgs& op_args, std::string_view key, std::strin VLOG(1) << "restore command is running with replace, found old key '" << key << "' and removing it"; res.post_updater.Run(); - CHECK(db_slice.Del(op_args.db_cntx.db_index, res.it)); + CHECK(db_slice.Del(op_args.db_cntx, res.it)); } } else { // we are not allowed to replace it, so make sure it doesn't exist @@ -534,13 +535,13 @@ OpResult OnRestore(const OpArgs& op_args, std::string_view key, std::strin } RdbRestoreValue loader(rdb_version); - auto res = loader.Add(payload, key, db_slice, op_args.db_cntx.db_index, restore_args); + auto res = loader.Add(payload, key, db_slice, op_args.db_cntx, restore_args); return res.has_value(); } bool ScanCb(const OpArgs& op_args, PrimeIterator prime_it, const ScanOpts& opts, string* scratch, StringVec* res) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); DbSlice::Iterator it = DbSlice::Iterator::FromPrime(prime_it); if (prime_it->second.HasExpire()) { @@ -569,7 +570,7 @@ bool ScanCb(const OpArgs& op_args, PrimeIterator prime_it, const ScanOpts& opts, } void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, StringVec* vec) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index)); unsigned cnt = 0; @@ -643,7 +644,7 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys } OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireParams& params) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto find_res = db_slice.FindMutable(op_args.db_cntx, key); if (!IsValid(find_res.it)) { return OpStatus::KEY_NOTFOUND; @@ -670,7 +671,7 @@ OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireP // returns -2 if the key was not found, -3 if the field was not found, // -1 if ttl on the field was not found. OpResult OpFieldTtl(Transaction* t, EngineShard* shard, string_view key, string_view field) { - auto& db_slice = shard->db_slice(); + auto& db_slice = t->GetDbSlice(shard->shard_id()); const DbContext& db_cntx = t->GetDbContext(); auto [it, expire_it] = db_slice.FindReadOnly(db_cntx, key); if (!IsValid(it)) @@ -691,7 +692,7 @@ OpResult OpFieldTtl(Transaction* t, EngineShard* shard, string_view key, s OpResult OpDel(const OpArgs& op_args, const ShardArgs& keys) { DVLOG(1) << "Del: " << keys.Front(); - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); uint32_t res = 0; @@ -700,7 +701,7 @@ OpResult OpDel(const OpArgs& op_args, const ShardArgs& keys) { if (!IsValid(fres.it)) continue; fres.post_updater.Run(); - res += int(db_slice.Del(op_args.db_cntx.db_index, fres.it)); + res += int(db_slice.Del(op_args.db_cntx, fres.it)); } return res; @@ -709,7 +710,7 @@ OpResult OpDel(const OpArgs& op_args, const ShardArgs& keys) { OpResult OpStick(const OpArgs& op_args, const ShardArgs& keys) { DVLOG(1) << "Stick: " << keys.Front(); - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); uint32_t res = 0; for (string_view key : keys) { @@ -1094,7 +1095,7 @@ OpResultTyped OpFetchSortEntries(const OpArgs& op_args, std::stri bool alpha) { using namespace container_utils; - auto it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key).it; + auto it = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key).it; if (!IsValid(it) || !IsContainer(it->second)) { return OpStatus::KEY_NOTFOUND; } @@ -1469,7 +1470,7 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) { } OpResult GenericFamily::OpTtl(Transaction* t, EngineShard* shard, string_view key) { - auto& db_slice = shard->db_slice(); + auto& db_slice = t->GetDbSlice(shard->shard_id()); auto [it, expire_it] = db_slice.FindReadOnly(t->GetDbContext(), key); if (!IsValid(it)) return OpStatus::KEY_NOTFOUND; @@ -1484,7 +1485,7 @@ OpResult GenericFamily::OpTtl(Transaction* t, EngineShard* shard, stri OpResult GenericFamily::OpExists(const OpArgs& op_args, const ShardArgs& keys) { DVLOG(1) << "Exists: " << keys.Front(); - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); uint32_t res = 0; for (string_view key : keys) { @@ -1497,7 +1498,7 @@ OpResult GenericFamily::OpExists(const OpArgs& op_args, const ShardArg OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, string_view to_key, bool destination_should_not_exist) { auto* es = op_args.shard; - auto& db_slice = es->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto from_res = db_slice.FindMutable(op_args.db_cntx, from_key); if (!IsValid(from_res.it)) return OpStatus::KEY_NOTFOUND; @@ -1535,13 +1536,13 @@ OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, to_res.post_updater.Run(); from_res.post_updater.Run(); - CHECK(db_slice.Del(op_args.db_cntx.db_index, from_res.it)); + CHECK(db_slice.Del(op_args.db_cntx, from_res.it)); } else { // Here we first delete from_it because AddNew below could invalidate from_it. // On the other hand, AddNew does not rely on the iterators - this is why we keep // the value in `from_obj`. from_res.post_updater.Run(); - CHECK(db_slice.Del(op_args.db_cntx.db_index, from_res.it)); + CHECK(db_slice.Del(op_args.db_cntx, from_res.it)); auto op_result = db_slice.AddNew(op_args.db_cntx, to_key, std::move(from_obj), exp_ts); RETURN_ON_BAD_STATUS(op_result); to_res = std::move(*op_result); @@ -1558,7 +1559,7 @@ OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, // as a global transaction. // TODO: Allow running OpMove without a global transaction. OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex target_db) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); // Fetch value at key in current db. auto from_res = db_slice.FindMutable(op_args.db_cntx, key); @@ -1583,7 +1584,7 @@ OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex t // Restore expire flag after std::move. from_res.it->second.SetExpire(IsValid(from_res.exp_it)); - CHECK(db_slice.Del(op_args.db_cntx.db_index, from_res.it)); + CHECK(db_slice.Del(op_args.db_cntx, from_res.it)); auto op_result = db_slice.AddNew(target_cntx, key, std::move(from_obj), exp_ts); RETURN_ON_BAD_STATUS(op_result); auto& add_res = *op_result; diff --git a/src/server/hll_family.cc b/src/server/hll_family.cc index 0bb88e4e6..27d59e342 100644 --- a/src/server/hll_family.cc +++ b/src/server/hll_family.cc @@ -67,7 +67,7 @@ void ConvertToDenseIfNeeded(string* hll) { } OpResult AddToHll(const OpArgs& op_args, string_view key, CmdArgList values) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); string hll; @@ -138,7 +138,7 @@ void PFAdd(CmdArgList args, ConnectionContext* cntx) { } OpResult CountHllsSingle(const OpArgs& op_args, string_view key) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STRING); if (it.ok()) { @@ -173,7 +173,7 @@ OpResult> ReadValues(const OpArgs& op_args, const ShardArgs& keys try { vector values; for (string_view key : keys) { - auto it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_STRING); + auto it = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_STRING); if (it.ok()) { string hll; it.value()->second.GetString(&hll); @@ -280,7 +280,7 @@ OpResult PFMergeInternal(CmdArgList args, ConnectionContext* cntx) { auto set_cb = [&](Transaction* t, EngineShard* shard) { string_view key = ArgS(args, 0); const OpArgs& op_args = t->GetOpArgs(shard); - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto op_res = db_slice.AddOrFind(t->GetDbContext(), key); RETURN_ON_BAD_STATUS(op_res); auto& res = *op_res; diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 7f0cb9151..fb1ad8558 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -166,7 +166,7 @@ OpStatus IncrementValue(optional prev_val, IncrByParam* param) { }; OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, IncrByParam* param) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto op_res = db_slice.AddOrFind(op_args.db_cntx, key); RETURN_ON_BAD_STATUS(op_res); if (!op_res) { @@ -280,7 +280,7 @@ OpResult OpScan(const OpArgs& op_args, std::string_view key, uint64_t * of returning no or very few elements. (taken from redis code at db.c line 904 */ constexpr size_t INTERATION_FACTOR = 10; - auto find_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_HASH); + auto find_res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_HASH); if (!find_res) { DVLOG(1) << "ScanOp: find failed: " << find_res << ", baling out"; @@ -343,7 +343,7 @@ OpResult OpScan(const OpArgs& op_args, std::string_view key, uint64_t OpResult OpDel(const OpArgs& op_args, string_view key, CmdArgList values) { DCHECK(!values.empty()); - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_HASH); if (!it_res) @@ -397,7 +397,7 @@ OpResult OpDel(const OpArgs& op_args, string_view key, CmdArgList valu if (enc == kEncodingListPack) { stats->listpack_blob_cnt--; } - db_slice.Del(op_args.db_cntx.db_index, it_res->it); + db_slice.Del(op_args.db_cntx, it_res->it); } else if (enc == kEncodingListPack) { stats->listpack_bytes += lpBytes((uint8_t*)pv.RObjPtr()); } @@ -408,7 +408,7 @@ OpResult OpDel(const OpArgs& op_args, string_view key, CmdArgList valu OpResult> OpHMGet(const OpArgs& op_args, std::string_view key, CmdArgList fields) { DCHECK(!fields.empty()); - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH); if (!it_res) @@ -466,7 +466,7 @@ OpResult> OpHMGet(const OpArgs& op_args, std::string_view key, Cm } OpResult OpLen(const OpArgs& op_args, string_view key) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH); if (it_res) { @@ -479,7 +479,7 @@ OpResult OpLen(const OpArgs& op_args, string_view key) { } OpResult OpExist(const OpArgs& op_args, string_view key, string_view field) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH); if (!it_res) { @@ -503,7 +503,7 @@ OpResult OpExist(const OpArgs& op_args, string_view key, string_view field) }; OpResult OpGet(const OpArgs& op_args, string_view key, string_view field) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH); if (!it_res) return it_res.status(); @@ -531,7 +531,7 @@ OpResult OpGet(const OpArgs& op_args, string_view key, string_view field } OpResult> OpGetAll(const OpArgs& op_args, string_view key, uint8_t mask) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH); if (!it_res) { if (it_res.status() == OpStatus::KEY_NOTFOUND) @@ -582,7 +582,7 @@ OpResult> OpGetAll(const OpArgs& op_args, string_view key, uint8_ } OpResult OpStrLen(const OpArgs& op_args, string_view key, string_view field) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH); if (!it_res) { @@ -617,7 +617,7 @@ OpResult OpSet(const OpArgs& op_args, string_view key, CmdArgList valu DCHECK(!values.empty() && 0 == values.size() % 2); VLOG(2) << "OpSet(" << key << ")"; - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto op_res = db_slice.AddOrFind(op_args.db_cntx, key); RETURN_ON_BAD_STATUS(op_res); auto& add_res = *op_res; @@ -1094,7 +1094,7 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) { if (string_map->Empty()) { auto it_mutable = db_slice.FindMutable(db_context, key, OBJ_HASH); it_mutable->post_updater.Run(); - db_slice.Del(db_context.db_index, it_mutable->it); + db_slice.Del(db_context, it_mutable->it); return facade::OpStatus::KEY_NOTFOUND; } } else if (pv.Encoding() == kEncodingListPack) { diff --git a/src/server/json_family.cc b/src/server/json_family.cc index 2330df0a1..89d06e65e 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -131,7 +131,7 @@ inline JsonType Evaluate(const json::Path& expr, const JsonType& obj) { } facade::OpStatus SetJson(const OpArgs& op_args, string_view key, JsonType&& value) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto op_res = db_slice.AddOrFind(op_args.db_cntx, key); RETURN_ON_BAD_STATUS(op_res); @@ -244,7 +244,7 @@ error_code JsonReplace(JsonType& instance, string_view path, json::MutateCallbac // jsoncons version OpStatus UpdateEntry(const OpArgs& op_args, std::string_view key, std::string_view path, json::MutateCallback callback, JsonReplaceVerify verify_op = {}) { - auto it_res = op_args.shard->db_slice().FindMutable(op_args.db_cntx, key, OBJ_JSON); + auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON); if (!it_res.ok()) { return it_res.status(); } @@ -278,7 +278,7 @@ OpStatus UpdateEntry(const OpArgs& op_args, std::string_view key, std::string_vi // json::Path version. OpStatus UpdateEntry(const OpArgs& op_args, string_view key, const json::Path& path, json::MutateCallback cb) { - auto it_res = op_args.shard->db_slice().FindMutable(op_args.db_cntx, key, OBJ_JSON); + auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON); if (!it_res.ok()) { return it_res.status(); } @@ -294,7 +294,7 @@ OpStatus UpdateEntry(const OpArgs& op_args, string_view key, const json::Path& p } OpResult GetJson(const OpArgs& op_args, string_view key) { - auto it_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_JSON); + auto it_res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_JSON); if (!it_res.ok()) return it_res.status(); @@ -753,9 +753,9 @@ OpResult OpDoubleArithmetic(const OpArgs& op_args, string_view key, stri OpResult OpDel(const OpArgs& op_args, string_view key, string_view path, optional expression) { if (!expression || path.empty()) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately - return long(db_slice.Del(op_args.db_cntx.db_index, it)); + return long(db_slice.Del(op_args.db_cntx, it)); } OpResult result = GetJson(op_args, key); @@ -1200,7 +1200,7 @@ vector OpJsonMGet(const JsonPathV2& expression, const Transaction* t, DCHECK(!args.Empty()); vector response(args.Size()); - auto& db_slice = shard->db_slice(); + auto& db_slice = t->GetDbSlice(shard->shard_id()); unsigned index = 0; for (string_view key : args) { auto it_res = db_slice.FindReadOnly(t->GetDbContext(), key, OBJ_JSON); @@ -1288,7 +1288,7 @@ OpResult OpSet(const OpArgs& op_args, string_view key, string_view path, // and its not JSON, it would return an error. if (path == "." || path == "$") { if (is_nx_condition || is_xx_condition) { - auto it_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_JSON); + auto it_res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_JSON); bool key_exists = (it_res.status() != OpStatus::KEY_NOTFOUND); if (is_nx_condition && key_exists) { return false; @@ -1417,7 +1417,7 @@ OpStatus OpMerge(const OpArgs& op_args, string_view key, std::string_view json_s return OpStatus::SYNTAX_ERR; } - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_JSON); if (it_res.ok()) { op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, it_res->it->second); diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 3e296e8f1..ee518fc45 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -178,7 +178,7 @@ class BPopPusher { std::string OpBPop(Transaction* t, EngineShard* shard, std::string_view key, ListDir dir) { DVLOG(2) << "popping from " << key << " " << t->DebugId(); - auto& db_slice = shard->db_slice(); + auto& db_slice = t->GetDbSlice(shard->shard_id()); auto it_res = db_slice.FindMutable(t->GetDbContext(), key, OBJ_LIST); if (!it_res) { @@ -206,14 +206,15 @@ std::string OpBPop(Transaction* t, EngineShard* shard, std::string_view key, Lis std::string value = ListPop(dir, ql); it_res->post_updater.Run(); + OpArgs op_args = t->GetOpArgs(shard); if (quicklistCount(ql) == 0) { DVLOG(1) << "deleting key " << key << " " << t->DebugId(); absl::StrAppend(debugMessages.Next(), "OpBPop Del: ", key, " by ", t->DebugId()); - CHECK(shard->db_slice().Del(t->GetDbIndex(), it)); + CHECK(op_args.GetDbSlice().Del(op_args.db_cntx, it)); } - if (OpArgs op_args = t->GetOpArgs(shard); op_args.shard->journal()) { + if (op_args.shard->journal()) { string command = dir == ListDir::LEFT ? "LPOP" : "RPOP"; RecordJournal(op_args, command, ArgSlice{key}, 1); } @@ -223,7 +224,7 @@ std::string OpBPop(Transaction* t, EngineShard* shard, std::string_view key, Lis OpResult OpMoveSingleShard(const OpArgs& op_args, string_view src, string_view dest, ListDir src_dir, ListDir dest_dir) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto src_res = db_slice.FindMutable(op_args.db_cntx, src, OBJ_LIST); if (!src_res) return src_res.status(); @@ -271,7 +272,7 @@ OpResult OpMoveSingleShard(const OpArgs& op_args, string_view src, strin dest_res.post_updater.Run(); if (quicklistCount(src_ql) == 0) { - CHECK(db_slice.Del(op_args.db_cntx.db_index, src_it)); + CHECK(db_slice.Del(op_args.db_cntx, src_it)); } return val; @@ -280,7 +281,7 @@ OpResult OpMoveSingleShard(const OpArgs& op_args, string_view src, strin // Read-only peek operation that determines whether the list exists and optionally // returns the first from left/right value without popping it from the list. OpResult Peek(const OpArgs& op_args, string_view key, ListDir dir, bool fetch) { - auto it_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST); + auto it_res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST); if (!it_res) { return it_res.status(); } @@ -307,12 +308,12 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d DbSlice::AddOrFindResult res; if (skip_notexist) { - auto tmp_res = es->db_slice().FindMutable(op_args.db_cntx, key, OBJ_LIST); + auto tmp_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_LIST); if (!tmp_res) return 0; // Redis returns 0 for nonexisting keys for the *PUSHX actions. res = std::move(*tmp_res); } else { - auto op_res = es->db_slice().AddOrFind(op_args.db_cntx, key); + auto op_res = op_args.GetDbSlice().AddOrFind(op_args.db_cntx, key); RETURN_ON_BAD_STATUS(op_res); res = std::move(*op_res); } @@ -363,7 +364,7 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d OpResult OpPop(const OpArgs& op_args, string_view key, ListDir dir, uint32_t count, bool return_results, bool journal_rewrite) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); @@ -391,7 +392,7 @@ OpResult OpPop(const OpArgs& op_args, string_view key, ListDir dir, u if (quicklistCount(ql) == 0) { absl::StrAppend(debugMessages.Next(), "OpPop Del: ", key, " by ", op_args.tx->DebugId()); - CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); + CHECK(db_slice.Del(op_args.db_cntx, it)); } if (op_args.shard->journal() && journal_rewrite) { @@ -464,7 +465,7 @@ OpResult MoveTwoShards(Transaction* trans, string_view src, string_view } OpResult OpLen(const OpArgs& op_args, std::string_view key) { - auto res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST); + auto res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST); if (!res) return res.status(); @@ -474,7 +475,7 @@ OpResult OpLen(const OpArgs& op_args, std::string_view key) { } OpResult OpIndex(const OpArgs& op_args, std::string_view key, long index) { - auto res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST); + auto res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST); if (!res) return res.status(); quicklist* ql = GetQL(res.value()->second); @@ -498,7 +499,7 @@ OpResult OpIndex(const OpArgs& op_args, std::string_view key, long index OpResult> OpPos(const OpArgs& op_args, std::string_view key, std::string_view element, int rank, int count, int max_len) { - auto it_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST); + auto it_res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST); if (!it_res.ok()) return it_res.status(); @@ -541,7 +542,7 @@ OpResult> OpPos(const OpArgs& op_args, std::string_view key, OpResult OpInsert(const OpArgs& op_args, string_view key, string_view pivot, string_view elem, InsertParam insert_param) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); @@ -573,7 +574,7 @@ OpResult OpInsert(const OpArgs& op_args, string_view key, string_view pivot } OpResult OpRem(const OpArgs& op_args, string_view key, string_view elem, long count) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); @@ -608,14 +609,14 @@ OpResult OpRem(const OpArgs& op_args, string_view key, string_view ele quicklistReleaseIterator(qiter); if (quicklistCount(ql) == 0) { - CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); + CHECK(db_slice.Del(op_args.db_cntx, it)); } return removed; } OpStatus OpSet(const OpArgs& op_args, string_view key, string_view elem, long index) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); @@ -632,7 +633,7 @@ OpStatus OpSet(const OpArgs& op_args, string_view key, string_view elem, long in } OpStatus OpTrim(const OpArgs& op_args, string_view key, long start, long end) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); @@ -670,13 +671,13 @@ OpStatus OpTrim(const OpArgs& op_args, string_view key, long start, long end) { it_res->post_updater.Run(); if (quicklistCount(ql) == 0) { - CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); + CHECK(db_slice.Del(op_args.db_cntx, it)); } return OpStatus::OK; } OpResult OpRange(const OpArgs& op_args, std::string_view key, long start, long end) { - auto res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST); + auto res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST); if (!res) return res.status(); @@ -876,7 +877,7 @@ OpResult BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) { const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*, std::string_view key) -> bool { - return owner->db_slice().FindReadOnly(context, key, OBJ_LIST).ok(); + return context.GetDbSlice(owner->shard_id()).FindReadOnly(context, key, OBJ_LIST).ok(); }; // Block auto status = t->WaitOnWatch(tp, std::move(wcb), key_checker, &(cntx->blocked), &(cntx->paused)); @@ -907,7 +908,7 @@ OpResult BPopPusher::RunPair(ConnectionContext* cntx, time_point tp) { const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*, std::string_view key) -> bool { - return owner->db_slice().FindReadOnly(context, key, OBJ_LIST).ok(); + return context.GetDbSlice(owner->shard_id()).FindReadOnly(context, key, OBJ_LIST).ok(); }; if (auto status = t->WaitOnWatch(tp, std::move(wcb), key_checker, &cntx->blocked, &cntx->paused); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index ed74e83e2..3e2d29c23 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -813,6 +813,7 @@ Service::Service(ProactorPool* pp) }); #endif + CHECK(shard_set == nullptr); shard_set = new EngineShardSet(pp); // We support less than 1024 threads and we support less than 1024 shards. @@ -1284,7 +1285,7 @@ OpResult OpTrackKeys(const OpArgs slice_args, const facade::Connection::We DVLOG(2) << "Start tracking keys for client ID: " << conn_ref.GetClientId() << " with thread ID: " << conn_ref.Thread(); - auto& db_slice = slice_args.shard->db_slice(); + auto& db_slice = slice_args.GetDbSlice(); // TODO: There is a bug here that we track all arguments instead of tracking only keys. for (auto key : args) { DVLOG(2) << "Inserting client ID " << conn_ref.GetClientId() @@ -1659,9 +1660,10 @@ void Service::Watch(CmdArgList args, ConnectionContext* cntx) { atomic_uint32_t keys_existed = 0; auto cb = [&](Transaction* t, EngineShard* shard) { - ShardArgs largs = t->GetShardArgs(shard->shard_id()); + ShardId shard_id = shard->shard_id(); + ShardArgs largs = t->GetShardArgs(shard_id); for (auto k : largs) { - shard->db_slice().RegisterWatchedKey(cntx->db_index(), k, &exec_info); + t->GetDbSlice(shard_id).RegisterWatchedKey(cntx->db_index(), k, &exec_info); } auto res = GenericFamily::OpExists(t->GetOpArgs(shard), largs); @@ -2131,8 +2133,10 @@ CmdArgVec CollectAllKeys(ConnectionState::ExecInfo* exec_info) { } // Return true if transaction was scheduled, false if scheduling was not required. -void StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo* exec_info, +void StartMultiExec(ConnectionContext* cntx, ConnectionState::ExecInfo* exec_info, Transaction::MultiMode multi_mode) { + auto trans = cntx->transaction; + auto dbid = cntx->db_index(); switch (multi_mode) { case Transaction::GLOBAL: trans->StartMultiGlobal(dbid); @@ -2189,7 +2193,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { bool scheduled = false; if (multi_mode != Transaction::NOT_DETERMINED) { - StartMultiExec(cntx->db_index(), cntx->transaction, &exec_info, multi_mode); + StartMultiExec(cntx, &exec_info, multi_mode); scheduled = true; } diff --git a/src/server/memory_cmd.cc b/src/server/memory_cmd.cc index 4bdee2582..91c9b25f5 100644 --- a/src/server/memory_cmd.cc +++ b/src/server/memory_cmd.cc @@ -24,6 +24,7 @@ #include "server/server_family.h" #include "server/server_state.h" #include "server/snapshot.h" +#include "server/transaction.h" using namespace std; using namespace facade; diff --git a/src/server/search/doc_index.cc b/src/server/search/doc_index.cc index ff34a4ce6..7daf0cca8 100644 --- a/src/server/search/doc_index.cc +++ b/src/server/search/doc_index.cc @@ -22,7 +22,7 @@ namespace { template void TraverseAllMatching(const DocIndex& index, const OpArgs& op_args, F&& f) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index)); auto [prime_table, _] = db_slice.GetTables(op_args.db_cntx.db_index); @@ -202,7 +202,7 @@ bool ShardDocIndex::Matches(string_view key, unsigned obj_code) const { SearchResult ShardDocIndex::Search(const OpArgs& op_args, const SearchParams& params, search::SearchAlgorithm* search_algo) const { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto search_results = search_algo->Search(&indices_, params.limit_offset + params.limit_total); if (!search_results.error.empty()) @@ -235,7 +235,7 @@ SearchResult ShardDocIndex::Search(const OpArgs& op_args, const SearchParams& pa vector> ShardDocIndex::SearchForAggregator( const OpArgs& op_args, ArgSlice load_fields, search::SearchAlgorithm* search_algo) const { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto search_results = search_algo->Search(&indices_); if (!search_results.error.empty()) @@ -287,7 +287,7 @@ void ShardDocIndices::InitIndex(const OpArgs& op_args, std::string_view name, if (ServerState::tlocal()->gstate() == GlobalState::ACTIVE) it->second->Rebuild(op_args, &local_mr_); - op_args.shard->db_slice().SetDocDeletionCallback( + op_args.GetDbSlice().SetDocDeletionCallback( [this](string_view key, const DbContext& cntx, const PrimeValue& pv) { RemoveDoc(key, cntx, pv); }); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index b034acf36..a5f5e760b 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1533,7 +1533,7 @@ error_code ServerFamily::Drakarys(Transaction* transaction, DbIndex db_ind) { transaction->Execute( [db_ind](Transaction* t, EngineShard* shard) { - shard->db_slice().FlushDb(db_ind); + t->GetDbSlice(shard->shard_id()).FlushDb(db_ind); return OpStatus::OK; }, true); diff --git a/src/server/set_family.cc b/src/server/set_family.cc index b5369d797..d8ee8b2d3 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -469,8 +469,7 @@ SvArray ToSvArray(const absl::flat_hash_set& set) { // if overwrite is true then OpAdd writes vals into the key and discards its previous value. OpResult OpAdd(const OpArgs& op_args, std::string_view key, const NewEntries& vals, bool overwrite, bool journal_update) { - auto* es = op_args.shard; - auto& db_slice = es->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto vals_it = EntriesRange(vals); VLOG(2) << "OpAdd(" << key << ")"; @@ -480,7 +479,7 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, const NewE // key if it exists. if (overwrite && (vals_it.begin() == vals_it.end())) { auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately - db_slice.Del(op_args.db_cntx.db_index, it); + db_slice.Del(op_args.db_cntx, it); if (journal_update && op_args.shard->journal()) { RecordJournal(op_args, "DEL"sv, ArgSlice{key}); } @@ -555,8 +554,7 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, const NewE OpResult OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_sec, const NewEntries& vals) { - auto* es = op_args.shard; - auto& db_slice = es->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto op_res = db_slice.AddOrFind(op_args.db_cntx, key); RETURN_ON_BAD_STATUS(op_res); @@ -591,8 +589,7 @@ OpResult OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_ OpResult OpRem(const OpArgs& op_args, string_view key, facade::ArgRange vals, bool journal_rewrite) { - auto* es = op_args.shard; - auto& db_slice = es->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto find_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_SET); if (!find_res) { return find_res.status(); @@ -604,7 +601,7 @@ OpResult OpRem(const OpArgs& op_args, string_view key, facade::ArgRang find_res->post_updater.Run(); if (isempty) { - CHECK(db_slice.Del(op_args.db_cntx.db_index, find_res->it)); + CHECK(db_slice.Del(op_args.db_cntx, find_res->it)); } if (journal_rewrite && op_args.shard->journal()) { vector mapped(vals.Size() + 1); @@ -638,6 +635,7 @@ class Mover { }; OpStatus Mover::OpFind(Transaction* t, EngineShard* es) { + auto& db_slice = t->GetDbSlice(es->shard_id()); ShardArgs largs = t->GetShardArgs(es->shard_id()); // In case both src and dest are in the same shard, largs size will be 2. @@ -645,7 +643,7 @@ OpStatus Mover::OpFind(Transaction* t, EngineShard* es) { for (auto k : largs) { unsigned index = (k == src_) ? 0 : 1; - auto res = es->db_slice().FindReadOnly(t->GetDbContext(), k, OBJ_SET); + auto res = db_slice.FindReadOnly(t->GetDbContext(), k, OBJ_SET); if (res && index == 0) { // successful src find. DCHECK(!res->is_done()); const CompactObj& val = res.value()->second; @@ -713,7 +711,7 @@ OpResult OpUnion(const OpArgs& op_args, ShardArgs::Iterator start, absl::flat_hash_set uniques; for (; start != end; ++start) { - auto find_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, *start, OBJ_SET); + auto find_res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, *start, OBJ_SET); if (find_res) { const PrimeValue& pv = find_res.value()->second; if (IsDenseEncoding(pv)) { @@ -738,10 +736,10 @@ OpResult OpUnion(const OpArgs& op_args, ShardArgs::Iterator start, // Read-only OpDiff op on sets. OpResult OpDiff(const OpArgs& op_args, ShardArgs::Iterator start, ShardArgs::Iterator end) { + auto& db_slice = op_args.GetDbSlice(); DCHECK(start != end); DVLOG(1) << "OpDiff from " << *start; - EngineShard* es = op_args.shard; - auto find_res = es->db_slice().FindReadOnly(op_args.db_cntx, *start, OBJ_SET); + auto find_res = db_slice.FindReadOnly(op_args.db_cntx, *start, OBJ_SET); if (!find_res) { return find_res.status(); @@ -762,7 +760,7 @@ OpResult OpDiff(const OpArgs& op_args, ShardArgs::Iterator start, DCHECK(!uniques.empty()); // otherwise the key would not exist. for (++start; start != end; ++start) { - auto diff_res = es->db_slice().FindReadOnly(op_args.db_cntx, *start, OBJ_SET); + auto diff_res = db_slice.FindReadOnly(op_args.db_cntx, *start, OBJ_SET); if (!diff_res) { if (diff_res.status() == OpStatus::WRONG_TYPE) { return OpStatus::WRONG_TYPE; @@ -791,6 +789,7 @@ OpResult OpDiff(const OpArgs& op_args, ShardArgs::Iterator start, // Read-only OpInter op on sets. OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_first) { + auto& db_slice = t->GetDbSlice(es->shard_id()); ShardArgs args = t->GetShardArgs(es->shard_id()); auto it = args.begin(); if (remove_first) { @@ -800,7 +799,7 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f StringVec result; if (args.Size() == 1 + unsigned(remove_first)) { - auto find_res = es->db_slice().FindReadOnly(t->GetDbContext(), *it, OBJ_SET); + auto find_res = db_slice.FindReadOnly(t->GetDbContext(), *it, OBJ_SET); if (!find_res) return find_res.status(); @@ -824,7 +823,7 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f unsigned index = 0; for (; it != args.end(); ++it) { auto& dest = sets[index++]; - auto find_res = es->db_slice().FindReadOnly(t->GetDbContext(), *it, OBJ_SET); + auto find_res = db_slice.FindReadOnly(t->GetDbContext(), *it, OBJ_SET); if (!find_res) { if (status == OpStatus::OK || status == OpStatus::KEY_NOTFOUND || find_res.status() != OpStatus::KEY_NOTFOUND) { @@ -872,7 +871,7 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f } OpResult OpRandMember(const OpArgs& op_args, std::string_view key, int count) { - auto find_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_SET); + auto find_res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_SET); if (!find_res) return find_res.status(); @@ -897,7 +896,7 @@ OpResult OpRandMember(const OpArgs& op_args, std::string_view key, in // count - how many elements to pop. OpResult OpPop(const OpArgs& op_args, string_view key, unsigned count) { auto& db_cntx = op_args.db_cntx; - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto find_res = db_slice.FindMutable(db_cntx, key, OBJ_SET); if (!find_res) { return find_res.status(); @@ -927,7 +926,7 @@ OpResult OpPop(const OpArgs& op_args, string_view key, unsigned count // Delete the set as it is now empty find_res->post_updater.Run(); - CHECK(db_slice.Del(op_args.db_cntx.db_index, find_res->it)); + CHECK(db_slice.Del(op_args.db_cntx, find_res->it)); // Replicate as DEL. if (op_args.shard->journal()) { @@ -963,7 +962,7 @@ OpResult OpPop(const OpArgs& op_args, string_view key, unsigned count OpResult OpScan(const OpArgs& op_args, string_view key, uint64_t* cursor, const ScanOpts& scan_op) { - auto find_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_SET); + auto find_res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_SET); if (!find_res) return find_res.status(); @@ -1010,7 +1009,7 @@ void SIsMember(CmdArgList args, ConnectionContext* cntx) { string_view val = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { - auto find_res = shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_SET); + auto find_res = t->GetDbSlice(shard->shard_id()).FindReadOnly(t->GetDbContext(), key, OBJ_SET); if (find_res) { SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()}; @@ -1037,7 +1036,7 @@ void SMIsMember(CmdArgList args, ConnectionContext* cntx) { memberships.reserve(vals.size()); auto cb = [&](Transaction* t, EngineShard* shard) { - auto find_res = shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_SET); + auto find_res = t->GetDbSlice(shard->shard_id()).FindReadOnly(t->GetDbContext(), key, OBJ_SET); if (find_res) { SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()}; FindInSet(memberships, t->GetDbContext(), st, vals); @@ -1097,7 +1096,7 @@ void SCard(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - auto find_res = shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_SET); + auto find_res = t->GetDbSlice(shard->shard_id()).FindReadOnly(t->GetDbContext(), key, OBJ_SET); if (!find_res) { return find_res.status(); } diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index e17b31f71..afcb05d89 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -605,7 +605,7 @@ int StreamTrim(const AddTrimOpts& opts, stream* s) { OpResult OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgList args) { DCHECK(!args.empty() && args.size() % 2 == 0); - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); DbSlice::AddOrFindResult add_res; if (opts.no_mkstream) { @@ -657,7 +657,7 @@ OpResult OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgL } OpResult OpRange(const OpArgs& op_args, string_view key, const RangeOpts& opts) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto res_it = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -801,7 +801,7 @@ stream* GetReadOnlyStream(const CompactObj& cobj) { OpResult>> OpLastIDs(const OpArgs& op_args, const ShardArgs& args) { DCHECK(!args.Empty()); - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); vector> last_ids; for (string_view key : args) { @@ -866,7 +866,7 @@ vector OpRead(const OpArgs& op_args, const ShardArgs& shard_args, con } OpResult OpLen(const OpArgs& op_args, string_view key) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto res_it = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -877,7 +877,7 @@ OpResult OpLen(const OpArgs& op_args, string_view key) { OpResult> OpListGroups(const DbContext& db_cntx, string_view key, EngineShard* shard) { - auto& db_slice = shard->db_slice(); + auto& db_slice = db_cntx.GetDbSlice(shard->shard_id()); auto res_it = db_slice.FindReadOnly(db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -1009,7 +1009,7 @@ void GetConsumers(stream* s, streamCG* cg, long long count, GroupInfo* ginfo) { OpResult OpStreams(const DbContext& db_cntx, string_view key, EngineShard* shard, int full, size_t count) { - auto& db_slice = shard->db_slice(); + auto& db_slice = db_cntx.GetDbSlice(shard->shard_id()); auto res_it = db_slice.FindReadOnly(db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -1072,7 +1072,7 @@ OpResult OpStreams(const DbContext& db_cntx, string_view key, Engine OpResult> OpConsumers(const DbContext& db_cntx, EngineShard* shard, string_view stream_name, string_view group_name) { - auto& db_slice = shard->db_slice(); + auto& db_slice = db_cntx.GetDbSlice(shard->shard_id()); auto res_it = db_slice.FindReadOnly(db_cntx, stream_name, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -1116,8 +1116,7 @@ struct CreateOpts { }; OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts) { - auto* shard = op_args.shard; - auto& db_slice = shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM); int64_t entries_read = SCG_INVALID_ENTRIES_READ; if (!res_it) { @@ -1164,7 +1163,7 @@ struct FindGroupResult { OpResult FindGroup(const OpArgs& op_args, string_view key, string_view gname) { auto* shard = op_args.shard; - auto& db_slice = shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -1453,8 +1452,7 @@ OpStatus OpSetId(const OpArgs& op_args, string_view key, string_view gname, stri } OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) { - auto* shard = op_args.shard; - auto& db_slice = shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -1492,8 +1490,7 @@ OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) { } OpResult OpDel(const OpArgs& op_args, string_view key, absl::Span ids) { - auto* shard = op_args.shard; - auto& db_slice = shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -1922,8 +1919,7 @@ void XGroupHelp(CmdArgList args, ConnectionContext* cntx) { } OpResult OpTrim(const OpArgs& op_args, const AddTrimOpts& opts) { - auto* shard = op_args.shard; - auto& db_slice = shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto res_it = db_slice.FindMutable(op_args.db_cntx, opts.key, OBJ_STREAM); if (!res_it) { if (res_it.status() == OpStatus::KEY_NOTFOUND) { @@ -2900,7 +2896,8 @@ void XReadBlock(ReadOpts* opts, ConnectionContext* cntx) { const auto key_checker = [&opts](EngineShard* owner, const DbContext& context, Transaction* tx, std::string_view key) -> bool { - auto res_it = owner->db_slice().FindReadOnly(context, key, OBJ_STREAM); + auto& db_slice = context.GetDbSlice(owner->shard_id()); + auto res_it = db_slice.FindReadOnly(context, key, OBJ_STREAM); if (!res_it.ok()) return false; diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 97e754807..c871ec605 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -72,7 +72,7 @@ template T GetResult(std::variant> v) { OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t start, string_view value) { VLOG(2) << "SetRange(" << key << ", " << start << ", " << value << ")"; - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); size_t range_len = start + value.size(); if (range_len == 0) { @@ -107,7 +107,7 @@ OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t sta } OpResult OpGetRange(const OpArgs& op_args, string_view key, int32_t start, int32_t end) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); @@ -153,7 +153,7 @@ size_t ExtendExisting(DbSlice::Iterator it, string_view key, string_view val, bo } OpResult ExtendOrSkip(const OpArgs& op_args, string_view key, string_view val, bool prepend) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto it_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STRING); if (!it_res) { return false; @@ -163,7 +163,7 @@ OpResult ExtendOrSkip(const OpArgs& op_args, string_view key, string_view } OpResult OpIncrFloat(const OpArgs& op_args, string_view key, double val) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto op_res = db_slice.AddOrFind(op_args.db_cntx, key); RETURN_ON_BAD_STATUS(op_res); @@ -208,7 +208,7 @@ OpResult OpIncrFloat(const OpArgs& op_args, string_view key, double val) // if skip_on_missing - returns KEY_NOTFOUND. OpResult OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr, bool skip_on_missing) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); // we avoid using AddOrFind because of skip_on_missing option for memcache. auto res = db_slice.FindMutable(op_args.db_cntx, key); @@ -307,7 +307,7 @@ OpStatus OpMSet(const OpArgs& op_args, const ShardArgs& args) { OpResult> OpThrottle(const OpArgs& op_args, const string_view key, const int64_t limit, const int64_t emission_interval_ms, const uint64_t quantity) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); if (emission_interval_ms > INT64_MAX / limit) { return OpStatus::INVALID_INT; @@ -413,7 +413,7 @@ SinkReplyBuilder::MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, bool f ShardArgs keys = t->GetShardArgs(shard->shard_id()); DCHECK(!keys.Empty()); - auto& db_slice = shard->db_slice(); + auto& db_slice = t->GetDbSlice(shard->shard_id()); SinkReplyBuilder::MGetResponse response(keys.Size()); absl::InlinedVector iters(keys.Size()); @@ -478,7 +478,7 @@ OpResult>> OpExtend(const OpArgs& op_a std::string_view value, bool prepend) { auto* shard = op_args.shard; - auto it_res = shard->db_slice().AddOrFind(op_args.db_cntx, key); + auto it_res = op_args.GetDbSlice().AddOrFind(op_args.db_cntx, key); RETURN_ON_BAD_STATUS(it_res); if (it_res->is_new) { @@ -551,7 +551,7 @@ bool StringValue::IsEmpty() const { } OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value) { - auto& db_slice = op_args_.shard->db_slice(); + auto& db_slice = op_args_.GetDbSlice(); DCHECK(db_slice.IsDbValid(op_args_.db_cntx.db_index)); VLOG(2) << "Set " << key << "(" << db_slice.shard_id() << ") "; @@ -596,7 +596,7 @@ OpStatus SetCmd::SetExisting(const SetParams& params, DbSlice::Iterator it, PrimeValue& prime_value = it->second; EngineShard* shard = op_args_.shard; - DbSlice& db_slice = shard->db_slice(); + auto& db_slice = op_args_.GetDbSlice(); uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + op_args_.db_cntx.time_now_ms : 0; @@ -636,8 +636,7 @@ OpStatus SetCmd::SetExisting(const SetParams& params, DbSlice::Iterator it, void SetCmd::AddNew(const SetParams& params, DbSlice::Iterator it, DbSlice::ExpIterator e_it, std::string_view key, std::string_view value) { - EngineShard* shard = op_args_.shard; - auto& db_slice = shard->db_slice(); + auto& db_slice = op_args_.GetDbSlice(); // Adding new value. PrimeValue tvalue{value}; @@ -858,7 +857,7 @@ void StringFamily::SetNx(CmdArgList args, ConnectionContext* cntx) { void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) { auto cb = [key = ArgS(args, 0)](Transaction* tx, EngineShard* es) -> OpResult { - auto it_res = es->db_slice().FindReadOnly(tx->GetDbContext(), key, OBJ_STRING); + auto it_res = tx->GetDbSlice(es->shard_id()).FindReadOnly(tx->GetDbContext(), key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); @@ -870,13 +869,14 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) { void StringFamily::GetDel(CmdArgList args, ConnectionContext* cntx) { auto cb = [key = ArgS(args, 0)](Transaction* tx, EngineShard* es) -> OpResult { - auto it_res = es->db_slice().FindMutable(tx->GetDbContext(), key, OBJ_STRING); + auto& db_slice = tx->GetDbSlice(es->shard_id()); + auto it_res = db_slice.FindMutable(tx->GetDbContext(), key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); auto value = StringValue::Read(tx->GetDbIndex(), key, it_res->it->second, es); it_res->post_updater.Run(); // Run manually before delete - es->db_slice().Del(tx->GetDbIndex(), it_res->it); + db_slice.Del(tx->GetDbContext(), it_res->it); return value; }; @@ -985,7 +985,7 @@ void StringFamily::GetEx(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { auto op_args = t->GetOpArgs(shard); - auto it_res = op_args.shard->db_slice().FindMutable(op_args.db_cntx, key, OBJ_STRING); + auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_STRING); if (!it_res) return it_res.status(); @@ -993,8 +993,8 @@ void StringFamily::GetEx(CmdArgList args, ConnectionContext* cntx) { if (exp_params.IsDefined()) { it_res->post_updater.Run(); // Run manually before possible delete due to negative expire - RETURN_ON_BAD_STATUS(op_args.shard->db_slice().UpdateExpire(op_args.db_cntx, it_res->it, - it_res->exp_it, exp_params)); + RETURN_ON_BAD_STATUS(op_args.GetDbSlice().UpdateExpire(op_args.db_cntx, it_res->it, + it_res->exp_it, exp_params)); } // Replicate GETEX as PEXPIREAT or PERSIST @@ -1261,7 +1261,7 @@ void StringFamily::StrLen(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - auto it_res = shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_STRING); + auto it_res = t->GetDbSlice(shard->shard_id()).FindReadOnly(t->GetDbContext(), key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 7d1c5c1c8..64fa7b48d 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -585,7 +585,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { // touching those keys will be ordered via TxQueue. It's necessary because we preserve // the atomicity of awaked transactions by halting the TxQueue. if (was_suspended || !became_suspended) { - shard->db_slice().Release(mode, largs); + GetDbSlice(shard->shard_id()).Release(mode, largs); sd.local_mask &= ~KEYLOCK_ACQUIRED; } sd.local_mask &= ~OUT_OF_ORDER; @@ -619,7 +619,8 @@ void Transaction::RunCallback(EngineShard* shard) { DCHECK_EQ(shard, EngineShard::tlocal()); RunnableResult result; - shard->db_slice().LockChangeCb(); + auto& db_slice = GetDbSlice(shard->shard_id()); + db_slice.LockChangeCb(); try { result = (*cb_ptr_)(this, shard); @@ -643,7 +644,7 @@ void Transaction::RunCallback(EngineShard* shard) { LOG(FATAL) << "Unexpected exception " << e.what(); } - shard->db_slice().OnCbFinish(); + db_slice.OnCbFinish(); // Handle result flags to alter behaviour. if (result.flags & RunnableResult::AVOID_CONCLUDING) { @@ -657,10 +658,10 @@ void Transaction::RunCallback(EngineShard* shard) { // Log to journal only once the command finished running if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding)) { LogAutoJournalOnShard(shard, result); - shard->db_slice().UnlockChangeCb(); + db_slice.UnlockChangeCb(); MaybeInvokeTrackingCb(); } else { - shard->db_slice().UnlockChangeCb(); + db_slice.UnlockChangeCb(); } } @@ -1027,7 +1028,8 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool can_run_immediately) bool shard_unlocked = shard->shard_lock()->Check(mode); // Check if we can run immediately - if (shard_unlocked && can_run_immediately && CheckLocks(shard->db_slice(), mode, lock_args)) { + if (shard_unlocked && can_run_immediately && + CheckLocks(GetDbSlice(shard->shard_id()), mode, lock_args)) { sd.local_mask |= RAN_IMMEDIATELY; shard->stats().tx_immediate_total++; @@ -1038,7 +1040,7 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool can_run_immediately) return true; } - bool keys_unlocked = shard->db_slice().Acquire(mode, lock_args); + bool keys_unlocked = GetDbSlice(shard->shard_id()).Acquire(mode, lock_args); lock_granted = shard_unlocked && keys_unlocked; sd.local_mask |= KEYLOCK_ACQUIRED; @@ -1062,7 +1064,7 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool can_run_immediately) // fail this scheduling attempt for trans. if (!txq->Empty() && txid_ < txq->TailScore() && !lock_granted) { if (sd.local_mask & KEYLOCK_ACQUIRED) { - shard->db_slice().Release(mode, lock_args); + GetDbSlice(shard->shard_id()).Release(mode, lock_args); sd.local_mask &= ~KEYLOCK_ACQUIRED; } return false; @@ -1106,7 +1108,7 @@ bool Transaction::CancelShardCb(EngineShard* shard) { auto lock_args = GetLockArgs(shard->shard_id()); DCHECK(sd.local_mask & KEYLOCK_ACQUIRED); DCHECK(!lock_args.fps.empty()); - shard->db_slice().Release(LockMode(), lock_args); + GetDbSlice(shard->shard_id()).Release(LockMode(), lock_args); sd.local_mask &= ~KEYLOCK_ACQUIRED; } @@ -1196,7 +1198,7 @@ OpStatus Transaction::WatchInShard(BlockingController::Keys keys, EngineShard* s void Transaction::ExpireShardCb(BlockingController::Keys keys, EngineShard* shard) { // Blocking transactions don't release keys when suspending, release them now. auto lock_args = GetLockArgs(shard->shard_id()); - shard->db_slice().Release(LockMode(), lock_args); + GetDbSlice(shard->shard_id()).Release(LockMode(), lock_args); auto& sd = shard_data_[SidToId(shard->shard_id())]; sd.local_mask &= ~KEYLOCK_ACQUIRED; @@ -1209,16 +1211,23 @@ void Transaction::ExpireShardCb(BlockingController::Keys keys, EngineShard* shar FinishHop(); } +DbSlice& Transaction::GetDbSlice(ShardId shard_id) const { + auto* shard = EngineShard::tlocal(); + DCHECK_EQ(shard->shard_id(), shard_id); + return shard->db_slice(); +} + OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) { DCHECK(multi_ && multi_->role == SQUASHED_STUB); DCHECK_EQ(unique_shard_cnt_, 1u); auto* shard = EngineShard::tlocal(); - shard->db_slice().LockChangeCb(); + auto& db_slice = GetDbSlice(shard->shard_id()); + db_slice.LockChangeCb(); auto result = cb(this, shard); - shard->db_slice().OnCbFinish(); + db_slice.OnCbFinish(); LogAutoJournalOnShard(shard, result); - shard->db_slice().UnlockChangeCb(); + db_slice.UnlockChangeCb(); MaybeInvokeTrackingCb(); DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it @@ -1231,7 +1240,7 @@ void Transaction::UnlockMultiShardCb(absl::Span fps, EngineShard* if (multi_->mode == GLOBAL) { shard->shard_lock()->Release(IntentLock::EXCLUSIVE); } else { - shard->db_slice().Release(*multi_->lock_mode, KeyLockArgs{db_index_, fps}); + GetDbSlice(shard->shard_id()).Release(*multi_->lock_mode, KeyLockArgs{db_index_, fps}); } ShardId sid = shard->shard_id(); diff --git a/src/server/transaction.h b/src/server/transaction.h index eec467a2a..b4933d690 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -29,6 +29,7 @@ namespace dfly { class EngineShard; class BlockingController; +class DbSlice; using facade::OpResult; using facade::OpStatus; @@ -305,6 +306,8 @@ class Transaction { return DbContext{db_index_, time_now_ms_}; } + DbSlice& GetDbSlice(ShardId sid) const; + DbIndex GetDbIndex() const { return db_index_; } diff --git a/src/server/tx_base.cc b/src/server/tx_base.cc index b0fa25506..ba075aa33 100644 --- a/src/server/tx_base.cc +++ b/src/server/tx_base.cc @@ -16,6 +16,17 @@ namespace dfly { using namespace std; using Payload = journal::Entry::Payload; +DbSlice& DbContext::GetDbSlice(ShardId shard_id) const { + // TODO: Update this when adding namespaces + DCHECK_EQ(shard_id, EngineShard::tlocal()->shard_id()); + return EngineShard::tlocal()->db_slice(); +} + +DbSlice& OpArgs::GetDbSlice() const { + // TODO: Update this when adding namespaces + return shard->db_slice(); +} + size_t ShardArgs::Size() const { size_t sz = 0; for (const auto& s : slice_.second) diff --git a/src/server/tx_base.h b/src/server/tx_base.h index 9484b281c..03aab1a5f 100644 --- a/src/server/tx_base.h +++ b/src/server/tx_base.h @@ -14,6 +14,7 @@ namespace dfly { class EngineShard; class Transaction; +class DbSlice; using DbIndex = uint16_t; using ShardId = uint16_t; @@ -59,19 +60,24 @@ struct KeyIndex { struct DbContext { DbIndex db_index = 0; uint64_t time_now_ms = 0; + + // Convenience method. + DbSlice& GetDbSlice(ShardId shard_id) const; }; struct OpArgs { - EngineShard* shard; - const Transaction* tx; + EngineShard* shard = nullptr; + const Transaction* tx = nullptr; DbContext db_cntx; - OpArgs() : shard(nullptr), tx(nullptr) { - } + OpArgs() = default; OpArgs(EngineShard* s, const Transaction* tx, const DbContext& cntx) : shard(s), tx(tx), db_cntx(cntx) { } + + // Convenience method. + DbSlice& GetDbSlice() const; }; // A strong type for a lock tag. Helps to disambiguate between keys and the parts of the diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index db41a8ec3..0e504c8c1 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -188,7 +188,7 @@ void OutputScoredArrayResult(const OpResult& result, OpResult FindZEntry(const ZParams& zparams, const OpArgs& op_args, string_view key, size_t member_len) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); if (zparams.flags & ZADD_IN_XX) { return db_slice.FindMutable(op_args.db_cntx, key, OBJ_ZSET); } @@ -847,7 +847,7 @@ OpResult OpUnion(EngineShard* shard, Transaction* t, string_view dest } } - auto& db_slice = shard->db_slice(); + auto& db_slice = t->GetDbSlice(shard->shard_id()); KeyIterWeightVec key_weight_vec(keys.Size() - removed_keys); unsigned index = 0; for (; start != end; ++start) { @@ -900,7 +900,7 @@ OpResult OpInter(EngineShard* shard, Transaction* t, string_view dest } } - auto& db_slice = shard->db_slice(); + auto& db_slice = t->GetDbSlice(shard->shard_id()); vector> it_arr(keys.Size() - removed_keys); unsigned index = 0; @@ -966,11 +966,11 @@ size_t EstimateListpackMinBytes(ScoredMemberSpan members) { OpResult OpAdd(const OpArgs& op_args, const ZParams& zparams, string_view key, ScoredMemberSpan members) { DCHECK(!members.empty() || zparams.override); - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); if (zparams.override && members.empty()) { auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately - db_slice.Del(op_args.db_cntx.db_index, it); + db_slice.Del(op_args.db_cntx, it); return OpStatus::OK; } @@ -1268,7 +1268,7 @@ bool ParseLimit(string_view offset_str, string_view limit_str, ZSetFamily::Range } ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_view key, bool is_max) { - auto& db_slice = shard->db_slice(); + auto& db_slice = t->GetDbSlice(shard->shard_id()); auto it_res = db_slice.FindMutable(t->GetDbContext(), key, OBJ_ZSET); CHECK(it_res) << t->DebugId() << " " << key; // must exist and must be ok. auto it = it_res->it; @@ -1299,7 +1299,7 @@ ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_view key, bo auto zlen = pv.Size(); if (zlen == 0) { DVLOG(1) << "deleting key " << key << " " << t->DebugId(); - CHECK(db_slice.Del(t->GetDbIndex(), it_res->it)); + CHECK(db_slice.Del(t->GetDbContext(), it_res->it)); } OpArgs op_args = t->GetOpArgs(shard); @@ -1376,7 +1376,7 @@ vector OpFetch(EngineShard* shard, Transaction* t) { vector results; results.reserve(keys.Size()); - auto& db_slice = shard->db_slice(); + auto& db_slice = t->GetDbSlice(shard->shard_id()); for (string_view key : keys) { auto it = db_slice.FindReadOnly(t->GetDbContext(), key, OBJ_ZSET); if (!it) { @@ -1393,7 +1393,7 @@ vector OpFetch(EngineShard* shard, Transaction* t) { auto OpPopCount(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, string_view key) -> OpResult { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1407,7 +1407,7 @@ auto OpPopCount(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, auto zlen = pv.Size(); if (zlen == 0) { - CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it->it)); + CHECK(op_args.GetDbSlice().Del(op_args.db_cntx, res_it->it)); } return iv.PopResult(); @@ -1415,7 +1415,7 @@ auto OpPopCount(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, auto OpRange(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, string_view key) -> OpResult { - auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1430,7 +1430,7 @@ auto OpRange(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, st auto OpRanges(const std::vector& range_specs, const OpArgs& op_args, string_view key) -> OpResult> { - auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1448,7 +1448,7 @@ auto OpRanges(const std::vector& range_specs, const OpAr OpResult OpRemRange(const OpArgs& op_args, string_view key, const ZSetFamily::ZRangeSpec& range_spec) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1461,7 +1461,7 @@ OpResult OpRemRange(const OpArgs& op_args, string_view key, auto zlen = pv.Size(); if (zlen == 0) { - CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it->it)); + CHECK(op_args.GetDbSlice().Del(op_args.db_cntx, res_it->it)); } return iv.removed(); @@ -1469,7 +1469,7 @@ OpResult OpRemRange(const OpArgs& op_args, string_view key, OpResult OpRank(const OpArgs& op_args, string_view key, string_view member, bool reverse) { - auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1515,7 +1515,7 @@ OpResult OpRank(const OpArgs& op_args, string_view key, string_view me OpResult OpCount(const OpArgs& op_args, std::string_view key, const ZSetFamily::ScoreInterval& interval) { - auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1565,7 +1565,7 @@ OpResult OpCount(const OpArgs& op_args, std::string_view key, OpResult OpLexCount(const OpArgs& op_args, string_view key, const ZSetFamily::LexInterval& interval) { - auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1607,7 +1607,7 @@ OpResult OpLexCount(const OpArgs& op_args, string_view key, } OpResult OpRem(const OpArgs& op_args, string_view key, facade::ArgRange members) { - auto& db_slice = op_args.shard->db_slice(); + auto& db_slice = op_args.GetDbSlice(); auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1623,19 +1623,19 @@ OpResult OpRem(const OpArgs& op_args, string_view key, facade::ArgRang res_it->post_updater.Run(); if (zlen == 0) { - CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it->it)); + CHECK(op_args.GetDbSlice().Del(op_args.db_cntx, res_it->it)); } return deleted; } OpResult OpKeyExisted(const OpArgs& op_args, string_view key) { - auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); return res_it.status(); } OpResult OpScore(const OpArgs& op_args, string_view key, string_view member) { - auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1653,7 +1653,7 @@ OpResult OpScore(const OpArgs& op_args, string_view key, string_view mem OpResult OpMScore(const OpArgs& op_args, string_view key, facade::ArgRange members) { - auto res_it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto res_it = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1673,7 +1673,7 @@ OpResult OpMScore(const OpArgs& op_args, string_view key, OpResult OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor, const ScanOpts& scan_op) { - auto find_res = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto find_res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!find_res) return find_res.status(); @@ -1725,7 +1725,7 @@ OpResult OpScan(const OpArgs& op_args, std::string_view key, uint64_t OpResult OpRandMember(int count, const ZSetFamily::RangeParams& params, const OpArgs& op_args, string_view key) { - auto it = op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); + auto it = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET); if (!it) return it.status(); @@ -1923,7 +1923,7 @@ void ZSetFamily::ZCard(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - auto find_res = shard->db_slice().FindReadOnly(t->GetDbContext(), key, OBJ_ZSET); + auto find_res = t->GetDbSlice(shard->shard_id()).FindReadOnly(t->GetDbContext(), key, OBJ_ZSET); if (!find_res) { return find_res.status(); }