Mirror of https://github.com/dragonflydb/dragonfly.git, synced 2024-12-15 17:51:06 +00:00
server(tiering): load data on read (#2415)
* server(tiering): load data on read

Signed-off-by: adi_holden <adi@dragonflydb.io>

Parent: 8eda8226b2
Commit: 9f3b118b87

13 changed files with 401 additions and 231 deletions
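The diff routes reads that may touch an offloaded value through new FindAndFetch* / AddOrFindAndFetch variants, which bring the external blob back into memory before the command logic runs, instead of each command reading from disk on its own. The sketch below is not from the commit; it is a minimal, self-contained model of that load-on-read idea, with invented names (TinyStore, Slot) standing in for Dragonfly's DbSlice and PrimeValue.

// --- sketch, not part of the diff: load-on-read in a toy key-value store ---
#include <cassert>
#include <optional>
#include <string>
#include <unordered_map>

// A toy value slot: either resident in memory or "external" (offloaded).
struct Slot {
  bool external = false;
  std::string memory;     // valid when !external
  std::string disk_copy;  // stands in for the blob on disk
};

class TinyStore {
 public:
  void SetResident(const std::string& key, std::string value) {
    table_[key] = Slot{false, std::move(value), {}};
  }

  void Offload(const std::string& key) {
    Slot& s = table_.at(key);
    s.disk_copy = std::move(s.memory);  // pretend we wrote it to disk
    s.memory.clear();
    s.external = true;
  }

  // Load-on-read: a fetching lookup brings an external value back to memory,
  // so every caller after this point sees a plain in-memory string.
  std::optional<std::string> FindAndFetch(const std::string& key) {
    auto it = table_.find(key);
    if (it == table_.end())
      return std::nullopt;
    Slot& s = it->second;
    if (s.external) {
      s.memory = s.disk_copy;  // the "disk read"; in Dragonfly this may block the fiber
      s.disk_copy.clear();
      s.external = false;
    }
    return s.memory;
  }

 private:
  std::unordered_map<std::string, Slot> table_;
};

int main() {
  TinyStore store;
  store.SetResident("k1", "value");
  store.Offload("k1");
  assert(store.FindAndFetch("k1") == std::optional<std::string>("value"));
  assert(!store.FindAndFetch("missing").has_value());
  return 0;
}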
@@ -782,7 +782,7 @@ void CompactObj::SetJson(JsonType&& j) {

void CompactObj::SetString(std::string_view str) {
  uint8_t mask = mask_ & ~kEncMask;

  CHECK(!IsExternal());
  // Trying auto-detection heuristics first.
  if (str.size() <= 20) {
    long long ival;
@@ -853,6 +853,7 @@ void CompactObj::SetString(std::string_view str) {
}

string_view CompactObj::GetSlice(string* scratch) const {
  CHECK(!IsExternal());
  uint8_t is_encoded = mask_ & kEncMask;

  if (IsInline()) {
@@ -958,6 +959,7 @@ void __attribute__((noinline)) CompactObj::GetString(string* res) const {
}

void CompactObj::GetString(char* dest) const {
  CHECK(!IsExternal());
  uint8_t is_encoded = mask_ & kEncMask;

  if (IsInline()) {
@@ -180,6 +180,10 @@ class CompactObj {

  bool operator==(std::string_view sl) const;

  bool operator!=(std::string_view sl) const {
    return !(*this == sl);
  }

  friend bool operator!=(const CompactObj& lhs, const CompactObj& rhs) {
    return !(lhs == rhs);
  }
@@ -404,6 +404,11 @@ class DashTable<_Key, _Value, Policy>::Iterator {
    return owner_ == nullptr;
  }

  bool IsOccupied() const {
    return (seg_id_ < owner_->segment_.size()) &&
           ((owner_->segment_[seg_id_]->IsBusy(bucket_id_, slot_id_)));
  }

  template <bool B = Policy::kUseVersion> std::enable_if_t<B, uint64_t> GetVersion() const {
    assert(owner_ && seg_id_ < owner_->segment_.size());
    return owner_->segment_[seg_id_]->GetVersion(bucket_id_);
@@ -472,6 +472,10 @@ template <typename _Key, typename _Value, typename Policy = DefaultSegmentPolicy
    return bucket_[i];
  }

  bool IsBusy(unsigned bid, unsigned slot) const {
    return bucket_[bid].GetBusy() & (1U << slot);
  }

  Key_t& Key(unsigned bid, unsigned slot) {
    assert(bucket_[bid].GetBusy() & (1U << slot));
    return bucket_[bid].key[slot];
@@ -57,7 +57,7 @@ OpResult<std::size_t> CountBitsForValue(const OpArgs& op_args, std::string_view
                                        int64_t end, bool bit_value);
OpResult<int64_t> FindFirstBitWithValue(const OpArgs& op_args, std::string_view key, bool value,
                                        int64_t start, int64_t end, bool as_bit);
std::string GetString(const PrimeValue& pv, EngineShard* shard);
std::string GetString(const PrimeValue& pv);
bool SetBitValue(uint32_t offset, bool bit_value, std::string* entry);
std::size_t CountBitSetByByteIndices(std::string_view at, std::size_t start, std::size_t end);
std::size_t CountBitSet(std::string_view str, int64_t start, int64_t end, bool bits);
@@ -324,7 +324,7 @@ std::optional<bool> ElementAccess::Exists(EngineShard* shard) {

OpStatus ElementAccess::Find(EngineShard* shard) {
  try {
    auto add_res = shard->db_slice().AddOrFind(context_, key_);
    auto add_res = shard->db_slice().AddOrFindAndFetch(context_, key_);
    if (!add_res.is_new) {
      if (add_res.it->second.ObjType() != OBJ_STRING) {
        return OpStatus::WRONG_TYPE;
@@ -343,7 +343,7 @@ OpStatus ElementAccess::Find(EngineShard* shard) {
std::string ElementAccess::Value() const {
  CHECK_NOTNULL(shard_);
  if (!added_) { // Exist entry - return it
    return GetString(element_iter_->second, shard_);
    return GetString(element_iter_->second);
  } else { // we only have reference to the new entry but no value
    return std::string{};
  }
@@ -459,9 +459,9 @@ OpResult<std::string> RunBitOpNot(const OpArgs& op_args, ArgSlice keys) {
  // if we found the value, just return, if not found then skip, otherwise report an error
  auto key = keys.front();
  OpResult<PrimeConstIterator> find_res =
      es->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_STRING);
      es->db_slice().FindAndFetchReadOnly(op_args.db_cntx, key, OBJ_STRING);
  if (find_res) {
    return GetString(find_res.value()->second, es);
    return GetString(find_res.value()->second);
  } else {
    return find_res.status();
  }
@@ -481,9 +481,9 @@ OpResult<std::string> RunBitOpOnShard(std::string_view op, const OpArgs& op_args
  // collect all the value for this shard
  for (auto& key : keys) {
    OpResult<PrimeConstIterator> find_res =
        es->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_STRING);
        es->db_slice().FindAndFetchReadOnly(op_args.db_cntx, key, OBJ_STRING);
    if (find_res) {
      values.emplace_back(GetString(find_res.value()->second, es));
      values.emplace_back(GetString(find_res.value()->second));
    } else {
      if (find_res.status() == OpStatus::KEY_NOTFOUND) {
        continue; // this is allowed, just return empty string per Redis
@@ -1237,19 +1237,9 @@ void SetBit(CmdArgList args, ConnectionContext* cntx) {

// ------------------------------------------------------------------------- //
// This are the "callbacks" that we're using from above
std::string GetString(const PrimeValue& pv, EngineShard* shard) {
std::string GetString(const PrimeValue& pv) {
  std::string res;
  if (pv.IsExternal()) {
    auto* tiered = shard->tiered_storage();
    auto [offset, size] = pv.GetExternalSlice();
    res.resize(size);

    std::error_code ec = tiered->Read(offset, size, res.data());
    CHECK(!ec) << "TBD: " << ec;
  } else {
    pv.GetString(&res);
  }

  pv.GetString(&res);
  return res;
}
@@ -1264,14 +1254,15 @@ OpResult<bool> ReadValueBitsetAt(const OpArgs& op_args, std::string_view key, ui

OpResult<std::string> ReadValue(const DbContext& context, std::string_view key,
                                EngineShard* shard) {
  OpResult<PrimeConstIterator> it_res = shard->db_slice().FindReadOnly(context, key, OBJ_STRING);
  OpResult<PrimeConstIterator> it_res =
      shard->db_slice().FindAndFetchReadOnly(context, key, OBJ_STRING);
  if (!it_res.ok()) {
    return it_res.status();
  }

  const PrimeValue& pv = it_res.value()->second;

  return GetString(pv, shard);
  return GetString(pv);
}

OpResult<std::size_t> CountBitsForValue(const OpArgs& op_args, std::string_view key, int64_t start,
@@ -8,6 +8,8 @@ extern "C" {
#include "redis/object.h"
}

#include <absl/cleanup/cleanup.h>

#include "base/flags.h"
#include "base/logging.h"
#include "generic_family.h"
@@ -52,7 +54,6 @@ void AccountObjectMemory(string_view key, unsigned type, int64_t size, DbTable*
  DCHECK_GE(static_cast<int64_t>(stats.obj_memory_usage) + size, 0)
      << "Can't decrease " << size << " from " << stats.obj_memory_usage;

  stats.obj_memory_usage += size;
  stats.AddTypeMemoryUsage(type, size);

  if (ClusterConfig::IsEnabled()) {
@@ -199,7 +200,8 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
                                make_pair("DEL", delete_args), false);
    }

    db_slice_->PerformDeletion(last_slot_it, db_slice_->shard_owner(), table);
    db_slice_->PerformDeletion(last_slot_it, table);

    ++evicted_;
  }
  me->ShiftRight(bucket_it);
@@ -322,6 +324,7 @@ void DbSlice::AutoUpdater::Run() {
  if (fields_.action == DestructorAction::kDoNothing) {
    return;
  }
  // TBD add logic to update iterator if needed as we can now preempt in cb

  // Check that AutoUpdater does not run after a key was removed.
  // If this CHECK() failed for you, it probably means that you deleted a key while having an auto
@@ -364,86 +367,120 @@ DbSlice::AddOrFindResult& DbSlice::AddOrFindResult::operator=(ItAndUpdater&& o)
  return *this;
}

DbSlice::ItAndUpdater DbSlice::FindMutable(const Context& cntx, string_view key) {
  auto [it, exp_it] = FindInternal(cntx, key, FindInternalMode::kUpdateMutableStats);
DbSlice::ItAndUpdater DbSlice::FindAndFetchMutable(const Context& cntx, string_view key) {
  return std::move(FindMutableInternal(cntx, key, std::nullopt, LoadExternalMode::kLoad).value());
}

  if (IsValid(it)) {
    PreUpdate(cntx.db_index, it);
    return {it, exp_it,
            AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
                         .db_slice = this,
                         .db_ind = cntx.db_index,
                         .it = it,
                         .key = key})};
  } else {
    return {it, exp_it, {}};
  }
DbSlice::ItAndUpdater DbSlice::FindMutable(const Context& cntx, string_view key) {
  return std::move(
      FindMutableInternal(cntx, key, std::nullopt, LoadExternalMode::kDontLoad).value());
}

OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutable(const Context& cntx, string_view key,
                                                     unsigned req_obj_type) {
  // Don't use FindMutable() so that we don't call PreUpdate()
  auto [it, exp_it] = FindInternal(cntx, key, FindInternalMode::kUpdateMutableStats);
  return FindMutableInternal(cntx, key, req_obj_type, LoadExternalMode::kDontLoad);
}

  if (!IsValid(it))
    return OpStatus::KEY_NOTFOUND;
OpResult<DbSlice::ItAndUpdater> DbSlice::FindAndFetchMutable(const Context& cntx, string_view key,
                                                             unsigned req_obj_type) {
  return FindMutableInternal(cntx, key, req_obj_type, LoadExternalMode::kLoad);
}

  if (it->second.ObjType() != req_obj_type) {
    return OpStatus::WRONG_TYPE;
OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutableInternal(const Context& cntx, string_view key,
                                                             std::optional<unsigned> req_obj_type,
                                                             LoadExternalMode load_mode) {
  auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kMutableStats, load_mode);
  if (!res.ok()) {
    return res.status();
  }

  PreUpdate(cntx.db_index, it);
  return {{it, exp_it,
  PreUpdate(cntx.db_index, res->it);
  return {{res->it, res->exp_it,
           AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
                        .db_slice = this,
                        .db_ind = cntx.db_index,
                        .it = it,
                        .it = res->it,
                        .key = key})}};
}

DbSlice::ItAndExpConst DbSlice::FindReadOnly(const Context& cntx, std::string_view key) {
  auto res = FindInternal(cntx, key, FindInternalMode::kUpdateReadStats);
  return {res.it, res.exp_it};
  auto res = FindInternal(cntx, key, std::nullopt, UpdateStatsMode::kReadStats,
                          LoadExternalMode::kDontLoad);
  return {res->it, res->exp_it};
}

OpResult<PrimeConstIterator> DbSlice::FindReadOnly(const Context& cntx, string_view key,
                                                   unsigned req_obj_type) {
  auto it = FindReadOnly(cntx, key).it;

  if (!IsValid(it))
    return OpStatus::KEY_NOTFOUND;

  if (it->second.ObjType() != req_obj_type) {
    return OpStatus::WRONG_TYPE;
  auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kReadStats,
                          LoadExternalMode::kDontLoad);
  if (res.ok()) {
    return {res->it};
  }

  return {it};
  return res.status();
}

DbSlice::ItAndExp DbSlice::FindInternal(const Context& cntx, std::string_view key,
                                        FindInternalMode mode) {
OpResult<PrimeConstIterator> DbSlice::FindAndFetchReadOnly(const Context& cntx,
                                                           std::string_view key,
                                                           unsigned req_obj_type) {
  auto res =
      FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kReadStats, LoadExternalMode::kLoad);
  if (res.ok()) {
    return {res->it};
  }
  return res.status();
}

OpResult<DbSlice::ItAndExp> DbSlice::FindInternal(const Context& cntx, std::string_view key,
                                                  std::optional<unsigned> req_obj_type,
                                                  UpdateStatsMode stats_mode,
                                                  LoadExternalMode load_mode) {
  if (!IsDbValid(cntx.db_index)) {
    return OpStatus::KEY_NOTFOUND;
  }

  DbSlice::ItAndExp res;

  if (!IsDbValid(cntx.db_index))
    return res;

  auto& db = *db_arr_[cntx.db_index];
  res.it = db.prime.Find(key);
  FiberAtomicGuard fg;
  if (!IsValid(res.it)) {
    switch (mode) {
      case FindInternalMode::kUpdateMutableStats:

  absl::Cleanup update_stats_on_miss = [&]() {
    switch (stats_mode) {
      case UpdateStatsMode::kMutableStats:
        events_.mutations++;
        break;
      case FindInternalMode::kUpdateReadStats:
      case UpdateStatsMode::kReadStats:
        events_.misses++;
        break;
    }
    return res;
  };

  if (!IsValid(res.it)) {
    return OpStatus::KEY_NOTFOUND;
  }

  if (req_obj_type.has_value() && res.it->second.ObjType() != req_obj_type.value()) {
    return OpStatus::WRONG_TYPE;
  }

  if (TieredStorage* tiered = shard_owner()->tiered_storage();
      tiered && load_mode == LoadExternalMode::kLoad) {
    if (res.it->second.HasIoPending()) {
      tiered->CancelIo(cntx.db_index, res.it);
    } else if (res.it->second.IsExternal()) {
      // Load reads data from disk therefore we will preempt in this function.
      // We will update the iterator if it changed during the preemption
      res.it = tiered->Load(cntx.db_index, res.it, key);
      if (!IsValid(res.it)) {
        return OpStatus::KEY_NOTFOUND;
      }
    }
  }

  FiberAtomicGuard fg;
  if (res.it->second.HasExpire()) { // check expiry state
    res = ExpireIfNeeded(cntx, res.it);
    if (!IsValid(res.it)) {
      return OpStatus::KEY_NOTFOUND;
    }
  }

  if (caching_mode_ && IsValid(res.it)) {
@@ -463,11 +500,12 @@ DbSlice::ItAndExp DbSlice::FindInternal(const Context& cntx, std::string_view ke

  db.top_keys.Touch(key);

  switch (mode) {
    case FindInternalMode::kUpdateMutableStats:
  std::move(update_stats_on_miss).Cancel();
  switch (stats_mode) {
    case UpdateStatsMode::kMutableStats:
      events_.mutations++;
      break;
    case FindInternalMode::kUpdateReadStats:
    case UpdateStatsMode::kReadStats:
      events_.hits++;
      if (ClusterConfig::IsEnabled()) {
        db.slots_stats[ClusterConfig::KeySlot(key)].total_reads++;
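The rewritten FindInternal() above arms the miss counter with absl::Cleanup and disarms it with std::move(update_stats_on_miss).Cancel() once the lookup turns out to be a hit. A standalone illustration of that cancel-on-success pattern (it requires linking Abseil; the names here are illustrative only, not Dragonfly code):

// --- sketch, not part of the diff: absl::Cleanup with Cancel() ---
#include <iostream>

#include "absl/cleanup/cleanup.h"

// Counts a miss unless the lookup succeeds, mirroring the
// update_stats_on_miss cleanup used in FindInternal() above.
int misses = 0;

bool Lookup(bool found) {
  absl::Cleanup count_miss = [] { ++misses; };
  if (!found)
    return false;                    // early return: the cleanup runs, a miss is counted
  std::move(count_miss).Cancel();    // success: suppress the miss accounting
  return true;
}

int main() {
  Lookup(false);
  Lookup(true);
  std::cout << "misses = " << misses << "\n";  // prints 1
  return 0;
}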
@@ -495,24 +533,33 @@ OpResult<pair<PrimeConstIterator, unsigned>> DbSlice::FindFirstReadOnly(const Co
  return OpStatus::KEY_NOTFOUND;
}

DbSlice::AddOrFindResult DbSlice::AddOrFind(const Context& cntx, string_view key) noexcept(false) {
DbSlice::AddOrFindResult DbSlice::AddOrFind(const Context& cntx, string_view key) {
  return AddOrFindInternal(cntx, key, LoadExternalMode::kDontLoad);
}

DbSlice::AddOrFindResult DbSlice::AddOrFindAndFetch(const Context& cntx, string_view key) {
  return AddOrFindInternal(cntx, key, LoadExternalMode::kLoad);
}

DbSlice::AddOrFindResult DbSlice::AddOrFindInternal(const Context& cntx, string_view key,
                                                    LoadExternalMode load_mode) noexcept(false) {
  DCHECK(IsDbValid(cntx.db_index));

  DbTable& db = *db_arr_[cntx.db_index];
  auto res = FindInternal(cntx, key, std::nullopt, UpdateStatsMode::kMutableStats, load_mode);

  auto res = FindInternal(cntx, key, FindInternalMode::kUpdateMutableStats);

  if (IsValid(res.it)) {
    PreUpdate(cntx.db_index, res.it);
    return {.it = res.it,
            .exp_it = res.exp_it,
  if (res.ok()) {
    PreUpdate(cntx.db_index, res->it);
    return {.it = res->it,
            .exp_it = res->exp_it,
            .is_new = false,
            .post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
                                         .db_slice = this,
                                         .db_ind = cntx.db_index,
                                         .it = res.it,
                                         .it = res->it,
                                         .key = key})};
  }
  CHECK_EQ(res.status(), OpStatus::KEY_NOTFOUND);

  // It's a new entry.
  DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
@@ -620,7 +667,7 @@ bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) {
    doc_del_cb_(key, cntx, it->second);
  }
  bumped_items_.erase(it->first.AsRef());
  PerformDeletion(it, shard_owner(), db.get());
  PerformDeletion(it, db.get());
  deletion_count_++;

  return true;
@@ -637,7 +684,7 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) {
    std::string_view key = it->first.GetSlice(&tmp);
    SlotId sid = ClusterConfig::KeySlot(key);
    if (slot_ids.contains(sid) && it.GetVersion() < next_version) {
      PerformDeletion(it, shard_owner(), db_arr_[0].get());
      PerformDeletion(it, db_arr_[0].get());
    }
    return true;
  };
@@ -686,7 +733,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
  if (db_ptr && db_ptr->stats.tiered_entries > 0) {
    for (auto it = db_ptr->prime.begin(); it != db_ptr->prime.end(); ++it) {
      if (it->second.IsExternal())
        PerformDeletion(it, shard_owner(), db_ptr.get());
        PerformDeletion(it, db_ptr.get());
    }

    DCHECK_EQ(0u, db_ptr->stats.tiered_entries);
@@ -973,27 +1020,6 @@ void DbSlice::PreUpdate(DbIndex db_ind, PrimeIterator it) {
    ccb.second(db_ind, ChangeReq{it});
  }

  auto* stats = MutableStats(db_ind);
  if (it->second.ObjType() == OBJ_STRING) {
    if (it->second.IsExternal()) {
      // We assume here that the operation code either loaded the entry into memory
      // before calling to PreUpdate or it does not need to read it at all.
      // After this code executes, the external blob is lost.
      TieredStorage* tiered = shard_owner()->tiered_storage();
      auto [offset, size] = it->second.GetExternalSlice();
      tiered->Free(offset, size);
      bool has_expire = it->second.HasExpire();
      it->second.Reset();
      it->second.SetExpire(has_expire); // we keep expire data

      stats->tiered_entries -= 1;
      stats->tiered_size -= size;
    } else if (it->second.HasIoPending()) {
      TieredStorage* tiered = shard_owner()->tiered_storage();
      tiered->CancelIo(db_ind, it);
    }
  }

  it.SetVersion(NextVersion());
}
@@ -1054,7 +1080,7 @@ DbSlice::ItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it)
    doc_del_cb_(tmp_key, cntx, it->second);
  }

  PerformDeletion(it, expire_it, shard_owner(), db.get());
  PerformDeletion(it, expire_it, db.get());
  ++events_.expired_keys;

  return {PrimeIterator{}, ExpireIterator{}};
@@ -1213,7 +1239,7 @@ void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes
        keys_to_journal.push_back(string(key));
      }

      PerformDeletion(evict_it, shard_owner(), db_table.get());
      PerformDeletion(evict_it, db_table.get());
      ++evicted;

      used_memory_after = owner_->UsedMemory();
@@ -1292,7 +1318,7 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t
        if (evict_it == it || evict_it->first.IsSticky())
          continue;

        PerformDeletion(evict_it, shard_owner(), table);
        PerformDeletion(evict_it, table);
        ++evicted;

        if (freed_memory_fun() > memory_to_free) {
@@ -1319,7 +1345,7 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t
        if (evict_it == it || evict_it->first.IsSticky())
          continue;

        PerformDeletion(evict_it, shard_owner(), table);
        PerformDeletion(evict_it, table);
        ++evicted;

        if (freed_memory_fun() > memory_to_free) {
@@ -1426,8 +1452,25 @@ void DbSlice::SendInvalidationTrackingMessage(std::string_view key) {
  }
}

void DbSlice::PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, EngineShard* shard,
                              DbTable* table) {
void DbSlice::RemoveFromTiered(PrimeIterator it, DbIndex index) {
  DbTable* table = GetDBTable(index);
  RemoveFromTiered(it, table);
}

void DbSlice::RemoveFromTiered(PrimeIterator it, DbTable* table) {
  DbTableStats& stats = table->stats;
  PrimeValue& pv = it->second;
  if (pv.IsExternal()) {
    TieredStorage* tiered = shard_owner()->tiered_storage();
    tiered->Free(it, &stats);
  }
  if (pv.HasIoPending()) {
    TieredStorage* tiered = shard_owner()->tiered_storage();
    tiered->CancelIo(table->index, it);
  }
}

void DbSlice::PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, DbTable* table) {
  std::string tmp;
  std::string_view key = del_it->first.GetSlice(&tmp);
@@ -1444,18 +1487,7 @@ void DbSlice::PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, Engin

  DbTableStats& stats = table->stats;
  const PrimeValue& pv = del_it->second;
  if (pv.IsExternal()) {
    auto [offset, size] = pv.GetExternalSlice();

    stats.tiered_entries--;
    stats.tiered_size -= size;
    TieredStorage* tiered = shard->tiered_storage();
    tiered->Free(offset, size);
  }
  if (pv.HasIoPending()) {
    TieredStorage* tiered = shard->tiered_storage();
    tiered->CancelIo(table->index, del_it);
  }
  RemoveFromTiered(del_it, table);

  size_t value_heap_size = pv.MallocUsed();
  stats.inline_keys -= del_it->first.IsInline();
@@ -1476,17 +1508,19 @@ void DbSlice::PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, Engin
  SendInvalidationTrackingMessage(key);
}

void DbSlice::PerformDeletion(PrimeIterator del_it, EngineShard* shard, DbTable* table) {
void DbSlice::PerformDeletion(PrimeIterator del_it, DbTable* table) {
  ExpireIterator exp_it;
  if (del_it->second.HasExpire()) {
    exp_it = table->expire.Find(del_it->first);
    DCHECK(!exp_it.is_done());
  }

  PerformDeletion(del_it, exp_it, shard, table);
  PerformDeletion(del_it, exp_it, table);
}

void DbSlice::OnCbFinish() {
  // TBD update bumpups logic we can not clear now after cb finish as cb can preempt
  // btw what do we do with inline?
  bumped_items_.clear();
}
@@ -195,8 +195,11 @@ class DbSlice {
    AutoUpdater post_updater;
  };
  ItAndUpdater FindMutable(const Context& cntx, std::string_view key);
  ItAndUpdater FindAndFetchMutable(const Context& cntx, std::string_view key);
  OpResult<ItAndUpdater> FindMutable(const Context& cntx, std::string_view key,
                                     unsigned req_obj_type);
  OpResult<ItAndUpdater> FindAndFetchMutable(const Context& cntx, std::string_view key,
                                             unsigned req_obj_type);

  struct ItAndExpConst {
    PrimeConstIterator it;
@@ -205,6 +208,8 @@ class DbSlice {
  ItAndExpConst FindReadOnly(const Context& cntx, std::string_view key);
  OpResult<PrimeConstIterator> FindReadOnly(const Context& cntx, std::string_view key,
                                            unsigned req_obj_type);
  OpResult<PrimeConstIterator> FindAndFetchReadOnly(const Context& cntx, std::string_view key,
                                                    unsigned req_obj_type);

  // Returns (iterator, args-index) if found, KEY_NOTFOUND otherwise.
  // If multiple keys are found, returns the first index in the ArgSlice.
@@ -222,6 +227,7 @@ class DbSlice {
  };

  AddOrFindResult AddOrFind(const Context& cntx, std::string_view key) noexcept(false);
  AddOrFindResult AddOrFindAndFetch(const Context& cntx, std::string_view key) noexcept(false);

  // Same as AddOrSkip, but overwrites in case entry exists.
  AddOrFindResult AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj,
@@ -256,6 +262,7 @@ class DbSlice {
  void ActivateDb(DbIndex db_ind);

  bool Del(DbIndex db_ind, PrimeIterator it);
  void RemoveFromTiered(PrimeIterator it, DbIndex index);

  constexpr static DbIndex kDbAll = 0xFFFF;
@@ -390,7 +397,7 @@ class DbSlice {
  void TrackKeys(const facade::Connection::WeakRef&, const ArgSlice&);

  // Delete a key referred by its iterator.
  void PerformDeletion(PrimeIterator del_it, EngineShard* shard, DbTable* table);
  void PerformDeletion(PrimeIterator del_it, DbTable* table);

  // Releases a single key. `key` must have been normalized by GetLockKey().
  void ReleaseNormalized(IntentLock::Mode m, DbIndex db_index, std::string_view key,
@@ -412,8 +419,7 @@ class DbSlice {
  // Invalidate all watched keys for given slots. Used on FlushSlots.
  void InvalidateSlotWatches(const SlotSet& slot_ids);

  void PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, EngineShard* shard,
                       DbTable* table);
  void PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, DbTable* table);

  // Send invalidation message to the clients that are tracking the change to a key.
  void SendInvalidationTrackingMessage(std::string_view key);
@@ -421,15 +427,28 @@ class DbSlice {
  void CreateDb(DbIndex index);
  size_t EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* table);

  enum class FindInternalMode {
    kUpdateReadStats,
    kUpdateMutableStats,
  enum class UpdateStatsMode {
    kReadStats,
    kMutableStats,
  };
  ItAndExp FindInternal(const Context& cntx, std::string_view key, FindInternalMode mode);

  enum class LoadExternalMode {
    kLoad,
    kDontLoad,
  };
  OpResult<ItAndExp> FindInternal(const Context& cntx, std::string_view key,
                                  std::optional<unsigned> req_obj_type, UpdateStatsMode stats_mode,
                                  LoadExternalMode load_mode);
  AddOrFindResult AddOrFindInternal(const Context& cntx, std::string_view key,
                                    LoadExternalMode load_mode) noexcept(false);
  OpResult<ItAndUpdater> FindMutableInternal(const Context& cntx, std::string_view key,
                                             std::optional<unsigned> req_obj_type,
                                             LoadExternalMode load_mode);

  uint64_t NextVersion() {
    return version_++;
  }
  void RemoveFromTiered(PrimeIterator it, DbTable* table);

 private:
  ShardId shard_id_;
@@ -39,40 +39,23 @@ using CI = CommandId;
constexpr uint32_t kMaxStrLen = 1 << 28;
constexpr size_t kMinTieredLen = TieredStorage::kMinBlobLen;

size_t CopyValueToBuffer(const PrimeValue& pv, EngineShard* shard, char* dest) {
size_t CopyValueToBuffer(const PrimeValue& pv, char* dest) {
  DCHECK_EQ(pv.ObjType(), OBJ_STRING);

  if (pv.IsExternal()) {
    auto* tiered = shard->tiered_storage();
    auto [offset, size] = pv.GetExternalSlice();

    error_code ec = tiered->Read(offset, size, dest);
    CHECK(!ec) << "TBD: " << ec;
    return size;
  }

  DCHECK(!pv.IsExternal());
  pv.GetString(dest);
  return pv.Size();
}

string GetString(EngineShard* shard, const PrimeValue& pv) {
string GetString(const PrimeValue& pv) {
  string res;
  if (pv.ObjType() != OBJ_STRING)
    return res;
  res.resize(pv.Size());
  CopyValueToBuffer(pv, shard, res.data());
  CopyValueToBuffer(pv, res.data());

  return res;
}

string_view GetSlice(EngineShard* shard, const PrimeValue& pv, string* tmp) {
  if (pv.IsExternal()) {
    *tmp = GetString(shard, pv);
    return *tmp;
  }
  return pv.GetSlice(tmp);
}

OpResult<uint32_t> OpSetRange(const OpArgs& op_args, string_view key, size_t start,
                              string_view value) {
  VLOG(2) << "SetRange(" << key << ", " << start << ", " << value << ")";
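With values fetched into memory up front, the shard-aware GetSlice(shard, pv, tmp) wrapper above disappears and callers use the object's own GetSlice(&tmp). The scratch-buffer idiom behind that API, returning a view into the object when possible and into a caller-provided scratch string otherwise, is shown below in isolation; the Encoded type is invented for the example and is not a Dragonfly type.

// --- sketch, not part of the diff: the scratch-buffer GetSlice idiom ---
#include <cassert>
#include <string>
#include <string_view>

// A value stored either as plain bytes or in an "encoded" form that must be
// decoded before it can be viewed. Invented for illustration.
struct Encoded {
  bool encoded;
  std::string bytes;  // plain text, or XOR-"encoded" when encoded == true
};

// Returns a view of the decoded value. When no decoding is needed the view
// aliases the object itself; otherwise the result is materialized into
// *scratch and the view aliases the scratch buffer, so it stays valid only
// while the caller keeps `scratch` alive and unmodified.
std::string_view GetSlice(const Encoded& v, std::string* scratch) {
  if (!v.encoded)
    return v.bytes;
  scratch->clear();
  for (char c : v.bytes)
    scratch->push_back(static_cast<char>(c ^ 0x20));  // stand-in for real decoding
  return *scratch;
}

int main() {
  Encoded plain{false, "hello"};
  Encoded enc{true, "hi"};  // decodes to "HI" under the toy XOR scheme
  std::string tmp;
  assert(GetSlice(plain, &tmp) == "hello");
  assert(GetSlice(enc, &tmp) == "HI");
  return 0;
}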
@@ -91,8 +74,7 @@ OpResult<uint32_t> OpSetRange(const OpArgs& op_args, string_view key, size_t sta
  DbSlice::AddOrFindResult res;

  try {
    res = db_slice.AddOrFind(op_args.db_cntx, key);

    res = db_slice.AddOrFindAndFetch(op_args.db_cntx, key);
    string s;

    if (res.is_new) {
@@ -101,7 +83,7 @@ OpResult<uint32_t> OpSetRange(const OpArgs& op_args, string_view key, size_t sta
    if (res.it->second.ObjType() != OBJ_STRING)
      return OpStatus::WRONG_TYPE;

    s = GetString(op_args.shard, res.it->second);
    s = GetString(res.it->second);
    if (s.size() < range_len)
      s.resize(range_len);
  }
@@ -116,7 +98,8 @@ OpResult<uint32_t> OpSetRange(const OpArgs& op_args, string_view key, size_t sta

OpResult<string> OpGetRange(const OpArgs& op_args, string_view key, int32_t start, int32_t end) {
  auto& db_slice = op_args.shard->db_slice();
  OpResult<PrimeConstIterator> it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STRING);
  OpResult<PrimeConstIterator> it_res =
      db_slice.FindAndFetchReadOnly(op_args.db_cntx, key, OBJ_STRING);
  if (!it_res.ok())
    return it_res.status();
@@ -141,7 +124,7 @@ OpResult<string> OpGetRange(const OpArgs& op_args, string_view key, int32_t star
    end = strlen - 1;

  string tmp;
  string_view slice = GetSlice(op_args.shard, co, &tmp);
  string_view slice = co.GetSlice(&tmp);

  return string(slice.substr(start, end - start + 1));
};
@@ -149,8 +132,8 @@ OpResult<string> OpGetRange(const OpArgs& op_args, string_view key, int32_t star
size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key, string_view val,
                      bool prepend) {
  string tmp, new_val;
  auto* shard = op_args.shard;
  string_view slice = GetSlice(shard, it->second, &tmp);
  string_view slice = it->second.GetSlice(&tmp);

  if (prepend)
    new_val = absl::StrCat(val, slice);
  else
@@ -166,12 +149,14 @@ OpResult<uint32_t> ExtendOrSet(const OpArgs& op_args, string_view key, string_vi
                               bool prepend) {
  auto* shard = op_args.shard;
  auto& db_slice = shard->db_slice();

  DbSlice::AddOrFindResult add_res;
  try {
    add_res = db_slice.AddOrFind(op_args.db_cntx, key);
    add_res = db_slice.AddOrFindAndFetch(op_args.db_cntx, key);
  } catch (const std::bad_alloc& e) {
    return OpStatus::OUT_OF_MEMORY;
  }

  if (add_res.is_new) {
    add_res.it->second.SetString(val);
    return val.size();
@@ -185,7 +170,7 @@ OpResult<uint32_t> ExtendOrSet(const OpArgs& op_args, string_view key, string_vi

OpResult<bool> ExtendOrSkip(const OpArgs& op_args, string_view key, string_view val, bool prepend) {
  auto& db_slice = op_args.shard->db_slice();
  auto it_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STRING);
  auto it_res = db_slice.FindAndFetchMutable(op_args.db_cntx, key, OBJ_STRING);
  if (!it_res) {
    return false;
  }
@@ -195,7 +180,7 @@ OpResult<bool> ExtendOrSkip(const OpArgs& op_args, string_view key, string_view

OpResult<string> OpMutableGet(const OpArgs& op_args, string_view key, bool del_hit = false,
                              const DbSlice::ExpireParams& exp_params = {}) {
  auto res = op_args.shard->db_slice().FindMutable(op_args.db_cntx, key);
  auto res = op_args.shard->db_slice().FindAndFetchMutable(op_args.db_cntx, key);
  res.post_updater.Run();

  if (!IsValid(res.it))
@@ -207,7 +192,7 @@ OpResult<string> OpMutableGet(const OpArgs& op_args, string_view key, bool del_h
  const PrimeValue& pv = res.it->second;

  if (del_hit) {
    string key_bearer = GetString(op_args.shard, pv);
    string key_bearer = GetString(pv);

    DVLOG(1) << "Del: " << key;
    auto& db_slice = op_args.shard->db_slice();
@@ -218,7 +203,7 @@ OpResult<string> OpMutableGet(const OpArgs& op_args, string_view key, bool del_h
  }

  /*Get value before expire*/
  string ret_val = GetString(op_args.shard, pv);
  string ret_val = GetString(pv);

  if (exp_params.IsDefined()) {
    DVLOG(1) << "Expire: " << key;
@@ -257,7 +242,7 @@ OpResult<double> OpIncrFloat(const OpArgs& op_args, string_view key, double val)
    return OpStatus::INVALID_FLOAT;

  string tmp;
  string_view slice = GetSlice(op_args.shard, add_res.it->second, &tmp);
  string_view slice = add_res.it->second.GetSlice(&tmp);

  double base = 0;
  if (!ParseDouble(slice, &base)) {
@@ -479,10 +464,10 @@ class SetResultBuilder {
  explicit SetResultBuilder(bool return_prev_value) : return_prev_value_(return_prev_value) {
  }

  void CachePrevValueIfNeeded(EngineShard* shard, const PrimeValue& pv) {
  void CachePrevValueIfNeeded(const PrimeValue& pv) {
    if (return_prev_value_) {
      // We call lazily call GetString() here to save string copying when not needed.
      prev_value_ = GetString(shard, pv);
      prev_value_ = GetString(pv);
    }
  }
@@ -513,7 +498,7 @@ SinkReplyBuilder::MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const
  size_t total_size = 0;
  for (size_t i = 0; i < args.size(); ++i) {
    OpResult<PrimeConstIterator> it_res =
        db_slice.FindReadOnly(t->GetDbContext(), args[i], OBJ_STRING);
        db_slice.FindAndFetchReadOnly(t->GetDbContext(), args[i], OBJ_STRING);
    if (!it_res)
      continue;
    iters[i] = *it_res;
@@ -530,7 +515,7 @@ SinkReplyBuilder::MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const

    auto& resp = response.resp_arr[i].emplace();

    size_t size = CopyValueToBuffer(it->second, shard, next);
    size_t size = CopyValueToBuffer(it->second, next);
    resp.value = string_view(next, size);
    next += size;
@@ -562,21 +547,27 @@ OpResult<optional<string>> SetCmd::Set(const SetParams& params, string_view key,
  VLOG(2) << "Set " << key << "(" << db_slice.shard_id() << ") ";

  if (params.IsConditionalSet()) {
    const auto res = db_slice.FindMutable(op_args_.db_cntx, key);
    if (IsValid(res.it)) {
      result_builder.CachePrevValueIfNeeded(shard, res.it->second);
    bool fetch_value = params.prev_val || (params.flags & SET_GET);
    DbSlice::ItAndUpdater find_res;
    if (fetch_value) {
      find_res = db_slice.FindAndFetchMutable(op_args_.db_cntx, key);
    } else {
      find_res = db_slice.FindMutable(op_args_.db_cntx, key);
    }
    if (IsValid(find_res.it)) {
      result_builder.CachePrevValueIfNeeded(find_res.it->second);
    }

    // Make sure that we have this key, and only add it if it does exists
    if (params.flags & SET_IF_EXISTS) {
      if (IsValid(res.it)) {
      if (IsValid(find_res.it)) {
        return std::move(result_builder)
            .Return(SetExisting(params, res.it, res.exp_it, key, value));
            .Return(SetExisting(params, find_res.it, find_res.exp_it, key, value));
      } else {
        return std::move(result_builder).Return(OpStatus::SKIPPED);
      }
    } else {
      if (IsValid(res.it)) { // if the policy is not to overide and have the key, just return
      if (IsValid(find_res.it)) { // if the policy is not to overide and have the key, just return
        return std::move(result_builder).Return(OpStatus::SKIPPED);
      }
    }
@@ -586,14 +577,14 @@ OpResult<optional<string>> SetCmd::Set(const SetParams& params, string_view key,
  // Trying to add a new entry.
  DbSlice::AddOrFindResult add_res;
  try {
    add_res = db_slice.AddOrFind(op_args_.db_cntx, key);
    add_res = db_slice.AddOrFind(op_args_.db_cntx, key); // TODO do we free the space for existing?
  } catch (bad_alloc& e) {
    return OpStatus::OUT_OF_MEMORY;
  }

  PrimeIterator it = add_res.it;
  if (!add_res.is_new) {
    result_builder.CachePrevValueIfNeeded(shard, it->second);
    result_builder.CachePrevValueIfNeeded(it->second);
    return std::move(result_builder).Return(SetExisting(params, it, add_res.exp_it, key, value));
  }
@@ -617,7 +608,7 @@ OpResult<optional<string>> SetCmd::Set(const SetParams& params, string_view key,
  if (shard->tiered_storage() &&
      TieredStorage::EligibleForOffload(value)) { // external storage enabled.
    // TODO: we may have a bug if we block the fiber inside UnloadItem - "it" may be invalid
    // afterwards.
    // afterwards. handle this
    shard->tiered_storage()->ScheduleOffload(op_args_.db_cntx.db_index, it);
  }
@@ -640,7 +631,7 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt
    if (prime_value.ObjType() != OBJ_STRING)
      return OpStatus::WRONG_TYPE;

    string val = GetString(shard, prime_value);
    string val = GetString(prime_value);
    params.prev_val->emplace(std::move(val));
  }
@@ -673,19 +664,11 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt
    db_slice.SetMCFlag(op_args_.db_cntx.db_index, it->first.AsRef(), params.memcache_flags);
  }

  db_slice.RemoveFromTiered(it, op_args_.db_cntx.db_index);
  // overwrite existing entry.
  prime_value.SetString(value);
  DCHECK(!prime_value.HasIoPending());

  if (TieredStorage::EligibleForOffload(value)) {
    // TODO: if UnloadItem can block the calling fiber, then we have the bug because then "it"
    // can be invalid after the function returns and the functions that follow may access invalid
    // entry.
    if (shard->tiered_storage()) {
      shard->tiered_storage()->ScheduleOffload(op_args_.db_cntx.db_index, it);
    }
  }

  if (manual_journal_ && op_args_.shard->journal()) {
    RecordJournal(params, key, value);
  }
@@ -847,12 +830,12 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
  auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<string> {
    auto op_args = t->GetOpArgs(shard);
    DbSlice& db_slice = op_args.shard->db_slice();
    auto res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STRING);
    auto res = db_slice.FindAndFetchReadOnly(op_args.db_cntx, key, OBJ_STRING);
    if (!res) {
      return res.status();
    }

    return GetString(op_args.shard, (*res)->second);
    return GetString((*res)->second);
  };

  DVLOG(1) << "Before Get::ScheduleSingleHopT " << key;
@@ -24,7 +24,7 @@ void DbTableStats::AddTypeMemoryUsage(unsigned type, int64_t delta) {
    DCHECK(false) << "Unsupported type " << type;
    return;
  }

  obj_memory_usage += delta;
  memory_usage_by_type[type] += delta;
}
@@ -94,7 +94,6 @@ static size_t ExternalizeEntry(size_t item_offset, DbTableStats* stats, PrimeVal
  size_t heap_size = entry->MallocUsed();
  size_t item_size = entry->Size();

  stats->obj_memory_usage -= heap_size;
  stats->AddTypeMemoryUsage(entry->ObjType(), -heap_size);

  entry->SetExternal(item_offset, item_size);
@@ -334,7 +333,12 @@ std::error_code TieredStorage::Read(size_t offset, size_t len, char* dest) {
  return io_mgr_.Read(offset, io::MutableBytes{reinterpret_cast<uint8_t*>(dest), len});
}

void TieredStorage::Free(size_t offset, size_t len) {
void TieredStorage::Free(PrimeIterator it, DbTableStats* stats) {
  PrimeValue& entry = it->second;
  CHECK(entry.IsExternal());
  DCHECK_EQ(entry.ObjType(), OBJ_STRING);
  auto [offset, len] = entry.GetExternalSlice();

  if (offset % kBlockLen == 0) {
    alloc_.Free(offset, len);
  } else {
@@ -347,6 +351,13 @@ void TieredStorage::Free(size_t offset, size_t len) {
      page_refcnt_.erase(it);
    }
  }

  bool has_expire = entry.HasExpire();
  entry.Reset();
  entry.SetExpire(has_expire); // we keep expire data

  stats->tiered_entries -= 1;
  stats->tiered_size -= len;
}

void TieredStorage::Shutdown() {
@@ -390,6 +401,43 @@ void TieredStorage::FinishIoRequest(int io_res, InflightWriteRequest* req) {
  VLOG_IF(2, num_active_requests_ == 0) << "Finished active requests";
}

PrimeIterator TieredStorage::Load(DbIndex db_index, PrimeIterator it, string_view key) {
  PrimeValue* entry = &it->second;
  CHECK(entry->IsExternal());
  DCHECK_EQ(entry->ObjType(), OBJ_STRING);
  auto [offset, size] = entry->GetExternalSlice();
  string res(size, '\0');
  auto ec = Read(offset, size, res.data());
  CHECK(!ec) << "TBD";

  // Read will preempt, check if iterator still points to our entry
  PrimeTable* pt = db_slice_.GetTables(db_index).first;
  if (!it.IsOccupied() || it->first != key) {
    it = pt->Find(key);
    if (it.is_done()) {
      // Entry was remove from db while reading from disk. (background expire task)
      return it;
    }
    entry = &it->second;
  }

  if (!entry->IsExternal()) {
    // Because 2 reads can happen at the same time, then if the other read
    // already loaded the data from disk to memory we don't need to do anything now just return.
    // TODO we can register to reads with multiple callbacks so if there is already a callback
    // reading the data from disk we will not run read twice.
    return it;
  }

  auto* stats = db_slice_.MutableStats(db_index);
  Free(it, stats);
  entry->SetString(res);

  size_t heap_size = entry->MallocUsed();
  stats->AddTypeMemoryUsage(entry->ObjType(), heap_size);
  return it;
}

error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) {
  CHECK_EQ(OBJ_STRING, it->second.ObjType());
  DCHECK(!it->second.IsExternal());
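TieredStorage::Load() above re-validates the iterator after the blocking Read(), because the fiber may be preempted while the read is in flight and the entry may be expired, evicted, or moved in the meantime. The sketch below reduces that re-check-after-a-blocking-step discipline to a self-contained example; the plain std::unordered_map and callback are stand-ins, not Dragonfly code.

// --- sketch, not part of the diff: re-validate the entry after a blocking step ---
#include <cassert>
#include <functional>
#include <optional>
#include <string>
#include <unordered_map>

using Table = std::unordered_map<std::string, std::string>;

// Loads `key`, running a "blocking" step in the middle. Because the blocking
// step may mutate the table (like a fiber preemption letting an expire task
// run), the entry is looked up again afterwards instead of trusting the
// original iterator.
std::optional<std::string> LoadWithRecheck(Table& table, const std::string& key,
                                           const std::function<void()>& blocking_step) {
  auto it = table.find(key);
  if (it == table.end())
    return std::nullopt;

  std::string loaded = it->second;  // stands in for the disk read
  blocking_step();                  // other work may run here and erase or rehash entries

  // Re-validate: the old iterator may be dangling now, so find the key again.
  it = table.find(key);
  if (it == table.end())
    return std::nullopt;  // entry was removed while we were "reading from disk"
  it->second = loaded;
  return it->second;
}

int main() {
  Table t{{"k", "v"}};
  // The blocking step deletes the key, as a background expire could.
  auto res = LoadWithRecheck(t, "k", [&] { t.erase("k"); });
  assert(!res.has_value());

  t["k"] = "v";
  res = LoadWithRecheck(t, "k", [] {});
  assert(res == std::optional<std::string>("v"));
  return 0;
}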
@@ -580,6 +628,7 @@ void TieredStorage::WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_
  ++num_active_requests_;

  io_mgr_.WriteAsync(res, string_view{req->block_ptr, req->page_size}, std::move(cb));
  ++stats_.tiered_writes;
}

bool TieredStorage::FlushPending(DbIndex db_index, unsigned bin_index) {
@@ -26,7 +26,7 @@ class TieredStorage {

  std::error_code Open(const std::string& path);

  std::error_code Read(size_t offset, size_t len, char* dest);
  PrimeIterator Load(DbIndex db_index, PrimeIterator it, std::string_view key);

  // Schedules unloading of the item, pointed by the iterator.
  std::error_code ScheduleOffload(DbIndex db_index, PrimeIterator it);
@@ -37,7 +37,7 @@ class TieredStorage {
    return val.size() >= kMinBlobLen;
  }

  void Free(size_t offset, size_t len);
  void Free(PrimeIterator it, DbTableStats* stats);

  void Shutdown();
@@ -56,6 +56,7 @@ class TieredStorage {

  void FinishIoRequest(int io_res, InflightWriteRequest* req);
  void SetExternal(DbIndex db_index, size_t item_offset, PrimeValue* dest);
  std::error_code Read(size_t offset, size_t len, char* dest);

  DbSlice& db_slice_;
  IoMgr io_mgr_;
@@ -7,6 +7,7 @@

#include "base/flags.h"
#include "base/logging.h"
#include "facade/facade_test.h"
#include "server/test_utils.h"

using namespace std;
@@ -72,24 +73,17 @@ void TieredStorageTest::FillKeysWithExpire(unsigned count, int val_size, uint32_

TEST_F(TieredStorageTest, Basic) {
  FillExternalKeys(5000);

  EXPECT_EQ(5000, CheckedInt({"dbsize"}));

  usleep(20000); // 20 milliseconds

  Metrics m = GetMetrics();
  EXPECT_GT(m.db_stats[0].tiered_entries, 0u);

  FillExternalKeys(5000);
  usleep(20000); // 0.02 milliseconds

  m = GetMetrics();
  DbStats stats = m.db_stats[0];

  LOG(INFO) << stats;
  unsigned tiered_entries = m.db_stats[0].tiered_entries;

  EXPECT_GT(tiered_entries, 0u);
  string resp = CheckedString({"debug", "object", "k1"});
  EXPECT_THAT(resp, HasSubstr("spill_len"));
  m = GetMetrics();
  LOG(INFO) << m.db_stats[0];
  ASSERT_EQ(tiered_entries, m.db_stats[0].tiered_entries);

  Run({"del", "k1"});
@@ -101,7 +95,7 @@ TEST_F(TieredStorageTest, DelBeforeOffload) {
  FillExternalKeys(100);
  EXPECT_EQ(100, CheckedInt({"dbsize"}));

  usleep(20000); // 0.02 milliseconds
  usleep(20000); // 20 milliseconds
  Metrics m = GetMetrics();
  EXPECT_GT(m.db_stats[0].tiered_entries, 0u);
  EXPECT_LT(m.db_stats[0].tiered_entries, 100);
@@ -113,7 +107,7 @@ TEST_F(TieredStorageTest, DelBeforeOffload) {
  EXPECT_EQ(m.db_stats[0].tiered_entries, 0u);

  FillExternalKeys(100);
  usleep(20000); // 0.02 milliseconds
  usleep(20000); // 20 milliseconds
  m = GetMetrics();
  EXPECT_GT(m.db_stats[0].tiered_entries, 0u);
  EXPECT_LT(m.db_stats[0].tiered_entries, 100);
@@ -127,7 +121,7 @@ TEST_F(TieredStorageTest, AddMultiDb) {
  FillExternalKeys(100);
  EXPECT_EQ(100, CheckedInt({"dbsize"}));

  usleep(20000); // 0.02 milliseconds
  usleep(20000); // 20 milliseconds

  Metrics m = GetMetrics();
  EXPECT_GT(m.db_stats[1].tiered_entries, 0u);
@@ -148,7 +142,7 @@ TEST_F(TieredStorageTest, FlushDBAfterSet) {
  FillExternalKeys(100);
  EXPECT_EQ(100, CheckedInt({"dbsize"}));

  usleep(20000); // 0.02 milliseconds
  usleep(20000); // 20 milliseconds
  m = GetMetrics();
  EXPECT_GT(m.db_stats[5].tiered_entries, 0u);
  EXPECT_LT(m.db_stats[5].tiered_entries, 100);
@@ -166,7 +160,7 @@ TEST_F(TieredStorageTest, FlushAllAfterSet) {
  FillExternalKeys(100);
  EXPECT_EQ(100, CheckedInt({"dbsize"}));

  usleep(20000); // 0.02 milliseconds
  usleep(20000); // 20 milliseconds
  m = GetMetrics();
  EXPECT_GT(m.db_stats[5].tiered_entries, 0u);
  EXPECT_LT(m.db_stats[5].tiered_entries, 100);
@@ -183,7 +177,7 @@ TEST_F(TieredStorageTest, AddBigValues) {
  FillExternalKeys(100, 5000);
  EXPECT_EQ(100, CheckedInt({"dbsize"}));

  usleep(20000); // 0.02 milliseconds
  usleep(20000); // 20 milliseconds
  m = GetMetrics();
  EXPECT_GT(m.db_stats[0].tiered_entries, 0u);
}
@@ -201,55 +195,138 @@ TEST_F(TieredStorageTest, DelBigValues) {
  FillExternalKeys(100, 5000);
  EXPECT_EQ(100, CheckedInt({"dbsize"}));

  usleep(20000); // 0.02 milliseconds
  usleep(20000); // 20 milliseconds
  m = GetMetrics();
  EXPECT_GT(m.db_stats[0].tiered_entries, 0u);
}

TEST_F(TieredStorageTest, AddBigValuesWithExpire) {
  const int kKeyNum = 10;
  for (int i = 0; i < 3; ++i) {
    FillKeysWithExpire(kKeyNum, 8000);
    usleep(20000); // 0.02 milliseconds

    Metrics m = GetMetrics();
    EXPECT_EQ(m.db_stats[0].tiered_entries, 10);
  }
  FillKeysWithExpire(kKeyNum, 8000);
  usleep(20000); // 20 milliseconds

  Metrics m = GetMetrics();
  EXPECT_EQ(m.db_stats[0].tiered_entries, 10);

  for (int i = 0; i < kKeyNum; ++i) {
    auto resp = Run({"ttl", StrCat("k", i)});
    EXPECT_GT(resp.GetInt(), 0);
  }

  m = GetMetrics();
  EXPECT_EQ(m.db_stats[0].tiered_entries, 10);
}

TEST_F(TieredStorageTest, AddSmallValuesWithExpire) {
  const int kKeyNum = 100;
  for (int i = 0; i < 3; ++i) {
    FillKeysWithExpire(kKeyNum);
    usleep(20000); // 0.02 milliseconds

    Metrics m = GetMetrics();
    EXPECT_GT(m.db_stats[0].tiered_entries, 0);
  }
  FillKeysWithExpire(kKeyNum);
  usleep(20000); // 20 milliseconds

  Metrics m = GetMetrics();
  EXPECT_GT(m.db_stats[0].tiered_entries, 0);

  for (int i = 0; i < kKeyNum; ++i) {
    auto resp = Run({"ttl", StrCat("k", i)});
    EXPECT_GT(resp.GetInt(), 0);
  }
  m = GetMetrics();
  EXPECT_GT(m.db_stats[0].tiered_entries, 0);
}

TEST_F(TieredStorageTest, SetAndExpire) {
  string val(5000, 'a');
  Run({"set", "key", val});
  usleep(20000); // 0.02 milliseconds

  usleep(20000); // 20 milliseconds
  Metrics m = GetMetrics();
  EXPECT_EQ(m.db_stats[0].tiered_entries, 1);
  Run({"expire", "key", "3"});

  Run({"set", "key", val});
  usleep(20000); // 0.02 milliseconds

  m = GetMetrics();
  EXPECT_EQ(m.db_stats[0].tiered_entries, 1);

  Run({"set", "key", val});
  usleep(20000); // 20 milliseconds

  m = GetMetrics();
  EXPECT_EQ(m.db_stats[0].tiered_entries, 0);
  Run({"expire", "key", "3"});
}

TEST_F(TieredStorageTest, SetAndGet) {
  string val1(5000, 'a');
  string val2(5000, 'a');

  Run({"set", "key1", val1});
  Run({"set", "key2", val1});
  usleep(20000); // 20 milliseconds
  Metrics m = GetMetrics();
  EXPECT_EQ(m.db_stats[0].tiered_entries, 2);
  EXPECT_EQ(m.db_stats[0].obj_memory_usage, 0);

  EXPECT_EQ(Run({"get", "key1"}), val1);
  usleep(20000); // 20 milliseconds
  m = GetMetrics();
  EXPECT_EQ(m.db_stats[0].tiered_entries, 1);
  EXPECT_GT(m.db_stats[0].obj_memory_usage, 0);

  Run({"set", "key1", val2});
  usleep(20000); // 20 milliseconds
  m = GetMetrics();
  EXPECT_EQ(m.db_stats[0].tiered_entries, 1);
  EXPECT_GT(m.db_stats[0].obj_memory_usage, 0);

  Run({"set", "key2", val2});
  usleep(20000); // 20 milliseconds
  m = GetMetrics();
  EXPECT_EQ(m.db_stats[0].tiered_entries, 0);
  EXPECT_GT(m.db_stats[0].obj_memory_usage, 0);

  EXPECT_EQ(Run({"get", "key1"}), val2);
  EXPECT_EQ(Run({"get", "key2"}), val2);

  Run({"set", "key3", val1});
  usleep(20000); // 20 milliseconds
  m = GetMetrics();
  EXPECT_EQ(m.db_stats[0].tiered_entries, 1);

  Run({"del", "key1"});
  Run({"del", "key2"});
  Run({"del", "key3"});
  m = GetMetrics();
  EXPECT_EQ(m.db_stats[0].tiered_entries, 0);
  EXPECT_EQ(m.db_stats[0].obj_memory_usage, 0);
}

TEST_F(TieredStorageTest, GetValueValidation) {
  string val1(5000, 'a');
  string val2(5000, 'b');

  Run({"set", "key1", val1});
  Run({"set", "key2", val2});
  usleep(20000); // 20 milliseconds
  Metrics m = GetMetrics();
  EXPECT_EQ(m.db_stats[0].tiered_entries, 2);

  EXPECT_EQ(Run({"get", "key1"}), val1);
  EXPECT_EQ(Run({"get", "key2"}), val2);
  m = GetMetrics();
  EXPECT_EQ(m.db_stats[0].tiered_entries, 0);

  for (unsigned i = 0; i < 100; ++i) {
    string val(100, i); // small entries
    Run({"set", StrCat("k", i), val});
  }
  usleep(20000); // 20 milliseconds
  m = GetMetrics();
  EXPECT_GE(m.db_stats[0].tiered_entries, 0);

  for (unsigned i = 0; i < 100; ++i) {
    string val(100, i); // small entries
    EXPECT_EQ(Run({"get", StrCat("k", i)}), val);
  }
  m = GetMetrics();
  EXPECT_EQ(m.db_stats[0].tiered_entries, 0);
}

} // namespace dfly
@@ -1066,6 +1066,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
  // Fast path. If none of the keys are locked, we can run briefly atomically on the thread
  // without acquiring them at all.
  if (quick_run) {
    // TBD add acquire lock here
    auto result = RunQuickie(shard);
    local_result_ = result.status;