mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
feat(server): Implement CLIENT KILL
(#2404)
* feat(server): Implement `CLIENT KILL` Currently, it supports the following syntax: * `CLIENT KILL <addr>:<port>` * `CLIENT KILL ID <id>` * `CLIENT KILL ADDR <addr>:<port>` * `CLIENT KILL LADDR <addr>:<port>` It will not allow killing an admin-connection from a non-admin port. There are a few parameters of `CLIENT KILL` that Redis supports but this PR does not yet add. Let's add them as needed. Fixes #1614 * Add tests * fixes
This commit is contained in:
parent
01a9e6d1f0
commit
13718699d8
2 changed files with 82 additions and 5 deletions
|
@ -506,6 +506,70 @@ void ClientTracking(CmdArgList args, ConnectionContext* cntx) {
|
|||
return cntx->SendOk();
|
||||
}
|
||||
|
||||
void ClientKill(CmdArgList args, absl::Span<facade::Listener*> listeners, ConnectionContext* cntx) {
|
||||
std::function<bool(facade::Connection * conn)> evaluator;
|
||||
|
||||
if (args.size() == 1) {
|
||||
string_view ip_port = ArgS(args, 0);
|
||||
if (ip_port.find(':') != ip_port.npos) {
|
||||
evaluator = [ip_port](facade::Connection* conn) {
|
||||
return conn->RemoteEndpointStr() == ip_port;
|
||||
};
|
||||
}
|
||||
} else if (args.size() == 2) {
|
||||
ToUpper(&args[0]);
|
||||
string_view filter_type = ArgS(args, 0);
|
||||
string_view filter_value = ArgS(args, 1);
|
||||
if (filter_type == "ADDR") {
|
||||
evaluator = [filter_value](facade::Connection* conn) {
|
||||
return conn->RemoteEndpointStr() == filter_value;
|
||||
};
|
||||
} else if (filter_type == "LADDR") {
|
||||
evaluator = [filter_value](facade::Connection* conn) {
|
||||
return conn->LocalBindStr() == filter_value;
|
||||
};
|
||||
} else if (filter_type == "ID") {
|
||||
uint32_t id;
|
||||
if (absl::SimpleAtoi(filter_value, &id)) {
|
||||
evaluator = [id](facade::Connection* conn) { return conn->GetClientId() == id; };
|
||||
}
|
||||
}
|
||||
// TODO: Add support for KILL USER/TYPE/SKIPME
|
||||
}
|
||||
|
||||
if (!evaluator) {
|
||||
return cntx->SendError(kSyntaxErr);
|
||||
}
|
||||
|
||||
const bool is_admin_request = cntx->conn()->IsPrivileged();
|
||||
|
||||
atomic<uint32_t> killed_connections = 0;
|
||||
atomic<uint32_t> kill_errors = 0;
|
||||
auto cb = [&](unsigned thread_index, util::Connection* conn) {
|
||||
facade::Connection* dconn = static_cast<facade::Connection*>(conn);
|
||||
if (evaluator(dconn)) {
|
||||
if (is_admin_request || !dconn->IsPrivileged()) {
|
||||
dconn->ShutdownSelf();
|
||||
killed_connections.fetch_add(1);
|
||||
} else {
|
||||
kill_errors.fetch_add(1);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
for (auto* listener : listeners) {
|
||||
listener->TraverseConnections(cb);
|
||||
}
|
||||
|
||||
if (kill_errors.load() == 0) {
|
||||
return cntx->SendLong(killed_connections.load());
|
||||
} else {
|
||||
return cntx->SendError(absl::StrCat("Killed ", killed_connections.load(),
|
||||
" client(s), but unable to kill ", kill_errors.load(),
|
||||
" admin client(s)."));
|
||||
}
|
||||
}
|
||||
|
||||
std::string_view GetOSString() {
|
||||
// Call uname() only once since it can be expensive. Cache the final result in a static string.
|
||||
static string os_string = []() {
|
||||
|
@ -1404,6 +1468,8 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
|
|||
return ClientPauseCmd(sub_args, absl::MakeSpan(listeners_), cntx);
|
||||
} else if (sub_cmd == "TRACKING") {
|
||||
return ClientTracking(sub_args, cntx);
|
||||
} else if (sub_cmd == "KILL") {
|
||||
return ClientKill(sub_args, absl::MakeSpan(listeners_), cntx);
|
||||
}
|
||||
|
||||
if (sub_cmd == "SETINFO") {
|
||||
|
|
|
@ -74,16 +74,27 @@ async def test_get_databases(async_client: aioredis.Redis):
|
|||
assert dbnum == {"databases": "16"}
|
||||
|
||||
|
||||
async def test_client_list(df_factory):
|
||||
async def test_client_kill(df_factory):
|
||||
with df_factory.create(port=1111, admin_port=1112) as instance:
|
||||
client = aioredis.Redis(port=instance.port)
|
||||
admin_client = aioredis.Redis(port=instance.admin_port)
|
||||
|
||||
await client.ping()
|
||||
await admin_client.ping()
|
||||
assert len(await client.execute_command("CLIENT LIST")) == 2
|
||||
|
||||
# This creates `client_conn` as a non-auto-reconnect client
|
||||
async with client.client() as client_conn:
|
||||
assert len(await client_conn.execute_command("CLIENT LIST")) == 2
|
||||
assert len(await admin_client.execute_command("CLIENT LIST")) == 2
|
||||
|
||||
# Can't kill admin from regular connection
|
||||
with pytest.raises(Exception) as e_info:
|
||||
await client_conn.execute_command("CLIENT KILL LADDR 127.0.0.1:1112")
|
||||
|
||||
assert len(await admin_client.execute_command("CLIENT LIST")) == 2
|
||||
await admin_client.execute_command("CLIENT KILL LADDR 127.0.0.1:1111")
|
||||
assert len(await admin_client.execute_command("CLIENT LIST")) == 1
|
||||
with pytest.raises(Exception) as e_info:
|
||||
await client_conn.ping()
|
||||
|
||||
await disconnect_clients(client, admin_client)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in a new issue