mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
feat(server): use listpack node encoding for list (#3914)
Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
c868b27bbe
commit
a1830e1b5e
7 changed files with 75 additions and 24 deletions
|
@ -24,6 +24,8 @@ using namespace util;
|
|||
using namespace boost;
|
||||
using absl::StrCat;
|
||||
|
||||
ABSL_DECLARE_FLAG(bool, list_rdb_encode_v2);
|
||||
|
||||
namespace dfly {
|
||||
|
||||
class GenericFamilyTest : public BaseFamilyTest {};
|
||||
|
@ -564,6 +566,7 @@ TEST_F(GenericFamilyTest, Persist) {
|
|||
|
||||
TEST_F(GenericFamilyTest, Dump) {
|
||||
ASSERT_THAT(RDB_SER_VERSION, 9);
|
||||
absl::SetFlag(&FLAGS_list_rdb_encode_v2, false);
|
||||
uint8_t EXPECTED_STRING_DUMP[13] = {0x00, 0xc0, 0x13, 0x09, 0x00, 0x23, 0x13,
|
||||
0x6f, 0x4d, 0x68, 0xf6, 0x35, 0x6e};
|
||||
uint8_t EXPECTED_HASH_DUMP[] = {0x0d, 0x12, 0x12, 0x00, 0x00, 0x00, 0x0d, 0x00, 0x00, 0x00,
|
||||
|
|
|
@ -379,6 +379,15 @@ TEST_F(ListFamilyTest, LRem) {
|
|||
ASSERT_THAT(Run({"lrem", kKey2, "1", val}), IntArg(1));
|
||||
}
|
||||
|
||||
TEST_F(ListFamilyTest, DumpRestorePlain) {
|
||||
const string kValue(10'000, '#');
|
||||
EXPECT_EQ(CheckedInt({"LPUSH", kKey1, kValue}), 1);
|
||||
auto buffer = Run({"DUMP", kKey1}).GetBuf();
|
||||
EXPECT_EQ(Run({"RESTORE", kKey2, "0", ToSV(buffer)}), "OK");
|
||||
EXPECT_EQ(CheckedInt({"LLEN", kKey2}), 1);
|
||||
EXPECT_EQ(Run({"LRANGE", kKey2, "0", "1"}), kValue);
|
||||
}
|
||||
|
||||
TEST_F(ListFamilyTest, LTrim) {
|
||||
Run({"rpush", kKey1, "a", "b", "c", "d"});
|
||||
ASSERT_EQ(Run({"ltrim", kKey1, "-2", "-1"}), "OK");
|
||||
|
|
|
@ -733,13 +733,14 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
|
|||
if (ec_)
|
||||
return false;
|
||||
|
||||
uint8_t* lp = nullptr;
|
||||
if (container == QUICKLIST_NODE_CONTAINER_PLAIN) {
|
||||
quicklistAppendPlainNode(ql, (uint8_t*)sv.data(), sv.size());
|
||||
lp = (uint8_t*)zmalloc(sv.size());
|
||||
::memcpy(lp, (uint8_t*)sv.data(), sv.size());
|
||||
quicklistAppendPlainNode(ql, lp, sv.size());
|
||||
return true;
|
||||
}
|
||||
|
||||
uint8_t* lp = nullptr;
|
||||
|
||||
if (rdb_type_ == RDB_TYPE_LIST_QUICKLIST_2) {
|
||||
uint8_t* src = (uint8_t*)sv.data();
|
||||
if (!lpValidateIntegrity(src, sv.size(), 0, nullptr, nullptr)) {
|
||||
|
|
|
@ -49,6 +49,9 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_
|
|||
"set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot,"
|
||||
"set 3 for multi entry lz4 compression on df snapshot and single entry on rdb snapshot");
|
||||
ABSL_FLAG(int, compression_level, 2, "The compression level to use on zstd/lz4 compression");
|
||||
ABSL_FLAG(bool, list_rdb_encode_v2, true,
|
||||
"V2 rdb encoding of list uses listpack encoding format, compatible with redis 7. V1 rdb "
|
||||
"enconding of list uses ziplist encoding compatible with redis 6");
|
||||
|
||||
namespace dfly {
|
||||
|
||||
|
@ -164,8 +167,12 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
|
|||
case OBJ_STRING:
|
||||
return RDB_TYPE_STRING;
|
||||
case OBJ_LIST:
|
||||
if (compact_enc == OBJ_ENCODING_QUICKLIST)
|
||||
if (compact_enc == OBJ_ENCODING_QUICKLIST) {
|
||||
if (absl::GetFlag(FLAGS_list_rdb_encode_v2))
|
||||
return RDB_TYPE_LIST_QUICKLIST_2;
|
||||
return RDB_TYPE_LIST_QUICKLIST;
|
||||
}
|
||||
|
||||
break;
|
||||
case OBJ_SET:
|
||||
if (compact_enc == kEncodingIntSet)
|
||||
|
@ -439,7 +446,9 @@ error_code RdbSerializer::SaveListObject(const PrimeValue& pv) {
|
|||
DVLOG(3) << "QL node (encoding/container/sz): " << node->encoding << "/" << node->container
|
||||
<< "/" << node->sz;
|
||||
|
||||
if (QL_NODE_IS_PLAIN(node)) {
|
||||
if (absl::GetFlag(FLAGS_list_rdb_encode_v2)) {
|
||||
// Use listpack encoding
|
||||
SaveLen(node->container);
|
||||
if (quicklistNodeIsCompressed(node)) {
|
||||
void* data;
|
||||
size_t compress_len = quicklistGetLzf(node, &data);
|
||||
|
@ -453,7 +462,11 @@ error_code RdbSerializer::SaveListObject(const PrimeValue& pv) {
|
|||
FlushIfNeeded(flush_state);
|
||||
}
|
||||
} else {
|
||||
// listpack
|
||||
// Use ziplist encoding
|
||||
if (QL_NODE_IS_PLAIN(node)) {
|
||||
RETURN_ON_ERR(SavePlainNodeAsZiplist(node));
|
||||
} else {
|
||||
// listpack node
|
||||
uint8_t* lp = node->entry;
|
||||
uint8_t* decompressed = NULL;
|
||||
|
||||
|
@ -476,6 +489,7 @@ error_code RdbSerializer::SaveListObject(const PrimeValue& pv) {
|
|||
});
|
||||
RETURN_ON_ERR(SaveListPackAsZiplist(lp));
|
||||
}
|
||||
}
|
||||
node = node->next;
|
||||
}
|
||||
return error_code{};
|
||||
|
@ -744,6 +758,17 @@ error_code RdbSerializer::SaveListPackAsZiplist(uint8_t* lp) {
|
|||
return ec;
|
||||
}
|
||||
|
||||
error_code RdbSerializer::SavePlainNodeAsZiplist(quicklistNode* node) {
|
||||
uint8_t* zl = ziplistNew();
|
||||
zl = ziplistPush(zl, node->entry, node->sz, ZIPLIST_TAIL);
|
||||
|
||||
size_t ziplen = ziplistBlobLen(zl);
|
||||
error_code ec = SaveString(string_view{reinterpret_cast<char*>(zl), ziplen});
|
||||
zfree(zl);
|
||||
|
||||
return ec;
|
||||
}
|
||||
|
||||
error_code RdbSerializer::SaveStreamPEL(rax* pel, bool nacks) {
|
||||
/* Number of entries in the PEL. */
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
extern "C" {
|
||||
#include "redis/lzfP.h"
|
||||
#include "redis/quicklist.h"
|
||||
}
|
||||
|
||||
#include <optional>
|
||||
|
@ -21,6 +22,7 @@ extern "C" {
|
|||
|
||||
typedef struct rax rax;
|
||||
typedef struct streamCG streamCG;
|
||||
typedef struct quicklistNode quicklistNode;
|
||||
|
||||
namespace dfly {
|
||||
|
||||
|
@ -247,6 +249,7 @@ class RdbSerializer : public SerializerBase {
|
|||
std::error_code SaveListPackAsZiplist(uint8_t* lp);
|
||||
std::error_code SaveStreamPEL(rax* pel, bool nacks);
|
||||
std::error_code SaveStreamConsumers(streamCG* cg);
|
||||
std::error_code SavePlainNodeAsZiplist(quicklistNode* node);
|
||||
|
||||
// Might preempt
|
||||
void FlushIfNeeded(FlushState flush_state);
|
||||
|
|
|
@ -860,6 +860,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
|
|||
config_registry.RegisterMutable("tls_ca_cert_dir");
|
||||
config_registry.RegisterMutable("replica_priority");
|
||||
config_registry.RegisterMutable("lua_undeclared_keys_shas");
|
||||
config_registry.RegisterMutable("list_rdb_encode_v2");
|
||||
|
||||
pb_task_ = shard_set->pool()->GetNextProactor();
|
||||
if (pb_task_->GetKind() == ProactorBase::EPOLL) {
|
||||
|
|
|
@ -150,7 +150,14 @@ async def test_dbfilenames(
|
|||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@dfly_args({**BASIC_ARGS, "proactor_threads": 4, "dbfilename": "test-redis-load-rdb"})
|
||||
@dfly_args(
|
||||
{
|
||||
**BASIC_ARGS,
|
||||
"proactor_threads": 4,
|
||||
"dbfilename": "test-redis-load-rdb",
|
||||
"list_rdb_encode_v2": "false", # Needed for compatibility with Redis 6
|
||||
}
|
||||
)
|
||||
async def test_redis_load_snapshot(
|
||||
async_client: aioredis.Redis, df_server, redis_local_server: RedisServer, tmp_dir: Path
|
||||
):
|
||||
|
@ -161,6 +168,8 @@ async def test_redis_load_snapshot(
|
|||
**LIGHTWEIGHT_SEEDER_ARGS, types=["STRING", "LIST", "SET", "HASH", "ZSET"]
|
||||
).run(async_client)
|
||||
|
||||
await async_client.lpush("list", "A" * 10_000)
|
||||
|
||||
await async_client.execute_command("SAVE", "rdb")
|
||||
dbsize = await async_client.dbsize()
|
||||
|
||||
|
|
Loading…
Reference in a new issue