From 5fee6683914c030b641fd9e9ffc51b9b201216d1 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 10 Dec 2024 14:11:20 +0200 Subject: [PATCH] chore: add active time to stream consumers (#4285) * chore: add active time to stream consumers Adjust XINFO command to output active-time property. Store active-time and switch to RDB_TYPE_STREAM_LISTPACKS_3 if FLAGS_stream_rdb_encode_v2 is enabled. --------- Signed-off-by: Roman Gershman --- src/redis/stream.h | 9 +-- src/redis/t_stream.c | 5 +- src/server/family_utils.cc | 1 + src/server/rdb_load.cc | 1 + src/server/rdb_save.cc | 19 +++---- src/server/rdb_save.h | 2 +- src/server/stream_family.cc | 96 +++++++++++++++++--------------- src/server/stream_family_test.cc | 67 +++++++++++++++++++--- 8 files changed, 126 insertions(+), 74 deletions(-) diff --git a/src/redis/stream.h b/src/redis/stream.h index 2f894e448..9e03540ea 100644 --- a/src/redis/stream.h +++ b/src/redis/stream.h @@ -79,7 +79,8 @@ typedef struct streamCG { /* A specific consumer in a consumer group. */ typedef struct streamConsumer { - mstime_t seen_time; /* Last time this consumer was active. */ + mstime_t seen_time; /* Last time this consumer tried to perform an action (attempted reading/claiming). */ + mstime_t active_time; /* Last time this consumer was active (successful reading/claiming). */ sds name; /* Consumer name. This is how the consumer will be identified in the consumer group protocol. Case sensitive. */ @@ -124,10 +125,6 @@ typedef struct { /* Prototypes of exported APIs. */ // struct client; -/* Flags for streamLookupConsumer */ -#define SLC_DEFAULT 0 -#define SLC_NO_REFRESH (1<<0) /* Do not update consumer's seen-time */ - /* Flags for streamCreateConsumer */ #define SCC_DEFAULT 0 #define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */ @@ -149,7 +146,7 @@ void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsign void streamIteratorRemoveEntry(streamIterator *si, streamID *current); void streamIteratorStop(streamIterator *si); streamCG *streamLookupCG(stream *s, sds groupname); -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); +streamConsumer *streamLookupConsumer(streamCG *cg, sds name); streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *id, long long entries_read); void streamEncodeID(void *buf, streamID *id); void streamDecodeID(void *buf, streamID *id); diff --git a/src/redis/t_stream.c b/src/redis/t_stream.c index 7c79e919b..27a10b100 100644 --- a/src/redis/t_stream.c +++ b/src/redis/t_stream.c @@ -1068,9 +1068,8 @@ streamCG *streamLookupCG(stream *s, sds groupname) { return (cg == raxNotFound) ? NULL : cg; } -/* Lookup the consumer with the specified name in the group 'cg'. Its last - * seen time is updated unless the SLC_NO_REFRESH flag is specified. */ -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) { +/* Lookup the consumer with the specified name in the group 'cg' */ +streamConsumer *streamLookupConsumer(streamCG *cg, sds name) { if (cg == NULL) return NULL; streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, sdslen(name)); diff --git a/src/server/family_utils.cc b/src/server/family_utils.cc index 6964d29de..8f874ec6c 100644 --- a/src/server/family_utils.cc +++ b/src/server/family_utils.cc @@ -64,6 +64,7 @@ streamConsumer* StreamCreateConsumer(streamCG* cg, string_view name, uint64_t no consumer->name = sdsnewlen(name.data(), name.size()); consumer->pel = raxNew(); consumer->seen_time = now_ms; + consumer->active_time = -1; return consumer; } diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 8eefe11bd..db4ddd1c7 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1010,6 +1010,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) { return; } + consumer->active_time = cons.active_time; /* Create the PEL (pending entries list) about entries owned by this specific * consumer. */ for (const auto& rawid : cons.nack_arr) { diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 3b23be01c..72f977c36 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -215,8 +215,8 @@ uint8_t RdbObjectType(const PrimeValue& pv) { } break; case OBJ_STREAM: - return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS - : RDB_TYPE_STREAM_LISTPACKS_2; + return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS_3 + : RDB_TYPE_STREAM_LISTPACKS; case OBJ_MODULE: return RDB_TYPE_MODULE_2; case OBJ_JSON: @@ -713,8 +713,7 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) { RETURN_ON_ERR(SaveStreamPEL(cg->pel, true)); /* Save the consumers of this group. */ - - RETURN_ON_ERR(SaveStreamConsumers(cg)); + RETURN_ON_ERR(SaveStreamConsumers(rdb_type >= RDB_TYPE_STREAM_LISTPACKS_3, cg)); } } @@ -845,7 +844,7 @@ error_code RdbSerializer::SaveStreamPEL(rax* pel, bool nacks) { return error_code{}; } -error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) { +error_code RdbSerializer::SaveStreamConsumers(bool save_active, streamCG* cg) { /* Number of consumers in this consumer group. */ RETURN_ON_ERR(SaveLen(raxSize(cg->consumers))); @@ -867,11 +866,11 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) { absl::little_endian::Store64(buf, consumer->seen_time); RETURN_ON_ERR(WriteRaw(buf)); - // TODO: enable this when we switch to RDB_TYPE_STREAM_LISTPACKS_3 - /* Active time. */ - // absl::little_endian::Store64(buf, consumer->active_time); - // RETURN_ON_ERR(WriteRaw(buf)); - + if (save_active) { + /* Active time. */ + absl::little_endian::Store64(buf, consumer->active_time); + RETURN_ON_ERR(WriteRaw(buf)); + } /* Consumer PEL, without the ACKs (see last parameter of the function * passed with value of 0), at loading time we'll lookup the ID * in the consumer group global PEL and will put a reference in the diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 913598fec..bed9ebfdb 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -255,7 +255,7 @@ class RdbSerializer : public SerializerBase { std::error_code SaveBinaryDouble(double val); std::error_code SaveListPackAsZiplist(uint8_t* lp); std::error_code SaveStreamPEL(rax* pel, bool nacks); - std::error_code SaveStreamConsumers(streamCG* cg); + std::error_code SaveStreamConsumers(bool save_active, streamCG* cg); std::error_code SavePlainNodeAsZiplist(const quicklistNode* node); // Might preempt diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index a27c4e324..faa1966ab 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -46,7 +46,7 @@ struct ParsedStreamId { // Whether to lookup messages after the last ID in the stream. Used for XREAD // when using ID '$'. - bool last_id = false; + bool resolve_last_id = false; }; struct RangeId { @@ -82,7 +82,8 @@ struct NACKInfo { struct ConsumerInfo { string name; - size_t seen_time; + mstime_t seen_time; + mstime_t active_time; size_t pel_count; vector pending; size_t idle; @@ -769,6 +770,7 @@ OpResult OpRange(const OpArgs& op_args, string_view key, const RangeO LOG(DFATAL) << "Internal error"; return OpStatus::SKIPPED; // ("NACK half-created. Should not be possible."); } + opts.consumer->active_time = now_ms; } if (opts.count == result.size()) break; @@ -985,6 +987,7 @@ void GetConsumers(stream* s, streamCG* cg, long long count, GroupInfo* ginfo) { consumer_info.name = consumer->name; consumer_info.seen_time = consumer->seen_time; + consumer_info.active_time = consumer->active_time; consumer_info.pel_count = raxSize(consumer->pel); /* Consumer PEL */ @@ -1106,6 +1109,7 @@ OpResult> OpConsumers(const DbContext& db_cntx, EngineShard consumer_info.name = consumer->name; consumer_info.pel_count = raxSize(consumer->pel); consumer_info.idle = idle; + consumer_info.active_time = consumer->active_time; result.push_back(std::move(consumer_info)); } raxStop(&ri); @@ -1181,6 +1185,18 @@ OpResult FindGroup(const OpArgs& op_args, string_view key, stri return FindGroupResult{s, cg, std::move(res_it->post_updater), res_it->it}; } +// Try to get the consumer. If not found, create a new one. +streamConsumer* FindOrAddConsumer(string_view name, streamCG* cg, uint64_t now_ms) { + // Try to get the consumer. If not found, create a new one. + auto cname = WrapSds(name); + streamConsumer* consumer = streamLookupConsumer(cg, cname); + if (consumer) + consumer->seen_time = now_ms; + else // TODO: notify xgroup-createconsumer event once we support stream events. + consumer = StreamCreateConsumer(cg, name, now_ms, SCC_DEFAULT); + return consumer; +} + constexpr uint8_t kClaimForce = 1 << 0; constexpr uint8_t kClaimJustID = 1 << 1; constexpr uint8_t kClaimLastID = 1 << 2; @@ -1240,7 +1256,6 @@ OpResult OpClaim(const OpArgs& op_args, string_view key, const ClaimO auto cgr_res = FindGroup(op_args, key, opts.group); RETURN_ON_BAD_STATUS(cgr_res); - streamConsumer* consumer = nullptr; uint64_t now_ms = op_args.db_cntx.time_now_ms; ClaimInfo result; result.justid = (opts.flags & kClaimJustID); @@ -1254,6 +1269,8 @@ OpResult OpClaim(const OpArgs& op_args, string_view key, const ClaimO StreamMemTracker tracker; + streamConsumer* consumer = FindOrAddConsumer(opts.consumer, cgr_res->cg, now_ms); + for (streamID id : ids) { std::array buf; StreamEncodeID(buf.begin(), &id); @@ -1288,13 +1305,6 @@ OpResult OpClaim(const OpArgs& op_args, string_view key, const ClaimO } } - // Try to get the consumer. If not found, create a new one. - auto cname = WrapSds(opts.consumer); - if ((consumer = streamLookupConsumer(cgr_res->cg, cname, SLC_NO_REFRESH)) == nullptr) { - consumer = StreamCreateConsumer(cgr_res->cg, opts.consumer, now_ms, - SCC_NO_NOTIFY | SCC_NO_DIRTIFY); - } - // If the entry belongs to the same consumer, we don't have to // do anything. Else remove the entry from the old consumer. if (nack->consumer != consumer) { @@ -1320,9 +1330,11 @@ OpResult OpClaim(const OpArgs& op_args, string_view key, const ClaimO raxInsert(consumer->pel, buf.begin(), sizeof(buf), nack, nullptr); nack->consumer = consumer; } + consumer->active_time = now_ms; /* Send the reply for this entry. */ AppendClaimResultItem(result, cgr_res->s, id); + // TODO: propagate this change with streamPropagateXCLAIM } } tracker.UpdateStreamSize(cgr_res->it->second); @@ -1382,8 +1394,7 @@ OpResult OpDelConsumer(const OpArgs& op_args, string_view key, string_ StreamMemTracker mem_tracker; long long pending = 0; - streamConsumer* consumer = - streamLookupConsumer(cgroup_res->cg, WrapSds(consumer_name), SLC_NO_REFRESH); + streamConsumer* consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(consumer_name)); if (consumer) { pending = raxSize(consumer->pel); streamDelConsumer(cgroup_res->cg, consumer); @@ -1571,13 +1582,8 @@ OpResult OpAutoClaim(const OpArgs& op_args, string_view key, const Cl uint64_t now_ms = op_args.db_cntx.time_now_ms; int count = opts.count; - auto cname = WrapSds(opts.consumer); - streamConsumer* consumer = streamLookupConsumer(group, cname, SLC_DEFAULT); - if (consumer == nullptr) { - consumer = StreamCreateConsumer(group, opts.consumer, now_ms, SCC_DEFAULT); - // TODO: notify xgroup-createconsumer event once we support stream events. - } - consumer->seen_time = now_ms; + streamConsumer* consumer = FindOrAddConsumer(opts.consumer, group, now_ms); + while (attempts-- && count && raxNext(&ri)) { streamNACK* nack = (streamNACK*)ri.data; @@ -1621,7 +1627,7 @@ OpResult OpAutoClaim(const OpArgs& op_args, string_view key, const Cl raxInsert(consumer->pel, ri.key, ri.key_len, nack, nullptr); nack->consumer = consumer; } - + consumer->active_time = now_ms; AppendClaimResultItem(result, stream, id); count--; // TODO: propagate xclaim to replica @@ -1760,7 +1766,7 @@ OpResult OpPending(const OpArgs& op_args, string_view key, const streamConsumer* consumer = nullptr; if (!opts.consumer_name.empty()) { - consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(opts.consumer_name), SLC_NO_REFRESH); + consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(opts.consumer_name)); } PendingResult result; @@ -2153,7 +2159,7 @@ std::optional ParseReadArgsOrReply(CmdArgList args, bool read_group, } id.val.ms = 0; id.val.seq = 0; - id.last_id = true; + id.resolve_last_id = true; sitem.id = id; auto [_, is_inserted] = opts.stream_ids.emplace(key, sitem); if (!is_inserted) { @@ -2329,12 +2335,8 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder, // Update consumer if (sitem.group) { - auto cname = WrapSds(opts->consumer_name); - range_opts.consumer = streamLookupConsumer(sitem.group, cname, SLC_NO_REFRESH); - if (!range_opts.consumer) { - range_opts.consumer = StreamCreateConsumer( - sitem.group, opts->consumer_name, GetCurrentTimeMs(), SCC_NO_NOTIFY | SCC_NO_DIRTIFY); - } + range_opts.consumer = + FindOrAddConsumer(opts->consumer_name, sitem.group, GetCurrentTimeMs()); } range_opts.noack = opts->noack; @@ -2902,7 +2904,7 @@ void StreamFamily::XInfo(CmdArgList args, const CommandContext& cmd_cntx) { rb->SendBulkString("consumers"); rb->StartArray(ginfo.consumer_info_vec.size()); for (const auto& consumer_info : ginfo.consumer_info_vec) { - rb->StartCollection(4, RedisReplyBuilder::MAP); + rb->StartCollection(5, RedisReplyBuilder::MAP); rb->SendBulkString("name"); rb->SendBulkString(consumer_info.name); @@ -2910,6 +2912,9 @@ void StreamFamily::XInfo(CmdArgList args, const CommandContext& cmd_cntx) { rb->SendBulkString("seen-time"); rb->SendLong(consumer_info.seen_time); + rb->SendBulkString("active-time"); + rb->SendLong(consumer_info.active_time); + rb->SendBulkString("pel-count"); rb->SendLong(consumer_info.pel_count); @@ -2961,14 +2966,20 @@ void StreamFamily::XInfo(CmdArgList args, const CommandContext& cmd_cntx) { OpResult> result = shard_set->Await(sid, std::move(cb)); if (result) { rb->StartArray(result->size()); + int64_t now_ms = GetCurrentTimeMs(); for (const auto& consumer_info : *result) { - rb->StartCollection(3, RedisReplyBuilder::MAP); + int64_t active = consumer_info.active_time; + int64_t inactive = active != -1 ? now_ms - active : -1; + + rb->StartCollection(4, RedisReplyBuilder::MAP); rb->SendBulkString("name"); rb->SendBulkString(consumer_info.name); rb->SendBulkString("pending"); rb->SendLong(consumer_info.pel_count); rb->SendBulkString("idle"); rb->SendLong(consumer_info.idle); + rb->SendBulkString("inactive"); + rb->SendLong(inactive); } return; } @@ -3099,26 +3110,11 @@ variant HasEntries2(const OpArgs& op_args, string_view return facade::ErrorReply{ NoGroupOrKey(skey, opts->group_name, " in XREADGROUP with GROUP option")}; - auto cname = WrapSds(opts->consumer_name); - consumer = streamLookupConsumer(group, cname, SLC_NO_REFRESH); - if (!consumer) { - consumer = StreamCreateConsumer(group, opts->consumer_name, op_args.db_cntx.time_now_ms, - SCC_NO_NOTIFY | SCC_NO_DIRTIFY); - } + consumer = FindOrAddConsumer(opts->consumer_name, group, op_args.db_cntx.time_now_ms); requested_sitem.group = group; requested_sitem.consumer = consumer; - } - // Resolve $ to the last ID in the stream. - if (requested_sitem.id.last_id && !opts->read_group) { - requested_sitem.id.val = last_id; - streamIncrID(&requested_sitem.id.val); // include id's strictly greater - requested_sitem.id.last_id = false; - return false; - } - - if (opts->read_group) { // If '>' is not provided, consumer PEL is used. So don't need to block. if (requested_sitem.id.val.ms != UINT64_MAX || requested_sitem.id.val.seq != UINT64_MAX) { opts->serve_history = true; @@ -3130,6 +3126,14 @@ variant HasEntries2(const OpArgs& op_args, string_view requested_sitem.id.val = requested_sitem.group->last_id; streamIncrID(&requested_sitem.id.val); } + } else { + // Resolve $ to the last ID in the stream. + if (requested_sitem.id.resolve_last_id) { + requested_sitem.id.val = last_id; + streamIncrID(&requested_sitem.id.val); // include id's strictly greater + requested_sitem.id.resolve_last_id = false; + return false; + } } return streamCompareID(&last_id, &requested_sitem.id.val) >= 0; diff --git a/src/server/stream_family_test.cc b/src/server/stream_family_test.cc index cec5b8fa0..0b5ae10cb 100644 --- a/src/server/stream_family_test.cc +++ b/src/server/stream_family_test.cc @@ -4,6 +4,7 @@ #include "server/stream_family.h" +#include "base/flags.h" #include "base/gtest.h" #include "base/logging.h" #include "facade/facade_test.h" @@ -14,6 +15,8 @@ using namespace testing; using namespace std; using namespace util; +ABSL_DECLARE_FLAG(bool, stream_rdb_encode_v2); + namespace dfly { const auto kMatchNil = ArgType(RespExpr::NIL); @@ -862,8 +865,8 @@ TEST_F(StreamFamilyTest, XInfoConsumers) { Run({"xgroup", "createconsumer", "mystream", "mygroup", "second-consumer"}); resp = Run({"xinfo", "consumers", "mystream", "mygroup"}); EXPECT_THAT(resp, ArrLen(2)); - EXPECT_THAT(resp.GetVec()[0], ArrLen(6)); - EXPECT_THAT(resp.GetVec()[1], ArrLen(6)); + EXPECT_THAT(resp.GetVec()[0], ArrLen(8)); + EXPECT_THAT(resp.GetVec()[1], ArrLen(8)); EXPECT_THAT(resp.GetVec()[0].GetVec()[1], "first-consumer"); EXPECT_THAT(resp.GetVec()[1].GetVec()[1], "second-consumer"); @@ -1021,7 +1024,7 @@ TEST_F(StreamFamilyTest, XInfoStream) { "consumers", ArrLen(1))); EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec(), ElementsAre("name", "first-consumer", "seen-time", ArgType(RespExpr::INT64), - "pel-count", IntArg(0), "pending", ArrLen(0))); + "active-time", IntArg(-1), "pel-count", IntArg(0), "pending", ArrLen(0))); // full with count less than number of messages in stream resp = Run({"xinfo", "stream", "mystream", "full", "count", "5"}); @@ -1045,9 +1048,9 @@ TEST_F(StreamFamilyTest, XInfoStream) { EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[9], IntArg(11)); // pel-count EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[11], ArrLen(11)); // pending list // consumer - EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec()[5], - IntArg(11)); // pel-count EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec()[7], + IntArg(11)); // pel-count + EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec()[9], ArrLen(11)); // pending list // delete message @@ -1074,9 +1077,10 @@ TEST_F(StreamFamilyTest, XInfoStream) { ElementsAre("name", "mygroup", "last-delivered-id", "11-1", "entries-read", IntArg(11), "lag", IntArg(0), "pel-count", IntArg(11), "pending", ArrLen(11), "consumers", ArrLen(1))); - EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec(), - ElementsAre("name", "first-consumer", "seen-time", ArgType(RespExpr::INT64), - "pel-count", IntArg(11), "pending", ArrLen(11))); + EXPECT_THAT( + resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec(), + ElementsAre("name", "first-consumer", "seen-time", ArgType(RespExpr::INT64), "active-time", + ArgType(RespExpr::INT64), "pel-count", IntArg(11), "pending", ArrLen(11))); } TEST_F(StreamFamilyTest, AutoClaimPelItemsFromAnotherConsumer) { @@ -1195,4 +1199,51 @@ TEST_F(StreamFamilyTest, XsetIdSmallerMaxDeleted) { ASSERT_THAT(resp, ErrArg("smaller")); } +TEST_F(StreamFamilyTest, SeenActiveTime) { + TEST_current_time_ms = 1000; + + Run({"XGROUP", "CREATE", "mystream", "mygroup", "$", "MKSTREAM"}); + Run({"XREADGROUP", "GROUP", "mygroup", "Alice", "COUNT", "1", "STREAMS", "mystream", ">"}); + AdvanceTime(100); + auto resp = Run({"xinfo", "consumers", "mystream", "mygroup"}); + EXPECT_THAT(resp, RespElementsAre("name", "Alice", "pending", IntArg(0), "idle", IntArg(100), + "inactive", IntArg(-1))); + + Run({"XADD", "mystream", "*", "f", "v"}); + Run({"XREADGROUP", "GROUP", "mygroup", "Alice", "COUNT", "1", "STREAMS", "mystream", ">"}); + AdvanceTime(50); + + resp = Run({"xinfo", "consumers", "mystream", "mygroup"}); + EXPECT_THAT(resp, RespElementsAre("name", "Alice", "pending", IntArg(1), "idle", IntArg(50), + "inactive", IntArg(50))); + AdvanceTime(100); + resp = Run({"XREADGROUP", "GROUP", "mygroup", "Alice", "COUNT", "1", "STREAMS", "mystream", ">"}); + EXPECT_THAT(resp, ArgType(RespExpr::NIL_ARRAY)); + + resp = Run({"xinfo", "consumers", "mystream", "mygroup"}); + + // Idle is 0 because XREADGROUP just run, but inactive continues clocking because nothing was + // read. + EXPECT_THAT(resp, RespElementsAre("name", "Alice", "pending", IntArg(1), "idle", IntArg(0), + "inactive", IntArg(150))); + + // Serialize/deserialize. + resp = Run({"XINFO", "STREAM", "mystream", "FULL"}); + auto groups = resp.GetVec()[17]; + auto consumers = groups.GetVec()[0].GetVec()[13].GetVec()[0]; + EXPECT_THAT(consumers, RespElementsAre("name", "Alice", "seen-time", IntArg(1250), "active-time", + IntArg(1100), "pel-count", IntArg(1), "pending", _)); + + absl::SetFlag(&FLAGS_stream_rdb_encode_v2, true); + resp = Run({"DUMP", "mystream"}); + Run({"del", "mystream"}); + resp = Run({"RESTORE", "mystream", "0", resp.GetString()}); + EXPECT_EQ(resp, "OK"); + resp = Run({"XINFO", "STREAM", "mystream", "FULL"}); + groups = resp.GetVec()[17]; + consumers = groups.GetVec()[0].GetVec()[13].GetVec()[0]; + EXPECT_THAT(consumers, RespElementsAre("name", "Alice", "seen-time", IntArg(1250), "active-time", + IntArg(1100), "pel-count", IntArg(1), "pending", _)); +} + } // namespace dfly