From 010bd8add4f8c8eb100db11861f8b0762e73a88d Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 28 Nov 2024 18:44:01 +0200 Subject: [PATCH] chore: change the interface of stream and server commands (#4219) --- src/server/common.h | 3 +- src/server/dflycmd.cc | 2 +- src/server/server_family.cc | 223 ++++++++++++-------------- src/server/server_family.h | 59 +++---- src/server/stream_family.cc | 305 ++++++++++++++++++------------------ src/server/stream_family.h | 35 ++--- src/server/string_family.h | 5 +- 7 files changed, 300 insertions(+), 332 deletions(-) diff --git a/src/server/common.h b/src/server/common.h index 1772f4065..8460b6e78 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -14,7 +14,6 @@ #include #include -#include "base/logging.h" #include "core/compact_object.h" #include "facade/facade_types.h" #include "facade/op_status.h" @@ -367,7 +366,7 @@ struct BorrowedInterpreter { // Give up ownership of the interpreter, it must be returned manually. Interpreter* Release() && { - DCHECK(owned_); + assert(owned_); owned_ = false; return interpreter_; } diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 2d9a28c73..c7eadc6c0 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -492,7 +492,7 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext VLOG(1) << "Takeover accepted, shutting down."; std::string save_arg = "NOSAVE"; MutableSlice sargs(save_arg); - return sf_->ShutdownCmd(CmdArgList(&sargs, 1), nullptr, rb); + return sf_->ShutdownCmd(CmdArgList(&sargs, 1), CommandContext{nullptr, rb, nullptr}); } void DflyCmd::Expire(CmdArgList args, Transaction* tx, RedisReplyBuilder* rb) { diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 8186ef8b7..a4052c62f 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -230,22 +230,10 @@ namespace { const auto kRedisVersion = "6.2.11"; -using EngineFunc = void (ServerFamily::*)(CmdArgList args, Transaction* tx, - SinkReplyBuilder* builder, ConnectionContext* cntx); +using EngineFunc = void (ServerFamily::*)(CmdArgList args, const CommandContext&); -using EngineFunc2 = void (ServerFamily::*)(CmdArgList args, Transaction* tx, - SinkReplyBuilder* builder); - -inline CommandId::Handler HandlerFunc(ServerFamily* se, EngineFunc f) { - return [=](CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { - return (se->*f)(args, tx, builder, cntx); - }; -} - -inline auto HandlerFunc(ServerFamily* se, EngineFunc2 f) { - return [=](CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - return (se->*f)(args, tx, builder); - }; +inline CommandId::Handler3 HandlerFunc(ServerFamily* se, EngineFunc f) { + return [=](CmdArgList args, const CommandContext& cntx) { return (se->*f)(args, cntx); }; } using CI = CommandId; @@ -1741,18 +1729,18 @@ LastSaveInfo ServerFamily::GetLastSaveInfo() const { return last_save_info_; } -void ServerFamily::DbSize(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ServerFamily::DbSize(CmdArgList args, const CommandContext& cmd_cntx) { atomic_ulong num_keys{0}; shard_set->RunBriefInParallel( [&](EngineShard* shard) { - auto db_size = cntx->ns->GetDbSlice(shard->shard_id()).DbSize(cntx->conn_state.db_index); + auto db_size = cmd_cntx.conn_cntx->ns->GetDbSlice(shard->shard_id()) + .DbSize(cmd_cntx.conn_cntx->conn_state.db_index); num_keys.fetch_add(db_size, memory_order_relaxed); }, [](ShardId) { return true; }); - return builder->SendLong(num_keys.load(memory_order_relaxed)); + return cmd_cntx.rb->SendLong(num_keys.load(memory_order_relaxed)); } void ServerFamily::CancelBlockingOnThread(std::function status_cb) { @@ -1802,25 +1790,23 @@ void ServerFamily::SendInvalidationMessages() const { } } -void ServerFamily::FlushDb(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - DCHECK(tx); - Drakarys(tx, tx->GetDbIndex()); +void ServerFamily::FlushDb(CmdArgList args, const CommandContext& cmd_cntx) { + DCHECK(cmd_cntx.tx); + Drakarys(cmd_cntx.tx, cmd_cntx.tx->GetDbIndex()); SendInvalidationMessages(); - builder->SendOk(); + cmd_cntx.rb->SendOk(); } -void ServerFamily::FlushAll(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ServerFamily::FlushAll(CmdArgList args, const CommandContext& cmd_cntx) { if (args.size() > 1) { - builder->SendError(kSyntaxErr); + cmd_cntx.rb->SendError(kSyntaxErr); return; } - DCHECK(tx); - Drakarys(tx, DbSlice::kDbAll); + DCHECK(cmd_cntx.tx); + Drakarys(cmd_cntx.tx, DbSlice::kDbAll); SendInvalidationMessages(); - builder->SendOk(); + cmd_cntx.rb->SendOk(); } bool ServerFamily::DoAuth(ConnectionContext* cntx, std::string_view username, @@ -1840,12 +1826,12 @@ bool ServerFamily::DoAuth(ConnectionContext* cntx, std::string_view username, return is_authorized; } -void ServerFamily::Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ServerFamily::Auth(CmdArgList args, const CommandContext& cmd_cntx) { if (args.size() > 2) { - return builder->SendError(kSyntaxErr); + return cmd_cntx.rb->SendError(kSyntaxErr); } + ConnectionContext* cntx = cmd_cntx.conn_cntx; // non admin port auth if (!cntx->conn()->IsPrivileged()) { const bool one_arg = args.size() == 1; @@ -1853,16 +1839,16 @@ void ServerFamily::Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil const size_t index = one_arg ? 0 : 1; std::string_view password = facade::ToSV(args[index]); if (DoAuth(cntx, username, password)) { - return builder->SendOk(); + return cmd_cntx.rb->SendOk(); } auto& log = ServerState::tlocal()->acl_log; using Reason = acl::AclLog::Reason; log.Add(*cntx, "AUTH", Reason::AUTH, std::string(username)); - return builder->SendError(facade::kAuthRejected); + return cmd_cntx.rb->SendError(facade::kAuthRejected); } if (!cntx->req_auth) { - return builder->SendError( + return cmd_cntx.rb->SendError( "AUTH called without any password configured for " "admin port. Are you sure your configuration is correct?"); } @@ -1870,16 +1856,17 @@ void ServerFamily::Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil string_view pass = ArgS(args, 0); if (pass == GetPassword()) { cntx->authenticated = true; - builder->SendOk(); + cmd_cntx.rb->SendOk(); } else { - builder->SendError(facade::kAuthRejected); + cmd_cntx.rb->SendError(facade::kAuthRejected); } } -void ServerFamily::Client(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ServerFamily::Client(CmdArgList args, const CommandContext& cmd_cntx) { string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); CmdArgList sub_args = args.subspan(1); + auto* builder = cmd_cntx.rb; + auto* cntx = cmd_cntx.conn_cntx; if (sub_cmd == "SETNAME") { return ClientSetName(sub_args, builder, cntx); @@ -1894,7 +1881,7 @@ void ServerFamily::Client(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu } else if (sub_cmd == "KILL") { return ClientKill(sub_args, absl::MakeSpan(listeners_), builder, cntx); } else if (sub_cmd == "CACHING") { - return ClientCaching(sub_args, builder, tx, cntx); + return ClientCaching(sub_args, builder, cmd_cntx.tx, cntx); } else if (sub_cmd == "SETINFO") { return ClientSetInfo(sub_args, builder, cntx); } else if (sub_cmd == "ID") { @@ -1905,10 +1892,10 @@ void ServerFamily::Client(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu return builder->SendError(UnknownSubCmd(sub_cmd, "CLIENT"), kSyntaxErrType); } -void ServerFamily::Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ServerFamily::Config(CmdArgList args, const CommandContext& cmd_cntx) { string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); + auto* builder = static_cast(cmd_cntx.rb); if (sub_cmd == "HELP") { string_view help_arr[] = { "CONFIG [ [value] [opt] ...]. Subcommands are:", @@ -1922,8 +1909,7 @@ void ServerFamily::Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu " Prints this help.", }; - auto* rb = static_cast(builder); - return rb->SendSimpleStrArr(help_arr); + return builder->SendSimpleStrArr(help_arr); } if (sub_cmd == "SET") { @@ -1980,23 +1966,21 @@ void ServerFamily::Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu } if (sub_cmd == "RESETSTAT") { - ResetStat(cntx->ns); + ResetStat(cmd_cntx.conn_cntx->ns); return builder->SendOk(); } else { return builder->SendError(UnknownSubCmd(sub_cmd, "CONFIG"), kSyntaxErrType); } } -void ServerFamily::Debug(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - DebugCmd dbg_cmd{this, &service_.cluster_family(), cntx}; +void ServerFamily::Debug(CmdArgList args, const CommandContext& cmd_cntx) { + DebugCmd dbg_cmd{this, &service_.cluster_family(), cmd_cntx.conn_cntx}; - return dbg_cmd.Run(args, builder); + return dbg_cmd.Run(args, cmd_cntx.rb); } -void ServerFamily::Memory(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - MemoryCmd mem_cmd{this, builder, cntx}; +void ServerFamily::Memory(CmdArgList args, const CommandContext& cmd_cntx) { + MemoryCmd mem_cmd{this, cmd_cntx.rb, cmd_cntx.conn_cntx}; return mem_cmd.Run(args); } @@ -2039,42 +2023,40 @@ std::optional ServerFamily::GetVersionAndBasename // BGSAVE [DF|RDB] [basename] // TODO add missing [SCHEDULE] -void ServerFamily::BgSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - auto maybe_res = GetVersionAndBasename(args, builder); +void ServerFamily::BgSave(CmdArgList args, const CommandContext& cmd_cntx) { + auto maybe_res = GetVersionAndBasename(args, cmd_cntx.rb); if (!maybe_res) { return; } const auto [version, basename] = *maybe_res; - if (auto ec = DoSaveCheckAndStart(version, basename, tx); ec) { - builder->SendError(ec.Format()); + if (auto ec = DoSaveCheckAndStart(version, basename, cmd_cntx.tx); ec) { + cmd_cntx.rb->SendError(ec.Format()); return; } bg_save_fb_.JoinIfNeeded(); bg_save_fb_ = fb2::Fiber("bg_save_fiber", &ServerFamily::BgSaveFb, this, - boost::intrusive_ptr(tx)); - builder->SendOk(); + boost::intrusive_ptr(cmd_cntx.tx)); + cmd_cntx.rb->SendOk(); } // SAVE [DF|RDB] [basename] // Allows saving the snapshot of the dataset on disk, potentially overriding the format // and the snapshot name. -void ServerFamily::Save(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - auto maybe_res = GetVersionAndBasename(args, builder); +void ServerFamily::Save(CmdArgList args, const CommandContext& cmd_cntx) { + auto maybe_res = GetVersionAndBasename(args, cmd_cntx.rb); if (!maybe_res) { return; } const auto [version, basename] = *maybe_res; - GenericError ec = DoSave(version, basename, tx); + GenericError ec = DoSave(version, basename, cmd_cntx.tx); if (ec) { - builder->SendError(ec.Format()); + cmd_cntx.rb->SendError(ec.Format()); } else { - builder->SendOk(); + cmd_cntx.rb->SendOk(); } } @@ -2206,10 +2188,9 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const { return result; } -void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) { if (args.size() > 1) { - return builder->SendError(kSyntaxErr); + return cmd_cntx.rb->SendError(kSyntaxErr); } string section; @@ -2254,7 +2235,7 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil Metrics m; // Save time by not calculating metrics if we don't need them. if (!(section == "SERVER" || section == "REPLICATION")) { - m = GetMetrics(cntx->ns); + m = GetMetrics(cmd_cntx.conn_cntx->ns); } DbStats total; @@ -2616,12 +2597,11 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil if (should_enter("CLUSTER")) { append("cluster_enabled", cluster::IsClusterEnabledOrEmulated()); } - auto* rb = static_cast(builder); + auto* rb = static_cast(cmd_cntx.rb); rb->SendVerbatimString(info); } -void ServerFamily::Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ServerFamily::Hello(CmdArgList args, const CommandContext& cmd_cntx) { // If no arguments are provided default to RESP2. bool is_resp3 = false; bool has_auth = false; @@ -2630,12 +2610,13 @@ void ServerFamily::Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui string_view password; string_view clientname; + auto* rb = static_cast(cmd_cntx.rb); if (args.size() > 0) { string_view proto_version = ArgS(args, 0); is_resp3 = proto_version == "3"; bool valid_proto_version = proto_version == "2" || is_resp3; if (!valid_proto_version) { - builder->SendError(UnknownCmd("HELLO", args)); + rb->SendError(UnknownCmd("HELLO", args)); return; } @@ -2652,18 +2633,19 @@ void ServerFamily::Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui clientname = ArgS(args, i + 1); i += 1; } else { - builder->SendError(kSyntaxErr); + rb->SendError(kSyntaxErr); return; } } } + auto* cntx = cmd_cntx.conn_cntx; if (has_auth && !DoAuth(cntx, username, password)) { - return builder->SendError(facade::kAuthRejected); + return rb->SendError(facade::kAuthRejected); } if (cntx->req_auth && !cntx->authenticated) { - builder->SendError( + rb->SendError( "-NOAUTH HELLO must be called with the client already " "authenticated, otherwise the HELLO AUTH " "option can be used to authenticate the client and " @@ -2675,7 +2657,6 @@ void ServerFamily::Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui cntx->conn()->SetName(string{clientname}); } - auto* rb = static_cast(builder); int proto_version = 2; if (is_resp3) { proto_version = 3; @@ -2703,27 +2684,26 @@ void ServerFamily::Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui rb->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave"); } -void ServerFamily::AddReplicaOf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx) { util::fb2::LockGuard lk(replicaof_mu_); if (ServerState::tlocal()->is_master) { - builder->SendError("Calling ADDREPLICAOFF allowed only after server is already a replica"); + cmd_cntx.rb->SendError("Calling ADDREPLICAOFF allowed only after server is already a replica"); return; } CHECK(replica_); - auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder); + auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, cmd_cntx.rb); if (!replicaof_args.has_value()) { return; } if (replicaof_args->IsReplicaOfNoOne()) { - return builder->SendError("ADDREPLICAOF does not support no one"); + return cmd_cntx.rb->SendError("ADDREPLICAOF does not support no one"); } LOG(INFO) << "Add Replica " << *replicaof_args; auto add_replica = make_unique(replicaof_args->host, replicaof_args->port, &service_, master_replid(), replicaof_args->slot_range); - error_code ec = add_replica->Start(builder); + error_code ec = add_replica->Start(cmd_cntx.rb); if (!ec) { cluster_replicas_.push_back(std::move(add_replica)); } @@ -2827,9 +2807,8 @@ void ServerFamily::StopAllClusterReplicas() { cluster_replicas_.clear(); } -void ServerFamily::ReplicaOf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - ReplicaOfInternal(args, tx, builder, ActionOnConnectionFail::kReturnOnError); +void ServerFamily::ReplicaOf(CmdArgList args, const CommandContext& cmd_cntx) { + ReplicaOfInternal(args, cmd_cntx.tx, cmd_cntx.rb, ActionOnConnectionFail::kReturnOnError); } void ServerFamily::Replicate(string_view host, string_view port) { @@ -2847,8 +2826,7 @@ void ServerFamily::Replicate(string_view host, string_view port) { // REPLTAKEOVER [SAVE] // SAVE is used only by tests. -void ServerFamily::ReplTakeOver(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx) { VLOG(1) << "ReplTakeOver start"; CmdArgParser parser{args}; @@ -2856,6 +2834,7 @@ void ServerFamily::ReplTakeOver(CmdArgList args, Transaction* tx, SinkReplyBuild int timeout_sec = parser.Next(); bool save_flag = static_cast(parser.Check("SAVE")); + auto* builder = cmd_cntx.rb; if (parser.HasNext()) return builder->SendError(absl::StrCat("Unsupported option:", string_view(parser.Next()))); @@ -2892,8 +2871,8 @@ void ServerFamily::ReplTakeOver(CmdArgList args, Transaction* tx, SinkReplyBuild return builder->SendOk(); } -void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ServerFamily::ReplConf(CmdArgList args, const CommandContext& cmd_cntx) { + auto* builder = cmd_cntx.rb; { util::fb2::LockGuard lk(replicaof_mu_); if (!ServerState::tlocal()->is_master) { @@ -2909,6 +2888,7 @@ void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder* if (args.size() % 2 == 1) return err_cb(); + auto* cntx = cmd_cntx.conn_cntx; for (unsigned i = 0; i < args.size(); i += 2) { DCHECK_LT(i + 1, args.size()); @@ -2985,9 +2965,8 @@ void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder* return builder->SendOk(); } -void ServerFamily::Role(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - auto* rb = static_cast(builder); +void ServerFamily::Role(CmdArgList args, const CommandContext& cmd_cntx) { + auto* rb = static_cast(cmd_cntx.rb); util::fb2::LockGuard lk(replicaof_mu_); // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_, // ensuring eventual consistency of is_master. When determining if the server is a replica and @@ -3030,24 +3009,21 @@ void ServerFamily::Role(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil } } -void ServerFamily::Script(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - script_mgr_->Run(std::move(args), tx, builder, cntx); +void ServerFamily::Script(CmdArgList args, const CommandContext& cmd_cntx) { + script_mgr_->Run(std::move(args), cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx); } -void ServerFamily::LastSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ServerFamily::LastSave(CmdArgList args, const CommandContext& cmd_cntx) { time_t save_time; { util::fb2::LockGuard lk(save_mu_); save_time = last_save_info_.save_time; } - builder->SendLong(save_time); + cmd_cntx.rb->SendLong(save_time); } -void ServerFamily::Latency(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - auto* rb = static_cast(builder); +void ServerFamily::Latency(CmdArgList args, const CommandContext& cmd_cntx) { + auto* rb = static_cast(cmd_cntx.rb); string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); if (sub_cmd == "LATEST") { @@ -3055,12 +3031,12 @@ void ServerFamily::Latency(CmdArgList args, Transaction* tx, SinkReplyBuilder* b } LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported"; - builder->SendError(kSyntaxErr); + rb->SendError(kSyntaxErr); } -void ServerFamily::ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void ServerFamily::ShutdownCmd(CmdArgList args, const CommandContext& cmd_cntx) { if (args.size() > 1) { - builder->SendError(kSyntaxErr); + cmd_cntx.rb->SendError(kSyntaxErr); return; } @@ -3070,7 +3046,7 @@ void ServerFamily::ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilde } else if (absl::EqualsIgnoreCase(sub_cmd, "NOSAVE")) { save_on_shutdown_ = false; } else { - builder->SendError(kSyntaxErr); + cmd_cntx.rb->SendError(kSyntaxErr); return; } } @@ -3079,18 +3055,17 @@ void ServerFamily::ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilde [](ProactorBase* pb) { ServerState::tlocal()->EnterLameDuck(); }); CHECK_NOTNULL(acceptor_)->Stop(); - builder->SendOk(); + cmd_cntx.rb->SendOk(); } -void ServerFamily::Dfly(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - dfly_cmd_->Run(args, tx, static_cast(builder), cntx); +void ServerFamily::Dfly(CmdArgList args, const CommandContext& cmd_cntx) { + dfly_cmd_->Run(args, cmd_cntx.tx, static_cast(cmd_cntx.rb), + cmd_cntx.conn_cntx); } -void ServerFamily::SlowLog(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ServerFamily::SlowLog(CmdArgList args, const CommandContext& cmd_cntx) { string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); - + auto* rb = static_cast(cmd_cntx.rb); if (sub_cmd == "HELP") { string_view help[] = { "SLOWLOG [ [value] [opt] ...]. Subcommands are:", @@ -3106,7 +3081,7 @@ void ServerFamily::SlowLog(CmdArgList args, Transaction* tx, SinkReplyBuilder* b "HELP", " Prints this help.", }; - auto* rb = static_cast(builder); + rb->SendSimpleStrArr(help); return; } @@ -3117,28 +3092,28 @@ void ServerFamily::SlowLog(CmdArgList args, Transaction* tx, SinkReplyBuilder* b lengths[index] = ServerState::tlocal()->GetSlowLog().Length(); }); int sum = std::accumulate(lengths.begin(), lengths.end(), 0); - return builder->SendLong(sum); + return rb->SendLong(sum); } if (sub_cmd == "RESET") { service_.proactor_pool().AwaitFiberOnAll( [](auto index, auto* context) { ServerState::tlocal()->GetSlowLog().Reset(); }); - return builder->SendOk(); + return rb->SendOk(); } if (sub_cmd == "GET") { - return SlowLogGet(args, sub_cmd, &service_.proactor_pool(), builder); + return SlowLogGet(args, sub_cmd, &service_.proactor_pool(), rb); } - builder->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType); + rb->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType); } -void ServerFamily::Module(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ServerFamily::Module(CmdArgList args, const CommandContext& cmd_cntx) { string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); - if (sub_cmd != "LIST") - return builder->SendError(kSyntaxErr); + auto* rb = static_cast(cmd_cntx.rb); + + if (sub_cmd != "LIST") + return rb->SendError(kSyntaxErr); - auto* rb = static_cast(builder); rb->StartArray(2); // Json diff --git a/src/server/server_family.h b/src/server/server_family.h index 5d7302a2b..2e47283c4 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -39,7 +39,7 @@ namespace journal { class Journal; } // namespace journal -class ConnectionContext; +struct CommandContext; class CommandRegistry; class Service; class ScriptMgr; @@ -164,7 +164,7 @@ class ServerFamily { void Shutdown() ABSL_LOCKS_EXCLUDED(replicaof_mu_); // Public because is used by DflyCmd. - void ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + void ShutdownCmd(CmdArgList args, const CommandContext& cmd_cntx); Service& service() { return service_; @@ -259,40 +259,31 @@ class ServerFamily { return shard_set->size(); } - void Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); - void Client(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); - void Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); - void DbSize(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); - void Debug(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); - void Dfly(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); - void Memory(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); - void FlushDb(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - void FlushAll(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - void Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) + void Auth(CmdArgList args, const CommandContext& cmd_cntx); + void Client(CmdArgList args, const CommandContext& cmd_cntx); + void Config(CmdArgList args, const CommandContext& cmd_cntx); + void DbSize(CmdArgList args, const CommandContext& cmd_cntx); + void Debug(CmdArgList args, const CommandContext& cmd_cntx); + void Dfly(CmdArgList args, const CommandContext& cmd_cntx); + void Memory(CmdArgList args, const CommandContext& cmd_cntx); + void FlushDb(CmdArgList args, const CommandContext& cmd_cntx); + void FlushAll(CmdArgList args, const CommandContext& cmd_cntx); + void Info(CmdArgList args, const CommandContext& cmd_cntx) ABSL_LOCKS_EXCLUDED(save_mu_, replicaof_mu_); - void Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); - void LastSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) ABSL_LOCKS_EXCLUDED(save_mu_); - void Latency(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - void ReplicaOf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - void AddReplicaOf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - void ReplTakeOver(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) ABSL_LOCKS_EXCLUDED(replicaof_mu_); - void ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - void Role(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) + void Hello(CmdArgList args, const CommandContext& cmd_cntx); + void LastSave(CmdArgList args, const CommandContext& cmd_cntx) ABSL_LOCKS_EXCLUDED(save_mu_); + void Latency(CmdArgList args, const CommandContext& cmd_cntx); + void ReplicaOf(CmdArgList args, const CommandContext& cmd_cntx); + void AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx); + void ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx) ABSL_LOCKS_EXCLUDED(replicaof_mu_); - void Save(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); - void BgSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); - void Script(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); - void SlowLog(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - void Module(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); + void ReplConf(CmdArgList args, const CommandContext& cmd_cntx); + void Role(CmdArgList args, const CommandContext& cmd_cntx) ABSL_LOCKS_EXCLUDED(replicaof_mu_); + void Save(CmdArgList args, const CommandContext& cmd_cntx); + void BgSave(CmdArgList args, const CommandContext& cmd_cntx); + void Script(CmdArgList args, const CommandContext& cmd_cntx); + void SlowLog(CmdArgList args, const CommandContext& cmd_cntx); + void Module(CmdArgList args, const CommandContext& cmd_cntx); void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx); diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 774f2b676..34a57be76 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -2450,10 +2450,72 @@ void XReadGeneric2(CmdArgList args, bool read_group, Transaction* tx, SinkReplyB } } +void HelpSubCmd(facade::CmdArgParser* parser, Transaction* tx, SinkReplyBuilder* builder) { + XGroupHelp(parser->Tail(), tx, builder); +} + +bool ParseXpendingOptions(CmdArgList& args, PendingOpts& opts, SinkReplyBuilder* builder) { + size_t id_indx = 0; + string arg = absl::AsciiStrToUpper(ArgS(args, id_indx)); + + if (arg == "IDLE" && args.size() > 4) { + id_indx++; + if (!absl::SimpleAtoi(ArgS(args, id_indx), &opts.min_idle_time)) { + builder->SendError(kInvalidIntErr, kSyntaxErrType); + return false; + } + // Ignore negative min_idle_time + opts.min_idle_time = std::max(opts.min_idle_time, static_cast(0)); + args.remove_prefix(2); + id_indx = 0; + } + if (args.size() < 3) { + builder->SendError(WrongNumArgsError("XPENDING"), kSyntaxErrType); + return false; + } + + // Parse start and end + RangeId rs, re; + string_view start = ArgS(args, id_indx); + id_indx++; + string_view end = ArgS(args, id_indx); + if (!ParseRangeId(start, &rs) || !ParseRangeId(end, &re)) { + builder->SendError(kInvalidStreamId, kSyntaxErrType); + return false; + } + + if (rs.exclude && streamIncrID(&rs.parsed_id.val) != C_OK) { + builder->SendError("invalid start ID for the interval", kSyntaxErrType); + return false; + } + + if (re.exclude && streamDecrID(&re.parsed_id.val) != C_OK) { + builder->SendError("invalid end ID for the interval", kSyntaxErrType); + return false; + } + id_indx++; + opts.start = rs.parsed_id; + opts.end = re.parsed_id; + + // Parse count + if (!absl::SimpleAtoi(ArgS(args, id_indx), &opts.count)) { + builder->SendError(kInvalidIntErr, kSyntaxErrType); + return false; + } + + // Ignore negative count value + opts.count = std::max(opts.count, static_cast(0)); + if (args.size() - id_indx - 1) { + id_indx++; + opts.consumer_name = ArgS(args, id_indx); + } + return true; +} + } // namespace -void StreamFamily::XAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - auto parse_resp = ParseAddOrTrimArgsOrReply(args, true, builder); +void StreamFamily::XAdd(CmdArgList args, const CommandContext& cmd_cntx) { + auto parse_resp = ParseAddOrTrimArgsOrReply(args, true, cmd_cntx.rb); if (!parse_resp) { return; } @@ -2463,13 +2525,13 @@ void StreamFamily::XAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil args.remove_prefix(id_indx); if (args.size() < 2 || args.size() % 2 == 0) { - return builder->SendError(WrongNumArgsError("XADD"), kSyntaxErrType); + return cmd_cntx.rb->SendError(WrongNumArgsError("XADD"), kSyntaxErrType); } string_view id = ArgS(args, 0); if (!ParseID(id, true, 0, &add_opts.parsed_id)) { - return builder->SendError(kInvalidStreamId, kSyntaxErrType); + return cmd_cntx.rb->SendError(kInvalidStreamId, kSyntaxErrType); } args.remove_prefix(1); @@ -2477,8 +2539,8 @@ void StreamFamily::XAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil return OpAdd(t->GetOpArgs(shard), add_opts, args); }; - OpResult add_result = tx->ScheduleSingleHopT(cb); - auto* rb = static_cast(builder); + OpResult add_result = cmd_cntx.tx->ScheduleSingleHopT(cb); + auto* rb = static_cast(cmd_cntx.rb); if (add_result) { return rb->SendBulkString(StreamIdRepr(*add_result)); @@ -2489,12 +2551,12 @@ void StreamFamily::XAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil } if (add_result.status() == OpStatus::STREAM_ID_SMALL) { - return builder->SendError( + return cmd_cntx.rb->SendError( "The ID specified in XADD is equal or smaller than " "the target stream top item"); } - return builder->SendError(add_result.status()); + return cmd_cntx.rb->SendError(add_result.status()); } absl::InlinedVector GetXclaimIds(CmdArgList& args) { @@ -2567,13 +2629,13 @@ bool ParseXclaimOptions(CmdArgList& args, ClaimOpts& opts, SinkReplyBuilder* bui return true; } -void StreamFamily::XClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void StreamFamily::XClaim(CmdArgList args, const CommandContext& cmd_cntx) { ClaimOpts opts; string_view key = ArgS(args, 0); opts.group = ArgS(args, 1); opts.consumer = ArgS(args, 2); if (!absl::SimpleAtoi(ArgS(args, 3), &opts.min_idle_time)) { - return builder->SendError(kSyntaxErr); + return cmd_cntx.rb->SendError(kSyntaxErr); } // Ignore negative min-idle-time opts.min_idle_time = std::max(opts.min_idle_time, static_cast(0)); @@ -2582,11 +2644,11 @@ void StreamFamily::XClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu auto ids = GetXclaimIds(args); if (ids.empty()) { // No ids given. - return builder->SendError(kInvalidStreamId, kSyntaxErrType); + return cmd_cntx.rb->SendError(kInvalidStreamId, kSyntaxErrType); } // parse the options - if (!ParseXclaimOptions(args, opts, builder)) + if (!ParseXclaimOptions(args, opts, cmd_cntx.rb)) return; if (auto now = GetCurrentTimeMs(); @@ -2596,16 +2658,16 @@ void StreamFamily::XClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu auto cb = [&](Transaction* t, EngineShard* shard) { return OpClaim(t->GetOpArgs(shard), key, opts, absl::Span{ids.data(), ids.size()}); }; - OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + OpResult result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); if (!result) { - builder->SendError(result.status()); + cmd_cntx.rb->SendError(result.status()); return; } - StreamReplies{builder}.SendClaimInfo(result.value()); + StreamReplies{cmd_cntx.rb}.SendClaimInfo(result.value()); } -void StreamFamily::XDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void StreamFamily::XDel(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); args.remove_prefix(1); @@ -2615,7 +2677,7 @@ void StreamFamily::XDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil ParsedStreamId parsed_id; string_view str_id = ArgS(args, i); if (!ParseID(str_id, true, 0, &parsed_id)) { - return builder->SendError(kInvalidStreamId, kSyntaxErrType); + return cmd_cntx.rb->SendError(kInvalidStreamId, kSyntaxErrType); } ids[i] = parsed_id.val; } @@ -2624,19 +2686,15 @@ void StreamFamily::XDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil return OpDel(t->GetOpArgs(shard), key, absl::Span{ids.data(), ids.size()}); }; - OpResult result = tx->ScheduleSingleHopT(cb); + OpResult result = cmd_cntx.tx->ScheduleSingleHopT(cb); if (result || result.status() == OpStatus::KEY_NOTFOUND) { - return builder->SendLong(*result); + return cmd_cntx.rb->SendLong(*result); } - builder->SendError(result.status()); + cmd_cntx.rb->SendError(result.status()); } -void HelpSubCmd(facade::CmdArgParser* parser, Transaction* tx, SinkReplyBuilder* builder) { - XGroupHelp(parser->Tail(), tx, builder); -} - -void StreamFamily::XGroup(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void StreamFamily::XGroup(CmdArgList args, const CommandContext& cmd_cntx) { facade::CmdArgParser parser{args}; auto sub_cmd_func = parser.MapNext("HELP", &HelpSubCmd, "CREATE", &CreateGroup, "DESTROY", @@ -2644,14 +2702,13 @@ void StreamFamily::XGroup(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu "DELCONSUMER", &DelConsumer, "SETID", &SetId); if (auto err = parser.Error(); err) - return builder->SendError(err->MakeReply()); + return cmd_cntx.rb->SendError(err->MakeReply()); - sub_cmd_func(&parser, tx, builder); + sub_cmd_func(&parser, cmd_cntx.tx, cmd_cntx.rb); } -void StreamFamily::XInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - auto* rb = static_cast(builder); +void StreamFamily::XInfo(CmdArgList args, const CommandContext& cmd_cntx) { + auto* rb = static_cast(cmd_cntx.rb); string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); if (sub_cmd == "HELP") { @@ -2674,7 +2731,8 @@ void StreamFamily::XInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui // We do not use transactional xemantics for xinfo since it's informational command. auto cb = [&]() { EngineShard* shard = EngineShard::tlocal(); - DbContext db_context{cntx->ns, cntx->db_index(), GetCurrentTimeMs()}; + DbContext db_context{cmd_cntx.conn_cntx->ns, cmd_cntx.conn_cntx->db_index(), + GetCurrentTimeMs()}; return OpListGroups(db_context, key, shard); }; @@ -2708,13 +2766,13 @@ void StreamFamily::XInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui } return; } - return builder->SendError(result.status()); + return rb->SendError(result.status()); } else if (sub_cmd == "STREAM") { int full = 0; size_t count = 10; // default count for xinfo streams if (args.size() == 4 || args.size() > 5) { - return builder->SendError( + return rb->SendError( "unknown subcommand or wrong number of arguments for 'STREAM'. Try XINFO HELP."); } @@ -2722,30 +2780,30 @@ void StreamFamily::XInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui full = 1; string full_arg = absl::AsciiStrToUpper(ArgS(args, 2)); if (full_arg != "FULL") { - return builder->SendError( + return rb->SendError( "unknown subcommand or wrong number of arguments for 'STREAM'. Try XINFO HELP."); } if (args.size() > 3) { string count_arg = absl::AsciiStrToUpper(ArgS(args, 3)); string_view count_value_arg = ArgS(args, 4); if (count_arg != "COUNT") { - return builder->SendError( + return rb->SendError( "unknown subcommand or wrong number of arguments for 'STREAM'. Try XINFO HELP."); } if (!absl::SimpleAtoi(count_value_arg, &count)) { - return builder->SendError(kInvalidIntErr); + return rb->SendError(kInvalidIntErr); } } } auto cb = [&]() { EngineShard* shard = EngineShard::tlocal(); - return OpStreams(DbContext{cntx->ns, cntx->db_index(), GetCurrentTimeMs()}, key, shard, - full, count); + return OpStreams( + DbContext{cmd_cntx.conn_cntx->ns, cmd_cntx.conn_cntx->db_index(), GetCurrentTimeMs()}, + key, shard, full, count); }; - auto* rb = static_cast(builder); OpResult sinfo = shard_set->Await(sid, std::move(cb)); if (sinfo) { if (full) { @@ -2865,13 +2923,14 @@ void StreamFamily::XInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui } return; } - return builder->SendError(sinfo.status()); + return rb->SendError(sinfo.status()); } else if (sub_cmd == "CONSUMERS") { string_view stream_name = ArgS(args, 1); string_view group_name = ArgS(args, 2); auto cb = [&]() { - return OpConsumers(DbContext{cntx->ns, cntx->db_index(), GetCurrentTimeMs()}, - EngineShard::tlocal(), stream_name, group_name); + return OpConsumers( + DbContext{cmd_cntx.conn_cntx->ns, cmd_cntx.conn_cntx->db_index(), GetCurrentTimeMs()}, + EngineShard::tlocal(), stream_name, group_name); }; OpResult> result = shard_set->Await(sid, std::move(cb)); @@ -2889,106 +2948,48 @@ void StreamFamily::XInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui return; } if (result.status() == OpStatus::INVALID_VALUE) { - return builder->SendError(NoGroupError(stream_name, group_name)); + return rb->SendError(NoGroupError(stream_name, group_name)); } - return builder->SendError(result.status()); + return rb->SendError(result.status()); } } - return builder->SendError(UnknownSubCmd(sub_cmd, "XINFO")); + return rb->SendError(UnknownSubCmd(sub_cmd, "XINFO")); } -void StreamFamily::XLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void StreamFamily::XLen(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) { return OpLen(t->GetOpArgs(shard), key); }; - OpResult result = tx->ScheduleSingleHopT(cb); + OpResult result = cmd_cntx.tx->ScheduleSingleHopT(cb); if (result || result.status() == OpStatus::KEY_NOTFOUND) { - return builder->SendLong(*result); + return cmd_cntx.rb->SendLong(*result); } - return builder->SendError(result.status()); + return cmd_cntx.rb->SendError(result.status()); } -bool ParseXpendingOptions(CmdArgList& args, PendingOpts& opts, SinkReplyBuilder* builder) { - size_t id_indx = 0; - string arg = absl::AsciiStrToUpper(ArgS(args, id_indx)); - - if (arg == "IDLE" && args.size() > 4) { - id_indx++; - if (!absl::SimpleAtoi(ArgS(args, id_indx), &opts.min_idle_time)) { - builder->SendError(kInvalidIntErr, kSyntaxErrType); - return false; - } - // Ignore negative min_idle_time - opts.min_idle_time = std::max(opts.min_idle_time, static_cast(0)); - args.remove_prefix(2); - id_indx = 0; - } - if (args.size() < 3) { - builder->SendError(WrongNumArgsError("XPENDING"), kSyntaxErrType); - return false; - } - - // Parse start and end - RangeId rs, re; - string_view start = ArgS(args, id_indx); - id_indx++; - string_view end = ArgS(args, id_indx); - if (!ParseRangeId(start, &rs) || !ParseRangeId(end, &re)) { - builder->SendError(kInvalidStreamId, kSyntaxErrType); - return false; - } - - if (rs.exclude && streamIncrID(&rs.parsed_id.val) != C_OK) { - builder->SendError("invalid start ID for the interval", kSyntaxErrType); - return false; - } - - if (re.exclude && streamDecrID(&re.parsed_id.val) != C_OK) { - builder->SendError("invalid end ID for the interval", kSyntaxErrType); - return false; - } - id_indx++; - opts.start = rs.parsed_id; - opts.end = re.parsed_id; - - // Parse count - if (!absl::SimpleAtoi(ArgS(args, id_indx), &opts.count)) { - builder->SendError(kInvalidIntErr, kSyntaxErrType); - return false; - } - - // Ignore negative count value - opts.count = std::max(opts.count, static_cast(0)); - if (args.size() - id_indx - 1) { - id_indx++; - opts.consumer_name = ArgS(args, id_indx); - } - return true; -} - -void StreamFamily::XPending(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void StreamFamily::XPending(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); PendingOpts opts; opts.group_name = ArgS(args, 1); args.remove_prefix(2); - if (!args.empty() && !ParseXpendingOptions(args, opts, builder)) { + auto* rb = static_cast(cmd_cntx.rb); + if (!args.empty() && !ParseXpendingOptions(args, opts, rb)) { return; } auto cb = [&](Transaction* t, EngineShard* shard) { return OpPending(t->GetOpArgs(shard), key, opts); }; - OpResult op_result = tx->ScheduleSingleHopT(cb); + OpResult op_result = cmd_cntx.tx->ScheduleSingleHopT(cb); if (!op_result) { if (op_result.status() == OpStatus::SKIPPED) - return builder->SendError(NoGroupError(key, opts.group_name)); - return builder->SendError(op_result.status()); + return rb->SendError(NoGroupError(key, opts.group_name)); + return rb->SendError(op_result.status()); } const PendingResult& result = op_result.value(); - auto* rb = static_cast(builder); if (std::holds_alternative(result)) { const auto& res = std::get(result); rb->StartArray(4); @@ -3024,20 +3025,20 @@ void StreamFamily::XPending(CmdArgList args, Transaction* tx, SinkReplyBuilder* } } -void StreamFamily::XRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void StreamFamily::XRange(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = args[0]; string_view start = args[1]; string_view end = args[2]; - XRangeGeneric(key, start, end, args.subspan(3), false, tx, builder); + XRangeGeneric(key, start, end, args.subspan(3), false, cmd_cntx.tx, cmd_cntx.rb); } -void StreamFamily::XRevRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void StreamFamily::XRevRange(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = args[0]; string_view start = args[1]; string_view end = args[2]; - XRangeGeneric(key, end, start, args.subspan(3), true, tx, builder); + XRangeGeneric(key, end, start, args.subspan(3), true, cmd_cntx.tx, cmd_cntx.rb); } variant HasEntries2(const OpArgs& op_args, string_view skey, @@ -3108,46 +3109,44 @@ variant HasEntries2(const OpArgs& op_args, string_view return streamCompareID(&last_id, &requested_sitem.id.val) >= 0; } -void StreamFamily::XRead(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - return XReadGeneric2(args, false, tx, builder, cntx); +void StreamFamily::XRead(CmdArgList args, const CommandContext& cmd_cntx) { + return XReadGeneric2(args, false, cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx); } -void StreamFamily::XReadGroup(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - return XReadGeneric2(args, true, tx, builder, cntx); +void StreamFamily::XReadGroup(CmdArgList args, const CommandContext& cmd_cntx) { + return XReadGeneric2(args, true, cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx); } -void StreamFamily::XSetId(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void StreamFamily::XSetId(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); string_view idstr = ArgS(args, 1); ParsedStreamId parsed_id; if (!ParseID(idstr, true, 0, &parsed_id)) { - return builder->SendError(kInvalidStreamId, kSyntaxErrType); + return cmd_cntx.rb->SendError(kInvalidStreamId, kSyntaxErrType); } auto cb = [&](Transaction* t, EngineShard* shard) { return OpSetId2(t->GetOpArgs(shard), key, parsed_id.val); }; - OpStatus result = tx->ScheduleSingleHop(std::move(cb)); + OpStatus result = cmd_cntx.tx->ScheduleSingleHop(std::move(cb)); switch (result) { case OpStatus::STREAM_ID_SMALL: - return builder->SendError( + return cmd_cntx.rb->SendError( "The ID specified in XSETID is smaller than the " "target stream top item"); case OpStatus::ENTRIES_ADDED_SMALL: - return builder->SendError( + return cmd_cntx.rb->SendError( "The entries_added specified in XSETID is smaller than " "the target stream length"); default: - return builder->SendError(result); + return cmd_cntx.rb->SendError(result); } } -void StreamFamily::XTrim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { - auto parse_resp = ParseAddOrTrimArgsOrReply(args, true, builder); +void StreamFamily::XTrim(CmdArgList args, const CommandContext& cmd_cntx) { + auto parse_resp = ParseAddOrTrimArgsOrReply(args, true, cmd_cntx.rb); if (!parse_resp) { return; } @@ -3158,14 +3157,14 @@ void StreamFamily::XTrim(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui return OpTrim(t->GetOpArgs(shard), trim_opts); }; - OpResult trim_result = tx->ScheduleSingleHopT(cb); + OpResult trim_result = cmd_cntx.tx->ScheduleSingleHopT(cb); if (trim_result) { - return builder->SendLong(*trim_result); + return cmd_cntx.rb->SendLong(*trim_result); } - return builder->SendError(trim_result.status()); + return cmd_cntx.rb->SendError(trim_result.status()); } -void StreamFamily::XAck(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void StreamFamily::XAck(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = ArgS(args, 0); string_view group = ArgS(args, 1); args.remove_prefix(2); @@ -3175,7 +3174,7 @@ void StreamFamily::XAck(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil ParsedStreamId parsed_id; string_view str_id = ArgS(args, i); if (!ParseID(str_id, true, 0, &parsed_id)) { - return builder->SendError(kInvalidStreamId, kSyntaxErrType); + return cmd_cntx.rb->SendError(kInvalidStreamId, kSyntaxErrType); } ids[i] = parsed_id.val; } @@ -3184,33 +3183,36 @@ void StreamFamily::XAck(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil return OpAck(t->GetOpArgs(shard), key, group, absl::Span{ids.data(), ids.size()}); }; - OpResult result = tx->ScheduleSingleHopT(cb); + OpResult result = cmd_cntx.tx->ScheduleSingleHopT(cb); if (result || result.status() == OpStatus::KEY_NOTFOUND) { - return builder->SendLong(*result); + return cmd_cntx.rb->SendLong(*result); } - builder->SendError(result.status()); - return; + cmd_cntx.rb->SendError(result.status()); } -void StreamFamily::XAutoClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void StreamFamily::XAutoClaim(CmdArgList args, const CommandContext& cmd_cntx) { ClaimOpts opts; string_view key = ArgS(args, 0); opts.group = ArgS(args, 1); opts.consumer = ArgS(args, 2); + auto* rb = static_cast(cmd_cntx.rb); + if (!absl::SimpleAtoi(ArgS(args, 3), &opts.min_idle_time)) { - return builder->SendError(kSyntaxErr); + return rb->SendError(kSyntaxErr); } opts.min_idle_time = std::max((int64)0, opts.min_idle_time); string_view start = ArgS(args, 4); RangeId rs; + if (!ParseRangeId(start, &rs)) { - return; + return rb->SendError(kSyntaxErr); } + if (rs.exclude && streamDecrID(&rs.parsed_id.val) != C_OK) { - return builder->SendError("invalid start ID for the interval", kSyntaxErrType); + return rb->SendError("invalid start ID for the interval", kSyntaxErrType); } opts.start = rs.parsed_id.val; @@ -3223,10 +3225,10 @@ void StreamFamily::XAutoClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder if (arg == "COUNT") { arg = ArgS(args, ++i); if (!absl::SimpleAtoi(arg, &opts.count)) { - return builder->SendError(kInvalidIntErr); + return rb->SendError(kInvalidIntErr); } if (opts.count <= 0) { - return builder->SendError("COUNT must be > 0"); + return rb->SendError("COUNT must be > 0"); } continue; } @@ -3234,26 +3236,27 @@ void StreamFamily::XAutoClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder if (arg == "JUSTID") { opts.flags |= kClaimJustID; } else { - return builder->SendError("Unknown argument given for XAUTOCLAIM command", kSyntaxErr); + return cmd_cntx.rb->SendError("Unknown argument given for XAUTOCLAIM command", kSyntaxErr); } } auto cb = [&](Transaction* t, EngineShard* shard) { return OpAutoClaim(t->GetOpArgs(shard), key, opts); }; - OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + OpResult result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); if (result.status() == OpStatus::KEY_NOTFOUND) { - builder->SendError(NoGroupOrKey(key, opts.group)); + rb->SendError(NoGroupOrKey(key, opts.group)); return; } + if (!result) { - builder->SendError(result.status()); + rb->SendError(result.status()); return; } ClaimInfo cresult = result.value(); - auto* rb = static_cast(builder); + rb->StartArray(3); rb->SendBulkString(StreamIdRepr(cresult.end_id)); StreamReplies{rb}.SendClaimInfo(cresult); diff --git a/src/server/stream_family.h b/src/server/stream_family.h index 50cd9d742..4f964ff91 100644 --- a/src/server/stream_family.h +++ b/src/server/stream_family.h @@ -13,7 +13,7 @@ class SinkReplyBuilder; namespace dfly { class CommandRegistry; -class ConnectionContext; +struct CommandContext; class StreamFamily { public: @@ -22,24 +22,21 @@ class StreamFamily { private: using SinkReplyBuilder = facade::SinkReplyBuilder; - static void XAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void XClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void XDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void XGroup(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void XInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - static void XLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void XPending(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void XRevRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void XRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void XRead(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - static void XReadGroup(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - static void XSetId(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void XTrim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void XAck(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void XAutoClaim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void XAdd(CmdArgList args, const CommandContext& cmd_cntx); + static void XClaim(CmdArgList args, const CommandContext& cmd_cntx); + static void XDel(CmdArgList args, const CommandContext& cmd_cntx); + static void XGroup(CmdArgList args, const CommandContext& cmd_cntx); + static void XInfo(CmdArgList args, const CommandContext& cmd_cntx); + static void XLen(CmdArgList args, const CommandContext& cmd_cntx); + static void XPending(CmdArgList args, const CommandContext& cmd_cntx); + static void XRevRange(CmdArgList args, const CommandContext& cmd_cntx); + static void XRange(CmdArgList args, const CommandContext& cmd_cntx); + static void XRead(CmdArgList args, const CommandContext& cmd_cntx); + static void XReadGroup(CmdArgList args, const CommandContext& cmd_cntx); + static void XSetId(CmdArgList args, const CommandContext& cmd_cntx); + static void XTrim(CmdArgList args, const CommandContext& cmd_cntx); + static void XAck(CmdArgList args, const CommandContext& cmd_cntx); + static void XAutoClaim(CmdArgList args, const CommandContext& cmd_cntx); }; } // namespace dfly diff --git a/src/server/string_family.h b/src/server/string_family.h index 0cad0b4cd..31e027db1 100644 --- a/src/server/string_family.h +++ b/src/server/string_family.h @@ -4,7 +4,7 @@ #pragma once -#include "server/command_registry.h" +#include "facade/facade_types.h" namespace facade { class SinkReplyBuilder; @@ -12,6 +12,9 @@ class SinkReplyBuilder; namespace dfly { +struct CommandContext; +class CommandRegistry; + class StringFamily { public: static void Register(CommandRegistry* registry);