mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
refactor(server): Privatize PreUpdate()
and PostUpdate()
(#2322)
* refactor(server): Privatize `PreUpdate()` and `PostUpdate()` While at it: * Make `PreUpdate()` not decrease object size * Remove redundant leftover call to `PreUpdate()` outside `DbSlice` * Add pytest * Test delete leads to 0 counters * Improve test * fixes * comments
This commit is contained in:
parent
700a65ece5
commit
a360b308c9
5 changed files with 94 additions and 30 deletions
|
@ -46,11 +46,11 @@ static_assert(kPrimeSegmentSize == 32288);
|
||||||
// 24576
|
// 24576
|
||||||
static_assert(kExpireSegmentSize == 23528);
|
static_assert(kExpireSegmentSize == 23528);
|
||||||
|
|
||||||
void AccountObjectMemory(const CompactObj& obj, DbTableStats* stats, int64_t multiplier) {
|
void AccountObjectMemory(unsigned type, int64_t size, DbTableStats* stats) {
|
||||||
const int64_t value_heap_size = obj.MallocUsed() * multiplier;
|
DCHECK_GE(static_cast<int64_t>(stats->obj_memory_usage) + size, 0)
|
||||||
|
<< "Can't decrease " << size << " from " << stats->obj_memory_usage;
|
||||||
stats->obj_memory_usage += value_heap_size;
|
stats->obj_memory_usage += size;
|
||||||
stats->AddTypeMemoryUsage(obj.ObjType(), value_heap_size);
|
stats->AddTypeMemoryUsage(type, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
void PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, EngineShard* shard,
|
void PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, EngineShard* shard,
|
||||||
|
@ -82,8 +82,8 @@ void PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, EngineShard* s
|
||||||
}
|
}
|
||||||
|
|
||||||
stats.inline_keys -= del_it->first.IsInline();
|
stats.inline_keys -= del_it->first.IsInline();
|
||||||
AccountObjectMemory(del_it->first, &stats, -1); // Key
|
AccountObjectMemory(del_it->first.ObjType(), -del_it->first.MallocUsed(), &stats); // Key
|
||||||
AccountObjectMemory(del_it->second, &stats, -1); // Value
|
AccountObjectMemory(del_it->second.ObjType(), -del_it->second.MallocUsed(), &stats); // Value
|
||||||
|
|
||||||
if (pv.ObjType() == OBJ_HASH && pv.Encoding() == kEncodingListPack) {
|
if (pv.ObjType() == OBJ_HASH && pv.Encoding() == kEncodingListPack) {
|
||||||
--stats.listpack_blob_cnt;
|
--stats.listpack_blob_cnt;
|
||||||
|
@ -377,7 +377,7 @@ void DbSlice::AutoUpdater::Run() {
|
||||||
DCHECK(fields_.action == DestructorAction::kRun);
|
DCHECK(fields_.action == DestructorAction::kRun);
|
||||||
CHECK_NE(fields_.db_slice, nullptr);
|
CHECK_NE(fields_.db_slice, nullptr);
|
||||||
|
|
||||||
fields_.db_slice->PostUpdate(fields_.db_ind, fields_.it, fields_.key, fields_.key_existed);
|
fields_.db_slice->PostUpdate(fields_.db_ind, fields_.it, fields_.key, fields_.orig_heap_size);
|
||||||
Cancel(); // Reset to not run again
|
Cancel(); // Reset to not run again
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -387,8 +387,10 @@ void DbSlice::AutoUpdater::Cancel() {
|
||||||
|
|
||||||
DbSlice::AutoUpdater::AutoUpdater(const Fields& fields) : fields_(fields) {
|
DbSlice::AutoUpdater::AutoUpdater(const Fields& fields) : fields_(fields) {
|
||||||
DCHECK(fields_.action == DestructorAction::kRun);
|
DCHECK(fields_.action == DestructorAction::kRun);
|
||||||
|
DCHECK(IsValid(fields.it));
|
||||||
fields_.db_size = fields_.db_slice->DbSize(fields_.db_ind);
|
fields_.db_size = fields_.db_slice->DbSize(fields_.db_ind);
|
||||||
fields_.deletion_count = fields_.db_slice->deletion_count_;
|
fields_.deletion_count = fields_.db_slice->deletion_count_;
|
||||||
|
fields_.orig_heap_size = fields.it->second.MallocUsed();
|
||||||
}
|
}
|
||||||
|
|
||||||
DbSlice::AddOrFindResult& DbSlice::AddOrFindResult::operator=(ItAndUpdater&& o) {
|
DbSlice::AddOrFindResult& DbSlice::AddOrFindResult::operator=(ItAndUpdater&& o) {
|
||||||
|
@ -405,7 +407,11 @@ DbSlice::ItAndUpdater DbSlice::FindMutable(const Context& cntx, string_view key)
|
||||||
if (IsValid(it)) {
|
if (IsValid(it)) {
|
||||||
PreUpdate(cntx.db_index, it);
|
PreUpdate(cntx.db_index, it);
|
||||||
return {it, exp_it,
|
return {it, exp_it,
|
||||||
AutoUpdater({AutoUpdater::DestructorAction::kRun, this, cntx.db_index, it, key, true})};
|
AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
|
||||||
|
.db_slice = this,
|
||||||
|
.db_ind = cntx.db_index,
|
||||||
|
.it = it,
|
||||||
|
.key = key})};
|
||||||
} else {
|
} else {
|
||||||
return {it, exp_it, {}};
|
return {it, exp_it, {}};
|
||||||
}
|
}
|
||||||
|
@ -425,7 +431,11 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutable(const Context& cntx, string
|
||||||
|
|
||||||
PreUpdate(cntx.db_index, it);
|
PreUpdate(cntx.db_index, it);
|
||||||
return {{it, exp_it,
|
return {{it, exp_it,
|
||||||
AutoUpdater({AutoUpdater::DestructorAction::kRun, this, cntx.db_index, it, key, true})}};
|
AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
|
||||||
|
.db_slice = this,
|
||||||
|
.db_ind = cntx.db_index,
|
||||||
|
.it = it,
|
||||||
|
.key = key})}};
|
||||||
}
|
}
|
||||||
|
|
||||||
DbSlice::ItAndExpConst DbSlice::FindReadOnly(const Context& cntx, std::string_view key) {
|
DbSlice::ItAndExpConst DbSlice::FindReadOnly(const Context& cntx, std::string_view key) {
|
||||||
|
@ -534,8 +544,11 @@ DbSlice::AddOrFindResult DbSlice::AddOrFind(const Context& cntx, string_view key
|
||||||
return {.it = res.it,
|
return {.it = res.it,
|
||||||
.exp_it = res.exp_it,
|
.exp_it = res.exp_it,
|
||||||
.is_new = false,
|
.is_new = false,
|
||||||
.post_updater = AutoUpdater(
|
.post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
|
||||||
{AutoUpdater::DestructorAction::kRun, this, cntx.db_index, res.it, key, true})};
|
.db_slice = this,
|
||||||
|
.db_ind = cntx.db_index,
|
||||||
|
.it = res.it,
|
||||||
|
.key = key})};
|
||||||
}
|
}
|
||||||
|
|
||||||
// It's a new entry.
|
// It's a new entry.
|
||||||
|
@ -597,7 +610,7 @@ DbSlice::AddOrFindResult DbSlice::AddOrFind(const Context& cntx, string_view key
|
||||||
}
|
}
|
||||||
|
|
||||||
db.stats.inline_keys += it->first.IsInline();
|
db.stats.inline_keys += it->first.IsInline();
|
||||||
AccountObjectMemory(it->first, &db.stats, 1); // Account for key
|
AccountObjectMemory(it->first.ObjType(), it->first.MallocUsed(), &db.stats); // Account for key
|
||||||
|
|
||||||
DCHECK_EQ(it->second.MallocUsed(), 0UL); // Make sure accounting is no-op
|
DCHECK_EQ(it->second.MallocUsed(), 0UL); // Make sure accounting is no-op
|
||||||
it.SetVersion(NextVersion());
|
it.SetVersion(NextVersion());
|
||||||
|
@ -616,8 +629,11 @@ DbSlice::AddOrFindResult DbSlice::AddOrFind(const Context& cntx, string_view key
|
||||||
return {.it = it,
|
return {.it = it,
|
||||||
.exp_it = ExpireIterator{},
|
.exp_it = ExpireIterator{},
|
||||||
.is_new = true,
|
.is_new = true,
|
||||||
.post_updater = AutoUpdater(
|
.post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
|
||||||
{AutoUpdater::DestructorAction::kRun, this, cntx.db_index, it, key, false})};
|
.db_slice = this,
|
||||||
|
.db_ind = cntx.db_index,
|
||||||
|
.it = it,
|
||||||
|
.key = key})};
|
||||||
}
|
}
|
||||||
|
|
||||||
void DbSlice::ActivateDb(DbIndex db_ind) {
|
void DbSlice::ActivateDb(DbIndex db_ind) {
|
||||||
|
@ -999,10 +1015,7 @@ void DbSlice::PreUpdate(DbIndex db_ind, PrimeIterator it) {
|
||||||
ccb.second(db_ind, ChangeReq{it});
|
ccb.second(db_ind, ChangeReq{it});
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(#2252): Remove and do accounting only in PostUpdate()
|
|
||||||
auto* stats = MutableStats(db_ind);
|
auto* stats = MutableStats(db_ind);
|
||||||
AccountObjectMemory(it->second, stats, -1);
|
|
||||||
|
|
||||||
if (it->second.ObjType() == OBJ_STRING) {
|
if (it->second.ObjType() == OBJ_STRING) {
|
||||||
if (it->second.IsExternal()) {
|
if (it->second.IsExternal()) {
|
||||||
// We assume here that the operation code either loaded the entry into memory
|
// We assume here that the operation code either loaded the entry into memory
|
||||||
|
@ -1026,10 +1039,11 @@ void DbSlice::PreUpdate(DbIndex db_ind, PrimeIterator it) {
|
||||||
it.SetVersion(NextVersion());
|
it.SetVersion(NextVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, bool existing) {
|
void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, size_t orig_size) {
|
||||||
DbTableStats* stats = MutableStats(db_ind);
|
DbTableStats* stats = MutableStats(db_ind);
|
||||||
|
|
||||||
AccountObjectMemory(it->second, stats, 1);
|
int64_t delta = static_cast<int64_t>(it->second.MallocUsed()) - static_cast<int64_t>(orig_size);
|
||||||
|
AccountObjectMemory(it->second.ObjType(), delta, stats);
|
||||||
|
|
||||||
auto& db = *db_arr_[db_ind];
|
auto& db = *db_arr_[db_ind];
|
||||||
auto& watched_keys = db.watched_keys;
|
auto& watched_keys = db.watched_keys;
|
||||||
|
|
|
@ -90,11 +90,11 @@ class DbSlice {
|
||||||
DbIndex db_ind = 0;
|
DbIndex db_ind = 0;
|
||||||
PrimeIterator it;
|
PrimeIterator it;
|
||||||
std::string_view key;
|
std::string_view key;
|
||||||
bool key_existed = false;
|
|
||||||
|
|
||||||
|
// The following fields are calculated at init time
|
||||||
size_t db_size = 0;
|
size_t db_size = 0;
|
||||||
size_t deletion_count = 0;
|
size_t deletion_count = 0;
|
||||||
// TODO(#2252): Add heap size here, and only update memory in d'tor
|
size_t orig_heap_size = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
AutoUpdater(const Fields& fields);
|
AutoUpdater(const Fields& fields);
|
||||||
|
@ -311,11 +311,6 @@ class DbSlice {
|
||||||
size_t DbSize(DbIndex db_ind) const;
|
size_t DbSize(DbIndex db_ind) const;
|
||||||
|
|
||||||
// Callback functions called upon writing to the existing key.
|
// Callback functions called upon writing to the existing key.
|
||||||
// TODO(#2252): Remove these (or make them private)
|
|
||||||
void PreUpdate(DbIndex db_ind, PrimeIterator it);
|
|
||||||
void PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key,
|
|
||||||
bool existing_entry = true);
|
|
||||||
|
|
||||||
DbTableStats* MutableStats(DbIndex db_ind) {
|
DbTableStats* MutableStats(DbIndex db_ind) {
|
||||||
return &db_arr_[db_ind]->stats;
|
return &db_arr_[db_ind]->stats;
|
||||||
}
|
}
|
||||||
|
@ -397,6 +392,9 @@ class DbSlice {
|
||||||
void PerformDeletion(PrimeIterator del_it, EngineShard* shard, DbTable* table);
|
void PerformDeletion(PrimeIterator del_it, EngineShard* shard, DbTable* table);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
void PreUpdate(DbIndex db_ind, PrimeIterator it);
|
||||||
|
void PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, size_t orig_size);
|
||||||
|
|
||||||
// Releases a single key. `key` must have been normalized by GetLockKey().
|
// Releases a single key. `key` must have been normalized by GetLockKey().
|
||||||
void ReleaseNormalized(IntentLock::Mode m, DbIndex db_index, std::string_view key,
|
void ReleaseNormalized(IntentLock::Mode m, DbIndex db_index, std::string_view key,
|
||||||
unsigned count);
|
unsigned count);
|
||||||
|
|
|
@ -277,7 +277,6 @@ OpResult<int64_t> OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr,
|
||||||
CompactObj cobj;
|
CompactObj cobj;
|
||||||
cobj.SetInt(incr);
|
cobj.SetInt(incr);
|
||||||
|
|
||||||
// AddNew calls PostUpdate inside.
|
|
||||||
try {
|
try {
|
||||||
res = db_slice.AddNew(op_args.db_cntx, key, std::move(cobj), 0);
|
res = db_slice.AddNew(op_args.db_cntx, key, std::move(cobj), 0);
|
||||||
} catch (bad_alloc&) {
|
} catch (bad_alloc&) {
|
||||||
|
@ -449,7 +448,6 @@ OpResult<array<int64_t, 5>> OpThrottle(const OpArgs& op_args, const string_view
|
||||||
CompactObj cobj;
|
CompactObj cobj;
|
||||||
cobj.SetInt(new_tat_ms);
|
cobj.SetInt(new_tat_ms);
|
||||||
|
|
||||||
// AddNew calls PostUpdate inside.
|
|
||||||
try {
|
try {
|
||||||
db_slice.AddNew(op_args.db_cntx, key, std::move(cobj), new_tat_ms);
|
db_slice.AddNew(op_args.db_cntx, key, std::move(cobj), new_tat_ms);
|
||||||
} catch (bad_alloc&) {
|
} catch (bad_alloc&) {
|
||||||
|
|
|
@ -1275,7 +1275,6 @@ ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_view key, bo
|
||||||
range_spec.interval = ZSetFamily::TopNScored(1);
|
range_spec.interval = ZSetFamily::TopNScored(1);
|
||||||
|
|
||||||
DVLOG(2) << "popping from " << key << " " << t->DebugId();
|
DVLOG(2) << "popping from " << key << " " << t->DebugId();
|
||||||
db_slice.PreUpdate(t->GetDbIndex(), it);
|
|
||||||
|
|
||||||
PrimeValue& pv = it->second;
|
PrimeValue& pv = it->second;
|
||||||
IntervalVisitor iv{Action::POP, range_spec.params, &pv};
|
IntervalVisitor iv{Action::POP, range_spec.params, &pv};
|
||||||
|
|
|
@ -254,13 +254,58 @@ class TestDflySnapshotOnShutdown(SnapshotTestBase):
|
||||||
def setup(self, tmp_dir: Path):
|
def setup(self, tmp_dir: Path):
|
||||||
self.tmp_dir = tmp_dir
|
self.tmp_dir = tmp_dir
|
||||||
|
|
||||||
|
async def _get_info_memory_fields(self, client):
|
||||||
|
res = await client.execute_command("INFO MEMORY")
|
||||||
|
fields = {}
|
||||||
|
for line in res.decode("ascii").splitlines():
|
||||||
|
if line.startswith("#"):
|
||||||
|
continue
|
||||||
|
k, v = line.split(":")
|
||||||
|
if k == "object_used_memory" or k.startswith("type_used_memory_"):
|
||||||
|
fields.update({k: int(v)})
|
||||||
|
return fields
|
||||||
|
|
||||||
|
async def _delete_all_keys(self, client):
|
||||||
|
# Delete all keys from all DBs
|
||||||
|
for i in range(0, SEEDER_ARGS["dbcount"]):
|
||||||
|
await client.select(i)
|
||||||
|
while True:
|
||||||
|
keys = await client.keys("*")
|
||||||
|
if len(keys) == 0:
|
||||||
|
break
|
||||||
|
await client.delete(*keys)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_memory_counters(self, df_seeder_factory, df_server):
|
||||||
|
a_client = aioredis.Redis(port=df_server.port)
|
||||||
|
|
||||||
|
memory_counters = await self._get_info_memory_fields(a_client)
|
||||||
|
assert memory_counters == {"object_used_memory": 0}
|
||||||
|
|
||||||
|
seeder = df_seeder_factory.create(port=df_server.port, **SEEDER_ARGS)
|
||||||
|
await seeder.run(target_deviation=0.1)
|
||||||
|
|
||||||
|
memory_counters = await self._get_info_memory_fields(a_client)
|
||||||
|
assert all(value > 0 for value in memory_counters.values())
|
||||||
|
|
||||||
|
await self._delete_all_keys(a_client)
|
||||||
|
memory_counters = await self._get_info_memory_fields(a_client)
|
||||||
|
assert memory_counters == {"object_used_memory": 0}
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@pytest.mark.slow
|
@pytest.mark.slow
|
||||||
async def test_snapshot(self, df_seeder_factory, df_server):
|
async def test_snapshot(self, df_seeder_factory, df_server):
|
||||||
|
"""Checks that:
|
||||||
|
1. After reloading the snapshot file the data is the same
|
||||||
|
2. Memory counters after loading from snapshot is similar to before creating a snapshot
|
||||||
|
3. Memory counters after deleting all keys loaded by snapshot - this validates the memory
|
||||||
|
counting when loading from snapshot."""
|
||||||
seeder = df_seeder_factory.create(port=df_server.port, **SEEDER_ARGS)
|
seeder = df_seeder_factory.create(port=df_server.port, **SEEDER_ARGS)
|
||||||
await seeder.run(target_deviation=0.1)
|
await seeder.run(target_deviation=0.1)
|
||||||
|
|
||||||
start_capture = await seeder.capture()
|
start_capture = await seeder.capture()
|
||||||
|
a_client = aioredis.Redis(port=df_server.port)
|
||||||
|
memory_before = await self._get_info_memory_fields(a_client)
|
||||||
|
|
||||||
df_server.stop()
|
df_server.stop()
|
||||||
df_server.start()
|
df_server.start()
|
||||||
|
@ -270,6 +315,16 @@ class TestDflySnapshotOnShutdown(SnapshotTestBase):
|
||||||
await a_client.connection_pool.disconnect()
|
await a_client.connection_pool.disconnect()
|
||||||
|
|
||||||
assert await seeder.compare(start_capture, port=df_server.port)
|
assert await seeder.compare(start_capture, port=df_server.port)
|
||||||
|
memory_after = await self._get_info_memory_fields(a_client)
|
||||||
|
for counter, value in memory_before.items():
|
||||||
|
# Unfortunately memory usage sometimes depends on order of insertion / deletion, so
|
||||||
|
# it's usually not exactly the same. For the test to be stable we check that it's
|
||||||
|
# at least 50% that of the original value.
|
||||||
|
assert memory_after[counter] >= 0.5 * value
|
||||||
|
|
||||||
|
await self._delete_all_keys(a_client)
|
||||||
|
memory_empty = await self._get_info_memory_fields(a_client)
|
||||||
|
assert memory_empty == {"object_used_memory": 0}
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({**BASIC_ARGS, "dbfilename": "test-info-persistence"})
|
@dfly_args({**BASIC_ARGS, "dbfilename": "test-info-persistence"})
|
||||||
|
|
Loading…
Reference in a new issue