1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: change the interface of stream and server commands (#4219)

This commit is contained in:
Roman Gershman 2024-11-28 18:44:01 +02:00 committed by GitHub
parent c9654d4050
commit 010bd8add4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 300 additions and 332 deletions

View file

@ -14,7 +14,6 @@
#include <string_view>
#include <vector>
#include "base/logging.h"
#include "core/compact_object.h"
#include "facade/facade_types.h"
#include "facade/op_status.h"
@ -367,7 +366,7 @@ struct BorrowedInterpreter {
// Give up ownership of the interpreter, it must be returned manually.
Interpreter* Release() && {
DCHECK(owned_);
assert(owned_);
owned_ = false;
return interpreter_;
}

View file

@ -492,7 +492,7 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
VLOG(1) << "Takeover accepted, shutting down.";
std::string save_arg = "NOSAVE";
MutableSlice sargs(save_arg);
return sf_->ShutdownCmd(CmdArgList(&sargs, 1), nullptr, rb);
return sf_->ShutdownCmd(CmdArgList(&sargs, 1), CommandContext{nullptr, rb, nullptr});
}
void DflyCmd::Expire(CmdArgList args, Transaction* tx, RedisReplyBuilder* rb) {

View file

@ -230,22 +230,10 @@ namespace {
const auto kRedisVersion = "6.2.11";
using EngineFunc = void (ServerFamily::*)(CmdArgList args, Transaction* tx,
SinkReplyBuilder* builder, ConnectionContext* cntx);
using EngineFunc = void (ServerFamily::*)(CmdArgList args, const CommandContext&);
using EngineFunc2 = void (ServerFamily::*)(CmdArgList args, Transaction* tx,
SinkReplyBuilder* builder);
inline CommandId::Handler HandlerFunc(ServerFamily* se, EngineFunc f) {
return [=](CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) {
return (se->*f)(args, tx, builder, cntx);
};
}
inline auto HandlerFunc(ServerFamily* se, EngineFunc2 f) {
return [=](CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return (se->*f)(args, tx, builder);
};
inline CommandId::Handler3 HandlerFunc(ServerFamily* se, EngineFunc f) {
return [=](CmdArgList args, const CommandContext& cntx) { return (se->*f)(args, cntx); };
}
using CI = CommandId;
@ -1741,18 +1729,18 @@ LastSaveInfo ServerFamily::GetLastSaveInfo() const {
return last_save_info_;
}
void ServerFamily::DbSize(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ServerFamily::DbSize(CmdArgList args, const CommandContext& cmd_cntx) {
atomic_ulong num_keys{0};
shard_set->RunBriefInParallel(
[&](EngineShard* shard) {
auto db_size = cntx->ns->GetDbSlice(shard->shard_id()).DbSize(cntx->conn_state.db_index);
auto db_size = cmd_cntx.conn_cntx->ns->GetDbSlice(shard->shard_id())
.DbSize(cmd_cntx.conn_cntx->conn_state.db_index);
num_keys.fetch_add(db_size, memory_order_relaxed);
},
[](ShardId) { return true; });
return builder->SendLong(num_keys.load(memory_order_relaxed));
return cmd_cntx.rb->SendLong(num_keys.load(memory_order_relaxed));
}
void ServerFamily::CancelBlockingOnThread(std::function<OpStatus(ArgSlice)> status_cb) {
@ -1802,25 +1790,23 @@ void ServerFamily::SendInvalidationMessages() const {
}
}
void ServerFamily::FlushDb(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
DCHECK(tx);
Drakarys(tx, tx->GetDbIndex());
void ServerFamily::FlushDb(CmdArgList args, const CommandContext& cmd_cntx) {
DCHECK(cmd_cntx.tx);
Drakarys(cmd_cntx.tx, cmd_cntx.tx->GetDbIndex());
SendInvalidationMessages();
builder->SendOk();
cmd_cntx.rb->SendOk();
}
void ServerFamily::FlushAll(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ServerFamily::FlushAll(CmdArgList args, const CommandContext& cmd_cntx) {
if (args.size() > 1) {
builder->SendError(kSyntaxErr);
cmd_cntx.rb->SendError(kSyntaxErr);
return;
}
DCHECK(tx);
Drakarys(tx, DbSlice::kDbAll);
DCHECK(cmd_cntx.tx);
Drakarys(cmd_cntx.tx, DbSlice::kDbAll);
SendInvalidationMessages();
builder->SendOk();
cmd_cntx.rb->SendOk();
}
bool ServerFamily::DoAuth(ConnectionContext* cntx, std::string_view username,
@ -1840,12 +1826,12 @@ bool ServerFamily::DoAuth(ConnectionContext* cntx, std::string_view username,
return is_authorized;
}
void ServerFamily::Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ServerFamily::Auth(CmdArgList args, const CommandContext& cmd_cntx) {
if (args.size() > 2) {
return builder->SendError(kSyntaxErr);
return cmd_cntx.rb->SendError(kSyntaxErr);
}
ConnectionContext* cntx = cmd_cntx.conn_cntx;
// non admin port auth
if (!cntx->conn()->IsPrivileged()) {
const bool one_arg = args.size() == 1;
@ -1853,16 +1839,16 @@ void ServerFamily::Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
const size_t index = one_arg ? 0 : 1;
std::string_view password = facade::ToSV(args[index]);
if (DoAuth(cntx, username, password)) {
return builder->SendOk();
return cmd_cntx.rb->SendOk();
}
auto& log = ServerState::tlocal()->acl_log;
using Reason = acl::AclLog::Reason;
log.Add(*cntx, "AUTH", Reason::AUTH, std::string(username));
return builder->SendError(facade::kAuthRejected);
return cmd_cntx.rb->SendError(facade::kAuthRejected);
}
if (!cntx->req_auth) {
return builder->SendError(
return cmd_cntx.rb->SendError(
"AUTH <password> called without any password configured for "
"admin port. Are you sure your configuration is correct?");
}
@ -1870,16 +1856,17 @@ void ServerFamily::Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
string_view pass = ArgS(args, 0);
if (pass == GetPassword()) {
cntx->authenticated = true;
builder->SendOk();
cmd_cntx.rb->SendOk();
} else {
builder->SendError(facade::kAuthRejected);
cmd_cntx.rb->SendError(facade::kAuthRejected);
}
}
void ServerFamily::Client(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ServerFamily::Client(CmdArgList args, const CommandContext& cmd_cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
CmdArgList sub_args = args.subspan(1);
auto* builder = cmd_cntx.rb;
auto* cntx = cmd_cntx.conn_cntx;
if (sub_cmd == "SETNAME") {
return ClientSetName(sub_args, builder, cntx);
@ -1894,7 +1881,7 @@ void ServerFamily::Client(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
} else if (sub_cmd == "KILL") {
return ClientKill(sub_args, absl::MakeSpan(listeners_), builder, cntx);
} else if (sub_cmd == "CACHING") {
return ClientCaching(sub_args, builder, tx, cntx);
return ClientCaching(sub_args, builder, cmd_cntx.tx, cntx);
} else if (sub_cmd == "SETINFO") {
return ClientSetInfo(sub_args, builder, cntx);
} else if (sub_cmd == "ID") {
@ -1905,10 +1892,10 @@ void ServerFamily::Client(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
return builder->SendError(UnknownSubCmd(sub_cmd, "CLIENT"), kSyntaxErrType);
}
void ServerFamily::Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ServerFamily::Config(CmdArgList args, const CommandContext& cmd_cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
auto* builder = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (sub_cmd == "HELP") {
string_view help_arr[] = {
"CONFIG <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
@ -1922,8 +1909,7 @@ void ServerFamily::Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
" Prints this help.",
};
auto* rb = static_cast<RedisReplyBuilder*>(builder);
return rb->SendSimpleStrArr(help_arr);
return builder->SendSimpleStrArr(help_arr);
}
if (sub_cmd == "SET") {
@ -1980,23 +1966,21 @@ void ServerFamily::Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
}
if (sub_cmd == "RESETSTAT") {
ResetStat(cntx->ns);
ResetStat(cmd_cntx.conn_cntx->ns);
return builder->SendOk();
} else {
return builder->SendError(UnknownSubCmd(sub_cmd, "CONFIG"), kSyntaxErrType);
}
}
void ServerFamily::Debug(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
DebugCmd dbg_cmd{this, &service_.cluster_family(), cntx};
void ServerFamily::Debug(CmdArgList args, const CommandContext& cmd_cntx) {
DebugCmd dbg_cmd{this, &service_.cluster_family(), cmd_cntx.conn_cntx};
return dbg_cmd.Run(args, builder);
return dbg_cmd.Run(args, cmd_cntx.rb);
}
void ServerFamily::Memory(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
MemoryCmd mem_cmd{this, builder, cntx};
void ServerFamily::Memory(CmdArgList args, const CommandContext& cmd_cntx) {
MemoryCmd mem_cmd{this, cmd_cntx.rb, cmd_cntx.conn_cntx};
return mem_cmd.Run(args);
}
@ -2039,42 +2023,40 @@ std::optional<ServerFamily::VersionBasename> ServerFamily::GetVersionAndBasename
// BGSAVE [DF|RDB] [basename]
// TODO add missing [SCHEDULE]
void ServerFamily::BgSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
auto maybe_res = GetVersionAndBasename(args, builder);
void ServerFamily::BgSave(CmdArgList args, const CommandContext& cmd_cntx) {
auto maybe_res = GetVersionAndBasename(args, cmd_cntx.rb);
if (!maybe_res) {
return;
}
const auto [version, basename] = *maybe_res;
if (auto ec = DoSaveCheckAndStart(version, basename, tx); ec) {
builder->SendError(ec.Format());
if (auto ec = DoSaveCheckAndStart(version, basename, cmd_cntx.tx); ec) {
cmd_cntx.rb->SendError(ec.Format());
return;
}
bg_save_fb_.JoinIfNeeded();
bg_save_fb_ = fb2::Fiber("bg_save_fiber", &ServerFamily::BgSaveFb, this,
boost::intrusive_ptr<Transaction>(tx));
builder->SendOk();
boost::intrusive_ptr<Transaction>(cmd_cntx.tx));
cmd_cntx.rb->SendOk();
}
// SAVE [DF|RDB] [basename]
// Allows saving the snapshot of the dataset on disk, potentially overriding the format
// and the snapshot name.
void ServerFamily::Save(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
auto maybe_res = GetVersionAndBasename(args, builder);
void ServerFamily::Save(CmdArgList args, const CommandContext& cmd_cntx) {
auto maybe_res = GetVersionAndBasename(args, cmd_cntx.rb);
if (!maybe_res) {
return;
}
const auto [version, basename] = *maybe_res;
GenericError ec = DoSave(version, basename, tx);
GenericError ec = DoSave(version, basename, cmd_cntx.tx);
if (ec) {
builder->SendError(ec.Format());
cmd_cntx.rb->SendError(ec.Format());
} else {
builder->SendOk();
cmd_cntx.rb->SendOk();
}
}
@ -2206,10 +2188,9 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
return result;
}
void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) {
if (args.size() > 1) {
return builder->SendError(kSyntaxErr);
return cmd_cntx.rb->SendError(kSyntaxErr);
}
string section;
@ -2254,7 +2235,7 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
Metrics m;
// Save time by not calculating metrics if we don't need them.
if (!(section == "SERVER" || section == "REPLICATION")) {
m = GetMetrics(cntx->ns);
m = GetMetrics(cmd_cntx.conn_cntx->ns);
}
DbStats total;
@ -2616,12 +2597,11 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
if (should_enter("CLUSTER")) {
append("cluster_enabled", cluster::IsClusterEnabledOrEmulated());
}
auto* rb = static_cast<RedisReplyBuilder*>(builder);
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
rb->SendVerbatimString(info);
}
void ServerFamily::Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ServerFamily::Hello(CmdArgList args, const CommandContext& cmd_cntx) {
// If no arguments are provided default to RESP2.
bool is_resp3 = false;
bool has_auth = false;
@ -2630,12 +2610,13 @@ void ServerFamily::Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
string_view password;
string_view clientname;
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (args.size() > 0) {
string_view proto_version = ArgS(args, 0);
is_resp3 = proto_version == "3";
bool valid_proto_version = proto_version == "2" || is_resp3;
if (!valid_proto_version) {
builder->SendError(UnknownCmd("HELLO", args));
rb->SendError(UnknownCmd("HELLO", args));
return;
}
@ -2652,18 +2633,19 @@ void ServerFamily::Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
clientname = ArgS(args, i + 1);
i += 1;
} else {
builder->SendError(kSyntaxErr);
rb->SendError(kSyntaxErr);
return;
}
}
}
auto* cntx = cmd_cntx.conn_cntx;
if (has_auth && !DoAuth(cntx, username, password)) {
return builder->SendError(facade::kAuthRejected);
return rb->SendError(facade::kAuthRejected);
}
if (cntx->req_auth && !cntx->authenticated) {
builder->SendError(
rb->SendError(
"-NOAUTH HELLO must be called with the client already "
"authenticated, otherwise the HELLO <proto> AUTH <user> <pass> "
"option can be used to authenticate the client and "
@ -2675,7 +2657,6 @@ void ServerFamily::Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
cntx->conn()->SetName(string{clientname});
}
auto* rb = static_cast<RedisReplyBuilder*>(builder);
int proto_version = 2;
if (is_resp3) {
proto_version = 3;
@ -2703,27 +2684,26 @@ void ServerFamily::Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
rb->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave");
}
void ServerFamily::AddReplicaOf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx) {
util::fb2::LockGuard lk(replicaof_mu_);
if (ServerState::tlocal()->is_master) {
builder->SendError("Calling ADDREPLICAOFF allowed only after server is already a replica");
cmd_cntx.rb->SendError("Calling ADDREPLICAOFF allowed only after server is already a replica");
return;
}
CHECK(replica_);
auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder);
auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, cmd_cntx.rb);
if (!replicaof_args.has_value()) {
return;
}
if (replicaof_args->IsReplicaOfNoOne()) {
return builder->SendError("ADDREPLICAOF does not support no one");
return cmd_cntx.rb->SendError("ADDREPLICAOF does not support no one");
}
LOG(INFO) << "Add Replica " << *replicaof_args;
auto add_replica = make_unique<Replica>(replicaof_args->host, replicaof_args->port, &service_,
master_replid(), replicaof_args->slot_range);
error_code ec = add_replica->Start(builder);
error_code ec = add_replica->Start(cmd_cntx.rb);
if (!ec) {
cluster_replicas_.push_back(std::move(add_replica));
}
@ -2827,9 +2807,8 @@ void ServerFamily::StopAllClusterReplicas() {
cluster_replicas_.clear();
}
void ServerFamily::ReplicaOf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
ReplicaOfInternal(args, tx, builder, ActionOnConnectionFail::kReturnOnError);
void ServerFamily::ReplicaOf(CmdArgList args, const CommandContext& cmd_cntx) {
ReplicaOfInternal(args, cmd_cntx.tx, cmd_cntx.rb, ActionOnConnectionFail::kReturnOnError);
}
void ServerFamily::Replicate(string_view host, string_view port) {
@ -2847,8 +2826,7 @@ void ServerFamily::Replicate(string_view host, string_view port) {
// REPLTAKEOVER <seconds> [SAVE]
// SAVE is used only by tests.
void ServerFamily::ReplTakeOver(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx) {
VLOG(1) << "ReplTakeOver start";
CmdArgParser parser{args};
@ -2856,6 +2834,7 @@ void ServerFamily::ReplTakeOver(CmdArgList args, Transaction* tx, SinkReplyBuild
int timeout_sec = parser.Next<int>();
bool save_flag = static_cast<bool>(parser.Check("SAVE"));
auto* builder = cmd_cntx.rb;
if (parser.HasNext())
return builder->SendError(absl::StrCat("Unsupported option:", string_view(parser.Next())));
@ -2892,8 +2871,8 @@ void ServerFamily::ReplTakeOver(CmdArgList args, Transaction* tx, SinkReplyBuild
return builder->SendOk();
}
void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ServerFamily::ReplConf(CmdArgList args, const CommandContext& cmd_cntx) {
auto* builder = cmd_cntx.rb;
{
util::fb2::LockGuard lk(replicaof_mu_);
if (!ServerState::tlocal()->is_master) {
@ -2909,6 +2888,7 @@ void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder*
if (args.size() % 2 == 1)
return err_cb();
auto* cntx = cmd_cntx.conn_cntx;
for (unsigned i = 0; i < args.size(); i += 2) {
DCHECK_LT(i + 1, args.size());
@ -2985,9 +2965,8 @@ void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder*
return builder->SendOk();
}
void ServerFamily::Role(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
void ServerFamily::Role(CmdArgList args, const CommandContext& cmd_cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
util::fb2::LockGuard lk(replicaof_mu_);
// Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
// ensuring eventual consistency of is_master. When determining if the server is a replica and
@ -3030,24 +3009,21 @@ void ServerFamily::Role(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
}
}
void ServerFamily::Script(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
script_mgr_->Run(std::move(args), tx, builder, cntx);
void ServerFamily::Script(CmdArgList args, const CommandContext& cmd_cntx) {
script_mgr_->Run(std::move(args), cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx);
}
void ServerFamily::LastSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ServerFamily::LastSave(CmdArgList args, const CommandContext& cmd_cntx) {
time_t save_time;
{
util::fb2::LockGuard lk(save_mu_);
save_time = last_save_info_.save_time;
}
builder->SendLong(save_time);
cmd_cntx.rb->SendLong(save_time);
}
void ServerFamily::Latency(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
void ServerFamily::Latency(CmdArgList args, const CommandContext& cmd_cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
if (sub_cmd == "LATEST") {
@ -3055,12 +3031,12 @@ void ServerFamily::Latency(CmdArgList args, Transaction* tx, SinkReplyBuilder* b
}
LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported";
builder->SendError(kSyntaxErr);
rb->SendError(kSyntaxErr);
}
void ServerFamily::ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void ServerFamily::ShutdownCmd(CmdArgList args, const CommandContext& cmd_cntx) {
if (args.size() > 1) {
builder->SendError(kSyntaxErr);
cmd_cntx.rb->SendError(kSyntaxErr);
return;
}
@ -3070,7 +3046,7 @@ void ServerFamily::ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilde
} else if (absl::EqualsIgnoreCase(sub_cmd, "NOSAVE")) {
save_on_shutdown_ = false;
} else {
builder->SendError(kSyntaxErr);
cmd_cntx.rb->SendError(kSyntaxErr);
return;
}
}
@ -3079,18 +3055,17 @@ void ServerFamily::ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilde
[](ProactorBase* pb) { ServerState::tlocal()->EnterLameDuck(); });
CHECK_NOTNULL(acceptor_)->Stop();
builder->SendOk();
cmd_cntx.rb->SendOk();
}
void ServerFamily::Dfly(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
dfly_cmd_->Run(args, tx, static_cast<RedisReplyBuilder*>(builder), cntx);
void ServerFamily::Dfly(CmdArgList args, const CommandContext& cmd_cntx) {
dfly_cmd_->Run(args, cmd_cntx.tx, static_cast<RedisReplyBuilder*>(cmd_cntx.rb),
cmd_cntx.conn_cntx);
}
void ServerFamily::SlowLog(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ServerFamily::SlowLog(CmdArgList args, const CommandContext& cmd_cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (sub_cmd == "HELP") {
string_view help[] = {
"SLOWLOG <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
@ -3106,7 +3081,7 @@ void ServerFamily::SlowLog(CmdArgList args, Transaction* tx, SinkReplyBuilder* b
"HELP",
" Prints this help.",
};
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendSimpleStrArr(help);
return;
}
@ -3117,28 +3092,28 @@ void ServerFamily::SlowLog(CmdArgList args, Transaction* tx, SinkReplyBuilder* b
lengths[index] = ServerState::tlocal()->GetSlowLog().Length();
});
int sum = std::accumulate(lengths.begin(), lengths.end(), 0);
return builder->SendLong(sum);
return rb->SendLong(sum);
}
if (sub_cmd == "RESET") {
service_.proactor_pool().AwaitFiberOnAll(
[](auto index, auto* context) { ServerState::tlocal()->GetSlowLog().Reset(); });
return builder->SendOk();
return rb->SendOk();
}
if (sub_cmd == "GET") {
return SlowLogGet(args, sub_cmd, &service_.proactor_pool(), builder);
return SlowLogGet(args, sub_cmd, &service_.proactor_pool(), rb);
}
builder->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType);
rb->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType);
}
void ServerFamily::Module(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ServerFamily::Module(CmdArgList args, const CommandContext& cmd_cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
if (sub_cmd != "LIST")
return builder->SendError(kSyntaxErr);
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (sub_cmd != "LIST")
return rb->SendError(kSyntaxErr);
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(2);
// Json

View file

@ -39,7 +39,7 @@ namespace journal {
class Journal;
} // namespace journal
class ConnectionContext;
struct CommandContext;
class CommandRegistry;
class Service;
class ScriptMgr;
@ -164,7 +164,7 @@ class ServerFamily {
void Shutdown() ABSL_LOCKS_EXCLUDED(replicaof_mu_);
// Public because is used by DflyCmd.
void ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void ShutdownCmd(CmdArgList args, const CommandContext& cmd_cntx);
Service& service() {
return service_;
@ -259,40 +259,31 @@ class ServerFamily {
return shard_set->size();
}
void Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void Client(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void DbSize(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void Debug(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void Dfly(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void Memory(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void FlushDb(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void FlushAll(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx)
void Auth(CmdArgList args, const CommandContext& cmd_cntx);
void Client(CmdArgList args, const CommandContext& cmd_cntx);
void Config(CmdArgList args, const CommandContext& cmd_cntx);
void DbSize(CmdArgList args, const CommandContext& cmd_cntx);
void Debug(CmdArgList args, const CommandContext& cmd_cntx);
void Dfly(CmdArgList args, const CommandContext& cmd_cntx);
void Memory(CmdArgList args, const CommandContext& cmd_cntx);
void FlushDb(CmdArgList args, const CommandContext& cmd_cntx);
void FlushAll(CmdArgList args, const CommandContext& cmd_cntx);
void Info(CmdArgList args, const CommandContext& cmd_cntx)
ABSL_LOCKS_EXCLUDED(save_mu_, replicaof_mu_);
void Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void LastSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) ABSL_LOCKS_EXCLUDED(save_mu_);
void Latency(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void ReplicaOf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void AddReplicaOf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void ReplTakeOver(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) ABSL_LOCKS_EXCLUDED(replicaof_mu_);
void ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Role(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx)
void Hello(CmdArgList args, const CommandContext& cmd_cntx);
void LastSave(CmdArgList args, const CommandContext& cmd_cntx) ABSL_LOCKS_EXCLUDED(save_mu_);
void Latency(CmdArgList args, const CommandContext& cmd_cntx);
void ReplicaOf(CmdArgList args, const CommandContext& cmd_cntx);
void AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx);
void ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
ABSL_LOCKS_EXCLUDED(replicaof_mu_);
void Save(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void BgSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void Script(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void SlowLog(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Module(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void ReplConf(CmdArgList args, const CommandContext& cmd_cntx);
void Role(CmdArgList args, const CommandContext& cmd_cntx) ABSL_LOCKS_EXCLUDED(replicaof_mu_);
void Save(CmdArgList args, const CommandContext& cmd_cntx);
void BgSave(CmdArgList args, const CommandContext& cmd_cntx);
void Script(CmdArgList args, const CommandContext& cmd_cntx);
void SlowLog(CmdArgList args, const CommandContext& cmd_cntx);
void Module(CmdArgList args, const CommandContext& cmd_cntx);
void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx);

View file

@ -2450,10 +2450,72 @@ void XReadGeneric2(CmdArgList args, bool read_group, Transaction* tx, SinkReplyB
}
}
void HelpSubCmd(facade::CmdArgParser* parser, Transaction* tx, SinkReplyBuilder* builder) {
XGroupHelp(parser->Tail(), tx, builder);
}
bool ParseXpendingOptions(CmdArgList& args, PendingOpts& opts, SinkReplyBuilder* builder) {
size_t id_indx = 0;
string arg = absl::AsciiStrToUpper(ArgS(args, id_indx));
if (arg == "IDLE" && args.size() > 4) {
id_indx++;
if (!absl::SimpleAtoi(ArgS(args, id_indx), &opts.min_idle_time)) {
builder->SendError(kInvalidIntErr, kSyntaxErrType);
return false;
}
// Ignore negative min_idle_time
opts.min_idle_time = std::max(opts.min_idle_time, static_cast<int64_t>(0));
args.remove_prefix(2);
id_indx = 0;
}
if (args.size() < 3) {
builder->SendError(WrongNumArgsError("XPENDING"), kSyntaxErrType);
return false;
}
// Parse start and end
RangeId rs, re;
string_view start = ArgS(args, id_indx);
id_indx++;
string_view end = ArgS(args, id_indx);
if (!ParseRangeId(start, &rs) || !ParseRangeId(end, &re)) {
builder->SendError(kInvalidStreamId, kSyntaxErrType);
return false;
}
if (rs.exclude && streamIncrID(&rs.parsed_id.val) != C_OK) {
builder->SendError("invalid start ID for the interval", kSyntaxErrType);
return false;
}
if (re.exclude && streamDecrID(&re.parsed_id.val) != C_OK) {
builder->SendError("invalid end ID for the interval", kSyntaxErrType);
return false;
}
id_indx++;
opts.start = rs.parsed_id;
opts.end = re.parsed_id;
// Parse count
if (!absl::SimpleAtoi(ArgS(args, id_indx), &opts.count)) {
builder->SendError(kInvalidIntErr, kSyntaxErrType);
return false;
}
// Ignore negative count value
opts.count = std::max(opts.count, static_cast<int64_t>(0));
if (args.size() - id_indx - 1) {
id_indx++;
opts.consumer_name = ArgS(args, id_indx);
}
return true;
}
} // namespace
void StreamFamily::XAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
auto parse_resp = ParseAddOrTrimArgsOrReply(args, true, builder);
void StreamFamily::XAdd(CmdArgList args, const CommandContext& cmd_cntx) {
auto parse_resp = ParseAddOrTrimArgsOrReply(args, true, cmd_cntx.rb);
if (!parse_resp) {
return;
}
@ -2463,13 +2525,13 @@ void StreamFamily::XAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
args.remove_prefix(id_indx);
if (args.size() < 2 || args.size() % 2 == 0) {
return builder->SendError(WrongNumArgsError("XADD"), kSyntaxErrType);
return cmd_cntx.rb->SendError(WrongNumArgsError("XADD"), kSyntaxErrType);
}
string_view id = ArgS(args, 0);
if (!ParseID(id, true, 0, &add_opts.parsed_id)) {
return builder->SendError(kInvalidStreamId, kSyntaxErrType);
return cmd_cntx.rb->SendError(kInvalidStreamId, kSyntaxErrType);
}
args.remove_prefix(1);
@ -2477,8 +2539,8 @@ void StreamFamily::XAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
return OpAdd(t->GetOpArgs(shard), add_opts, args);
};
OpResult<streamID> add_result = tx->ScheduleSingleHopT(cb);
auto* rb = static_cast<RedisReplyBuilder*>(builder);
OpResult<streamID> add_result = cmd_cntx.tx->ScheduleSingleHopT(cb);
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (add_result) {
return rb->SendBulkString(StreamIdRepr(*add_result));
@ -2489,12 +2551,12 @@ void StreamFamily::XAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
}
if (add_result.status() == OpStatus::STREAM_ID_SMALL) {
return builder->SendError(
return cmd_cntx.rb->SendError(
"The ID specified in XADD is equal or smaller than "
"the target stream top item");
}
return builder->SendError(add_result.status());
return cmd_cntx.rb->SendError(add_result.status());
}
absl::InlinedVector<streamID, 8> GetXclaimIds(CmdArgList& args) {
@ -2567,13 +2629,13 @@ bool ParseXclaimOptions(CmdArgList& args, ClaimOpts& opts, SinkReplyBuilder* bui
return true;
}
void StreamFamily::XClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void StreamFamily::XClaim(CmdArgList args, const CommandContext& cmd_cntx) {
ClaimOpts opts;
string_view key = ArgS(args, 0);
opts.group = ArgS(args, 1);
opts.consumer = ArgS(args, 2);
if (!absl::SimpleAtoi(ArgS(args, 3), &opts.min_idle_time)) {
return builder->SendError(kSyntaxErr);
return cmd_cntx.rb->SendError(kSyntaxErr);
}
// Ignore negative min-idle-time
opts.min_idle_time = std::max(opts.min_idle_time, static_cast<int64>(0));
@ -2582,11 +2644,11 @@ void StreamFamily::XClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
auto ids = GetXclaimIds(args);
if (ids.empty()) {
// No ids given.
return builder->SendError(kInvalidStreamId, kSyntaxErrType);
return cmd_cntx.rb->SendError(kInvalidStreamId, kSyntaxErrType);
}
// parse the options
if (!ParseXclaimOptions(args, opts, builder))
if (!ParseXclaimOptions(args, opts, cmd_cntx.rb))
return;
if (auto now = GetCurrentTimeMs();
@ -2596,16 +2658,16 @@ void StreamFamily::XClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpClaim(t->GetOpArgs(shard), key, opts, absl::Span{ids.data(), ids.size()});
};
OpResult<ClaimInfo> result = tx->ScheduleSingleHopT(std::move(cb));
OpResult<ClaimInfo> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (!result) {
builder->SendError(result.status());
cmd_cntx.rb->SendError(result.status());
return;
}
StreamReplies{builder}.SendClaimInfo(result.value());
StreamReplies{cmd_cntx.rb}.SendClaimInfo(result.value());
}
void StreamFamily::XDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void StreamFamily::XDel(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);
@ -2615,7 +2677,7 @@ void StreamFamily::XDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
ParsedStreamId parsed_id;
string_view str_id = ArgS(args, i);
if (!ParseID(str_id, true, 0, &parsed_id)) {
return builder->SendError(kInvalidStreamId, kSyntaxErrType);
return cmd_cntx.rb->SendError(kInvalidStreamId, kSyntaxErrType);
}
ids[i] = parsed_id.val;
}
@ -2624,19 +2686,15 @@ void StreamFamily::XDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
return OpDel(t->GetOpArgs(shard), key, absl::Span{ids.data(), ids.size()});
};
OpResult<uint32_t> result = tx->ScheduleSingleHopT(cb);
OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(cb);
if (result || result.status() == OpStatus::KEY_NOTFOUND) {
return builder->SendLong(*result);
return cmd_cntx.rb->SendLong(*result);
}
builder->SendError(result.status());
cmd_cntx.rb->SendError(result.status());
}
void HelpSubCmd(facade::CmdArgParser* parser, Transaction* tx, SinkReplyBuilder* builder) {
XGroupHelp(parser->Tail(), tx, builder);
}
void StreamFamily::XGroup(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void StreamFamily::XGroup(CmdArgList args, const CommandContext& cmd_cntx) {
facade::CmdArgParser parser{args};
auto sub_cmd_func = parser.MapNext("HELP", &HelpSubCmd, "CREATE", &CreateGroup, "DESTROY",
@ -2644,14 +2702,13 @@ void StreamFamily::XGroup(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
"DELCONSUMER", &DelConsumer, "SETID", &SetId);
if (auto err = parser.Error(); err)
return builder->SendError(err->MakeReply());
return cmd_cntx.rb->SendError(err->MakeReply());
sub_cmd_func(&parser, tx, builder);
sub_cmd_func(&parser, cmd_cntx.tx, cmd_cntx.rb);
}
void StreamFamily::XInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
void StreamFamily::XInfo(CmdArgList args, const CommandContext& cmd_cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
if (sub_cmd == "HELP") {
@ -2674,7 +2731,8 @@ void StreamFamily::XInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
// We do not use transactional xemantics for xinfo since it's informational command.
auto cb = [&]() {
EngineShard* shard = EngineShard::tlocal();
DbContext db_context{cntx->ns, cntx->db_index(), GetCurrentTimeMs()};
DbContext db_context{cmd_cntx.conn_cntx->ns, cmd_cntx.conn_cntx->db_index(),
GetCurrentTimeMs()};
return OpListGroups(db_context, key, shard);
};
@ -2708,13 +2766,13 @@ void StreamFamily::XInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
}
return;
}
return builder->SendError(result.status());
return rb->SendError(result.status());
} else if (sub_cmd == "STREAM") {
int full = 0;
size_t count = 10; // default count for xinfo streams
if (args.size() == 4 || args.size() > 5) {
return builder->SendError(
return rb->SendError(
"unknown subcommand or wrong number of arguments for 'STREAM'. Try XINFO HELP.");
}
@ -2722,30 +2780,30 @@ void StreamFamily::XInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
full = 1;
string full_arg = absl::AsciiStrToUpper(ArgS(args, 2));
if (full_arg != "FULL") {
return builder->SendError(
return rb->SendError(
"unknown subcommand or wrong number of arguments for 'STREAM'. Try XINFO HELP.");
}
if (args.size() > 3) {
string count_arg = absl::AsciiStrToUpper(ArgS(args, 3));
string_view count_value_arg = ArgS(args, 4);
if (count_arg != "COUNT") {
return builder->SendError(
return rb->SendError(
"unknown subcommand or wrong number of arguments for 'STREAM'. Try XINFO HELP.");
}
if (!absl::SimpleAtoi(count_value_arg, &count)) {
return builder->SendError(kInvalidIntErr);
return rb->SendError(kInvalidIntErr);
}
}
}
auto cb = [&]() {
EngineShard* shard = EngineShard::tlocal();
return OpStreams(DbContext{cntx->ns, cntx->db_index(), GetCurrentTimeMs()}, key, shard,
full, count);
return OpStreams(
DbContext{cmd_cntx.conn_cntx->ns, cmd_cntx.conn_cntx->db_index(), GetCurrentTimeMs()},
key, shard, full, count);
};
auto* rb = static_cast<RedisReplyBuilder*>(builder);
OpResult<StreamInfo> sinfo = shard_set->Await(sid, std::move(cb));
if (sinfo) {
if (full) {
@ -2865,12 +2923,13 @@ void StreamFamily::XInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
}
return;
}
return builder->SendError(sinfo.status());
return rb->SendError(sinfo.status());
} else if (sub_cmd == "CONSUMERS") {
string_view stream_name = ArgS(args, 1);
string_view group_name = ArgS(args, 2);
auto cb = [&]() {
return OpConsumers(DbContext{cntx->ns, cntx->db_index(), GetCurrentTimeMs()},
return OpConsumers(
DbContext{cmd_cntx.conn_cntx->ns, cmd_cntx.conn_cntx->db_index(), GetCurrentTimeMs()},
EngineShard::tlocal(), stream_name, group_name);
};
@ -2889,106 +2948,48 @@ void StreamFamily::XInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
return;
}
if (result.status() == OpStatus::INVALID_VALUE) {
return builder->SendError(NoGroupError(stream_name, group_name));
return rb->SendError(NoGroupError(stream_name, group_name));
}
return builder->SendError(result.status());
return rb->SendError(result.status());
}
}
return builder->SendError(UnknownSubCmd(sub_cmd, "XINFO"));
return rb->SendError(UnknownSubCmd(sub_cmd, "XINFO"));
}
void StreamFamily::XLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void StreamFamily::XLen(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) { return OpLen(t->GetOpArgs(shard), key); };
OpResult<uint32_t> result = tx->ScheduleSingleHopT(cb);
OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(cb);
if (result || result.status() == OpStatus::KEY_NOTFOUND) {
return builder->SendLong(*result);
return cmd_cntx.rb->SendLong(*result);
}
return builder->SendError(result.status());
return cmd_cntx.rb->SendError(result.status());
}
bool ParseXpendingOptions(CmdArgList& args, PendingOpts& opts, SinkReplyBuilder* builder) {
size_t id_indx = 0;
string arg = absl::AsciiStrToUpper(ArgS(args, id_indx));
if (arg == "IDLE" && args.size() > 4) {
id_indx++;
if (!absl::SimpleAtoi(ArgS(args, id_indx), &opts.min_idle_time)) {
builder->SendError(kInvalidIntErr, kSyntaxErrType);
return false;
}
// Ignore negative min_idle_time
opts.min_idle_time = std::max(opts.min_idle_time, static_cast<int64_t>(0));
args.remove_prefix(2);
id_indx = 0;
}
if (args.size() < 3) {
builder->SendError(WrongNumArgsError("XPENDING"), kSyntaxErrType);
return false;
}
// Parse start and end
RangeId rs, re;
string_view start = ArgS(args, id_indx);
id_indx++;
string_view end = ArgS(args, id_indx);
if (!ParseRangeId(start, &rs) || !ParseRangeId(end, &re)) {
builder->SendError(kInvalidStreamId, kSyntaxErrType);
return false;
}
if (rs.exclude && streamIncrID(&rs.parsed_id.val) != C_OK) {
builder->SendError("invalid start ID for the interval", kSyntaxErrType);
return false;
}
if (re.exclude && streamDecrID(&re.parsed_id.val) != C_OK) {
builder->SendError("invalid end ID for the interval", kSyntaxErrType);
return false;
}
id_indx++;
opts.start = rs.parsed_id;
opts.end = re.parsed_id;
// Parse count
if (!absl::SimpleAtoi(ArgS(args, id_indx), &opts.count)) {
builder->SendError(kInvalidIntErr, kSyntaxErrType);
return false;
}
// Ignore negative count value
opts.count = std::max(opts.count, static_cast<int64_t>(0));
if (args.size() - id_indx - 1) {
id_indx++;
opts.consumer_name = ArgS(args, id_indx);
}
return true;
}
void StreamFamily::XPending(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void StreamFamily::XPending(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0);
PendingOpts opts;
opts.group_name = ArgS(args, 1);
args.remove_prefix(2);
if (!args.empty() && !ParseXpendingOptions(args, opts, builder)) {
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (!args.empty() && !ParseXpendingOptions(args, opts, rb)) {
return;
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpPending(t->GetOpArgs(shard), key, opts);
};
OpResult<PendingResult> op_result = tx->ScheduleSingleHopT(cb);
OpResult<PendingResult> op_result = cmd_cntx.tx->ScheduleSingleHopT(cb);
if (!op_result) {
if (op_result.status() == OpStatus::SKIPPED)
return builder->SendError(NoGroupError(key, opts.group_name));
return builder->SendError(op_result.status());
return rb->SendError(NoGroupError(key, opts.group_name));
return rb->SendError(op_result.status());
}
const PendingResult& result = op_result.value();
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (std::holds_alternative<PendingReducedResult>(result)) {
const auto& res = std::get<PendingReducedResult>(result);
rb->StartArray(4);
@ -3024,20 +3025,20 @@ void StreamFamily::XPending(CmdArgList args, Transaction* tx, SinkReplyBuilder*
}
}
void StreamFamily::XRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void StreamFamily::XRange(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = args[0];
string_view start = args[1];
string_view end = args[2];
XRangeGeneric(key, start, end, args.subspan(3), false, tx, builder);
XRangeGeneric(key, start, end, args.subspan(3), false, cmd_cntx.tx, cmd_cntx.rb);
}
void StreamFamily::XRevRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void StreamFamily::XRevRange(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = args[0];
string_view start = args[1];
string_view end = args[2];
XRangeGeneric(key, end, start, args.subspan(3), true, tx, builder);
XRangeGeneric(key, end, start, args.subspan(3), true, cmd_cntx.tx, cmd_cntx.rb);
}
variant<bool, facade::ErrorReply> HasEntries2(const OpArgs& op_args, string_view skey,
@ -3108,46 +3109,44 @@ variant<bool, facade::ErrorReply> HasEntries2(const OpArgs& op_args, string_view
return streamCompareID(&last_id, &requested_sitem.id.val) >= 0;
}
void StreamFamily::XRead(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
return XReadGeneric2(args, false, tx, builder, cntx);
void StreamFamily::XRead(CmdArgList args, const CommandContext& cmd_cntx) {
return XReadGeneric2(args, false, cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx);
}
void StreamFamily::XReadGroup(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
return XReadGeneric2(args, true, tx, builder, cntx);
void StreamFamily::XReadGroup(CmdArgList args, const CommandContext& cmd_cntx) {
return XReadGeneric2(args, true, cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx);
}
void StreamFamily::XSetId(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void StreamFamily::XSetId(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0);
string_view idstr = ArgS(args, 1);
ParsedStreamId parsed_id;
if (!ParseID(idstr, true, 0, &parsed_id)) {
return builder->SendError(kInvalidStreamId, kSyntaxErrType);
return cmd_cntx.rb->SendError(kInvalidStreamId, kSyntaxErrType);
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpSetId2(t->GetOpArgs(shard), key, parsed_id.val);
};
OpStatus result = tx->ScheduleSingleHop(std::move(cb));
OpStatus result = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
switch (result) {
case OpStatus::STREAM_ID_SMALL:
return builder->SendError(
return cmd_cntx.rb->SendError(
"The ID specified in XSETID is smaller than the "
"target stream top item");
case OpStatus::ENTRIES_ADDED_SMALL:
return builder->SendError(
return cmd_cntx.rb->SendError(
"The entries_added specified in XSETID is smaller than "
"the target stream length");
default:
return builder->SendError(result);
return cmd_cntx.rb->SendError(result);
}
}
void StreamFamily::XTrim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
auto parse_resp = ParseAddOrTrimArgsOrReply(args, true, builder);
void StreamFamily::XTrim(CmdArgList args, const CommandContext& cmd_cntx) {
auto parse_resp = ParseAddOrTrimArgsOrReply(args, true, cmd_cntx.rb);
if (!parse_resp) {
return;
}
@ -3158,14 +3157,14 @@ void StreamFamily::XTrim(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
return OpTrim(t->GetOpArgs(shard), trim_opts);
};
OpResult<int64_t> trim_result = tx->ScheduleSingleHopT(cb);
OpResult<int64_t> trim_result = cmd_cntx.tx->ScheduleSingleHopT(cb);
if (trim_result) {
return builder->SendLong(*trim_result);
return cmd_cntx.rb->SendLong(*trim_result);
}
return builder->SendError(trim_result.status());
return cmd_cntx.rb->SendError(trim_result.status());
}
void StreamFamily::XAck(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void StreamFamily::XAck(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0);
string_view group = ArgS(args, 1);
args.remove_prefix(2);
@ -3175,7 +3174,7 @@ void StreamFamily::XAck(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
ParsedStreamId parsed_id;
string_view str_id = ArgS(args, i);
if (!ParseID(str_id, true, 0, &parsed_id)) {
return builder->SendError(kInvalidStreamId, kSyntaxErrType);
return cmd_cntx.rb->SendError(kInvalidStreamId, kSyntaxErrType);
}
ids[i] = parsed_id.val;
}
@ -3184,33 +3183,36 @@ void StreamFamily::XAck(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
return OpAck(t->GetOpArgs(shard), key, group, absl::Span{ids.data(), ids.size()});
};
OpResult<uint32_t> result = tx->ScheduleSingleHopT(cb);
OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(cb);
if (result || result.status() == OpStatus::KEY_NOTFOUND) {
return builder->SendLong(*result);
return cmd_cntx.rb->SendLong(*result);
}
builder->SendError(result.status());
return;
cmd_cntx.rb->SendError(result.status());
}
void StreamFamily::XAutoClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void StreamFamily::XAutoClaim(CmdArgList args, const CommandContext& cmd_cntx) {
ClaimOpts opts;
string_view key = ArgS(args, 0);
opts.group = ArgS(args, 1);
opts.consumer = ArgS(args, 2);
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (!absl::SimpleAtoi(ArgS(args, 3), &opts.min_idle_time)) {
return builder->SendError(kSyntaxErr);
return rb->SendError(kSyntaxErr);
}
opts.min_idle_time = std::max((int64)0, opts.min_idle_time);
string_view start = ArgS(args, 4);
RangeId rs;
if (!ParseRangeId(start, &rs)) {
return;
return rb->SendError(kSyntaxErr);
}
if (rs.exclude && streamDecrID(&rs.parsed_id.val) != C_OK) {
return builder->SendError("invalid start ID for the interval", kSyntaxErrType);
return rb->SendError("invalid start ID for the interval", kSyntaxErrType);
}
opts.start = rs.parsed_id.val;
@ -3223,10 +3225,10 @@ void StreamFamily::XAutoClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder
if (arg == "COUNT") {
arg = ArgS(args, ++i);
if (!absl::SimpleAtoi(arg, &opts.count)) {
return builder->SendError(kInvalidIntErr);
return rb->SendError(kInvalidIntErr);
}
if (opts.count <= 0) {
return builder->SendError("COUNT must be > 0");
return rb->SendError("COUNT must be > 0");
}
continue;
}
@ -3234,26 +3236,27 @@ void StreamFamily::XAutoClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder
if (arg == "JUSTID") {
opts.flags |= kClaimJustID;
} else {
return builder->SendError("Unknown argument given for XAUTOCLAIM command", kSyntaxErr);
return cmd_cntx.rb->SendError("Unknown argument given for XAUTOCLAIM command", kSyntaxErr);
}
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpAutoClaim(t->GetOpArgs(shard), key, opts);
};
OpResult<ClaimInfo> result = tx->ScheduleSingleHopT(std::move(cb));
OpResult<ClaimInfo> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result.status() == OpStatus::KEY_NOTFOUND) {
builder->SendError(NoGroupOrKey(key, opts.group));
rb->SendError(NoGroupOrKey(key, opts.group));
return;
}
if (!result) {
builder->SendError(result.status());
rb->SendError(result.status());
return;
}
ClaimInfo cresult = result.value();
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(3);
rb->SendBulkString(StreamIdRepr(cresult.end_id));
StreamReplies{rb}.SendClaimInfo(cresult);

View file

@ -13,7 +13,7 @@ class SinkReplyBuilder;
namespace dfly {
class CommandRegistry;
class ConnectionContext;
struct CommandContext;
class StreamFamily {
public:
@ -22,24 +22,21 @@ class StreamFamily {
private:
using SinkReplyBuilder = facade::SinkReplyBuilder;
static void XAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void XClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void XDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void XGroup(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void XInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
static void XLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void XPending(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void XRevRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void XRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void XRead(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
static void XReadGroup(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
static void XSetId(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void XTrim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void XAck(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void XAutoClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void XAdd(CmdArgList args, const CommandContext& cmd_cntx);
static void XClaim(CmdArgList args, const CommandContext& cmd_cntx);
static void XDel(CmdArgList args, const CommandContext& cmd_cntx);
static void XGroup(CmdArgList args, const CommandContext& cmd_cntx);
static void XInfo(CmdArgList args, const CommandContext& cmd_cntx);
static void XLen(CmdArgList args, const CommandContext& cmd_cntx);
static void XPending(CmdArgList args, const CommandContext& cmd_cntx);
static void XRevRange(CmdArgList args, const CommandContext& cmd_cntx);
static void XRange(CmdArgList args, const CommandContext& cmd_cntx);
static void XRead(CmdArgList args, const CommandContext& cmd_cntx);
static void XReadGroup(CmdArgList args, const CommandContext& cmd_cntx);
static void XSetId(CmdArgList args, const CommandContext& cmd_cntx);
static void XTrim(CmdArgList args, const CommandContext& cmd_cntx);
static void XAck(CmdArgList args, const CommandContext& cmd_cntx);
static void XAutoClaim(CmdArgList args, const CommandContext& cmd_cntx);
};
} // namespace dfly

View file

@ -4,7 +4,7 @@
#pragma once
#include "server/command_registry.h"
#include "facade/facade_types.h"
namespace facade {
class SinkReplyBuilder;
@ -12,6 +12,9 @@ class SinkReplyBuilder;
namespace dfly {
struct CommandContext;
class CommandRegistry;
class StringFamily {
public:
static void Register(CommandRegistry* registry);