mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
chore: pass SinkReplyBuilder and Transaction explicitly. Part6 (#3987)
This commit is contained in:
parent
16f59d33f9
commit
7035606b4b
7 changed files with 427 additions and 393 deletions
|
@ -63,14 +63,14 @@ AclFamily::AclFamily(UserRegistry* registry, util::ProactorPool* pool)
|
|||
: registry_(registry), pool_(pool) {
|
||||
}
|
||||
|
||||
void AclFamily::Acl(CmdArgList args, ConnectionContext* cntx) {
|
||||
cntx->SendError("Wrong number of arguments for acl command");
|
||||
void AclFamily::Acl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
builder->SendError("Wrong number of arguments for acl command");
|
||||
}
|
||||
|
||||
void AclFamily::List(CmdArgList args, ConnectionContext* cntx) {
|
||||
void AclFamily::List(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
const auto registry_with_lock = registry_->GetRegistryWithLock();
|
||||
const auto& registry = registry_with_lock.registry;
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx->reply_builder());
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(builder);
|
||||
rb->StartArray(registry.size());
|
||||
|
||||
for (const auto& [username, user] : registry) {
|
||||
|
@ -91,7 +91,7 @@ void AclFamily::List(CmdArgList args, ConnectionContext* cntx) {
|
|||
absl::StrAppend(&buffer, username, " ", user.IsActive() ? "on "sv : "off "sv, password,
|
||||
acl_keys, maybe_space_com, acl_pub_sub, " ", acl_cat_and_commands);
|
||||
|
||||
cntx->SendSimpleString(buffer);
|
||||
builder->SendSimpleString(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -116,7 +116,7 @@ void AclFamily::StreamUpdatesToAllProactorConnections(const std::string& user,
|
|||
|
||||
using facade::ErrorReply;
|
||||
|
||||
void AclFamily::SetUser(CmdArgList args, ConnectionContext* cntx) {
|
||||
void AclFamily::SetUser(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
string_view username = facade::ToSV(args[0]);
|
||||
auto reg = registry_->GetRegistryWithWriteLock();
|
||||
const bool exists = reg.registry.contains(username);
|
||||
|
@ -124,9 +124,9 @@ void AclFamily::SetUser(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
auto req = ParseAclSetUser(args.subspan(1), false, has_all_keys);
|
||||
|
||||
auto error_case = [cntx](ErrorReply&& error) { cntx->SendError(error); };
|
||||
auto error_case = [builder](ErrorReply&& error) { builder->SendError(error); };
|
||||
|
||||
auto update_case = [username, ®, cntx, this, exists](User::UpdateRequest&& req) {
|
||||
auto update_case = [username, ®, builder, this, exists](User::UpdateRequest&& req) {
|
||||
auto& user = reg.registry[username];
|
||||
if (!exists) {
|
||||
User::UpdateRequest default_req;
|
||||
|
@ -137,7 +137,7 @@ void AclFamily::SetUser(CmdArgList args, ConnectionContext* cntx) {
|
|||
const bool reset_channels = req.reset_channels;
|
||||
user.Update(std::move(req), CategoryToIdx(), reverse_cat_table_, CategoryToCommandsIndex());
|
||||
// Send ok first because the connection might get evicted
|
||||
cntx->SendOk();
|
||||
builder->SendOk();
|
||||
if (exists) {
|
||||
if (!reset_channels) {
|
||||
StreamUpdatesToAllProactorConnections(string(username), user.AclCommands(), user.Keys(),
|
||||
|
@ -184,7 +184,7 @@ void AclFamily::EvictOpenConnectionsOnAllProactorsWithRegistry(
|
|||
}
|
||||
}
|
||||
|
||||
void AclFamily::DelUser(CmdArgList args, ConnectionContext* cntx) {
|
||||
void AclFamily::DelUser(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
auto& registry = *registry_;
|
||||
absl::flat_hash_set<string_view> users;
|
||||
|
||||
|
@ -199,17 +199,18 @@ void AclFamily::DelUser(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
if (users.empty()) {
|
||||
cntx->SendLong(0);
|
||||
builder->SendLong(0);
|
||||
return;
|
||||
}
|
||||
VLOG(1) << "Evicting open acl connections";
|
||||
EvictOpenConnectionsOnAllProactors(users);
|
||||
VLOG(1) << "Done evicting open acl connections";
|
||||
cntx->SendLong(users.size());
|
||||
builder->SendLong(users.size());
|
||||
}
|
||||
|
||||
void AclFamily::WhoAmI(CmdArgList args, ConnectionContext* cntx) {
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx->reply_builder());
|
||||
void AclFamily::WhoAmI(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(builder);
|
||||
rb->SendBulkString(absl::StrCat("User is ", cntx->authed_username));
|
||||
}
|
||||
|
||||
|
@ -239,10 +240,10 @@ string AclFamily::RegistryToString() const {
|
|||
return result;
|
||||
}
|
||||
|
||||
void AclFamily::Save(CmdArgList args, ConnectionContext* cntx) {
|
||||
void AclFamily::Save(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
auto acl_file_path = absl::GetFlag(FLAGS_aclfile);
|
||||
if (acl_file_path.empty()) {
|
||||
cntx->SendError("Dragonfly is not configured to use an ACL file.");
|
||||
builder->SendError("Dragonfly is not configured to use an ACL file.");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -250,7 +251,7 @@ void AclFamily::Save(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (!res) {
|
||||
std::string error = absl::StrCat("Failed to open the aclfile: ", res.error().message());
|
||||
LOG(ERROR) << error;
|
||||
cntx->SendError(error);
|
||||
builder->SendError(error);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -261,7 +262,7 @@ void AclFamily::Save(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (ec) {
|
||||
std::string error = absl::StrCat("Failed to write to the aclfile: ", ec.message());
|
||||
LOG(ERROR) << error;
|
||||
cntx->SendError(error);
|
||||
builder->SendError(error);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -269,15 +270,15 @@ void AclFamily::Save(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (ec) {
|
||||
std::string error = absl::StrCat("Failed to close the aclfile ", ec.message());
|
||||
LOG(WARNING) << error;
|
||||
cntx->SendError(error);
|
||||
builder->SendError(error);
|
||||
return;
|
||||
}
|
||||
|
||||
cntx->SendOk();
|
||||
builder->SendOk();
|
||||
}
|
||||
|
||||
GenericError AclFamily::LoadToRegistryFromFile(std::string_view full_path,
|
||||
ConnectionContext* cntx) {
|
||||
SinkReplyBuilder* builder) {
|
||||
auto is_file_read = io::ReadFileToString(full_path);
|
||||
if (!is_file_read) {
|
||||
auto error = absl::StrCat("Dragonfly could not load ACL file ", full_path, " with error ",
|
||||
|
@ -316,8 +317,8 @@ GenericError AclFamily::LoadToRegistryFromFile(std::string_view full_path,
|
|||
|
||||
auto registry_with_wlock = registry_->GetRegistryWithWriteLock();
|
||||
auto& registry = registry_with_wlock.registry;
|
||||
if (cntx) {
|
||||
cntx->SendOk();
|
||||
if (builder) {
|
||||
builder->SendOk();
|
||||
// Evict open connections for old users
|
||||
EvictOpenConnectionsOnAllProactorsWithRegistry(registry);
|
||||
registry.clear();
|
||||
|
@ -347,23 +348,23 @@ bool AclFamily::Load() {
|
|||
return !LoadToRegistryFromFile(acl_file, nullptr);
|
||||
}
|
||||
|
||||
void AclFamily::Load(CmdArgList args, ConnectionContext* cntx) {
|
||||
void AclFamily::Load(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
auto acl_file = absl::GetFlag(FLAGS_aclfile);
|
||||
if (acl_file.empty()) {
|
||||
cntx->SendError("Dragonfly is not configured to use an ACL file.");
|
||||
builder->SendError("Dragonfly is not configured to use an ACL file.");
|
||||
return;
|
||||
}
|
||||
|
||||
const auto load_error = LoadToRegistryFromFile(acl_file, cntx);
|
||||
const auto load_error = LoadToRegistryFromFile(acl_file, builder);
|
||||
|
||||
if (load_error) {
|
||||
cntx->SendError(absl::StrCat("Error loading: ", acl_file, " ", load_error.Format()));
|
||||
builder->SendError(absl::StrCat("Error loading: ", acl_file, " ", load_error.Format()));
|
||||
}
|
||||
}
|
||||
|
||||
void AclFamily::Log(CmdArgList args, ConnectionContext* cntx) {
|
||||
void AclFamily::Log(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
if (args.size() > 1) {
|
||||
cntx->SendError(facade::OpStatus::OUT_OF_RANGE);
|
||||
builder->SendError(facade::OpStatus::OUT_OF_RANGE);
|
||||
}
|
||||
|
||||
size_t max_output = 10;
|
||||
|
@ -372,12 +373,12 @@ void AclFamily::Log(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (absl::EqualsIgnoreCase(option, "RESET")) {
|
||||
pool_->AwaitFiberOnAll(
|
||||
[](auto index, auto* context) { ServerState::tlocal()->acl_log.Reset(); });
|
||||
cntx->SendOk();
|
||||
builder->SendOk();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!absl::SimpleAtoi(facade::ToSV(args[0]), &max_output)) {
|
||||
cntx->SendError("Invalid count");
|
||||
builder->SendError("Invalid count");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -392,7 +393,7 @@ void AclFamily::Log(CmdArgList args, ConnectionContext* cntx) {
|
|||
total_entries += log.size();
|
||||
}
|
||||
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx->reply_builder());
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(builder);
|
||||
if (total_entries == 0) {
|
||||
rb->SendEmptyArray();
|
||||
return;
|
||||
|
@ -453,19 +454,19 @@ void AclFamily::Log(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void AclFamily::Users(CmdArgList args, ConnectionContext* cntx) {
|
||||
void AclFamily::Users(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
const auto registry_with_lock = registry_->GetRegistryWithLock();
|
||||
const auto& registry = registry_with_lock.registry;
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx->reply_builder());
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(builder);
|
||||
rb->StartArray(registry.size());
|
||||
for (const auto& [username, _] : registry) {
|
||||
rb->SendSimpleString(username);
|
||||
}
|
||||
}
|
||||
|
||||
void AclFamily::Cat(CmdArgList args, ConnectionContext* cntx) {
|
||||
void AclFamily::Cat(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
if (args.size() > 1) {
|
||||
cntx->SendError(facade::OpStatus::SYNTAX_ERR);
|
||||
builder->SendError(facade::OpStatus::SYNTAX_ERR);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -474,7 +475,7 @@ void AclFamily::Cat(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
if (!cat_table_.contains(category)) {
|
||||
auto error = absl::StrCat("Unkown category: ", category);
|
||||
cntx->SendError(error);
|
||||
builder->SendError(error);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -487,7 +488,7 @@ void AclFamily::Cat(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
};
|
||||
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx->reply_builder());
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(builder);
|
||||
cmd_registry_->Traverse(cb);
|
||||
rb->StartArray(results.size());
|
||||
for (const auto& command : results) {
|
||||
|
@ -504,7 +505,7 @@ void AclFamily::Cat(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx->reply_builder());
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(builder);
|
||||
rb->StartArray(total_categories);
|
||||
for (auto& elem : reverse_cat_table_) {
|
||||
if (elem != "_RESERVED") {
|
||||
|
@ -513,12 +514,12 @@ void AclFamily::Cat(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void AclFamily::GetUser(CmdArgList args, ConnectionContext* cntx) {
|
||||
void AclFamily::GetUser(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
auto username = facade::ToSV(args[0]);
|
||||
const auto registry_with_lock = registry_->GetRegistryWithLock();
|
||||
const auto& registry = registry_with_lock.registry;
|
||||
if (!registry.contains(username)) {
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx->reply_builder());
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(builder);
|
||||
rb->SendNull();
|
||||
return;
|
||||
}
|
||||
|
@ -529,7 +530,7 @@ void AclFamily::GetUser(CmdArgList args, ConnectionContext* cntx) {
|
|||
pass.pop_back();
|
||||
}
|
||||
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx->reply_builder());
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(builder);
|
||||
rb->StartArray(10);
|
||||
|
||||
rb->SendSimpleString("flags");
|
||||
|
@ -566,9 +567,9 @@ void AclFamily::GetUser(CmdArgList args, ConnectionContext* cntx) {
|
|||
rb->SendSimpleString(pub_sub);
|
||||
}
|
||||
|
||||
void AclFamily::GenPass(CmdArgList args, ConnectionContext* cntx) {
|
||||
void AclFamily::GenPass(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
if (args.length() > 1) {
|
||||
cntx->SendError(facade::UnknownSubCmd("GENPASS", "ACL"));
|
||||
builder->SendError(facade::UnknownSubCmd("GENPASS", "ACL"));
|
||||
return;
|
||||
}
|
||||
uint32_t random_bits = 256;
|
||||
|
@ -576,7 +577,7 @@ void AclFamily::GenPass(CmdArgList args, ConnectionContext* cntx) {
|
|||
auto requested_bits = facade::ArgS(args, 0);
|
||||
|
||||
if (!absl::SimpleAtoi(requested_bits, &random_bits) || random_bits == 0 || random_bits > 4096) {
|
||||
return cntx->SendError(
|
||||
return builder->SendError(
|
||||
"ACL GENPASS argument must be the number of bits for the output password, a positive "
|
||||
"number up to 4096");
|
||||
}
|
||||
|
@ -591,16 +592,16 @@ void AclFamily::GenPass(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
response.resize(result_length);
|
||||
|
||||
cntx->SendSimpleString(response);
|
||||
builder->SendSimpleString(response);
|
||||
}
|
||||
|
||||
void AclFamily::DryRun(CmdArgList args, ConnectionContext* cntx) {
|
||||
void AclFamily::DryRun(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
auto username = facade::ArgS(args, 0);
|
||||
const auto registry_with_lock = registry_->GetRegistryWithLock();
|
||||
const auto& registry = registry_with_lock.registry;
|
||||
if (!registry.contains(username)) {
|
||||
auto error = absl::StrCat("User '", username, "' not found");
|
||||
cntx->SendError(error);
|
||||
builder->SendError(error);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -608,7 +609,7 @@ void AclFamily::DryRun(CmdArgList args, ConnectionContext* cntx) {
|
|||
auto* cid = cmd_registry_->Find(command);
|
||||
if (!cid) {
|
||||
auto error = absl::StrCat("Command '", command, "' not found");
|
||||
cntx->SendError(error);
|
||||
builder->SendError(error);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -616,19 +617,30 @@ void AclFamily::DryRun(CmdArgList args, ConnectionContext* cntx) {
|
|||
const bool is_allowed =
|
||||
IsUserAllowedToInvokeCommandGeneric(user.AclCommandsRef(), {{}, true}, {}, *cid).first;
|
||||
if (is_allowed) {
|
||||
cntx->SendOk();
|
||||
builder->SendOk();
|
||||
return;
|
||||
}
|
||||
|
||||
auto msg = absl::StrCat("This user has no permissions to run the '", command, "' command");
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx->reply_builder());
|
||||
auto* rb = static_cast<facade::RedisReplyBuilder*>(builder);
|
||||
rb->SendBulkString(msg);
|
||||
}
|
||||
|
||||
using MemberFunc = void (AclFamily::*)(CmdArgList args, ConnectionContext* cntx);
|
||||
using MemberFunc2 = void (AclFamily::*)(CmdArgList args, Transaction* tx,
|
||||
facade::SinkReplyBuilder* builder);
|
||||
|
||||
CommandId::Handler HandlerFunc(AclFamily* acl, MemberFunc f) {
|
||||
return [=](CmdArgList args, ConnectionContext* cntx) { return (acl->*f)(args, cntx); };
|
||||
using MemberFunc3 = void (AclFamily::*)(CmdArgList args, Transaction* tx,
|
||||
facade::SinkReplyBuilder* builder, ConnectionContext* cntx);
|
||||
|
||||
CommandId::Handler2 HandlerFunc(AclFamily* acl, MemberFunc2 f) {
|
||||
return [=](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder) {
|
||||
return (acl->*f)(args, tx, builder);
|
||||
};
|
||||
}
|
||||
|
||||
CommandId::Handler3 HandlerFunc(AclFamily* acl, MemberFunc3 f) {
|
||||
return [=](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) { return (acl->*f)(args, tx, builder, cntx); };
|
||||
}
|
||||
|
||||
#define HFUNC(x) SetHandler(HandlerFunc(this, &AclFamily::x))
|
||||
|
|
|
@ -18,6 +18,10 @@
|
|||
#include "server/command_registry.h"
|
||||
#include "server/common.h"
|
||||
|
||||
namespace facade {
|
||||
class SinkReplyBuilder;
|
||||
} // namespace facade
|
||||
|
||||
namespace dfly {
|
||||
|
||||
class ConnectionContext;
|
||||
|
@ -31,21 +35,23 @@ class AclFamily final {
|
|||
void Init(facade::Listener* listener, UserRegistry* registry);
|
||||
|
||||
private:
|
||||
void Acl(CmdArgList args, ConnectionContext* cntx);
|
||||
void List(CmdArgList args, ConnectionContext* cntx);
|
||||
void SetUser(CmdArgList args, ConnectionContext* cntx);
|
||||
void DelUser(CmdArgList args, ConnectionContext* cntx);
|
||||
void WhoAmI(CmdArgList args, ConnectionContext* cntx);
|
||||
void Save(CmdArgList args, ConnectionContext* cntx);
|
||||
void Load(CmdArgList args, ConnectionContext* cntx);
|
||||
using SinkReplyBuilder = facade::SinkReplyBuilder;
|
||||
|
||||
void Acl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
void List(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
void SetUser(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
void DelUser(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
void WhoAmI(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
|
||||
void Save(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
void Load(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
// Helper function for bootstrap
|
||||
bool Load();
|
||||
void Log(CmdArgList args, ConnectionContext* cntx);
|
||||
void Users(CmdArgList args, ConnectionContext* cntx);
|
||||
void Cat(CmdArgList args, ConnectionContext* cntx);
|
||||
void GetUser(CmdArgList args, ConnectionContext* cntx);
|
||||
void DryRun(CmdArgList args, ConnectionContext* cntx);
|
||||
void GenPass(CmdArgList args, ConnectionContext* cntx);
|
||||
void Log(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
void Users(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
void Cat(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
void GetUser(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
void DryRun(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
void GenPass(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
|
||||
// Helper function that updates all open connections and their
|
||||
// respective ACL fields on all the available proactor threads
|
||||
|
@ -62,7 +68,7 @@ class AclFamily final {
|
|||
void EvictOpenConnectionsOnAllProactorsWithRegistry(const UserRegistry::RegistryType& registry);
|
||||
|
||||
// Helper function that loads the acl state of an acl file into the user registry
|
||||
GenericError LoadToRegistryFromFile(std::string_view full_path, ConnectionContext* init);
|
||||
GenericError LoadToRegistryFromFile(std::string_view full_path, SinkReplyBuilder* builder);
|
||||
|
||||
// Serializes the whole registry into a string
|
||||
std::string RegistryToString() const;
|
||||
|
|
|
@ -67,7 +67,7 @@ ScriptMgr::ScriptKey::ScriptKey(string_view sha) : array{} {
|
|||
memcpy(data(), sha.data(), size());
|
||||
}
|
||||
|
||||
void ScriptMgr::Run(CmdArgList args, ConnectionContext* cntx) {
|
||||
void ScriptMgr::Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
string subcmd = absl::AsciiStrToUpper(ArgS(args, 0));
|
||||
|
||||
if (subcmd == "HELP") {
|
||||
|
@ -93,37 +93,37 @@ void ScriptMgr::Run(CmdArgList args, ConnectionContext* cntx) {
|
|||
" Invokes garbage collection on all unused interpreter instances.",
|
||||
"HELP",
|
||||
" Prints this help."};
|
||||
auto rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
auto rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
return rb->SendSimpleStrArr(kHelp);
|
||||
}
|
||||
|
||||
if (subcmd == "EXISTS" && args.size() > 1)
|
||||
return ExistsCmd(args, cntx);
|
||||
return ExistsCmd(args, tx, builder);
|
||||
|
||||
if (subcmd == "FLUSH")
|
||||
return FlushCmd(args, cntx);
|
||||
return FlushCmd(args, tx, builder);
|
||||
|
||||
if (subcmd == "LIST")
|
||||
return ListCmd(cntx);
|
||||
return ListCmd(tx, builder);
|
||||
|
||||
if (subcmd == "LATENCY")
|
||||
return LatencyCmd(cntx);
|
||||
return LatencyCmd(tx, builder);
|
||||
|
||||
if (subcmd == "LOAD" && args.size() == 2)
|
||||
return LoadCmd(args, cntx);
|
||||
return LoadCmd(args, tx, builder);
|
||||
|
||||
if (subcmd == "FLAGS" && args.size() > 2)
|
||||
return ConfigCmd(args, cntx);
|
||||
return ConfigCmd(args, tx, builder);
|
||||
|
||||
if (subcmd == "GC")
|
||||
return GCCmd(cntx);
|
||||
return GCCmd(tx, builder);
|
||||
|
||||
string err = absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd,
|
||||
"'. Try SCRIPT HELP.");
|
||||
cntx->SendError(err, kSyntaxErrType);
|
||||
builder->SendError(err, kSyntaxErrType);
|
||||
}
|
||||
|
||||
void ScriptMgr::ExistsCmd(CmdArgList args, ConnectionContext* cntx) const {
|
||||
void ScriptMgr::ExistsCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) const {
|
||||
vector<uint8_t> res(args.size() - 1, 0);
|
||||
for (size_t i = 1; i < args.size(); ++i) {
|
||||
if (string_view sha = ArgS(args, i); Find(sha)) {
|
||||
|
@ -131,22 +131,22 @@ void ScriptMgr::ExistsCmd(CmdArgList args, ConnectionContext* cntx) const {
|
|||
}
|
||||
}
|
||||
|
||||
auto rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
auto rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
rb->StartArray(res.size());
|
||||
for (uint8_t v : res) {
|
||||
rb->SendLong(v);
|
||||
}
|
||||
}
|
||||
|
||||
void ScriptMgr::FlushCmd(CmdArgList args, ConnectionContext* cntx) {
|
||||
void ScriptMgr::FlushCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
FlushAllScript();
|
||||
|
||||
return cntx->SendOk();
|
||||
return builder->SendOk();
|
||||
}
|
||||
|
||||
void ScriptMgr::LoadCmd(CmdArgList args, ConnectionContext* cntx) {
|
||||
void ScriptMgr::LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
string_view body = ArgS(args, 1);
|
||||
auto rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
auto rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
if (body.empty()) {
|
||||
char sha[41];
|
||||
Interpreter::FuncSha1(body, sha);
|
||||
|
@ -159,35 +159,35 @@ void ScriptMgr::LoadCmd(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
auto res = Insert(body, interpreter);
|
||||
if (!res)
|
||||
return cntx->SendError(res.error().Format());
|
||||
return builder->SendError(res.error().Format());
|
||||
|
||||
// Schedule empty callback inorder to journal command via transaction framework.
|
||||
cntx->transaction->ScheduleSingleHop([](auto* t, auto* shard) { return OpStatus::OK; });
|
||||
tx->ScheduleSingleHop([](auto* t, auto* shard) { return OpStatus::OK; });
|
||||
|
||||
return rb->SendBulkString(res.value());
|
||||
}
|
||||
|
||||
void ScriptMgr::ConfigCmd(CmdArgList args, ConnectionContext* cntx) {
|
||||
void ScriptMgr::ConfigCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
lock_guard lk{mu_};
|
||||
ScriptKey key{ArgS(args, 1)};
|
||||
auto& data = db_[key];
|
||||
|
||||
for (auto flag : args.subspan(2)) {
|
||||
if (auto err = ScriptParams::ApplyFlags(facade::ToSV(flag), &data); err)
|
||||
return cntx->SendError("Invalid config format: " + err.Format());
|
||||
return builder->SendError("Invalid config format: " + err.Format());
|
||||
}
|
||||
|
||||
UpdateScriptCaches(key, data);
|
||||
|
||||
// Schedule empty callback inorder to journal command via transaction framework.
|
||||
cntx->transaction->ScheduleSingleHop([](auto* t, auto* shard) { return OpStatus::OK; });
|
||||
tx->ScheduleSingleHop([](auto* t, auto* shard) { return OpStatus::OK; });
|
||||
|
||||
return cntx->SendOk();
|
||||
return builder->SendOk();
|
||||
}
|
||||
|
||||
void ScriptMgr::ListCmd(ConnectionContext* cntx) const {
|
||||
void ScriptMgr::ListCmd(Transaction* tx, SinkReplyBuilder* builder) const {
|
||||
vector<pair<string, ScriptData>> scripts = GetAll();
|
||||
auto rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
auto rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
rb->StartArray(scripts.size());
|
||||
for (const auto& [sha, data] : scripts) {
|
||||
rb->StartArray(2);
|
||||
|
@ -196,7 +196,7 @@ void ScriptMgr::ListCmd(ConnectionContext* cntx) const {
|
|||
}
|
||||
}
|
||||
|
||||
void ScriptMgr::LatencyCmd(ConnectionContext* cntx) const {
|
||||
void ScriptMgr::LatencyCmd(Transaction* tx, SinkReplyBuilder* builder) const {
|
||||
absl::flat_hash_map<std::string, base::Histogram> result;
|
||||
fb2::Mutex mu;
|
||||
|
||||
|
@ -209,7 +209,7 @@ void ScriptMgr::LatencyCmd(ConnectionContext* cntx) const {
|
|||
mu.unlock();
|
||||
});
|
||||
|
||||
auto rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
auto rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
rb->StartArray(result.size());
|
||||
for (const auto& k_v : result) {
|
||||
rb->StartArray(2);
|
||||
|
@ -218,14 +218,14 @@ void ScriptMgr::LatencyCmd(ConnectionContext* cntx) const {
|
|||
}
|
||||
}
|
||||
|
||||
void ScriptMgr::GCCmd(ConnectionContext* cntx) const {
|
||||
void ScriptMgr::GCCmd(Transaction* tx, SinkReplyBuilder* builder) const {
|
||||
auto cb = [](Interpreter* ir) {
|
||||
ir->RunGC();
|
||||
ThisFiber::Yield();
|
||||
};
|
||||
shard_set->pool()->AwaitFiberOnAll(
|
||||
[cb](auto* pb) { ServerState::tlocal()->AlterInterpreters(cb); });
|
||||
return cntx->SendOk();
|
||||
return builder->SendOk();
|
||||
}
|
||||
|
||||
// Check if script starts with lua flags instructions (--df flags=...).
|
||||
|
|
|
@ -11,6 +11,10 @@
|
|||
|
||||
#include "server/conn_context.h"
|
||||
|
||||
namespace facade {
|
||||
class SinkReplyBuilder;
|
||||
} // namespace facade
|
||||
|
||||
namespace dfly {
|
||||
|
||||
class EngineShardSet;
|
||||
|
@ -40,9 +44,11 @@ class ScriptMgr {
|
|||
};
|
||||
|
||||
public:
|
||||
using SinkReplyBuilder = facade::SinkReplyBuilder;
|
||||
|
||||
ScriptMgr();
|
||||
|
||||
void Run(CmdArgList args, ConnectionContext* cntx);
|
||||
void Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
|
||||
// Insert script and return sha. Get possible error from compilation or parsing script flags.
|
||||
io::Result<std::string, GenericError> Insert(std::string_view body, Interpreter* interpreter);
|
||||
|
@ -61,13 +67,13 @@ class ScriptMgr {
|
|||
void OnScriptError(std::string_view sha, std::string_view error);
|
||||
|
||||
private:
|
||||
void ExistsCmd(CmdArgList args, ConnectionContext* cntx) const;
|
||||
void FlushCmd(CmdArgList args, ConnectionContext* cntx);
|
||||
void LoadCmd(CmdArgList args, ConnectionContext* cntx);
|
||||
void ConfigCmd(CmdArgList args, ConnectionContext* cntx);
|
||||
void ListCmd(ConnectionContext* cntx) const;
|
||||
void LatencyCmd(ConnectionContext* cntx) const;
|
||||
void GCCmd(ConnectionContext* cntx) const;
|
||||
void ExistsCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) const;
|
||||
void FlushCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
void LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
void ConfigCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
void ListCmd(Transaction* tx, SinkReplyBuilder* builder) const;
|
||||
void LatencyCmd(Transaction* tx, SinkReplyBuilder* builder) const;
|
||||
void GCCmd(Transaction* tx, SinkReplyBuilder* builder) const;
|
||||
|
||||
void UpdateScriptCaches(ScriptKey sha, ScriptParams params) const;
|
||||
|
||||
|
|
|
@ -2967,7 +2967,7 @@ void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
void ServerFamily::Script(CmdArgList args, ConnectionContext* cntx) {
|
||||
script_mgr_->Run(std::move(args), cntx);
|
||||
script_mgr_->Run(std::move(args), cntx->transaction, cntx->reply_builder());
|
||||
}
|
||||
|
||||
void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -9,10 +9,15 @@
|
|||
#include "facade/op_status.h"
|
||||
#include "server/common.h"
|
||||
|
||||
namespace facade {
|
||||
class SinkReplyBuilder;
|
||||
} // namespace facade
|
||||
|
||||
namespace dfly {
|
||||
|
||||
class ConnectionContext;
|
||||
class CommandRegistry;
|
||||
class Transaction;
|
||||
|
||||
class ZSetFamily {
|
||||
public:
|
||||
|
@ -55,47 +60,50 @@ class ZSetFamily {
|
|||
|
||||
private:
|
||||
template <typename T> using OpResult = facade::OpResult<T>;
|
||||
using SinkReplyBuilder = facade::SinkReplyBuilder;
|
||||
|
||||
static void BZPopMin(CmdArgList args, ConnectionContext* cntx);
|
||||
static void BZPopMax(CmdArgList args, ConnectionContext* cntx);
|
||||
static void BZPopMin(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
static void BZPopMax(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx);
|
||||
|
||||
static void ZAdd(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZCard(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZCount(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZDiff(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZIncrBy(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZInterStore(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZInter(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZInterCard(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZLexCount(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZPopMax(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZPopMin(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRange(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRank(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRem(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRandMember(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZScore(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZMScore(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRangeByLex(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRevRangeByLex(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRangeByScore(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRangeStore(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRemRangeByRank(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRemRangeByScore(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRemRangeByLex(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRevRange(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRevRangeByScore(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZRevRank(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZScan(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZUnion(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZUnionStore(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZCard(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZDiff(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZIncrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZInterStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZInter(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZInterCard(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZLexCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZPopMax(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZPopMin(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZRank(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZRem(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZRandMember(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZScore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZMScore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZRangeByLex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZRevRangeByLex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZRangeByScore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZRangeStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZRemRangeByRank(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZRemRangeByScore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZRemRangeByLex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZRevRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZRevRangeByScore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZRevRank(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZScan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZUnion(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void ZUnionStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
|
||||
static void GeoAdd(CmdArgList args, ConnectionContext* cntx);
|
||||
static void GeoHash(CmdArgList args, ConnectionContext* cntx);
|
||||
static void GeoPos(CmdArgList args, ConnectionContext* cntx);
|
||||
static void GeoDist(CmdArgList args, ConnectionContext* cntx);
|
||||
static void GeoSearch(CmdArgList args, ConnectionContext* cntx);
|
||||
static void GeoRadiusByMember(CmdArgList args, ConnectionContext* cntx);
|
||||
static void GeoAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void GeoHash(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void GeoPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void GeoDist(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void GeoSearch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
static void GeoRadiusByMember(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Reference in a new issue