From 63b83c5b9906afcb0fe6fde022f2748ff319a47d Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Wed, 28 Dec 2022 10:37:55 +0200 Subject: [PATCH] chore: Add tiered_storage_test. (#613) 1. Support tiered deletion. 2. Add notion of tiered entity in "DEBUG OBJECT" output. Signed-off-by: Roman Gershman Signed-off-by: Roman Gershman --- src/core/compact_object.cc | 8 ++-- src/core/compact_object.h | 9 ++-- src/server/CMakeLists.txt | 2 +- src/server/bitops_family.cc | 2 +- src/server/common.cc | 6 ++- src/server/common.h | 3 +- src/server/db_slice.cc | 73 ++++++++++++++++++----------- src/server/db_slice.h | 2 +- src/server/debugcmd.cc | 31 +++++++++---- src/server/dragonfly_test.cc | 8 ++-- src/server/engine_shard_set.cc | 4 +- src/server/generic_family_test.cc | 3 ++ src/server/rdb_test.cc | 29 ++++++++++-- src/server/server_family.cc | 3 +- src/server/string_family.cc | 2 +- src/server/test_utils.cc | 14 ++++++ src/server/test_utils.h | 6 +++ src/server/tiered_storage.cc | 5 +- src/server/tiered_storage_test.cc | 77 +++++++++++++++++++++++++++++++ 19 files changed, 225 insertions(+), 62 deletions(-) create mode 100644 src/server/tiered_storage_test.cc diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index 01e6cd10c..e37c62c7a 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -876,13 +876,15 @@ void CompactObj::GetString(char* dest) const { void CompactObj::SetExternal(size_t offset, size_t sz) { SetMeta(EXTERNAL_TAG, mask_ & ~kEncMask); - u_.ext_ptr.offset = offset; + u_.ext_ptr.page_index = offset / 4096; + u_.ext_ptr.page_offset = offset % 4096; u_.ext_ptr.size = sz; } -std::pair CompactObj::GetExternalPtr() const { +std::pair CompactObj::GetExternalSlice() const { DCHECK_EQ(EXTERNAL_TAG, taglen_); - return pair(size_t(u_.ext_ptr.offset), size_t(u_.ext_ptr.size)); + size_t offset = size_t(u_.ext_ptr.page_index) * 4096 + u_.ext_ptr.page_offset; + return pair(offset, size_t(u_.ext_ptr.size)); } void CompactObj::Reset() { diff --git a/src/core/compact_object.h b/src/core/compact_object.h index 4261b9704..f613fabd8 100644 --- a/src/core/compact_object.h +++ b/src/core/compact_object.h @@ -268,7 +268,7 @@ class CompactObj { } void SetExternal(size_t offset, size_t sz); - std::pair GetExternalPtr() const; + std::pair GetExternalSlice() const; // In case this object a single blob, returns number of bytes allocated on heap // for that blob. Otherwise returns 0. @@ -317,9 +317,12 @@ class CompactObj { } struct ExternalPtr { - size_t offset; + uint32_t type : 8; + uint32_t reserved : 24; + uint32_t page_index; + uint16_t page_offset; // 0 for multi-page blobs. != 0 for small blobs. + uint16_t reserved2; uint32_t size; - uint32_t unneeded; } __attribute__((packed)); struct JsonWrapper { diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index b5cd20872..621ea6f51 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -42,7 +42,7 @@ cxx_test(blocking_controller_test dragonfly_lib LABELS DFLY) cxx_test(snapshot_test dragonfly_lib LABELS DFLY) cxx_test(json_family_test dfly_test_lib LABELS DFLY) cxx_test(journal_test dfly_test_lib LABELS DFLY) - +cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY) add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY) add_dependencies(check_dfly dragonfly_test json_family_test list_family_test diff --git a/src/server/bitops_family.cc b/src/server/bitops_family.cc index 78b65060f..58ba8cd7b 100644 --- a/src/server/bitops_family.cc +++ b/src/server/bitops_family.cc @@ -631,7 +631,7 @@ std::string GetString(const PrimeValue& pv, EngineShard* shard) { std::string res; if (pv.IsExternal()) { auto* tiered = shard->tiered_storage(); - auto [offset, size] = pv.GetExternalPtr(); + auto [offset, size] = pv.GetExternalSlice(); res.resize(size); std::error_code ec = tiered->Read(offset, size, res.data()); diff --git a/src/server/common.cc b/src/server/common.cc index 59fb593ee..7f4b2da73 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -193,13 +193,15 @@ bool ParseDouble(string_view src, double* value) { #define ADD(x) (x) += o.x TieredStats& TieredStats::operator+=(const TieredStats& o) { - static_assert(sizeof(TieredStats) == 40); + static_assert(sizeof(TieredStats) == 48); ADD(tiered_reads); ADD(tiered_writes); ADD(storage_capacity); ADD(storage_reserved); - ADD(aborted_offloads); + ADD(aborted_write_cnt); + ADD(flush_skip_cnt); + return *this; } diff --git a/src/server/common.h b/src/server/common.h index 9861bfafd..3e57c3ed2 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -99,7 +99,8 @@ struct TieredStats { // how much was reserved by actively stored items. size_t storage_reserved = 0; - size_t aborted_offloads = 0; + size_t aborted_write_cnt = 0; + size_t flush_skip_cnt = 0; TieredStats& operator+=(const TieredStats&); }; diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 317055bda..31a08c33e 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -37,24 +37,40 @@ static_assert(kPrimeSegmentSize == 32288); // 24576 static_assert(kExpireSegmentSize == 23528); -void UpdateStatsOnDeletion(PrimeIterator it, DbTableStats* stats) { - size_t value_heap_size = it->second.MallocUsed(); - stats->inline_keys -= it->first.IsInline(); - stats->obj_memory_usage -= (it->first.MallocUsed() + value_heap_size); - if (it->second.ObjType() == OBJ_STRING) - stats->strval_memory_usage -= value_heap_size; -} - -void EvictItemFun(PrimeIterator del_it, DbTable* table) { - if (del_it->second.HasExpire()) { - CHECK_EQ(1u, table->expire.Erase(del_it->first)); +void PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, EngineShard* shard, + DbTable* table) { + if (!exp_it.is_done()) { + table->expire.Erase(exp_it); } - UpdateStatsOnDeletion(del_it, &table->stats); + DbTableStats& stats = table->stats; + const PrimeValue& pv = del_it->second; + if (pv.IsExternal()) { + auto [offset, size] = pv.GetExternalSlice(); - DVLOG(2) << "Evicted from bucket " << del_it.bucket_id() << " " << del_it->first.ToString(); + stats.tiered_entries--; + stats.tiered_size -= size; + TieredStorage* tiered = shard->tiered_storage(); + tiered->Free(offset, size); + } + + size_t value_heap_size = pv.MallocUsed(); + stats.inline_keys -= del_it->first.IsInline(); + stats.obj_memory_usage -= (del_it->first.MallocUsed() + value_heap_size); + if (pv.ObjType() == OBJ_STRING) + stats.strval_memory_usage -= value_heap_size; table->prime.Erase(del_it); +} + +inline void PerformDeletion(PrimeIterator del_it, EngineShard* shard, DbTable* table) { + ExpireIterator exp_it; + if (del_it->second.HasExpire()) { + exp_it = table->expire.Find(del_it->first); + DCHECK(!exp_it.is_done()); + } + + PerformDeletion(del_it, exp_it, shard, table); }; class PrimeEvictionPolicy { @@ -167,7 +183,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT } DbTable* table = db_slice_->GetDBTable(cntx_.db_index); - EvictItemFun(last_slot_it, table); + PerformDeletion(last_slot_it, db_slice_->shard_owner(), table); ++evicted_; } me->ShiftRight(bucket_it); @@ -446,16 +462,11 @@ bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) { } auto& db = db_arr_[db_ind]; - if (it->second.HasExpire()) { - CHECK_EQ(1u, db->expire.Erase(it->first)); - } - if (it->second.HasFlag()) { CHECK_EQ(1u, db->mcflag.Erase(it->first)); } - UpdateStatsOnDeletion(it, &db->stats); - db->prime.Erase(it); + PerformDeletion(it, shard_owner(), db.get()); return true; } @@ -474,7 +485,17 @@ void DbSlice::FlushDb(DbIndex db_ind) { CreateDb(db_ind); db_arr_[db_ind]->trans_locks.swap(db_ptr->trans_locks); - auto cb = [db_ptr = std::move(db_ptr)]() mutable { + auto cb = [this, db_ptr = std::move(db_ptr)]() mutable { + if (db_ptr->stats.tiered_entries > 0) { + for (auto it = db_ptr->prime.begin(); it != db_ptr->prime.end(); ++it) { + if (it->second.IsExternal()) { + PerformDeletion(it, shard_owner(), db_ptr.get()); + } + } + } + + DCHECK_EQ(0u, db_ptr->stats.tiered_entries); + db_ptr.reset(); mi_heap_collect(ServerState::tlocal()->data_heap(), true); }; @@ -711,7 +732,7 @@ void DbSlice::PreUpdate(DbIndex db_ind, PrimeIterator it) { // before calling to PreUpdate or it does not need to read it at all. // After this code executes, the external blob is lost. TieredStorage* tiered = shard_owner()->tiered_storage(); - auto [offset, size] = it->second.GetExternalPtr(); + auto [offset, size] = it->second.GetExternalSlice(); tiered->Free(offset, size); it->second.Reset(); @@ -764,9 +785,7 @@ pair DbSlice::ExpireIfNeeded(const Context& cntx, if (time_t(cntx.time_now_ms) < expire_time) return make_pair(it, expire_it); - db->expire.Erase(expire_it); - UpdateStatsOnDeletion(it, &db->stats); - db->prime.Erase(it); + PerformDeletion(it, expire_it, shard_owner(), db.get()); ++events_.expired_keys; return make_pair(PrimeIterator{}, ExpireIterator{}); @@ -892,7 +911,7 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t if (evict_it == it || evict_it->first.IsSticky()) continue; - EvictItemFun(evict_it, table); + PerformDeletion(evict_it, shard_owner(), table); ++evicted; if (freed_memory_fun() > memory_to_free) { evict_succeeded = true; @@ -918,7 +937,7 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t if (evict_it == it || evict_it->first.IsSticky()) continue; - EvictItemFun(evict_it, table); + PerformDeletion(evict_it, shard_owner(), table); ++evicted; if (freed_memory_fun() > memory_to_free) { diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 24c81f0eb..8655deb23 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -202,7 +202,7 @@ class DbSlice { */ void FlushDb(DbIndex db_ind); - EngineShard* shard_owner() { + EngineShard* shard_owner() const { return owner_; } diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index d7be50386..dc36d7203 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -55,6 +55,8 @@ struct ObjInfo { enum LockStatus { NONE, S, X } lock_status = NONE; int64_t ttl = INT64_MAX; + optional external_len; + bool has_sec_precision = false; bool found = false; }; @@ -338,11 +340,17 @@ void DebugCmd::Inspect(string_view key) { PrimeIterator it = pt->Find(key); ObjInfo oinfo; if (IsValid(it)) { + const PrimeValue& pv = it->second; + oinfo.found = true; - oinfo.encoding = it->second.Encoding(); + oinfo.encoding = pv.Encoding(); oinfo.bucket_id = it.bucket_id(); oinfo.slot_id = it.slot_id(); - if (it->second.HasExpire()) { + if (pv.IsExternal()) { + oinfo.external_len.emplace(pv.GetExternalSlice().second); + } + + if (pv.HasExpire()) { ExpireIterator exp_it = exp_t->Find(it->first); CHECK(!exp_it.is_done()); @@ -366,13 +374,20 @@ void DebugCmd::Inspect(string_view key) { ObjInfo res = ess.Await(sid, cb); string resp; - if (res.found) { - StrAppend(&resp, "encoding:", strEncoding(res.encoding), " bucket_id:", res.bucket_id); - StrAppend(&resp, " slot:", res.slot_id, " shard:", sid); + if (!res.found) { + (*cntx_)->SendError(kKeyNotFoundErr); + return; + } - if (res.ttl != INT64_MAX) { - StrAppend(&resp, " ttl:", res.ttl, res.has_sec_precision ? "s" : "ms"); - } + StrAppend(&resp, "encoding:", strEncoding(res.encoding), " bucket_id:", res.bucket_id); + StrAppend(&resp, " slot:", res.slot_id, " shard:", sid); + + if (res.ttl != INT64_MAX) { + StrAppend(&resp, " ttl:", res.ttl, res.has_sec_precision ? "s" : "ms"); + } + + if (res.external_len) { + StrAppend(&resp, " spill_len:", *res.external_len); } if (res.lock_status != ObjInfo::NONE) { diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 22cb6133b..5f58abb19 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -52,9 +52,9 @@ class DflyEngineTest : public BaseFamilyTest { } }; -class DefragDflyEngineTest : public DflyEngineTest { +class DefragDflyEngineTest : public BaseFamilyTest { protected: - DefragDflyEngineTest() : DflyEngineTest() { + DefragDflyEngineTest() : BaseFamilyTest() { num_threads_ = 1; } }; @@ -141,8 +141,8 @@ TEST_F(DflyEngineTest, HitMissStats) { resp = Run({"get", "Key2"}); ASSERT_THAT(resp, ArgType(RespExpr::NIL)); - EXPECT_THAT(service_->server_family().GetMetrics().events.hits, 1); - EXPECT_THAT(service_->server_family().GetMetrics().events.misses, 1); + EXPECT_THAT(GetMetrics().events.hits, 1); + EXPECT_THAT(GetMetrics().events.misses, 1); } TEST_F(DflyEngineTest, MultiEmpty) { diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 070a013c8..4d6f85d54 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -20,7 +20,7 @@ extern "C" { using namespace std; -ABSL_FLAG(string, backing_prefix, "", ""); +ABSL_FLAG(string, spill_file_prefix, "", ""); ABSL_FLAG(uint32_t, hz, 100, "Base frequency at which the server performs other background tasks. " @@ -227,7 +227,7 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) { CompactObj::InitThreadLocal(shard_->memory_resource()); SmallString::InitThreadLocal(data_heap); - string backing_prefix = GetFlag(FLAGS_backing_prefix); + string backing_prefix = GetFlag(FLAGS_spill_file_prefix); if (!backing_prefix.empty()) { if (pb->GetKind() != ProactorBase::IOURING) { LOG(ERROR) << "Only ioring based backing storage is supported. Exiting..."; diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index d48a8cfae..d6e2dbc52 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -95,6 +95,9 @@ TEST_F(GenericFamilyTest, Del) { exist_fb.Join(); del_fb.Join(); + + Run({"setex", "k1", "10", "bar"}); + Run({"del", "k1"}); } TEST_F(GenericFamilyTest, TTL) { diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 0c08b2a37..e4e55826e 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -17,6 +17,7 @@ extern "C" { #include "base/logging.h" #include "facade/facade_test.h" // needed to find operator== for RespExpr. #include "io/file.h" +#include "io/file_util.h" #include "server/engine_shard_set.h" #include "server/rdb_load.h" #include "server/test_utils.h" @@ -31,15 +32,32 @@ using absl::StrCat; ABSL_DECLARE_FLAG(int32, list_compress_depth); ABSL_DECLARE_FLAG(int32, list_max_listpack_size); ABSL_DECLARE_FLAG(int, compression_mode); +ABSL_DECLARE_FLAG(string, dbfilename); namespace dfly { class RdbTest : public BaseFamilyTest { protected: - protected: + static void SetUpTestSuite(); + void TearDown(); + io::FileSource GetSource(string name); }; +void RdbTest::SetUpTestSuite() { + BaseFamilyTest::SetUpTestSuite(); + SetFlag(&FLAGS_dbfilename, "rdbtestdump"); +} + +void RdbTest::TearDown() { + auto rdb_files = io::StatFiles("rdbtestdump*"); + CHECK(rdb_files); + for (const auto& fl : *rdb_files) { + unlink(fl.name.c_str()); + } + BaseFamilyTest::TearDown(); +} + inline const uint8_t* to_byte(const void* s) { return reinterpret_cast(s); } @@ -146,7 +164,7 @@ TEST_F(RdbTest, Stream) { } TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) { - Run({"debug", "populate", "500000"}); + Run({"debug", "populate", "50000"}); for (int i = 0; i <= 3; ++i) { SetFlag(&FLAGS_compression_mode, i); @@ -156,7 +174,7 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) { auto save_info = service_->server_family().GetLastSaveInfo(); resp = Run({"debug", "load", save_info->file_name}); ASSERT_EQ(resp, "OK"); - ASSERT_EQ(500000, CheckedInt({"dbsize"})); + ASSERT_EQ(50000, CheckedInt({"dbsize"})); } } @@ -240,7 +258,8 @@ TEST_F(RdbTest, SaveManyDbs) { Run({"select", "1"}); Run({"debug", "populate", "10000"}); }); - auto metrics = service_->server_family().GetMetrics(); + + auto metrics = GetMetrics(); ASSERT_EQ(2, metrics.db.size()); EXPECT_EQ(50000, metrics.db[0].key_count); EXPECT_EQ(10000, metrics.db[1].key_count); @@ -272,7 +291,7 @@ TEST_F(RdbTest, SaveManyDbs) { auto resp = Run({"debug", "reload", "NOSAVE"}); EXPECT_EQ(resp, "OK"); - metrics = service_->server_family().GetMetrics(); + metrics = GetMetrics(); ASSERT_EQ(2, metrics.db.size()); EXPECT_EQ(50000, metrics.db[0].key_count); EXPECT_EQ(10000, metrics.db[1].key_count); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 492bd028b..e47f5969d 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1319,7 +1319,8 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("tiered_writes", m.tiered_stats.tiered_writes); append("tiered_reserved", m.tiered_stats.storage_reserved); append("tiered_capacity", m.tiered_stats.storage_capacity); - append("tiered_aborted_writes", m.tiered_stats.aborted_offloads); + append("tiered_aborted_write_total", m.tiered_stats.aborted_write_cnt); + append("tiered_flush_skip_total", m.tiered_stats.flush_skip_cnt); } if (should_enter("PERSISTENCE", true)) { diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 86acf235a..0067968e0 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -44,7 +44,7 @@ string GetString(EngineShard* shard, const PrimeValue& pv) { string res; if (pv.IsExternal()) { auto* tiered = shard->tiered_storage(); - auto [offset, size] = pv.GetExternalPtr(); + auto [offset, size] = pv.GetExternalSlice(); res.resize(size); error_code ec = tiered->Read(offset, size, res.data()); diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index a994d61a2..accc2a796 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -179,6 +179,14 @@ RespExpr BaseFamilyTest::Run(ArgSlice list) { return Run(GetId(), list); } +RespExpr BaseFamilyTest::Run(absl::Span span) { + vector sv_vec(span.size()); + for (unsigned i = 0; i < span.size(); ++i) { + sv_vec[i] = span[i]; + } + return Run(sv_vec); +} + RespExpr BaseFamilyTest::Run(std::string_view id, ArgSlice slice) { TestConnWrapper* conn_wrapper = AddFindConn(Protocol::REDIS, id); @@ -292,6 +300,12 @@ int64_t BaseFamilyTest::CheckedInt(ArgSlice list) { return res; } +string BaseFamilyTest::CheckedString(ArgSlice list) { + RespExpr resp = Run(list); + CHECK_EQ(RespExpr::STRING, int(resp.type)) << list; + return string{ToSV(resp.GetBuf())}; +} + CmdArgVec BaseFamilyTest::TestConnWrapper::Args(ArgSlice list) { CHECK_NE(0u, list.size()); diff --git a/src/server/test_utils.h b/src/server/test_utils.h index 975307992..0db086828 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -47,6 +47,7 @@ class BaseFamilyTest : public ::testing::Test { } RespExpr Run(ArgSlice list); + RespExpr Run(absl::Span list); RespExpr Run(std::string_view id, ArgSlice list); @@ -60,6 +61,7 @@ class BaseFamilyTest : public ::testing::Test { return CheckedInt(ArgSlice{list.begin(), list.size()}); } int64_t CheckedInt(ArgSlice list); + std::string CheckedString(ArgSlice list); bool IsLocked(DbIndex db_index, std::string_view key) const; ConnectionContext::DebugInfo GetDebugInfo(const std::string& id) const; @@ -71,6 +73,10 @@ class BaseFamilyTest : public ::testing::Test { TestConnWrapper* AddFindConn(Protocol proto, std::string_view id); static std::vector StrArray(const RespExpr& expr); + Metrics GetMetrics() const { + return service_->server_family().GetMetrics(); + } + void AdvanceTime(int64_t ms) { TEST_current_time_ms += ms; } diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index ac40f5493..837e4266e 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -340,13 +340,13 @@ void TieredStorage::FinishIoRequest(int io_res, InflightWriteRequest* req) { LOG(ERROR) << "Error writing into ssd file: " << util::detail::SafeErrorMessage(-io_res); alloc_.Free(req->page_index() * kBlockLen, kBlockLen); req->Undo(&bin_record, &db_slice_); - ++stats_.aborted_offloads; + ++stats_.aborted_write_cnt; } else { // Also removes the entries from bin_record. uint16_t entries_serialized = req->ExternalizeEntries(&bin_record, &db_slice_); if (entries_serialized == 0) { // aborted - ++stats_.aborted_offloads; + ++stats_.aborted_write_cnt; alloc_.Free(req->page_index() * kBlockLen, kBlockLen); } else { // succeeded. VLOG(2) << "page_refcnt emplace " << req->page_index(); @@ -413,6 +413,7 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) { if (!flush_succeeded) { // we could not flush because I/O is saturated, so lets remove the last item. bin_record.pending_entries.erase(it->first.AsRef()); + ++stats_.flush_skip_cnt; } return error_code{}; diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc new file mode 100644 index 000000000..db3933515 --- /dev/null +++ b/src/server/tiered_storage_test.cc @@ -0,0 +1,77 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include +#include + +#include "base/flags.h" +#include "server/test_utils.h" + +using namespace std; +using namespace testing; +using absl::SetFlag; +using absl::StrCat; + +ABSL_DECLARE_FLAG(string, spill_file_prefix); + +namespace dfly { + +class TieredStorageTest : public BaseFamilyTest { + protected: + TieredStorageTest() : BaseFamilyTest() { + num_threads_ = 1; + } + + void FillExternalKeys(unsigned count); + + static void SetUpTestSuite(); +}; + +void TieredStorageTest::SetUpTestSuite() { + BaseFamilyTest::SetUpTestSuite(); + SetFlag(&FLAGS_spill_file_prefix, "/tmp/spill"); +} + +void TieredStorageTest::FillExternalKeys(unsigned count) { + string val(256, 'a'); + + unsigned batch_cnt = count / 50; + for (unsigned i = 0; i < batch_cnt; ++i) { + vector cmd; + cmd.push_back("mset"); + + for (unsigned j = 0; j < 50; ++j) { + string key = StrCat("k", i * 100 + j); + cmd.push_back(key); + cmd.push_back(val); + } + Run(absl::Span{cmd}); + } + + for (unsigned i = batch_cnt * 50; i < count; ++i) { + Run({"set", StrCat("k", i), val}); + } +} + +TEST_F(TieredStorageTest, Basic) { + FillExternalKeys(5000); + + EXPECT_EQ(5000, CheckedInt({"dbsize"})); + Metrics m = GetMetrics(); + EXPECT_GT(m.db[0].tiered_entries, 0u); + + FillExternalKeys(5000); + + m = GetMetrics(); + unsigned tiered_entries = m.db[0].tiered_entries; + EXPECT_GT(tiered_entries, 0u); + string resp = CheckedString({"debug", "object", "k1"}); + EXPECT_THAT(resp, HasSubstr("spill_len")); + + Run({"del", "k1"}); + m = GetMetrics(); + EXPECT_EQ(m.db[0].tiered_entries, tiered_entries - 1); +} + +} // namespace dfly