1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: pass RedisReplyBuilder explicitly from dragonfly connection (#4009)

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-10-29 14:52:09 +02:00 committed by GitHub
parent 566f246cee
commit 6f6897cef1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 156 additions and 230 deletions

View file

@ -52,16 +52,4 @@ size_t ConnectionContext::UsedMemory() const {
return dfly::HeapSize(rbuilder_) + dfly::HeapSize(authed_username) + dfly::HeapSize(acl_commands);
}
void ConnectionContext::SendError(std::string_view str, std::string_view type) {
rbuilder_->SendError(str, type);
}
void ConnectionContext::SendError(ErrorReply error) {
rbuilder_->SendError(error);
}
void ConnectionContext::SendError(OpStatus status) {
rbuilder_->SendError(status);
}
} // namespace facade

View file

@ -47,40 +47,6 @@ class ConnectionContext {
return res;
}
virtual void SendError(std::string_view str, std::string_view type = std::string_view{});
virtual void SendError(ErrorReply error);
virtual void SendError(OpStatus status);
void SendStored() {
rbuilder_->SendStored();
}
void SendSetSkipped() {
rbuilder_->SendSetSkipped();
}
void SendMGetResponse(SinkReplyBuilder::MGetResponse resp) {
rbuilder_->SendMGetResponse(std::move(resp));
}
void SendLong(long val) {
rbuilder_->SendLong(val);
}
void SendSimpleString(std::string_view str) {
rbuilder_->SendSimpleString(str);
}
void SendOk() {
rbuilder_->SendOk();
}
void SendProtocolError(std::string_view str) {
rbuilder_->SendProtocolError(str);
}
virtual size_t UsedMemory() const;
// connection state / properties.

View file

@ -486,14 +486,16 @@ void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) {
void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg) {
DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front());
self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()},
self->reply_builder_, self->cc_.get());
self->last_interaction_ = time(nullptr);
self->skip_next_squashing_ = false;
}
void Connection::DispatchOperations::operator()(const Connection::MCPipelineMessage& msg) {
self->service_->DispatchMC(msg.cmd, msg.value, self->cc_.get());
self->service_->DispatchMC(msg.cmd, msg.value, static_cast<MCReplyBuilder*>(self->reply_builder_),
self->cc_.get());
self->last_interaction_ = time(nullptr);
}
@ -1087,7 +1089,7 @@ Connection::ParserStatus Connection::ParseRedis() {
auto dispatch_sync = [this, &parse_args, &cmd_vec] {
RespExpr::VecToArgList(parse_args, &cmd_vec);
service_->DispatchCommand(absl::MakeSpan(cmd_vec), cc_.get());
service_->DispatchCommand(absl::MakeSpan(cmd_vec), reply_builder_, cc_.get());
};
auto dispatch_async = [this, &parse_args, tlh = mi_heap_get_backing()]() -> MessageHandle {
return {FromArgs(std::move(parse_args), tlh)};
@ -1131,7 +1133,10 @@ auto Connection::ParseMemcache() -> ParserStatus {
MemcacheParser::Command cmd;
string_view value;
auto dispatch_sync = [this, &cmd, &value] { service_->DispatchMC(cmd, value, cc_.get()); };
auto dispatch_sync = [this, &cmd, &value] {
service_->DispatchMC(cmd, value, static_cast<MCReplyBuilder*>(reply_builder_), cc_.get());
};
auto dispatch_async = [&cmd, &value]() -> MessageHandle {
return {make_unique<MCPipelineMessage>(std::move(cmd), value)};
};
@ -1353,6 +1358,7 @@ bool Connection::ShouldEndDispatchFiber(const MessageHandle& msg) {
void Connection::SquashPipeline() {
DCHECK_EQ(dispatch_q_.size(), pending_pipeline_cmd_cnt_);
DCHECK_EQ(reply_builder_->type(), SinkReplyBuilder::REDIS); // Only Redis is supported.
vector<ArgSlice> squash_cmds;
squash_cmds.reserve(dispatch_q_.size());
@ -1367,7 +1373,8 @@ void Connection::SquashPipeline() {
cc_->async_dispatch = true;
size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());
size_t dispatched =
service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), reply_builder_, cc_.get());
if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) { // Flush if no new commands appeared
reply_builder_->FlushBatch();

View file

@ -21,19 +21,20 @@ namespace {
class OkService : public ServiceInterface {
public:
void DispatchCommand(ArgSlice args, ConnectionContext* cntx) final {
cntx->SendOk();
void DispatchCommand(ArgSlice args, SinkReplyBuilder* builder, ConnectionContext* cntx) final {
builder->SendOk();
}
size_t DispatchManyCommands(absl::Span<ArgSlice> args_lists, ConnectionContext* cntx) final {
size_t DispatchManyCommands(absl::Span<ArgSlice> args_lists, SinkReplyBuilder* builder,
ConnectionContext* cntx) final {
for (auto args : args_lists)
DispatchCommand(args, cntx);
DispatchCommand(args, builder, cntx);
return args_lists.size();
}
void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
ConnectionContext* cntx) final {
cntx->SendError("");
MCReplyBuilder* builder, ConnectionContext* cntx) final {
builder->SendError("");
}
ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) final {

View file

@ -18,20 +18,23 @@ namespace facade {
class ConnectionContext;
class Connection;
struct ConnectionStats;
class SinkReplyBuilder;
class MCReplyBuilder;
class ServiceInterface {
public:
virtual ~ServiceInterface() {
}
virtual void DispatchCommand(ArgSlice args, ConnectionContext* cntx) = 0;
virtual void DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
ConnectionContext* cntx) = 0;
// Returns number of processed commands
virtual size_t DispatchManyCommands(absl::Span<ArgSlice> args_list, ConnectionContext* cntx) = 0;
virtual size_t DispatchManyCommands(absl::Span<ArgSlice> args_list, SinkReplyBuilder* builder,
ConnectionContext* cntx) = 0;
virtual void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
ConnectionContext* cntx) = 0;
MCReplyBuilder* builder, ConnectionContext* cntx) = 0;
virtual ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) = 0;

View file

@ -629,8 +629,8 @@ void AclFamily::DryRun(CmdArgList args, Transaction* tx, SinkReplyBuilder* build
using MemberFunc2 = void (AclFamily::*)(CmdArgList args, Transaction* tx,
facade::SinkReplyBuilder* builder);
using MemberFunc3 = void (AclFamily::*)(CmdArgList args, Transaction* tx,
facade::SinkReplyBuilder* builder, ConnectionContext* cntx);
using MemberFunc = void (AclFamily::*)(CmdArgList args, Transaction* tx,
facade::SinkReplyBuilder* builder, ConnectionContext* cntx);
CommandId::Handler2 HandlerFunc(AclFamily* acl, MemberFunc2 f) {
return [=](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder) {
@ -638,7 +638,7 @@ CommandId::Handler2 HandlerFunc(AclFamily* acl, MemberFunc2 f) {
};
}
CommandId::Handler3 HandlerFunc(AclFamily* acl, MemberFunc3 f) {
CommandId::Handler HandlerFunc(AclFamily* acl, MemberFunc f) {
return [=](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder,
ConnectionContext* cntx) { return (acl->*f)(args, tx, builder, cntx); };
}

View file

@ -1007,7 +1007,7 @@ using EngineFunc = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* bu
using EngineFunc2 = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder);
inline CommandId::Handler3 HandlerFunc(ClusterFamily* se, EngineFunc f) {
inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) {
return [=](CmdArgList args, Transaction*, SinkReplyBuilder* builder, ConnectionContext* cntx) {
return (se->*f)(args, builder, cntx);
};

View file

@ -58,9 +58,10 @@ bool CommandId::IsMultiTransactional() const {
return CO::IsTransKind(name()) || CO::IsEvalKind(name());
}
uint64_t CommandId::Invoke(CmdArgList args, ConnectionContext* cntx) const {
uint64_t CommandId::Invoke(CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder,
ConnectionContext* cntx) const {
int64_t before = absl::GetCurrentTimeNanos();
handler_(args, cntx);
handler_(args, tx, builder, cntx);
int64_t after = absl::GetCurrentTimeNanos();
ServerState* ss = ServerState::tlocal(); // Might have migrated thread, read after invocation
@ -95,17 +96,8 @@ optional<facade::ErrorReply> CommandId::Validate(CmdArgList tail_args) const {
}
CommandId&& CommandId::SetHandler(Handler2 f) && {
handler_ = [f = std::move(f)](CmdArgList args, ConnectionContext* cntx) {
f(args, cntx->transaction, cntx->reply_builder());
};
return std::move(*this);
}
CommandId&& CommandId::SetHandler(Handler3 f) && {
handler_ = [f = std::move(f)](CmdArgList args, ConnectionContext* cntx) {
f(args, cntx->transaction, cntx->reply_builder(), cntx);
};
handler_ = [f = std::move(f)](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder,
facade::ConnectionContext*) { f(args, tx, builder); };
return std::move(*this);
}

View file

@ -88,13 +88,18 @@ class CommandId : public facade::CommandId {
using Handler =
fu2::function_base<true /*owns*/, true /*copyable*/, fu2::capacity_default,
false /* non-throwing*/, false /* strong exceptions guarantees*/,
void(CmdArgList, ConnectionContext*) const>;
void(CmdArgList, Transaction*, facade::SinkReplyBuilder*,
ConnectionContext*) const>;
using Handler2 =
fu2::function_base<true, true, fu2::capacity_default, false, false,
void(CmdArgList, Transaction*, facade::SinkReplyBuilder*) const>;
using ArgValidator = fu2::function_base<true, true, fu2::capacity_default, false, false,
std::optional<facade::ErrorReply>(CmdArgList) const>;
// Returns the invoke time in usec.
uint64_t Invoke(CmdArgList args, ConnectionContext* cntx) const;
uint64_t Invoke(CmdArgList args, Transaction*, facade::SinkReplyBuilder*,
ConnectionContext* cntx) const;
// Returns error if validation failed, otherwise nullopt
std::optional<facade::ErrorReply> Validate(CmdArgList tail_args) const;
@ -122,14 +127,7 @@ class CommandId : public facade::CommandId {
return std::move(*this);
}
using Handler2 =
fu2::function_base<true, true, fu2::capacity_default, false, false,
void(CmdArgList, Transaction*, facade::SinkReplyBuilder*) const>;
using Handler3 = fu2::function_base<true, true, fu2::capacity_default, false, false,
void(CmdArgList, Transaction*, facade::SinkReplyBuilder*,
ConnectionContext*) const>;
CommandId&& SetHandler(Handler2 f) &&;
CommandId&& SetHandler(Handler3 f) &&;
CommandId&& SetValidator(ArgValidator f) && {
validator_ = std::move(f);

View file

@ -269,29 +269,6 @@ size_t ConnectionContext::UsedMemory() const {
return facade::ConnectionContext::UsedMemory() + dfly::HeapSize(conn_state);
}
void ConnectionContext::SendError(std::string_view str, std::string_view type) {
string_view name = cid ? cid->name() : string_view{};
VLOG(1) << "Sending error " << str << " " << type << " during " << name;
facade::ConnectionContext::SendError(str, type);
}
void ConnectionContext::SendError(facade::ErrorReply error) {
string_view name = cid ? cid->name() : string_view{};
VLOG(1) << "Sending error " << error.ToSv() << " during " << name;
facade::ConnectionContext::SendError(std::move(error));
}
void ConnectionContext::SendError(facade::OpStatus status) {
if (status != facade::OpStatus::OK) {
string_view name = cid ? cid->name() : string_view{};
VLOG(1) << "Sending error " << status << " during " << name;
}
facade::ConnectionContext::SendError(status);
}
void ConnectionState::ExecInfo::Clear() {
DCHECK(!preborrowed_interpreter); // Must have been released properly
state = EXEC_INACTIVE;

View file

@ -300,10 +300,6 @@ class ConnectionContext : public facade::ConnectionContext {
size_t UsedMemory() const override;
void SendError(std::string_view str, std::string_view type = std::string_view{}) override;
void SendError(facade::ErrorReply error) override;
void SendError(facade::OpStatus status) override;
// Whether this connection is a connection from a replica to its master.
// This flag is true only on replica side, where we need to setup a special ConnectionContext
// instance that helps applying commands coming from master.

View file

@ -145,6 +145,7 @@ void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool
local_tx->StartMultiNonAtomic();
boost::intrusive_ptr<Transaction> stub_tx =
new Transaction{local_tx.get(), EngineShard::tlocal()->shard_id(), nullopt};
absl::InlinedVector<string_view, 5> args_view;
facade::CapturingReplyBuilder crb;
ConnectionContext local_cntx{cntx, stub_tx.get(), &crb};
@ -171,7 +172,7 @@ void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool
crb.SetReplyMode(ReplyMode::NONE);
stub_tx->InitByArgs(cntx->ns, local_cntx.conn_state.db_index, args_span);
sf->service().InvokeCmd(cid, args_span, &local_cntx);
sf->service().InvokeCmd(cid, args_span, &crb, &local_cntx);
}
local_cntx.Inject(nullptr);

View file

@ -1615,7 +1615,7 @@ void GenericFamily::Select(CmdArgList args, Transaction*, SinkReplyBuilder* buil
};
shard_set->RunBriefInParallel(std::move(cb));
return cntx->SendOk();
return builder->SendOk();
}
void GenericFamily::Dump(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {

View file

@ -228,12 +228,11 @@ void HttpAPI(const http::QueryArgs& args, HttpRequest&& req, Service* service,
DCHECK(context);
facade::CapturingReplyBuilder reply_builder;
auto* prev = context->Inject(&reply_builder);
// TODO: to finish this.
service->DispatchCommand(absl::MakeSpan(cmd_slices), context);
service->DispatchCommand(absl::MakeSpan(cmd_slices), &reply_builder, context);
facade::CapturingReplyBuilder::Payload payload = reply_builder.Take();
context->Inject(prev);
auto response = http::MakeStringResponse();
http::SetMime(http::kJsonMime, &response);

View file

@ -75,7 +75,7 @@ void JournalExecutor::FlushSlots(const cluster::SlotRange& slot_range) {
void JournalExecutor::Execute(journal::ParsedEntry::CmdData& cmd) {
auto span = CmdArgList{cmd.cmd_args.data(), cmd.cmd_args.size()};
service_->DispatchCommand(span, &conn_context_);
service_->DispatchCommand(span, &reply_builder_, &conn_context_);
}
void JournalExecutor::SelectDb(DbIndex dbid) {

View file

@ -1232,8 +1232,8 @@ std::optional<ErrorReply> Service::VerifyCommandState(const CommandId* cid, CmdA
return VerifyConnectionAclStatus(cid, &dfly_cntx, "has no ACL permissions", tail_args);
}
void Service::DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) {
auto* builder = cntx->reply_builder();
void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
facade::ConnectionContext* cntx) {
absl::Cleanup clear_last_error([builder]() { std::ignore = builder->ConsumeLastError(); });
DCHECK(!args.empty());
DCHECK_NE(0u, shard_set->size()) << "Init was not called";
@ -1334,7 +1334,7 @@ void Service::DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) {
dfly_cntx->cid = cid;
if (!InvokeCmd(cid, args_no_cmd, dfly_cntx)) {
if (!InvokeCmd(cid, args_no_cmd, builder, dfly_cntx)) {
builder->SendError("Internal Error");
builder->CloseConnection();
}
@ -1391,10 +1391,10 @@ OpResult<void> OpTrackKeys(const OpArgs slice_args, const facade::Connection::We
return OpStatus::OK;
}
bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* cntx) {
bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
DCHECK(cid);
DCHECK(!cid->Validate(tail_args));
auto* builder = cntx->reply_builder();
if (auto err = VerifyCommandExecution(cid, cntx, tail_args); err) {
// We need to skip this because ACK's should not be replied to
@ -1441,7 +1441,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
auto last_error = builder->ConsumeLastError();
DCHECK(last_error.empty());
try {
invoke_time_usec = cid->Invoke(tail_args, cntx);
invoke_time_usec = cid->Invoke(tail_args, tx, builder, cntx);
} catch (std::exception& e) {
LOG(ERROR) << "Internal error, system probably unstable " << e.what();
return false;
@ -1491,10 +1491,11 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
return true;
}
size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list, SinkReplyBuilder* builder,
facade::ConnectionContext* cntx) {
ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);
DCHECK(!dfly_cntx->conn_state.exec_info.IsRunning());
DCHECK_EQ(builder->type(), SinkReplyBuilder::REDIS);
vector<StoredCmd> stored_cmds;
intrusive_ptr<Transaction> dist_trans;
@ -1515,7 +1516,9 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
}
dfly_cntx->transaction = dist_trans.get();
MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds), dfly_cntx, this, true, false);
MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds),
static_cast<RedisReplyBuilder*>(builder), dfly_cntx, this, true,
false);
dfly_cntx->transaction = nullptr;
dispatched += stored_cmds.size();
@ -1559,7 +1562,7 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
break;
// Dispatch non squashed command only after all squshed commands were executed and replied
DispatchCommand(args, cntx);
DispatchCommand(args, builder, cntx);
dispatched++;
}
@ -1572,14 +1575,13 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
}
void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
facade::ConnectionContext* cntx) {
MCReplyBuilder* mc_builder, facade::ConnectionContext* cntx) {
absl::InlinedVector<MutableSlice, 8> args;
char cmd_name[16];
char ttl[16];
char store_opt[32] = {0};
char ttl_op[] = "EXAT";
MCReplyBuilder* mc_builder = static_cast<MCReplyBuilder*>(cntx->reply_builder());
mc_builder->SetNoreply(cmd.no_reply);
switch (cmd.type) {
@ -1671,7 +1673,7 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va
}
}
DispatchCommand(CmdArgList{args}, cntx);
DispatchCommand(CmdArgList{args}, mc_builder, cntx);
// Reset back.
dfly_cntx->conn_state.memcache_flag = 0;
@ -1824,7 +1826,7 @@ optional<CapturingReplyBuilder::Payload> Service::FlushEvalAsyncCmds(ConnectionC
CapturingReplyBuilder crb{ReplyMode::ONLY_ERR};
WithReplies(&crb, cntx, [&] {
MultiCommandSquasher::Execute(absl::MakeSpan(info->async_cmds), cntx, this, true, true);
MultiCommandSquasher::Execute(absl::MakeSpan(info->async_cmds), &crb, cntx, this, true, true);
});
info->async_cmds_heap_mem = 0;
@ -1873,7 +1875,7 @@ void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca)
if (ca.async)
return;
DispatchCommand(ca.args, cntx);
DispatchCommand(ca.args, &replier, cntx);
}
void Service::Eval(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
@ -1892,7 +1894,7 @@ void Service::Eval(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
string sha{std::move(res.value())};
CallSHA(args, sha, interpreter, cntx);
CallSHA(args, sha, interpreter, builder, cntx);
}
void Service::EvalSha(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
@ -1900,11 +1902,11 @@ void Service::EvalSha(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
string sha = absl::AsciiStrToLower(ArgS(args, 0));
BorrowedInterpreter interpreter{cntx->transaction, cntx};
CallSHA(args, sha, interpreter, cntx);
CallSHA(args, sha, interpreter, builder, cntx);
}
void Service::CallSHA(CmdArgList args, string_view sha, Interpreter* interpreter,
ConnectionContext* cntx) {
SinkReplyBuilder* builder, ConnectionContext* cntx) {
uint32_t num_keys;
CHECK(absl::SimpleAtoi(ArgS(args, 1), &num_keys)); // we already validated this
@ -1914,7 +1916,7 @@ void Service::CallSHA(CmdArgList args, string_view sha, Interpreter* interpreter
ev_args.args = args.subspan(2 + num_keys);
uint64_t start = absl::GetCurrentTimeNanos();
EvalInternal(args, ev_args, interpreter, cntx);
EvalInternal(args, ev_args, interpreter, builder, cntx);
uint64_t end = absl::GetCurrentTimeNanos();
ServerState::tlocal()->RecordCallLatency(sha, (end - start) / 1000);
@ -1954,25 +1956,10 @@ Transaction::MultiMode DetermineMultiMode(ScriptMgr::ScriptParams params) {
// Start multi transaction for eval. Returns true if transaction was scheduled.
// Skips scheduling if multi mode requires declaring keys, but no keys were declared.
// Return nullopt if eval runs inside multi and conflicts with multi mode
optional<bool> StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptParams params,
ConnectionContext* cntx) {
bool StartMultiEval(DbIndex dbid, CmdArgList keys, Transaction::MultiMode script_mode,
ConnectionContext* cntx) {
Transaction* tx = cntx->transaction;
Namespace* ns = cntx->ns;
SinkReplyBuilder* builder = cntx->reply_builder();
Transaction::MultiMode script_mode = DetermineMultiMode(params);
Transaction::MultiMode multi_mode = tx->GetMultiMode();
// Check if eval is already part of a running multi transaction
if (multi_mode != Transaction::NOT_DETERMINED) {
if (multi_mode > script_mode) {
string err = StrCat(
"Multi mode conflict when running eval in multi transaction. Multi mode is: ", multi_mode,
" eval mode is: ", script_mode);
builder->SendError(err);
return nullopt;
}
return false;
}
if (keys.empty() && script_mode == Transaction::LOCK_AHEAD)
return false;
@ -2015,9 +2002,8 @@ static bool CanRunSingleShardMulti(optional<ShardId> sid, const ScriptMgr::Scrip
}
void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpreter* interpreter,
ConnectionContext* cntx) {
SinkReplyBuilder* builder, ConnectionContext* cntx) {
DCHECK(!eval_args.sha.empty());
auto* builder = cntx->reply_builder();
// Sanitizing the input to avoid code injection.
if (eval_args.sha.size() != 40 || !IsSHA(eval_args.sha)) {
@ -2097,9 +2083,20 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
cntx->conn()->RequestAsyncMigration(shard_set->pool()->at(*sid));
}
} else {
optional<bool> scheduled = StartMultiEval(cntx->db_index(), eval_args.keys, *params, cntx);
if (!scheduled) {
return;
Transaction::MultiMode script_mode = DetermineMultiMode(*params);
Transaction::MultiMode tx_mode = tx->GetMultiMode();
bool scheduled = false;
// Check if eval is already part of a running multi transaction
if (tx_mode != Transaction::NOT_DETERMINED) {
if (tx_mode > script_mode) {
string err = StrCat(
"Multi mode conflict when running eval in multi transaction. Multi mode is: ", tx_mode,
" eval mode is: ", script_mode);
return builder->SendError(err);
}
} else {
scheduled = StartMultiEval(cntx->db_index(), eval_args.keys, script_mode, cntx);
}
++ServerState::tlocal()->stats.eval_io_coordination_cnt;
@ -2115,7 +2112,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
}
// Conclude the transaction.
if (*scheduled)
if (scheduled)
tx->UnlockMulti();
}
@ -2312,7 +2309,7 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
if (absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE &&
!cntx->conn_state.tracking_info_.IsTrackingOn()) {
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), cntx, this);
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), rb, cntx, this);
} else {
CmdArgVec arg_vec;
for (auto& scmd : exec_info.body) {
@ -2334,7 +2331,7 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
}
}
bool ok = InvokeCmd(scmd.Cid(), args, cntx);
bool ok = InvokeCmd(scmd.Cid(), args, builder, cntx);
if (!ok || rb->GetError()) // checks for i/o error, not logical error.
break;
}

View file

@ -24,7 +24,6 @@ class AcceptServer;
namespace dfly {
class Interpreter;
using facade::MemcacheParser;
class Service : public facade::ServiceInterface {
@ -37,14 +36,16 @@ class Service : public facade::ServiceInterface {
void Shutdown();
// Prepare command execution, verify and execute, reply to context
void DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) final;
void DispatchCommand(ArgSlice args, facade::SinkReplyBuilder* builder,
facade::ConnectionContext* cntx) final;
// Execute multiple consecutive commands, possibly in parallel by squashing
size_t DispatchManyCommands(absl::Span<ArgSlice> args_list,
size_t DispatchManyCommands(absl::Span<ArgSlice> args_list, facade::SinkReplyBuilder* builder,
facade::ConnectionContext* cntx) final;
// Check VerifyCommandExecution and invoke command with args
bool InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* reply_cntx);
bool InvokeCmd(const CommandId* cid, CmdArgList tail_args, facade::SinkReplyBuilder* builder,
ConnectionContext* reply_cntx);
// Verify command can be executed now (check out of memory), always called immediately before
// execution
@ -59,7 +60,7 @@ class Service : public facade::ServiceInterface {
const ConnectionContext& cntx);
void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
facade::ConnectionContext* cntx) final;
facade::MCReplyBuilder* builder, facade::ConnectionContext* cntx) final;
facade::ConnectionContext* CreateContext(util::FiberSocketBase* peer,
facade::Connection* owner) final;
@ -169,9 +170,9 @@ class Service : public facade::ServiceInterface {
const ConnectionContext& dfly_cntx);
void EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpreter* interpreter,
ConnectionContext* cntx);
SinkReplyBuilder* builder, ConnectionContext* cntx);
void CallSHA(CmdArgList args, std::string_view sha, Interpreter* interpreter,
ConnectionContext* cntx);
SinkReplyBuilder* builder, ConnectionContext* cntx);
// Return optional payload - first received error that occured when executing commands.
std::optional<facade::CapturingReplyBuilder::Payload> FlushEvalAsyncCmds(ConnectionContext* cntx,

View file

@ -112,7 +112,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
return need_flush ? SquashResult::SQUASHED_FULL : SquashResult::SQUASHED;
}
bool MultiCommandSquasher::ExecuteStandalone(StoredCmd* cmd) {
bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, StoredCmd* cmd) {
DCHECK(order_.empty()); // check no squashed chain is interrupted
cmd->Fill(&tmp_keylist_);
@ -120,8 +120,8 @@ bool MultiCommandSquasher::ExecuteStandalone(StoredCmd* cmd) {
if (verify_commands_) {
if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) {
cntx_->SendError(std::move(*err));
std::ignore = cntx_->reply_builder()->ConsumeLastError();
rb->SendError(std::move(*err));
std::ignore = rb->ConsumeLastError();
return !error_abort_;
}
}
@ -132,7 +132,7 @@ bool MultiCommandSquasher::ExecuteStandalone(StoredCmd* cmd) {
if (cmd->Cid()->IsTransactional())
tx->InitByArgs(cntx_->ns, cntx_->conn_state.db_index, args);
service_->InvokeCmd(cmd->Cid(), args, cntx_);
service_->InvokeCmd(cmd->Cid(), args, rb, cntx_);
return true;
}
@ -168,7 +168,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard
crb.SetReplyMode(cmd->ReplyMode());
local_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args);
service_->InvokeCmd(cmd->Cid(), args, &local_cntx);
service_->InvokeCmd(cmd->Cid(), args, &crb, &local_cntx);
sinfo.replies.emplace_back(crb.Take());
@ -186,7 +186,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard
return OpStatus::OK;
}
bool MultiCommandSquasher::ExecuteSquashed() {
bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
DCHECK(!cntx_->conn_state.exec_info.IsCollecting());
if (order_.empty())
@ -235,7 +235,6 @@ bool MultiCommandSquasher::ExecuteSquashed() {
uint64_t after_hop = proactor->GetMonotonicTimeNs();
bool aborted = false;
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx_->reply_builder());
for (auto idx : order_) {
auto& replies = sharded_[idx].replies;
CHECK(!replies.empty());
@ -259,7 +258,7 @@ bool MultiCommandSquasher::ExecuteSquashed() {
return !aborted;
}
void MultiCommandSquasher::Run() {
void MultiCommandSquasher::Run(RedisReplyBuilder* rb) {
DVLOG(1) << "Trying to squash " << cmds_.size() << " commands for transaction "
<< cntx_->transaction->DebugId();
@ -270,17 +269,17 @@ void MultiCommandSquasher::Run() {
break;
if (res == SquashResult::NOT_SQUASHED || res == SquashResult::SQUASHED_FULL) {
if (!ExecuteSquashed())
if (!ExecuteSquashed(rb))
break;
}
if (res == SquashResult::NOT_SQUASHED) {
if (!ExecuteStandalone(&cmd))
if (!ExecuteStandalone(rb, &cmd))
break;
}
}
ExecuteSquashed(); // Flush leftover
ExecuteSquashed(rb); // Flush leftover
// Set last txid.
cntx_->last_command_debug.clock = cntx_->transaction->txid();

View file

@ -22,9 +22,10 @@ namespace dfly {
// contains a non-atomic multi transaction to execute squashed commands.
class MultiCommandSquasher {
public:
static void Execute(absl::Span<StoredCmd> cmds, ConnectionContext* cntx, Service* service,
bool verify_commands = false, bool error_abort = false) {
MultiCommandSquasher{cmds, cntx, service, verify_commands, error_abort}.Run();
static void Execute(absl::Span<StoredCmd> cmds, facade::RedisReplyBuilder* rb,
ConnectionContext* cntx, Service* service, bool verify_commands = false,
bool error_abort = false) {
MultiCommandSquasher{cmds, cntx, service, verify_commands, error_abort}.Run(rb);
}
private:
@ -53,16 +54,16 @@ class MultiCommandSquasher {
SquashResult TrySquash(StoredCmd* cmd);
// Execute separate non-squashed cmd. Return false if aborting on error.
bool ExecuteStandalone(StoredCmd* cmd);
bool ExecuteStandalone(facade::RedisReplyBuilder* rb, StoredCmd* cmd);
// Callback that runs on shards during squashed hop.
facade::OpStatus SquashedHopCb(Transaction* parent_tx, EngineShard* es);
// Execute all currently squashed commands. Return false if aborting on error.
bool ExecuteSquashed();
bool ExecuteSquashed(facade::RedisReplyBuilder* rb);
// Run all commands until completion.
void Run();
void Run(facade::RedisReplyBuilder* rb);
bool IsAtomic() const;

View file

@ -2884,7 +2884,7 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
string ft_create = "FT.CREATE";
arg_vec.insert(arg_vec.begin(), MutableSlice{ft_create.data(), ft_create.size()});
service_->DispatchCommand(absl::MakeSpan(arg_vec), &cntx);
service_->DispatchCommand(absl::MakeSpan(arg_vec), &crb, &cntx);
auto response = crb.Take();
if (auto err = facade::CapturingReplyBuilder::TryExtractError(response); err) {

View file

@ -593,12 +593,14 @@ error_code Replica::InitiateDflySync() {
error_code Replica::ConsumeRedisStream() {
base::IoBuf io_buf(16_KB);
io::NullSink null_sink; // we never reply back on the commands.
ConnectionContext conn_context{&null_sink, nullptr, {}};
ConnectionContext conn_context{static_cast<io::Sink*>(nullptr), nullptr, {}};
conn_context.is_replicating = true;
conn_context.journal_emulated = true;
conn_context.skip_acl_validation = true;
conn_context.ns = &namespaces.GetDefaultNamespace();
// we never reply back on the commands.
facade::CapturingReplyBuilder null_builder{facade::ReplyMode::NONE};
ResetParser(true);
// Master waits for this command in order to start sending replication stream.
@ -651,7 +653,7 @@ error_code Replica::ConsumeRedisStream() {
facade::RespExpr::VecToArgList(LastResponseArgs(), &args_vector);
CmdArgList arg_list{args_vector.data(), args_vector.size()};
service_.DispatchCommand(arg_list, &conn_context);
service_.DispatchCommand(arg_list, &null_builder, &conn_context);
}
}

View file

@ -233,7 +233,7 @@ constexpr string_view kS3Prefix = "s3://"sv;
using EngineFunc = void (ServerFamily::*)(CmdArgList args, Transaction* tx,
SinkReplyBuilder* builder, ConnectionContext* cntx);
inline CommandId::Handler3 HandlerFunc(ServerFamily* se, EngineFunc f) {
inline CommandId::Handler HandlerFunc(ServerFamily* se, EngineFunc f) {
return [=](CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) {
return (se->*f)(args, tx, builder, cntx);
};
@ -498,10 +498,10 @@ void ClientSetInfo(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext
void ClientId(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) {
if (args.size() != 0) {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}
return cntx->SendLong(cntx->conn()->GetClientId());
return builder->SendLong(cntx->conn()->GetClientId());
}
void ClientKill(CmdArgList args, absl::Span<facade::Listener*> listeners, SinkReplyBuilder* builder,
@ -536,7 +536,7 @@ void ClientKill(CmdArgList args, absl::Span<facade::Listener*> listeners, SinkRe
}
if (!evaluator) {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}
const bool is_admin_request = cntx->conn()->IsPrivileged();
@ -560,11 +560,11 @@ void ClientKill(CmdArgList args, absl::Span<facade::Listener*> listeners, SinkRe
}
if (kill_errors.load() == 0) {
return cntx->SendLong(killed_connections.load());
return builder->SendLong(killed_connections.load());
} else {
return cntx->SendError(absl::StrCat("Killed ", killed_connections.load(),
" client(s), but unable to kill ", kill_errors.load(),
" admin client(s)."));
return builder->SendError(absl::StrCat("Killed ", killed_connections.load(),
" client(s), but unable to kill ", kill_errors.load(),
" admin client(s)."));
}
}
@ -1811,7 +1811,7 @@ bool ServerFamily::DoAuth(ConnectionContext* cntx, std::string_view username,
void ServerFamily::Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (args.size() > 2) {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}
// non admin port auth
@ -1821,16 +1821,16 @@ void ServerFamily::Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
const size_t index = one_arg ? 0 : 1;
std::string_view password = facade::ToSV(args[index]);
if (DoAuth(cntx, username, password)) {
return cntx->SendOk();
return builder->SendOk();
}
auto& log = ServerState::tlocal()->acl_log;
using Reason = acl::AclLog::Reason;
log.Add(*cntx, "AUTH", Reason::AUTH, std::string(username));
return cntx->SendError(facade::kAuthRejected);
return builder->SendError(facade::kAuthRejected);
}
if (!cntx->req_auth) {
return cntx->SendError(
return builder->SendError(
"AUTH <password> called without any password configured for "
"admin port. Are you sure your configuration is correct?");
}
@ -1838,9 +1838,9 @@ void ServerFamily::Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
string_view pass = ArgS(args, 0);
if (pass == GetPassword()) {
cntx->authenticated = true;
cntx->SendOk();
builder->SendOk();
} else {
cntx->SendError(facade::kAuthRejected);
builder->SendError(facade::kAuthRejected);
}
}
@ -2888,7 +2888,7 @@ void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder*
} else if (cmd == "LISTENING-PORT") {
uint32_t replica_listening_port;
if (!absl::SimpleAtoi(arg, &replica_listening_port)) {
cntx->SendError(kInvalidIntErr);
builder->SendError(kInvalidIntErr);
return;
}
cntx->conn_state.replication_info.repl_listening_port = replica_listening_port;
@ -2908,7 +2908,7 @@ void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder*
} else if (cmd == "CLIENT-VERSION" && args.size() == 2) {
unsigned version;
if (!absl::SimpleAtoi(arg, &version)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
}
dfly_cmd_->SetDflyClientVersion(cntx, DflyVersion(version));
} else if (cmd == "ACK" && args.size() == 2) {
@ -2928,17 +2928,13 @@ void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder*
VLOG(2) << "Received client ACK=" << ack;
cntx->replication_flow->last_acked_lsn = ack;
return;
} else if (cmd == "ACL-CHECK") {
// TODO(kostasrim): Remove this branch 20/6/2024
cntx->SendOk();
return;
} else {
VLOG(1) << "Error " << cmd << " " << arg << " " << args.size();
return err_cb();
}
}
return cntx->SendOk();
return builder->SendOk();
}
void ServerFamily::Role(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
@ -2998,7 +2994,7 @@ void ServerFamily::LastSave(CmdArgList args, Transaction* tx, SinkReplyBuilder*
util::fb2::LockGuard lk(save_mu_);
save_time = last_save_info_.save_time;
}
cntx->SendLong(save_time);
builder->SendLong(save_time);
}
void ServerFamily::Latency(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
@ -3011,13 +3007,13 @@ void ServerFamily::Latency(CmdArgList args, Transaction* tx, SinkReplyBuilder* b
}
LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported";
cntx->SendError(kSyntaxErr);
builder->SendError(kSyntaxErr);
}
void ServerFamily::ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (args.size() > 1) {
cntx->SendError(kSyntaxErr);
builder->SendError(kSyntaxErr);
return;
}
@ -3027,7 +3023,7 @@ void ServerFamily::ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilde
} else if (absl::EqualsIgnoreCase(sub_cmd, "NOSAVE")) {
save_on_shutdown_ = false;
} else {
cntx->SendError(kSyntaxErr);
builder->SendError(kSyntaxErr);
return;
}
}
@ -3036,7 +3032,7 @@ void ServerFamily::ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilde
[](ProactorBase* pb) { ServerState::tlocal()->EnterLameDuck(); });
CHECK_NOTNULL(acceptor_)->Stop();
cntx->SendOk();
builder->SendOk();
}
void ServerFamily::Dfly(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
@ -3074,26 +3070,26 @@ void ServerFamily::SlowLog(CmdArgList args, Transaction* tx, SinkReplyBuilder* b
lengths[index] = ServerState::tlocal()->GetSlowLog().Length();
});
int sum = std::accumulate(lengths.begin(), lengths.end(), 0);
return cntx->SendLong(sum);
return builder->SendLong(sum);
}
if (sub_cmd == "RESET") {
service_.proactor_pool().AwaitFiberOnAll(
[](auto index, auto* context) { ServerState::tlocal()->GetSlowLog().Reset(); });
return cntx->SendOk();
return builder->SendOk();
}
if (sub_cmd == "GET") {
return SlowLogGet(args, sub_cmd, &service_.proactor_pool(), builder);
}
cntx->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType);
builder->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType);
}
void ServerFamily::Module(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
if (sub_cmd != "LIST")
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(2);

View file

@ -970,7 +970,7 @@ void StringFamily::Set(CmdArgList args, Transaction* tx, SinkReplyBuilder* build
int64_t now_ms = GetCurrentTimeMs();
auto [rel_ms, abs_ms] = expiry.Calculate(now_ms, false);
if (abs_ms < 0)
return cntx->SendError(InvalidExpireTime("set"));
return builder->SendError(InvalidExpireTime("set"));
// Remove existed key if the key is expired already
if (rel_ms < 0) {

View file

@ -390,7 +390,7 @@ RespExpr BaseFamilyTest::Run(std::string_view id, ArgSlice slice) {
DCHECK(context->transaction == nullptr) << id;
service_->DispatchCommand(CmdArgList{args}, context);
service_->DispatchCommand(CmdArgList{args}, context->reply_builder(), context);
DCHECK(context->transaction == nullptr);
@ -433,7 +433,7 @@ auto BaseFamilyTest::RunMC(MP::CmdType cmd_type, string_view key, string_view va
DCHECK(context->transaction == nullptr);
service_->DispatchMC(cmd, value, context);
service_->DispatchMC(cmd, value, static_cast<MCReplyBuilder*>(context->reply_builder()), context);
DCHECK(context->transaction == nullptr);
@ -452,7 +452,8 @@ auto BaseFamilyTest::RunMC(MP::CmdType cmd_type, std::string_view key) -> MCResp
auto* context = conn->cmd_cntx();
service_->DispatchMC(cmd, string_view{}, context);
service_->DispatchMC(cmd, string_view{}, static_cast<MCReplyBuilder*>(context->reply_builder()),
context);
return conn->SplitLines();
}
@ -478,7 +479,8 @@ auto BaseFamilyTest::GetMC(MP::CmdType cmd_type, std::initializer_list<std::stri
auto* context = conn->cmd_cntx();
service_->DispatchMC(cmd, string_view{}, context);
service_->DispatchMC(cmd, string_view{}, static_cast<MCReplyBuilder*>(context->reply_builder()),
context);
return conn->SplitLines();
}

View file

@ -1235,10 +1235,10 @@ void BZPopMinMax(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
float timeout;
auto timeout_str = ArgS(args, args.size() - 1);
if (!absl::SimpleAtof(timeout_str, &timeout)) {
return cntx->SendError("timeout is not a float or out of range");
return builder->SendError("timeout is not a float or out of range");
}
if (timeout < 0) {
return cntx->SendError("timeout is negative");
return builder->SendError("timeout is negative");
}
VLOG(1) << "BZPop timeout(" << timeout << ")";
@ -1274,14 +1274,14 @@ void BZPopMinMax(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
DVLOG(1) << "result for " << tx->DebugId() << " is " << popped_key.status();
switch (popped_key.status()) {
case OpStatus::WRONG_TYPE:
return cntx->SendError(kWrongTypeErr);
return builder->SendError(kWrongTypeErr);
case OpStatus::CANCELLED:
case OpStatus::TIMED_OUT:
return rb->SendNullArray();
case OpStatus::KEY_MOVED: {
auto error = cluster::SlotOwnershipErrorStr(*tx->GetUniqueSlotId());
CHECK(error.has_value());
return cntx->SendError(std::move(*error));
return builder->SendError(std::move(*error));
}
default:
LOG(ERROR) << "Unexpected error " << popped_key.status();