diff --git a/README.md b/README.md index 8e43c1d74..282ec447b 100644 --- a/README.md +++ b/README.md @@ -177,7 +177,8 @@ API 2.0 - [ ] WATCH - [ ] UNWATCH - [X] DISCARD - - [ ] CLIENT KILL/LIST/UNPAUSE/PAUSE/GETNAME/SETNAME/REPLY/TRACKINGINFO + - [X] CLIENT LIST/SETNAME + - [ ] CLIENT KILL/UNPAUSE/PAUSE/GETNAME/REPLY/TRACKINGINFO - [X] COMMAND - [X] COMMAND COUNT - [ ] COMMAND GETKEYS/INFO diff --git a/src/facade/conn_context.h b/src/facade/conn_context.h index 5cf01a68d..1543fad1a 100644 --- a/src/facade/conn_context.h +++ b/src/facade/conn_context.h @@ -50,6 +50,9 @@ class ConnectionContext { bool force_dispatch: 1; // whether we should route all requests to the dispatch fiber. virtual void OnClose() {} + + std::string GetContextInfo() const { return std::string{}; } + private: Connection* owner_; std::unique_ptr rbuilder_; diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 0730d4b7f..88c86e7f9 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -103,7 +103,6 @@ struct Connection::Shutdown { } }; - struct Connection::Request { absl::FixedArray args; @@ -122,6 +121,8 @@ struct Connection::Request { Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx, ServiceInterface* service) : io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service) { + static atomic_uint32_t next_id{1}; + protocol_ = protocol; constexpr size_t kReqSz = sizeof(Connection::Request); @@ -136,6 +137,12 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, memcache_parser_.reset(new MemcacheParser); break; } + + creation_time_ = time(nullptr); + last_interaction_ = creation_time_; + memset(name_, 0, sizeof(name_)); + memset(phase_, 0, sizeof(phase_)); + id_ = next_id.fetch_add(1, memory_order_relaxed); } Connection::~Connection() { @@ -259,6 +266,24 @@ void Connection::SendMsgVecAsync(absl::Span msg_vec, } } +string Connection::GetClientInfo() const { + LinuxSocketBase* lsb = static_cast(socket_.get()); + + string res; + auto le = lsb->LocalEndpoint(); + auto re = lsb->RemoteEndpoint(); + time_t now = time(nullptr); + + absl::StrAppend(&res, "id=", id_, " addr=", re.address().to_string(), ":", re.port()); + absl::StrAppend(&res, " laddr=", le.address().to_string(), ":", le.port()); + absl::StrAppend(&res, " fd=", lsb->native_handle(), " name=", name_); + absl::StrAppend(&res, " age=", now - creation_time_, " idle=", now - last_interaction_); + absl::StrAppend(&res, " phase=", phase_, " "); + absl::StrAppend(&res, cc_->GetContextInfo()); + + return res; +} + io::Result Connection::CheckForHttpProto(FiberSocketBase* peer) { size_t last_len = 0; do { @@ -295,6 +320,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { // At the start we read from the socket to determine the HTTP/Memstore protocol. // Therefore we may already have some data in the buffer. if (io_buf_.InputLen() > 0) { + SetPhase("process"); if (redis_parser_) { parse_status = ParseRedis(); } else { @@ -379,6 +405,7 @@ auto Connection::ParseRedis() -> ParserStatus { if (dispatch_q_.empty() && is_sync_dispatch && consumed >= io_buf_.InputLen()) { RespToArgList(args, &arg_vec); service_->DispatchCommand(CmdArgList{arg_vec.data(), arg_vec.size()}, cc_.get()); + last_interaction_ = time(nullptr); } else { // Dispatch via queue to speedup input reading. Request* req = FromArgs(std::move(args), tlh); @@ -469,7 +496,10 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant recv_sz = peer->Recv(append_buf); + last_interaction_ = time(nullptr); if (!recv_sz) { ec = recv_sz.error(); @@ -480,6 +510,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variantio_read_bytes += *recv_sz; ++stats->io_read_cnt; + SetPhase("process"); if (redis_parser_) parse_status = ParseRedis(); @@ -554,6 +585,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) { builder->SetBatchMode(!dispatch_q_.empty()); cc_->async_dispatch = true; service_->DispatchCommand(CmdArgList{req->args.data(), req->args.size()}, cc_.get()); + last_interaction_ = time(nullptr); cc_->async_dispatch = false; } req->~Request(); diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 68bd32ab9..aff8b8ed4 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -52,6 +52,16 @@ class Connection : public util::Connection { void SendMsgVecAsync(absl::Span msg_vec, util::fibers_ext::BlockingCounter bc); + void SetName(std::string_view name) { + CopyCharBuf(name, sizeof(name_), name_); + } + + void SetPhase(std::string_view phase) { + CopyCharBuf(phase, sizeof(phase_), phase_); + } + + std::string GetClientInfo() const; + protected: void OnShutdown() override; @@ -60,6 +70,13 @@ class Connection : public util::Connection { void HandleRequests() final; + static void CopyCharBuf(std::string_view src, unsigned dest_len, char* dest) { + src = src.substr(0, dest_len - 1); + if (!src.empty()) + memcpy(dest, src.data(), src.size()); + dest[src.size()] = '\0'; + } + // io::Result CheckForHttpProto(util::FiberSocketBase* peer); @@ -77,6 +94,9 @@ class Connection : public util::Connection { util::HttpListenerBase* http_listener_; SSL_CTX* ctx_; ServiceInterface* service_; + time_t creation_time_, last_interaction_; + char name_[16]; + char phase_[16]; std::unique_ptr cc_; @@ -88,7 +108,10 @@ class Connection : public util::Connection { util::fibers_ext::EventCount evc_; unsigned parser_error_ = 0; + uint32_t id_; + Protocol protocol_; + struct Shutdown; std::unique_ptr shutdown_; BreakerCb breaker_cb_; diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc index 51abcd261..e63610712 100644 --- a/src/server/dfly_main.cc +++ b/src/server/dfly_main.cc @@ -38,11 +38,15 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) { Service service(pool); - service.Init(acceptor); + Listener* main_listener = new Listener{Protocol::REDIS, &service}; + + Service::InitOpts opts; + opts.disable_time_update = false; + service.Init(acceptor, main_listener, opts); + const char* bind_addr = FLAGS_bind.empty() ? nullptr : FLAGS_bind.c_str(); - error_code ec = - acceptor->AddListener(bind_addr, FLAGS_port, new Listener{Protocol::REDIS, &service}); + error_code ec = acceptor->AddListener(bind_addr, FLAGS_port, main_listener); LOG_IF(FATAL, ec) << "Cound not open port " << FLAGS_port << ", error: " << ec.message(); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index fd3ceb720..e58e075ed 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -307,7 +307,8 @@ Service::Service(ProactorPool* pp) : pp_(*pp), shard_set_(pp), server_family_(th Service::~Service() { } -void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) { +void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_interface, + const InitOpts& opts) { InitRedisTables(); uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size(); @@ -324,7 +325,7 @@ void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) { request_latency_usec.Init(&pp_); StringFamily::Init(&pp_); GenericFamily::Init(&pp_); - server_family_.Init(acceptor); + server_family_.Init(acceptor, main_interface); cmd_req.Init(&pp_, {"type"}); } diff --git a/src/server/main_service.h b/src/server/main_service.h index 5b79c8350..69f3e5131 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -33,7 +33,8 @@ class Service : public facade::ServiceInterface { explicit Service(util::ProactorPool* pp); ~Service(); - void Init(util::AcceptServer* acceptor, const InitOpts& opts = InitOpts{}); + void Init(util::AcceptServer* acceptor, util::ListenerInterface* main_interface, + const InitOpts& opts = InitOpts{}); void Shutdown(); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 05606ab98..40ef6841b 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -6,6 +6,7 @@ #include #include // for master_id_ generation. +#include #include #include #include @@ -18,6 +19,7 @@ extern "C" { #include "base/logging.h" #include "io/proc_reader.h" +#include "facade/dragonfly_connection.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/debugcmd.h" @@ -74,6 +76,11 @@ error_code CreateDirs(fs::path dir_path) { return ec; } +string UnknowSubCmd(string_view subcmd, string cmd) { + return absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd, + "'. Try ", cmd, " HELP."); +} + } // namespace ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service->shard_set()) { @@ -85,9 +92,10 @@ ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service- ServerFamily::~ServerFamily() { } -void ServerFamily::Init(util::AcceptServer* acceptor) { +void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_listener) { CHECK(acceptor_ == nullptr); acceptor_ = acceptor; + main_listener_ = main_listener; pb_task_ = ess_.pool()->GetNextProactor(); auto cache_cb = [] { @@ -336,12 +344,27 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) { ToUpper(&args[1]); string_view sub_cmd = ArgS(args, 1); - if (sub_cmd == "SETNAME") { + if (sub_cmd == "SETNAME" && args.size() == 3) { + cntx->owner()->SetName(ArgS(args, 2)); return (*cntx)->SendOk(); + } else if (sub_cmd == "LIST") { + vector client_info; + fibers::mutex mu; + auto cb = [&](Connection* conn) { + facade::Connection* dcon = static_cast(conn); + string info = dcon->GetClientInfo(); + lock_guard lk(mu); + client_info.push_back(move(info)); + }; + + main_listener_->TraverseConnections(cb); + string result = absl::StrJoin(move(client_info), "\n"); + result.append("\n"); + return (*cntx)->SendBulkString(result); } LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported"; - (*cntx)->SendError(kSyntaxErr); + return (*cntx)->SendError(UnknowSubCmd(sub_cmd, "CLIENT"), kSyntaxErr); } void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) { @@ -365,9 +388,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) { }); return (*cntx)->SendOk(); } else { - string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd, - "'. Try CONFIG HELP."); - return (*cntx)->SendError(err, kSyntaxErr); + return (*cntx)->SendError(UnknowSubCmd(sub_cmd, "CONFIG"), kSyntaxErr); } } diff --git a/src/server/server_family.h b/src/server/server_family.h index 8b477d0e7..c782125a1 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -12,6 +12,7 @@ namespace util { class AcceptServer; +class ListenerInterface; } // namespace util namespace dfly { @@ -39,7 +40,7 @@ class ServerFamily { ServerFamily(Service* service); ~ServerFamily(); - void Init(util::AcceptServer* acceptor); + void Init(util::AcceptServer* acceptor, util::ListenerInterface* main_listener); void Register(CommandRegistry* registry); void Shutdown(); @@ -96,6 +97,7 @@ class ServerFamily { EngineShardSet& ess_; util::AcceptServer* acceptor_ = nullptr; + util::ListenerInterface* main_listener_ = nullptr; util::ProactorBase* pb_task_ = nullptr; mutable ::boost::fibers::mutex replicaof_mu_, save_mu_; diff --git a/src/server/string_family.cc b/src/server/string_family.cc index f6803b1f2..1468195af 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -34,6 +34,8 @@ DEFINE_VARZ(VarzQps, set_qps); DEFINE_VARZ(VarzQps, get_qps); constexpr uint32_t kMaxStrLen = 1 << 28; +constexpr uint32_t kMinTieredLen = TieredStorage::kMinBlobLen; + string GetString(EngineShard* shard, const PrimeValue& pv) { string res; @@ -160,7 +162,7 @@ OpResult SetCmd::Set(const SetParams& params, std::string_view key, std::s EngineShard* shard = db_slice_.shard_owner(); if (shard->tiered_storage()) { // external storage enabled. - if (value.size() >= 64) { + if (value.size() >= kMinTieredLen) { shard->tiered_storage()->UnloadItem(params.db_index, it); } } @@ -205,6 +207,16 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt // overwrite existing entry. prime_value.SetString(value); + + + if (value.size() >= kMinTieredLen) { // external storage enabled. + EngineShard* shard = db_slice_.shard_owner(); + + if (shard->tiered_storage()) { + shard->tiered_storage()->UnloadItem(params.db_index, it); + } + } + db_slice_.PostUpdate(params.db_index, it); return OpStatus::OK; diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index 6ef920bc2..ed8c73aae 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -62,7 +62,7 @@ void BaseFamilyTest::SetUp() { Service::InitOpts opts; opts.disable_time_update = true; - service_->Init(nullptr, opts); + service_->Init(nullptr, nullptr, opts); ess_ = &service_->shard_set(); expire_now_ = absl::GetCurrentTimeNanos() / 1000000; diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index 93ea1d665..dfd6b3d7c 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -6,12 +6,10 @@ #include #include "base/ring_buffer.h" - #include "core/external_alloc.h" #include "server/common.h" #include "server/io_mgr.h" #include "server/table.h" - #include "util/fibers/event_count.h" namespace dfly { @@ -20,6 +18,8 @@ class DbSlice; class TieredStorage { public: + enum : uint16_t { kMinBlobLen = 64 }; + explicit TieredStorage(DbSlice* db_slice); ~TieredStorage(); @@ -76,16 +76,16 @@ class TieredStorage { // map of cursor -> pending size // absl::flat_hash_map pending_upload; - // multi_cnt_ - counts how many unloaded items exists in the batch at specified page offset. // here multi_cnt_.first is (file_offset in 4k pages) and // multi_cnt_.second is MultiBatch object storing number of allocated records in the batch // and its capacity (/ 4k). struct MultiBatch { - uint16_t used; // number of used bytes + uint16_t used; // number of used bytes uint16_t reserved; // in 4k pages. - MultiBatch(uint16_t mem_used) : used(mem_used) {} + MultiBatch(uint16_t mem_used) : used(mem_used) { + } }; absl::flat_hash_map multi_cnt_; diff --git a/tests/generate_sets.py b/tests/generate_sets.py index 98b8d764b..840ace78d 100755 --- a/tests/generate_sets.py +++ b/tests/generate_sets.py @@ -32,7 +32,6 @@ def fill_hset(args, redis): def main(): - # Check processor architecture parser = argparse.ArgumentParser(description='fill hset entities') parser.add_argument( '-p', type=int, help='redis port', dest='port', default=6380)