1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: fix bugs in stream_family (#4237)

1. Use transaction time in streams code, similarly to how we do it in other commands.
   Stop using mstime() and delete unused redis code.
2. Check for sequence overflow issue when passing huge sequence ids.
   Add a test.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-12-02 11:57:31 +02:00 committed by GitHub
parent ada96d9041
commit 91aff49fcd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 105 additions and 459 deletions

View file

@ -150,9 +150,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags);
streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *id, long long entries_read);
streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamEncodeID(void *buf, streamID *id);
void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b);

View file

@ -129,21 +129,6 @@ int streamDecrID(streamID *id) {
return ret;
}
/* Generate the next stream item ID given the previous one. If the current
* milliseconds Unix time is greater than the previous one, just use this
* as time part and start with sequence part of zero. Otherwise we use the
* previous time (and never go backward) and increment the sequence. */
void streamNextID(streamID *last_id, streamID *new_id) {
uint64_t ms = mstime();
if (ms > last_id->ms) {
new_id->ms = ms;
new_id->seq = 0;
} else {
*new_id = *last_id;
streamIncrID(new_id);
}
}
/* This is a wrapper function for lpGet() to directly get an integer value
* from the listpack (that may store numbers as a string), converting
* the string if needed.
@ -1031,16 +1016,6 @@ long long streamCGLag(stream *s, streamCG *cg) {
* Low level implementation of consumer groups
* ----------------------------------------------------------------------- */
/* Create a NACK entry setting the delivery count to 1 and the delivery
* time to the current time. The NACK consumer will be set to the one
* specified as argument of the function. */
streamNACK *streamCreateNACK(streamConsumer *consumer) {
streamNACK *nack = zmalloc(sizeof(*nack));
nack->delivery_time = mstime();
nack->delivery_count = 1;
nack->consumer = consumer;
return nack;
}
/* Free a NACK entry. */
void streamFreeNACK(streamNACK *na) {
@ -1093,35 +1068,13 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
return (cg == raxNotFound) ? NULL : cg;
}
/* Create a consumer with the specified name in the group 'cg' and return.
* If the consumer exists, return NULL. As a side effect, when the consumer
* is successfully created, the key space will be notified and dirty++ unless
* the SCC_NO_NOTIFY or SCC_NO_DIRTIFY flags is specified. */
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags) {
if (cg == NULL) return NULL;
streamConsumer *consumer = zmalloc(sizeof(*consumer));
int success = raxTryInsert(cg->consumers,(unsigned char*)name,
sdslen(name),consumer,NULL);
if (!success) {
zfree(consumer);
return NULL;
}
consumer->name = sdsdup(name);
consumer->pel = raxNew();
consumer->seen_time = mstime();
return consumer;
}
/* Lookup the consumer with the specified name in the group 'cg'. Its last
* seen time is updated unless the SLC_NO_REFRESH flag is specified. */
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
if (cg == NULL) return NULL;
int refresh = !(flags & SLC_NO_REFRESH);
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name));
if (consumer == raxNotFound) return NULL;
if (refresh) consumer->seen_time = mstime();
return consumer;
}

View file

@ -573,313 +573,3 @@ int ld2string(char *buf, size_t len, long double value, ld2string_mode mode) {
buf[l] = '\0';
return l;
}
#ifdef ROMAN_DISABLE_CODE
/* Get random bytes, attempts to get an initial seed from /dev/urandom and
* the uses a one way hash function in counter mode to generate a random
* stream. However if /dev/urandom is not available, a weaker seed is used.
*
* This function is not thread safe, since the state is global. */
void getRandomBytes(unsigned char *p, size_t len) {
/* Global state. */
static int seed_initialized = 0;
static unsigned char seed[64]; /* 512 bit internal block size. */
static uint64_t counter = 0; /* The counter we hash with the seed. */
if (!seed_initialized) {
/* Initialize a seed and use SHA1 in counter mode, where we hash
* the same seed with a progressive counter. For the goals of this
* function we just need non-colliding strings, there are no
* cryptographic security needs. */
FILE *fp = fopen("/dev/urandom","r");
if (fp == NULL || fread(seed,sizeof(seed),1,fp) != 1) {
/* Revert to a weaker seed, and in this case reseed again
* at every call.*/
for (unsigned int j = 0; j < sizeof(seed); j++) {
struct timeval tv;
gettimeofday(&tv,NULL);
pid_t pid = getpid();
seed[j] = tv.tv_sec ^ tv.tv_usec ^ pid ^ (long)fp;
}
} else {
seed_initialized = 1;
}
if (fp) fclose(fp);
}
while(len) {
/* This implements SHA256-HMAC. */
unsigned char digest[SHA256_BLOCK_SIZE];
unsigned char kxor[64];
unsigned int copylen =
len > SHA256_BLOCK_SIZE ? SHA256_BLOCK_SIZE : len;
/* IKEY: key xored with 0x36. */
memcpy(kxor,seed,sizeof(kxor));
for (unsigned int i = 0; i < sizeof(kxor); i++) kxor[i] ^= 0x36;
/* Obtain HASH(IKEY||MESSAGE). */
SHA256_CTX ctx;
sha256_init(&ctx);
sha256_update(&ctx,kxor,sizeof(kxor));
sha256_update(&ctx,(unsigned char*)&counter,sizeof(counter));
sha256_final(&ctx,digest);
/* OKEY: key xored with 0x5c. */
memcpy(kxor,seed,sizeof(kxor));
for (unsigned int i = 0; i < sizeof(kxor); i++) kxor[i] ^= 0x5C;
/* Obtain HASH(OKEY || HASH(IKEY||MESSAGE)). */
sha256_init(&ctx);
sha256_update(&ctx,kxor,sizeof(kxor));
sha256_update(&ctx,digest,SHA256_BLOCK_SIZE);
sha256_final(&ctx,digest);
/* Increment the counter for the next iteration. */
counter++;
memcpy(p,digest,copylen);
len -= copylen;
p += copylen;
}
}
/* Generate the Redis "Run ID", a SHA1-sized random number that identifies a
* given execution of Redis, so that if you are talking with an instance
* having run_id == A, and you reconnect and it has run_id == B, you can be
* sure that it is either a different instance or it was restarted. */
void getRandomHexChars(char *p, size_t len) {
char *charset = "0123456789abcdef";
size_t j;
getRandomBytes((unsigned char*)p,len);
for (j = 0; j < len; j++) p[j] = charset[p[j] & 0x0F];
}
/* Given the filename, return the absolute path as an SDS string, or NULL
* if it fails for some reason. Note that "filename" may be an absolute path
* already, this will be detected and handled correctly.
*
* The function does not try to normalize everything, but only the obvious
* case of one or more "../" appearing at the start of "filename"
* relative path. */
sds getAbsolutePath(char *filename) {
char cwd[1024];
sds abspath;
sds relpath = sdsnew(filename);
relpath = sdstrim(relpath," \r\n\t");
if (relpath[0] == '/') return relpath; /* Path is already absolute. */
/* If path is relative, join cwd and relative path. */
if (getcwd(cwd,sizeof(cwd)) == NULL) {
sdsfree(relpath);
return NULL;
}
abspath = sdsnew(cwd);
if (sdslen(abspath) && abspath[sdslen(abspath)-1] != '/')
abspath = sdscat(abspath,"/");
/* At this point we have the current path always ending with "/", and
* the trimmed relative path. Try to normalize the obvious case of
* trailing ../ elements at the start of the path.
*
* For every "../" we find in the filename, we remove it and also remove
* the last element of the cwd, unless the current cwd is "/". */
while (sdslen(relpath) >= 3 &&
relpath[0] == '.' && relpath[1] == '.' && relpath[2] == '/')
{
sdsrange(relpath,3,-1);
if (sdslen(abspath) > 1) {
char *p = abspath + sdslen(abspath)-2;
int trimlen = 1;
while(*p != '/') {
p--;
trimlen++;
}
sdsrange(abspath,0,-(trimlen+1));
}
}
/* Finally glue the two parts together. */
abspath = sdscatsds(abspath,relpath);
sdsfree(relpath);
return abspath;
}
#endif
/* Return the UNIX time in microseconds */
long long ustime(void) {
struct timeval tv;
long long ust;
gettimeofday(&tv, NULL);
ust = ((long long)tv.tv_sec)*1000000;
ust += tv.tv_usec;
return ust;
}
#ifdef REDIS_TEST
#include <assert.h>
static void test_string2ll(void) {
char buf[32];
long long v;
/* May not start with +. */
strcpy(buf,"+1");
assert(string2ll(buf,strlen(buf),&v) == 0);
/* Leading space. */
strcpy(buf," 1");
assert(string2ll(buf,strlen(buf),&v) == 0);
/* Trailing space. */
strcpy(buf,"1 ");
assert(string2ll(buf,strlen(buf),&v) == 0);
/* May not start with 0. */
strcpy(buf,"01");
assert(string2ll(buf,strlen(buf),&v) == 0);
strcpy(buf,"-1");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == -1);
strcpy(buf,"0");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == 0);
strcpy(buf,"1");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == 1);
strcpy(buf,"99");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == 99);
strcpy(buf,"-99");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == -99);
strcpy(buf,"-9223372036854775808");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == LLONG_MIN);
strcpy(buf,"-9223372036854775809"); /* overflow */
assert(string2ll(buf,strlen(buf),&v) == 0);
strcpy(buf,"9223372036854775807");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == LLONG_MAX);
strcpy(buf,"9223372036854775808"); /* overflow */
assert(string2ll(buf,strlen(buf),&v) == 0);
}
static void test_string2l(void) {
char buf[32];
long v;
/* May not start with +. */
strcpy(buf,"+1");
assert(string2l(buf,strlen(buf),&v) == 0);
/* May not start with 0. */
strcpy(buf,"01");
assert(string2l(buf,strlen(buf),&v) == 0);
strcpy(buf,"-1");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == -1);
strcpy(buf,"0");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == 0);
strcpy(buf,"1");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == 1);
strcpy(buf,"99");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == 99);
strcpy(buf,"-99");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == -99);
#if LONG_MAX != LLONG_MAX
strcpy(buf,"-2147483648");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == LONG_MIN);
strcpy(buf,"-2147483649"); /* overflow */
assert(string2l(buf,strlen(buf),&v) == 0);
strcpy(buf,"2147483647");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == LONG_MAX);
strcpy(buf,"2147483648"); /* overflow */
assert(string2l(buf,strlen(buf),&v) == 0);
#endif
}
static void test_ll2string(void) {
char buf[32];
long long v;
int sz;
v = 0;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 1);
assert(!strcmp(buf, "0"));
v = -1;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 2);
assert(!strcmp(buf, "-1"));
v = 99;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 2);
assert(!strcmp(buf, "99"));
v = -99;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 3);
assert(!strcmp(buf, "-99"));
v = -2147483648;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 11);
assert(!strcmp(buf, "-2147483648"));
v = LLONG_MIN;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 20);
assert(!strcmp(buf, "-9223372036854775808"));
v = LLONG_MAX;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 19);
assert(!strcmp(buf, "9223372036854775807"));
}
#define UNUSED(x) (void)(x)
int utilTest(int argc, char **argv, int accurate) {
UNUSED(argc);
UNUSED(argv);
UNUSED(accurate);
test_string2ll();
test_string2l();
test_ll2string();
return 0;
}
#endif

View file

@ -92,34 +92,6 @@ void _serverAssert(const char *estr, const char *file, int line);
#define serverAssert(_e) ((_e)?(void)0 : (_serverAssert(#_e,__FILE__,__LINE__),_exit(1)))
typedef long long mstime_t; /* millisecond time type. */
long long ustime(void);
/* Return the current time in minutes, just taking the least significant
* 16 bits. The returned time is suitable to be stored as LDT (last decrement
* time) for the LFU implementation. */
static inline unsigned long LFUGetTimeInMinutesT(size_t sec) {
return (sec / 60) & 65535;
}
static inline unsigned long LFUGetTimeInMinutes() {
return LFUGetTimeInMinutesT(time(NULL));
}
/* Given an object last access time, compute the minimum number of minutes
* that elapsed since the last access. Handle overflow (ldt greater than
* the current 16 bits minutes time) considering the time as wrapping
* exactly once. */
static inline unsigned long LFUTimeElapsed(time_t sec, unsigned long ldt) {
unsigned long now = LFUGetTimeInMinutesT(sec);
if (now >= ldt) return now-ldt;
return 65535-ldt+now;
}
/* Return the UNIX time in milliseconds */
static inline mstime_t mstime(void) {
return ustime()/1000;
}
#endif

View file

@ -2,8 +2,15 @@
#include "base/logging.h"
extern "C" {
#include "redis/stream.h"
#include "redis/zmalloc.h"
}
namespace dfly {
using namespace std;
sds WrapSds(std::string_view s) {
static thread_local sds tmp_sds = sdsempty();
return tmp_sds = sdscpylen(tmp_sds, s.data(), s.length());
@ -40,4 +47,25 @@ RandomPick UniquePicksGenerator::Generate() {
return max_index;
}
streamConsumer* StreamCreateConsumer(streamCG* cg, string_view name, uint64_t now_ms, int flags) {
DCHECK(cg);
DCHECK(!name.empty());
if (cg == NULL)
return NULL;
streamConsumer* consumer = (streamConsumer*)zmalloc(sizeof(*consumer));
int success =
raxTryInsert(cg->consumers, (unsigned char*)name.data(), name.size(), consumer, NULL);
if (!success) {
zfree(consumer);
return NULL;
}
consumer->name = sdsnewlen(name.data(), name.size());
consumer->pel = raxNew();
consumer->seen_time = now_ms;
return consumer;
}
} // namespace dfly

View file

@ -14,6 +14,10 @@
extern "C" {
#include "redis/sds.h"
}
typedef struct streamConsumer streamConsumer;
typedef struct streamCG streamCG;
namespace dfly {
template <typename DenseSet>
@ -82,4 +86,7 @@ class UniquePicksGenerator : public PicksGenerator {
absl::BitGen bitgen_{};
};
streamConsumer* StreamCreateConsumer(streamCG* cg, std::string_view name, uint64_t now_ms,
int flags);
} // namespace dfly

View file

@ -41,6 +41,7 @@ extern "C" {
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/family_utils.h"
#include "server/hset_family.h"
#include "server/journal/executor.h"
#include "server/journal/serializer.h"
@ -971,9 +972,10 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
}
for (const auto& pel : cg.pel_arr) {
streamNACK* nack = streamCreateNACK(NULL);
streamNACK* nack = reinterpret_cast<streamNACK*>(zmalloc(sizeof(*nack)));
nack->delivery_time = pel.delivery_time;
nack->delivery_count = pel.delivery_count;
nack->consumer = nullptr;
if (!raxTryInsert(cgroup->pel, const_cast<uint8_t*>(pel.rawid.data()), pel.rawid.size(), nack,
NULL)) {
@ -985,17 +987,13 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
}
for (const auto& cons : cg.cons_arr) {
sds cname = ToSds(cons.name);
streamConsumer* consumer =
streamCreateConsumer(cgroup, cname, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
sdsfree(cname);
streamConsumer* consumer = StreamCreateConsumer(cgroup, ToSV(cons.name), cons.seen_time,
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
if (!consumer) {
LOG(ERROR) << "Duplicate stream consumer detected.";
ec_ = RdbError(errc::duplicate_key);
return;
}
consumer->seen_time = cons.seen_time;
/* Create the PEL (pending entries list) about entries owned by this specific
* consumer. */
@ -1975,17 +1973,8 @@ auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result<OpaqueObj> {
return make_unexpected(ec);
}
// streamNACK* nack = streamCreateNACK(NULL);
// auto cleanup2 = absl::Cleanup([&] { streamFreeNACK(nack); });
SET_OR_UNEXPECT(FetchInt<int64_t>(), pel.delivery_time);
SET_OR_UNEXPECT(LoadLen(nullptr), pel.delivery_count);
/*if (!raxTryInsert(cgroup->pel, rawid, sizeof(rawid), nack, NULL)) {
LOG(ERROR) << "Duplicated global PEL entry loading stream consumer group";
return Unexpected(errc::duplicate_key);
}
std::move(cleanup2).Cancel();*/
}
/* Now that we loaded our global PEL, we need to load the
@ -2234,7 +2223,7 @@ error_code RdbLoader::Load(io::Source* src) {
/* Key-specific attributes, set by opcodes before the key type. */
ObjSettings settings;
settings.now = mstime();
settings.now = GetCurrentTimeMs();
size_t keys_loaded = 0;
auto cleanup = absl::Cleanup([&] { FinishLoad(start, &keys_loaded); });

View file

@ -285,10 +285,9 @@ int64_t lpGetInteger(unsigned char* ele) {
* milliseconds Unix time is greater than the previous one, just use this
* as time part and start with sequence part of zero. Otherwise we use the
* previous time (and never go backward) and increment the sequence. */
void StreamNextID(const streamID* last_id, streamID* new_id) {
uint64_t ms = mstime();
if (ms > last_id->ms) {
new_id->ms = ms;
void StreamNextID(uint64_t now_ms, const streamID* last_id, streamID* new_id) {
if (now_ms > last_id->ms) {
new_id->ms = now_ms;
new_id->seq = 0;
} else {
*new_id = *last_id;
@ -315,15 +314,15 @@ inline void StreamEncodeID(uint8_t* buf, streamID* id) {
* part of the passed ID is ignored and the function will attempt to use an
* auto-generated sequence.
*
* The function returns C_OK if the item was added, this is always true
* The function returns 0 if the item was added, this is always true
* if the ID was generated by the function. However the function may return
* C_ERR in several cases:
* errors in several cases:
* 1. If an ID was given via 'use_id', but adding it failed since the
* current top ID is greater or equal. errno will be set to EDOM.
* current top ID is greater or equal, it returns EDOM.
* 2. If a size of a single element or the sum of the elements is too big to
* be stored into the stream. errno will be set to ERANGE. */
int StreamAppendItem(stream* s, CmdArgList fields, streamID* added_id, streamID* use_id,
int seq_given) {
* be stored into the stream. it returns ERANGE. */
int StreamAppendItem(stream* s, CmdArgList fields, uint64_t now_ms, streamID* added_id,
streamID* use_id, int seq_given) {
/* Generate the new entry ID. */
streamID id;
if (use_id) {
@ -336,7 +335,7 @@ int StreamAppendItem(stream* s, CmdArgList fields, streamID* added_id, streamID*
* in time. */
if (s->last_id.ms == use_id->ms) {
if (s->last_id.seq == UINT64_MAX) {
return C_ERR;
return EDOM;
}
id = s->last_id;
id.seq++;
@ -345,7 +344,7 @@ int StreamAppendItem(stream* s, CmdArgList fields, streamID* added_id, streamID*
}
}
} else {
StreamNextID(&s->last_id, &id);
StreamNextID(now_ms, &s->last_id, &id);
}
/* Check that the new ID is greater than the last entry ID
@ -353,8 +352,7 @@ int StreamAppendItem(stream* s, CmdArgList fields, streamID* added_id, streamID*
* overflow (and wrap-around) when incrementing the sequence
part. */
if (streamCompareID(&id, &s->last_id) <= 0) {
errno = EDOM;
return C_ERR;
return EDOM;
}
/* Avoid overflow when trying to add an element to the stream (listpack
@ -366,8 +364,7 @@ int StreamAppendItem(stream* s, CmdArgList fields, streamID* added_id, streamID*
}
if (totelelen > STREAM_LISTPACK_MAX_SIZE) {
errno = ERANGE;
return C_ERR;
return ERANGE;
}
/* Add the new entry. */
@ -563,15 +560,16 @@ int StreamAppendItem(stream* s, CmdArgList fields, streamID* added_id, streamID*
s->first_id = id;
if (added_id)
*added_id = id;
return C_OK;
return 0;
}
/* Create a NACK entry setting the delivery count to 1 and the delivery
* time to the current time or test-hooked time. The NACK consumer will be
* set to the one specified as argument of the function. */
streamNACK* StreamCreateNACK(streamConsumer* consumer) {
streamNACK* StreamCreateNACK(streamConsumer* consumer, uint64_t now_ms) {
streamNACK* nack = reinterpret_cast<streamNACK*>(zmalloc(sizeof(*nack)));
nack->delivery_time = GetCurrentTimeMs();
nack->delivery_time = now_ms;
nack->delivery_count = 1;
nack->consumer = consumer;
return nack;
@ -654,13 +652,13 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgL
streamID result_id;
const auto& parsed_id = opts.parsed_id;
streamID passed_id = parsed_id.val;
int res = StreamAppendItem(stream_inst, args, &result_id,
int res = StreamAppendItem(stream_inst, args, op_args.db_cntx.time_now_ms, &result_id,
parsed_id.id_given ? &passed_id : nullptr, parsed_id.has_seq);
if (res != C_OK) {
if (errno == ERANGE)
if (res != 0) {
if (res == ERANGE)
return OpStatus::OUT_OF_RANGE;
if (errno == EDOM)
if (res == EDOM)
return OpStatus::STREAM_ID_SMALL;
return OpStatus::OUT_OF_MEMORY;
@ -731,11 +729,12 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeO
if (opts.group && !opts.noack) {
unsigned char buf[sizeof(streamID)];
StreamEncodeID(buf, &id);
uint64_t now_ms = op_args.db_cntx.time_now_ms;
/* Try to add a new NACK. Most of the time this will work and
* will not require extra lookups. We'll fix the problem later
* if we find that there is already an entry for this ID. */
streamNACK* nack = StreamCreateNACK(opts.consumer);
streamNACK* nack = StreamCreateNACK(opts.consumer, now_ms);
int group_inserted = raxTryInsert(opts.group->pel, buf, sizeof(buf), nack, nullptr);
int consumer_inserted = raxTryInsert(opts.consumer->pel, buf, sizeof(buf), nack, nullptr);
@ -752,7 +751,7 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeO
/* Update the consumer and NACK metadata. */
nack->consumer = opts.consumer;
nack->delivery_time = GetCurrentTimeMs();
nack->delivery_time = now_ms;
nack->delivery_count = 1;
/* Add the entry in the new consumer local PEL. */
raxInsert(opts.consumer->pel, buf, sizeof(buf), nack, NULL);
@ -803,7 +802,7 @@ OpResult<RecordVec> OpRangeFromConsumerPEL(const OpArgs& op_args, string_view ke
result.push_back(Record{id, vector<pair<string, string>>()});
} else {
streamNACK* nack = static_cast<streamNACK*>(ri.data);
nack->delivery_time = GetCurrentTimeMs();
nack->delivery_time = op_args.db_cntx.time_now_ms;
nack->delivery_count++;
result.push_back(std::move(op_result.value()[0]));
}
@ -1086,7 +1085,7 @@ OpResult<vector<ConsumerInfo>> OpConsumers(const DbContext& db_cntx, EngineShard
raxIterator ri;
raxStart(&ri, cg->consumers);
raxSeek(&ri, "^", NULL, 0);
mstime_t now = mstime();
mstime_t now = db_cntx.time_now_ms;
while (raxNext(&ri)) {
ConsumerInfo consumer_info;
streamConsumer* consumer = (streamConsumer*)ri.data;
@ -1232,7 +1231,7 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
RETURN_ON_BAD_STATUS(cgr_res);
streamConsumer* consumer = nullptr;
auto now = GetCurrentTimeMs();
uint64_t now_ms = op_args.db_cntx.time_now_ms;
ClaimInfo result;
result.justid = (opts.flags & kClaimJustID);
@ -1265,7 +1264,7 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
// Create the NACK forcefully.
if ((opts.flags & kClaimForce) && nack == raxNotFound) {
/* Create the NACK. */
nack = streamCreateNACK(nullptr);
nack = StreamCreateNACK(nullptr, now_ms);
raxInsert(cgr_res->cg->pel, buf.begin(), sizeof(buf), nack, nullptr);
}
@ -1273,7 +1272,7 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
if (nack != raxNotFound) {
// First check if the entry id exceeds the `min_idle_time`.
if (nack->consumer && opts.min_idle_time) {
mstime_t this_idle = now - nack->delivery_time;
mstime_t this_idle = now_ms - nack->delivery_time;
if (this_idle < opts.min_idle_time) {
continue;
}
@ -1282,8 +1281,8 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
// Try to get the consumer. If not found, create a new one.
auto cname = WrapSds(opts.consumer);
if ((consumer = streamLookupConsumer(cgr_res->cg, cname, SLC_NO_REFRESH)) == nullptr) {
consumer =
streamCreateConsumer(cgr_res->cg, cname, nullptr, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
consumer = StreamCreateConsumer(cgr_res->cg, opts.consumer, now_ms,
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
}
// If the entry belongs to the same consumer, we don't have to
@ -1358,8 +1357,8 @@ OpResult<uint32_t> OpCreateConsumer(const OpArgs& op_args, string_view key, stri
StreamMemTracker mem_tracker;
streamConsumer* consumer = streamCreateConsumer(cgroup_res->cg, WrapSds(consumer_name), NULL, 0,
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
streamConsumer* consumer = StreamCreateConsumer(
cgroup_res->cg, consumer_name, op_args.db_cntx.time_now_ms, SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
mem_tracker.UpdateStreamSize(cgroup_res->it->second);
return consumer ? OpStatus::OK : OpStatus::KEY_EXISTS;
@ -1536,7 +1535,6 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
StreamMemTracker mem_tracker;
streamConsumer* consumer = nullptr;
// from Redis spec on XAutoClaim:
// https://redis.io/commands/xautoclaim/
// The maximum number of pending entries that the command scans is the product of
@ -1553,8 +1551,16 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
ClaimInfo result;
result.justid = (opts.flags & kClaimJustID);
auto now = GetCurrentTimeMs();
uint64_t now_ms = op_args.db_cntx.time_now_ms;
int count = opts.count;
auto cname = WrapSds(opts.consumer);
streamConsumer* consumer = streamLookupConsumer(group, cname, SLC_DEFAULT);
if (consumer == nullptr) {
consumer = StreamCreateConsumer(group, opts.consumer, now_ms, SCC_DEFAULT);
// TODO: notify xgroup-createconsumer event once we support stream events.
}
consumer->seen_time = now_ms;
while (attempts-- && count && raxNext(&ri)) {
streamNACK* nack = (streamNACK*)ri.data;
@ -1571,26 +1577,21 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
}
if (opts.min_idle_time) {
mstime_t this_idle = now - nack->delivery_time;
mstime_t this_idle = now_ms - nack->delivery_time;
if (this_idle < opts.min_idle_time)
continue;
}
auto cname = WrapSds(opts.consumer);
if (consumer == nullptr) {
consumer = streamLookupConsumer(group, cname, SLC_DEFAULT);
if (consumer == nullptr) {
consumer = streamCreateConsumer(group, cname, nullptr, 0, SCC_DEFAULT);
}
}
if (nack->consumer != consumer) {
/* Remove the entry from the old consumer.
* Note that nack->consumer is NULL if we created the
* NACK above because of the FORCE option. */
if (nack->consumer) {
raxRemove(nack->consumer->pel, ri.key, ri.key_len, nullptr);
}
}
nack->delivery_time = now;
nack->delivery_time = now_ms;
if (!result.justid) {
nack->delivery_count++;
}
@ -1680,12 +1681,12 @@ PendingReducedResult GetPendingReducedResult(streamCG* cg) {
return result;
}
PendingExtendedResultList GetPendingExtendedResult(streamCG* cg, streamConsumer* consumer,
PendingExtendedResultList GetPendingExtendedResult(uint64_t now_ms, streamCG* cg,
streamConsumer* consumer,
const PendingOpts& opts) {
PendingExtendedResultList result;
rax* pel = consumer ? consumer->pel : cg->pel;
streamID sstart = opts.start.val, send = opts.end.val;
auto now = GetCurrentTimeMs();
unsigned char start_key[sizeof(streamID)];
unsigned char end_key[sizeof(streamID)];
raxIterator ri;
@ -1703,7 +1704,7 @@ PendingExtendedResultList GetPendingExtendedResult(streamCG* cg, streamConsumer*
streamNACK* nack = static_cast<streamNACK*>(ri.data);
if (opts.min_idle_time) {
mstime_t this_idle = now - nack->delivery_time;
mstime_t this_idle = now_ms - nack->delivery_time;
if (this_idle < opts.min_idle_time) {
continue;
}
@ -1716,7 +1717,7 @@ PendingExtendedResultList GetPendingExtendedResult(streamCG* cg, streamConsumer*
streamDecodeID(ri.key, &id);
/* Milliseconds elapsed since last delivery. */
mstime_t elapsed = now - nack->delivery_time;
mstime_t elapsed = now_ms - nack->delivery_time;
if (elapsed < 0) {
elapsed = 0;
}
@ -1745,7 +1746,7 @@ OpResult<PendingResult> OpPending(const OpArgs& op_args, string_view key, const
if (opts.count == -1) {
result = GetPendingReducedResult(cgroup_res->cg);
} else {
result = GetPendingExtendedResult(cgroup_res->cg, consumer, opts);
result = GetPendingExtendedResult(op_args.db_cntx.time_now_ms, cgroup_res->cg, consumer, opts);
}
return result;
}
@ -2305,8 +2306,8 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder,
auto cname = WrapSds(opts->consumer_name);
range_opts.consumer = streamLookupConsumer(sitem.group, cname, SLC_NO_REFRESH);
if (!range_opts.consumer) {
range_opts.consumer =
streamCreateConsumer(sitem.group, cname, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
range_opts.consumer = StreamCreateConsumer(
sitem.group, opts->consumer_name, GetCurrentTimeMs(), SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
}
}
@ -2651,7 +2652,7 @@ void StreamFamily::XClaim(CmdArgList args, const CommandContext& cmd_cntx) {
if (!ParseXclaimOptions(args, opts, cmd_cntx.rb))
return;
if (auto now = GetCurrentTimeMs();
if (uint64_t now = cmd_cntx.tx->GetDbContext().time_now_ms;
opts.delivery_time < 0 || static_cast<uint64_t>(opts.delivery_time) > now)
opts.delivery_time = now;
@ -3077,7 +3078,8 @@ variant<bool, facade::ErrorReply> HasEntries2(const OpArgs& op_args, string_view
auto cname = WrapSds(opts->consumer_name);
consumer = streamLookupConsumer(group, cname, SLC_NO_REFRESH);
if (!consumer) {
consumer = streamCreateConsumer(group, cname, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
consumer = StreamCreateConsumer(group, opts->consumer_name, op_args.db_cntx.time_now_ms,
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
}
requested_sitem.group = group;

View file

@ -1073,4 +1073,11 @@ TEST_F(StreamFamilyTest, XInfoStream) {
ElementsAre("name", "first-consumer", "seen-time", ArgType(RespExpr::INT64),
"pel-count", IntArg(11), "pending", ArrLen(11)));
}
TEST_F(StreamFamilyTest, XAddMaxSeq) {
Run({"XADD", "x", "1-18446744073709551615", "f1", "v1"});
auto resp = Run({"XADD", "x", "1-*", "f2", "v2"});
EXPECT_THAT(resp, ErrArg("The ID specified in XADD is equal or smaller"));
}
} // namespace dfly