diff --git a/src/core/core_types.h b/src/core/core_types.h deleted file mode 100644 index 087cbfe1a..000000000 --- a/src/core/core_types.h +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2022, DragonflyDB authors. All rights reserved. -// See LICENSE for licensing terms. -// - -#pragma once - -#include - -namespace dfly { -using MutableSlice = absl::Span; -using MutSliceSpan = absl::Span; - -} // namespace dfly diff --git a/src/core/interpreter.cc b/src/core/interpreter.cc index a8d53550f..b5ec02033 100644 --- a/src/core/interpreter.cc +++ b/src/core/interpreter.cc @@ -244,7 +244,7 @@ optional FetchKey(lua_State* lua, const char* key) { return type; } -void SetGlobalArrayInternal(lua_State* lua, const char* name, MutSliceSpan args) { +void SetGlobalArrayInternal(lua_State* lua, const char* name, Interpreter::SliceSpan args) { lua_createtable(lua, args.size(), 0); for (size_t j = 0; j < args.size(); j++) { lua_pushlstring(lua, args[j].data(), args[j].size()); @@ -652,7 +652,7 @@ auto Interpreter::RunFunction(string_view sha, std::string* error) -> RunResult return err == 0 ? RUN_OK : RUN_ERR; } -void Interpreter::SetGlobalArray(const char* name, MutSliceSpan args) { +void Interpreter::SetGlobalArray(const char* name, SliceSpan args) { SetGlobalArrayInternal(lua_, name, args); } @@ -952,7 +952,7 @@ int Interpreter::RedisGenericCommand(bool raise_error, bool async, ObjectExplore } char name_buffer[32]; // backing storage for cmd name - absl::FixedArray, 4> args(argc); + absl::FixedArray args(argc); // Copy command name to name_buffer and set it as first arg. unsigned name_len = lua_rawlen(lua_, 1); @@ -1004,7 +1004,7 @@ int Interpreter::RedisGenericCommand(bool raise_error, bool async, ObjectExplore explorer = &*translator; } - redis_func_(CallArgs{MutSliceSpan{args}, &buffer_, explorer, async, raise_error, &raise_error}); + redis_func_(CallArgs{SliceSpan{args}, &buffer_, explorer, async, raise_error, &raise_error}); cmd_depth_--; // Shrink reusable buffer if it's too big. diff --git a/src/core/interpreter.h b/src/core/interpreter.h index 8f25d63d8..27448cbf6 100644 --- a/src/core/interpreter.h +++ b/src/core/interpreter.h @@ -4,11 +4,12 @@ #pragma once +#include + #include #include #include -#include "core/core_types.h" #include "util/fibers/synchronization.h" typedef struct lua_State lua_State; @@ -40,10 +41,12 @@ class ObjectExplorer { class Interpreter { public: + using SliceSpan = absl::Span; + // Arguments received from redis.call struct CallArgs { // Full arguments, including cmd name. - MutSliceSpan args; + SliceSpan args; // Pointer to backing storage for args (excluding cmd name). // Moving can invalidate arg slice pointers. Moved by async to re-use buffer. @@ -93,7 +96,7 @@ class Interpreter { RUN_ERR = 2, }; - void SetGlobalArray(const char* name, MutSliceSpan args); + void SetGlobalArray(const char* name, SliceSpan args); // Runs already added function sha returned by a successful call to AddFunction(). // Returns: true if the call succeeded, otherwise fills error and returns false. diff --git a/src/core/interpreter_test.cc b/src/core/interpreter_test.cc index 1fe0909fc..4e1735091 100644 --- a/src/core/interpreter_test.cc +++ b/src/core/interpreter_test.cc @@ -73,6 +73,7 @@ class TestSerializer : public ObjectExplorer { } }; +using SliceSpan = Interpreter::SliceSpan; class InterpreterTest : public ::testing::Test { protected: InterpreterTest() { @@ -99,12 +100,12 @@ class InterpreterTest : public ::testing::Test { }; void InterpreterTest::SetGlobalArray(const char* name, const vector& vec) { - vector slices(vec.size()); + vector slices(vec.size()); for (size_t i = 0; i < vec.size(); ++i) { strings_.emplace_back(new string(vec[i])); - slices[i] = MutableSlice{*strings_.back()}; + slices[i] = string_view{*strings_.back()}; } - intptr_.SetGlobalArray(name, MutSliceSpan{slices}); + intptr_.SetGlobalArray(name, SliceSpan{slices}); } bool InterpreterTest::Execute(string_view script) { @@ -329,7 +330,7 @@ TEST_F(InterpreterTest, CallArray) { TEST_F(InterpreterTest, ArgKeys) { vector vec_arr{}; - vector slices; + vector slices; SetGlobalArray("ARGV", {"foo", "bar"}); SetGlobalArray("KEYS", {"key1", "key2"}); EXPECT_TRUE(Execute("return {ARGV[1], KEYS[1], KEYS[2]}")); diff --git a/src/core/search/base.h b/src/core/search/base.h index 4949f95ab..964dcfcee 100644 --- a/src/core/search/base.h +++ b/src/core/search/base.h @@ -14,7 +14,6 @@ #include #include "base/pmr/memory_resource.h" -#include "core/core_types.h" #include "core/string_map.h" namespace dfly::search { diff --git a/src/core/small_string.h b/src/core/small_string.h index cfd278cc2..e80c0036c 100644 --- a/src/core/small_string.h +++ b/src/core/small_string.h @@ -6,8 +6,6 @@ #include #include -#include "core/core_types.h" - namespace dfly { // blob strings of upto ~256B. Small sizes are probably predominant diff --git a/src/core/uring.h b/src/core/uring.h deleted file mode 100644 index 6580cb3ee..000000000 --- a/src/core/uring.h +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2023, Roman Gershman. All rights reserved. -// See LICENSE for licensing terms. -// - -#pragma once - -#ifdef __linux__ -#include "util/fibers/uring_file.h" -#include "util/fibers/uring_proactor.h" -namespace dfly { - -using util::fb2::FiberCall; -using util::fb2::LinuxFile; -using util::fb2::OpenLinux; -using util::fb2::OpenRead; - -} // namespace dfly - -#endif diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index c41de5e92..e4fd48a46 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -17,7 +17,6 @@ #include "base/io_buf.h" #include "base/logging.h" #include "core/heap_size.h" -#include "core/uring.h" #include "facade/conn_context.h" #include "facade/dragonfly_listener.h" #include "facade/memcache_parser.h" @@ -30,6 +29,10 @@ #include "util/tls/tls_socket.h" #endif +#ifdef __linux__ +#include "util/fibers/uring_file.h" +#endif + using namespace std; using facade::operator""_MB; @@ -1341,7 +1344,7 @@ bool Connection::ShouldEndDispatchFiber(const MessageHandle& msg) { void Connection::SquashPipeline() { DCHECK_EQ(dispatch_q_.size(), pending_pipeline_cmd_cnt_); - vector squash_cmds; + vector squash_cmds; squash_cmds.reserve(dispatch_q_.size()); for (auto& msg : dispatch_q_) { diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 07569cc9d..762581e6c 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -91,7 +91,7 @@ class Connection : public util::Connection { // The capacity is chosen so that we allocate a fully utilized (256 bytes) block. using StorageType = absl::InlinedVector>; - absl::InlinedVector args; + absl::InlinedVector args; StorageType storage; }; diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index b5958117f..acf8c166f 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -35,16 +35,12 @@ constexpr size_t kSanitizerOverhead = 0u; enum class Protocol : uint8_t { MEMCACHE = 1, REDIS = 2 }; -using MutableSlice = absl::Span; -using CmdArgList = absl::Span; -using CmdArgVec = std::vector; +using MutableSlice = std::string_view; +using CmdArgList = absl::Span; +using CmdArgVec = std::vector; using ArgSlice = absl::Span; using OwnedArgSlice = absl::Span; -inline std::string_view ToSV(MutableSlice slice) { - return std::string_view{slice.data(), slice.size()}; -} - inline std::string_view ToSV(std::string_view slice) { return slice; } @@ -57,13 +53,8 @@ inline std::string_view ToSV(std::string&& slice) = delete; constexpr auto kToSV = [](auto&& v) { return ToSV(std::forward(v)); }; -inline std::string_view ArgS(CmdArgList args, size_t i) { - auto arg = args[i]; - return {arg.data(), arg.size()}; -} - -inline auto ArgS(CmdArgList args) { - return base::it::Transform(kToSV, base::it::Range{args.begin(), args.end()}); +inline std::string_view ArgS(ArgSlice args, size_t i) { + return args[i]; } struct ArgRange { @@ -95,7 +86,7 @@ struct ArgRange { return std::visit([idx](const auto& span) { return facade::ToSV(span[idx]); }, span); } - std::variant span; + std::variant span; }; struct ConnectionStats { size_t read_buf_capacity = 0; // total capacity of input buffers @@ -179,10 +170,6 @@ struct ErrorReply { std::optional status{std::nullopt}; }; -inline MutableSlice ToMSS(absl::Span span) { - return MutableSlice{reinterpret_cast(span.data()), span.size()}; -} - constexpr inline unsigned long long operator""_MB(unsigned long long x) { return 1024L * 1024L * x; } diff --git a/src/facade/ok_main.cc b/src/facade/ok_main.cc index 964e38640..2996139c7 100644 --- a/src/facade/ok_main.cc +++ b/src/facade/ok_main.cc @@ -21,11 +21,11 @@ namespace { class OkService : public ServiceInterface { public: - void DispatchCommand(CmdArgList args, ConnectionContext* cntx) final { + void DispatchCommand(ArgSlice args, ConnectionContext* cntx) final { cntx->SendOk(); } - size_t DispatchManyCommands(absl::Span args_lists, ConnectionContext* cntx) final { + size_t DispatchManyCommands(absl::Span args_lists, ConnectionContext* cntx) final { for (auto args : args_lists) DispatchCommand(args, cntx); return args_lists.size(); diff --git a/src/facade/resp_expr.cc b/src/facade/resp_expr.cc index 140e51bb9..dbaf42f70 100644 --- a/src/facade/resp_expr.cc +++ b/src/facade/resp_expr.cc @@ -13,7 +13,7 @@ void RespExpr::VecToArgList(const Vec& src, CmdArgVec* dest) { for (size_t i = 0; i < src.size(); ++i) { DCHECK(src[i].type == RespExpr::STRING); - (*dest)[i] = ToMSS(src[i].GetBuf()); + (*dest)[i] = ToSV(src[i].GetBuf()); } } diff --git a/src/facade/service_interface.h b/src/facade/service_interface.h index f2968470c..d50100cb6 100644 --- a/src/facade/service_interface.h +++ b/src/facade/service_interface.h @@ -25,11 +25,10 @@ class ServiceInterface { virtual ~ServiceInterface() { } - virtual void DispatchCommand(CmdArgList args, ConnectionContext* cntx) = 0; + virtual void DispatchCommand(ArgSlice args, ConnectionContext* cntx) = 0; // Returns number of processed commands - virtual size_t DispatchManyCommands(absl::Span args_list, - ConnectionContext* cntx) = 0; + virtual size_t DispatchManyCommands(absl::Span args_list, ConnectionContext* cntx) = 0; virtual void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, ConnectionContext* cntx) = 0; diff --git a/src/server/acl/validator.cc b/src/server/acl/validator.cc index 5513ece55..20bc1b994 100644 --- a/src/server/acl/validator.cc +++ b/src/server/acl/validator.cc @@ -18,7 +18,7 @@ extern "C" { namespace dfly::acl { [[nodiscard]] bool IsUserAllowedToInvokeCommand(const ConnectionContext& cntx, const CommandId& id, - CmdArgList tail_args) { + ArgSlice tail_args) { if (cntx.skip_acl_validation) { return true; } diff --git a/src/server/command_registry.cc b/src/server/command_registry.cc index 8741788a1..c1c4959fc 100644 --- a/src/server/command_registry.cc +++ b/src/server/command_registry.cc @@ -189,8 +189,8 @@ CommandRegistry::FamiliesVec CommandRegistry::GetFamilies() { return std::move(family_of_commands_); } -std::pair CommandRegistry::FindExtended(string_view cmd, - CmdArgList tail_args) const { +std::pair CommandRegistry::FindExtended(string_view cmd, + ArgSlice tail_args) const { if (cmd == RenamedOrOriginal("ACL"sv)) { if (tail_args.empty()) { return {Find(cmd), {}}; diff --git a/src/server/command_registry.h b/src/server/command_registry.h index 5d7e15460..50dfb4611 100644 --- a/src/server/command_registry.h +++ b/src/server/command_registry.h @@ -203,8 +203,8 @@ class CommandRegistry { using FamiliesVec = std::vector>; FamiliesVec GetFamilies(); - std::pair FindExtended(std::string_view cmd, - facade::CmdArgList tail_args) const; + std::pair FindExtended(std::string_view cmd, + facade::ArgSlice tail_args) const; private: absl::flat_hash_map cmd_map_; diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index ef29cb454..fdf31d5f5 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -21,7 +21,7 @@ namespace dfly { using namespace std; using namespace facade; -StoredCmd::StoredCmd(const CommandId* cid, CmdArgList args, facade::ReplyMode mode) +StoredCmd::StoredCmd(const CommandId* cid, ArgSlice args, facade::ReplyMode mode) : cid_{cid}, buffer_{}, sizes_(args.size()), reply_mode_{mode} { size_t total_size = 0; for (auto args : args) { @@ -38,7 +38,7 @@ StoredCmd::StoredCmd(const CommandId* cid, CmdArgList args, facade::ReplyMode mo } } -StoredCmd::StoredCmd(string&& buffer, const CommandId* cid, CmdArgList args, facade::ReplyMode mode) +StoredCmd::StoredCmd(string&& buffer, const CommandId* cid, ArgSlice args, facade::ReplyMode mode) : cid_{cid}, buffer_{std::move(buffer)}, sizes_(args.size()), reply_mode_{mode} { for (unsigned i = 0; i < args.size(); i++) { // Assume tightly packed list. @@ -47,7 +47,7 @@ StoredCmd::StoredCmd(string&& buffer, const CommandId* cid, CmdArgList args, fac } } -void StoredCmd::Fill(CmdArgList args) { +void StoredCmd::Fill(absl::Span args) { DCHECK_GE(args.size(), sizes_.size()); unsigned offset = 0; @@ -162,7 +162,7 @@ vector ChangeSubscriptions(bool pattern, CmdArgList args, bool to_add, // Gather all the channels we need to subscribe to / remove. size_t i = 0; - for (string_view channel : ArgS(args)) { + for (string_view channel : args) { if (to_add && local_store.emplace(channel).second) csu.Record(channel); else if (!to_add && local_store.erase(channel) > 0) diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 50a099837..7fe083350 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -27,11 +27,10 @@ struct FlowInfo; // Used for storing MULTI/EXEC commands. class StoredCmd { public: - StoredCmd(const CommandId* cid, CmdArgList args, - facade::ReplyMode mode = facade::ReplyMode::FULL); + StoredCmd(const CommandId* cid, ArgSlice args, facade::ReplyMode mode = facade::ReplyMode::FULL); // Create on top of already filled tightly-packed buffer. - StoredCmd(std::string&& buffer, const CommandId* cid, CmdArgList args, + StoredCmd(std::string&& buffer, const CommandId* cid, ArgSlice args, facade::ReplyMode mode = facade::ReplyMode::FULL); size_t NumArgs() const; @@ -40,7 +39,7 @@ class StoredCmd { // Fill the arg list with stored arguments, it should be at least of size NumArgs(). // Between filling and invocation, cmd should NOT be moved. - void Fill(CmdArgList args); + void Fill(absl::Span args); void Fill(CmdArgVec* dest) { dest->resize(sizes_.size()); diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 5c94e5657..b59da432e 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -145,7 +145,7 @@ void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool local_tx->StartMultiNonAtomic(); boost::intrusive_ptr stub_tx = new Transaction{local_tx.get(), EngineShard::tlocal()->shard_id(), nullopt}; - absl::InlinedVector args_view; + absl::InlinedVector args_view; facade::CapturingReplyBuilder crb; ConnectionContext local_cntx{cntx, stub_tx.get(), &crb}; @@ -162,7 +162,7 @@ void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool args_view.clear(); for (auto& arg : args) { - args_view.push_back(absl::MakeSpan(arg)); + args_view.push_back(arg); } auto args_span = absl::MakeSpan(args_view); diff --git a/src/server/http_api.cc b/src/server/http_api.cc index c00d2a372..8a3573a81 100644 --- a/src/server/http_api.cc +++ b/src/server/http_api.cc @@ -219,9 +219,9 @@ void HttpAPI(const http::QueryArgs& args, HttpRequest&& req, Service* service, for (size_t i = 0; i < vec.size(); ++i) { cmd_args.push_back(vec[i].AsString().c_str()); } - vector cmd_slices(cmd_args.size()); + vector cmd_slices(cmd_args.size()); for (size_t i = 0; i < cmd_args.size(); ++i) { - cmd_slices[i] = absl::MakeSpan(cmd_args[i]); + cmd_slices[i] = cmd_args[i]; } facade::ConnectionContext* context = (facade::ConnectionContext*)http_cntx->user_data(); diff --git a/src/server/journal/executor.cc b/src/server/journal/executor.cc index 753968718..cbc94e4f7 100644 --- a/src/server/journal/executor.cc +++ b/src/server/journal/executor.cc @@ -23,13 +23,13 @@ template journal::ParsedEntry::CmdData BuildFromParts(Ts... par vector raw_parts{absl::StrCat(std::forward(parts))...}; auto cmd_str = accumulate(raw_parts.begin(), raw_parts.end(), std::string{}); - auto buf = make_unique(cmd_str.size()); + auto buf = make_unique(cmd_str.size()); memcpy(buf.get(), cmd_str.data(), cmd_str.size()); - CmdArgVec slice_parts{}; + CmdArgVec slice_parts; size_t start = 0; for (const auto& part : raw_parts) { - slice_parts.emplace_back(buf.get() + start, part.size()); + slice_parts.emplace_back(reinterpret_cast(buf.get()) + start, part.size()); start += part.size(); } diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index e6aaed75f..a46058401 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -140,7 +140,7 @@ template io::Result JournalReader::ReadUInt(); template io::Result JournalReader::ReadUInt(); template io::Result JournalReader::ReadUInt(); -io::Result JournalReader::ReadString(MutableSlice buffer) { +io::Result JournalReader::ReadString(io::MutableBytes buffer) { size_t size = 0; SET_OR_UNEXPECT(ReadUInt(), size); @@ -164,17 +164,17 @@ std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data) SET_OR_RETURN(ReadUInt(), cmd_size); // Read all strings consecutively. - data->command_buf = make_unique(cmd_size); - char* ptr = data->command_buf.get(); + data->command_buf = make_unique(cmd_size); + uint8_t* ptr = data->command_buf.get(); for (auto& span : data->cmd_args) { size_t size; SET_OR_RETURN(ReadString({ptr, cmd_size}), size); DCHECK(size <= cmd_size); - span = MutableSlice{ptr, size}; + span = string_view{reinterpret_cast(ptr), size}; ptr += size; cmd_size -= size; } - return std::error_code{}; + return {}; } io::Result JournalReader::ReadEntry() { diff --git a/src/server/journal/serializer.h b/src/server/journal/serializer.h index a2516135c..37024fd2d 100644 --- a/src/server/journal/serializer.h +++ b/src/server/journal/serializer.h @@ -26,10 +26,6 @@ class JournalWriter { private: void Write(std::string_view sv); // Write string. - void Write(facade::MutableSlice slice) { - Write(facade::ToSV(slice)); - } - void Write(const journal::Entry::Payload& payload); private: @@ -58,7 +54,7 @@ struct JournalReader { template io::Result ReadUInt(); // Read and copy to buffer, return size. - io::Result ReadString(MutableSlice buffer); + io::Result ReadString(io::MutableBytes buffer); // Read argument array into string buffer. std::error_code ReadCommand(journal::ParsedEntry::CmdData* entry); diff --git a/src/server/journal/types.h b/src/server/journal/types.h index 4eb3e6b8c..f03af1409 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -39,14 +39,12 @@ struct Entry : public EntryBase { // Payload represents a non-owning view into a command executed on the shard. struct Payload { std::string_view cmd; - std::variant + std::variant // Parts of a full command. args; Payload() = default; - Payload(std::string_view c, CmdArgList a) : cmd(c), args(a) { - } + Payload(std::string_view c, const ShardArgs& a) : cmd(c), args(a) { } Payload(std::string_view c, ArgSlice a) : cmd(c), args(a) { @@ -81,7 +79,7 @@ struct Entry : public EntryBase { struct ParsedEntry : public EntryBase { struct CmdData { - std::unique_ptr command_buf; + std::unique_ptr command_buf; CmdArgVec cmd_args; // represents the parsed command. }; CmdData cmd; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 3fa5959d6..4872d0eef 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1092,7 +1092,7 @@ optional CheckKeysDeclared(const ConnectionState::ScriptInfo& eval_i static optional VerifyConnectionAclStatus(const CommandId* cid, const ConnectionContext* cntx, - string_view error_msg, CmdArgList tail_args) { + string_view error_msg, ArgSlice tail_args) { // If we are on a squashed context we need to use the owner, because the // context we are operating on is a stub and the acl username is not copied // See: MultiCommandSquasher::SquashedHopCb @@ -1238,7 +1238,7 @@ std::optional Service::VerifyCommandState(const CommandId* cid, CmdA return VerifyConnectionAclStatus(cid, &dfly_cntx, "has no ACL permissions", tail_args); } -void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) { +void Service::DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) { absl::Cleanup clear_last_error( [cntx]() { std::ignore = cntx->reply_builder()->ConsumeLastError(); }); DCHECK(!args.empty()); @@ -1246,7 +1246,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) ServerState& etl = *ServerState::tlocal(); - string cmd = absl::AsciiStrToUpper(ArgS(args, 0)); + string cmd = absl::AsciiStrToUpper(args[0]); const auto [cid, args_no_cmd] = registry_.FindExtended(cmd, args.subspan(1)); if (cid == nullptr) { @@ -1793,7 +1793,7 @@ void Service::Watch(CmdArgList args, ConnectionContext* cntx) { // Duplicate keys are stored to keep correct count. exec_info.watched_existed += keys_existed.load(memory_order_relaxed); - for (std::string_view key : ArgS(args)) { + for (string_view key : args) { exec_info.watched_keys.emplace_back(cntx->db_index(), key); } @@ -1840,7 +1840,7 @@ optional Service::FlushEvalAsyncCmds(ConnectionC void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca) { DCHECK(cntx->transaction); - DVLOG(2) << "CallFromScript " << ArgS(ca.args, 0); + DVLOG(2) << "CallFromScript " << ca.args[0]; InterpreterReplier replier(ca.translator); facade::SinkReplyBuilder* orig = cntx->Inject(&replier); @@ -1850,7 +1850,7 @@ void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca) if (ca.async) { auto& info = cntx->conn_state.script_info; - string cmd = absl::AsciiStrToUpper(ArgS(ca.args, 0)); + string cmd = absl::AsciiStrToUpper(ca.args[0]); // Full command verification happens during squashed execution if (auto* cid = registry_.Find(cmd); cid != nullptr) { @@ -1858,7 +1858,7 @@ void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca) info->async_cmds.emplace_back(std::move(*ca.buffer), cid, ca.args.subspan(1), replies); info->async_cmds_heap_mem += info->async_cmds.back().UsedMemory(); } else if (ca.error_abort) { // If we don't abort on errors, we can ignore it completely - findcmd_err = ReportUnknownCmd(ArgS(ca.args, 0)); + findcmd_err = ReportUnknownCmd(ca.args[0]); } } @@ -2421,7 +2421,7 @@ void Service::PubsubNumSub(CmdArgList args, ConnectionContext* cntx) { auto* rb = static_cast(cntx->reply_builder()); rb->StartArray(args.size() * 2); - for (string_view channel : ArgS(args)) { + for (string_view channel : args) { rb->SendBulkString(channel); rb->SendLong(ServerState::tlocal()->channel_store()->FetchSubscribers(channel).size()); } diff --git a/src/server/main_service.h b/src/server/main_service.h index 5195a1d0e..eb30d156a 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -37,10 +37,10 @@ class Service : public facade::ServiceInterface { void Shutdown(); // Prepare command execution, verify and execute, reply to context - void DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) final; + void DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) final; // Execute multiple consecutive commands, possibly in parallel by squashing - size_t DispatchManyCommands(absl::Span args_list, + size_t DispatchManyCommands(absl::Span args_list, facade::ConnectionContext* cntx) final; // Check VerifyCommandExecution and invoke command with args @@ -55,7 +55,7 @@ class Service : public facade::ServiceInterface { // Verify command prepares excution in correct state. // It's usually called before command execution. Only for multi/exec transactions it's checked // when the command is queued for execution, not before the execution itself. - std::optional VerifyCommandState(const CommandId* cid, CmdArgList tail_args, + std::optional VerifyCommandState(const CommandId* cid, ArgSlice tail_args, const ConnectionContext& cntx); void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 998a74b86..9402f7b90 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -246,7 +246,7 @@ struct CmdArgListFormatter { string UnknownCmd(string cmd, CmdArgList args) { return absl::StrCat("unknown command '", cmd, "' with args beginning with: ", - StrJoin(args.begin(), args.end(), ", ", CmdArgListFormatter())); + absl::StrJoin(args.begin(), args.end(), ", ", CmdArgListFormatter())); } bool IsCloudPath(string_view path) { diff --git a/src/server/set_family.cc b/src/server/set_family.cc index d5f849d8d..5f191a68a 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -43,7 +43,7 @@ using SetType = pair; namespace { // Possible sources of new set entries -using NewEntries = std::variant>; +using NewEntries = std::variant>; auto EntriesRange(const NewEntries& entries) { return base::it::Wrap(facade::kToSV, entries); diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 12911e895..2bf5708ec 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -2556,12 +2556,19 @@ void StreamFamily::XPending(CmdArgList args, ConnectionContext* cntx) { } void StreamFamily::XRange(CmdArgList args, ConnectionContext* cntx) { - XRangeGeneric(std::move(args), false, cntx); + string_view key = args[0]; + string_view start = args[1]; + string_view end = args[2]; + + XRangeGeneric(key, start, end, args.subspan(3), false, cntx); } void StreamFamily::XRevRange(CmdArgList args, ConnectionContext* cntx) { - swap(args[1], args[2]); - XRangeGeneric(std::move(args), true, cntx); + string_view key = args[0]; + string_view start = args[1]; + string_view end = args[2]; + + XRangeGeneric(key, end, start, args.subspan(3), true, cntx); } std::optional ParseReadArgsOrReply(CmdArgList args, bool read_group, @@ -3048,10 +3055,8 @@ void StreamFamily::XTrim(CmdArgList args, ConnectionContext* cntx) { return cntx->SendError(trim_result.status()); } -void StreamFamily::XRangeGeneric(CmdArgList args, bool is_rev, ConnectionContext* cntx) { - string_view key = ArgS(args, 0); - string_view start = ArgS(args, 1); - string_view end = ArgS(args, 2); +void StreamFamily::XRangeGeneric(std::string_view key, std::string_view start, std::string_view end, + CmdArgList args, bool is_rev, ConnectionContext* cntx) { RangeOpts range_opts; RangeId rs, re; if (!ParseRangeId(start, &rs) || !ParseRangeId(end, &re)) { @@ -3066,13 +3071,13 @@ void StreamFamily::XRangeGeneric(CmdArgList args, bool is_rev, ConnectionContext return cntx->SendError("invalid end ID for the interval", kSyntaxErrType); } - if (args.size() > 3) { - if (args.size() != 5) { + if (args.size() > 0) { + if (args.size() != 2) { return cntx->SendError(WrongNumArgsError("XRANGE"), kSyntaxErrType); } - string opt = absl::AsciiStrToUpper(ArgS(args, 3)); - string_view val = ArgS(args, 4); + string opt = absl::AsciiStrToUpper(ArgS(args, 0)); + string_view val = ArgS(args, 1); if (opt != "COUNT" || !absl::SimpleAtoi(val, &range_opts.count)) { return cntx->SendError(kSyntaxErr); diff --git a/src/server/stream_family.h b/src/server/stream_family.h index 9372563d9..9bd522013 100644 --- a/src/server/stream_family.h +++ b/src/server/stream_family.h @@ -29,7 +29,8 @@ class StreamFamily { static void XReadGroup(CmdArgList args, ConnectionContext* cntx); static void XSetId(CmdArgList args, ConnectionContext* cntx); static void XTrim(CmdArgList args, ConnectionContext* cntx); - static void XRangeGeneric(CmdArgList args, bool is_rev, ConnectionContext* cntx); + static void XRangeGeneric(std::string_view key, std::string_view start, std::string_view end, + CmdArgList args, bool is_rev, ConnectionContext* cntx); static void XAck(CmdArgList args, ConnectionContext* cntx); static void XAutoClaim(CmdArgList args, ConnectionContext* cntx); }; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index a358e4e66..2a17557aa 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -296,7 +296,7 @@ void Transaction::PrepareMultiFps(CmdArgList keys) { auto& tag_fps = multi_->tag_fps; tag_fps.reserve(keys.size()); - for (string_view str : ArgS(keys)) { + for (string_view str : keys) { ShardId sid = Shard(str, shard_set->size()); tag_fps.emplace(sid, LockTag(str).Fingerprint()); } @@ -474,7 +474,7 @@ void Transaction::StartMultiLockedAhead(Namespace* ns, DbIndex dbid, CmdArgList if (!skip_scheduling) ScheduleInternal(); - full_args_ = {nullptr, 0}; // InitBase set it to temporary keys, now we reset it. + full_args_ = {}; // InitBase set it to temporary keys, now we reset it. } void Transaction::StartMultiNonAtomic() { diff --git a/src/server/tx_base.cc b/src/server/tx_base.cc index 370ebe933..a9bd799b9 100644 --- a/src/server/tx_base.cc +++ b/src/server/tx_base.cc @@ -75,15 +75,6 @@ void TriggerJournalWriteToSink() { journal->RecordEntry(0, journal::Op::NOOP, 0, 0, nullopt, {}, true); } -std::ostream& operator<<(std::ostream& os, ArgSlice list) { - os << "["; - if (!list.empty()) { - std::for_each(list.begin(), list.end() - 1, [&os](const auto& val) { os << val << ", "; }); - os << (*(list.end() - 1)); - } - return os << "]"; -} - LockTag::LockTag(std::string_view key) { if (LockTagOptions::instance().enabled) str_ = LockTagOptions::instance().Tag(key); diff --git a/src/server/tx_base.h b/src/server/tx_base.h index aa8db41fe..af7091a62 100644 --- a/src/server/tx_base.h +++ b/src/server/tx_base.h @@ -230,6 +230,6 @@ void RecordExpiry(DbIndex dbid, std::string_view key); // Must be called from shard thread of journal to sink. void TriggerJournalWriteToSink(); -std::ostream& operator<<(std::ostream& os, ArgSlice list); +// std::ostream& operator<<(std::ostream& os, ArgSlice list); } // namespace dfly