mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
fix: support loading of 7.x streams correctly (#4281)
Now rdb_load supports RDB_TYPE_STREAM_LISTPACKS, RDB_TYPE_STREAM_LISTPACKS_2 and RDB_TYPE_STREAM_LISTPACKS_3 formats. rdb_save still saves with RDB_TYPE_STREAM_LISTPACKS format - we want to release the DF version that can load everything first, and then update the replicaion format in the next versions. Also, update rdb_test.cc Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
4428480a4e
commit
f428dc31be
6 changed files with 133 additions and 58 deletions
|
@ -91,6 +91,10 @@ inline bool operator==(const RespExpr& left, std::string_view s) {
|
||||||
return left.type == RespExpr::STRING && ToSV(left.GetBuf()) == s;
|
return left.type == RespExpr::STRING && ToSV(left.GetBuf()) == s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline bool operator==(const RespExpr& left, int64_t val) {
|
||||||
|
return left.type == RespExpr::INT64 && left.GetInt() == val;
|
||||||
|
}
|
||||||
|
|
||||||
inline bool operator!=(const RespExpr& left, std::string_view s) {
|
inline bool operator!=(const RespExpr& left, std::string_view s) {
|
||||||
return !(left == s);
|
return !(left == s);
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,6 @@ extern "C" {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Custom types: Range 30-35 is used by DF RDB types.
|
// Custom types: Range 30-35 is used by DF RDB types.
|
||||||
constexpr uint8_t RDB_TYPE_JSON_OLD = 20;
|
|
||||||
constexpr uint8_t RDB_TYPE_JSON = 30;
|
constexpr uint8_t RDB_TYPE_JSON = 30;
|
||||||
constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
|
constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
|
||||||
constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;
|
constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;
|
||||||
|
|
|
@ -477,6 +477,8 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const unique_ptr<LoadTrace>& ptr
|
||||||
CreateZSet(ptr.get());
|
CreateZSet(ptr.get());
|
||||||
break;
|
break;
|
||||||
case RDB_TYPE_STREAM_LISTPACKS:
|
case RDB_TYPE_STREAM_LISTPACKS:
|
||||||
|
case RDB_TYPE_STREAM_LISTPACKS_2:
|
||||||
|
case RDB_TYPE_STREAM_LISTPACKS_3:
|
||||||
CreateStream(ptr.get());
|
CreateStream(ptr.get());
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -955,8 +957,16 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
|
||||||
}
|
}
|
||||||
|
|
||||||
s->length = ltrace->stream_trace->stream_len;
|
s->length = ltrace->stream_trace->stream_len;
|
||||||
s->last_id.ms = ltrace->stream_trace->ms;
|
CopyStreamId(ltrace->stream_trace->last_id, &s->last_id);
|
||||||
s->last_id.seq = ltrace->stream_trace->seq;
|
CopyStreamId(ltrace->stream_trace->first_id, &s->first_id);
|
||||||
|
CopyStreamId(ltrace->stream_trace->max_deleted_entry_id, &s->max_deleted_entry_id);
|
||||||
|
s->entries_added = ltrace->stream_trace->entries_added;
|
||||||
|
|
||||||
|
if (rdb_type_ == RDB_TYPE_STREAM_LISTPACKS) {
|
||||||
|
/* Since the rax is already loaded, we can find the first entry's
|
||||||
|
* ID. */
|
||||||
|
streamGetEdgeID(s, 1, 1, &s->first_id);
|
||||||
|
}
|
||||||
|
|
||||||
for (const auto& cg : ltrace->stream_trace->cgroup) {
|
for (const auto& cg : ltrace->stream_trace->cgroup) {
|
||||||
string_view cgname = ToSV(cg.name);
|
string_view cgname = ToSV(cg.name);
|
||||||
|
@ -964,7 +974,12 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
|
||||||
cg_id.ms = cg.ms;
|
cg_id.ms = cg.ms;
|
||||||
cg_id.seq = cg.seq;
|
cg_id.seq = cg.seq;
|
||||||
|
|
||||||
streamCG* cgroup = streamCreateCG(s, cgname.data(), cgname.size(), &cg_id, 0);
|
uint64_t entries_read = cg.entries_read;
|
||||||
|
if (rdb_type_ == RDB_TYPE_STREAM_LISTPACKS) {
|
||||||
|
entries_read = streamEstimateDistanceFromFirstEverEntry(s, &cg_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
streamCG* cgroup = streamCreateCG(s, cgname.data(), cgname.size(), &cg_id, entries_read);
|
||||||
if (cgroup == NULL) {
|
if (cgroup == NULL) {
|
||||||
LOG(ERROR) << "Duplicated consumer group name " << cgname;
|
LOG(ERROR) << "Duplicated consumer group name " << cgname;
|
||||||
ec_ = RdbError(errc::duplicate_key);
|
ec_ = RdbError(errc::duplicate_key);
|
||||||
|
@ -1512,7 +1527,7 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
|
||||||
// RDB_TYPE_JSON == 20. On newer versions > 9 we bumped up RDB_TYPE_JSON to 30
|
// RDB_TYPE_JSON == 20. On newer versions > 9 we bumped up RDB_TYPE_JSON to 30
|
||||||
// because it overlapped with the new type RDB_TYPE_SET_LISTPACK
|
// because it overlapped with the new type RDB_TYPE_SET_LISTPACK
|
||||||
if (rdb_version_ < 10) {
|
if (rdb_version_ < 10) {
|
||||||
// consider it RDB_TYPE_JSON_OLD
|
// consider it RDB_TYPE_JSON_OLD (20)
|
||||||
iores = ReadJson();
|
iores = ReadJson();
|
||||||
} else {
|
} else {
|
||||||
iores = ReadGeneric(rdbtype);
|
iores = ReadGeneric(rdbtype);
|
||||||
|
@ -1876,21 +1891,20 @@ auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result<OpaqueObj> {
|
||||||
// so if there are still unread elements return the partial stream.
|
// so if there are still unread elements return the partial stream.
|
||||||
if (listpacks > n) {
|
if (listpacks > n) {
|
||||||
pending_read_.remaining = listpacks - n;
|
pending_read_.remaining = listpacks - n;
|
||||||
return OpaqueObj{std::move(load_trace), RDB_TYPE_STREAM_LISTPACKS};
|
return OpaqueObj{std::move(load_trace), rdbtype};
|
||||||
} else if (pending_read_.remaining > 0) {
|
|
||||||
pending_read_.remaining = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load stream metadata.
|
pending_read_.remaining = 0;
|
||||||
|
|
||||||
|
// Load stream metadata.
|
||||||
load_trace->stream_trace.reset(new StreamTrace);
|
load_trace->stream_trace.reset(new StreamTrace);
|
||||||
|
|
||||||
/* Load total number of items inside the stream. */
|
/* Load total number of items inside the stream. */
|
||||||
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->stream_len);
|
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->stream_len);
|
||||||
|
|
||||||
/* Load the last entry ID. */
|
/* Load the last entry ID. */
|
||||||
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->ms);
|
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->last_id.ms);
|
||||||
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->seq);
|
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->last_id.seq);
|
||||||
|
|
||||||
if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
|
if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
|
||||||
/* Load the first entry ID. */
|
/* Load the first entry ID. */
|
||||||
|
@ -1907,13 +1921,7 @@ auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result<OpaqueObj> {
|
||||||
/* During migration the offset can be initialized to the stream's
|
/* During migration the offset can be initialized to the stream's
|
||||||
* length. At this point, we also don't care about tombstones
|
* length. At this point, we also don't care about tombstones
|
||||||
* because CG offsets will be later initialized as well. */
|
* because CG offsets will be later initialized as well. */
|
||||||
load_trace->stream_trace->max_deleted_entry_id.ms = 0;
|
|
||||||
load_trace->stream_trace->max_deleted_entry_id.seq = 0;
|
|
||||||
load_trace->stream_trace->entries_added = load_trace->stream_trace->stream_len;
|
load_trace->stream_trace->entries_added = load_trace->stream_trace->stream_len;
|
||||||
|
|
||||||
// TODO add implementation, we need to find the first entry's ID.
|
|
||||||
// The redis code is next
|
|
||||||
// streamGetEdgeID(s,1,1,&s->first_id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Consumer groups loading */
|
/* Consumer groups loading */
|
||||||
|
@ -1937,24 +1945,11 @@ auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result<OpaqueObj> {
|
||||||
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.ms);
|
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.ms);
|
||||||
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.seq);
|
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.seq);
|
||||||
|
|
||||||
uint64_t cg_offset;
|
cgroup.entries_read = 0;
|
||||||
if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
|
if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
|
||||||
SET_OR_UNEXPECT(LoadLen(nullptr), cg_offset);
|
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.entries_read);
|
||||||
(void)cg_offset;
|
|
||||||
} else {
|
|
||||||
// TODO implement
|
|
||||||
// cg_offset = should be calculated like streamEstimateDistanceFromFirstEverEntry();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO add our implementation for the next Redis logic
|
|
||||||
// streamCG* cgroup = streamCreateCG(s, cgname, sdslen(cgname), &cg_id, cg_offset);
|
|
||||||
// if (cgroup == NULL) {
|
|
||||||
// rdbReportCorruptRDB("Duplicated consumer group name %s", cgname);
|
|
||||||
// decrRefCount(o);
|
|
||||||
// sdsfree(cgname);
|
|
||||||
// return NULL;
|
|
||||||
// }
|
|
||||||
|
|
||||||
/* Load the global PEL for this consumer group, however we'll
|
/* Load the global PEL for this consumer group, however we'll
|
||||||
* not yet populate the NACK structures with the message
|
* not yet populate the NACK structures with the message
|
||||||
* owner, since consumers for this group and their messages will
|
* owner, since consumers for this group and their messages will
|
||||||
|
@ -2724,6 +2719,11 @@ std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, LoadConfig co
|
||||||
return visitor.ec();
|
return visitor.ec();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RdbLoaderBase::CopyStreamId(const StreamID& src, struct streamID* dest) {
|
||||||
|
dest->ms = src.ms;
|
||||||
|
dest->seq = src.seq;
|
||||||
|
}
|
||||||
|
|
||||||
void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
|
void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
|
||||||
EngineShard* es = EngineShard::tlocal();
|
EngineShard* es = EngineShard::tlocal();
|
||||||
DbContext db_cntx{&namespaces->GetDefaultNamespace(), db_ind, GetCurrentTimeMs()};
|
DbContext db_cntx{&namespaces->GetDefaultNamespace(), db_ind, GetCurrentTimeMs()};
|
||||||
|
|
|
@ -16,6 +16,8 @@ extern "C" {
|
||||||
#include "server/common.h"
|
#include "server/common.h"
|
||||||
#include "server/journal/serializer.h"
|
#include "server/journal/serializer.h"
|
||||||
|
|
||||||
|
struct streamID;
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
|
||||||
class EngineShardSet;
|
class EngineShardSet;
|
||||||
|
@ -84,15 +86,15 @@ class RdbLoaderBase {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct StreamID {
|
struct StreamID {
|
||||||
uint64_t ms;
|
uint64_t ms = 0;
|
||||||
uint64_t seq;
|
uint64_t seq = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct StreamCGTrace {
|
struct StreamCGTrace {
|
||||||
RdbVariant name;
|
RdbVariant name;
|
||||||
uint64_t ms;
|
uint64_t ms;
|
||||||
uint64_t seq;
|
uint64_t seq;
|
||||||
|
uint64_t entries_read;
|
||||||
std::vector<StreamPelTrace> pel_arr;
|
std::vector<StreamPelTrace> pel_arr;
|
||||||
std::vector<StreamConsumerTrace> cons_arr;
|
std::vector<StreamConsumerTrace> cons_arr;
|
||||||
};
|
};
|
||||||
|
@ -100,10 +102,10 @@ class RdbLoaderBase {
|
||||||
struct StreamTrace {
|
struct StreamTrace {
|
||||||
size_t lp_len;
|
size_t lp_len;
|
||||||
size_t stream_len;
|
size_t stream_len;
|
||||||
uint64_t ms, seq;
|
StreamID last_id;
|
||||||
StreamID first_id; /* The first non-tombstone entry, zero if empty. */
|
StreamID first_id; /* The first non-tombstone entry, zero if empty. */
|
||||||
StreamID max_deleted_entry_id; /* The maximal ID that was deleted. */
|
StreamID max_deleted_entry_id; /* The maximal ID that was deleted. */
|
||||||
uint64_t entries_added; /* All time count of elements added. */
|
uint64_t entries_added = 0; /* All time count of elements added. */
|
||||||
std::vector<StreamCGTrace> cgroup;
|
std::vector<StreamCGTrace> cgroup;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -192,6 +194,8 @@ class RdbLoaderBase {
|
||||||
|
|
||||||
std::error_code EnsureReadInternal(size_t min_to_read);
|
std::error_code EnsureReadInternal(size_t min_to_read);
|
||||||
|
|
||||||
|
static void CopyStreamId(const StreamID& src, struct streamID* dest);
|
||||||
|
|
||||||
base::IoBuf* mem_buf_ = nullptr;
|
base::IoBuf* mem_buf_ = nullptr;
|
||||||
base::IoBuf origin_mem_buf_;
|
base::IoBuf origin_mem_buf_;
|
||||||
::io::Source* src_ = nullptr;
|
::io::Source* src_ = nullptr;
|
||||||
|
|
|
@ -51,10 +51,16 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_
|
||||||
"set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot,"
|
"set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot,"
|
||||||
"set 3 for multi entry lz4 compression on df snapshot and single entry on rdb snapshot");
|
"set 3 for multi entry lz4 compression on df snapshot and single entry on rdb snapshot");
|
||||||
ABSL_FLAG(int, compression_level, 2, "The compression level to use on zstd/lz4 compression");
|
ABSL_FLAG(int, compression_level, 2, "The compression level to use on zstd/lz4 compression");
|
||||||
|
|
||||||
|
// TODO: to retire both flags in v1.27 (Jan 2025)
|
||||||
ABSL_FLAG(bool, list_rdb_encode_v2, true,
|
ABSL_FLAG(bool, list_rdb_encode_v2, true,
|
||||||
"V2 rdb encoding of list uses listpack encoding format, compatible with redis 7. V1 rdb "
|
"V2 rdb encoding of list uses listpack encoding format, compatible with redis 7. V1 rdb "
|
||||||
"enconding of list uses ziplist encoding compatible with redis 6");
|
"enconding of list uses ziplist encoding compatible with redis 6");
|
||||||
|
|
||||||
|
ABSL_FLAG(bool, stream_rdb_encode_v2, false,
|
||||||
|
"V2 uses format, compatible with redis 7.2 and Dragonfly v1.26+, while v1 format "
|
||||||
|
"is compatible with redis 6");
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
@ -209,12 +215,12 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case OBJ_STREAM:
|
case OBJ_STREAM:
|
||||||
return RDB_TYPE_STREAM_LISTPACKS;
|
return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS
|
||||||
|
: RDB_TYPE_STREAM_LISTPACKS_2;
|
||||||
case OBJ_MODULE:
|
case OBJ_MODULE:
|
||||||
return RDB_TYPE_MODULE_2;
|
return RDB_TYPE_MODULE_2;
|
||||||
case OBJ_JSON:
|
case OBJ_JSON:
|
||||||
return RDB_TYPE_JSON; // save with RDB_TYPE_JSON, deprecate RDB_TYPE_JSON_OLD after July
|
return RDB_TYPE_JSON;
|
||||||
// 2024.
|
|
||||||
case OBJ_SBF:
|
case OBJ_SBF:
|
||||||
return RDB_TYPE_SBF;
|
return RDB_TYPE_SBF;
|
||||||
}
|
}
|
||||||
|
@ -657,6 +663,22 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) {
|
||||||
RETURN_ON_ERR(SaveLen(s->last_id.ms));
|
RETURN_ON_ERR(SaveLen(s->last_id.ms));
|
||||||
RETURN_ON_ERR(SaveLen(s->last_id.seq));
|
RETURN_ON_ERR(SaveLen(s->last_id.seq));
|
||||||
|
|
||||||
|
uint8_t rdb_type = RdbObjectType(pv);
|
||||||
|
|
||||||
|
// 'first_id', 'max_deleted_entry_id' and 'entries_added' are added
|
||||||
|
// in RDB_TYPE_STREAM_LISTPACKS_2
|
||||||
|
if (rdb_type >= RDB_TYPE_STREAM_LISTPACKS_2) {
|
||||||
|
/* Save the first entry ID. */
|
||||||
|
RETURN_ON_ERR(SaveLen(s->first_id.ms));
|
||||||
|
RETURN_ON_ERR(SaveLen(s->first_id.seq));
|
||||||
|
|
||||||
|
/* Save the maximal tombstone ID. */
|
||||||
|
RETURN_ON_ERR(SaveLen(s->max_deleted_entry_id.ms));
|
||||||
|
RETURN_ON_ERR(SaveLen(s->max_deleted_entry_id.seq));
|
||||||
|
|
||||||
|
/* Save the offset. */
|
||||||
|
RETURN_ON_ERR(SaveLen(s->entries_added));
|
||||||
|
}
|
||||||
/* The consumer groups and their clients are part of the stream
|
/* The consumer groups and their clients are part of the stream
|
||||||
* type, so serialize every consumer group. */
|
* type, so serialize every consumer group. */
|
||||||
|
|
||||||
|
@ -678,9 +700,14 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) {
|
||||||
RETURN_ON_ERR(SaveString((uint8_t*)ri.key, ri.key_len));
|
RETURN_ON_ERR(SaveString((uint8_t*)ri.key, ri.key_len));
|
||||||
|
|
||||||
/* Last ID. */
|
/* Last ID. */
|
||||||
RETURN_ON_ERR(SaveLen(s->last_id.ms));
|
RETURN_ON_ERR(SaveLen(cg->last_id.ms));
|
||||||
|
|
||||||
RETURN_ON_ERR(SaveLen(s->last_id.seq));
|
RETURN_ON_ERR(SaveLen(cg->last_id.seq));
|
||||||
|
|
||||||
|
if (rdb_type >= RDB_TYPE_STREAM_LISTPACKS_2) {
|
||||||
|
/* Save the group's logical reads counter. */
|
||||||
|
RETURN_ON_ERR(SaveLen(cg->entries_read));
|
||||||
|
}
|
||||||
|
|
||||||
/* Save the global PEL. */
|
/* Save the global PEL. */
|
||||||
RETURN_ON_ERR(SaveStreamPEL(cg->pel, true));
|
RETURN_ON_ERR(SaveStreamPEL(cg->pel, true));
|
||||||
|
@ -836,10 +863,15 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
|
||||||
/* Consumer name. */
|
/* Consumer name. */
|
||||||
RETURN_ON_ERR(SaveString(ri.key, ri.key_len));
|
RETURN_ON_ERR(SaveString(ri.key, ri.key_len));
|
||||||
|
|
||||||
/* Last seen time. */
|
/* seen time. */
|
||||||
absl::little_endian::Store64(buf, consumer->seen_time);
|
absl::little_endian::Store64(buf, consumer->seen_time);
|
||||||
RETURN_ON_ERR(WriteRaw(buf));
|
RETURN_ON_ERR(WriteRaw(buf));
|
||||||
|
|
||||||
|
// TODO: enable this when we switch to RDB_TYPE_STREAM_LISTPACKS_3
|
||||||
|
/* Active time. */
|
||||||
|
// absl::little_endian::Store64(buf, consumer->active_time);
|
||||||
|
// RETURN_ON_ERR(WriteRaw(buf));
|
||||||
|
|
||||||
/* Consumer PEL, without the ACKs (see last parameter of the function
|
/* Consumer PEL, without the ACKs (see last parameter of the function
|
||||||
* passed with value of 0), at loading time we'll lookup the ID
|
* passed with value of 0), at loading time we'll lookup the ID
|
||||||
* in the consumer group global PEL and will put a reference in the
|
* in the consumer group global PEL and will put a reference in the
|
||||||
|
|
|
@ -36,6 +36,8 @@ ABSL_DECLARE_FLAG(dfly::CompressionMode, compression_mode);
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
|
||||||
|
static const auto kMatchNil = ArgType(RespExpr::NIL);
|
||||||
|
|
||||||
class RdbTest : public BaseFamilyTest {
|
class RdbTest : public BaseFamilyTest {
|
||||||
protected:
|
protected:
|
||||||
void SetUp();
|
void SetUp();
|
||||||
|
@ -136,13 +138,20 @@ TEST_F(RdbTest, Stream) {
|
||||||
|
|
||||||
auto resp = Run({"type", "key:10"});
|
auto resp = Run({"type", "key:10"});
|
||||||
EXPECT_EQ(resp, "stream");
|
EXPECT_EQ(resp, "stream");
|
||||||
|
|
||||||
resp = Run({"xinfo", "groups", "key:0"});
|
resp = Run({"xinfo", "groups", "key:0"});
|
||||||
EXPECT_THAT(resp, ArrLen(2));
|
EXPECT_THAT(resp, ArrLen(2));
|
||||||
|
EXPECT_THAT(resp.GetVec()[0],
|
||||||
|
RespElementsAre("name", "g1", "consumers", 0, "pending", 0, "last-delivered-id",
|
||||||
|
"1655444851524-3", "entries-read", 128, "lag", 0));
|
||||||
|
EXPECT_THAT(resp.GetVec()[1],
|
||||||
|
RespElementsAre("name", "g2", "consumers", 1, "pending", 0, "last-delivered-id",
|
||||||
|
"1655444851523-1", "entries-read", kMatchNil, "lag", kMatchNil));
|
||||||
|
|
||||||
resp = Run({"xinfo", "groups", "key:1"}); // test dereferences array of size 1
|
resp = Run({"xinfo", "groups", "key:1"}); // test dereferences array of size 1
|
||||||
EXPECT_THAT(resp, RespArray(ElementsAre("name", "g2", "consumers", IntArg(0), "pending",
|
EXPECT_THAT(resp, RespElementsAre("name", "g2", "consumers", IntArg(0), "pending", IntArg(0),
|
||||||
IntArg(0), "last-delivered-id", "1655444851523-1",
|
"last-delivered-id", "1655444851523-1", "entries-read",
|
||||||
"entries-read", IntArg(0), "lag", IntArg(0))));
|
kMatchNil, "lag", kMatchNil));
|
||||||
|
|
||||||
resp = Run({"xinfo", "groups", "key:2"});
|
resp = Run({"xinfo", "groups", "key:2"});
|
||||||
EXPECT_THAT(resp, ArrLen(0));
|
EXPECT_THAT(resp, ArrLen(0));
|
||||||
|
@ -629,14 +638,30 @@ TEST_F(RdbTest, LoadHugeList) {
|
||||||
// Tests loading a huge stream, where the stream is loaded in multiple partial
|
// Tests loading a huge stream, where the stream is loaded in multiple partial
|
||||||
// reads.
|
// reads.
|
||||||
TEST_F(RdbTest, LoadHugeStream) {
|
TEST_F(RdbTest, LoadHugeStream) {
|
||||||
|
TEST_current_time_ms = 1000;
|
||||||
|
|
||||||
// Add a huge stream (test:0) with 2000 entries, and 4 1k elements per entry
|
// Add a huge stream (test:0) with 2000 entries, and 4 1k elements per entry
|
||||||
// (note must be more than 512*4kb elements to test partial reads).
|
// (note must be more than 512*4kb elements to test partial reads).
|
||||||
for (int i = 0; i != 2000; i++) {
|
// We add 2000 entries to the stream to ensure that the stream, because populate strream
|
||||||
|
// adds only a single entry at a time, with multiple elements in it.
|
||||||
|
for (unsigned i = 0; i < 2000; i++) {
|
||||||
Run({"debug", "populate", "1", "test", "2000", "rand", "type", "stream", "elements", "4"});
|
Run({"debug", "populate", "1", "test", "2000", "rand", "type", "stream", "elements", "4"});
|
||||||
}
|
}
|
||||||
ASSERT_EQ(2000, CheckedInt({"xlen", "test:0"}));
|
ASSERT_EQ(2000, CheckedInt({"xlen", "test:0"}));
|
||||||
|
Run({"XGROUP", "CREATE", "test:0", "grp1", "0"});
|
||||||
|
Run({"XGROUP", "CREATE", "test:0", "grp2", "0"});
|
||||||
|
Run({"XREADGROUP", "GROUP", "grp1", "Alice", "COUNT", "1", "STREAMS", "test:0", ">"});
|
||||||
|
Run({"XREADGROUP", "GROUP", "grp2", "Alice", "COUNT", "1", "STREAMS", "test:0", ">"});
|
||||||
|
|
||||||
RespExpr resp = Run({"save", "df"});
|
auto resp = Run({"xinfo", "stream", "test:0"});
|
||||||
|
|
||||||
|
EXPECT_THAT(
|
||||||
|
resp, RespElementsAre("length", 2000, "radix-tree-keys", 2000, "radix-tree-nodes", 2010,
|
||||||
|
"last-generated-id", "1000-1999", "max-deleted-entry-id", "0-0",
|
||||||
|
"entries-added", 2000, "recorded-first-entry-id", "1000-0", "groups", 2,
|
||||||
|
"first-entry", ArrLen(2), "last-entry", ArrLen(2)));
|
||||||
|
|
||||||
|
resp = Run({"save", "df"});
|
||||||
ASSERT_EQ(resp, "OK");
|
ASSERT_EQ(resp, "OK");
|
||||||
|
|
||||||
auto save_info = service_->server_family().GetLastSaveInfo();
|
auto save_info = service_->server_family().GetLastSaveInfo();
|
||||||
|
@ -644,18 +669,29 @@ TEST_F(RdbTest, LoadHugeStream) {
|
||||||
ASSERT_EQ(resp, "OK");
|
ASSERT_EQ(resp, "OK");
|
||||||
|
|
||||||
ASSERT_EQ(2000, CheckedInt({"xlen", "test:0"}));
|
ASSERT_EQ(2000, CheckedInt({"xlen", "test:0"}));
|
||||||
|
resp = Run({"xinfo", "stream", "test:0"});
|
||||||
|
EXPECT_THAT(
|
||||||
|
resp, RespElementsAre("length", 2000, "radix-tree-keys", 2000, "radix-tree-nodes", 2010,
|
||||||
|
"last-generated-id", "1000-1999", "max-deleted-entry-id", "0-0",
|
||||||
|
"entries-added", 2000, "recorded-first-entry-id", "1000-0", "groups", 2,
|
||||||
|
"first-entry", ArrLen(2), "last-entry", ArrLen(2)));
|
||||||
|
resp = Run({"xinfo", "groups", "test:0"});
|
||||||
|
EXPECT_THAT(resp, RespElementsAre(RespElementsAre("name", "grp1", "consumers", 1, "pending", 1,
|
||||||
|
"last-delivered-id", "1000-0", "entries-read",
|
||||||
|
1, "lag", 1999),
|
||||||
|
_));
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(RdbTest, LoadStream2) {
|
TEST_F(RdbTest, LoadStream2) {
|
||||||
auto ec = LoadRdb("RDB_TYPE_STREAM_LISTPACKS_2.rdb");
|
auto ec = LoadRdb("RDB_TYPE_STREAM_LISTPACKS_2.rdb");
|
||||||
ASSERT_FALSE(ec) << ec.message();
|
ASSERT_FALSE(ec) << ec.message();
|
||||||
auto res = Run({"XINFO", "STREAM", "mystream"});
|
auto res = Run({"XINFO", "STREAM", "mystream"});
|
||||||
ASSERT_THAT(
|
ASSERT_THAT(res.GetVec(),
|
||||||
res.GetVec(),
|
ElementsAre("length", 2, "radix-tree-keys", 1, "radix-tree-nodes", 2,
|
||||||
ElementsAre("length", IntArg(2), "radix-tree-keys", IntArg(1), "radix-tree-nodes", IntArg(2),
|
"last-generated-id", "1732613360686-0", "max-deleted-entry-id", "0-0",
|
||||||
"last-generated-id", "1732613360686-0", "max-deleted-entry-id", "0-0",
|
"entries-added", 2, "recorded-first-entry-id", "1732613352350-0",
|
||||||
"entries-added", IntArg(0), "recorded-first-entry-id", "0-0", "groups", IntArg(1),
|
"groups", 1, "first-entry", RespElementsAre("1732613352350-0", _),
|
||||||
"first-entry", ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY)));
|
"last-entry", RespElementsAre("1732613360686-0", _)));
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(RdbTest, LoadStream3) {
|
TEST_F(RdbTest, LoadStream3) {
|
||||||
|
@ -664,10 +700,10 @@ TEST_F(RdbTest, LoadStream3) {
|
||||||
auto res = Run({"XINFO", "STREAM", "mystream"});
|
auto res = Run({"XINFO", "STREAM", "mystream"});
|
||||||
ASSERT_THAT(
|
ASSERT_THAT(
|
||||||
res.GetVec(),
|
res.GetVec(),
|
||||||
ElementsAre("length", IntArg(2), "radix-tree-keys", IntArg(1), "radix-tree-nodes", IntArg(2),
|
ElementsAre("length", 2, "radix-tree-keys", 1, "radix-tree-nodes", 2, "last-generated-id",
|
||||||
"last-generated-id", "1732614679549-0", "max-deleted-entry-id", "0-0",
|
"1732614679549-0", "max-deleted-entry-id", "0-0", "entries-added", 2,
|
||||||
"entries-added", IntArg(0), "recorded-first-entry-id", "0-0", "groups", IntArg(1),
|
"recorded-first-entry-id", "1732614676541-0", "groups", 1, "first-entry",
|
||||||
"first-entry", ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY)));
|
ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY)));
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(RdbTest, SnapshotTooBig) {
|
TEST_F(RdbTest, SnapshotTooBig) {
|
||||||
|
|
Loading…
Reference in a new issue