1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-15 17:51:06 +00:00

bug(streams): entries_read is not updated on xreadgroup command (#1946)

fixes #1945
This commit is contained in:
Yue Li 2023-09-27 05:03:19 -07:00 committed by GitHub
parent ab903612f1
commit bcdebc35e9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 34 deletions

View file

@ -180,5 +180,6 @@ void streamFreeCG(streamCG *cg);
void streamDelConsumer(streamCG *cg, streamConsumer *consumer);
void streamLastValidID(stream *s, streamID *maxid);
int streamIDEqZero(streamID *id);
int streamRangeHasTombstones(stream *s, streamID *start, streamID *end);
#endif

View file

@ -1429,40 +1429,6 @@ int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) {
return 0;
}
#if ROMAN_ENABLE
/* Replies with a consumer group's current lag, that is the number of messages
* in the stream that are yet to be delivered. In case that the lag isn't
* available due to fragmentation, the reply to the client is a null. */
void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) {
int valid = 0;
long long lag = 0;
if (!s->entries_added) {
/* The lag of a newly-initialized stream is 0. */
lag = 0;
valid = 1;
} else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) {
/* No fragmentation ahead means that the group's logical reads counter
* is valid for performing the lag calculation. */
lag = (long long)s->entries_added - cg->entries_read;
valid = 1;
} else {
/* Attempt to retrieve the group's last ID logical read counter. */
long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id);
if (entries_read != SCG_INVALID_ENTRIES_READ) {
/* A valid counter was obtained. */
lag = (long long)s->entries_added - entries_read;
valid = 1;
}
}
if (valid) {
addReplyLongLong(c,lag);
} else {
addReplyNull(c);
}
}
/* This function returns a value that is the ID's logical read counter, or its
* distance (the number of entries) from the first entry ever to have been added
* to the stream.
@ -1524,6 +1490,42 @@ long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) {
return SCG_INVALID_ENTRIES_READ;
}
#if ROMAN_ENABLE
/* Replies with a consumer group's current lag, that is the number of messages
* in the stream that are yet to be delivered. In case that the lag isn't
* available due to fragmentation, the reply to the client is a null. */
void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) {
int valid = 0;
long long lag = 0;
if (!s->entries_added) {
/* The lag of a newly-initialized stream is 0. */
lag = 0;
valid = 1;
} else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) {
/* No fragmentation ahead means that the group's logical reads counter
* is valid for performing the lag calculation. */
lag = (long long)s->entries_added - cg->entries_read;
valid = 1;
} else {
/* Attempt to retrieve the group's last ID logical read counter. */
long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id);
if (entries_read != SCG_INVALID_ENTRIES_READ) {
/* A valid counter was obtained. */
lag = (long long)s->entries_added - entries_read;
valid = 1;
}
}
if (valid) {
addReplyLongLong(c,lag);
} else {
addReplyNull(c);
}
}
/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
* are created in the pending list of the stream and consumers. We need
* to propagate this changes in the form of XCLAIM commands. */

View file

@ -653,6 +653,16 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeO
rec.id = id;
rec.kv_arr.reserve(numfields);
if (opts.group && streamCompareID(&id, &opts.group->last_id) > 0) {
if (opts.group->entries_read != SCG_INVALID_ENTRIES_READ &&
!streamRangeHasTombstones(s, &id, NULL)) {
/* A valid counter and no future tombstones mean we can
* increment the read counter to keep tracking the group's
* progress. */
opts.group->entries_read++;
} else if (s->entries_added) {
/* The group's counter may be invalid, so we try to obtain it. */
opts.group->entries_read = streamEstimateDistanceFromFirstEverEntry(s, &id);
}
opts.group->last_id = id;
}