From 6f6897cef13019e040b1bc453b3a2fd9f5361778 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 29 Oct 2024 14:52:09 +0200 Subject: [PATCH] chore: pass RedisReplyBuilder explicitly from dragonfly connection (#4009) Signed-off-by: Roman Gershman --- src/facade/conn_context.cc | 12 ---- src/facade/conn_context.h | 34 ------------ src/facade/dragonfly_connection.cc | 17 ++++-- src/facade/ok_main.cc | 13 +++-- src/facade/service_interface.h | 11 ++-- src/server/acl/acl_family.cc | 6 +- src/server/cluster/cluster_family.cc | 2 +- src/server/command_registry.cc | 18 ++---- src/server/command_registry.h | 16 +++--- src/server/conn_context.cc | 23 -------- src/server/conn_context.h | 4 -- src/server/debugcmd.cc | 3 +- src/server/generic_family.cc | 2 +- src/server/http_api.cc | 5 +- src/server/journal/executor.cc | 2 +- src/server/main_service.cc | 83 ++++++++++++++-------------- src/server/main_service.h | 15 ++--- src/server/multi_command_squasher.cc | 21 ++++--- src/server/multi_command_squasher.h | 13 +++-- src/server/rdb_load.cc | 2 +- src/server/replica.cc | 8 ++- src/server/server_family.cc | 56 +++++++++---------- src/server/string_family.cc | 2 +- src/server/test_utils.cc | 10 ++-- src/server/zset_family.cc | 8 +-- 25 files changed, 156 insertions(+), 230 deletions(-) diff --git a/src/facade/conn_context.cc b/src/facade/conn_context.cc index e55a51ed8..7528c5e4b 100644 --- a/src/facade/conn_context.cc +++ b/src/facade/conn_context.cc @@ -52,16 +52,4 @@ size_t ConnectionContext::UsedMemory() const { return dfly::HeapSize(rbuilder_) + dfly::HeapSize(authed_username) + dfly::HeapSize(acl_commands); } -void ConnectionContext::SendError(std::string_view str, std::string_view type) { - rbuilder_->SendError(str, type); -} - -void ConnectionContext::SendError(ErrorReply error) { - rbuilder_->SendError(error); -} - -void ConnectionContext::SendError(OpStatus status) { - rbuilder_->SendError(status); -} - } // namespace facade diff --git a/src/facade/conn_context.h b/src/facade/conn_context.h index 3dd3b6ac9..ddbce31d5 100644 --- a/src/facade/conn_context.h +++ b/src/facade/conn_context.h @@ -47,40 +47,6 @@ class ConnectionContext { return res; } - virtual void SendError(std::string_view str, std::string_view type = std::string_view{}); - - virtual void SendError(ErrorReply error); - - virtual void SendError(OpStatus status); - - void SendStored() { - rbuilder_->SendStored(); - } - - void SendSetSkipped() { - rbuilder_->SendSetSkipped(); - } - - void SendMGetResponse(SinkReplyBuilder::MGetResponse resp) { - rbuilder_->SendMGetResponse(std::move(resp)); - } - - void SendLong(long val) { - rbuilder_->SendLong(val); - } - - void SendSimpleString(std::string_view str) { - rbuilder_->SendSimpleString(str); - } - - void SendOk() { - rbuilder_->SendOk(); - } - - void SendProtocolError(std::string_view str) { - rbuilder_->SendProtocolError(str); - } - virtual size_t UsedMemory() const; // connection state / properties. diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 1cdab6cb2..85929aa4c 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -486,14 +486,16 @@ void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) { void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg) { DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front()); - self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get()); + self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, + self->reply_builder_, self->cc_.get()); self->last_interaction_ = time(nullptr); self->skip_next_squashing_ = false; } void Connection::DispatchOperations::operator()(const Connection::MCPipelineMessage& msg) { - self->service_->DispatchMC(msg.cmd, msg.value, self->cc_.get()); + self->service_->DispatchMC(msg.cmd, msg.value, static_cast(self->reply_builder_), + self->cc_.get()); self->last_interaction_ = time(nullptr); } @@ -1087,7 +1089,7 @@ Connection::ParserStatus Connection::ParseRedis() { auto dispatch_sync = [this, &parse_args, &cmd_vec] { RespExpr::VecToArgList(parse_args, &cmd_vec); - service_->DispatchCommand(absl::MakeSpan(cmd_vec), cc_.get()); + service_->DispatchCommand(absl::MakeSpan(cmd_vec), reply_builder_, cc_.get()); }; auto dispatch_async = [this, &parse_args, tlh = mi_heap_get_backing()]() -> MessageHandle { return {FromArgs(std::move(parse_args), tlh)}; @@ -1131,7 +1133,10 @@ auto Connection::ParseMemcache() -> ParserStatus { MemcacheParser::Command cmd; string_view value; - auto dispatch_sync = [this, &cmd, &value] { service_->DispatchMC(cmd, value, cc_.get()); }; + auto dispatch_sync = [this, &cmd, &value] { + service_->DispatchMC(cmd, value, static_cast(reply_builder_), cc_.get()); + }; + auto dispatch_async = [&cmd, &value]() -> MessageHandle { return {make_unique(std::move(cmd), value)}; }; @@ -1353,6 +1358,7 @@ bool Connection::ShouldEndDispatchFiber(const MessageHandle& msg) { void Connection::SquashPipeline() { DCHECK_EQ(dispatch_q_.size(), pending_pipeline_cmd_cnt_); + DCHECK_EQ(reply_builder_->type(), SinkReplyBuilder::REDIS); // Only Redis is supported. vector squash_cmds; squash_cmds.reserve(dispatch_q_.size()); @@ -1367,7 +1373,8 @@ void Connection::SquashPipeline() { cc_->async_dispatch = true; - size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get()); + size_t dispatched = + service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), reply_builder_, cc_.get()); if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) { // Flush if no new commands appeared reply_builder_->FlushBatch(); diff --git a/src/facade/ok_main.cc b/src/facade/ok_main.cc index 2996139c7..0b8b06823 100644 --- a/src/facade/ok_main.cc +++ b/src/facade/ok_main.cc @@ -21,19 +21,20 @@ namespace { class OkService : public ServiceInterface { public: - void DispatchCommand(ArgSlice args, ConnectionContext* cntx) final { - cntx->SendOk(); + void DispatchCommand(ArgSlice args, SinkReplyBuilder* builder, ConnectionContext* cntx) final { + builder->SendOk(); } - size_t DispatchManyCommands(absl::Span args_lists, ConnectionContext* cntx) final { + size_t DispatchManyCommands(absl::Span args_lists, SinkReplyBuilder* builder, + ConnectionContext* cntx) final { for (auto args : args_lists) - DispatchCommand(args, cntx); + DispatchCommand(args, builder, cntx); return args_lists.size(); } void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, - ConnectionContext* cntx) final { - cntx->SendError(""); + MCReplyBuilder* builder, ConnectionContext* cntx) final { + builder->SendError(""); } ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) final { diff --git a/src/facade/service_interface.h b/src/facade/service_interface.h index d50100cb6..e881cba2c 100644 --- a/src/facade/service_interface.h +++ b/src/facade/service_interface.h @@ -18,20 +18,23 @@ namespace facade { class ConnectionContext; class Connection; -struct ConnectionStats; +class SinkReplyBuilder; +class MCReplyBuilder; class ServiceInterface { public: virtual ~ServiceInterface() { } - virtual void DispatchCommand(ArgSlice args, ConnectionContext* cntx) = 0; + virtual void DispatchCommand(ArgSlice args, SinkReplyBuilder* builder, + ConnectionContext* cntx) = 0; // Returns number of processed commands - virtual size_t DispatchManyCommands(absl::Span args_list, ConnectionContext* cntx) = 0; + virtual size_t DispatchManyCommands(absl::Span args_list, SinkReplyBuilder* builder, + ConnectionContext* cntx) = 0; virtual void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, - ConnectionContext* cntx) = 0; + MCReplyBuilder* builder, ConnectionContext* cntx) = 0; virtual ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) = 0; diff --git a/src/server/acl/acl_family.cc b/src/server/acl/acl_family.cc index fe1281fb5..93a5796c8 100644 --- a/src/server/acl/acl_family.cc +++ b/src/server/acl/acl_family.cc @@ -629,8 +629,8 @@ void AclFamily::DryRun(CmdArgList args, Transaction* tx, SinkReplyBuilder* build using MemberFunc2 = void (AclFamily::*)(CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder); -using MemberFunc3 = void (AclFamily::*)(CmdArgList args, Transaction* tx, - facade::SinkReplyBuilder* builder, ConnectionContext* cntx); +using MemberFunc = void (AclFamily::*)(CmdArgList args, Transaction* tx, + facade::SinkReplyBuilder* builder, ConnectionContext* cntx); CommandId::Handler2 HandlerFunc(AclFamily* acl, MemberFunc2 f) { return [=](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder) { @@ -638,7 +638,7 @@ CommandId::Handler2 HandlerFunc(AclFamily* acl, MemberFunc2 f) { }; } -CommandId::Handler3 HandlerFunc(AclFamily* acl, MemberFunc3 f) { +CommandId::Handler HandlerFunc(AclFamily* acl, MemberFunc f) { return [=](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder, ConnectionContext* cntx) { return (acl->*f)(args, tx, builder, cntx); }; } diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 2badc4f91..a195dc99c 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -1007,7 +1007,7 @@ using EngineFunc = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* bu using EngineFunc2 = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder); -inline CommandId::Handler3 HandlerFunc(ClusterFamily* se, EngineFunc f) { +inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) { return [=](CmdArgList args, Transaction*, SinkReplyBuilder* builder, ConnectionContext* cntx) { return (se->*f)(args, builder, cntx); }; diff --git a/src/server/command_registry.cc b/src/server/command_registry.cc index c1c4959fc..a2cbcb770 100644 --- a/src/server/command_registry.cc +++ b/src/server/command_registry.cc @@ -58,9 +58,10 @@ bool CommandId::IsMultiTransactional() const { return CO::IsTransKind(name()) || CO::IsEvalKind(name()); } -uint64_t CommandId::Invoke(CmdArgList args, ConnectionContext* cntx) const { +uint64_t CommandId::Invoke(CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder, + ConnectionContext* cntx) const { int64_t before = absl::GetCurrentTimeNanos(); - handler_(args, cntx); + handler_(args, tx, builder, cntx); int64_t after = absl::GetCurrentTimeNanos(); ServerState* ss = ServerState::tlocal(); // Might have migrated thread, read after invocation @@ -95,17 +96,8 @@ optional CommandId::Validate(CmdArgList tail_args) const { } CommandId&& CommandId::SetHandler(Handler2 f) && { - handler_ = [f = std::move(f)](CmdArgList args, ConnectionContext* cntx) { - f(args, cntx->transaction, cntx->reply_builder()); - }; - - return std::move(*this); -} - -CommandId&& CommandId::SetHandler(Handler3 f) && { - handler_ = [f = std::move(f)](CmdArgList args, ConnectionContext* cntx) { - f(args, cntx->transaction, cntx->reply_builder(), cntx); - }; + handler_ = [f = std::move(f)](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder, + facade::ConnectionContext*) { f(args, tx, builder); }; return std::move(*this); } diff --git a/src/server/command_registry.h b/src/server/command_registry.h index 50dfb4611..f8060c0fd 100644 --- a/src/server/command_registry.h +++ b/src/server/command_registry.h @@ -88,13 +88,18 @@ class CommandId : public facade::CommandId { using Handler = fu2::function_base; + void(CmdArgList, Transaction*, facade::SinkReplyBuilder*, + ConnectionContext*) const>; + using Handler2 = + fu2::function_base; using ArgValidator = fu2::function_base(CmdArgList) const>; // Returns the invoke time in usec. - uint64_t Invoke(CmdArgList args, ConnectionContext* cntx) const; + uint64_t Invoke(CmdArgList args, Transaction*, facade::SinkReplyBuilder*, + ConnectionContext* cntx) const; // Returns error if validation failed, otherwise nullopt std::optional Validate(CmdArgList tail_args) const; @@ -122,14 +127,7 @@ class CommandId : public facade::CommandId { return std::move(*this); } - using Handler2 = - fu2::function_base; - using Handler3 = fu2::function_base; CommandId&& SetHandler(Handler2 f) &&; - CommandId&& SetHandler(Handler3 f) &&; CommandId&& SetValidator(ArgValidator f) && { validator_ = std::move(f); diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index fdf31d5f5..dcb627828 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -269,29 +269,6 @@ size_t ConnectionContext::UsedMemory() const { return facade::ConnectionContext::UsedMemory() + dfly::HeapSize(conn_state); } -void ConnectionContext::SendError(std::string_view str, std::string_view type) { - string_view name = cid ? cid->name() : string_view{}; - - VLOG(1) << "Sending error " << str << " " << type << " during " << name; - facade::ConnectionContext::SendError(str, type); -} - -void ConnectionContext::SendError(facade::ErrorReply error) { - string_view name = cid ? cid->name() : string_view{}; - - VLOG(1) << "Sending error " << error.ToSv() << " during " << name; - facade::ConnectionContext::SendError(std::move(error)); -} - -void ConnectionContext::SendError(facade::OpStatus status) { - if (status != facade::OpStatus::OK) { - string_view name = cid ? cid->name() : string_view{}; - VLOG(1) << "Sending error " << status << " during " << name; - } - - facade::ConnectionContext::SendError(status); -} - void ConnectionState::ExecInfo::Clear() { DCHECK(!preborrowed_interpreter); // Must have been released properly state = EXEC_INACTIVE; diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 7fe083350..c53bbaf21 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -300,10 +300,6 @@ class ConnectionContext : public facade::ConnectionContext { size_t UsedMemory() const override; - void SendError(std::string_view str, std::string_view type = std::string_view{}) override; - void SendError(facade::ErrorReply error) override; - void SendError(facade::OpStatus status) override; - // Whether this connection is a connection from a replica to its master. // This flag is true only on replica side, where we need to setup a special ConnectionContext // instance that helps applying commands coming from master. diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 2bd6fa138..a7b513295 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -145,6 +145,7 @@ void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool local_tx->StartMultiNonAtomic(); boost::intrusive_ptr stub_tx = new Transaction{local_tx.get(), EngineShard::tlocal()->shard_id(), nullopt}; + absl::InlinedVector args_view; facade::CapturingReplyBuilder crb; ConnectionContext local_cntx{cntx, stub_tx.get(), &crb}; @@ -171,7 +172,7 @@ void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool crb.SetReplyMode(ReplyMode::NONE); stub_tx->InitByArgs(cntx->ns, local_cntx.conn_state.db_index, args_span); - sf->service().InvokeCmd(cid, args_span, &local_cntx); + sf->service().InvokeCmd(cid, args_span, &crb, &local_cntx); } local_cntx.Inject(nullptr); diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 71506f78b..390d70793 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -1615,7 +1615,7 @@ void GenericFamily::Select(CmdArgList args, Transaction*, SinkReplyBuilder* buil }; shard_set->RunBriefInParallel(std::move(cb)); - return cntx->SendOk(); + return builder->SendOk(); } void GenericFamily::Dump(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { diff --git a/src/server/http_api.cc b/src/server/http_api.cc index 8a3573a81..88df84a38 100644 --- a/src/server/http_api.cc +++ b/src/server/http_api.cc @@ -228,12 +228,11 @@ void HttpAPI(const http::QueryArgs& args, HttpRequest&& req, Service* service, DCHECK(context); facade::CapturingReplyBuilder reply_builder; - auto* prev = context->Inject(&reply_builder); + // TODO: to finish this. - service->DispatchCommand(absl::MakeSpan(cmd_slices), context); + service->DispatchCommand(absl::MakeSpan(cmd_slices), &reply_builder, context); facade::CapturingReplyBuilder::Payload payload = reply_builder.Take(); - context->Inject(prev); auto response = http::MakeStringResponse(); http::SetMime(http::kJsonMime, &response); diff --git a/src/server/journal/executor.cc b/src/server/journal/executor.cc index cbc94e4f7..6e91e1d4c 100644 --- a/src/server/journal/executor.cc +++ b/src/server/journal/executor.cc @@ -75,7 +75,7 @@ void JournalExecutor::FlushSlots(const cluster::SlotRange& slot_range) { void JournalExecutor::Execute(journal::ParsedEntry::CmdData& cmd) { auto span = CmdArgList{cmd.cmd_args.data(), cmd.cmd_args.size()}; - service_->DispatchCommand(span, &conn_context_); + service_->DispatchCommand(span, &reply_builder_, &conn_context_); } void JournalExecutor::SelectDb(DbIndex dbid) { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 723c56b35..df619404c 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1232,8 +1232,8 @@ std::optional Service::VerifyCommandState(const CommandId* cid, CmdA return VerifyConnectionAclStatus(cid, &dfly_cntx, "has no ACL permissions", tail_args); } -void Service::DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) { - auto* builder = cntx->reply_builder(); +void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder, + facade::ConnectionContext* cntx) { absl::Cleanup clear_last_error([builder]() { std::ignore = builder->ConsumeLastError(); }); DCHECK(!args.empty()); DCHECK_NE(0u, shard_set->size()) << "Init was not called"; @@ -1334,7 +1334,7 @@ void Service::DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) { dfly_cntx->cid = cid; - if (!InvokeCmd(cid, args_no_cmd, dfly_cntx)) { + if (!InvokeCmd(cid, args_no_cmd, builder, dfly_cntx)) { builder->SendError("Internal Error"); builder->CloseConnection(); } @@ -1391,10 +1391,10 @@ OpResult OpTrackKeys(const OpArgs slice_args, const facade::Connection::We return OpStatus::OK; } -bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* cntx) { +bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, SinkReplyBuilder* builder, + ConnectionContext* cntx) { DCHECK(cid); DCHECK(!cid->Validate(tail_args)); - auto* builder = cntx->reply_builder(); if (auto err = VerifyCommandExecution(cid, cntx, tail_args); err) { // We need to skip this because ACK's should not be replied to @@ -1441,7 +1441,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo auto last_error = builder->ConsumeLastError(); DCHECK(last_error.empty()); try { - invoke_time_usec = cid->Invoke(tail_args, cntx); + invoke_time_usec = cid->Invoke(tail_args, tx, builder, cntx); } catch (std::exception& e) { LOG(ERROR) << "Internal error, system probably unstable " << e.what(); return false; @@ -1491,10 +1491,11 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo return true; } -size_t Service::DispatchManyCommands(absl::Span args_list, +size_t Service::DispatchManyCommands(absl::Span args_list, SinkReplyBuilder* builder, facade::ConnectionContext* cntx) { ConnectionContext* dfly_cntx = static_cast(cntx); DCHECK(!dfly_cntx->conn_state.exec_info.IsRunning()); + DCHECK_EQ(builder->type(), SinkReplyBuilder::REDIS); vector stored_cmds; intrusive_ptr dist_trans; @@ -1515,7 +1516,9 @@ size_t Service::DispatchManyCommands(absl::Span args_list, } dfly_cntx->transaction = dist_trans.get(); - MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds), dfly_cntx, this, true, false); + MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds), + static_cast(builder), dfly_cntx, this, true, + false); dfly_cntx->transaction = nullptr; dispatched += stored_cmds.size(); @@ -1559,7 +1562,7 @@ size_t Service::DispatchManyCommands(absl::Span args_list, break; // Dispatch non squashed command only after all squshed commands were executed and replied - DispatchCommand(args, cntx); + DispatchCommand(args, builder, cntx); dispatched++; } @@ -1572,14 +1575,13 @@ size_t Service::DispatchManyCommands(absl::Span args_list, } void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, - facade::ConnectionContext* cntx) { + MCReplyBuilder* mc_builder, facade::ConnectionContext* cntx) { absl::InlinedVector args; char cmd_name[16]; char ttl[16]; char store_opt[32] = {0}; char ttl_op[] = "EXAT"; - MCReplyBuilder* mc_builder = static_cast(cntx->reply_builder()); mc_builder->SetNoreply(cmd.no_reply); switch (cmd.type) { @@ -1671,7 +1673,7 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va } } - DispatchCommand(CmdArgList{args}, cntx); + DispatchCommand(CmdArgList{args}, mc_builder, cntx); // Reset back. dfly_cntx->conn_state.memcache_flag = 0; @@ -1824,7 +1826,7 @@ optional Service::FlushEvalAsyncCmds(ConnectionC CapturingReplyBuilder crb{ReplyMode::ONLY_ERR}; WithReplies(&crb, cntx, [&] { - MultiCommandSquasher::Execute(absl::MakeSpan(info->async_cmds), cntx, this, true, true); + MultiCommandSquasher::Execute(absl::MakeSpan(info->async_cmds), &crb, cntx, this, true, true); }); info->async_cmds_heap_mem = 0; @@ -1873,7 +1875,7 @@ void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca) if (ca.async) return; - DispatchCommand(ca.args, cntx); + DispatchCommand(ca.args, &replier, cntx); } void Service::Eval(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, @@ -1892,7 +1894,7 @@ void Service::Eval(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, string sha{std::move(res.value())}; - CallSHA(args, sha, interpreter, cntx); + CallSHA(args, sha, interpreter, builder, cntx); } void Service::EvalSha(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, @@ -1900,11 +1902,11 @@ void Service::EvalSha(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde string sha = absl::AsciiStrToLower(ArgS(args, 0)); BorrowedInterpreter interpreter{cntx->transaction, cntx}; - CallSHA(args, sha, interpreter, cntx); + CallSHA(args, sha, interpreter, builder, cntx); } void Service::CallSHA(CmdArgList args, string_view sha, Interpreter* interpreter, - ConnectionContext* cntx) { + SinkReplyBuilder* builder, ConnectionContext* cntx) { uint32_t num_keys; CHECK(absl::SimpleAtoi(ArgS(args, 1), &num_keys)); // we already validated this @@ -1914,7 +1916,7 @@ void Service::CallSHA(CmdArgList args, string_view sha, Interpreter* interpreter ev_args.args = args.subspan(2 + num_keys); uint64_t start = absl::GetCurrentTimeNanos(); - EvalInternal(args, ev_args, interpreter, cntx); + EvalInternal(args, ev_args, interpreter, builder, cntx); uint64_t end = absl::GetCurrentTimeNanos(); ServerState::tlocal()->RecordCallLatency(sha, (end - start) / 1000); @@ -1954,25 +1956,10 @@ Transaction::MultiMode DetermineMultiMode(ScriptMgr::ScriptParams params) { // Start multi transaction for eval. Returns true if transaction was scheduled. // Skips scheduling if multi mode requires declaring keys, but no keys were declared. -// Return nullopt if eval runs inside multi and conflicts with multi mode -optional StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptParams params, - ConnectionContext* cntx) { +bool StartMultiEval(DbIndex dbid, CmdArgList keys, Transaction::MultiMode script_mode, + ConnectionContext* cntx) { Transaction* tx = cntx->transaction; Namespace* ns = cntx->ns; - SinkReplyBuilder* builder = cntx->reply_builder(); - Transaction::MultiMode script_mode = DetermineMultiMode(params); - Transaction::MultiMode multi_mode = tx->GetMultiMode(); - // Check if eval is already part of a running multi transaction - if (multi_mode != Transaction::NOT_DETERMINED) { - if (multi_mode > script_mode) { - string err = StrCat( - "Multi mode conflict when running eval in multi transaction. Multi mode is: ", multi_mode, - " eval mode is: ", script_mode); - builder->SendError(err); - return nullopt; - } - return false; - } if (keys.empty() && script_mode == Transaction::LOCK_AHEAD) return false; @@ -2015,9 +2002,8 @@ static bool CanRunSingleShardMulti(optional sid, const ScriptMgr::Scrip } void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpreter* interpreter, - ConnectionContext* cntx) { + SinkReplyBuilder* builder, ConnectionContext* cntx) { DCHECK(!eval_args.sha.empty()); - auto* builder = cntx->reply_builder(); // Sanitizing the input to avoid code injection. if (eval_args.sha.size() != 40 || !IsSHA(eval_args.sha)) { @@ -2097,9 +2083,20 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret cntx->conn()->RequestAsyncMigration(shard_set->pool()->at(*sid)); } } else { - optional scheduled = StartMultiEval(cntx->db_index(), eval_args.keys, *params, cntx); - if (!scheduled) { - return; + Transaction::MultiMode script_mode = DetermineMultiMode(*params); + Transaction::MultiMode tx_mode = tx->GetMultiMode(); + bool scheduled = false; + + // Check if eval is already part of a running multi transaction + if (tx_mode != Transaction::NOT_DETERMINED) { + if (tx_mode > script_mode) { + string err = StrCat( + "Multi mode conflict when running eval in multi transaction. Multi mode is: ", tx_mode, + " eval mode is: ", script_mode); + return builder->SendError(err); + } + } else { + scheduled = StartMultiEval(cntx->db_index(), eval_args.keys, script_mode, cntx); } ++ServerState::tlocal()->stats.eval_io_coordination_cnt; @@ -2115,7 +2112,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret } // Conclude the transaction. - if (*scheduled) + if (scheduled) tx->UnlockMulti(); } @@ -2312,7 +2309,7 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, if (absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE && !cntx->conn_state.tracking_info_.IsTrackingOn()) { - MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), cntx, this); + MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), rb, cntx, this); } else { CmdArgVec arg_vec; for (auto& scmd : exec_info.body) { @@ -2334,7 +2331,7 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, } } - bool ok = InvokeCmd(scmd.Cid(), args, cntx); + bool ok = InvokeCmd(scmd.Cid(), args, builder, cntx); if (!ok || rb->GetError()) // checks for i/o error, not logical error. break; } diff --git a/src/server/main_service.h b/src/server/main_service.h index 66538cd19..710bdf773 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -24,7 +24,6 @@ class AcceptServer; namespace dfly { -class Interpreter; using facade::MemcacheParser; class Service : public facade::ServiceInterface { @@ -37,14 +36,16 @@ class Service : public facade::ServiceInterface { void Shutdown(); // Prepare command execution, verify and execute, reply to context - void DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) final; + void DispatchCommand(ArgSlice args, facade::SinkReplyBuilder* builder, + facade::ConnectionContext* cntx) final; // Execute multiple consecutive commands, possibly in parallel by squashing - size_t DispatchManyCommands(absl::Span args_list, + size_t DispatchManyCommands(absl::Span args_list, facade::SinkReplyBuilder* builder, facade::ConnectionContext* cntx) final; // Check VerifyCommandExecution and invoke command with args - bool InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* reply_cntx); + bool InvokeCmd(const CommandId* cid, CmdArgList tail_args, facade::SinkReplyBuilder* builder, + ConnectionContext* reply_cntx); // Verify command can be executed now (check out of memory), always called immediately before // execution @@ -59,7 +60,7 @@ class Service : public facade::ServiceInterface { const ConnectionContext& cntx); void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, - facade::ConnectionContext* cntx) final; + facade::MCReplyBuilder* builder, facade::ConnectionContext* cntx) final; facade::ConnectionContext* CreateContext(util::FiberSocketBase* peer, facade::Connection* owner) final; @@ -169,9 +170,9 @@ class Service : public facade::ServiceInterface { const ConnectionContext& dfly_cntx); void EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpreter* interpreter, - ConnectionContext* cntx); + SinkReplyBuilder* builder, ConnectionContext* cntx); void CallSHA(CmdArgList args, std::string_view sha, Interpreter* interpreter, - ConnectionContext* cntx); + SinkReplyBuilder* builder, ConnectionContext* cntx); // Return optional payload - first received error that occured when executing commands. std::optional FlushEvalAsyncCmds(ConnectionContext* cntx, diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index cfa52e5d1..dfda107f3 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -112,7 +112,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm return need_flush ? SquashResult::SQUASHED_FULL : SquashResult::SQUASHED; } -bool MultiCommandSquasher::ExecuteStandalone(StoredCmd* cmd) { +bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, StoredCmd* cmd) { DCHECK(order_.empty()); // check no squashed chain is interrupted cmd->Fill(&tmp_keylist_); @@ -120,8 +120,8 @@ bool MultiCommandSquasher::ExecuteStandalone(StoredCmd* cmd) { if (verify_commands_) { if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) { - cntx_->SendError(std::move(*err)); - std::ignore = cntx_->reply_builder()->ConsumeLastError(); + rb->SendError(std::move(*err)); + std::ignore = rb->ConsumeLastError(); return !error_abort_; } } @@ -132,7 +132,7 @@ bool MultiCommandSquasher::ExecuteStandalone(StoredCmd* cmd) { if (cmd->Cid()->IsTransactional()) tx->InitByArgs(cntx_->ns, cntx_->conn_state.db_index, args); - service_->InvokeCmd(cmd->Cid(), args, cntx_); + service_->InvokeCmd(cmd->Cid(), args, rb, cntx_); return true; } @@ -168,7 +168,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard crb.SetReplyMode(cmd->ReplyMode()); local_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args); - service_->InvokeCmd(cmd->Cid(), args, &local_cntx); + service_->InvokeCmd(cmd->Cid(), args, &crb, &local_cntx); sinfo.replies.emplace_back(crb.Take()); @@ -186,7 +186,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard return OpStatus::OK; } -bool MultiCommandSquasher::ExecuteSquashed() { +bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { DCHECK(!cntx_->conn_state.exec_info.IsCollecting()); if (order_.empty()) @@ -235,7 +235,6 @@ bool MultiCommandSquasher::ExecuteSquashed() { uint64_t after_hop = proactor->GetMonotonicTimeNs(); bool aborted = false; - RedisReplyBuilder* rb = static_cast(cntx_->reply_builder()); for (auto idx : order_) { auto& replies = sharded_[idx].replies; CHECK(!replies.empty()); @@ -259,7 +258,7 @@ bool MultiCommandSquasher::ExecuteSquashed() { return !aborted; } -void MultiCommandSquasher::Run() { +void MultiCommandSquasher::Run(RedisReplyBuilder* rb) { DVLOG(1) << "Trying to squash " << cmds_.size() << " commands for transaction " << cntx_->transaction->DebugId(); @@ -270,17 +269,17 @@ void MultiCommandSquasher::Run() { break; if (res == SquashResult::NOT_SQUASHED || res == SquashResult::SQUASHED_FULL) { - if (!ExecuteSquashed()) + if (!ExecuteSquashed(rb)) break; } if (res == SquashResult::NOT_SQUASHED) { - if (!ExecuteStandalone(&cmd)) + if (!ExecuteStandalone(rb, &cmd)) break; } } - ExecuteSquashed(); // Flush leftover + ExecuteSquashed(rb); // Flush leftover // Set last txid. cntx_->last_command_debug.clock = cntx_->transaction->txid(); diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index 26922b22b..1a813c613 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -22,9 +22,10 @@ namespace dfly { // contains a non-atomic multi transaction to execute squashed commands. class MultiCommandSquasher { public: - static void Execute(absl::Span cmds, ConnectionContext* cntx, Service* service, - bool verify_commands = false, bool error_abort = false) { - MultiCommandSquasher{cmds, cntx, service, verify_commands, error_abort}.Run(); + static void Execute(absl::Span cmds, facade::RedisReplyBuilder* rb, + ConnectionContext* cntx, Service* service, bool verify_commands = false, + bool error_abort = false) { + MultiCommandSquasher{cmds, cntx, service, verify_commands, error_abort}.Run(rb); } private: @@ -53,16 +54,16 @@ class MultiCommandSquasher { SquashResult TrySquash(StoredCmd* cmd); // Execute separate non-squashed cmd. Return false if aborting on error. - bool ExecuteStandalone(StoredCmd* cmd); + bool ExecuteStandalone(facade::RedisReplyBuilder* rb, StoredCmd* cmd); // Callback that runs on shards during squashed hop. facade::OpStatus SquashedHopCb(Transaction* parent_tx, EngineShard* es); // Execute all currently squashed commands. Return false if aborting on error. - bool ExecuteSquashed(); + bool ExecuteSquashed(facade::RedisReplyBuilder* rb); // Run all commands until completion. - void Run(); + void Run(facade::RedisReplyBuilder* rb); bool IsAtomic() const; diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 5f4b1862b..c01944d13 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -2884,7 +2884,7 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) { string ft_create = "FT.CREATE"; arg_vec.insert(arg_vec.begin(), MutableSlice{ft_create.data(), ft_create.size()}); - service_->DispatchCommand(absl::MakeSpan(arg_vec), &cntx); + service_->DispatchCommand(absl::MakeSpan(arg_vec), &crb, &cntx); auto response = crb.Take(); if (auto err = facade::CapturingReplyBuilder::TryExtractError(response); err) { diff --git a/src/server/replica.cc b/src/server/replica.cc index 9d66ce4ce..58fb53bb7 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -593,12 +593,14 @@ error_code Replica::InitiateDflySync() { error_code Replica::ConsumeRedisStream() { base::IoBuf io_buf(16_KB); - io::NullSink null_sink; // we never reply back on the commands. - ConnectionContext conn_context{&null_sink, nullptr, {}}; + ConnectionContext conn_context{static_cast(nullptr), nullptr, {}}; conn_context.is_replicating = true; conn_context.journal_emulated = true; conn_context.skip_acl_validation = true; conn_context.ns = &namespaces.GetDefaultNamespace(); + + // we never reply back on the commands. + facade::CapturingReplyBuilder null_builder{facade::ReplyMode::NONE}; ResetParser(true); // Master waits for this command in order to start sending replication stream. @@ -651,7 +653,7 @@ error_code Replica::ConsumeRedisStream() { facade::RespExpr::VecToArgList(LastResponseArgs(), &args_vector); CmdArgList arg_list{args_vector.data(), args_vector.size()}; - service_.DispatchCommand(arg_list, &conn_context); + service_.DispatchCommand(arg_list, &null_builder, &conn_context); } } diff --git a/src/server/server_family.cc b/src/server/server_family.cc index dbd4a8891..f0476f3d2 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -233,7 +233,7 @@ constexpr string_view kS3Prefix = "s3://"sv; using EngineFunc = void (ServerFamily::*)(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); -inline CommandId::Handler3 HandlerFunc(ServerFamily* se, EngineFunc f) { +inline CommandId::Handler HandlerFunc(ServerFamily* se, EngineFunc f) { return [=](CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { return (se->*f)(args, tx, builder, cntx); }; @@ -498,10 +498,10 @@ void ClientSetInfo(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext void ClientId(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) { if (args.size() != 0) { - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } - return cntx->SendLong(cntx->conn()->GetClientId()); + return builder->SendLong(cntx->conn()->GetClientId()); } void ClientKill(CmdArgList args, absl::Span listeners, SinkReplyBuilder* builder, @@ -536,7 +536,7 @@ void ClientKill(CmdArgList args, absl::Span listeners, SinkRe } if (!evaluator) { - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } const bool is_admin_request = cntx->conn()->IsPrivileged(); @@ -560,11 +560,11 @@ void ClientKill(CmdArgList args, absl::Span listeners, SinkRe } if (kill_errors.load() == 0) { - return cntx->SendLong(killed_connections.load()); + return builder->SendLong(killed_connections.load()); } else { - return cntx->SendError(absl::StrCat("Killed ", killed_connections.load(), - " client(s), but unable to kill ", kill_errors.load(), - " admin client(s).")); + return builder->SendError(absl::StrCat("Killed ", killed_connections.load(), + " client(s), but unable to kill ", kill_errors.load(), + " admin client(s).")); } } @@ -1811,7 +1811,7 @@ bool ServerFamily::DoAuth(ConnectionContext* cntx, std::string_view username, void ServerFamily::Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { if (args.size() > 2) { - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } // non admin port auth @@ -1821,16 +1821,16 @@ void ServerFamily::Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil const size_t index = one_arg ? 0 : 1; std::string_view password = facade::ToSV(args[index]); if (DoAuth(cntx, username, password)) { - return cntx->SendOk(); + return builder->SendOk(); } auto& log = ServerState::tlocal()->acl_log; using Reason = acl::AclLog::Reason; log.Add(*cntx, "AUTH", Reason::AUTH, std::string(username)); - return cntx->SendError(facade::kAuthRejected); + return builder->SendError(facade::kAuthRejected); } if (!cntx->req_auth) { - return cntx->SendError( + return builder->SendError( "AUTH called without any password configured for " "admin port. Are you sure your configuration is correct?"); } @@ -1838,9 +1838,9 @@ void ServerFamily::Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil string_view pass = ArgS(args, 0); if (pass == GetPassword()) { cntx->authenticated = true; - cntx->SendOk(); + builder->SendOk(); } else { - cntx->SendError(facade::kAuthRejected); + builder->SendError(facade::kAuthRejected); } } @@ -2888,7 +2888,7 @@ void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder* } else if (cmd == "LISTENING-PORT") { uint32_t replica_listening_port; if (!absl::SimpleAtoi(arg, &replica_listening_port)) { - cntx->SendError(kInvalidIntErr); + builder->SendError(kInvalidIntErr); return; } cntx->conn_state.replication_info.repl_listening_port = replica_listening_port; @@ -2908,7 +2908,7 @@ void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder* } else if (cmd == "CLIENT-VERSION" && args.size() == 2) { unsigned version; if (!absl::SimpleAtoi(arg, &version)) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } dfly_cmd_->SetDflyClientVersion(cntx, DflyVersion(version)); } else if (cmd == "ACK" && args.size() == 2) { @@ -2928,17 +2928,13 @@ void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder* VLOG(2) << "Received client ACK=" << ack; cntx->replication_flow->last_acked_lsn = ack; return; - } else if (cmd == "ACL-CHECK") { - // TODO(kostasrim): Remove this branch 20/6/2024 - cntx->SendOk(); - return; } else { VLOG(1) << "Error " << cmd << " " << arg << " " << args.size(); return err_cb(); } } - return cntx->SendOk(); + return builder->SendOk(); } void ServerFamily::Role(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, @@ -2998,7 +2994,7 @@ void ServerFamily::LastSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* util::fb2::LockGuard lk(save_mu_); save_time = last_save_info_.save_time; } - cntx->SendLong(save_time); + builder->SendLong(save_time); } void ServerFamily::Latency(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, @@ -3011,13 +3007,13 @@ void ServerFamily::Latency(CmdArgList args, Transaction* tx, SinkReplyBuilder* b } LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported"; - cntx->SendError(kSyntaxErr); + builder->SendError(kSyntaxErr); } void ServerFamily::ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { if (args.size() > 1) { - cntx->SendError(kSyntaxErr); + builder->SendError(kSyntaxErr); return; } @@ -3027,7 +3023,7 @@ void ServerFamily::ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilde } else if (absl::EqualsIgnoreCase(sub_cmd, "NOSAVE")) { save_on_shutdown_ = false; } else { - cntx->SendError(kSyntaxErr); + builder->SendError(kSyntaxErr); return; } } @@ -3036,7 +3032,7 @@ void ServerFamily::ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilde [](ProactorBase* pb) { ServerState::tlocal()->EnterLameDuck(); }); CHECK_NOTNULL(acceptor_)->Stop(); - cntx->SendOk(); + builder->SendOk(); } void ServerFamily::Dfly(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, @@ -3074,26 +3070,26 @@ void ServerFamily::SlowLog(CmdArgList args, Transaction* tx, SinkReplyBuilder* b lengths[index] = ServerState::tlocal()->GetSlowLog().Length(); }); int sum = std::accumulate(lengths.begin(), lengths.end(), 0); - return cntx->SendLong(sum); + return builder->SendLong(sum); } if (sub_cmd == "RESET") { service_.proactor_pool().AwaitFiberOnAll( [](auto index, auto* context) { ServerState::tlocal()->GetSlowLog().Reset(); }); - return cntx->SendOk(); + return builder->SendOk(); } if (sub_cmd == "GET") { return SlowLogGet(args, sub_cmd, &service_.proactor_pool(), builder); } - cntx->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType); + builder->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType); } void ServerFamily::Module(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); if (sub_cmd != "LIST") - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); auto* rb = static_cast(builder); rb->StartArray(2); diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 334505f4c..6c5b21d25 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -970,7 +970,7 @@ void StringFamily::Set(CmdArgList args, Transaction* tx, SinkReplyBuilder* build int64_t now_ms = GetCurrentTimeMs(); auto [rel_ms, abs_ms] = expiry.Calculate(now_ms, false); if (abs_ms < 0) - return cntx->SendError(InvalidExpireTime("set")); + return builder->SendError(InvalidExpireTime("set")); // Remove existed key if the key is expired already if (rel_ms < 0) { diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index c3a02285b..955290583 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -390,7 +390,7 @@ RespExpr BaseFamilyTest::Run(std::string_view id, ArgSlice slice) { DCHECK(context->transaction == nullptr) << id; - service_->DispatchCommand(CmdArgList{args}, context); + service_->DispatchCommand(CmdArgList{args}, context->reply_builder(), context); DCHECK(context->transaction == nullptr); @@ -433,7 +433,7 @@ auto BaseFamilyTest::RunMC(MP::CmdType cmd_type, string_view key, string_view va DCHECK(context->transaction == nullptr); - service_->DispatchMC(cmd, value, context); + service_->DispatchMC(cmd, value, static_cast(context->reply_builder()), context); DCHECK(context->transaction == nullptr); @@ -452,7 +452,8 @@ auto BaseFamilyTest::RunMC(MP::CmdType cmd_type, std::string_view key) -> MCResp auto* context = conn->cmd_cntx(); - service_->DispatchMC(cmd, string_view{}, context); + service_->DispatchMC(cmd, string_view{}, static_cast(context->reply_builder()), + context); return conn->SplitLines(); } @@ -478,7 +479,8 @@ auto BaseFamilyTest::GetMC(MP::CmdType cmd_type, std::initializer_listcmd_cntx(); - service_->DispatchMC(cmd, string_view{}, context); + service_->DispatchMC(cmd, string_view{}, static_cast(context->reply_builder()), + context); return conn->SplitLines(); } diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 71e898cd0..7d942d228 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -1235,10 +1235,10 @@ void BZPopMinMax(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, float timeout; auto timeout_str = ArgS(args, args.size() - 1); if (!absl::SimpleAtof(timeout_str, &timeout)) { - return cntx->SendError("timeout is not a float or out of range"); + return builder->SendError("timeout is not a float or out of range"); } if (timeout < 0) { - return cntx->SendError("timeout is negative"); + return builder->SendError("timeout is negative"); } VLOG(1) << "BZPop timeout(" << timeout << ")"; @@ -1274,14 +1274,14 @@ void BZPopMinMax(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, DVLOG(1) << "result for " << tx->DebugId() << " is " << popped_key.status(); switch (popped_key.status()) { case OpStatus::WRONG_TYPE: - return cntx->SendError(kWrongTypeErr); + return builder->SendError(kWrongTypeErr); case OpStatus::CANCELLED: case OpStatus::TIMED_OUT: return rb->SendNullArray(); case OpStatus::KEY_MOVED: { auto error = cluster::SlotOwnershipErrorStr(*tx->GetUniqueSlotId()); CHECK(error.has_value()); - return cntx->SendError(std::move(*error)); + return builder->SendError(std::move(*error)); } default: LOG(ERROR) << "Unexpected error " << popped_key.status();