mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
chore: update command interface for main_service commands (#4265)
Clean up command_registry interface as well. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
bf4f45a7c2
commit
4428480a4e
4 changed files with 139 additions and 198 deletions
|
@ -96,10 +96,9 @@ bool CommandId::IsMultiTransactional() const {
|
|||
return CO::IsTransKind(name()) || CO::IsEvalKind(name());
|
||||
}
|
||||
|
||||
uint64_t CommandId::Invoke(CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) const {
|
||||
uint64_t CommandId::Invoke(CmdArgList args, const CommandContext& cmd_cntx) const {
|
||||
int64_t before = absl::GetCurrentTimeNanos();
|
||||
handler_(args, tx, builder, cntx);
|
||||
handler_(args, cmd_cntx);
|
||||
int64_t after = absl::GetCurrentTimeNanos();
|
||||
|
||||
ServerState* ss = ServerState::tlocal(); // Might have migrated thread, read after invocation
|
||||
|
@ -133,21 +132,6 @@ optional<facade::ErrorReply> CommandId::Validate(CmdArgList tail_args) const {
|
|||
return nullopt;
|
||||
}
|
||||
|
||||
CommandId&& CommandId::SetHandler(Handler2 f) && {
|
||||
handler_ = [f = std::move(f)](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder,
|
||||
ConnectionContext*) { f(args, tx, builder); };
|
||||
|
||||
return std::move(*this);
|
||||
}
|
||||
|
||||
CommandId&& CommandId::SetHandler(Handler3 f) && {
|
||||
handler_ = [f = std::move(f)](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
f(std::move(args), CommandContext{tx, builder, cntx});
|
||||
};
|
||||
return std::move(*this);
|
||||
};
|
||||
|
||||
CommandRegistry::CommandRegistry() {
|
||||
vector<string> rename_command = GetFlag(FLAGS_rename_command);
|
||||
|
||||
|
|
|
@ -96,23 +96,13 @@ class CommandId : public facade::CommandId {
|
|||
command_stats_ = std::make_unique<CmdCallStats[]>(thread_count);
|
||||
}
|
||||
|
||||
using Handler =
|
||||
fu2::function_base<true /*owns*/, true /*copyable*/, fu2::capacity_default,
|
||||
false /* non-throwing*/, false /* strong exceptions guarantees*/,
|
||||
void(CmdArgList, Transaction*, facade::SinkReplyBuilder*,
|
||||
ConnectionContext*) const>;
|
||||
using Handler2 =
|
||||
fu2::function_base<true, true, fu2::capacity_default, false, false,
|
||||
void(CmdArgList, Transaction*, facade::SinkReplyBuilder*) const>;
|
||||
|
||||
using Handler3 = fu2::function_base<true, true, fu2::capacity_default, false, false,
|
||||
void(CmdArgList, const CommandContext&) const>;
|
||||
using ArgValidator = fu2::function_base<true, true, fu2::capacity_default, false, false,
|
||||
std::optional<facade::ErrorReply>(CmdArgList) const>;
|
||||
|
||||
// Returns the invoke time in usec.
|
||||
uint64_t Invoke(CmdArgList args, Transaction*, facade::SinkReplyBuilder*,
|
||||
ConnectionContext* cntx) const;
|
||||
uint64_t Invoke(CmdArgList args, const CommandContext& cmd_cntx) const;
|
||||
|
||||
// Returns error if validation failed, otherwise nullopt
|
||||
std::optional<facade::ErrorReply> Validate(CmdArgList tail_args) const;
|
||||
|
@ -135,15 +125,11 @@ class CommandId : public facade::CommandId {
|
|||
|
||||
static const char* OptName(CO::CommandOpt fl);
|
||||
|
||||
CommandId&& SetHandler(Handler f) && {
|
||||
CommandId&& SetHandler(Handler3 f) && {
|
||||
handler_ = std::move(f);
|
||||
return std::move(*this);
|
||||
}
|
||||
|
||||
CommandId&& SetHandler(Handler2 f) &&;
|
||||
|
||||
CommandId&& SetHandler(Handler3 f) &&;
|
||||
|
||||
CommandId&& SetValidator(ArgValidator f) && {
|
||||
validator_ = std::move(f);
|
||||
return std::move(*this);
|
||||
|
@ -169,7 +155,7 @@ class CommandId : public facade::CommandId {
|
|||
private:
|
||||
bool implicit_acl_;
|
||||
std::unique_ptr<CmdCallStats[]> command_stats_;
|
||||
Handler handler_;
|
||||
Handler3 handler_;
|
||||
ArgValidator validator_;
|
||||
};
|
||||
|
||||
|
|
|
@ -1346,7 +1346,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, SinkReplyBui
|
|||
auto last_error = builder->ConsumeLastError();
|
||||
DCHECK(last_error.empty());
|
||||
try {
|
||||
invoke_time_usec = cid->Invoke(tail_args, tx, builder, cntx);
|
||||
invoke_time_usec = cid->Invoke(tail_args, CommandContext{tx, builder, cntx});
|
||||
} catch (std::exception& e) {
|
||||
LOG(ERROR) << "Internal error, system probably unstable " << e.what();
|
||||
return false;
|
||||
|
@ -1649,34 +1649,31 @@ absl::flat_hash_map<std::string, unsigned> Service::UknownCmdMap() const {
|
|||
return unknown_cmds_;
|
||||
}
|
||||
|
||||
void Service::Quit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
if (builder->GetProtocol() == Protocol::REDIS)
|
||||
builder->SendOk();
|
||||
void Service::Quit(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
if (cmd_cntx.rb->GetProtocol() == Protocol::REDIS)
|
||||
cmd_cntx.rb->SendOk();
|
||||
|
||||
builder->CloseConnection();
|
||||
cmd_cntx.rb->CloseConnection();
|
||||
|
||||
DeactivateMonitoring(static_cast<ConnectionContext*>(cntx));
|
||||
cntx->conn()->ShutdownSelf();
|
||||
DeactivateMonitoring(cmd_cntx.conn_cntx);
|
||||
cmd_cntx.conn_cntx->conn()->ShutdownSelf();
|
||||
}
|
||||
|
||||
void Service::Multi(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
if (cntx->conn_state.exec_info.IsCollecting()) {
|
||||
return builder->SendError("MULTI calls can not be nested");
|
||||
void Service::Multi(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
if (cmd_cntx.conn_cntx->conn_state.exec_info.IsCollecting()) {
|
||||
return cmd_cntx.rb->SendError("MULTI calls can not be nested");
|
||||
}
|
||||
cntx->conn_state.exec_info.state = ConnectionState::ExecInfo::EXEC_COLLECT;
|
||||
cmd_cntx.conn_cntx->conn_state.exec_info.state = ConnectionState::ExecInfo::EXEC_COLLECT;
|
||||
// TODO: to protect against huge exec transactions.
|
||||
return builder->SendOk();
|
||||
return cmd_cntx.rb->SendOk();
|
||||
}
|
||||
|
||||
void Service::Watch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
auto& exec_info = cntx->conn_state.exec_info;
|
||||
void Service::Watch(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
auto& exec_info = cmd_cntx.conn_cntx->conn_state.exec_info;
|
||||
|
||||
// Skip if EXEC will already fail due previous WATCH.
|
||||
if (exec_info.watched_dirty.load(memory_order_relaxed)) {
|
||||
return builder->SendOk();
|
||||
return cmd_cntx.rb->SendOk();
|
||||
}
|
||||
|
||||
atomic_uint32_t keys_existed = 0;
|
||||
|
@ -1684,28 +1681,27 @@ void Service::Watch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
|||
ShardId shard_id = shard->shard_id();
|
||||
ShardArgs largs = t->GetShardArgs(shard_id);
|
||||
for (auto k : largs) {
|
||||
t->GetDbSlice(shard_id).RegisterWatchedKey(cntx->db_index(), k, &exec_info);
|
||||
t->GetDbSlice(shard_id).RegisterWatchedKey(cmd_cntx.conn_cntx->db_index(), k, &exec_info);
|
||||
}
|
||||
|
||||
auto res = GenericFamily::OpExists(t->GetOpArgs(shard), largs);
|
||||
keys_existed.fetch_add(res.value_or(0), memory_order_relaxed);
|
||||
return OpStatus::OK;
|
||||
};
|
||||
tx->ScheduleSingleHop(std::move(cb));
|
||||
cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
|
||||
|
||||
// Duplicate keys are stored to keep correct count.
|
||||
exec_info.watched_existed += keys_existed.load(memory_order_relaxed);
|
||||
for (string_view key : args) {
|
||||
exec_info.watched_keys.emplace_back(cntx->db_index(), key);
|
||||
exec_info.watched_keys.emplace_back(cmd_cntx.conn_cntx->db_index(), key);
|
||||
}
|
||||
|
||||
return builder->SendOk();
|
||||
return cmd_cntx.rb->SendOk();
|
||||
}
|
||||
|
||||
void Service::Unwatch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
UnwatchAllKeys(cntx->ns, &cntx->conn_state.exec_info);
|
||||
return builder->SendOk();
|
||||
void Service::Unwatch(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
UnwatchAllKeys(cmd_cntx.conn_cntx->ns, &cmd_cntx.conn_cntx->conn_state.exec_info);
|
||||
return cmd_cntx.rb->SendOk();
|
||||
}
|
||||
|
||||
optional<CapturingReplyBuilder::Payload> Service::FlushEvalAsyncCmds(ConnectionContext* cntx,
|
||||
|
@ -1771,41 +1767,37 @@ void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca)
|
|||
DispatchCommand(ca.args, &replier, cntx);
|
||||
}
|
||||
|
||||
void Service::Eval(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx, bool read_only) {
|
||||
void Service::Eval(CmdArgList args, const CommandContext& cmd_cntx, bool read_only) {
|
||||
string_view body = ArgS(args, 0);
|
||||
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
|
||||
if (body.empty()) {
|
||||
return rb->SendNull();
|
||||
}
|
||||
|
||||
BorrowedInterpreter interpreter{tx, &cntx->conn_state};
|
||||
BorrowedInterpreter interpreter{cmd_cntx.tx, &cmd_cntx.conn_cntx->conn_state};
|
||||
auto res = server_family_.script_mgr()->Insert(body, interpreter);
|
||||
if (!res)
|
||||
return builder->SendError(res.error().Format(), facade::kScriptErrType);
|
||||
return cmd_cntx.rb->SendError(res.error().Format(), facade::kScriptErrType);
|
||||
|
||||
string sha{std::move(res.value())};
|
||||
|
||||
CallSHA(args, sha, interpreter, builder, cntx, read_only);
|
||||
CallSHA(args, sha, interpreter, cmd_cntx.rb, cmd_cntx.conn_cntx, read_only);
|
||||
}
|
||||
|
||||
void Service::EvalRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
Eval(args, tx, builder, cntx, true);
|
||||
void Service::EvalRo(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
Eval(args, cmd_cntx, true);
|
||||
}
|
||||
|
||||
void Service::EvalSha(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx, bool read_only) {
|
||||
void Service::EvalSha(CmdArgList args, const CommandContext& cmd_cntx, bool read_only) {
|
||||
string sha = absl::AsciiStrToLower(ArgS(args, 0));
|
||||
|
||||
BorrowedInterpreter interpreter{cntx->transaction, &cntx->conn_state};
|
||||
CallSHA(args, sha, interpreter, builder, cntx, read_only);
|
||||
BorrowedInterpreter interpreter{cmd_cntx.tx, &cmd_cntx.conn_cntx->conn_state};
|
||||
CallSHA(args, sha, interpreter, cmd_cntx.rb, cmd_cntx.conn_cntx, read_only);
|
||||
}
|
||||
|
||||
void Service::EvalShaRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
EvalSha(args, tx, builder, cntx, true);
|
||||
void Service::EvalShaRo(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
EvalSha(args, cmd_cntx, true);
|
||||
}
|
||||
|
||||
void Service::CallSHA(CmdArgList args, string_view sha, Interpreter* interpreter,
|
||||
|
@ -2037,15 +2029,14 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
|
|||
}
|
||||
}
|
||||
|
||||
void Service::Discard(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
void Service::Discard(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
|
||||
|
||||
if (!cntx->conn_state.exec_info.IsCollecting()) {
|
||||
return builder->SendError("DISCARD without MULTI");
|
||||
if (!cmd_cntx.conn_cntx->conn_state.exec_info.IsCollecting()) {
|
||||
return cmd_cntx.rb->SendError("DISCARD without MULTI");
|
||||
}
|
||||
|
||||
MultiCleanup(cntx);
|
||||
MultiCleanup(cmd_cntx.conn_cntx);
|
||||
rb->SendOk();
|
||||
}
|
||||
|
||||
|
@ -2145,25 +2136,25 @@ void StartMultiExec(ConnectionContext* cntx, ConnectionState::ExecInfo* exec_inf
|
|||
};
|
||||
}
|
||||
|
||||
void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
auto& exec_info = cntx->conn_state.exec_info;
|
||||
void Service::Exec(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
|
||||
auto& exec_info = cmd_cntx.conn_cntx->conn_state.exec_info;
|
||||
auto* cntx = cmd_cntx.conn_cntx;
|
||||
|
||||
// Clean the context no matter the outcome
|
||||
absl::Cleanup exec_clear = [&cntx] { MultiCleanup(cntx); };
|
||||
absl::Cleanup exec_clear = [cntx] { MultiCleanup(cntx); };
|
||||
|
||||
if (exec_info.state == ConnectionState::ExecInfo::EXEC_ERROR) {
|
||||
return builder->SendError("-EXECABORT Transaction discarded because of previous errors");
|
||||
return rb->SendError("-EXECABORT Transaction discarded because of previous errors");
|
||||
}
|
||||
|
||||
// Check basic invariants
|
||||
if (!exec_info.IsCollecting()) {
|
||||
return builder->SendError("EXEC without MULTI");
|
||||
return rb->SendError("EXEC without MULTI");
|
||||
}
|
||||
|
||||
if (IsWatchingOtherDbs(cntx->db_index(), exec_info)) {
|
||||
return builder->SendError("Dragonfly does not allow WATCH and EXEC on different databases");
|
||||
return rb->SendError("Dragonfly does not allow WATCH and EXEC on different databases");
|
||||
}
|
||||
|
||||
if (exec_info.watched_dirty.load(memory_order_relaxed)) {
|
||||
|
@ -2178,7 +2169,8 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
|||
|
||||
// We borrow a single interpreter for all the EVALs/Script load inside. Returned by MultiCleanup
|
||||
if (state != ExecScriptUse::NONE) {
|
||||
exec_info.preborrowed_interpreter = BorrowedInterpreter(tx, &cntx->conn_state).Release();
|
||||
exec_info.preborrowed_interpreter =
|
||||
BorrowedInterpreter(cmd_cntx.tx, &cntx->conn_state).Release();
|
||||
}
|
||||
|
||||
// Determine according multi mode, not only only flag, but based on presence of global commands
|
||||
|
@ -2194,7 +2186,7 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
|||
// EXEC should not run if any of the watched keys expired.
|
||||
if (!exec_info.watched_keys.empty() &&
|
||||
!CheckWatchedKeyExpiry(cntx, registry_.Find("EXISTS"), exec_cid_)) {
|
||||
tx->UnlockMulti();
|
||||
cmd_cntx.tx->UnlockMulti();
|
||||
return rb->SendNull();
|
||||
}
|
||||
|
||||
|
@ -2208,7 +2200,7 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
|||
|
||||
if (!exec_info.body.empty()) {
|
||||
if (GetFlag(FLAGS_track_exec_frequencies)) {
|
||||
string descr = CreateExecDescriptor(exec_info.body, tx->GetUniqueShardCnt());
|
||||
string descr = CreateExecDescriptor(exec_info.body, cmd_cntx.tx->GetUniqueShardCnt());
|
||||
ServerState::tlocal()->exec_freq_count[descr]++;
|
||||
}
|
||||
|
||||
|
@ -2220,7 +2212,7 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
|||
for (auto& scmd : exec_info.body) {
|
||||
VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs();
|
||||
|
||||
tx->MultiSwitchCmd(scmd.Cid());
|
||||
cmd_cntx.tx->MultiSwitchCmd(scmd.Cid());
|
||||
cntx->cid = scmd.Cid();
|
||||
|
||||
arg_vec.resize(scmd.NumArgs());
|
||||
|
@ -2229,14 +2221,14 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
|||
CmdArgList args = absl::MakeSpan(arg_vec);
|
||||
|
||||
if (scmd.Cid()->IsTransactional()) {
|
||||
OpStatus st = tx->InitByArgs(cntx->ns, cntx->conn_state.db_index, args);
|
||||
OpStatus st = cmd_cntx.tx->InitByArgs(cntx->ns, cntx->conn_state.db_index, args);
|
||||
if (st != OpStatus::OK) {
|
||||
builder->SendError(st);
|
||||
rb->SendError(st);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
bool ok = InvokeCmd(scmd.Cid(), args, builder, cntx);
|
||||
bool ok = InvokeCmd(scmd.Cid(), args, rb, cmd_cntx.conn_cntx);
|
||||
if (!ok || rb->GetError()) // checks for i/o error, not logical error.
|
||||
break;
|
||||
}
|
||||
|
@ -2245,78 +2237,78 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
|||
|
||||
if (scheduled) {
|
||||
VLOG(2) << "Exec unlocking " << exec_info.body.size() << " commands";
|
||||
tx->UnlockMulti();
|
||||
cmd_cntx.tx->UnlockMulti();
|
||||
}
|
||||
|
||||
cntx->cid = exec_cid_;
|
||||
VLOG(2) << "Exec completed";
|
||||
}
|
||||
|
||||
void Service::Publish(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
void Service::Publish(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
if (cluster::IsClusterEnabled()) {
|
||||
return builder->SendError("PUBLISH is not supported in cluster mode yet");
|
||||
return cmd_cntx.rb->SendError("PUBLISH is not supported in cluster mode yet");
|
||||
}
|
||||
string_view channel = ArgS(args, 0);
|
||||
string_view messages[] = {ArgS(args, 1)};
|
||||
|
||||
auto* cs = ServerState::tlocal()->channel_store();
|
||||
builder->SendLong(cs->SendMessages(channel, messages));
|
||||
cmd_cntx.rb->SendLong(cs->SendMessages(channel, messages));
|
||||
}
|
||||
|
||||
void Service::Subscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
void Service::Subscribe(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
if (cluster::IsClusterEnabled()) {
|
||||
return builder->SendError("SUBSCRIBE is not supported in cluster mode yet");
|
||||
return cmd_cntx.rb->SendError("SUBSCRIBE is not supported in cluster mode yet");
|
||||
}
|
||||
cntx->ChangeSubscription(true /*add*/, true /* reply*/, std::move(args),
|
||||
static_cast<RedisReplyBuilder*>(builder));
|
||||
cmd_cntx.conn_cntx->ChangeSubscription(true /*add*/, true /* reply*/, std::move(args),
|
||||
static_cast<RedisReplyBuilder*>(cmd_cntx.rb));
|
||||
}
|
||||
|
||||
void Service::Unsubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
void Service::Unsubscribe(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
|
||||
if (cluster::IsClusterEnabled()) {
|
||||
return builder->SendError("UNSUBSCRIBE is not supported in cluster mode yet");
|
||||
return cmd_cntx.rb->SendError("UNSUBSCRIBE is not supported in cluster mode yet");
|
||||
}
|
||||
|
||||
if (args.size() == 0) {
|
||||
cmd_cntx.conn_cntx->UnsubscribeAll(true, rb);
|
||||
} else {
|
||||
cmd_cntx.conn_cntx->ChangeSubscription(false, true, args, rb);
|
||||
}
|
||||
}
|
||||
|
||||
void Service::PSubscribe(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
|
||||
|
||||
if (cluster::IsClusterEnabled()) {
|
||||
return rb->SendError("PSUBSCRIBE is not supported in cluster mode yet");
|
||||
}
|
||||
cmd_cntx.conn_cntx->ChangePSubscription(true, true, args, rb);
|
||||
}
|
||||
|
||||
void Service::PUnsubscribe(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
|
||||
|
||||
if (cluster::IsClusterEnabled()) {
|
||||
return rb->SendError("PUNSUBSCRIBE is not supported in cluster mode yet");
|
||||
}
|
||||
if (args.size() == 0) {
|
||||
cntx->UnsubscribeAll(true, static_cast<RedisReplyBuilder*>(builder));
|
||||
cmd_cntx.conn_cntx->PUnsubscribeAll(true, rb);
|
||||
} else {
|
||||
cntx->ChangeSubscription(false, true, args, static_cast<RedisReplyBuilder*>(builder));
|
||||
}
|
||||
}
|
||||
|
||||
void Service::PSubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
if (cluster::IsClusterEnabled()) {
|
||||
return builder->SendError("PSUBSCRIBE is not supported in cluster mode yet");
|
||||
}
|
||||
cntx->ChangePSubscription(true, true, args, static_cast<RedisReplyBuilder*>(builder));
|
||||
}
|
||||
|
||||
void Service::PUnsubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
if (cluster::IsClusterEnabled()) {
|
||||
return builder->SendError("PUNSUBSCRIBE is not supported in cluster mode yet");
|
||||
}
|
||||
if (args.size() == 0) {
|
||||
cntx->PUnsubscribeAll(true, static_cast<RedisReplyBuilder*>(builder));
|
||||
} else {
|
||||
cntx->ChangePSubscription(false, true, args, static_cast<RedisReplyBuilder*>(builder));
|
||||
cmd_cntx.conn_cntx->ChangePSubscription(false, true, args, rb);
|
||||
}
|
||||
}
|
||||
|
||||
// Not a real implementation. Serves as a decorator to accept some function commands
|
||||
// for testing.
|
||||
void Service::Function(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
void Service::Function(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
|
||||
|
||||
if (sub_cmd == "FLUSH") {
|
||||
return builder->SendOk();
|
||||
return cmd_cntx.rb->SendOk();
|
||||
}
|
||||
|
||||
string err = UnknownSubCmd(sub_cmd, "FUNCTION");
|
||||
return builder->SendError(err, kSyntaxErrType);
|
||||
return cmd_cntx.rb->SendError(err, kSyntaxErrType);
|
||||
}
|
||||
|
||||
void Service::PubsubChannels(string_view pattern, SinkReplyBuilder* builder) {
|
||||
|
@ -2340,22 +2332,22 @@ void Service::PubsubNumSub(CmdArgList args, SinkReplyBuilder* builder) {
|
|||
}
|
||||
}
|
||||
|
||||
void Service::Monitor(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
VLOG(1) << "starting monitor on this connection: " << cntx->conn()->GetClientId();
|
||||
void Service::Monitor(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
VLOG(1) << "starting monitor on this connection: " << cmd_cntx.conn_cntx->conn()->GetClientId();
|
||||
// we are registering the current connection for all threads so they will be aware of
|
||||
// this connection, to send to it any command
|
||||
builder->SendOk();
|
||||
cntx->ChangeMonitor(true /* start */);
|
||||
cmd_cntx.rb->SendOk();
|
||||
cmd_cntx.conn_cntx->ChangeMonitor(true /* start */);
|
||||
}
|
||||
|
||||
void Service::Pubsub(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
void Service::Pubsub(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
|
||||
|
||||
if (cluster::IsClusterEnabled()) {
|
||||
return builder->SendError("PUBSUB is not supported in cluster mode yet");
|
||||
return rb->SendError("PUBSUB is not supported in cluster mode yet");
|
||||
}
|
||||
if (args.size() < 1) {
|
||||
builder->SendError(WrongNumArgsError(cntx->cid->name()));
|
||||
rb->SendError(WrongNumArgsError(cmd_cntx.conn_cntx->cid->name()));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -2374,7 +2366,6 @@ void Service::Pubsub(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder
|
|||
"HELP",
|
||||
"\tPrints this help."};
|
||||
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
rb->SendSimpleStrArr(help_arr);
|
||||
return;
|
||||
}
|
||||
|
@ -2385,19 +2376,18 @@ void Service::Pubsub(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder
|
|||
pattern = ArgS(args, 1);
|
||||
}
|
||||
|
||||
PubsubChannels(pattern, builder);
|
||||
PubsubChannels(pattern, rb);
|
||||
} else if (subcmd == "NUMPAT") {
|
||||
PubsubPatterns(builder);
|
||||
PubsubPatterns(rb);
|
||||
} else if (subcmd == "NUMSUB") {
|
||||
args.remove_prefix(1);
|
||||
PubsubNumSub(args, builder);
|
||||
PubsubNumSub(args, rb);
|
||||
} else {
|
||||
builder->SendError(UnknownSubCmd(subcmd, "PUBSUB"));
|
||||
rb->SendError(UnknownSubCmd(subcmd, "PUBSUB"));
|
||||
}
|
||||
}
|
||||
|
||||
void Service::Command(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
void Service::Command(CmdArgList args, const CommandContext& cmd_cntx) {
|
||||
unsigned cmd_cnt = 0;
|
||||
registry_.Traverse([&](string_view name, const CommandId& cd) {
|
||||
if ((cd.opt_mask() & CO::HIDDEN) == 0) {
|
||||
|
@ -2405,8 +2395,8 @@ void Service::Command(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
|
|||
}
|
||||
});
|
||||
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
auto serialize_command = [&rb, this](string_view name, const CommandId& cid) {
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
|
||||
auto serialize_command = [rb, this](string_view name, const CommandId& cid) {
|
||||
rb->StartArray(7);
|
||||
rb->SendSimpleString(cid.name());
|
||||
rb->SendLong(cid.arity());
|
||||
|
@ -2451,7 +2441,7 @@ void Service::Command(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
|
|||
|
||||
// COUNT
|
||||
if (subcmd == "COUNT") {
|
||||
return builder->SendLong(cmd_cnt);
|
||||
return rb->SendLong(cmd_cnt);
|
||||
}
|
||||
|
||||
bool sufficient_args = (args.size() == 2);
|
||||
|
@ -2472,10 +2462,10 @@ void Service::Command(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
|
|||
|
||||
sufficient_args = (args.size() == 1);
|
||||
if (subcmd == "DOCS" && sufficient_args) {
|
||||
return builder->SendOk();
|
||||
return rb->SendOk();
|
||||
}
|
||||
|
||||
return builder->SendError(kSyntaxErr, kSyntaxErrType);
|
||||
return rb->SendError(kSyntaxErr, kSyntaxErrType);
|
||||
}
|
||||
|
||||
VarzValue::Map Service::GetVarzStats() {
|
||||
|
@ -2606,12 +2596,9 @@ Service::ContextInfo Service::GetContextInfo(facade::ConnectionContext* cntx) co
|
|||
.blocked = server_cntx->blocked};
|
||||
}
|
||||
|
||||
using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx);
|
||||
|
||||
#define HFUNC(x) SetHandler(&Service::x)
|
||||
#define MFUNC(x) \
|
||||
SetHandler([this](CmdArgList sp, Transaction* tx, SinkReplyBuilder* builder, \
|
||||
ConnectionContext* cntx) { this->x(std::move(sp), tx, builder, cntx); })
|
||||
#define MFUNC(x) \
|
||||
SetHandler([this](CmdArgList sp, const CommandContext& cntx) { this->x(std::move(sp), cntx); })
|
||||
|
||||
namespace acl {
|
||||
constexpr uint32_t kQuit = FAST | CONNECTION;
|
||||
|
|
|
@ -125,43 +125,27 @@ class Service : public facade::ServiceInterface {
|
|||
private:
|
||||
using SinkReplyBuilder = facade::SinkReplyBuilder;
|
||||
|
||||
static void Quit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
static void Multi(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
static void Quit(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
static void Multi(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
|
||||
static void Watch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
static void Unwatch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
static void Watch(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
static void Unwatch(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
|
||||
void Discard(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
void Eval(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx,
|
||||
bool read_only = false);
|
||||
void EvalRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
|
||||
void EvalSha(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx,
|
||||
bool read_only = false);
|
||||
void EvalShaRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
void Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
|
||||
void Publish(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
void Subscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
void Unsubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
void PSubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
void PUnsubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
void Function(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
void Monitor(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
void Pubsub(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
|
||||
void Command(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
void Discard(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
void Eval(CmdArgList args, const CommandContext& cmd_cntx, bool read_only = false);
|
||||
void EvalRo(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
void EvalSha(CmdArgList args, const CommandContext& cmd_cntx, bool read_only = false);
|
||||
void EvalShaRo(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
void Exec(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
void Publish(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
void Subscribe(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
void Unsubscribe(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
void PSubscribe(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
void PUnsubscribe(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
void Function(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
void Monitor(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
void Pubsub(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
void Command(CmdArgList args, const CommandContext& cmd_cntx);
|
||||
|
||||
void PubsubChannels(std::string_view pattern, SinkReplyBuilder* builder);
|
||||
void PubsubPatterns(SinkReplyBuilder* builder);
|
||||
|
|
Loading…
Reference in a new issue