mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
refactor(rdb): Expose default compression mode without direct flag (#2360)
* refactor(rdb): Expose default compression mode without direct flag * fixes
This commit is contained in:
parent
5b905452b3
commit
d9886024d3
4 changed files with 64 additions and 28 deletions
|
@ -31,7 +31,6 @@ extern "C" {
|
|||
|
||||
ABSL_FLAG(uint32_t, dbnum, 16, "Number of databases");
|
||||
ABSL_FLAG(uint32_t, keys_output_limit, 8192, "Maximum number of keys output by keys command");
|
||||
ABSL_DECLARE_FLAG(int, compression_mode);
|
||||
|
||||
namespace dfly {
|
||||
using namespace std;
|
||||
|
@ -450,10 +449,11 @@ OpResult<std::string> OpDump(const OpArgs& op_args, string_view key) {
|
|||
if (IsValid(it)) {
|
||||
DVLOG(1) << "Dump: key '" << key << "' successfully found, going to dump it";
|
||||
io::StringSink sink;
|
||||
int compression_mode = absl::GetFlag(FLAGS_compression_mode);
|
||||
CompressionMode serializer_compression_mode =
|
||||
compression_mode == 0 ? CompressionMode::NONE : CompressionMode::SINGLE_ENTRY;
|
||||
RdbSerializer serializer(serializer_compression_mode);
|
||||
CompressionMode compression_mode = GetDefaultCompressionMode();
|
||||
if (compression_mode != CompressionMode::NONE) {
|
||||
compression_mode = CompressionMode::SINGLE_ENTRY;
|
||||
}
|
||||
RdbSerializer serializer(compression_mode);
|
||||
|
||||
// According to Redis code we need to
|
||||
// 1. Save the value itself - without the key
|
||||
|
|
|
@ -39,7 +39,7 @@ extern "C" {
|
|||
#include "server/snapshot.h"
|
||||
#include "util/fibers/simple_channel.h"
|
||||
|
||||
ABSL_FLAG(int, compression_mode, 3,
|
||||
ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_ENTRY_LZ4,
|
||||
"set 0 for no compression,"
|
||||
"set 1 for single entry lzf compression,"
|
||||
"set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot,"
|
||||
|
@ -109,6 +109,47 @@ constexpr size_t kAmask = 4_KB - 1;
|
|||
|
||||
} // namespace
|
||||
|
||||
bool AbslParseFlag(std::string_view in, dfly::CompressionMode* flag, std::string* err) {
|
||||
if (in == "0" || in == "NONE") {
|
||||
*flag = dfly::CompressionMode::NONE;
|
||||
return true;
|
||||
}
|
||||
if (in == "1" || in == "SINGLE_ENTRY") {
|
||||
*flag = dfly::CompressionMode::SINGLE_ENTRY;
|
||||
return true;
|
||||
}
|
||||
if (in == "2" || in == "MULTI_ENTRY_ZSTD") {
|
||||
*flag = dfly::CompressionMode::MULTI_ENTRY_ZSTD;
|
||||
return true;
|
||||
}
|
||||
if (in == "3" || in == "MULTI_ENTRY_LZ4") {
|
||||
*flag = dfly::CompressionMode::MULTI_ENTRY_LZ4;
|
||||
return true;
|
||||
}
|
||||
|
||||
*err = absl::StrCat("Unknown value ", in, " for compression_mode flag");
|
||||
return false;
|
||||
}
|
||||
|
||||
std::string AbslUnparseFlag(dfly::CompressionMode flag) {
|
||||
switch (flag) {
|
||||
case dfly::CompressionMode::NONE:
|
||||
return "NONE";
|
||||
case dfly::CompressionMode::SINGLE_ENTRY:
|
||||
return "SINGLE_ENTRY";
|
||||
case dfly::CompressionMode::MULTI_ENTRY_ZSTD:
|
||||
return "MULTI_ENTRY_ZSTD";
|
||||
case dfly::CompressionMode::MULTI_ENTRY_LZ4:
|
||||
return "MULTI_ENTRY_LZ4";
|
||||
}
|
||||
DCHECK(false) << "Unknown compression_mode flag value " << int(flag);
|
||||
return "NONE";
|
||||
}
|
||||
|
||||
dfly::CompressionMode GetDefaultCompressionMode() {
|
||||
return absl::GetFlag(FLAGS_compression_mode);
|
||||
}
|
||||
|
||||
uint8_t RdbObjectType(const PrimeValue& pv) {
|
||||
unsigned type = pv.ObjType();
|
||||
unsigned compact_enc = pv.Encoding();
|
||||
|
@ -761,8 +802,8 @@ io::Bytes RdbSerializer::PrepareFlush() {
|
|||
if (sz == 0)
|
||||
return mem_buf_.InputBuffer();
|
||||
|
||||
if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD ||
|
||||
compression_mode_ == CompressionMode::MULTY_ENTRY_LZ4) {
|
||||
if (compression_mode_ == CompressionMode::MULTI_ENTRY_ZSTD ||
|
||||
compression_mode_ == CompressionMode::MULTI_ENTRY_LZ4) {
|
||||
CompressBlob();
|
||||
}
|
||||
|
||||
|
@ -1208,12 +1249,12 @@ unique_ptr<SliceSnapshot>& RdbSaver::Impl::GetSnapshot(EngineShard* shard) {
|
|||
|
||||
RdbSaver::RdbSaver(::io::Sink* sink, SaveMode save_mode, bool align_writes) {
|
||||
CHECK_NOTNULL(sink);
|
||||
int compression_mode = absl::GetFlag(FLAGS_compression_mode);
|
||||
CompressionMode compression_mode = GetDefaultCompressionMode();
|
||||
int producer_count = 0;
|
||||
switch (save_mode) {
|
||||
case SaveMode::SUMMARY:
|
||||
producer_count = 0;
|
||||
if (compression_mode >= 1) {
|
||||
if (compression_mode >= CompressionMode::SINGLE_ENTRY) {
|
||||
compression_mode_ = CompressionMode::SINGLE_ENTRY;
|
||||
} else {
|
||||
compression_mode_ = CompressionMode::NONE;
|
||||
|
@ -1222,19 +1263,11 @@ RdbSaver::RdbSaver(::io::Sink* sink, SaveMode save_mode, bool align_writes) {
|
|||
case SaveMode::SINGLE_SHARD:
|
||||
case SaveMode::SINGLE_SHARD_WITH_SUMMARY:
|
||||
producer_count = 1;
|
||||
if (compression_mode == 3) {
|
||||
compression_mode_ = CompressionMode::MULTY_ENTRY_LZ4;
|
||||
} else if (compression_mode == 2) {
|
||||
compression_mode_ = CompressionMode::MULTY_ENTRY_ZSTD;
|
||||
} else if (compression_mode == 1) {
|
||||
compression_mode_ = CompressionMode::SINGLE_ENTRY;
|
||||
} else {
|
||||
compression_mode_ = CompressionMode::NONE;
|
||||
}
|
||||
compression_mode_ = compression_mode;
|
||||
break;
|
||||
case SaveMode::RDB:
|
||||
producer_count = shard_set->size();
|
||||
if (compression_mode >= 1) {
|
||||
if (compression_mode >= CompressionMode::SINGLE_ENTRY) {
|
||||
compression_mode_ = CompressionMode::SINGLE_ENTRY;
|
||||
} else {
|
||||
compression_mode_ = CompressionMode::NONE;
|
||||
|
@ -1374,9 +1407,9 @@ void RdbSerializer::AllocateCompressorOnce() {
|
|||
if (compressor_impl_) {
|
||||
return;
|
||||
}
|
||||
if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD) {
|
||||
if (compression_mode_ == CompressionMode::MULTI_ENTRY_ZSTD) {
|
||||
compressor_impl_.reset(new ZstdCompressor());
|
||||
} else if (compression_mode_ == CompressionMode::MULTY_ENTRY_LZ4) {
|
||||
} else if (compression_mode_ == CompressionMode::MULTI_ENTRY_LZ4) {
|
||||
compressor_impl_.reset(new Lz4Compressor());
|
||||
} else {
|
||||
CHECK(false) << "Compressor allocation should not be done";
|
||||
|
@ -1413,7 +1446,7 @@ void RdbSerializer::CompressBlob() {
|
|||
|
||||
// First write opcode for compressed string
|
||||
auto dest = mem_buf_.AppendBuffer();
|
||||
uint8_t opcode = compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD
|
||||
uint8_t opcode = compression_mode_ == CompressionMode::MULTI_ENTRY_ZSTD
|
||||
? RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START
|
||||
: RDB_OPCODE_COMPRESSED_LZ4_BLOB_START;
|
||||
dest[0] = opcode;
|
||||
|
|
|
@ -66,7 +66,8 @@ enum class SaveMode {
|
|||
RDB, // Save .rdb file. Expected to read all shards.
|
||||
};
|
||||
|
||||
enum class CompressionMode { NONE, SINGLE_ENTRY, MULTY_ENTRY_ZSTD, MULTY_ENTRY_LZ4 };
|
||||
enum class CompressionMode { NONE, SINGLE_ENTRY, MULTI_ENTRY_ZSTD, MULTI_ENTRY_LZ4 };
|
||||
CompressionMode GetDefaultCompressionMode();
|
||||
|
||||
class RdbSaver {
|
||||
public:
|
||||
|
|
|
@ -20,6 +20,7 @@ extern "C" {
|
|||
#include "io/file.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/rdb_load.h"
|
||||
#include "server/rdb_save.h"
|
||||
#include "server/test_utils.h"
|
||||
|
||||
using namespace testing;
|
||||
|
@ -31,7 +32,7 @@ using absl::StrCat;
|
|||
|
||||
ABSL_DECLARE_FLAG(int32, list_compress_depth);
|
||||
ABSL_DECLARE_FLAG(int32, list_max_listpack_size);
|
||||
ABSL_DECLARE_FLAG(int, compression_mode);
|
||||
ABSL_DECLARE_FLAG(dfly::CompressionMode, compression_mode);
|
||||
|
||||
namespace dfly {
|
||||
|
||||
|
@ -158,8 +159,9 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) {
|
|||
auto resp = Run({"keys", "key:[5-9][0-9][0-9][0-9][0-9]*"});
|
||||
EXPECT_EQ(resp.GetVec().size(), 0);
|
||||
|
||||
for (int i = 0; i <= 3; ++i) {
|
||||
SetFlag(&FLAGS_compression_mode, i);
|
||||
for (auto mode : {CompressionMode::NONE, CompressionMode::SINGLE_ENTRY,
|
||||
CompressionMode::MULTI_ENTRY_ZSTD, CompressionMode::MULTI_ENTRY_LZ4}) {
|
||||
SetFlag(&FLAGS_compression_mode, mode);
|
||||
RespExpr resp = Run({"save", "df"});
|
||||
ASSERT_EQ(resp, "OK");
|
||||
|
||||
|
@ -171,7 +173,7 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) {
|
|||
}
|
||||
|
||||
TEST_F(RdbTest, RdbLoaderOnReadCompressedDataShouldNotEnterEnsureReadFlow) {
|
||||
SetFlag(&FLAGS_compression_mode, 2);
|
||||
SetFlag(&FLAGS_compression_mode, CompressionMode::MULTI_ENTRY_ZSTD);
|
||||
for (int i = 0; i < 1000; ++i) {
|
||||
Run({"set", StrCat(i), "1"});
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue