
fix: stream memory tracking (#4067)

* add object memory usage for streams
* add test
Kostas Kyrimis 2024-11-27 11:41:08 +01:00 committed by GitHub
parent 065a63cab7
commit 66e0fd0908
4 changed files with 91 additions and 16 deletions
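The change replaces the `MallocUsedStream` TODO with a delta-tracking scheme: `StreamMemTracker` snapshots the thread-local allocator counter (`zmalloc_used_memory_tl`) before a stream mutation, snapshots it again afterwards, and folds the (possibly negative) difference into the size cached on the object via `CompactObj::AddStreamSize`. Below is a minimal, self-contained sketch of that pattern; `g_used_memory_tl`, `TrackedValue`, and `MemTracker` are simplified stand-ins for Dragonfly's `zmalloc_used_memory_tl`, `PrimeValue`, and `StreamMemTracker`, and the counter is bumped by hand instead of by a real allocator.

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for zmalloc_used_memory_tl: per-thread count of live heap bytes.
// Here it is adjusted manually; in Dragonfly the allocator maintains it.
thread_local int64_t g_used_memory_tl = 0;

// Stand-in for PrimeValue: caches the net bytes attributed to the object.
struct TrackedValue {
  int64_t size = 0;
  void AddSize(int64_t delta) {
    assert(delta >= 0 || size >= -delta);  // tracked size must not go negative
    size += delta;
  }
};

// Same shape as the commit's StreamMemTracker: snapshot the counter on
// construction, attribute the net change to the value on update.
class MemTracker {
 public:
  MemTracker() : start_(g_used_memory_tl) {}
  void Update(TrackedValue& v) const { v.AddSize(g_used_memory_tl - start_); }

 private:
  int64_t start_;
};

int main() {
  TrackedValue stream;
  {
    MemTracker t;             // snapshot before the mutation
    g_used_memory_tl += 128;  // pretend an XADD allocated 128 bytes
    t.Update(stream);         // stream.size == 128
  }
  {
    MemTracker t;             // a mutation that frees memory
    g_used_memory_tl -= 48;   // e.g. a consumer was deleted
    t.Update(stream);         // negative delta: stream.size == 80
  }
  assert(stream.size == 80);
  return 0;
}
```

Because the counter is thread-local and a key is only mutated on its owning shard thread, a tracker wrapped around a whole operation captures allocations made anywhere below the call site (rax nodes, listpacks, sds strings), which appears to be why the commit can bracket operations like OpAdd and OpClaim instead of instrumenting each allocation individually.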

View file

@@ -125,11 +125,6 @@ size_t MallocUsedZSet(unsigned encoding, void* ptr) {
return 0;
}
size_t MallocUsedStream(unsigned encoding, void* streamv) {
// stream* str_obj = (stream*)streamv;
return 0; // TODO
}
inline void FreeObjHash(unsigned encoding, void* ptr) {
switch (encoding) {
case kEncodingStrMap2:
@@ -316,7 +311,7 @@ size_t RobjWrapper::MallocUsed(bool slow) const {
case OBJ_ZSET:
return MallocUsedZSet(encoding_, inner_obj_);
case OBJ_STREAM:
return MallocUsedStream(encoding_, inner_obj_);
return sz_;
default:
LOG(FATAL) << "Not supported " << type_;
@@ -370,7 +365,12 @@ size_t RobjWrapper::Size() const {
StringMap* sm = (StringMap*)inner_obj_;
return sm->UpperBoundSize();
}
default:
LOG(FATAL) << "Unexpected encoding " << encoding_;
}
case OBJ_STREAM:
// For streams, Size() denotes malloc-ed bytes
return sz_;
default:;
}
return 0;
@@ -461,6 +461,10 @@ void RobjWrapper::SetString(string_view s, MemoryResource* mr) {
}
}
void RobjWrapper::SetSize(uint64_t size) {
sz_ = size;
}
bool RobjWrapper::DefragIfNeeded(float ratio) {
auto do_defrag = [this, ratio](auto defrag_fun) mutable {
auto [new_ptr, realloced] = defrag_fun(encoding_, inner_obj_, ratio);
@@ -811,6 +815,17 @@ void CompactObj::SetJsonSize(int64_t size) {
}
}
void CompactObj::AddStreamSize(int64_t size) {
if (size < 0) {
// The delta can be negative. For example, if we remove a consumer,
// the tracker will report a negative net change (since we deallocated),
// so the object now consumes less memory than it did before. This DCHECK
// is a sanity check for our tracking approach: the tracked size must
// never drop below zero.
DCHECK(static_cast<int64_t>(u_.r_obj.Size()) >= -size);
}
u_.r_obj.SetSize(u_.r_obj.Size() + size);
}
void CompactObj::SetJson(const uint8_t* buf, size_t len) {
SetMeta(JSON_TAG);
u_.json_obj.flat.flat_ptr = (uint8_t*)tl.local_mr->allocate(len, kAlignSize);

View file

@@ -46,6 +46,8 @@ class RobjWrapper {
void Free(MemoryResource* mr);
void SetString(std::string_view s, MemoryResource* mr);
// Used when sz_ is used to denote memory usage
void SetSize(uint64_t size);
void Init(unsigned type, unsigned encoding, void* inner);
unsigned type() const {
@@ -315,6 +317,8 @@ class CompactObj {
void SetJson(const uint8_t* buf, size_t len);
// Adjusts the size used by json
void SetJsonSize(int64_t size);
// Adjusts the size used by a stream
void AddStreamSize(int64_t size);
// pre condition - the type here is OBJ_JSON and was set with SetJson
JsonType* GetJson() const;

View file

@@ -603,6 +603,24 @@ int StreamTrim(const AddTrimOpts& opts, stream* s) {
return 0;
}
class StreamMemTracker {
public:
StreamMemTracker() {
start_size_ = zmalloc_used_memory_tl;
}
void UpdateStreamSize(PrimeValue& pv) const {
const size_t current = zmalloc_used_memory_tl;
int64_t diff = static_cast<int64_t>(current) - static_cast<int64_t>(start_size_);
pv.AddStreamSize(diff);
// Under no flow must we end up with this special value (0 means untracked).
DCHECK(pv.MallocUsed() != 0);
}
private:
size_t start_size_{0};
};
OpResult<streamID> OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgList args) {
DCHECK(!args.empty() && args.size() % 2 == 0);
auto& db_slice = op_args.GetDbSlice();
@@ -622,6 +640,8 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgL
auto& it = add_res.it;
StreamMemTracker mem_tracker;
if (add_res.is_new) {
stream* s = streamNew();
it->second.InitRobj(OBJ_STREAM, OBJ_ENCODING_STREAM, s);
@@ -648,6 +668,8 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgL
StreamTrim(opts, stream_inst);
mem_tracker.UpdateStreamSize(it->second);
auto blocking_controller = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
if (blocking_controller) {
blocking_controller->AwakeWatched(op_args.db_cntx.db_index, opts.key);
@@ -1093,6 +1115,7 @@ OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts
auto& db_slice = op_args.GetDbSlice();
auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM);
int64_t entries_read = SCG_INVALID_ENTRIES_READ;
StreamMemTracker mem_tracker;
if (!res_it) {
if (opts.flags & kCreateOptMkstream) {
// MKSTREAM is enabled, so create the stream
@@ -1123,16 +1146,15 @@ OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts
}
streamCG* cg = streamCreateCG(s, opts.gname.data(), opts.gname.size(), &id, entries_read);
if (cg) {
return OpStatus::OK;
}
return OpStatus::BUSY_GROUP;
mem_tracker.UpdateStreamSize(res_it->it->second);
return cg ? OpStatus::OK : OpStatus::BUSY_GROUP;
}
struct FindGroupResult {
stream* s = nullptr;
streamCG* cg = nullptr;
DbSlice::AutoUpdater post_updater;
DbSlice::Iterator it;
};
OpResult<FindGroupResult> FindGroup(const OpArgs& op_args, string_view key, string_view gname,
@@ -1147,7 +1169,7 @@ OpResult<FindGroupResult> FindGroup(const OpArgs& op_args, string_view key, stri
if (skip_group && !cg)
return OpStatus::SKIPPED;
return FindGroupResult{s, cg, std::move(res_it->post_updater)};
return FindGroupResult{s, cg, std::move(res_it->post_updater), res_it->it};
}
constexpr uint8_t kClaimForce = 1 << 0;
@@ -1221,6 +1243,8 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
}
}
StreamMemTracker tracker;
for (streamID id : ids) {
std::array<uint8_t, sizeof(streamID)> buf;
StreamEncodeID(buf.begin(), &id);
@@ -1292,6 +1316,7 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
AppendClaimResultItem(result, cgr_res->s, id);
}
}
tracker.UpdateStreamSize(cgr_res->it->second);
return result;
}
@@ -1299,10 +1324,13 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
OpStatus OpDestroyGroup(const OpArgs& op_args, string_view key, string_view gname) {
auto cgr_res = FindGroup(op_args, key, gname);
RETURN_ON_BAD_STATUS(cgr_res);
StreamMemTracker mem_tracker;
raxRemove(cgr_res->s->cgroups, (uint8_t*)(gname.data()), gname.size(), NULL);
streamFreeCG(cgr_res->cg);
mem_tracker.UpdateStreamSize(cgr_res->it->second);
// Awake readers blocked on this group
auto blocking_controller = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
if (blocking_controller) {
@@ -1328,12 +1356,13 @@ OpResult<uint32_t> OpCreateConsumer(const OpArgs& op_args, string_view key, stri
auto cgroup_res = FindGroup(op_args, key, gname);
RETURN_ON_BAD_STATUS(cgroup_res);
StreamMemTracker mem_tracker;
streamConsumer* consumer = streamCreateConsumer(cgroup_res->cg, WrapSds(consumer_name), NULL, 0,
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
if (consumer)
return OpStatus::OK;
return OpStatus::KEY_EXISTS;
mem_tracker.UpdateStreamSize(cgroup_res->it->second);
return consumer ? OpStatus::OK : OpStatus::KEY_EXISTS;
}
// XGROUP DELCONSUMER key groupname consumername
@@ -1341,6 +1370,7 @@ OpResult<uint32_t> OpDelConsumer(const OpArgs& op_args, string_view key, string_
string_view consumer_name) {
auto cgroup_res = FindGroup(op_args, key, gname);
RETURN_ON_BAD_STATUS(cgroup_res);
StreamMemTracker mem_tracker;
long long pending = 0;
streamConsumer* consumer =
@@ -1350,6 +1380,7 @@ OpResult<uint32_t> OpDelConsumer(const OpArgs& op_args, string_view key, string_
streamDelConsumer(cgroup_res->cg, consumer);
}
mem_tracker.UpdateStreamSize(cgroup_res->it->second);
return pending;
}
@@ -1379,6 +1410,8 @@ OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) {
if (!res_it)
return res_it.status();
StreamMemTracker mem_tracker;
CompactObj& cobj = res_it->it->second;
stream* stream_inst = (stream*)cobj.RObjPtr();
long long entries_added = -1;
@@ -1408,6 +1441,8 @@ OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) {
if (!streamIDEqZero(&max_xdel_id))
stream_inst->max_deleted_entry_id = max_xdel_id;
mem_tracker.UpdateStreamSize(cobj);
return OpStatus::OK;
}
@@ -1423,6 +1458,8 @@ OpResult<uint32_t> OpDel(const OpArgs& op_args, string_view key, absl::Span<stre
uint32_t deleted = 0;
bool first_entry = false;
StreamMemTracker tracker;
for (size_t j = 0; j < ids.size(); j++) {
streamID id = ids[j];
if (!streamDeleteItem(stream_inst, &id))
@@ -1450,6 +1487,7 @@ OpResult<uint32_t> OpDel(const OpArgs& op_args, string_view key, absl::Span<stre
}
}
tracker.UpdateStreamSize(cobj);
return deleted;
}
@@ -1464,6 +1502,7 @@ OpResult<uint32_t> OpAck(const OpArgs& op_args, string_view key, string_view gna
}
int acknowledged = 0;
StreamMemTracker mem_tracker;
for (auto& id : ids) {
unsigned char buf[sizeof(streamID)];
streamEncodeID(buf, &id);
@@ -1480,6 +1519,7 @@ OpResult<uint32_t> OpAck(const OpArgs& op_args, string_view key, string_view gna
acknowledged++;
}
}
mem_tracker.UpdateStreamSize(res->it->second);
return acknowledged;
}
@@ -1494,6 +1534,8 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
return OpStatus::KEY_NOTFOUND;
}
StreamMemTracker mem_tracker;
streamConsumer* consumer = nullptr;
// from Redis spec on XAutoClaim:
// https://redis.io/commands/xautoclaim/
@@ -1572,6 +1614,8 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
raxStop(&ri);
result.end_id = end_id;
mem_tracker.UpdateStreamSize(cgr_res->it->second);
return result;
}
@@ -1874,10 +1918,15 @@ OpResult<int64_t> OpTrim(const OpArgs& op_args, const AddTrimOpts& opts) {
return res_it.status();
}
StreamMemTracker mem_tracker;
CompactObj& cobj = res_it->it->second;
stream* s = (stream*)cobj.RObjPtr();
return StreamTrim(opts, s);
auto res = StreamTrim(opts, s);
mem_tracker.UpdateStreamSize(cobj);
return res;
}
optional<pair<AddTrimOpts, unsigned>> ParseAddOrTrimArgsOrReply(CmdArgList args, bool is_xadd,

View file

@@ -16,6 +16,7 @@ from .instance import DflyInstance, DflyInstanceFactory
("ZSET", 250_000, 100, 100),
("LIST", 300_000, 100, 100),
("STRING", 3_500_000, 1000, 1),
("STREAM", 260_000, 100, 100),
],
)
# We limit to 5gb just in case to sanity check the gh runner. Otherwise, if we ask for too much
@@ -28,6 +29,12 @@ async def test_rss_used_mem_gap(df_server: DflyInstance, type, keys, val_size, e
min_rss = 3 * 1024 * 1024 * 1024 # 3gb
max_unaccounted = 200 * 1024 * 1024 # 200mb
# There is a big RSS spike when this test is run on one of the gh runners
# (not the self-hosted ones) and it fails. This RSS spike is not observed
# locally or on our self-hosted runner, so this adjustment is mostly for CI.
if type == "STREAM":
max_unaccounted = max_unaccounted * 3
client = df_server.client()
await asyncio.sleep(1) # Wait for another RSS heartbeat update in Dragonfly
@@ -46,7 +53,7 @@ async def test_rss_used_mem_gap(df_server: DflyInstance, type, keys, val_size, e
assert delta < max_unaccounted
delta = info["used_memory_rss"] - info["object_used_memory"]
# TODO investigate why it fails on string
if type == "json":
if type == "JSON" or type == "STREAM":
assert delta > 0
assert delta < max_unaccounted