From a1830e1b5e69472ebdc3ab789d2ace86c6d3d6a6 Mon Sep 17 00:00:00 2001 From: adiholden Date: Tue, 15 Oct 2024 13:55:26 +0300 Subject: [PATCH] feat(server): use listpack node encoding for list (#3914) Signed-off-by: adi_holden --- src/server/generic_family_test.cc | 3 ++ src/server/list_family_test.cc | 9 +++++ src/server/rdb_load.cc | 7 ++-- src/server/rdb_save.cc | 65 +++++++++++++++++++++---------- src/server/rdb_save.h | 3 ++ src/server/server_family.cc | 1 + tests/dragonfly/snapshot_test.py | 11 +++++- 7 files changed, 75 insertions(+), 24 deletions(-) diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 7e877dc53..487a16061 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -24,6 +24,8 @@ using namespace util; using namespace boost; using absl::StrCat; +ABSL_DECLARE_FLAG(bool, list_rdb_encode_v2); + namespace dfly { class GenericFamilyTest : public BaseFamilyTest {}; @@ -564,6 +566,7 @@ TEST_F(GenericFamilyTest, Persist) { TEST_F(GenericFamilyTest, Dump) { ASSERT_THAT(RDB_SER_VERSION, 9); + absl::SetFlag(&FLAGS_list_rdb_encode_v2, false); uint8_t EXPECTED_STRING_DUMP[13] = {0x00, 0xc0, 0x13, 0x09, 0x00, 0x23, 0x13, 0x6f, 0x4d, 0x68, 0xf6, 0x35, 0x6e}; uint8_t EXPECTED_HASH_DUMP[] = {0x0d, 0x12, 0x12, 0x00, 0x00, 0x00, 0x0d, 0x00, 0x00, 0x00, diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 6abecf6a5..b9ef3ab71 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -379,6 +379,15 @@ TEST_F(ListFamilyTest, LRem) { ASSERT_THAT(Run({"lrem", kKey2, "1", val}), IntArg(1)); } +TEST_F(ListFamilyTest, DumpRestorePlain) { + const string kValue(10'000, '#'); + EXPECT_EQ(CheckedInt({"LPUSH", kKey1, kValue}), 1); + auto buffer = Run({"DUMP", kKey1}).GetBuf(); + EXPECT_EQ(Run({"RESTORE", kKey2, "0", ToSV(buffer)}), "OK"); + EXPECT_EQ(CheckedInt({"LLEN", kKey2}), 1); + EXPECT_EQ(Run({"LRANGE", kKey2, "0", "1"}), kValue); +} + TEST_F(ListFamilyTest, LTrim) { Run({"rpush", kKey1, "a", "b", "c", "d"}); ASSERT_EQ(Run({"ltrim", kKey1, "-2", "-1"}), "OK"); diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 9d0558ad7..5f4b1862b 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -733,13 +733,14 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) { if (ec_) return false; + uint8_t* lp = nullptr; if (container == QUICKLIST_NODE_CONTAINER_PLAIN) { - quicklistAppendPlainNode(ql, (uint8_t*)sv.data(), sv.size()); + lp = (uint8_t*)zmalloc(sv.size()); + ::memcpy(lp, (uint8_t*)sv.data(), sv.size()); + quicklistAppendPlainNode(ql, lp, sv.size()); return true; } - uint8_t* lp = nullptr; - if (rdb_type_ == RDB_TYPE_LIST_QUICKLIST_2) { uint8_t* src = (uint8_t*)sv.data(); if (!lpValidateIntegrity(src, sv.size(), 0, nullptr, nullptr)) { diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 5885cd4e8..8718020b5 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -49,6 +49,9 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_ "set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot," "set 3 for multi entry lz4 compression on df snapshot and single entry on rdb snapshot"); ABSL_FLAG(int, compression_level, 2, "The compression level to use on zstd/lz4 compression"); +ABSL_FLAG(bool, list_rdb_encode_v2, true, + "V2 rdb encoding of list uses listpack encoding format, compatible with redis 7. V1 rdb " + "enconding of list uses ziplist encoding compatible with redis 6"); namespace dfly { @@ -164,8 +167,12 @@ uint8_t RdbObjectType(const PrimeValue& pv) { case OBJ_STRING: return RDB_TYPE_STRING; case OBJ_LIST: - if (compact_enc == OBJ_ENCODING_QUICKLIST) + if (compact_enc == OBJ_ENCODING_QUICKLIST) { + if (absl::GetFlag(FLAGS_list_rdb_encode_v2)) + return RDB_TYPE_LIST_QUICKLIST_2; return RDB_TYPE_LIST_QUICKLIST; + } + break; case OBJ_SET: if (compact_enc == kEncodingIntSet) @@ -439,7 +446,9 @@ error_code RdbSerializer::SaveListObject(const PrimeValue& pv) { DVLOG(3) << "QL node (encoding/container/sz): " << node->encoding << "/" << node->container << "/" << node->sz; - if (QL_NODE_IS_PLAIN(node)) { + if (absl::GetFlag(FLAGS_list_rdb_encode_v2)) { + // Use listpack encoding + SaveLen(node->container); if (quicklistNodeIsCompressed(node)) { void* data; size_t compress_len = quicklistGetLzf(node, &data); @@ -453,28 +462,33 @@ error_code RdbSerializer::SaveListObject(const PrimeValue& pv) { FlushIfNeeded(flush_state); } } else { - // listpack - uint8_t* lp = node->entry; - uint8_t* decompressed = NULL; + // Use ziplist encoding + if (QL_NODE_IS_PLAIN(node)) { + RETURN_ON_ERR(SavePlainNodeAsZiplist(node)); + } else { + // listpack node + uint8_t* lp = node->entry; + uint8_t* decompressed = NULL; - if (quicklistNodeIsCompressed(node)) { - void* data; - size_t compress_len = quicklistGetLzf(node, &data); - decompressed = (uint8_t*)zmalloc(node->sz); + if (quicklistNodeIsCompressed(node)) { + void* data; + size_t compress_len = quicklistGetLzf(node, &data); + decompressed = (uint8_t*)zmalloc(node->sz); - if (lzf_decompress(data, compress_len, decompressed, node->sz) == 0) { - /* Someone requested decompress, but we can't decompress. Not good. */ - zfree(decompressed); - return make_error_code(errc::illegal_byte_sequence); + if (lzf_decompress(data, compress_len, decompressed, node->sz) == 0) { + /* Someone requested decompress, but we can't decompress. Not good. */ + zfree(decompressed); + return make_error_code(errc::illegal_byte_sequence); + } + lp = decompressed; } - lp = decompressed; - } - auto cleanup = absl::MakeCleanup([=] { - if (decompressed) - zfree(decompressed); - }); - RETURN_ON_ERR(SaveListPackAsZiplist(lp)); + auto cleanup = absl::MakeCleanup([=] { + if (decompressed) + zfree(decompressed); + }); + RETURN_ON_ERR(SaveListPackAsZiplist(lp)); + } } node = node->next; } @@ -744,6 +758,17 @@ error_code RdbSerializer::SaveListPackAsZiplist(uint8_t* lp) { return ec; } +error_code RdbSerializer::SavePlainNodeAsZiplist(quicklistNode* node) { + uint8_t* zl = ziplistNew(); + zl = ziplistPush(zl, node->entry, node->sz, ZIPLIST_TAIL); + + size_t ziplen = ziplistBlobLen(zl); + error_code ec = SaveString(string_view{reinterpret_cast(zl), ziplen}); + zfree(zl); + + return ec; +} + error_code RdbSerializer::SaveStreamPEL(rax* pel, bool nacks) { /* Number of entries in the PEL. */ diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 4a5456b9b..90ca435c7 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -7,6 +7,7 @@ extern "C" { #include "redis/lzfP.h" +#include "redis/quicklist.h" } #include @@ -21,6 +22,7 @@ extern "C" { typedef struct rax rax; typedef struct streamCG streamCG; +typedef struct quicklistNode quicklistNode; namespace dfly { @@ -247,6 +249,7 @@ class RdbSerializer : public SerializerBase { std::error_code SaveListPackAsZiplist(uint8_t* lp); std::error_code SaveStreamPEL(rax* pel, bool nacks); std::error_code SaveStreamConsumers(streamCG* cg); + std::error_code SavePlainNodeAsZiplist(quicklistNode* node); // Might preempt void FlushIfNeeded(FlushState flush_state); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 4ab33c331..4c9e5a1b2 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -860,6 +860,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vectorpool()->GetNextProactor(); if (pb_task_->GetKind() == ProactorBase::EPOLL) { diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py index e729299ab..e7062f48d 100644 --- a/tests/dragonfly/snapshot_test.py +++ b/tests/dragonfly/snapshot_test.py @@ -150,7 +150,14 @@ async def test_dbfilenames( @pytest.mark.asyncio -@dfly_args({**BASIC_ARGS, "proactor_threads": 4, "dbfilename": "test-redis-load-rdb"}) +@dfly_args( + { + **BASIC_ARGS, + "proactor_threads": 4, + "dbfilename": "test-redis-load-rdb", + "list_rdb_encode_v2": "false", # Needed for compatibility with Redis 6 + } +) async def test_redis_load_snapshot( async_client: aioredis.Redis, df_server, redis_local_server: RedisServer, tmp_dir: Path ): @@ -161,6 +168,8 @@ async def test_redis_load_snapshot( **LIGHTWEIGHT_SEEDER_ARGS, types=["STRING", "LIST", "SET", "HASH", "ZSET"] ).run(async_client) + await async_client.lpush("list", "A" * 10_000) + await async_client.execute_command("SAVE", "rdb") dbsize = await async_client.dbsize()