Mirror of https://github.com/dragonflydb/dragonfly.git (synced 2024-12-14 11:58:02 +00:00)
feat(server): Support limiting the number of open connections. (#1670)

* feat(server): Support limiting the number of open connections.
* Update helio after the small fix was merged to master
* Don't limit admin connections (and add a test case)
* Resolve CR comments
parent 6dd51de9fe
commit ed845fe526
9 changed files with 83 additions and 5 deletions
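At the client level, the change behaves roughly as the new test below exercises it: with --maxclients=1, a second connection on the data port is refused with "-ERR max number of clients reached", connections on the admin port do not count against the limit, and CONFIG SET maxclients adjusts the limit at runtime. The following is a minimal standalone sketch of that flow, assuming a locally running server started with --port 1111 --maxclients 1 --admin_port 1112 and plain redis-py; the ports and client setup are illustrative and not part of this commit.

import asyncio

import redis
from redis.asyncio import Redis


async def main():
    # Occupy the single allowed data-port connection (--maxclients=1).
    async with Redis(port=1111) as client1:
        await client1.ping()

        # A second data-port connection exceeds the limit; the server answers
        # "-ERR max number of clients reached" and closes it.
        try:
            async with Redis(port=1111) as client2:
                await client2.ping()
        except redis.exceptions.ConnectionError as exc:
            print("second connection rejected:", exc)

        # Admin-port connections are not counted against maxclients.
        async with Redis(port=1112) as admin:
            await admin.ping()

        # The limit can be changed at runtime via the config registry.
        await client1.execute_command("CONFIG SET maxclients 3")


asyncio.run(main())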
helio (submodule)

@@ -1 +1 @@
-Subproject commit c8ccbbdf9113e5d3f1dc16c6cb96396ac7e3694d
+Subproject commit 96330440550013c69da14ae173049bf80e1e9257
@@ -11,6 +11,7 @@
 #include "base/flags.h"
 #include "base/logging.h"
 #include "facade/conn_context.h"
+#include "facade/dragonfly_listener.h"
 #include "facade/memcache_parser.h"
 #include "facade/redis_parser.h"
 #include "facade/service_interface.h"
@@ -403,8 +404,7 @@ uint32_t Connection::GetClientId() const {
 }
 
 bool Connection::IsAdmin() const {
-  uint16_t admin_port = absl::GetFlag(FLAGS_admin_port);
-  return socket_->LocalEndpoint().port() == admin_port;
+  return static_cast<Listener*>(owner())->IsAdminInterface();
 }
 
 io::Result<bool> Connection::CheckForHttpProto(FiberSocketBase* peer) {
@@ -201,6 +201,14 @@ bool Listener::AwaitDispatches(absl::Duration timeout,
   return false;
 }
 
+bool Listener::IsAdminInterface() const {
+  return is_admin_;
+}
+
+void Listener::SetAdminInterface(bool is_admin) {
+  is_admin_ = is_admin;
+}
+
 void Listener::PreShutdown() {
   // Iterate on all connections and allow them to finish their commands for
   // a short period.
@@ -266,6 +274,10 @@ void Listener::OnConnectionClose(util::Connection* conn) {
   }
 }
 
+void Listener::OnMaxConnectionsReached(util::FiberSocketBase* sock) {
+  sock->Write(io::Buffer("-ERR max number of clients reached\r\n"));
+}
+
 // We can limit number of threads handling dragonfly connections.
 ProactorBase* Listener::PickConnectionProactor(util::FiberSocketBase* sock) {
   util::ProactorPool* pp = pool();
@@ -35,12 +35,16 @@ class Listener : public util::ListenerInterface {
   bool AwaitDispatches(absl::Duration timeout,
                        const std::function<bool(util::Connection*)>& filter);
 
+  bool IsAdminInterface() const;
+  void SetAdminInterface(bool is_admin = true);
+
  private:
   util::Connection* NewConnection(ProactorBase* proactor) final;
   ProactorBase* PickConnectionProactor(util::FiberSocketBase* sock) final;
 
   void OnConnectionStart(util::Connection* conn) final;
   void OnConnectionClose(util::Connection* conn) final;
+  void OnMaxConnectionsReached(util::FiberSocketBase* sock) final;
   void PreAcceptLoop(ProactorBase* pb) final;
 
   void PreShutdown() final;
@@ -58,6 +62,8 @@ class Listener : public util::ListenerInterface {
 
   std::atomic_uint32_t next_id_{0};
 
+  bool is_admin_ = false;
+
   uint32_t conn_cnt_{0};
   uint32_t min_cnt_thread_id_{0};
   int32_t min_cnt_{0};
@@ -43,6 +43,17 @@ bool ConfigRegistry::Set(std::string_view config_name, std::string_view value) {
   return cb(*flag);
 }
 
+std::optional<std::string> ConfigRegistry::Get(std::string_view config_name) {
+  unique_lock lk(mu_);
+  if (!registry_.contains(config_name))
+    return std::nullopt;
+  lk.unlock();
+
+  absl::CommandLineFlag* flag = absl::FindCommandLineFlag(config_name);
+  CHECK(flag);
+  return flag->CurrentValue();
+}
+
 void ConfigRegistry::Reset() {
   unique_lock lk(mu_);
   registry_.clear();
@@ -20,6 +20,8 @@ class ConfigRegistry {
   // Returns true if the value was updated.
   bool Set(std::string_view config_name, std::string_view value);
 
+  std::optional<std::string> Get(std::string_view config_name);
+
   void Reset();
 
  private:
@@ -392,6 +392,7 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
     const std::string printable_addr =
         absl::StrCat("admin socket ", interface_addr ? interface_addr : "any", ":", admin_port);
     Listener* admin_listener = new Listener{Protocol::REDIS, &service};
+    admin_listener->SetAdminInterface();
    error_code ec = acceptor->AddListener(interface_addr, admin_port, admin_listener);
 
    if (ec) {
@@ -74,6 +74,8 @@ ABSL_FLAG(string, dbfilename, "dump-{timestamp}", "the filename to save/load the
 ABSL_FLAG(string, requirepass, "",
           "password for AUTH authentication. "
           "If empty can also be set with DFLY_PASSWORD environment variable.");
+ABSL_FLAG(uint32_t, maxclients, 64000, "Maximum number of concurrent clients allowed.");
+
 ABSL_FLAG(string, save_schedule, "",
           "glob spec for the UTC time to save a snapshot which matches HH:MM 24h time");
 ABSL_FLAG(string, snapshot_cron, "",
@@ -898,12 +900,28 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) {
 ServerFamily::~ServerFamily() {
 }
 
+void SetMaxClients(std::vector<facade::Listener*>& listeners, uint32_t maxclients) {
+  for (auto* listener : listeners) {
+    if (!listener->IsAdminInterface()) {
+      listener->SetMaxClients(maxclients);
+    }
+  }
+}
+
 void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners) {
   CHECK(acceptor_ == nullptr);
   acceptor_ = acceptor;
   listeners_ = std::move(listeners);
   dfly_cmd_ = make_unique<DflyCmd>(this);
 
+  SetMaxClients(listeners_, absl::GetFlag(FLAGS_maxclients));
+  config_registry.Register("maxclients", [this](const absl::CommandLineFlag& flag) {
+    auto res = flag.TryGet<uint32_t>();
+    if (res.has_value())
+      SetMaxClients(listeners_, res.value());
+    return res.has_value();
+  });
+
   pb_task_ = shard_set->pool()->GetNextProactor();
   if (pb_task_->GetKind() == ProactorBase::EPOLL) {
     fq_threadpool_.reset(new FiberQueueThreadPool(absl::GetFlag(FLAGS_epoll_file_threads)));
@@ -1621,9 +1639,10 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
     if (param == "databases") {
       res.emplace_back(param);
       res.push_back(absl::StrCat(absl::GetFlag(FLAGS_dbnum)));
-    } else if (param == "maxmemory") {
+    } else if (auto value_from_registry = config_registry.Get(param);
+               value_from_registry.has_value()) {
       res.emplace_back(param);
-      res.push_back(absl::StrCat(max_memory_limit));
+      res.push_back(*value_from_registry);
     }
 
     return (*cntx)->SendStringArr(res, RedisReplyBuilder::MAP);
tests/dragonfly/config_test.py (new file, 27 lines)

@@ -0,0 +1,27 @@
+import pytest
+import redis
+from redis.asyncio import Redis as RedisClient
+from .utility import *
+from . import DflyStartException
+
+
+async def test_maxclients(df_factory):
+    # Needs some authentication
+    server = df_factory.create(port=1111, maxclients=1, admin_port=1112)
+    server.start()
+
+    async with server.client() as client1:
+        assert [b"maxclients", b"1"] == await client1.execute_command("CONFIG GET maxclients")
+
+        with pytest.raises(redis.exceptions.ConnectionError):
+            async with server.client() as client2:
+                await client2.get("test")
+
+        # Check that admin connections are not limited.
+        async with RedisClient(port=server.admin_port) as admin_client:
+            await admin_client.get("test")
+
+        await client1.execute_command("CONFIG SET maxclients 3")
+        assert [b"maxclients", b"3"] == await client1.execute_command("CONFIG GET maxclients")
+        async with server.client() as client2:
+            await client2.get("test")