Mirror of https://github.com/dragonflydb/dragonfly.git, synced 2024-12-15 17:51:06 +00:00

feat(server): track pipeline requests cache capacity (#776)

1. pipeline_cache_capacity now tracks how many bytes are allocated via the free_req_pool_ cache; the value is exposed as the "pipeline_cache_bytes" stat in the "info memory" command.
2. A small refactoring that moves server_state code into server_state.cc.
3. Reduce the dependency of dfly_transaction on the facade lib.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

Parent: 6e612e7545
Commit: 6f1e49120e
12 changed files with 178 additions and 162 deletions
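
The bookkeeping behind point 1 is simple: whenever a parsed request object is parked in the free_req_pool_ cache, its storage capacity is added to pipeline_cache_capacity, and the same amount is subtracted when the object is taken back out for reuse, so the counter always reflects the bytes currently held by the cache. A minimal sketch of that idea (hypothetical names, with a plain std::string standing in for the pooled pipeline request):

#include <cstddef>
#include <deque>
#include <memory>
#include <string>

// Illustrative only: a bounded free-list that keeps an aggregate byte counter in sync
// with its contents, the same bookkeeping the commit adds around free_req_pool_ and
// stats_->pipeline_cache_capacity.
class RequestPool {
 public:
  static constexpr size_t kMaxCached = 16;  // mirrors the "< 16" cap in DispatchFiber

  void Release(std::unique_ptr<std::string> req) {
    if (free_list_.size() < kMaxCached) {
      cached_bytes_ += req->capacity();  // count the bytes while they sit in the cache
      free_list_.push_back(std::move(req));
    }  // otherwise the request is destroyed and nothing is counted
  }

  std::unique_ptr<std::string> Acquire() {
    if (free_list_.empty())
      return std::make_unique<std::string>();
    auto req = std::move(free_list_.back());
    free_list_.pop_back();
    cached_bytes_ -= req->capacity();  // the bytes are in use again, stop counting them
    return req;
  }

  size_t cached_bytes() const { return cached_bytes_; }  // what the stat would report

 private:
  size_t cached_bytes_ = 0;
  std::deque<std::unique_ptr<std::string>> free_list_;
};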

@@ -117,7 +117,8 @@ struct Connection::RequestDeleter {
// Please note: The call to the Dtor is mandatory for this!!
// This class contain types that don't have trivial destructed objects
struct Connection::Request {
class Connection::Request {
public:
using MonitorMessage = std::string;

struct PipelineMsg {

@@ -135,9 +136,6 @@ struct Connection::Request {
StorageType storage;
};

static constexpr size_t kSizeOfPipelineMsg = sizeof(PipelineMsg);

public:
using MessagePayload = std::variant<PipelineMsg, PubMsgRecord, MonitorMessage>;

// Overload to create the a new pipeline message

@@ -153,7 +151,11 @@ struct Connection::Request {

MessagePayload payload;

size_t StorageCapacity() const;

private:
static constexpr size_t kSizeOfPipelineMsg = sizeof(PipelineMsg);

Request(size_t nargs, size_t capacity) : payload(PipelineMsg{nargs, capacity}) {
}

@@ -167,6 +169,20 @@ struct Connection::Request {
void SetArgs(const RespVec& args);
};

struct Connection::DispatchOperations {
DispatchOperations(SinkReplyBuilder* b, Connection* me)
: stats{me->service_->GetThreadLocalConnectionStats()}, builder{b}, self(me) {
}

void operator()(const PubMsgRecord& msg);
void operator()(Request::PipelineMsg& msg);
void operator()(const Request::MonitorMessage& msg);

ConnectionStats* stats = nullptr;
SinkReplyBuilder* builder = nullptr;
Connection* self = nullptr;
};

Connection::RequestPtr Connection::Request::New(std::string msg) {
void* ptr = mi_malloc(sizeof(Request));
Request* req = new (ptr) Request(std::move(msg));

@@ -230,6 +246,52 @@ void Connection::Request::PipelineMsg::Reset(size_t nargs, size_t capacity) {
args.resize(nargs);
}

template <class... Ts> struct Overloaded : Ts... { using Ts::operator()...; };
template <class... Ts> Overloaded(Ts...) -> Overloaded<Ts...>;

size_t Connection::Request::StorageCapacity() const {
return std::visit(Overloaded{[](const PubMsgRecord& msg) -> size_t { return 0; },
[](const PipelineMsg& arg) -> size_t {
return arg.storage.capacity() + arg.args.capacity();
},
[](const MonitorMessage& arg) -> size_t { return arg.capacity(); }},
payload);
}

void Connection::DispatchOperations::operator()(const Request::MonitorMessage& msg) {
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
rbuilder->SendSimpleString(msg);
}

void Connection::DispatchOperations::operator()(const PubMsgRecord& msg) {
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
++stats->async_writes_cnt;
const PubMessage& pub_msg = msg.pub_msg;
string_view arr[4];
if (pub_msg.pattern.empty()) {
arr[0] = "message";
arr[1] = pub_msg.channel;
arr[2] = *pub_msg.message;
rbuilder->SendStringArr(absl::Span<string_view>{arr, 3});
} else {
arr[0] = "pmessage";
arr[1] = pub_msg.pattern;
arr[2] = pub_msg.channel;
arr[3] = *pub_msg.message;
rbuilder->SendStringArr(absl::Span<string_view>{arr, 4});
}
}

void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
++stats->pipelined_cmd_cnt;
bool empty = self->dispatch_q_.empty();
builder->SetBatchMode(!empty);
self->cc_->async_dispatch = true;
self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
self->last_interaction_ = time(nullptr);
self->cc_->async_dispatch = false;
}

Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
ServiceInterface* service)
: io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service) {
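
The Overloaded helper added above is the standard C++17 trick for building an inline visitor: it inherits a set of lambdas and pulls all of their call operators into one type, so std::visit can dispatch on the active variant alternative. A small self-contained example of the same idiom, deliberately using generic types rather than the Dragonfly ones, computing a capacity per alternative much like StorageCapacity() does:

#include <cstddef>
#include <iostream>
#include <string>
#include <variant>
#include <vector>

// Inherit every lambda and expose all of their operator()s to std::visit.
template <class... Ts> struct Overloaded : Ts... { using Ts::operator()...; };
template <class... Ts> Overloaded(Ts...) -> Overloaded<Ts...>;

using Payload = std::variant<std::string, std::vector<int>>;

std::size_t CapacityOf(const Payload& p) {
  return std::visit(Overloaded{[](const std::string& s) -> std::size_t { return s.capacity(); },
                               [](const std::vector<int>& v) -> std::size_t {
                                 return v.capacity() * sizeof(int);
                               }},
                    p);
}

int main() {
  Payload a = std::string(100, 'x');
  Payload b = std::vector<int>(32, 0);
  std::cout << CapacityOf(a) << " " << CapacityOf(b) << "\n";  // capacities in bytes
}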

@@ -449,11 +511,13 @@ io::Result<bool> Connection::CheckForHttpProto(FiberSocketBase* peer) {
}

void Connection::ConnectionFlow(FiberSocketBase* peer) {
stats_ = service_->GetThreadLocalConnectionStats();

auto dispatch_fb = fibers::fiber(fibers::launch::dispatch, [&] { DispatchFiber(peer); });
ConnectionStats* stats = service_->GetThreadLocalConnectionStats();
++stats->num_conns;
++stats->conn_received_cnt;
stats->read_buf_capacity += io_buf_.Capacity();

++stats_->num_conns;
++stats_->conn_received_cnt;
stats_->read_buf_capacity += io_buf_.Capacity();

ParserStatus parse_status = OK;

@@ -490,18 +554,18 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
VLOG(1) << "After dispatch_fb.join()";
service_->OnClose(cc_.get());

stats->read_buf_capacity -= io_buf_.Capacity();
stats_->read_buf_capacity -= io_buf_.Capacity();

// Update num_replicas if this was a replica connection.
if (cc_->replica_conn) {
--stats->num_replicas;
--stats_->num_replicas;
}

// We wait for dispatch_fb to finish writing the previous replies before replying to the last
// offending request.
if (parse_status == ERROR) {
VLOG(1) << "Error parser status " << parser_error_;
++stats->parser_err_cnt;
++stats_->parser_err_cnt;

if (redis_parser_) {
SendProtocolError(RedisParser::Result(parser_error_), peer);

@@ -521,7 +585,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
LOG(WARNING) << "Socket error " << ec << " " << ec.message();
}

--stats->num_conns;
--stats_->num_conns;
}

auto Connection::ParseRedis() -> ParserStatus {

@@ -649,12 +713,11 @@ void Connection::OnBreakCb(int32_t mask) {

auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, ParserStatus> {
SinkReplyBuilder* builder = cc_->reply_builder();
ConnectionStats* stats = service_->GetThreadLocalConnectionStats();
error_code ec;
ParserStatus parse_status = OK;

do {
FetchBuilderStats(stats, builder);
FetchBuilderStats(stats_, builder);

io::MutableBytes append_buf = io_buf_.AppendBuffer();
SetPhase("readsock");

@@ -669,8 +732,8 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, ParserStatus> {
}

io_buf_.CommitWrite(*recv_sz);
stats->io_read_bytes += *recv_sz;
++stats->io_read_cnt;
stats_->io_read_bytes += *recv_sz;
++stats_->io_read_cnt;
SetPhase("process");

if (redis_parser_) {

@@ -698,7 +761,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, ParserStatus> {

if (capacity < io_buf_.Capacity()) {
VLOG(1) << "Growing io_buf to " << io_buf_.Capacity();
stats->read_buf_capacity += (io_buf_.Capacity() - capacity);
stats_->read_buf_capacity += (io_buf_.Capacity() - capacity);
}
}
} else if (parse_status != OK) {

@@ -707,7 +770,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, ParserStatus> {
ec = builder->GetError();
} while (peer->IsOpen() && !ec);

FetchBuilderStats(stats, builder);
FetchBuilderStats(stats_, builder);

if (ec)
return ec;

@@ -715,54 +778,6 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, ParserStatus> {
return parse_status;
}

struct Connection::DispatchOperations {
DispatchOperations(SinkReplyBuilder* b, Connection* me)
: stats{me->service_->GetThreadLocalConnectionStats()}, builder{b}, self(me) {
}

void operator()(const PubMsgRecord& msg);
void operator()(Request::PipelineMsg& msg);
void operator()(const Request::MonitorMessage& msg);

ConnectionStats* stats = nullptr;
SinkReplyBuilder* builder = nullptr;
Connection* self = nullptr;
};

void Connection::DispatchOperations::operator()(const Request::MonitorMessage& msg) {
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
rbuilder->SendSimpleString(msg);
}

void Connection::DispatchOperations::operator()(const PubMsgRecord& msg) {
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
++stats->async_writes_cnt;
const PubMessage& pub_msg = msg.pub_msg;
string_view arr[4];
if (pub_msg.pattern.empty()) {
arr[0] = "message";
arr[1] = pub_msg.channel;
arr[2] = *pub_msg.message;
rbuilder->SendStringArr(absl::Span<string_view>{arr, 3});
} else {
arr[0] = "pmessage";
arr[1] = pub_msg.pattern;
arr[2] = pub_msg.channel;
arr[3] = *pub_msg.message;
rbuilder->SendStringArr(absl::Span<string_view>{arr, 4});
}
}

void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
++stats->pipelined_cmd_cnt;
bool empty = self->dispatch_q_.empty();
builder->SetBatchMode(!empty);
self->cc_->async_dispatch = true;
self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
self->last_interaction_ = time(nullptr);
self->cc_->async_dispatch = false;
}

// DispatchFiber handles commands coming from the InputLoop.
// Thus, InputLoop can quickly read data from the input buffer, parse it and push
// into the dispatch queue and DispatchFiber will run those commands asynchronously with

@@ -784,8 +799,10 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
std::visit(dispatch_op, req->payload);

// Do not cache more than K items.
if (free_req_pool_.size() < 16)
if (free_req_pool_.size() < 16) {
stats_->pipeline_cache_capacity += req->StorageCapacity();
free_req_pool_.push_back(std::move(req));
}
}

cc_->conn_closing = true;

@@ -813,6 +830,8 @@ auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> RequestPtr {
req = Request::New(heap, args, backed_sz);
} else {
req = move(free_req_pool_.back());
stats_->pipeline_cache_capacity -= req->StorageCapacity();

free_req_pool_.pop_back();
req->Emplace(move(args), backed_sz);
}

@@ -63,10 +63,7 @@ class Connection : public util::Connection {
// this function is overriden at test_utils TestConnection
virtual void SendMsgVecAsync(const PubMessage& pub_msg);

// Please note, this accept the message by value, since we really want to
// create a new copy here, so that we would not need to "worry" about memory
// management, we are assuming that we would not have many copy for this, and that
// we would not need in this way to sync on the lifetime of the message
// Note that this is accepted by value because the message is processed asynchronously.
void SendMonitorMsg(std::string monitor_msg);

void SetName(std::string_view name) {

@@ -129,7 +126,7 @@ class Connection : public util::Connection {

std::unique_ptr<ConnectionContext> cc_;

struct Request;
class Request;
struct DispatchOperations;
struct DispatchCleanup;
struct RequestDeleter;

@@ -149,6 +146,7 @@ class Connection : public util::Connection {
unsigned parser_error_ = 0;
uint32_t id_;
uint32_t break_poll_id_ = UINT32_MAX;
ConnectionStats* stats_ = nullptr;

Protocol protocol_;

@@ -21,9 +21,10 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);

ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
// To break this code deliberately if we add/remove a field to this struct.
static_assert(kSizeConnStats == 176);
static_assert(kSizeConnStats == 184);

ADD(read_buf_capacity);
ADD(pipeline_cache_capacity);
ADD(io_read_cnt);
ADD(io_read_bytes);
ADD(io_write_cnt);

@@ -32,13 +32,15 @@ struct ConnectionStats {
absl::flat_hash_map<std::string, uint64_t> cmd_count_map;

size_t read_buf_capacity = 0;
size_t pipeline_cache_capacity = 0;

size_t io_read_cnt = 0;
size_t io_read_bytes = 0;
size_t io_write_cnt = 0;
size_t io_write_bytes = 0;
size_t command_cnt = 0;
size_t pipelined_cmd_cnt = 0;
size_t parser_err_cnt = 0;
uint64_t command_cnt = 0;
uint64_t pipelined_cmd_cnt = 0;
uint64_t parser_err_cnt = 0;

// Writes count that happened via DispatchOperations call.
uint64_t async_writes_cnt = 0;
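
The static_assert bump from 176 to 184 above is the guard doing its job: ConnectionStats grew by one 8-byte field, and the hard-coded size forces whoever adds a field to also update operator+= together with the assert. The same defensive pattern in isolation (hypothetical names; the expected size assumes a typical 64-bit ABI with no padding surprises):

#include <cstddef>
#include <cstdint>

// Illustrative sketch: freeze sizeof() of a stats struct so that adding or removing a
// field forces a revisit of the aggregation code right below.
struct Stats {
  std::uint64_t read_bytes = 0;
  std::uint64_t write_bytes = 0;
};

constexpr std::size_t kSizeOfStats = sizeof(Stats);
static_assert(kSizeOfStats == 16, "Stats changed - update operator+= and this assert");

Stats& operator+=(Stats& lhs, const Stats& rhs) {
  lhs.read_bytes += rhs.read_bytes;
  lhs.write_bytes += rhs.write_bytes;
  return lhs;
}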

@@ -4,16 +4,18 @@ cxx_link(dragonfly base dragonfly_lib epoll_fiber_lib)
if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND CMAKE_BUILD_TYPE STREQUAL "Release")
# Add core2 only to this file, thus avoiding instructions in this object file that
# can cause SIGILL.
set_source_files_properties(dfly_main.cc PROPERTIES COMPILE_FLAGS -march=core2 COMPILE_DEFINITIONS SOURCE_PATH_FROM_BUILD_ENV=${CMAKE_SOURCE_DIR})
set_source_files_properties(dfly_main.cc PROPERTIES COMPILE_FLAGS -march=core2 COMPILE_DEFINITIONS
SOURCE_PATH_FROM_BUILD_ENV=${CMAKE_SOURCE_DIR})
endif()

add_library(dfly_transaction db_slice.cc malloc_stats.cc engine_shard_set.cc blocking_controller.cc common.cc
io_mgr.cc journal/journal.cc journal/journal_slice.cc table.cc
add_library(dfly_transaction db_slice.cc malloc_stats.cc engine_shard_set.cc blocking_controller.cc
common.cc
io_mgr.cc journal/journal.cc journal/journal_slice.cc server_state.cc table.cc
tiered_storage.cc transaction.cc)
cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib)

add_library(dragonfly_lib channel_slice.cc command_registry.cc
config_flags.cc conn_context.cc debugcmd.cc server_state.cc dflycmd.cc
config_flags.cc conn_context.cc debugcmd.cc dflycmd.cc
generic_family.cc hset_family.cc json_family.cc
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc

@@ -7,7 +7,6 @@
#include <absl/strings/charconv.h>
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>
#include <mimalloc.h>

#include <system_error>

@@ -15,10 +14,8 @@ extern "C" {
#include "redis/object.h"
#include "redis/rdb.h"
#include "redis/util.h"
#include "redis/zmalloc.h"
}

#include "base/flags.h"
#include "base/logging.h"
#include "core/compact_object.h"
#include "server/engine_shard_set.h"

@@ -27,46 +24,15 @@
#include "server/server_state.h"
#include "server/transaction.h"

ABSL_FLAG(uint32_t, interpreter_per_thread, 10, "Lua interpreters per thread");

namespace dfly {

using namespace std;

thread_local ServerState ServerState::state_;

atomic_uint64_t used_mem_peak(0);
atomic_uint64_t used_mem_current(0);
unsigned kernel_version = 0;
size_t max_memory_limit = 0;

ServerState::ServerState() : interpreter_mgr_{absl::GetFlag(FLAGS_interpreter_per_thread)} {
CHECK(mi_heap_get_backing() == mi_heap_get_default());

mi_heap_t* tlh = mi_heap_new();
init_zmalloc_threadlocal(tlh);
data_heap_ = tlh;
}

ServerState::~ServerState() {
}

void ServerState::Init() {
gstate_ = GlobalState::ACTIVE;
}

void ServerState::Shutdown() {
gstate_ = GlobalState::SHUTTING_DOWN;
}

Interpreter* ServerState::BorrowInterpreter() {
return interpreter_mgr_.Get();
}

void ServerState::ReturnInterpreter(Interpreter* ir) {
interpreter_mgr_.Return(ir);
}

const char* GlobalStateName(GlobalState s) {
switch (s) {
case GlobalState::ACTIVE:

@@ -30,12 +30,6 @@ void StoredCmd::Invoke(ConnectionContext* ctx) {
descr->Invoke(arg_list_, ctx);
}

void ConnectionContext::SendMonitorMsg(std::string msg) {
CHECK(owner());

owner()->SendMonitorMsg(std::move(msg));
}

void ConnectionContext::ChangeMonitor(bool start) {
// This will either remove or register a new connection
// at the "top level" thread --> ServerState context

@@ -43,11 +37,11 @@ void ConnectionContext::ChangeMonitor(bool start) {
// then notify all other threads that there is a change in the number of monitors
auto& my_monitors = ServerState::tlocal()->Monitors();
if (start) {
my_monitors.Add(this);
my_monitors.Add(owner());
} else {
VLOG(1) << "connection " << owner()->GetClientInfo()
<< " no longer needs to be monitored - removing 0x" << std::hex << (const void*)this;
my_monitors.Remove(this);
<< " no longer needs to be monitored - removing 0x" << std::hex << this;
my_monitors.Remove(owner());
}
// Tell other threads that about the change in the number of connection that we monitor
shard_set->pool()->Await(

@@ -135,12 +135,6 @@ class ConnectionContext : public facade::ConnectionContext {
return conn_state.db_index;
}

// Note that this is accepted by value for lifetime reasons
// we want to have our own copy since we are assuming that
// 1. there will be not to many connections that we in monitor state
// 2. we need to have for each of them each own copy for thread safe reasons
void SendMonitorMsg(std::string msg);

void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args);
void ChangePSub(bool to_add, bool to_reply, CmdArgList args);
void UnsubscribeAll(bool to_reply);

@@ -167,6 +167,20 @@ std::string MakeMonitorMessage(const ConnectionState& conn_state,
return message;
}

void SendMonitor(const std::string& msg) {
const auto& monitor_repo = ServerState::tlocal()->Monitors();
const auto& monitors = monitor_repo.monitors();
if (!monitors.empty()) {
VLOG(1) << "thread " << util::ProactorBase::GetIndex() << " sending monitor message '" << msg
<< "' for " << monitors.size();

for (auto monitor_conn : monitors) {
// never preempts, so we can iterate safely.
monitor_conn->SendMonitorMsg(msg);
}
}
}

void DispatchMonitorIfNeeded(bool admin_cmd, ConnectionContext* connection, CmdArgList args) {
// We are not sending any admin command in the monitor, and we do not want to
// do any processing if we don't have any waiting connections with monitor

@@ -175,15 +189,11 @@ void DispatchMonitorIfNeeded(bool admin_cmd, ConnectionContext* connection, CmdArgList args) {
if (!(my_monitors.Empty() || admin_cmd)) {
// We have connections waiting to get the info on the last command, send it to them
auto monitor_msg = MakeMonitorMessage(connection->conn_state, connection->owner(), args);
// Note that this is accepted by value for lifetime reasons
// we want to have our own copy since we are assuming that
// 1. there will be not to many connections that we in monitor state
// 2. we need to have for each of them each own copy for thread safe reasons

VLOG(1) << "sending command '" << monitor_msg << "' to the clients that registered on it";

shard_set->pool()->DispatchBrief(
[msg = std::move(monitor_msg)](unsigned idx, util::ProactorBase*) {
ServerState::tlocal()->Monitors().Send(msg);
});
[msg = std::move(monitor_msg)](unsigned idx, util::ProactorBase*) { SendMonitor(msg); });
}
}

@@ -1319,6 +1319,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("listpack_blobs", total.listpack_blob_cnt);
append("listpack_bytes", total.listpack_bytes);
append("small_string_bytes", m.small_string_bytes);
append("pipeline_cache_bytes", m.conn_stats.pipeline_cache_capacity);
append("maxmemory", max_memory_limit);
append("maxmemory_human", HumanReadableNumBytes(max_memory_limit));
append("cache_mode", GetFlag(FLAGS_cache_mode) ? "cache" : "store");
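
With the Info() hunk above, the counter surfaces in the memory section of INFO. Assuming a locally running instance listening on the default port, a quick way to check it is:

redis-cli info memory | grep pipeline_cache_bytes

which should print the current pipeline_cache_bytes value.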

@@ -4,12 +4,23 @@

#include "server/server_state.h"

#include <mimalloc.h>

extern "C" {
#include "redis/zmalloc.h"
}

#include "base/flags.h"
#include "base/logging.h"
#include "server/conn_context.h"
#include "facade/conn_context.h"

ABSL_FLAG(uint32_t, interpreter_per_thread, 10, "Lua interpreters per thread");

namespace dfly {

void MonitorsRepo::Add(ConnectionContext* connection) {
thread_local ServerState ServerState::state_;

void MonitorsRepo::Add(facade::Connection* connection) {
VLOG(1) << "register connection "
<< " at address 0x" << std::hex << (const void*)connection << " for thread "
<< util::ProactorBase::GetIndex();

@@ -17,25 +28,14 @@ void MonitorsRepo::Add(ConnectionContext* connection) {
monitors_.push_back(connection);
}

void MonitorsRepo::Send(const std::string& msg) {
if (!monitors_.empty()) {
VLOG(1) << "thread " << util::ProactorBase::GetIndex() << " sending monitor message '" << msg
<< "' for " << monitors_.size();
for (auto monitor_conn : monitors_) {
monitor_conn->SendMonitorMsg(msg);
}
}
}

void MonitorsRepo::Remove(const ConnectionContext* conn) {
void MonitorsRepo::Remove(const facade::Connection* conn) {
auto it = std::find_if(monitors_.begin(), monitors_.end(),
[&conn](const auto& val) { return val == conn; });
if (it != monitors_.end()) {
VLOG(1) << "removing connection 0x" << std::hex << (const void*)conn << " releasing token";
VLOG(1) << "removing connection 0x" << std::hex << conn << " releasing token";
monitors_.erase(it);
} else {
VLOG(1) << "no connection 0x" << std::hex << (const void*)conn
<< " found in the registered list here";
VLOG(1) << "no connection 0x" << std::hex << conn << " found in the registered list here";
}
}

@@ -48,4 +48,31 @@ void MonitorsRepo::NotifyChangeCount(bool added) {
}
}

ServerState::ServerState() : interpreter_mgr_{absl::GetFlag(FLAGS_interpreter_per_thread)} {
CHECK(mi_heap_get_backing() == mi_heap_get_default());

mi_heap_t* tlh = mi_heap_new();
init_zmalloc_threadlocal(tlh);
data_heap_ = tlh;
}

ServerState::~ServerState() {
}

void ServerState::Init() {
gstate_ = GlobalState::ACTIVE;
}

void ServerState::Shutdown() {
gstate_ = GlobalState::SHUTTING_DOWN;
}

Interpreter* ServerState::BorrowInterpreter() {
return interpreter_mgr_.Get();
}

void ServerState::ReturnInterpreter(Interpreter* ir) {
interpreter_mgr_.Return(ir);
}

} // end of namespace dfly

@@ -14,9 +14,12 @@

typedef struct mi_heap_s mi_heap_t;

namespace facade {
class Connection;
}

namespace dfly {

class ConnectionContext;
namespace journal {
class Journal;
} // namespace journal

@@ -36,21 +39,17 @@ class Journal;
// at which this would run. It also minimized the number of copied for this list.
class MonitorsRepo {
public:
using MonitorVec = std::vector<facade::Connection*>;

// This function adds a new connection to be monitored. This function only add
// new connection that belong to this thread! Must not be called outside of this
// thread context
void Add(ConnectionContext* info);

// Note that this is accepted by value for lifetime reasons
// we want to have our own copy since we are assuming that
// 1. there will be not to many connections that we in monitor state
// 2. we need to have for each of them each own copy for thread safe reasons
void Send(const std::string& msg);
void Add(facade::Connection* conn);

// This function remove a connection what was monitored. This function only removes
// a connection that belong to this thread! Must not be called outside of this
// thread context
void Remove(const ConnectionContext* conn);
void Remove(const facade::Connection* conn);

// We have for each thread the total number of monitors in the application.
// So this call is thread safe since we hold a copy of this for each thread.

@@ -68,8 +67,11 @@ class MonitorsRepo {
return monitors_.size();
}

const MonitorVec& monitors() const {
return monitors_;
}

private:
using MonitorVec = std::vector<ConnectionContext*>;
MonitorVec monitors_; // save connections belonging to this thread only!
unsigned int global_count_ = 0; // by global its means that we count the monitor for all threads
};
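
The forward declaration of facade::Connection added to server_state.h above lets the header refer to the connection type through pointers without including any facade headers, which is the usual way to keep a library's public headers light and to cut link-time dependencies. A minimal sketch of that pattern (hypothetical names, not the actual Dragonfly code):

#include <vector>

namespace facade {
class Connection;  // forward declaration - no facade include required here
}  // namespace facade

namespace example {

// The header only stores and passes pointers, so the forward declaration suffices;
// the .cc file that dereferences Connection is the one that includes the real header.
class MonitorsRepoSketch {
 public:
  void Add(facade::Connection* conn);

 private:
  std::vector<facade::Connection*> monitors_;  // pointers only, never dereferenced here
};

}  // namespace example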