1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: add active time to stream consumers (#4285)

* chore: add active time to stream consumers

Adjust XINFO command to output active-time property.
Store active-time and switch to RDB_TYPE_STREAM_LISTPACKS_3 if FLAGS_stream_rdb_encode_v2
is enabled.
---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-12-10 14:11:20 +02:00 committed by GitHub
parent 0562796cac
commit 5fee668391
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 126 additions and 74 deletions

View file

@ -79,7 +79,8 @@ typedef struct streamCG {
/* A specific consumer in a consumer group. */ /* A specific consumer in a consumer group. */
typedef struct streamConsumer { typedef struct streamConsumer {
mstime_t seen_time; /* Last time this consumer was active. */ mstime_t seen_time; /* Last time this consumer tried to perform an action (attempted reading/claiming). */
mstime_t active_time; /* Last time this consumer was active (successful reading/claiming). */
sds name; /* Consumer name. This is how the consumer sds name; /* Consumer name. This is how the consumer
will be identified in the consumer group will be identified in the consumer group
protocol. Case sensitive. */ protocol. Case sensitive. */
@ -124,10 +125,6 @@ typedef struct {
/* Prototypes of exported APIs. */ /* Prototypes of exported APIs. */
// struct client; // struct client;
/* Flags for streamLookupConsumer */
#define SLC_DEFAULT 0
#define SLC_NO_REFRESH (1<<0) /* Do not update consumer's seen-time */
/* Flags for streamCreateConsumer */ /* Flags for streamCreateConsumer */
#define SCC_DEFAULT 0 #define SCC_DEFAULT 0
#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */ #define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
@ -149,7 +146,7 @@ void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsign
void streamIteratorRemoveEntry(streamIterator *si, streamID *current); void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
void streamIteratorStop(streamIterator *si); void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname); streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *id, long long entries_read); streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *id, long long entries_read);
void streamEncodeID(void *buf, streamID *id); void streamEncodeID(void *buf, streamID *id);
void streamDecodeID(void *buf, streamID *id); void streamDecodeID(void *buf, streamID *id);

View file

@ -1068,9 +1068,8 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
return (cg == raxNotFound) ? NULL : cg; return (cg == raxNotFound) ? NULL : cg;
} }
/* Lookup the consumer with the specified name in the group 'cg'. Its last /* Lookup the consumer with the specified name in the group 'cg' */
* seen time is updated unless the SLC_NO_REFRESH flag is specified. */ streamConsumer *streamLookupConsumer(streamCG *cg, sds name) {
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
if (cg == NULL) return NULL; if (cg == NULL) return NULL;
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name)); sdslen(name));

View file

@ -64,6 +64,7 @@ streamConsumer* StreamCreateConsumer(streamCG* cg, string_view name, uint64_t no
consumer->name = sdsnewlen(name.data(), name.size()); consumer->name = sdsnewlen(name.data(), name.size());
consumer->pel = raxNew(); consumer->pel = raxNew();
consumer->seen_time = now_ms; consumer->seen_time = now_ms;
consumer->active_time = -1;
return consumer; return consumer;
} }

View file

@ -1010,6 +1010,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
return; return;
} }
consumer->active_time = cons.active_time;
/* Create the PEL (pending entries list) about entries owned by this specific /* Create the PEL (pending entries list) about entries owned by this specific
* consumer. */ * consumer. */
for (const auto& rawid : cons.nack_arr) { for (const auto& rawid : cons.nack_arr) {

View file

@ -215,8 +215,8 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
} }
break; break;
case OBJ_STREAM: case OBJ_STREAM:
return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS_3
: RDB_TYPE_STREAM_LISTPACKS_2; : RDB_TYPE_STREAM_LISTPACKS;
case OBJ_MODULE: case OBJ_MODULE:
return RDB_TYPE_MODULE_2; return RDB_TYPE_MODULE_2;
case OBJ_JSON: case OBJ_JSON:
@ -713,8 +713,7 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) {
RETURN_ON_ERR(SaveStreamPEL(cg->pel, true)); RETURN_ON_ERR(SaveStreamPEL(cg->pel, true));
/* Save the consumers of this group. */ /* Save the consumers of this group. */
RETURN_ON_ERR(SaveStreamConsumers(rdb_type >= RDB_TYPE_STREAM_LISTPACKS_3, cg));
RETURN_ON_ERR(SaveStreamConsumers(cg));
} }
} }
@ -845,7 +844,7 @@ error_code RdbSerializer::SaveStreamPEL(rax* pel, bool nacks) {
return error_code{}; return error_code{};
} }
error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) { error_code RdbSerializer::SaveStreamConsumers(bool save_active, streamCG* cg) {
/* Number of consumers in this consumer group. */ /* Number of consumers in this consumer group. */
RETURN_ON_ERR(SaveLen(raxSize(cg->consumers))); RETURN_ON_ERR(SaveLen(raxSize(cg->consumers)));
@ -867,11 +866,11 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
absl::little_endian::Store64(buf, consumer->seen_time); absl::little_endian::Store64(buf, consumer->seen_time);
RETURN_ON_ERR(WriteRaw(buf)); RETURN_ON_ERR(WriteRaw(buf));
// TODO: enable this when we switch to RDB_TYPE_STREAM_LISTPACKS_3 if (save_active) {
/* Active time. */ /* Active time. */
// absl::little_endian::Store64(buf, consumer->active_time); absl::little_endian::Store64(buf, consumer->active_time);
// RETURN_ON_ERR(WriteRaw(buf)); RETURN_ON_ERR(WriteRaw(buf));
}
/* Consumer PEL, without the ACKs (see last parameter of the function /* Consumer PEL, without the ACKs (see last parameter of the function
* passed with value of 0), at loading time we'll lookup the ID * passed with value of 0), at loading time we'll lookup the ID
* in the consumer group global PEL and will put a reference in the * in the consumer group global PEL and will put a reference in the

View file

@ -255,7 +255,7 @@ class RdbSerializer : public SerializerBase {
std::error_code SaveBinaryDouble(double val); std::error_code SaveBinaryDouble(double val);
std::error_code SaveListPackAsZiplist(uint8_t* lp); std::error_code SaveListPackAsZiplist(uint8_t* lp);
std::error_code SaveStreamPEL(rax* pel, bool nacks); std::error_code SaveStreamPEL(rax* pel, bool nacks);
std::error_code SaveStreamConsumers(streamCG* cg); std::error_code SaveStreamConsumers(bool save_active, streamCG* cg);
std::error_code SavePlainNodeAsZiplist(const quicklistNode* node); std::error_code SavePlainNodeAsZiplist(const quicklistNode* node);
// Might preempt // Might preempt

View file

@ -46,7 +46,7 @@ struct ParsedStreamId {
// Whether to lookup messages after the last ID in the stream. Used for XREAD // Whether to lookup messages after the last ID in the stream. Used for XREAD
// when using ID '$'. // when using ID '$'.
bool last_id = false; bool resolve_last_id = false;
}; };
struct RangeId { struct RangeId {
@ -82,7 +82,8 @@ struct NACKInfo {
struct ConsumerInfo { struct ConsumerInfo {
string name; string name;
size_t seen_time; mstime_t seen_time;
mstime_t active_time;
size_t pel_count; size_t pel_count;
vector<NACKInfo> pending; vector<NACKInfo> pending;
size_t idle; size_t idle;
@ -769,6 +770,7 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeO
LOG(DFATAL) << "Internal error"; LOG(DFATAL) << "Internal error";
return OpStatus::SKIPPED; // ("NACK half-created. Should not be possible."); return OpStatus::SKIPPED; // ("NACK half-created. Should not be possible.");
} }
opts.consumer->active_time = now_ms;
} }
if (opts.count == result.size()) if (opts.count == result.size())
break; break;
@ -985,6 +987,7 @@ void GetConsumers(stream* s, streamCG* cg, long long count, GroupInfo* ginfo) {
consumer_info.name = consumer->name; consumer_info.name = consumer->name;
consumer_info.seen_time = consumer->seen_time; consumer_info.seen_time = consumer->seen_time;
consumer_info.active_time = consumer->active_time;
consumer_info.pel_count = raxSize(consumer->pel); consumer_info.pel_count = raxSize(consumer->pel);
/* Consumer PEL */ /* Consumer PEL */
@ -1106,6 +1109,7 @@ OpResult<vector<ConsumerInfo>> OpConsumers(const DbContext& db_cntx, EngineShard
consumer_info.name = consumer->name; consumer_info.name = consumer->name;
consumer_info.pel_count = raxSize(consumer->pel); consumer_info.pel_count = raxSize(consumer->pel);
consumer_info.idle = idle; consumer_info.idle = idle;
consumer_info.active_time = consumer->active_time;
result.push_back(std::move(consumer_info)); result.push_back(std::move(consumer_info));
} }
raxStop(&ri); raxStop(&ri);
@ -1181,6 +1185,18 @@ OpResult<FindGroupResult> FindGroup(const OpArgs& op_args, string_view key, stri
return FindGroupResult{s, cg, std::move(res_it->post_updater), res_it->it}; return FindGroupResult{s, cg, std::move(res_it->post_updater), res_it->it};
} }
// Try to get the consumer. If not found, create a new one.
streamConsumer* FindOrAddConsumer(string_view name, streamCG* cg, uint64_t now_ms) {
// Try to get the consumer. If not found, create a new one.
auto cname = WrapSds(name);
streamConsumer* consumer = streamLookupConsumer(cg, cname);
if (consumer)
consumer->seen_time = now_ms;
else // TODO: notify xgroup-createconsumer event once we support stream events.
consumer = StreamCreateConsumer(cg, name, now_ms, SCC_DEFAULT);
return consumer;
}
constexpr uint8_t kClaimForce = 1 << 0; constexpr uint8_t kClaimForce = 1 << 0;
constexpr uint8_t kClaimJustID = 1 << 1; constexpr uint8_t kClaimJustID = 1 << 1;
constexpr uint8_t kClaimLastID = 1 << 2; constexpr uint8_t kClaimLastID = 1 << 2;
@ -1240,7 +1256,6 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
auto cgr_res = FindGroup(op_args, key, opts.group); auto cgr_res = FindGroup(op_args, key, opts.group);
RETURN_ON_BAD_STATUS(cgr_res); RETURN_ON_BAD_STATUS(cgr_res);
streamConsumer* consumer = nullptr;
uint64_t now_ms = op_args.db_cntx.time_now_ms; uint64_t now_ms = op_args.db_cntx.time_now_ms;
ClaimInfo result; ClaimInfo result;
result.justid = (opts.flags & kClaimJustID); result.justid = (opts.flags & kClaimJustID);
@ -1254,6 +1269,8 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
StreamMemTracker tracker; StreamMemTracker tracker;
streamConsumer* consumer = FindOrAddConsumer(opts.consumer, cgr_res->cg, now_ms);
for (streamID id : ids) { for (streamID id : ids) {
std::array<uint8_t, sizeof(streamID)> buf; std::array<uint8_t, sizeof(streamID)> buf;
StreamEncodeID(buf.begin(), &id); StreamEncodeID(buf.begin(), &id);
@ -1288,13 +1305,6 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
} }
} }
// Try to get the consumer. If not found, create a new one.
auto cname = WrapSds(opts.consumer);
if ((consumer = streamLookupConsumer(cgr_res->cg, cname, SLC_NO_REFRESH)) == nullptr) {
consumer = StreamCreateConsumer(cgr_res->cg, opts.consumer, now_ms,
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
}
// If the entry belongs to the same consumer, we don't have to // If the entry belongs to the same consumer, we don't have to
// do anything. Else remove the entry from the old consumer. // do anything. Else remove the entry from the old consumer.
if (nack->consumer != consumer) { if (nack->consumer != consumer) {
@ -1320,9 +1330,11 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
raxInsert(consumer->pel, buf.begin(), sizeof(buf), nack, nullptr); raxInsert(consumer->pel, buf.begin(), sizeof(buf), nack, nullptr);
nack->consumer = consumer; nack->consumer = consumer;
} }
consumer->active_time = now_ms;
/* Send the reply for this entry. */ /* Send the reply for this entry. */
AppendClaimResultItem(result, cgr_res->s, id); AppendClaimResultItem(result, cgr_res->s, id);
// TODO: propagate this change with streamPropagateXCLAIM
} }
} }
tracker.UpdateStreamSize(cgr_res->it->second); tracker.UpdateStreamSize(cgr_res->it->second);
@ -1382,8 +1394,7 @@ OpResult<uint32_t> OpDelConsumer(const OpArgs& op_args, string_view key, string_
StreamMemTracker mem_tracker; StreamMemTracker mem_tracker;
long long pending = 0; long long pending = 0;
streamConsumer* consumer = streamConsumer* consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(consumer_name));
streamLookupConsumer(cgroup_res->cg, WrapSds(consumer_name), SLC_NO_REFRESH);
if (consumer) { if (consumer) {
pending = raxSize(consumer->pel); pending = raxSize(consumer->pel);
streamDelConsumer(cgroup_res->cg, consumer); streamDelConsumer(cgroup_res->cg, consumer);
@ -1571,13 +1582,8 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
uint64_t now_ms = op_args.db_cntx.time_now_ms; uint64_t now_ms = op_args.db_cntx.time_now_ms;
int count = opts.count; int count = opts.count;
auto cname = WrapSds(opts.consumer); streamConsumer* consumer = FindOrAddConsumer(opts.consumer, group, now_ms);
streamConsumer* consumer = streamLookupConsumer(group, cname, SLC_DEFAULT);
if (consumer == nullptr) {
consumer = StreamCreateConsumer(group, opts.consumer, now_ms, SCC_DEFAULT);
// TODO: notify xgroup-createconsumer event once we support stream events.
}
consumer->seen_time = now_ms;
while (attempts-- && count && raxNext(&ri)) { while (attempts-- && count && raxNext(&ri)) {
streamNACK* nack = (streamNACK*)ri.data; streamNACK* nack = (streamNACK*)ri.data;
@ -1621,7 +1627,7 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
raxInsert(consumer->pel, ri.key, ri.key_len, nack, nullptr); raxInsert(consumer->pel, ri.key, ri.key_len, nack, nullptr);
nack->consumer = consumer; nack->consumer = consumer;
} }
consumer->active_time = now_ms;
AppendClaimResultItem(result, stream, id); AppendClaimResultItem(result, stream, id);
count--; count--;
// TODO: propagate xclaim to replica // TODO: propagate xclaim to replica
@ -1760,7 +1766,7 @@ OpResult<PendingResult> OpPending(const OpArgs& op_args, string_view key, const
streamConsumer* consumer = nullptr; streamConsumer* consumer = nullptr;
if (!opts.consumer_name.empty()) { if (!opts.consumer_name.empty()) {
consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(opts.consumer_name), SLC_NO_REFRESH); consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(opts.consumer_name));
} }
PendingResult result; PendingResult result;
@ -2153,7 +2159,7 @@ std::optional<ReadOpts> ParseReadArgsOrReply(CmdArgList args, bool read_group,
} }
id.val.ms = 0; id.val.ms = 0;
id.val.seq = 0; id.val.seq = 0;
id.last_id = true; id.resolve_last_id = true;
sitem.id = id; sitem.id = id;
auto [_, is_inserted] = opts.stream_ids.emplace(key, sitem); auto [_, is_inserted] = opts.stream_ids.emplace(key, sitem);
if (!is_inserted) { if (!is_inserted) {
@ -2329,12 +2335,8 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder,
// Update consumer // Update consumer
if (sitem.group) { if (sitem.group) {
auto cname = WrapSds(opts->consumer_name); range_opts.consumer =
range_opts.consumer = streamLookupConsumer(sitem.group, cname, SLC_NO_REFRESH); FindOrAddConsumer(opts->consumer_name, sitem.group, GetCurrentTimeMs());
if (!range_opts.consumer) {
range_opts.consumer = StreamCreateConsumer(
sitem.group, opts->consumer_name, GetCurrentTimeMs(), SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
}
} }
range_opts.noack = opts->noack; range_opts.noack = opts->noack;
@ -2902,7 +2904,7 @@ void StreamFamily::XInfo(CmdArgList args, const CommandContext& cmd_cntx) {
rb->SendBulkString("consumers"); rb->SendBulkString("consumers");
rb->StartArray(ginfo.consumer_info_vec.size()); rb->StartArray(ginfo.consumer_info_vec.size());
for (const auto& consumer_info : ginfo.consumer_info_vec) { for (const auto& consumer_info : ginfo.consumer_info_vec) {
rb->StartCollection(4, RedisReplyBuilder::MAP); rb->StartCollection(5, RedisReplyBuilder::MAP);
rb->SendBulkString("name"); rb->SendBulkString("name");
rb->SendBulkString(consumer_info.name); rb->SendBulkString(consumer_info.name);
@ -2910,6 +2912,9 @@ void StreamFamily::XInfo(CmdArgList args, const CommandContext& cmd_cntx) {
rb->SendBulkString("seen-time"); rb->SendBulkString("seen-time");
rb->SendLong(consumer_info.seen_time); rb->SendLong(consumer_info.seen_time);
rb->SendBulkString("active-time");
rb->SendLong(consumer_info.active_time);
rb->SendBulkString("pel-count"); rb->SendBulkString("pel-count");
rb->SendLong(consumer_info.pel_count); rb->SendLong(consumer_info.pel_count);
@ -2961,14 +2966,20 @@ void StreamFamily::XInfo(CmdArgList args, const CommandContext& cmd_cntx) {
OpResult<vector<ConsumerInfo>> result = shard_set->Await(sid, std::move(cb)); OpResult<vector<ConsumerInfo>> result = shard_set->Await(sid, std::move(cb));
if (result) { if (result) {
rb->StartArray(result->size()); rb->StartArray(result->size());
int64_t now_ms = GetCurrentTimeMs();
for (const auto& consumer_info : *result) { for (const auto& consumer_info : *result) {
rb->StartCollection(3, RedisReplyBuilder::MAP); int64_t active = consumer_info.active_time;
int64_t inactive = active != -1 ? now_ms - active : -1;
rb->StartCollection(4, RedisReplyBuilder::MAP);
rb->SendBulkString("name"); rb->SendBulkString("name");
rb->SendBulkString(consumer_info.name); rb->SendBulkString(consumer_info.name);
rb->SendBulkString("pending"); rb->SendBulkString("pending");
rb->SendLong(consumer_info.pel_count); rb->SendLong(consumer_info.pel_count);
rb->SendBulkString("idle"); rb->SendBulkString("idle");
rb->SendLong(consumer_info.idle); rb->SendLong(consumer_info.idle);
rb->SendBulkString("inactive");
rb->SendLong(inactive);
} }
return; return;
} }
@ -3099,26 +3110,11 @@ variant<bool, facade::ErrorReply> HasEntries2(const OpArgs& op_args, string_view
return facade::ErrorReply{ return facade::ErrorReply{
NoGroupOrKey(skey, opts->group_name, " in XREADGROUP with GROUP option")}; NoGroupOrKey(skey, opts->group_name, " in XREADGROUP with GROUP option")};
auto cname = WrapSds(opts->consumer_name); consumer = FindOrAddConsumer(opts->consumer_name, group, op_args.db_cntx.time_now_ms);
consumer = streamLookupConsumer(group, cname, SLC_NO_REFRESH);
if (!consumer) {
consumer = StreamCreateConsumer(group, opts->consumer_name, op_args.db_cntx.time_now_ms,
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
}
requested_sitem.group = group; requested_sitem.group = group;
requested_sitem.consumer = consumer; requested_sitem.consumer = consumer;
}
// Resolve $ to the last ID in the stream.
if (requested_sitem.id.last_id && !opts->read_group) {
requested_sitem.id.val = last_id;
streamIncrID(&requested_sitem.id.val); // include id's strictly greater
requested_sitem.id.last_id = false;
return false;
}
if (opts->read_group) {
// If '>' is not provided, consumer PEL is used. So don't need to block. // If '>' is not provided, consumer PEL is used. So don't need to block.
if (requested_sitem.id.val.ms != UINT64_MAX || requested_sitem.id.val.seq != UINT64_MAX) { if (requested_sitem.id.val.ms != UINT64_MAX || requested_sitem.id.val.seq != UINT64_MAX) {
opts->serve_history = true; opts->serve_history = true;
@ -3130,6 +3126,14 @@ variant<bool, facade::ErrorReply> HasEntries2(const OpArgs& op_args, string_view
requested_sitem.id.val = requested_sitem.group->last_id; requested_sitem.id.val = requested_sitem.group->last_id;
streamIncrID(&requested_sitem.id.val); streamIncrID(&requested_sitem.id.val);
} }
} else {
// Resolve $ to the last ID in the stream.
if (requested_sitem.id.resolve_last_id) {
requested_sitem.id.val = last_id;
streamIncrID(&requested_sitem.id.val); // include id's strictly greater
requested_sitem.id.resolve_last_id = false;
return false;
}
} }
return streamCompareID(&last_id, &requested_sitem.id.val) >= 0; return streamCompareID(&last_id, &requested_sitem.id.val) >= 0;

View file

@ -4,6 +4,7 @@
#include "server/stream_family.h" #include "server/stream_family.h"
#include "base/flags.h"
#include "base/gtest.h" #include "base/gtest.h"
#include "base/logging.h" #include "base/logging.h"
#include "facade/facade_test.h" #include "facade/facade_test.h"
@ -14,6 +15,8 @@ using namespace testing;
using namespace std; using namespace std;
using namespace util; using namespace util;
ABSL_DECLARE_FLAG(bool, stream_rdb_encode_v2);
namespace dfly { namespace dfly {
const auto kMatchNil = ArgType(RespExpr::NIL); const auto kMatchNil = ArgType(RespExpr::NIL);
@ -862,8 +865,8 @@ TEST_F(StreamFamilyTest, XInfoConsumers) {
Run({"xgroup", "createconsumer", "mystream", "mygroup", "second-consumer"}); Run({"xgroup", "createconsumer", "mystream", "mygroup", "second-consumer"});
resp = Run({"xinfo", "consumers", "mystream", "mygroup"}); resp = Run({"xinfo", "consumers", "mystream", "mygroup"});
EXPECT_THAT(resp, ArrLen(2)); EXPECT_THAT(resp, ArrLen(2));
EXPECT_THAT(resp.GetVec()[0], ArrLen(6)); EXPECT_THAT(resp.GetVec()[0], ArrLen(8));
EXPECT_THAT(resp.GetVec()[1], ArrLen(6)); EXPECT_THAT(resp.GetVec()[1], ArrLen(8));
EXPECT_THAT(resp.GetVec()[0].GetVec()[1], "first-consumer"); EXPECT_THAT(resp.GetVec()[0].GetVec()[1], "first-consumer");
EXPECT_THAT(resp.GetVec()[1].GetVec()[1], "second-consumer"); EXPECT_THAT(resp.GetVec()[1].GetVec()[1], "second-consumer");
@ -1021,7 +1024,7 @@ TEST_F(StreamFamilyTest, XInfoStream) {
"consumers", ArrLen(1))); "consumers", ArrLen(1)));
EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec(), EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec(),
ElementsAre("name", "first-consumer", "seen-time", ArgType(RespExpr::INT64), ElementsAre("name", "first-consumer", "seen-time", ArgType(RespExpr::INT64),
"pel-count", IntArg(0), "pending", ArrLen(0))); "active-time", IntArg(-1), "pel-count", IntArg(0), "pending", ArrLen(0)));
// full with count less than number of messages in stream // full with count less than number of messages in stream
resp = Run({"xinfo", "stream", "mystream", "full", "count", "5"}); resp = Run({"xinfo", "stream", "mystream", "full", "count", "5"});
@ -1045,9 +1048,9 @@ TEST_F(StreamFamilyTest, XInfoStream) {
EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[9], IntArg(11)); // pel-count EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[9], IntArg(11)); // pel-count
EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[11], ArrLen(11)); // pending list EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[11], ArrLen(11)); // pending list
// consumer // consumer
EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec()[5],
IntArg(11)); // pel-count
EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec()[7], EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec()[7],
IntArg(11)); // pel-count
EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec()[9],
ArrLen(11)); // pending list ArrLen(11)); // pending list
// delete message // delete message
@ -1074,9 +1077,10 @@ TEST_F(StreamFamilyTest, XInfoStream) {
ElementsAre("name", "mygroup", "last-delivered-id", "11-1", "entries-read", ElementsAre("name", "mygroup", "last-delivered-id", "11-1", "entries-read",
IntArg(11), "lag", IntArg(0), "pel-count", IntArg(11), "pending", IntArg(11), "lag", IntArg(0), "pel-count", IntArg(11), "pending",
ArrLen(11), "consumers", ArrLen(1))); ArrLen(11), "consumers", ArrLen(1)));
EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec(), EXPECT_THAT(
ElementsAre("name", "first-consumer", "seen-time", ArgType(RespExpr::INT64), resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec(),
"pel-count", IntArg(11), "pending", ArrLen(11))); ElementsAre("name", "first-consumer", "seen-time", ArgType(RespExpr::INT64), "active-time",
ArgType(RespExpr::INT64), "pel-count", IntArg(11), "pending", ArrLen(11)));
} }
TEST_F(StreamFamilyTest, AutoClaimPelItemsFromAnotherConsumer) { TEST_F(StreamFamilyTest, AutoClaimPelItemsFromAnotherConsumer) {
@ -1195,4 +1199,51 @@ TEST_F(StreamFamilyTest, XsetIdSmallerMaxDeleted) {
ASSERT_THAT(resp, ErrArg("smaller")); ASSERT_THAT(resp, ErrArg("smaller"));
} }
TEST_F(StreamFamilyTest, SeenActiveTime) {
TEST_current_time_ms = 1000;
Run({"XGROUP", "CREATE", "mystream", "mygroup", "$", "MKSTREAM"});
Run({"XREADGROUP", "GROUP", "mygroup", "Alice", "COUNT", "1", "STREAMS", "mystream", ">"});
AdvanceTime(100);
auto resp = Run({"xinfo", "consumers", "mystream", "mygroup"});
EXPECT_THAT(resp, RespElementsAre("name", "Alice", "pending", IntArg(0), "idle", IntArg(100),
"inactive", IntArg(-1)));
Run({"XADD", "mystream", "*", "f", "v"});
Run({"XREADGROUP", "GROUP", "mygroup", "Alice", "COUNT", "1", "STREAMS", "mystream", ">"});
AdvanceTime(50);
resp = Run({"xinfo", "consumers", "mystream", "mygroup"});
EXPECT_THAT(resp, RespElementsAre("name", "Alice", "pending", IntArg(1), "idle", IntArg(50),
"inactive", IntArg(50)));
AdvanceTime(100);
resp = Run({"XREADGROUP", "GROUP", "mygroup", "Alice", "COUNT", "1", "STREAMS", "mystream", ">"});
EXPECT_THAT(resp, ArgType(RespExpr::NIL_ARRAY));
resp = Run({"xinfo", "consumers", "mystream", "mygroup"});
// Idle is 0 because XREADGROUP just run, but inactive continues clocking because nothing was
// read.
EXPECT_THAT(resp, RespElementsAre("name", "Alice", "pending", IntArg(1), "idle", IntArg(0),
"inactive", IntArg(150)));
// Serialize/deserialize.
resp = Run({"XINFO", "STREAM", "mystream", "FULL"});
auto groups = resp.GetVec()[17];
auto consumers = groups.GetVec()[0].GetVec()[13].GetVec()[0];
EXPECT_THAT(consumers, RespElementsAre("name", "Alice", "seen-time", IntArg(1250), "active-time",
IntArg(1100), "pel-count", IntArg(1), "pending", _));
absl::SetFlag(&FLAGS_stream_rdb_encode_v2, true);
resp = Run({"DUMP", "mystream"});
Run({"del", "mystream"});
resp = Run({"RESTORE", "mystream", "0", resp.GetString()});
EXPECT_EQ(resp, "OK");
resp = Run({"XINFO", "STREAM", "mystream", "FULL"});
groups = resp.GetVec()[17];
consumers = groups.GetVec()[0].GetVec()[13].GetVec()[0];
EXPECT_THAT(consumers, RespElementsAre("name", "Alice", "seen-time", IntArg(1250), "active-time",
IntArg(1100), "pel-count", IntArg(1), "pending", _));
}
} // namespace dfly } // namespace dfly