1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

Merge branch 'main' into chakaz/huge-values-migration-3

This commit is contained in:
shahar 2024-12-11 09:31:08 +02:00
commit be98628ecd
No known key found for this signature in database
GPG key ID: 2ED7F7EDC7FDCCA4
33 changed files with 524 additions and 478 deletions

2
helio

@ -1 +1 @@
Subproject commit ff9b6cd35bf082a9d48cf0904b0e8557cf31b6d2 Subproject commit 32e8c4ec830e8d4224d8a13a11c1607f357da80f

View file

@ -305,9 +305,9 @@ pair<void*, bool> DefragSet(unsigned encoding, void* ptr, float ratio) {
return DefragIntSet((intset*)ptr, ratio); return DefragIntSet((intset*)ptr, ratio);
} }
// StringMap supports re-allocation of it's internal nodes
case kEncodingStrMap2: { case kEncodingStrMap2: {
return DefragStrMap2((StringMap*)ptr, ratio); // Still not implemented
return {ptr, false};
} }
default: default:

View file

@ -16,6 +16,7 @@
#include "core/detail/bitpacking.h" #include "core/detail/bitpacking.h"
#include "core/flat_set.h" #include "core/flat_set.h"
#include "core/mi_memory_resource.h" #include "core/mi_memory_resource.h"
#include "core/string_set.h"
extern "C" { extern "C" {
#include "redis/intset.h" #include "redis/intset.h"
@ -575,6 +576,14 @@ TEST_F(CompactObjectTest, DefragHash) {
} }
} }
TEST_F(CompactObjectTest, DefragSet) {
// This is still not implemented
StringSet* s = new StringSet();
s->Add("str");
cobj_.InitRobj(OBJ_SET, kEncodingStrMap2, s);
ASSERT_FALSE(cobj_.DefragIfNeeded(0.8));
}
TEST_F(CompactObjectTest, RawInterface) { TEST_F(CompactObjectTest, RawInterface) {
string str(50, 'a'), tmp, owned; string str(50, 'a'), tmp, owned;
cobj_.SetString(str); cobj_.SetString(str);

View file

@ -91,6 +91,10 @@ inline bool operator==(const RespExpr& left, std::string_view s) {
return left.type == RespExpr::STRING && ToSV(left.GetBuf()) == s; return left.type == RespExpr::STRING && ToSV(left.GetBuf()) == s;
} }
inline bool operator==(const RespExpr& left, int64_t val) {
return left.type == RespExpr::INT64 && left.GetInt() == val;
}
inline bool operator!=(const RespExpr& left, std::string_view s) { inline bool operator!=(const RespExpr& left, std::string_view s) {
return !(left == s); return !(left == s);
} }

View file

@ -11,6 +11,7 @@
namespace facade { namespace facade {
using namespace std; using namespace std;
constexpr static long kMaxBulkLen = 256 * (1ul << 20); // 256MB.
auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> Result { auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> Result {
DCHECK(!str.empty()); DCHECK(!str.empty());
@ -218,7 +219,11 @@ auto RedisParser::ParseLen(Buffer str, int64_t* res) -> ResultConsumed {
const char* s = reinterpret_cast<const char*>(str.data()); const char* s = reinterpret_cast<const char*>(str.data());
const char* pos = reinterpret_cast<const char*>(memchr(s, '\n', str.size())); const char* pos = reinterpret_cast<const char*>(memchr(s, '\n', str.size()));
if (!pos) { if (!pos) {
Result r = str.size() < 32 ? INPUT_PENDING : BAD_ARRAYLEN; Result r = INPUT_PENDING;
if (str.size() >= 32) {
LOG(WARNING) << "Unexpected format " << string_view{s, str.size()};
r = BAD_ARRAYLEN;
}
return {r, 0}; return {r, 0};
} }
@ -227,10 +232,16 @@ auto RedisParser::ParseLen(Buffer str, int64_t* res) -> ResultConsumed {
} }
// Skip the first character and 2 last ones (\r\n). // Skip the first character and 2 last ones (\r\n).
bool success = absl::SimpleAtoi(std::string_view{s + 1, size_t(pos - 1 - s)}, res); string_view len_token{s + 1, size_t(pos - 1 - s)};
unsigned consumed = pos - s + 1; bool success = absl::SimpleAtoi(len_token, res);
return ResultConsumed{success ? OK : BAD_ARRAYLEN, consumed}; unsigned consumed = pos - s + 1;
if (success && *res >= -1) {
return ResultConsumed{OK, consumed};
}
LOG(WARNING) << "Failed to parse len " << len_token;
return ResultConsumed{BAD_ARRAYLEN, consumed};
} }
auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed { auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed {
@ -247,8 +258,8 @@ auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed {
len *= 2; len *= 2;
} }
if (len < -1 || len > max_arr_len_) { if (len > max_arr_len_) {
LOG_IF(WARNING, len > max_arr_len_) << "Multibulk len is too large " << len; LOG(WARNING) << "Multibulk len is too large " << len;
return {BAD_ARRAYLEN, res.second}; return {BAD_ARRAYLEN, res.second};
} }
@ -310,15 +321,14 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
return res; return res;
} }
if (len < -1 || len > kMaxBulkLen)
return {BAD_ARRAYLEN, res.second};
if (len == -1) { // Resp2 NIL if (len == -1) { // Resp2 NIL
cached_expr_->emplace_back(RespExpr::NIL); cached_expr_->emplace_back(RespExpr::NIL);
cached_expr_->back().u = Buffer{}; cached_expr_->back().u = Buffer{};
HandleFinishArg(); HandleFinishArg();
} else { } else {
DVLOG(1) << "String(" << len << ")"; DVLOG(1) << "String(" << len << ")";
LOG_IF(WARNING, len > kMaxBulkLen) << "Large bulk len: " << len;
cached_expr_->emplace_back(RespExpr::STRING); cached_expr_->emplace_back(RespExpr::STRING);
cached_expr_->back().u = Buffer{}; cached_expr_->back().u = Buffer{};
bulk_len_ = len; bulk_len_ = len;

View file

@ -22,8 +22,6 @@ namespace facade {
*/ */
class RedisParser { class RedisParser {
public: public:
constexpr static long kMaxBulkLen = 256 * (1ul << 20); // 256MB.
enum Result : uint8_t { enum Result : uint8_t {
OK, OK,
INPUT_PENDING, INPUT_PENDING,

View file

@ -170,7 +170,7 @@ TEST_F(RedisParserTest, Empty) {
} }
TEST_F(RedisParserTest, LargeBulk) { TEST_F(RedisParserTest, LargeBulk) {
std::string_view prefix("*1\r\n$1024\r\n"); string_view prefix("*1\r\n$1024\r\n");
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(prefix)); ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(prefix));
ASSERT_EQ(prefix.size(), consumed_); ASSERT_EQ(prefix.size(), consumed_);
@ -191,6 +191,18 @@ TEST_F(RedisParserTest, LargeBulk) {
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(part1)); ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(part1));
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(half)); ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(half));
ASSERT_EQ(RedisParser::OK, Parse("\r\n")); ASSERT_EQ(RedisParser::OK, Parse("\r\n"));
prefix = "*1\r\n$270000000\r\n";
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(prefix));
ASSERT_EQ(prefix.size(), consumed_);
string chunk(1000000, 'a');
for (unsigned i = 0; i < 270; ++i) {
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(chunk));
ASSERT_EQ(chunk.size(), consumed_);
}
ASSERT_EQ(RedisParser::OK, Parse("\r\n"));
ASSERT_THAT(args_, ElementsAre(ArgType(RespExpr::STRING)));
EXPECT_EQ(270000000, args_[0].GetBuf().size());
} }
TEST_F(RedisParserTest, NILs) { TEST_F(RedisParserTest, NILs) {

View file

@ -79,7 +79,8 @@ typedef struct streamCG {
/* A specific consumer in a consumer group. */ /* A specific consumer in a consumer group. */
typedef struct streamConsumer { typedef struct streamConsumer {
mstime_t seen_time; /* Last time this consumer was active. */ mstime_t seen_time; /* Last time this consumer tried to perform an action (attempted reading/claiming). */
mstime_t active_time; /* Last time this consumer was active (successful reading/claiming). */
sds name; /* Consumer name. This is how the consumer sds name; /* Consumer name. This is how the consumer
will be identified in the consumer group will be identified in the consumer group
protocol. Case sensitive. */ protocol. Case sensitive. */
@ -124,10 +125,6 @@ typedef struct {
/* Prototypes of exported APIs. */ /* Prototypes of exported APIs. */
// struct client; // struct client;
/* Flags for streamLookupConsumer */
#define SLC_DEFAULT 0
#define SLC_NO_REFRESH (1<<0) /* Do not update consumer's seen-time */
/* Flags for streamCreateConsumer */ /* Flags for streamCreateConsumer */
#define SCC_DEFAULT 0 #define SCC_DEFAULT 0
#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */ #define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
@ -149,7 +146,7 @@ void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsign
void streamIteratorRemoveEntry(streamIterator *si, streamID *current); void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
void streamIteratorStop(streamIterator *si); void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname); streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *id, long long entries_read); streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *id, long long entries_read);
void streamEncodeID(void *buf, streamID *id); void streamEncodeID(void *buf, streamID *id);
void streamDecodeID(void *buf, streamID *id); void streamDecodeID(void *buf, streamID *id);

View file

@ -1068,9 +1068,8 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
return (cg == raxNotFound) ? NULL : cg; return (cg == raxNotFound) ? NULL : cg;
} }
/* Lookup the consumer with the specified name in the group 'cg'. Its last /* Lookup the consumer with the specified name in the group 'cg' */
* seen time is updated unless the SLC_NO_REFRESH flag is specified. */ streamConsumer *streamLookupConsumer(streamCG *cg, sds name) {
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
if (cg == NULL) return NULL; if (cg == NULL) return NULL;
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name)); sdslen(name));

View file

@ -96,10 +96,9 @@ bool CommandId::IsMultiTransactional() const {
return CO::IsTransKind(name()) || CO::IsEvalKind(name()); return CO::IsTransKind(name()) || CO::IsEvalKind(name());
} }
uint64_t CommandId::Invoke(CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder, uint64_t CommandId::Invoke(CmdArgList args, const CommandContext& cmd_cntx) const {
ConnectionContext* cntx) const {
int64_t before = absl::GetCurrentTimeNanos(); int64_t before = absl::GetCurrentTimeNanos();
handler_(args, tx, builder, cntx); handler_(args, cmd_cntx);
int64_t after = absl::GetCurrentTimeNanos(); int64_t after = absl::GetCurrentTimeNanos();
ServerState* ss = ServerState::tlocal(); // Might have migrated thread, read after invocation ServerState* ss = ServerState::tlocal(); // Might have migrated thread, read after invocation
@ -133,21 +132,6 @@ optional<facade::ErrorReply> CommandId::Validate(CmdArgList tail_args) const {
return nullopt; return nullopt;
} }
CommandId&& CommandId::SetHandler(Handler2 f) && {
handler_ = [f = std::move(f)](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder,
ConnectionContext*) { f(args, tx, builder); };
return std::move(*this);
}
CommandId&& CommandId::SetHandler(Handler3 f) && {
handler_ = [f = std::move(f)](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder,
ConnectionContext* cntx) {
f(std::move(args), CommandContext{tx, builder, cntx});
};
return std::move(*this);
};
CommandRegistry::CommandRegistry() { CommandRegistry::CommandRegistry() {
vector<string> rename_command = GetFlag(FLAGS_rename_command); vector<string> rename_command = GetFlag(FLAGS_rename_command);

View file

@ -96,23 +96,13 @@ class CommandId : public facade::CommandId {
command_stats_ = std::make_unique<CmdCallStats[]>(thread_count); command_stats_ = std::make_unique<CmdCallStats[]>(thread_count);
} }
using Handler =
fu2::function_base<true /*owns*/, true /*copyable*/, fu2::capacity_default,
false /* non-throwing*/, false /* strong exceptions guarantees*/,
void(CmdArgList, Transaction*, facade::SinkReplyBuilder*,
ConnectionContext*) const>;
using Handler2 =
fu2::function_base<true, true, fu2::capacity_default, false, false,
void(CmdArgList, Transaction*, facade::SinkReplyBuilder*) const>;
using Handler3 = fu2::function_base<true, true, fu2::capacity_default, false, false, using Handler3 = fu2::function_base<true, true, fu2::capacity_default, false, false,
void(CmdArgList, const CommandContext&) const>; void(CmdArgList, const CommandContext&) const>;
using ArgValidator = fu2::function_base<true, true, fu2::capacity_default, false, false, using ArgValidator = fu2::function_base<true, true, fu2::capacity_default, false, false,
std::optional<facade::ErrorReply>(CmdArgList) const>; std::optional<facade::ErrorReply>(CmdArgList) const>;
// Returns the invoke time in usec. // Returns the invoke time in usec.
uint64_t Invoke(CmdArgList args, Transaction*, facade::SinkReplyBuilder*, uint64_t Invoke(CmdArgList args, const CommandContext& cmd_cntx) const;
ConnectionContext* cntx) const;
// Returns error if validation failed, otherwise nullopt // Returns error if validation failed, otherwise nullopt
std::optional<facade::ErrorReply> Validate(CmdArgList tail_args) const; std::optional<facade::ErrorReply> Validate(CmdArgList tail_args) const;
@ -135,15 +125,11 @@ class CommandId : public facade::CommandId {
static const char* OptName(CO::CommandOpt fl); static const char* OptName(CO::CommandOpt fl);
CommandId&& SetHandler(Handler f) && { CommandId&& SetHandler(Handler3 f) && {
handler_ = std::move(f); handler_ = std::move(f);
return std::move(*this); return std::move(*this);
} }
CommandId&& SetHandler(Handler2 f) &&;
CommandId&& SetHandler(Handler3 f) &&;
CommandId&& SetValidator(ArgValidator f) && { CommandId&& SetValidator(ArgValidator f) && {
validator_ = std::move(f); validator_ = std::move(f);
return std::move(*this); return std::move(*this);
@ -169,7 +155,7 @@ class CommandId : public facade::CommandId {
private: private:
bool implicit_acl_; bool implicit_acl_;
std::unique_ptr<CmdCallStats[]> command_stats_; std::unique_ptr<CmdCallStats[]> command_stats_;
Handler handler_; Handler3 handler_;
ArgValidator validator_; ArgValidator validator_;
}; };

View file

@ -119,7 +119,6 @@ atomic_uint64_t rss_mem_peak(0);
unsigned kernel_version = 0; unsigned kernel_version = 0;
size_t max_memory_limit = 0; size_t max_memory_limit = 0;
size_t serialization_max_chunk_size = 0;
Namespaces* namespaces = nullptr; Namespaces* namespaces = nullptr;
size_t FetchRssMemory(io::StatusData sdata) { size_t FetchRssMemory(io::StatusData sdata) {
@ -438,7 +437,7 @@ ThreadLocalMutex::~ThreadLocalMutex() {
} }
void ThreadLocalMutex::lock() { void ThreadLocalMutex::lock() {
if (serialization_max_chunk_size != 0) { if (ServerState::tlocal()->serialization_max_chunk_size != 0) {
DCHECK_EQ(EngineShard::tlocal(), shard_); DCHECK_EQ(EngineShard::tlocal(), shard_);
util::fb2::NoOpLock noop_lk_; util::fb2::NoOpLock noop_lk_;
if (locked_fiber_ != nullptr) { if (locked_fiber_ != nullptr) {
@ -452,7 +451,7 @@ void ThreadLocalMutex::lock() {
} }
void ThreadLocalMutex::unlock() { void ThreadLocalMutex::unlock() {
if (serialization_max_chunk_size != 0) { if (ServerState::tlocal()->serialization_max_chunk_size != 0) {
DCHECK_EQ(EngineShard::tlocal(), shard_); DCHECK_EQ(EngineShard::tlocal(), shard_);
flag_ = false; flag_ = false;
cond_var_.notify_one(); cond_var_.notify_one();

View file

@ -388,8 +388,6 @@ struct BorrowedInterpreter {
bool owned_ = false; bool owned_ = false;
}; };
extern size_t serialization_max_chunk_size;
class LocalBlockingCounter { class LocalBlockingCounter {
public: public:
void lock() { void lock() {

View file

@ -229,14 +229,16 @@ size_t CalculateEvictionBytes() {
size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory, size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory,
shard_memory_budget_threshold); shard_memory_budget_threshold);
// TODO: Eviction due to rss usage is not working well as it causes eviction
// of to many keys untill we finally see decrease in rss. We need to improve
// this logic before we enable it.
/*
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio; const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
// If rss_oom_deny_ratio is set, we should evict depending on rss memory too
/* If rss_oom_deny_ratio is set, we should evict depending on rss memory too */
if (rss_oom_deny_ratio > 0.0) { if (rss_oom_deny_ratio > 0.0) {
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit); const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
/* We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss // We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
* memory */ memory const size_t shard_rss_memory_budget_threshold =
const size_t shard_rss_memory_budget_threshold =
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count; size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;
// Calculate how much rss memory is used by all shards // Calculate how much rss memory is used by all shards
@ -247,6 +249,8 @@ size_t CalculateEvictionBytes() {
goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory, goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory,
shard_rss_memory_budget_threshold)); shard_rss_memory_budget_threshold));
} }
*/
return goal_bytes; return goal_bytes;
} }

View file

@ -64,6 +64,7 @@ streamConsumer* StreamCreateConsumer(streamCG* cg, string_view name, uint64_t no
consumer->name = sdsnewlen(name.data(), name.size()); consumer->name = sdsnewlen(name.data(), name.size());
consumer->pel = raxNew(); consumer->pel = raxNew();
consumer->seen_time = now_ms; consumer->seen_time = now_ms;
consumer->active_time = -1;
return consumer; return consumer;
} }

View file

@ -17,7 +17,8 @@ class CommandAggregator {
public: public:
using WriteCmdCallback = std::function<void(absl::Span<const string_view>)>; using WriteCmdCallback = std::function<void(absl::Span<const string_view>)>;
CommandAggregator(string_view key, WriteCmdCallback cb) : key_(key), cb_(cb) { CommandAggregator(string_view key, WriteCmdCallback cb, size_t max_agg_bytes)
: key_(key), cb_(cb), max_aggragation_bytes_(max_agg_bytes) {
} }
~CommandAggregator() { ~CommandAggregator() {
@ -29,7 +30,7 @@ class CommandAggregator {
agg_bytes_ += arg.size(); agg_bytes_ += arg.size();
members_.push_back(std::move(arg)); members_.push_back(std::move(arg));
if (commit_mode != CommitMode::kNoCommit && agg_bytes_ >= serialization_max_chunk_size) { if (commit_mode != CommitMode::kNoCommit && agg_bytes_ >= max_aggragation_bytes_) {
CommitPending(); CommitPending();
} }
} }
@ -55,18 +56,20 @@ class CommandAggregator {
vector<string> members_; vector<string> members_;
absl::InlinedVector<string_view, 5> args_; absl::InlinedVector<string_view, 5> args_;
size_t agg_bytes_ = 0; size_t agg_bytes_ = 0;
size_t max_aggragation_bytes_;
}; };
} // namespace } // namespace
CmdSerializer::CmdSerializer(FlushSerialized cb) : cb_(std::move(cb)) { CmdSerializer::CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer_size)
: cb_(std::move(cb)), max_serialization_buffer_size_(max_serialization_buffer_size) {
} }
void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) { uint64_t expire_ms) {
// We send RESTORE commands for small objects, or objects we don't support breaking. // We send RESTORE commands for small objects, or objects we don't support breaking.
bool use_restore_serialization = true; bool use_restore_serialization = true;
if (serialization_max_chunk_size > 0 && pv.MallocUsed() > serialization_max_chunk_size) { if (max_serialization_buffer_size_ > 0 && pv.MallocUsed() > max_serialization_buffer_size_) {
switch (pv.ObjType()) { switch (pv.ObjType()) {
case OBJ_SET: case OBJ_SET:
SerializeSet(key, pv); SerializeSet(key, pv);
@ -138,7 +141,8 @@ void CmdSerializer::SerializeExpireIfNeeded(string_view key, uint64_t expire_ms)
void CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) { void CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator( CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("SADD", args); }); key, [&](absl::Span<const string_view> args) { SerializeCommand("SADD", args); },
max_serialization_buffer_size_);
container_utils::IterateSet(pv, [&](container_utils::ContainerEntry ce) { container_utils::IterateSet(pv, [&](container_utils::ContainerEntry ce) {
aggregator.AddArg(ce.ToString()); aggregator.AddArg(ce.ToString());
@ -148,7 +152,8 @@ void CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) {
void CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) { void CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator( CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("ZADD", args); }); key, [&](absl::Span<const string_view> args) { SerializeCommand("ZADD", args); },
max_serialization_buffer_size_);
container_utils::IterateSortedSet( container_utils::IterateSortedSet(
pv.GetRobjWrapper(), pv.GetRobjWrapper(),
@ -162,7 +167,8 @@ void CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {
void CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) { void CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator( CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("HSET", args); }); key, [&](absl::Span<const string_view> args) { SerializeCommand("HSET", args); },
max_serialization_buffer_size_);
container_utils::IterateMap( container_utils::IterateMap(
pv, [&](container_utils::ContainerEntry k, container_utils::ContainerEntry v) { pv, [&](container_utils::ContainerEntry k, container_utils::ContainerEntry v) {
@ -174,7 +180,8 @@ void CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) {
void CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) { void CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator( CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("RPUSH", args); }); key, [&](absl::Span<const string_view> args) { SerializeCommand("RPUSH", args); },
max_serialization_buffer_size_);
container_utils::IterateList(pv, [&](container_utils::ContainerEntry ce) { container_utils::IterateList(pv, [&](container_utils::ContainerEntry ce) {
aggregator.AddArg(ce.ToString()); aggregator.AddArg(ce.ToString());

View file

@ -21,7 +21,7 @@ class CmdSerializer {
public: public:
using FlushSerialized = std::function<void(std::string)>; using FlushSerialized = std::function<void(std::string)>;
explicit CmdSerializer(FlushSerialized cb); explicit CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer_size);
void SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv, void SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms); uint64_t expire_ms);
@ -39,6 +39,7 @@ class CmdSerializer {
uint64_t expire_ms); uint64_t expire_ms);
FlushSerialized cb_; FlushSerialized cb_;
size_t max_serialization_buffer_size_;
}; };
} // namespace dfly } // namespace dfly

View file

@ -10,6 +10,7 @@
#include "base/logging.h" #include "base/logging.h"
#include "server/cluster/cluster_defs.h" #include "server/cluster/cluster_defs.h"
#include "server/journal/cmd_serializer.h" #include "server/journal/cmd_serializer.h"
#include "server/server_state.h"
#include "util/fibers/synchronization.h" #include "util/fibers/synchronization.h"
using namespace facade; using namespace facade;
@ -208,10 +209,10 @@ void RestoreStreamer::Run() {
do { do {
if (fiber_cancelled_) if (fiber_cancelled_)
return; return;
cursor = pt->TraverseBuckets(cursor, [&](PrimeTable::bucket_iterator it) {
cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
std::lock_guard guard(big_value_mu_); std::lock_guard guard(big_value_mu_);
if (fiber_cancelled_) // Traverse could have yieleded
if (fiber_cancelled_) // Could be cancelled any time as Traverse may preempt
return; return;
db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/, db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
@ -321,10 +322,12 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) { uint64_t expire_ms) {
CmdSerializer serializer([&](std::string s) { CmdSerializer serializer(
Write(std::move(s)); [&](std::string s) {
ThrottleIfNeeded(); Write(std::move(s));
}); ThrottleIfNeeded();
},
ServerState::tlocal()->serialization_max_chunk_size);
serializer.SerializeEntry(key, pk, pv, expire_ms); serializer.SerializeEntry(key, pk, pv, expire_ms);
} }

View file

@ -728,6 +728,11 @@ void SetRssOomDenyRatioOnAllThreads(double ratio) {
shard_set->pool()->AwaitBrief(cb); shard_set->pool()->AwaitBrief(cb);
} }
void SetSerializationMaxChunkSize(size_t val) {
auto cb = [val](unsigned, auto*) { ServerState::tlocal()->serialization_max_chunk_size = val; };
shard_set->pool()->AwaitBrief(cb);
}
} // namespace } // namespace
Service::Service(ProactorPool* pp) Service::Service(ProactorPool* pp)
@ -791,6 +796,8 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterSetter<double>("rss_oom_deny_ratio", config_registry.RegisterSetter<double>("rss_oom_deny_ratio",
[](double val) { SetRssOomDenyRatioOnAllThreads(val); }); [](double val) { SetRssOomDenyRatioOnAllThreads(val); });
config_registry.RegisterSetter<size_t>("serialization_max_chunk_size",
[](size_t val) { SetSerializationMaxChunkSize(val); });
config_registry.RegisterMutable("pipeline_squash"); config_registry.RegisterMutable("pipeline_squash");
@ -835,7 +842,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
[val](auto index, auto* context) { ServerState::tlocal()->acl_log.SetTotalEntries(val); }); [val](auto index, auto* context) { ServerState::tlocal()->acl_log.SetTotalEntries(val); });
}); });
serialization_max_chunk_size = GetFlag(FLAGS_serialization_max_chunk_size);
uint32_t shard_num = GetFlag(FLAGS_num_shards); uint32_t shard_num = GetFlag(FLAGS_num_shards);
if (shard_num == 0 || shard_num > pp_.size()) { if (shard_num == 0 || shard_num > pp_.size()) {
LOG_IF(WARNING, shard_num > pp_.size()) LOG_IF(WARNING, shard_num > pp_.size())
@ -867,6 +873,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
Transaction::Init(shard_num); Transaction::Init(shard_num);
SetRssOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_rss_oom_deny_ratio)); SetRssOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_rss_oom_deny_ratio));
SetSerializationMaxChunkSize(absl::GetFlag(FLAGS_serialization_max_chunk_size));
// Requires that shard_set will be initialized before because server_family_.Init might // Requires that shard_set will be initialized before because server_family_.Init might
// load the snapshot. // load the snapshot.
@ -1049,7 +1056,11 @@ std::optional<ErrorReply> Service::VerifyCommandState(const CommandId* cid, CmdA
allowed_by_state = false; allowed_by_state = false;
break; break;
case GlobalState::TAKEN_OVER: case GlobalState::TAKEN_OVER:
allowed_by_state = !cid->IsWriteOnly(); // Only PING, admin commands, and all commands via admin connections are allowed
// we prohibit even read commands, because read commands running in pipeline can take a while
// to send all data to a client which leads to fail in takeover
allowed_by_state = dfly_cntx.conn()->IsPrivileged() || (cid->opt_mask() & CO::ADMIN) ||
cid->name() == "PING";
break; break;
default: default:
break; break;
@ -1335,7 +1346,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, SinkReplyBui
auto last_error = builder->ConsumeLastError(); auto last_error = builder->ConsumeLastError();
DCHECK(last_error.empty()); DCHECK(last_error.empty());
try { try {
invoke_time_usec = cid->Invoke(tail_args, tx, builder, cntx); invoke_time_usec = cid->Invoke(tail_args, CommandContext{tx, builder, cntx});
} catch (std::exception& e) { } catch (std::exception& e) {
LOG(ERROR) << "Internal error, system probably unstable " << e.what(); LOG(ERROR) << "Internal error, system probably unstable " << e.what();
return false; return false;
@ -1638,34 +1649,31 @@ absl::flat_hash_map<std::string, unsigned> Service::UknownCmdMap() const {
return unknown_cmds_; return unknown_cmds_;
} }
void Service::Quit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::Quit(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) { if (cmd_cntx.rb->GetProtocol() == Protocol::REDIS)
if (builder->GetProtocol() == Protocol::REDIS) cmd_cntx.rb->SendOk();
builder->SendOk();
builder->CloseConnection(); cmd_cntx.rb->CloseConnection();
DeactivateMonitoring(static_cast<ConnectionContext*>(cntx)); DeactivateMonitoring(cmd_cntx.conn_cntx);
cntx->conn()->ShutdownSelf(); cmd_cntx.conn_cntx->conn()->ShutdownSelf();
} }
void Service::Multi(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::Multi(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) { if (cmd_cntx.conn_cntx->conn_state.exec_info.IsCollecting()) {
if (cntx->conn_state.exec_info.IsCollecting()) { return cmd_cntx.rb->SendError("MULTI calls can not be nested");
return builder->SendError("MULTI calls can not be nested");
} }
cntx->conn_state.exec_info.state = ConnectionState::ExecInfo::EXEC_COLLECT; cmd_cntx.conn_cntx->conn_state.exec_info.state = ConnectionState::ExecInfo::EXEC_COLLECT;
// TODO: to protect against huge exec transactions. // TODO: to protect against huge exec transactions.
return builder->SendOk(); return cmd_cntx.rb->SendOk();
} }
void Service::Watch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::Watch(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) { auto& exec_info = cmd_cntx.conn_cntx->conn_state.exec_info;
auto& exec_info = cntx->conn_state.exec_info;
// Skip if EXEC will already fail due previous WATCH. // Skip if EXEC will already fail due previous WATCH.
if (exec_info.watched_dirty.load(memory_order_relaxed)) { if (exec_info.watched_dirty.load(memory_order_relaxed)) {
return builder->SendOk(); return cmd_cntx.rb->SendOk();
} }
atomic_uint32_t keys_existed = 0; atomic_uint32_t keys_existed = 0;
@ -1673,28 +1681,27 @@ void Service::Watch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ShardId shard_id = shard->shard_id(); ShardId shard_id = shard->shard_id();
ShardArgs largs = t->GetShardArgs(shard_id); ShardArgs largs = t->GetShardArgs(shard_id);
for (auto k : largs) { for (auto k : largs) {
t->GetDbSlice(shard_id).RegisterWatchedKey(cntx->db_index(), k, &exec_info); t->GetDbSlice(shard_id).RegisterWatchedKey(cmd_cntx.conn_cntx->db_index(), k, &exec_info);
} }
auto res = GenericFamily::OpExists(t->GetOpArgs(shard), largs); auto res = GenericFamily::OpExists(t->GetOpArgs(shard), largs);
keys_existed.fetch_add(res.value_or(0), memory_order_relaxed); keys_existed.fetch_add(res.value_or(0), memory_order_relaxed);
return OpStatus::OK; return OpStatus::OK;
}; };
tx->ScheduleSingleHop(std::move(cb)); cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
// Duplicate keys are stored to keep correct count. // Duplicate keys are stored to keep correct count.
exec_info.watched_existed += keys_existed.load(memory_order_relaxed); exec_info.watched_existed += keys_existed.load(memory_order_relaxed);
for (string_view key : args) { for (string_view key : args) {
exec_info.watched_keys.emplace_back(cntx->db_index(), key); exec_info.watched_keys.emplace_back(cmd_cntx.conn_cntx->db_index(), key);
} }
return builder->SendOk(); return cmd_cntx.rb->SendOk();
} }
void Service::Unwatch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::Unwatch(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) { UnwatchAllKeys(cmd_cntx.conn_cntx->ns, &cmd_cntx.conn_cntx->conn_state.exec_info);
UnwatchAllKeys(cntx->ns, &cntx->conn_state.exec_info); return cmd_cntx.rb->SendOk();
return builder->SendOk();
} }
optional<CapturingReplyBuilder::Payload> Service::FlushEvalAsyncCmds(ConnectionContext* cntx, optional<CapturingReplyBuilder::Payload> Service::FlushEvalAsyncCmds(ConnectionContext* cntx,
@ -1760,41 +1767,37 @@ void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca)
DispatchCommand(ca.args, &replier, cntx); DispatchCommand(ca.args, &replier, cntx);
} }
void Service::Eval(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::Eval(CmdArgList args, const CommandContext& cmd_cntx, bool read_only) {
ConnectionContext* cntx, bool read_only) {
string_view body = ArgS(args, 0); string_view body = ArgS(args, 0);
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (body.empty()) { if (body.empty()) {
return rb->SendNull(); return rb->SendNull();
} }
BorrowedInterpreter interpreter{tx, &cntx->conn_state}; BorrowedInterpreter interpreter{cmd_cntx.tx, &cmd_cntx.conn_cntx->conn_state};
auto res = server_family_.script_mgr()->Insert(body, interpreter); auto res = server_family_.script_mgr()->Insert(body, interpreter);
if (!res) if (!res)
return builder->SendError(res.error().Format(), facade::kScriptErrType); return cmd_cntx.rb->SendError(res.error().Format(), facade::kScriptErrType);
string sha{std::move(res.value())}; string sha{std::move(res.value())};
CallSHA(args, sha, interpreter, builder, cntx, read_only); CallSHA(args, sha, interpreter, cmd_cntx.rb, cmd_cntx.conn_cntx, read_only);
} }
void Service::EvalRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::EvalRo(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) { Eval(args, cmd_cntx, true);
Eval(args, tx, builder, cntx, true);
} }
void Service::EvalSha(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::EvalSha(CmdArgList args, const CommandContext& cmd_cntx, bool read_only) {
ConnectionContext* cntx, bool read_only) {
string sha = absl::AsciiStrToLower(ArgS(args, 0)); string sha = absl::AsciiStrToLower(ArgS(args, 0));
BorrowedInterpreter interpreter{cntx->transaction, &cntx->conn_state}; BorrowedInterpreter interpreter{cmd_cntx.tx, &cmd_cntx.conn_cntx->conn_state};
CallSHA(args, sha, interpreter, builder, cntx, read_only); CallSHA(args, sha, interpreter, cmd_cntx.rb, cmd_cntx.conn_cntx, read_only);
} }
void Service::EvalShaRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::EvalShaRo(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) { EvalSha(args, cmd_cntx, true);
EvalSha(args, tx, builder, cntx, true);
} }
void Service::CallSHA(CmdArgList args, string_view sha, Interpreter* interpreter, void Service::CallSHA(CmdArgList args, string_view sha, Interpreter* interpreter,
@ -2026,15 +2029,14 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
} }
} }
void Service::Discard(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::Discard(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) { auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (!cntx->conn_state.exec_info.IsCollecting()) { if (!cmd_cntx.conn_cntx->conn_state.exec_info.IsCollecting()) {
return builder->SendError("DISCARD without MULTI"); return cmd_cntx.rb->SendError("DISCARD without MULTI");
} }
MultiCleanup(cntx); MultiCleanup(cmd_cntx.conn_cntx);
rb->SendOk(); rb->SendOk();
} }
@ -2134,25 +2136,25 @@ void StartMultiExec(ConnectionContext* cntx, ConnectionState::ExecInfo* exec_inf
}; };
} }
void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::Exec(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) { auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto& exec_info = cmd_cntx.conn_cntx->conn_state.exec_info;
auto& exec_info = cntx->conn_state.exec_info; auto* cntx = cmd_cntx.conn_cntx;
// Clean the context no matter the outcome // Clean the context no matter the outcome
absl::Cleanup exec_clear = [&cntx] { MultiCleanup(cntx); }; absl::Cleanup exec_clear = [cntx] { MultiCleanup(cntx); };
if (exec_info.state == ConnectionState::ExecInfo::EXEC_ERROR) { if (exec_info.state == ConnectionState::ExecInfo::EXEC_ERROR) {
return builder->SendError("-EXECABORT Transaction discarded because of previous errors"); return rb->SendError("-EXECABORT Transaction discarded because of previous errors");
} }
// Check basic invariants // Check basic invariants
if (!exec_info.IsCollecting()) { if (!exec_info.IsCollecting()) {
return builder->SendError("EXEC without MULTI"); return rb->SendError("EXEC without MULTI");
} }
if (IsWatchingOtherDbs(cntx->db_index(), exec_info)) { if (IsWatchingOtherDbs(cntx->db_index(), exec_info)) {
return builder->SendError("Dragonfly does not allow WATCH and EXEC on different databases"); return rb->SendError("Dragonfly does not allow WATCH and EXEC on different databases");
} }
if (exec_info.watched_dirty.load(memory_order_relaxed)) { if (exec_info.watched_dirty.load(memory_order_relaxed)) {
@ -2167,7 +2169,8 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
// We borrow a single interpreter for all the EVALs/Script load inside. Returned by MultiCleanup // We borrow a single interpreter for all the EVALs/Script load inside. Returned by MultiCleanup
if (state != ExecScriptUse::NONE) { if (state != ExecScriptUse::NONE) {
exec_info.preborrowed_interpreter = BorrowedInterpreter(tx, &cntx->conn_state).Release(); exec_info.preborrowed_interpreter =
BorrowedInterpreter(cmd_cntx.tx, &cntx->conn_state).Release();
} }
// Determine according multi mode, not only only flag, but based on presence of global commands // Determine according multi mode, not only only flag, but based on presence of global commands
@ -2183,7 +2186,7 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
// EXEC should not run if any of the watched keys expired. // EXEC should not run if any of the watched keys expired.
if (!exec_info.watched_keys.empty() && if (!exec_info.watched_keys.empty() &&
!CheckWatchedKeyExpiry(cntx, registry_.Find("EXISTS"), exec_cid_)) { !CheckWatchedKeyExpiry(cntx, registry_.Find("EXISTS"), exec_cid_)) {
tx->UnlockMulti(); cmd_cntx.tx->UnlockMulti();
return rb->SendNull(); return rb->SendNull();
} }
@ -2197,7 +2200,7 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
if (!exec_info.body.empty()) { if (!exec_info.body.empty()) {
if (GetFlag(FLAGS_track_exec_frequencies)) { if (GetFlag(FLAGS_track_exec_frequencies)) {
string descr = CreateExecDescriptor(exec_info.body, tx->GetUniqueShardCnt()); string descr = CreateExecDescriptor(exec_info.body, cmd_cntx.tx->GetUniqueShardCnt());
ServerState::tlocal()->exec_freq_count[descr]++; ServerState::tlocal()->exec_freq_count[descr]++;
} }
@ -2209,7 +2212,7 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
for (auto& scmd : exec_info.body) { for (auto& scmd : exec_info.body) {
VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs(); VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs();
tx->MultiSwitchCmd(scmd.Cid()); cmd_cntx.tx->MultiSwitchCmd(scmd.Cid());
cntx->cid = scmd.Cid(); cntx->cid = scmd.Cid();
arg_vec.resize(scmd.NumArgs()); arg_vec.resize(scmd.NumArgs());
@ -2218,14 +2221,14 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
CmdArgList args = absl::MakeSpan(arg_vec); CmdArgList args = absl::MakeSpan(arg_vec);
if (scmd.Cid()->IsTransactional()) { if (scmd.Cid()->IsTransactional()) {
OpStatus st = tx->InitByArgs(cntx->ns, cntx->conn_state.db_index, args); OpStatus st = cmd_cntx.tx->InitByArgs(cntx->ns, cntx->conn_state.db_index, args);
if (st != OpStatus::OK) { if (st != OpStatus::OK) {
builder->SendError(st); rb->SendError(st);
break; break;
} }
} }
bool ok = InvokeCmd(scmd.Cid(), args, builder, cntx); bool ok = InvokeCmd(scmd.Cid(), args, rb, cmd_cntx.conn_cntx);
if (!ok || rb->GetError()) // checks for i/o error, not logical error. if (!ok || rb->GetError()) // checks for i/o error, not logical error.
break; break;
} }
@ -2234,78 +2237,78 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
if (scheduled) { if (scheduled) {
VLOG(2) << "Exec unlocking " << exec_info.body.size() << " commands"; VLOG(2) << "Exec unlocking " << exec_info.body.size() << " commands";
tx->UnlockMulti(); cmd_cntx.tx->UnlockMulti();
} }
cntx->cid = exec_cid_; cntx->cid = exec_cid_;
VLOG(2) << "Exec completed"; VLOG(2) << "Exec completed";
} }
void Service::Publish(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::Publish(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) {
if (cluster::IsClusterEnabled()) { if (cluster::IsClusterEnabled()) {
return builder->SendError("PUBLISH is not supported in cluster mode yet"); return cmd_cntx.rb->SendError("PUBLISH is not supported in cluster mode yet");
} }
string_view channel = ArgS(args, 0); string_view channel = ArgS(args, 0);
string_view messages[] = {ArgS(args, 1)}; string_view messages[] = {ArgS(args, 1)};
auto* cs = ServerState::tlocal()->channel_store(); auto* cs = ServerState::tlocal()->channel_store();
builder->SendLong(cs->SendMessages(channel, messages)); cmd_cntx.rb->SendLong(cs->SendMessages(channel, messages));
} }
void Service::Subscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::Subscribe(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) {
if (cluster::IsClusterEnabled()) { if (cluster::IsClusterEnabled()) {
return builder->SendError("SUBSCRIBE is not supported in cluster mode yet"); return cmd_cntx.rb->SendError("SUBSCRIBE is not supported in cluster mode yet");
} }
cntx->ChangeSubscription(true /*add*/, true /* reply*/, std::move(args), cmd_cntx.conn_cntx->ChangeSubscription(true /*add*/, true /* reply*/, std::move(args),
static_cast<RedisReplyBuilder*>(builder)); static_cast<RedisReplyBuilder*>(cmd_cntx.rb));
} }
void Service::Unsubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::Unsubscribe(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) { auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (cluster::IsClusterEnabled()) { if (cluster::IsClusterEnabled()) {
return builder->SendError("UNSUBSCRIBE is not supported in cluster mode yet"); return cmd_cntx.rb->SendError("UNSUBSCRIBE is not supported in cluster mode yet");
}
if (args.size() == 0) {
cmd_cntx.conn_cntx->UnsubscribeAll(true, rb);
} else {
cmd_cntx.conn_cntx->ChangeSubscription(false, true, args, rb);
}
}
void Service::PSubscribe(CmdArgList args, const CommandContext& cmd_cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (cluster::IsClusterEnabled()) {
return rb->SendError("PSUBSCRIBE is not supported in cluster mode yet");
}
cmd_cntx.conn_cntx->ChangePSubscription(true, true, args, rb);
}
void Service::PUnsubscribe(CmdArgList args, const CommandContext& cmd_cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (cluster::IsClusterEnabled()) {
return rb->SendError("PUNSUBSCRIBE is not supported in cluster mode yet");
} }
if (args.size() == 0) { if (args.size() == 0) {
cntx->UnsubscribeAll(true, static_cast<RedisReplyBuilder*>(builder)); cmd_cntx.conn_cntx->PUnsubscribeAll(true, rb);
} else { } else {
cntx->ChangeSubscription(false, true, args, static_cast<RedisReplyBuilder*>(builder)); cmd_cntx.conn_cntx->ChangePSubscription(false, true, args, rb);
}
}
void Service::PSubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (cluster::IsClusterEnabled()) {
return builder->SendError("PSUBSCRIBE is not supported in cluster mode yet");
}
cntx->ChangePSubscription(true, true, args, static_cast<RedisReplyBuilder*>(builder));
}
void Service::PUnsubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (cluster::IsClusterEnabled()) {
return builder->SendError("PUNSUBSCRIBE is not supported in cluster mode yet");
}
if (args.size() == 0) {
cntx->PUnsubscribeAll(true, static_cast<RedisReplyBuilder*>(builder));
} else {
cntx->ChangePSubscription(false, true, args, static_cast<RedisReplyBuilder*>(builder));
} }
} }
// Not a real implementation. Serves as a decorator to accept some function commands // Not a real implementation. Serves as a decorator to accept some function commands
// for testing. // for testing.
void Service::Function(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::Function(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
if (sub_cmd == "FLUSH") { if (sub_cmd == "FLUSH") {
return builder->SendOk(); return cmd_cntx.rb->SendOk();
} }
string err = UnknownSubCmd(sub_cmd, "FUNCTION"); string err = UnknownSubCmd(sub_cmd, "FUNCTION");
return builder->SendError(err, kSyntaxErrType); return cmd_cntx.rb->SendError(err, kSyntaxErrType);
} }
void Service::PubsubChannels(string_view pattern, SinkReplyBuilder* builder) { void Service::PubsubChannels(string_view pattern, SinkReplyBuilder* builder) {
@ -2329,22 +2332,22 @@ void Service::PubsubNumSub(CmdArgList args, SinkReplyBuilder* builder) {
} }
} }
void Service::Monitor(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::Monitor(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) { VLOG(1) << "starting monitor on this connection: " << cmd_cntx.conn_cntx->conn()->GetClientId();
VLOG(1) << "starting monitor on this connection: " << cntx->conn()->GetClientId();
// we are registering the current connection for all threads so they will be aware of // we are registering the current connection for all threads so they will be aware of
// this connection, to send to it any command // this connection, to send to it any command
builder->SendOk(); cmd_cntx.rb->SendOk();
cntx->ChangeMonitor(true /* start */); cmd_cntx.conn_cntx->ChangeMonitor(true /* start */);
} }
void Service::Pubsub(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::Pubsub(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) { auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (cluster::IsClusterEnabled()) { if (cluster::IsClusterEnabled()) {
return builder->SendError("PUBSUB is not supported in cluster mode yet"); return rb->SendError("PUBSUB is not supported in cluster mode yet");
} }
if (args.size() < 1) { if (args.size() < 1) {
builder->SendError(WrongNumArgsError(cntx->cid->name())); rb->SendError(WrongNumArgsError(cmd_cntx.conn_cntx->cid->name()));
return; return;
} }
@ -2363,7 +2366,6 @@ void Service::Pubsub(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder
"HELP", "HELP",
"\tPrints this help."}; "\tPrints this help."};
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendSimpleStrArr(help_arr); rb->SendSimpleStrArr(help_arr);
return; return;
} }
@ -2374,19 +2376,18 @@ void Service::Pubsub(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder
pattern = ArgS(args, 1); pattern = ArgS(args, 1);
} }
PubsubChannels(pattern, builder); PubsubChannels(pattern, rb);
} else if (subcmd == "NUMPAT") { } else if (subcmd == "NUMPAT") {
PubsubPatterns(builder); PubsubPatterns(rb);
} else if (subcmd == "NUMSUB") { } else if (subcmd == "NUMSUB") {
args.remove_prefix(1); args.remove_prefix(1);
PubsubNumSub(args, builder); PubsubNumSub(args, rb);
} else { } else {
builder->SendError(UnknownSubCmd(subcmd, "PUBSUB")); rb->SendError(UnknownSubCmd(subcmd, "PUBSUB"));
} }
} }
void Service::Command(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Service::Command(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) {
unsigned cmd_cnt = 0; unsigned cmd_cnt = 0;
registry_.Traverse([&](string_view name, const CommandId& cd) { registry_.Traverse([&](string_view name, const CommandId& cd) {
if ((cd.opt_mask() & CO::HIDDEN) == 0) { if ((cd.opt_mask() & CO::HIDDEN) == 0) {
@ -2394,8 +2395,8 @@ void Service::Command(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
} }
}); });
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
auto serialize_command = [&rb, this](string_view name, const CommandId& cid) { auto serialize_command = [rb, this](string_view name, const CommandId& cid) {
rb->StartArray(7); rb->StartArray(7);
rb->SendSimpleString(cid.name()); rb->SendSimpleString(cid.name());
rb->SendLong(cid.arity()); rb->SendLong(cid.arity());
@ -2440,7 +2441,7 @@ void Service::Command(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
// COUNT // COUNT
if (subcmd == "COUNT") { if (subcmd == "COUNT") {
return builder->SendLong(cmd_cnt); return rb->SendLong(cmd_cnt);
} }
bool sufficient_args = (args.size() == 2); bool sufficient_args = (args.size() == 2);
@ -2461,10 +2462,10 @@ void Service::Command(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
sufficient_args = (args.size() == 1); sufficient_args = (args.size() == 1);
if (subcmd == "DOCS" && sufficient_args) { if (subcmd == "DOCS" && sufficient_args) {
return builder->SendOk(); return rb->SendOk();
} }
return builder->SendError(kSyntaxErr, kSyntaxErrType); return rb->SendError(kSyntaxErr, kSyntaxErrType);
} }
VarzValue::Map Service::GetVarzStats() { VarzValue::Map Service::GetVarzStats() {
@ -2595,12 +2596,9 @@ Service::ContextInfo Service::GetContextInfo(facade::ConnectionContext* cntx) co
.blocked = server_cntx->blocked}; .blocked = server_cntx->blocked};
} }
using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx);
#define HFUNC(x) SetHandler(&Service::x) #define HFUNC(x) SetHandler(&Service::x)
#define MFUNC(x) \ #define MFUNC(x) \
SetHandler([this](CmdArgList sp, Transaction* tx, SinkReplyBuilder* builder, \ SetHandler([this](CmdArgList sp, const CommandContext& cntx) { this->x(std::move(sp), cntx); })
ConnectionContext* cntx) { this->x(std::move(sp), tx, builder, cntx); })
namespace acl { namespace acl {
constexpr uint32_t kQuit = FAST | CONNECTION; constexpr uint32_t kQuit = FAST | CONNECTION;

View file

@ -125,43 +125,27 @@ class Service : public facade::ServiceInterface {
private: private:
using SinkReplyBuilder = facade::SinkReplyBuilder; using SinkReplyBuilder = facade::SinkReplyBuilder;
static void Quit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, static void Quit(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx); static void Multi(CmdArgList args, const CommandContext& cmd_cntx);
static void Multi(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
static void Watch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, static void Watch(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx); static void Unwatch(CmdArgList args, const CommandContext& cmd_cntx);
static void Unwatch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Discard(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Discard(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx); void Eval(CmdArgList args, const CommandContext& cmd_cntx, bool read_only = false);
void Eval(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx, void EvalRo(CmdArgList args, const CommandContext& cmd_cntx);
bool read_only = false); void EvalSha(CmdArgList args, const CommandContext& cmd_cntx, bool read_only = false);
void EvalRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); void EvalShaRo(CmdArgList args, const CommandContext& cmd_cntx);
void EvalSha(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx, void Exec(CmdArgList args, const CommandContext& cmd_cntx);
bool read_only = false); void Publish(CmdArgList args, const CommandContext& cmd_cntx);
void EvalShaRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Subscribe(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx); void Unsubscribe(CmdArgList args, const CommandContext& cmd_cntx);
void Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); void PSubscribe(CmdArgList args, const CommandContext& cmd_cntx);
void Publish(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void PUnsubscribe(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx); void Function(CmdArgList args, const CommandContext& cmd_cntx);
void Subscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Monitor(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx); void Pubsub(CmdArgList args, const CommandContext& cmd_cntx);
void Unsubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void Command(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx);
void PSubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void PUnsubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Function(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Monitor(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Pubsub(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void Command(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void PubsubChannels(std::string_view pattern, SinkReplyBuilder* builder); void PubsubChannels(std::string_view pattern, SinkReplyBuilder* builder);
void PubsubPatterns(SinkReplyBuilder* builder); void PubsubPatterns(SinkReplyBuilder* builder);

View file

@ -9,7 +9,6 @@ extern "C" {
} }
// Custom types: Range 30-35 is used by DF RDB types. // Custom types: Range 30-35 is used by DF RDB types.
constexpr uint8_t RDB_TYPE_JSON_OLD = 20;
constexpr uint8_t RDB_TYPE_JSON = 30; constexpr uint8_t RDB_TYPE_JSON = 30;
constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31; constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32; constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;

View file

@ -477,6 +477,8 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const unique_ptr<LoadTrace>& ptr
CreateZSet(ptr.get()); CreateZSet(ptr.get());
break; break;
case RDB_TYPE_STREAM_LISTPACKS: case RDB_TYPE_STREAM_LISTPACKS:
case RDB_TYPE_STREAM_LISTPACKS_2:
case RDB_TYPE_STREAM_LISTPACKS_3:
CreateStream(ptr.get()); CreateStream(ptr.get());
break; break;
default: default:
@ -955,8 +957,16 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
} }
s->length = ltrace->stream_trace->stream_len; s->length = ltrace->stream_trace->stream_len;
s->last_id.ms = ltrace->stream_trace->ms; CopyStreamId(ltrace->stream_trace->last_id, &s->last_id);
s->last_id.seq = ltrace->stream_trace->seq; CopyStreamId(ltrace->stream_trace->first_id, &s->first_id);
CopyStreamId(ltrace->stream_trace->max_deleted_entry_id, &s->max_deleted_entry_id);
s->entries_added = ltrace->stream_trace->entries_added;
if (rdb_type_ == RDB_TYPE_STREAM_LISTPACKS) {
/* Since the rax is already loaded, we can find the first entry's
* ID. */
streamGetEdgeID(s, 1, 1, &s->first_id);
}
for (const auto& cg : ltrace->stream_trace->cgroup) { for (const auto& cg : ltrace->stream_trace->cgroup) {
string_view cgname = ToSV(cg.name); string_view cgname = ToSV(cg.name);
@ -964,7 +974,12 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
cg_id.ms = cg.ms; cg_id.ms = cg.ms;
cg_id.seq = cg.seq; cg_id.seq = cg.seq;
streamCG* cgroup = streamCreateCG(s, cgname.data(), cgname.size(), &cg_id, 0); uint64_t entries_read = cg.entries_read;
if (rdb_type_ == RDB_TYPE_STREAM_LISTPACKS) {
entries_read = streamEstimateDistanceFromFirstEverEntry(s, &cg_id);
}
streamCG* cgroup = streamCreateCG(s, cgname.data(), cgname.size(), &cg_id, entries_read);
if (cgroup == NULL) { if (cgroup == NULL) {
LOG(ERROR) << "Duplicated consumer group name " << cgname; LOG(ERROR) << "Duplicated consumer group name " << cgname;
ec_ = RdbError(errc::duplicate_key); ec_ = RdbError(errc::duplicate_key);
@ -995,6 +1010,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
return; return;
} }
consumer->active_time = cons.active_time;
/* Create the PEL (pending entries list) about entries owned by this specific /* Create the PEL (pending entries list) about entries owned by this specific
* consumer. */ * consumer. */
for (const auto& rawid : cons.nack_arr) { for (const auto& rawid : cons.nack_arr) {
@ -1512,7 +1528,7 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
// RDB_TYPE_JSON == 20. On newer versions > 9 we bumped up RDB_TYPE_JSON to 30 // RDB_TYPE_JSON == 20. On newer versions > 9 we bumped up RDB_TYPE_JSON to 30
// because it overlapped with the new type RDB_TYPE_SET_LISTPACK // because it overlapped with the new type RDB_TYPE_SET_LISTPACK
if (rdb_version_ < 10) { if (rdb_version_ < 10) {
// consider it RDB_TYPE_JSON_OLD // consider it RDB_TYPE_JSON_OLD (20)
iores = ReadJson(); iores = ReadJson();
} else { } else {
iores = ReadGeneric(rdbtype); iores = ReadGeneric(rdbtype);
@ -1876,21 +1892,20 @@ auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result<OpaqueObj> {
// so if there are still unread elements return the partial stream. // so if there are still unread elements return the partial stream.
if (listpacks > n) { if (listpacks > n) {
pending_read_.remaining = listpacks - n; pending_read_.remaining = listpacks - n;
return OpaqueObj{std::move(load_trace), RDB_TYPE_STREAM_LISTPACKS}; return OpaqueObj{std::move(load_trace), rdbtype};
} else if (pending_read_.remaining > 0) {
pending_read_.remaining = 0;
} }
// Load stream metadata. pending_read_.remaining = 0;
// Load stream metadata.
load_trace->stream_trace.reset(new StreamTrace); load_trace->stream_trace.reset(new StreamTrace);
/* Load total number of items inside the stream. */ /* Load total number of items inside the stream. */
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->stream_len); SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->stream_len);
/* Load the last entry ID. */ /* Load the last entry ID. */
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->ms); SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->last_id.ms);
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->seq); SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->last_id.seq);
if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) { if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
/* Load the first entry ID. */ /* Load the first entry ID. */
@ -1907,13 +1922,7 @@ auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result<OpaqueObj> {
/* During migration the offset can be initialized to the stream's /* During migration the offset can be initialized to the stream's
* length. At this point, we also don't care about tombstones * length. At this point, we also don't care about tombstones
* because CG offsets will be later initialized as well. */ * because CG offsets will be later initialized as well. */
load_trace->stream_trace->max_deleted_entry_id.ms = 0;
load_trace->stream_trace->max_deleted_entry_id.seq = 0;
load_trace->stream_trace->entries_added = load_trace->stream_trace->stream_len; load_trace->stream_trace->entries_added = load_trace->stream_trace->stream_len;
// TODO add implementation, we need to find the first entry's ID.
// The redis code is next
// streamGetEdgeID(s,1,1,&s->first_id);
} }
/* Consumer groups loading */ /* Consumer groups loading */
@ -1937,24 +1946,11 @@ auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result<OpaqueObj> {
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.ms); SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.ms);
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.seq); SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.seq);
uint64_t cg_offset; cgroup.entries_read = 0;
if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) { if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
SET_OR_UNEXPECT(LoadLen(nullptr), cg_offset); SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.entries_read);
(void)cg_offset;
} else {
// TODO implement
// cg_offset = should be calculated like streamEstimateDistanceFromFirstEverEntry();
} }
// TODO add our implementation for the next Redis logic
// streamCG* cgroup = streamCreateCG(s, cgname, sdslen(cgname), &cg_id, cg_offset);
// if (cgroup == NULL) {
// rdbReportCorruptRDB("Duplicated consumer group name %s", cgname);
// decrRefCount(o);
// sdsfree(cgname);
// return NULL;
// }
/* Load the global PEL for this consumer group, however we'll /* Load the global PEL for this consumer group, however we'll
* not yet populate the NACK structures with the message * not yet populate the NACK structures with the message
* owner, since consumers for this group and their messages will * owner, since consumers for this group and their messages will
@ -2724,6 +2720,11 @@ std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, LoadConfig co
return visitor.ec(); return visitor.ec();
} }
void RdbLoaderBase::CopyStreamId(const StreamID& src, struct streamID* dest) {
dest->ms = src.ms;
dest->seq = src.seq;
}
void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
EngineShard* es = EngineShard::tlocal(); EngineShard* es = EngineShard::tlocal();
DbContext db_cntx{&namespaces->GetDefaultNamespace(), db_ind, GetCurrentTimeMs()}; DbContext db_cntx{&namespaces->GetDefaultNamespace(), db_ind, GetCurrentTimeMs()};

View file

@ -16,6 +16,8 @@ extern "C" {
#include "server/common.h" #include "server/common.h"
#include "server/journal/serializer.h" #include "server/journal/serializer.h"
struct streamID;
namespace dfly { namespace dfly {
class EngineShardSet; class EngineShardSet;
@ -84,15 +86,15 @@ class RdbLoaderBase {
}; };
struct StreamID { struct StreamID {
uint64_t ms; uint64_t ms = 0;
uint64_t seq; uint64_t seq = 0;
}; };
struct StreamCGTrace { struct StreamCGTrace {
RdbVariant name; RdbVariant name;
uint64_t ms; uint64_t ms;
uint64_t seq; uint64_t seq;
uint64_t entries_read;
std::vector<StreamPelTrace> pel_arr; std::vector<StreamPelTrace> pel_arr;
std::vector<StreamConsumerTrace> cons_arr; std::vector<StreamConsumerTrace> cons_arr;
}; };
@ -100,10 +102,10 @@ class RdbLoaderBase {
struct StreamTrace { struct StreamTrace {
size_t lp_len; size_t lp_len;
size_t stream_len; size_t stream_len;
uint64_t ms, seq; StreamID last_id;
StreamID first_id; /* The first non-tombstone entry, zero if empty. */ StreamID first_id; /* The first non-tombstone entry, zero if empty. */
StreamID max_deleted_entry_id; /* The maximal ID that was deleted. */ StreamID max_deleted_entry_id; /* The maximal ID that was deleted. */
uint64_t entries_added; /* All time count of elements added. */ uint64_t entries_added = 0; /* All time count of elements added. */
std::vector<StreamCGTrace> cgroup; std::vector<StreamCGTrace> cgroup;
}; };
@ -192,6 +194,8 @@ class RdbLoaderBase {
std::error_code EnsureReadInternal(size_t min_to_read); std::error_code EnsureReadInternal(size_t min_to_read);
static void CopyStreamId(const StreamID& src, struct streamID* dest);
base::IoBuf* mem_buf_ = nullptr; base::IoBuf* mem_buf_ = nullptr;
base::IoBuf origin_mem_buf_; base::IoBuf origin_mem_buf_;
::io::Source* src_ = nullptr; ::io::Source* src_ = nullptr;

View file

@ -51,10 +51,16 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_
"set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot," "set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot,"
"set 3 for multi entry lz4 compression on df snapshot and single entry on rdb snapshot"); "set 3 for multi entry lz4 compression on df snapshot and single entry on rdb snapshot");
ABSL_FLAG(int, compression_level, 2, "The compression level to use on zstd/lz4 compression"); ABSL_FLAG(int, compression_level, 2, "The compression level to use on zstd/lz4 compression");
// TODO: to retire both flags in v1.27 (Jan 2025)
ABSL_FLAG(bool, list_rdb_encode_v2, true, ABSL_FLAG(bool, list_rdb_encode_v2, true,
"V2 rdb encoding of list uses listpack encoding format, compatible with redis 7. V1 rdb " "V2 rdb encoding of list uses listpack encoding format, compatible with redis 7. V1 rdb "
"enconding of list uses ziplist encoding compatible with redis 6"); "enconding of list uses ziplist encoding compatible with redis 6");
ABSL_FLAG(bool, stream_rdb_encode_v2, false,
"V2 uses format, compatible with redis 7.2 and Dragonfly v1.26+, while v1 format "
"is compatible with redis 6");
namespace dfly { namespace dfly {
using namespace std; using namespace std;
@ -160,7 +166,7 @@ std::string AbslUnparseFlag(dfly::CompressionMode flag) {
dfly::CompressionMode GetDefaultCompressionMode() { dfly::CompressionMode GetDefaultCompressionMode() {
const auto flag = absl::GetFlag(FLAGS_compression_mode); const auto flag = absl::GetFlag(FLAGS_compression_mode);
if (serialization_max_chunk_size == 0) { if (ServerState::tlocal()->serialization_max_chunk_size == 0) {
return flag; return flag;
} }
@ -209,12 +215,12 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
} }
break; break;
case OBJ_STREAM: case OBJ_STREAM:
return RDB_TYPE_STREAM_LISTPACKS; return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS_3
: RDB_TYPE_STREAM_LISTPACKS;
case OBJ_MODULE: case OBJ_MODULE:
return RDB_TYPE_MODULE_2; return RDB_TYPE_MODULE_2;
case OBJ_JSON: case OBJ_JSON:
return RDB_TYPE_JSON; // save with RDB_TYPE_JSON, deprecate RDB_TYPE_JSON_OLD after July return RDB_TYPE_JSON;
// 2024.
case OBJ_SBF: case OBJ_SBF:
return RDB_TYPE_SBF; return RDB_TYPE_SBF;
} }
@ -657,6 +663,22 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) {
RETURN_ON_ERR(SaveLen(s->last_id.ms)); RETURN_ON_ERR(SaveLen(s->last_id.ms));
RETURN_ON_ERR(SaveLen(s->last_id.seq)); RETURN_ON_ERR(SaveLen(s->last_id.seq));
uint8_t rdb_type = RdbObjectType(pv);
// 'first_id', 'max_deleted_entry_id' and 'entries_added' are added
// in RDB_TYPE_STREAM_LISTPACKS_2
if (rdb_type >= RDB_TYPE_STREAM_LISTPACKS_2) {
/* Save the first entry ID. */
RETURN_ON_ERR(SaveLen(s->first_id.ms));
RETURN_ON_ERR(SaveLen(s->first_id.seq));
/* Save the maximal tombstone ID. */
RETURN_ON_ERR(SaveLen(s->max_deleted_entry_id.ms));
RETURN_ON_ERR(SaveLen(s->max_deleted_entry_id.seq));
/* Save the offset. */
RETURN_ON_ERR(SaveLen(s->entries_added));
}
/* The consumer groups and their clients are part of the stream /* The consumer groups and their clients are part of the stream
* type, so serialize every consumer group. */ * type, so serialize every consumer group. */
@ -678,16 +700,20 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) {
RETURN_ON_ERR(SaveString((uint8_t*)ri.key, ri.key_len)); RETURN_ON_ERR(SaveString((uint8_t*)ri.key, ri.key_len));
/* Last ID. */ /* Last ID. */
RETURN_ON_ERR(SaveLen(s->last_id.ms)); RETURN_ON_ERR(SaveLen(cg->last_id.ms));
RETURN_ON_ERR(SaveLen(s->last_id.seq)); RETURN_ON_ERR(SaveLen(cg->last_id.seq));
if (rdb_type >= RDB_TYPE_STREAM_LISTPACKS_2) {
/* Save the group's logical reads counter. */
RETURN_ON_ERR(SaveLen(cg->entries_read));
}
/* Save the global PEL. */ /* Save the global PEL. */
RETURN_ON_ERR(SaveStreamPEL(cg->pel, true)); RETURN_ON_ERR(SaveStreamPEL(cg->pel, true));
/* Save the consumers of this group. */ /* Save the consumers of this group. */
RETURN_ON_ERR(SaveStreamConsumers(rdb_type >= RDB_TYPE_STREAM_LISTPACKS_3, cg));
RETURN_ON_ERR(SaveStreamConsumers(cg));
} }
} }
@ -818,7 +844,7 @@ error_code RdbSerializer::SaveStreamPEL(rax* pel, bool nacks) {
return error_code{}; return error_code{};
} }
error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) { error_code RdbSerializer::SaveStreamConsumers(bool save_active, streamCG* cg) {
/* Number of consumers in this consumer group. */ /* Number of consumers in this consumer group. */
RETURN_ON_ERR(SaveLen(raxSize(cg->consumers))); RETURN_ON_ERR(SaveLen(raxSize(cg->consumers)));
@ -836,10 +862,15 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
/* Consumer name. */ /* Consumer name. */
RETURN_ON_ERR(SaveString(ri.key, ri.key_len)); RETURN_ON_ERR(SaveString(ri.key, ri.key_len));
/* Last seen time. */ /* seen time. */
absl::little_endian::Store64(buf, consumer->seen_time); absl::little_endian::Store64(buf, consumer->seen_time);
RETURN_ON_ERR(WriteRaw(buf)); RETURN_ON_ERR(WriteRaw(buf));
if (save_active) {
/* Active time. */
absl::little_endian::Store64(buf, consumer->active_time);
RETURN_ON_ERR(WriteRaw(buf));
}
/* Consumer PEL, without the ACKs (see last parameter of the function /* Consumer PEL, without the ACKs (see last parameter of the function
* passed with value of 0), at loading time we'll lookup the ID * passed with value of 0), at loading time we'll lookup the ID
* in the consumer group global PEL and will put a reference in the * in the consumer group global PEL and will put a reference in the

View file

@ -255,7 +255,7 @@ class RdbSerializer : public SerializerBase {
std::error_code SaveBinaryDouble(double val); std::error_code SaveBinaryDouble(double val);
std::error_code SaveListPackAsZiplist(uint8_t* lp); std::error_code SaveListPackAsZiplist(uint8_t* lp);
std::error_code SaveStreamPEL(rax* pel, bool nacks); std::error_code SaveStreamPEL(rax* pel, bool nacks);
std::error_code SaveStreamConsumers(streamCG* cg); std::error_code SaveStreamConsumers(bool save_active, streamCG* cg);
std::error_code SavePlainNodeAsZiplist(const quicklistNode* node); std::error_code SavePlainNodeAsZiplist(const quicklistNode* node);
// Might preempt // Might preempt

View file

@ -36,6 +36,8 @@ ABSL_DECLARE_FLAG(dfly::CompressionMode, compression_mode);
namespace dfly { namespace dfly {
static const auto kMatchNil = ArgType(RespExpr::NIL);
class RdbTest : public BaseFamilyTest { class RdbTest : public BaseFamilyTest {
protected: protected:
void SetUp(); void SetUp();
@ -136,13 +138,20 @@ TEST_F(RdbTest, Stream) {
auto resp = Run({"type", "key:10"}); auto resp = Run({"type", "key:10"});
EXPECT_EQ(resp, "stream"); EXPECT_EQ(resp, "stream");
resp = Run({"xinfo", "groups", "key:0"}); resp = Run({"xinfo", "groups", "key:0"});
EXPECT_THAT(resp, ArrLen(2)); EXPECT_THAT(resp, ArrLen(2));
EXPECT_THAT(resp.GetVec()[0],
RespElementsAre("name", "g1", "consumers", 0, "pending", 0, "last-delivered-id",
"1655444851524-3", "entries-read", 128, "lag", 0));
EXPECT_THAT(resp.GetVec()[1],
RespElementsAre("name", "g2", "consumers", 1, "pending", 0, "last-delivered-id",
"1655444851523-1", "entries-read", kMatchNil, "lag", kMatchNil));
resp = Run({"xinfo", "groups", "key:1"}); // test dereferences array of size 1 resp = Run({"xinfo", "groups", "key:1"}); // test dereferences array of size 1
EXPECT_THAT(resp, RespArray(ElementsAre("name", "g2", "consumers", IntArg(0), "pending", EXPECT_THAT(resp, RespElementsAre("name", "g2", "consumers", IntArg(0), "pending", IntArg(0),
IntArg(0), "last-delivered-id", "1655444851523-1", "last-delivered-id", "1655444851523-1", "entries-read",
"entries-read", IntArg(0), "lag", IntArg(0)))); kMatchNil, "lag", kMatchNil));
resp = Run({"xinfo", "groups", "key:2"}); resp = Run({"xinfo", "groups", "key:2"});
EXPECT_THAT(resp, ArrLen(0)); EXPECT_THAT(resp, ArrLen(0));
@ -629,14 +638,30 @@ TEST_F(RdbTest, LoadHugeList) {
// Tests loading a huge stream, where the stream is loaded in multiple partial // Tests loading a huge stream, where the stream is loaded in multiple partial
// reads. // reads.
TEST_F(RdbTest, LoadHugeStream) { TEST_F(RdbTest, LoadHugeStream) {
TEST_current_time_ms = 1000;
// Add a huge stream (test:0) with 2000 entries, and 4 1k elements per entry // Add a huge stream (test:0) with 2000 entries, and 4 1k elements per entry
// (note must be more than 512*4kb elements to test partial reads). // (note must be more than 512*4kb elements to test partial reads).
for (int i = 0; i != 2000; i++) { // We add 2000 entries to the stream to ensure that the stream, because populate strream
// adds only a single entry at a time, with multiple elements in it.
for (unsigned i = 0; i < 2000; i++) {
Run({"debug", "populate", "1", "test", "2000", "rand", "type", "stream", "elements", "4"}); Run({"debug", "populate", "1", "test", "2000", "rand", "type", "stream", "elements", "4"});
} }
ASSERT_EQ(2000, CheckedInt({"xlen", "test:0"})); ASSERT_EQ(2000, CheckedInt({"xlen", "test:0"}));
Run({"XGROUP", "CREATE", "test:0", "grp1", "0"});
Run({"XGROUP", "CREATE", "test:0", "grp2", "0"});
Run({"XREADGROUP", "GROUP", "grp1", "Alice", "COUNT", "1", "STREAMS", "test:0", ">"});
Run({"XREADGROUP", "GROUP", "grp2", "Alice", "COUNT", "1", "STREAMS", "test:0", ">"});
RespExpr resp = Run({"save", "df"}); auto resp = Run({"xinfo", "stream", "test:0"});
EXPECT_THAT(
resp, RespElementsAre("length", 2000, "radix-tree-keys", 2000, "radix-tree-nodes", 2010,
"last-generated-id", "1000-1999", "max-deleted-entry-id", "0-0",
"entries-added", 2000, "recorded-first-entry-id", "1000-0", "groups", 2,
"first-entry", ArrLen(2), "last-entry", ArrLen(2)));
resp = Run({"save", "df"});
ASSERT_EQ(resp, "OK"); ASSERT_EQ(resp, "OK");
auto save_info = service_->server_family().GetLastSaveInfo(); auto save_info = service_->server_family().GetLastSaveInfo();
@ -644,18 +669,29 @@ TEST_F(RdbTest, LoadHugeStream) {
ASSERT_EQ(resp, "OK"); ASSERT_EQ(resp, "OK");
ASSERT_EQ(2000, CheckedInt({"xlen", "test:0"})); ASSERT_EQ(2000, CheckedInt({"xlen", "test:0"}));
resp = Run({"xinfo", "stream", "test:0"});
EXPECT_THAT(
resp, RespElementsAre("length", 2000, "radix-tree-keys", 2000, "radix-tree-nodes", 2010,
"last-generated-id", "1000-1999", "max-deleted-entry-id", "0-0",
"entries-added", 2000, "recorded-first-entry-id", "1000-0", "groups", 2,
"first-entry", ArrLen(2), "last-entry", ArrLen(2)));
resp = Run({"xinfo", "groups", "test:0"});
EXPECT_THAT(resp, RespElementsAre(RespElementsAre("name", "grp1", "consumers", 1, "pending", 1,
"last-delivered-id", "1000-0", "entries-read",
1, "lag", 1999),
_));
} }
TEST_F(RdbTest, LoadStream2) { TEST_F(RdbTest, LoadStream2) {
auto ec = LoadRdb("RDB_TYPE_STREAM_LISTPACKS_2.rdb"); auto ec = LoadRdb("RDB_TYPE_STREAM_LISTPACKS_2.rdb");
ASSERT_FALSE(ec) << ec.message(); ASSERT_FALSE(ec) << ec.message();
auto res = Run({"XINFO", "STREAM", "mystream"}); auto res = Run({"XINFO", "STREAM", "mystream"});
ASSERT_THAT( ASSERT_THAT(res.GetVec(),
res.GetVec(), ElementsAre("length", 2, "radix-tree-keys", 1, "radix-tree-nodes", 2,
ElementsAre("length", IntArg(2), "radix-tree-keys", IntArg(1), "radix-tree-nodes", IntArg(2), "last-generated-id", "1732613360686-0", "max-deleted-entry-id", "0-0",
"last-generated-id", "1732613360686-0", "max-deleted-entry-id", "0-0", "entries-added", 2, "recorded-first-entry-id", "1732613352350-0",
"entries-added", IntArg(0), "recorded-first-entry-id", "0-0", "groups", IntArg(1), "groups", 1, "first-entry", RespElementsAre("1732613352350-0", _),
"first-entry", ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY))); "last-entry", RespElementsAre("1732613360686-0", _)));
} }
TEST_F(RdbTest, LoadStream3) { TEST_F(RdbTest, LoadStream3) {
@ -664,10 +700,10 @@ TEST_F(RdbTest, LoadStream3) {
auto res = Run({"XINFO", "STREAM", "mystream"}); auto res = Run({"XINFO", "STREAM", "mystream"});
ASSERT_THAT( ASSERT_THAT(
res.GetVec(), res.GetVec(),
ElementsAre("length", IntArg(2), "radix-tree-keys", IntArg(1), "radix-tree-nodes", IntArg(2), ElementsAre("length", 2, "radix-tree-keys", 1, "radix-tree-nodes", 2, "last-generated-id",
"last-generated-id", "1732614679549-0", "max-deleted-entry-id", "0-0", "1732614679549-0", "max-deleted-entry-id", "0-0", "entries-added", 2,
"entries-added", IntArg(0), "recorded-first-entry-id", "0-0", "groups", IntArg(1), "recorded-first-entry-id", "1732614676541-0", "groups", 1, "first-entry",
"first-entry", ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY))); ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY)));
} }
TEST_F(RdbTest, SnapshotTooBig) { TEST_F(RdbTest, SnapshotTooBig) {

View file

@ -877,6 +877,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
absl::GetFlag(FLAGS_s3_ec2_metadata), absl::GetFlag(FLAGS_s3_sign_payload)); absl::GetFlag(FLAGS_s3_ec2_metadata), absl::GetFlag(FLAGS_s3_sign_payload));
#else #else
LOG(ERROR) << "Compiled without AWS support"; LOG(ERROR) << "Compiled without AWS support";
exit(1);
#endif #endif
} else if (IsGCSPath(flag_dir)) { } else if (IsGCSPath(flag_dir)) {
auto gcs = std::make_shared<detail::GcsSnapshotStorage>(); auto gcs = std::make_shared<detail::GcsSnapshotStorage>();

View file

@ -298,6 +298,7 @@ class ServerState { // public struct - to allow initialization.
// Exec descriptor frequency count for this thread. // Exec descriptor frequency count for this thread.
absl::flat_hash_map<std::string, unsigned> exec_freq_count; absl::flat_hash_map<std::string, unsigned> exec_freq_count;
double rss_oom_deny_ratio; double rss_oom_deny_ratio;
size_t serialization_max_chunk_size;
private: private:
int64_t live_transactions_ = 0; int64_t live_transactions_ = 0;

View file

@ -78,7 +78,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot
journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb)); journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
} }
const auto flush_threshold = serialization_max_chunk_size; const auto flush_threshold = ServerState::tlocal()->serialization_max_chunk_size;
std::function<void(size_t, RdbSerializer::FlushState)> flush_fun; std::function<void(size_t, RdbSerializer::FlushState)> flush_fun;
if (flush_threshold != 0 && allow_flush == SnapshotFlush::kAllow) { if (flush_threshold != 0 && allow_flush == SnapshotFlush::kAllow) {
flush_fun = [this, flush_threshold](size_t bytes_serialized, flush_fun = [this, flush_threshold](size_t bytes_serialized,

View file

@ -46,7 +46,7 @@ struct ParsedStreamId {
// Whether to lookup messages after the last ID in the stream. Used for XREAD // Whether to lookup messages after the last ID in the stream. Used for XREAD
// when using ID '$'. // when using ID '$'.
bool last_id = false; bool resolve_last_id = false;
}; };
struct RangeId { struct RangeId {
@ -82,7 +82,8 @@ struct NACKInfo {
struct ConsumerInfo { struct ConsumerInfo {
string name; string name;
size_t seen_time; mstime_t seen_time;
mstime_t active_time;
size_t pel_count; size_t pel_count;
vector<NACKInfo> pending; vector<NACKInfo> pending;
size_t idle; size_t idle;
@ -361,7 +362,7 @@ int StreamAppendItem(stream* s, CmdArgList fields, uint64_t now_ms, streamID* ad
} }
/* Avoid overflow when trying to add an element to the stream (listpack /* Avoid overflow when trying to add an element to the stream (listpack
* can only host up to 32bit length sttrings, and also a total listpack size * can only host up to 32bit length strings, and also a total listpack size
* can't be bigger than 32bit length. */ * can't be bigger than 32bit length. */
size_t totelelen = 0; size_t totelelen = 0;
for (size_t i = 0; i < fields.size(); i++) { for (size_t i = 0; i < fields.size(); i++) {
@ -769,6 +770,7 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeO
LOG(DFATAL) << "Internal error"; LOG(DFATAL) << "Internal error";
return OpStatus::SKIPPED; // ("NACK half-created. Should not be possible."); return OpStatus::SKIPPED; // ("NACK half-created. Should not be possible.");
} }
opts.consumer->active_time = now_ms;
} }
if (opts.count == result.size()) if (opts.count == result.size())
break; break;
@ -985,6 +987,7 @@ void GetConsumers(stream* s, streamCG* cg, long long count, GroupInfo* ginfo) {
consumer_info.name = consumer->name; consumer_info.name = consumer->name;
consumer_info.seen_time = consumer->seen_time; consumer_info.seen_time = consumer->seen_time;
consumer_info.active_time = consumer->active_time;
consumer_info.pel_count = raxSize(consumer->pel); consumer_info.pel_count = raxSize(consumer->pel);
/* Consumer PEL */ /* Consumer PEL */
@ -1106,6 +1109,7 @@ OpResult<vector<ConsumerInfo>> OpConsumers(const DbContext& db_cntx, EngineShard
consumer_info.name = consumer->name; consumer_info.name = consumer->name;
consumer_info.pel_count = raxSize(consumer->pel); consumer_info.pel_count = raxSize(consumer->pel);
consumer_info.idle = idle; consumer_info.idle = idle;
consumer_info.active_time = consumer->active_time;
result.push_back(std::move(consumer_info)); result.push_back(std::move(consumer_info));
} }
raxStop(&ri); raxStop(&ri);
@ -1181,6 +1185,18 @@ OpResult<FindGroupResult> FindGroup(const OpArgs& op_args, string_view key, stri
return FindGroupResult{s, cg, std::move(res_it->post_updater), res_it->it}; return FindGroupResult{s, cg, std::move(res_it->post_updater), res_it->it};
} }
// Try to get the consumer. If not found, create a new one.
streamConsumer* FindOrAddConsumer(string_view name, streamCG* cg, uint64_t now_ms) {
// Try to get the consumer. If not found, create a new one.
auto cname = WrapSds(name);
streamConsumer* consumer = streamLookupConsumer(cg, cname);
if (consumer)
consumer->seen_time = now_ms;
else // TODO: notify xgroup-createconsumer event once we support stream events.
consumer = StreamCreateConsumer(cg, name, now_ms, SCC_DEFAULT);
return consumer;
}
constexpr uint8_t kClaimForce = 1 << 0; constexpr uint8_t kClaimForce = 1 << 0;
constexpr uint8_t kClaimJustID = 1 << 1; constexpr uint8_t kClaimJustID = 1 << 1;
constexpr uint8_t kClaimLastID = 1 << 2; constexpr uint8_t kClaimLastID = 1 << 2;
@ -1240,7 +1256,6 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
auto cgr_res = FindGroup(op_args, key, opts.group); auto cgr_res = FindGroup(op_args, key, opts.group);
RETURN_ON_BAD_STATUS(cgr_res); RETURN_ON_BAD_STATUS(cgr_res);
streamConsumer* consumer = nullptr;
uint64_t now_ms = op_args.db_cntx.time_now_ms; uint64_t now_ms = op_args.db_cntx.time_now_ms;
ClaimInfo result; ClaimInfo result;
result.justid = (opts.flags & kClaimJustID); result.justid = (opts.flags & kClaimJustID);
@ -1254,6 +1269,8 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
StreamMemTracker tracker; StreamMemTracker tracker;
streamConsumer* consumer = FindOrAddConsumer(opts.consumer, cgr_res->cg, now_ms);
for (streamID id : ids) { for (streamID id : ids) {
std::array<uint8_t, sizeof(streamID)> buf; std::array<uint8_t, sizeof(streamID)> buf;
StreamEncodeID(buf.begin(), &id); StreamEncodeID(buf.begin(), &id);
@ -1288,13 +1305,6 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
} }
} }
// Try to get the consumer. If not found, create a new one.
auto cname = WrapSds(opts.consumer);
if ((consumer = streamLookupConsumer(cgr_res->cg, cname, SLC_NO_REFRESH)) == nullptr) {
consumer = StreamCreateConsumer(cgr_res->cg, opts.consumer, now_ms,
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
}
// If the entry belongs to the same consumer, we don't have to // If the entry belongs to the same consumer, we don't have to
// do anything. Else remove the entry from the old consumer. // do anything. Else remove the entry from the old consumer.
if (nack->consumer != consumer) { if (nack->consumer != consumer) {
@ -1320,9 +1330,11 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
raxInsert(consumer->pel, buf.begin(), sizeof(buf), nack, nullptr); raxInsert(consumer->pel, buf.begin(), sizeof(buf), nack, nullptr);
nack->consumer = consumer; nack->consumer = consumer;
} }
consumer->active_time = now_ms;
/* Send the reply for this entry. */ /* Send the reply for this entry. */
AppendClaimResultItem(result, cgr_res->s, id); AppendClaimResultItem(result, cgr_res->s, id);
// TODO: propagate this change with streamPropagateXCLAIM
} }
} }
tracker.UpdateStreamSize(cgr_res->it->second); tracker.UpdateStreamSize(cgr_res->it->second);
@ -1382,8 +1394,7 @@ OpResult<uint32_t> OpDelConsumer(const OpArgs& op_args, string_view key, string_
StreamMemTracker mem_tracker; StreamMemTracker mem_tracker;
long long pending = 0; long long pending = 0;
streamConsumer* consumer = streamConsumer* consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(consumer_name));
streamLookupConsumer(cgroup_res->cg, WrapSds(consumer_name), SLC_NO_REFRESH);
if (consumer) { if (consumer) {
pending = raxSize(consumer->pel); pending = raxSize(consumer->pel);
streamDelConsumer(cgroup_res->cg, consumer); streamDelConsumer(cgroup_res->cg, consumer);
@ -1571,13 +1582,8 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
uint64_t now_ms = op_args.db_cntx.time_now_ms; uint64_t now_ms = op_args.db_cntx.time_now_ms;
int count = opts.count; int count = opts.count;
auto cname = WrapSds(opts.consumer); streamConsumer* consumer = FindOrAddConsumer(opts.consumer, group, now_ms);
streamConsumer* consumer = streamLookupConsumer(group, cname, SLC_DEFAULT);
if (consumer == nullptr) {
consumer = StreamCreateConsumer(group, opts.consumer, now_ms, SCC_DEFAULT);
// TODO: notify xgroup-createconsumer event once we support stream events.
}
consumer->seen_time = now_ms;
while (attempts-- && count && raxNext(&ri)) { while (attempts-- && count && raxNext(&ri)) {
streamNACK* nack = (streamNACK*)ri.data; streamNACK* nack = (streamNACK*)ri.data;
@ -1621,7 +1627,7 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
raxInsert(consumer->pel, ri.key, ri.key_len, nack, nullptr); raxInsert(consumer->pel, ri.key, ri.key_len, nack, nullptr);
nack->consumer = consumer; nack->consumer = consumer;
} }
consumer->active_time = now_ms;
AppendClaimResultItem(result, stream, id); AppendClaimResultItem(result, stream, id);
count--; count--;
// TODO: propagate xclaim to replica // TODO: propagate xclaim to replica
@ -1760,7 +1766,7 @@ OpResult<PendingResult> OpPending(const OpArgs& op_args, string_view key, const
streamConsumer* consumer = nullptr; streamConsumer* consumer = nullptr;
if (!opts.consumer_name.empty()) { if (!opts.consumer_name.empty()) {
consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(opts.consumer_name), SLC_NO_REFRESH); consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(opts.consumer_name));
} }
PendingResult result; PendingResult result;
@ -2114,8 +2120,12 @@ std::optional<ReadOpts> ParseReadArgsOrReply(CmdArgList args, bool read_group,
size_t pair_count = args.size() - opts.streams_arg; size_t pair_count = args.size() - opts.streams_arg;
if ((pair_count % 2) != 0) { if ((pair_count % 2) != 0) {
const auto m = "Unbalanced list of streams: for each stream key an ID must be specified"; const char* cmd_name = read_group ? "xreadgroup" : "xread";
builder->SendError(m, kSyntaxErr); const char* symbol = read_group ? ">" : "$";
const auto msg = absl::StrCat("Unbalanced '", cmd_name,
"' list of streams: for each stream key an ID or '", symbol,
"' must be specified");
builder->SendError(msg, kSyntaxErr);
return std::nullopt; return std::nullopt;
} }
streams_count = pair_count / 2; streams_count = pair_count / 2;
@ -2149,7 +2159,7 @@ std::optional<ReadOpts> ParseReadArgsOrReply(CmdArgList args, bool read_group,
} }
id.val.ms = 0; id.val.ms = 0;
id.val.seq = 0; id.val.seq = 0;
id.last_id = true; id.resolve_last_id = true;
sitem.id = id; sitem.id = id;
auto [_, is_inserted] = opts.stream_ids.emplace(key, sitem); auto [_, is_inserted] = opts.stream_ids.emplace(key, sitem);
if (!is_inserted) { if (!is_inserted) {
@ -2325,12 +2335,8 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder,
// Update consumer // Update consumer
if (sitem.group) { if (sitem.group) {
auto cname = WrapSds(opts->consumer_name); range_opts.consumer =
range_opts.consumer = streamLookupConsumer(sitem.group, cname, SLC_NO_REFRESH); FindOrAddConsumer(opts->consumer_name, sitem.group, GetCurrentTimeMs());
if (!range_opts.consumer) {
range_opts.consumer = StreamCreateConsumer(
sitem.group, opts->consumer_name, GetCurrentTimeMs(), SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
}
} }
range_opts.noack = opts->noack; range_opts.noack = opts->noack;
@ -2898,7 +2904,7 @@ void StreamFamily::XInfo(CmdArgList args, const CommandContext& cmd_cntx) {
rb->SendBulkString("consumers"); rb->SendBulkString("consumers");
rb->StartArray(ginfo.consumer_info_vec.size()); rb->StartArray(ginfo.consumer_info_vec.size());
for (const auto& consumer_info : ginfo.consumer_info_vec) { for (const auto& consumer_info : ginfo.consumer_info_vec) {
rb->StartCollection(4, RedisReplyBuilder::MAP); rb->StartCollection(5, RedisReplyBuilder::MAP);
rb->SendBulkString("name"); rb->SendBulkString("name");
rb->SendBulkString(consumer_info.name); rb->SendBulkString(consumer_info.name);
@ -2906,6 +2912,9 @@ void StreamFamily::XInfo(CmdArgList args, const CommandContext& cmd_cntx) {
rb->SendBulkString("seen-time"); rb->SendBulkString("seen-time");
rb->SendLong(consumer_info.seen_time); rb->SendLong(consumer_info.seen_time);
rb->SendBulkString("active-time");
rb->SendLong(consumer_info.active_time);
rb->SendBulkString("pel-count"); rb->SendBulkString("pel-count");
rb->SendLong(consumer_info.pel_count); rb->SendLong(consumer_info.pel_count);
@ -2957,14 +2966,20 @@ void StreamFamily::XInfo(CmdArgList args, const CommandContext& cmd_cntx) {
OpResult<vector<ConsumerInfo>> result = shard_set->Await(sid, std::move(cb)); OpResult<vector<ConsumerInfo>> result = shard_set->Await(sid, std::move(cb));
if (result) { if (result) {
rb->StartArray(result->size()); rb->StartArray(result->size());
int64_t now_ms = GetCurrentTimeMs();
for (const auto& consumer_info : *result) { for (const auto& consumer_info : *result) {
rb->StartCollection(3, RedisReplyBuilder::MAP); int64_t active = consumer_info.active_time;
int64_t inactive = active != -1 ? now_ms - active : -1;
rb->StartCollection(4, RedisReplyBuilder::MAP);
rb->SendBulkString("name"); rb->SendBulkString("name");
rb->SendBulkString(consumer_info.name); rb->SendBulkString(consumer_info.name);
rb->SendBulkString("pending"); rb->SendBulkString("pending");
rb->SendLong(consumer_info.pel_count); rb->SendLong(consumer_info.pel_count);
rb->SendBulkString("idle"); rb->SendBulkString("idle");
rb->SendLong(consumer_info.idle); rb->SendLong(consumer_info.idle);
rb->SendBulkString("inactive");
rb->SendLong(inactive);
} }
return; return;
} }
@ -3095,26 +3110,11 @@ variant<bool, facade::ErrorReply> HasEntries2(const OpArgs& op_args, string_view
return facade::ErrorReply{ return facade::ErrorReply{
NoGroupOrKey(skey, opts->group_name, " in XREADGROUP with GROUP option")}; NoGroupOrKey(skey, opts->group_name, " in XREADGROUP with GROUP option")};
auto cname = WrapSds(opts->consumer_name); consumer = FindOrAddConsumer(opts->consumer_name, group, op_args.db_cntx.time_now_ms);
consumer = streamLookupConsumer(group, cname, SLC_NO_REFRESH);
if (!consumer) {
consumer = StreamCreateConsumer(group, opts->consumer_name, op_args.db_cntx.time_now_ms,
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
}
requested_sitem.group = group; requested_sitem.group = group;
requested_sitem.consumer = consumer; requested_sitem.consumer = consumer;
}
// Resolve $ to the last ID in the stream.
if (requested_sitem.id.last_id && !opts->read_group) {
requested_sitem.id.val = last_id;
streamIncrID(&requested_sitem.id.val); // include id's strictly greater
requested_sitem.id.last_id = false;
return false;
}
if (opts->read_group) {
// If '>' is not provided, consumer PEL is used. So don't need to block. // If '>' is not provided, consumer PEL is used. So don't need to block.
if (requested_sitem.id.val.ms != UINT64_MAX || requested_sitem.id.val.seq != UINT64_MAX) { if (requested_sitem.id.val.ms != UINT64_MAX || requested_sitem.id.val.seq != UINT64_MAX) {
opts->serve_history = true; opts->serve_history = true;
@ -3126,6 +3126,14 @@ variant<bool, facade::ErrorReply> HasEntries2(const OpArgs& op_args, string_view
requested_sitem.id.val = requested_sitem.group->last_id; requested_sitem.id.val = requested_sitem.group->last_id;
streamIncrID(&requested_sitem.id.val); streamIncrID(&requested_sitem.id.val);
} }
} else {
// Resolve $ to the last ID in the stream.
if (requested_sitem.id.resolve_last_id) {
requested_sitem.id.val = last_id;
streamIncrID(&requested_sitem.id.val); // include id's strictly greater
requested_sitem.id.resolve_last_id = false;
return false;
}
} }
return streamCompareID(&last_id, &requested_sitem.id.val) >= 0; return streamCompareID(&last_id, &requested_sitem.id.val) >= 0;

View file

@ -4,6 +4,7 @@
#include "server/stream_family.h" #include "server/stream_family.h"
#include "base/flags.h"
#include "base/gtest.h" #include "base/gtest.h"
#include "base/logging.h" #include "base/logging.h"
#include "facade/facade_test.h" #include "facade/facade_test.h"
@ -14,6 +15,8 @@ using namespace testing;
using namespace std; using namespace std;
using namespace util; using namespace util;
ABSL_DECLARE_FLAG(bool, stream_rdb_encode_v2);
namespace dfly { namespace dfly {
const auto kMatchNil = ArgType(RespExpr::NIL); const auto kMatchNil = ArgType(RespExpr::NIL);
@ -404,8 +407,8 @@ TEST_F(StreamFamilyTest, XReadInvalidArgs) {
EXPECT_THAT(resp, ErrArg("syntax error")); EXPECT_THAT(resp, ErrArg("syntax error"));
// Unbalanced list of streams. // Unbalanced list of streams.
resp = Run({"xread", "count", "invalid", "streams", "s1", "s2", "s3", "0", "0"}); resp = Run({"xread", "count", "invalid", "streams", "s1", "s2", "0", "0"});
EXPECT_THAT(resp, ErrArg("syntax error")); EXPECT_THAT(resp, ErrArg("value is not an integer"));
// Wrong type. // Wrong type.
Run({"set", "foo", "v"}); Run({"set", "foo", "v"});
@ -442,7 +445,12 @@ TEST_F(StreamFamilyTest, XReadGroupInvalidArgs) {
// Unbalanced list of streams. // Unbalanced list of streams.
resp = Run({"xreadgroup", "group", "group", "alice", "streams", "s1", "s2", "s3", "0", "0"}); resp = Run({"xreadgroup", "group", "group", "alice", "streams", "s1", "s2", "s3", "0", "0"});
EXPECT_THAT(resp, ErrArg("syntax error")); EXPECT_THAT(resp, ErrArg("Unbalanced 'xreadgroup' list of streams: for each stream key an ID or "
"'>' must be specified"));
resp = Run({"XREAD", "COUNT", "1", "STREAMS", "mystream"});
ASSERT_THAT(resp, ErrArg("Unbalanced 'xread' list of streams: for each stream key an ID or '$' "
"must be specified"));
} }
TEST_F(StreamFamilyTest, XReadGroupEmpty) { TEST_F(StreamFamilyTest, XReadGroupEmpty) {
@ -857,8 +865,8 @@ TEST_F(StreamFamilyTest, XInfoConsumers) {
Run({"xgroup", "createconsumer", "mystream", "mygroup", "second-consumer"}); Run({"xgroup", "createconsumer", "mystream", "mygroup", "second-consumer"});
resp = Run({"xinfo", "consumers", "mystream", "mygroup"}); resp = Run({"xinfo", "consumers", "mystream", "mygroup"});
EXPECT_THAT(resp, ArrLen(2)); EXPECT_THAT(resp, ArrLen(2));
EXPECT_THAT(resp.GetVec()[0], ArrLen(6)); EXPECT_THAT(resp.GetVec()[0], ArrLen(8));
EXPECT_THAT(resp.GetVec()[1], ArrLen(6)); EXPECT_THAT(resp.GetVec()[1], ArrLen(8));
EXPECT_THAT(resp.GetVec()[0].GetVec()[1], "first-consumer"); EXPECT_THAT(resp.GetVec()[0].GetVec()[1], "first-consumer");
EXPECT_THAT(resp.GetVec()[1].GetVec()[1], "second-consumer"); EXPECT_THAT(resp.GetVec()[1].GetVec()[1], "second-consumer");
@ -1016,7 +1024,7 @@ TEST_F(StreamFamilyTest, XInfoStream) {
"consumers", ArrLen(1))); "consumers", ArrLen(1)));
EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec(), EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec(),
ElementsAre("name", "first-consumer", "seen-time", ArgType(RespExpr::INT64), ElementsAre("name", "first-consumer", "seen-time", ArgType(RespExpr::INT64),
"pel-count", IntArg(0), "pending", ArrLen(0))); "active-time", IntArg(-1), "pel-count", IntArg(0), "pending", ArrLen(0)));
// full with count less than number of messages in stream // full with count less than number of messages in stream
resp = Run({"xinfo", "stream", "mystream", "full", "count", "5"}); resp = Run({"xinfo", "stream", "mystream", "full", "count", "5"});
@ -1040,9 +1048,9 @@ TEST_F(StreamFamilyTest, XInfoStream) {
EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[9], IntArg(11)); // pel-count EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[9], IntArg(11)); // pel-count
EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[11], ArrLen(11)); // pending list EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[11], ArrLen(11)); // pending list
// consumer // consumer
EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec()[5],
IntArg(11)); // pel-count
EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec()[7], EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec()[7],
IntArg(11)); // pel-count
EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec()[9],
ArrLen(11)); // pending list ArrLen(11)); // pending list
// delete message // delete message
@ -1069,9 +1077,10 @@ TEST_F(StreamFamilyTest, XInfoStream) {
ElementsAre("name", "mygroup", "last-delivered-id", "11-1", "entries-read", ElementsAre("name", "mygroup", "last-delivered-id", "11-1", "entries-read",
IntArg(11), "lag", IntArg(0), "pel-count", IntArg(11), "pending", IntArg(11), "lag", IntArg(0), "pel-count", IntArg(11), "pending",
ArrLen(11), "consumers", ArrLen(1))); ArrLen(11), "consumers", ArrLen(1)));
EXPECT_THAT(resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec(), EXPECT_THAT(
ElementsAre("name", "first-consumer", "seen-time", ArgType(RespExpr::INT64), resp.GetVec()[17].GetVec()[0].GetVec()[13].GetVec()[0].GetVec(),
"pel-count", IntArg(11), "pending", ArrLen(11))); ElementsAre("name", "first-consumer", "seen-time", ArgType(RespExpr::INT64), "active-time",
ArgType(RespExpr::INT64), "pel-count", IntArg(11), "pending", ArrLen(11)));
} }
TEST_F(StreamFamilyTest, AutoClaimPelItemsFromAnotherConsumer) { TEST_F(StreamFamilyTest, AutoClaimPelItemsFromAnotherConsumer) {
@ -1190,4 +1199,51 @@ TEST_F(StreamFamilyTest, XsetIdSmallerMaxDeleted) {
ASSERT_THAT(resp, ErrArg("smaller")); ASSERT_THAT(resp, ErrArg("smaller"));
} }
TEST_F(StreamFamilyTest, SeenActiveTime) {
TEST_current_time_ms = 1000;
Run({"XGROUP", "CREATE", "mystream", "mygroup", "$", "MKSTREAM"});
Run({"XREADGROUP", "GROUP", "mygroup", "Alice", "COUNT", "1", "STREAMS", "mystream", ">"});
AdvanceTime(100);
auto resp = Run({"xinfo", "consumers", "mystream", "mygroup"});
EXPECT_THAT(resp, RespElementsAre("name", "Alice", "pending", IntArg(0), "idle", IntArg(100),
"inactive", IntArg(-1)));
Run({"XADD", "mystream", "*", "f", "v"});
Run({"XREADGROUP", "GROUP", "mygroup", "Alice", "COUNT", "1", "STREAMS", "mystream", ">"});
AdvanceTime(50);
resp = Run({"xinfo", "consumers", "mystream", "mygroup"});
EXPECT_THAT(resp, RespElementsAre("name", "Alice", "pending", IntArg(1), "idle", IntArg(50),
"inactive", IntArg(50)));
AdvanceTime(100);
resp = Run({"XREADGROUP", "GROUP", "mygroup", "Alice", "COUNT", "1", "STREAMS", "mystream", ">"});
EXPECT_THAT(resp, ArgType(RespExpr::NIL_ARRAY));
resp = Run({"xinfo", "consumers", "mystream", "mygroup"});
// Idle is 0 because XREADGROUP just run, but inactive continues clocking because nothing was
// read.
EXPECT_THAT(resp, RespElementsAre("name", "Alice", "pending", IntArg(1), "idle", IntArg(0),
"inactive", IntArg(150)));
// Serialize/deserialize.
resp = Run({"XINFO", "STREAM", "mystream", "FULL"});
auto groups = resp.GetVec()[17];
auto consumers = groups.GetVec()[0].GetVec()[13].GetVec()[0];
EXPECT_THAT(consumers, RespElementsAre("name", "Alice", "seen-time", IntArg(1250), "active-time",
IntArg(1100), "pel-count", IntArg(1), "pending", _));
absl::SetFlag(&FLAGS_stream_rdb_encode_v2, true);
resp = Run({"DUMP", "mystream"});
Run({"del", "mystream"});
resp = Run({"RESTORE", "mystream", "0", resp.GetString()});
EXPECT_EQ(resp, "OK");
resp = Run({"XINFO", "STREAM", "mystream", "FULL"});
groups = resp.GetVec()[17];
consumers = groups.GetVec()[0].GetVec()[13].GetVec()[0];
EXPECT_THAT(consumers, RespElementsAre("name", "Alice", "seen-time", IntArg(1250), "active-time",
IntArg(1100), "pel-count", IntArg(1), "pending", _));
}
} // namespace dfly } // namespace dfly

View file

@ -1563,9 +1563,6 @@ OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
string_view arg = ArgS(args, i); string_view arg = ArgS(args, i);
if (absl::EqualsIgnoreCase(arg, "STREAMS")) { if (absl::EqualsIgnoreCase(arg, "STREAMS")) {
size_t left = args.size() - i - 1; size_t left = args.size() - i - 1;
if (left < 2 || left % 2 != 0)
return OpStatus::SYNTAX_ERR;
return KeyIndex(i + 1, i + 1 + (left / 2)); return KeyIndex(i + 1, i + 1 + (left / 2));
} }
} }

View file

@ -6,31 +6,6 @@ from . import dfly_args
from .instance import DflyInstance, DflyInstanceFactory from .instance import DflyInstance, DflyInstanceFactory
async def calculate_estimated_connection_memory(
async_client: aioredis.Redis, df_server: DflyInstance
):
memory_info = await async_client.info("memory")
already_used_rss_memory = memory_info["used_memory_rss"]
connections_number = 100
connections = []
for _ in range(connections_number):
conn = aioredis.Redis(port=df_server.port)
await conn.ping()
connections.append(conn)
await asyncio.sleep(1) # Wait RSS update
memory_info = await async_client.info("memory")
estimated_connections_memory = memory_info["used_memory_rss"] - already_used_rss_memory
# Close test connection
for conn in connections:
await conn.close()
return estimated_connections_memory // connections_number
@pytest.mark.opt_only @pytest.mark.opt_only
@pytest.mark.parametrize( @pytest.mark.parametrize(
"type, keys, val_size, elements", "type, keys, val_size, elements",
@ -188,106 +163,49 @@ async def test_eval_with_oom(df_factory: DflyInstanceFactory):
assert rss_before_eval * 1.01 > info["used_memory_rss"] assert rss_before_eval * 1.01 > info["used_memory_rss"]
@pytest.mark.skip("rss eviction disabled")
@pytest.mark.asyncio @pytest.mark.asyncio
@dfly_args( @dfly_args(
{ {
"proactor_threads": 1, "proactor_threads": 1,
"cache_mode": "true", "cache_mode": "true",
"maxmemory": "256mb", "maxmemory": "5gb",
"rss_oom_deny_ratio": 0.5, "rss_oom_deny_ratio": 0.8,
"max_eviction_per_heartbeat": 1000, "max_eviction_per_heartbeat": 100,
} }
) )
async def test_cache_eviction_with_rss_deny_oom( async def test_cache_eviction_with_rss_deny_oom(
async_client: aioredis.Redis, async_client: aioredis.Redis,
df_server: DflyInstance,
): ):
""" """
Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit
""" """
max_memory = 256 * 1024 * 1024 # 256 MB max_memory = 5 * 1024 * 1024 * 1024 # 5G
rss_max_memory = int(max_memory * 0.5) # 50% of max memory rss_max_memory = int(max_memory * 0.8)
data_fill_size = int(0.55 * rss_max_memory) # 55% of rss_max_memory data_fill_size = int(0.9 * rss_max_memory) # 95% of rss_max_memory
rss_increase_size = int(0.55 * rss_max_memory) # 55% of max rss_max_memory
key_size = 1024 * 5 # 5 kb val_size = 1024 * 5 # 5 kb
num_keys = data_fill_size // key_size num_keys = data_fill_size // val_size
await asyncio.sleep(1) # Wait for RSS update
estimated_connection_memory = await calculate_estimated_connection_memory(
async_client, df_server
)
num_connections = rss_increase_size // estimated_connection_memory
logging.info(
f"Estimated connection memory: {estimated_connection_memory}. Number of connections: {num_connections}."
)
# Fill data to 55% of rss max memory
await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", key_size)
await asyncio.sleep(1) # Wait for RSS heartbeat update
# First test that eviction is not triggered without connection creation
stats_info = await async_client.info("stats")
assert stats_info["evicted_keys"] == 0, "No eviction should start yet."
await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size)
# Test that used memory is less than 90% of max memory # Test that used memory is less than 90% of max memory
memory_info = await async_client.info("memory") memory_info = await async_client.info("memory")
assert ( assert (
memory_info["used_memory"] < max_memory * 0.9 memory_info["used_memory"] < max_memory * 0.9
), "Used memory should be less than 90% of max memory." ), "Used memory should be less than 90% of max memory."
assert ( assert (
memory_info["used_memory_rss"] < rss_max_memory * 0.9 memory_info["used_memory_rss"] > rss_max_memory * 0.9
), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio)." ), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio)."
# Disable heartbeat eviction
await async_client.execute_command("CONFIG SET enable_heartbeat_eviction false")
# Increase RSS memory by 55% of rss max memory
# We can simulate RSS increase by creating new connections
connections = []
for _ in range(num_connections):
conn = aioredis.Redis(port=df_server.port)
await conn.ping()
connections.append(conn)
await asyncio.sleep(1)
# Check that RSS memory is above rss limit
memory_info = await async_client.info("memory")
assert (
memory_info["used_memory_rss"] >= rss_max_memory * 0.9
), "RSS memory should exceed 90% of the maximum RSS memory limit (max_memory * rss_oom_deny_ratio)."
# Enable heartbeat eviction
await async_client.execute_command("CONFIG SET enable_heartbeat_eviction true")
await asyncio.sleep(1) # Wait for RSS heartbeat update
await async_client.execute_command("MEMORY DECOMMIT")
await asyncio.sleep(1) # Wait for RSS update
# Get RSS memory after creating new connections # Get RSS memory after creating new connections
memory_info = await async_client.info("memory") memory_info = await async_client.info("memory")
stats_info = await async_client.info("stats") while memory_info["used_memory_rss"] > rss_max_memory * 0.9:
await asyncio.sleep(1)
logging.info(f'Evicted keys number: {stats_info["evicted_keys"]}. Total keys: {num_keys}.') memory_info = await async_client.info("memory")
logging.info(
assert ( f'Current rss: {memory_info["used_memory_rss"]}. rss eviction threshold: {rss_max_memory * 0.9}.'
memory_info["used_memory"] < data_fill_size )
), "Used memory should be less than initial fill size due to eviction." stats_info = await async_client.info("stats")
logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')
assert (
memory_info["used_memory_rss"] < rss_max_memory * 0.9
), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio) after eviction."
# Check that eviction has occurred
assert (
stats_info["evicted_keys"] > 0
), "Eviction should have occurred due to rss memory pressure."
for conn in connections:
await conn.close()