1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix(server): Implement SCRIPT GC command (#3431)

* fix(server): Implement SCRIPT GC command
This commit is contained in:
Vladislav 2024-08-02 23:49:51 +03:00 committed by GitHub
parent f652f10743
commit 82298b8122
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 68 additions and 3 deletions

View file

@ -884,6 +884,10 @@ void Interpreter::ResetStack() {
lua_settop(lua_, 0);
}
void Interpreter::RunGC() {
lua_gc(lua_, LUA_GCCOLLECT);
}
// Returns number of results, which is always 1 in this case.
// Please note that lua resets the stack once the function returns so no need
// to unwind the stack manually in the function (though lua allows doing this).
@ -1104,4 +1108,14 @@ void InterpreterManager::Reset() {
VLOG(1) << "InterpreterManager::Reset ended";
}
void InterpreterManager::Alter(std::function<void(Interpreter*)> modf) {
vector<Interpreter*> taken;
swap(taken, available_); // swap data because modf can preempt
for (Interpreter* ir : taken) {
modf(ir);
Return(ir);
}
}
} // namespace dfly

View file

@ -110,6 +110,8 @@ class Interpreter {
void ResetStack();
void RunGC();
// fp must point to buffer with at least 41 chars.
// fp[40] will be set to '\0'.
static void FuncSha1(std::string_view body, char* fp);
@ -166,6 +168,9 @@ class InterpreterManager {
// Clear all interpreters, keeps capacity. Waits until all are returned.
void Reset();
// Run on all unused interpreters. Those are marked as used at once, so the callback can preempt
void Alter(std::function<void(Interpreter*)> modf);
static Stats& tl_stats();
private:

View file

@ -72,7 +72,7 @@ void ScriptMgr::Run(CmdArgList args, ConnectionContext* cntx) {
"LOAD <script>",
" Load a script into the scripts cache without executing it.",
"FLAGS <sha> [flags ...]",
" Set specific flags for script. Can be called before the sript is loaded."
" Set specific flags for script. Can be called before the sript is loaded.",
" The following flags are possible: ",
" - Use 'allow-undeclared-keys' to allow accessing undeclared keys",
" - Use 'disable-atomicity' to allow running scripts non-atomically",
@ -80,7 +80,9 @@ void ScriptMgr::Run(CmdArgList args, ConnectionContext* cntx) {
" Lists loaded scripts.",
"LATENCY",
" Prints latency histograms in usec for every called function.",
"HELP"
"GC",
" Invokes garbage collection on all unused interpreter instances.",
"HELP",
" Prints this help."};
auto rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
return rb->SendSimpleStrArr(kHelp);
@ -104,6 +106,9 @@ void ScriptMgr::Run(CmdArgList args, ConnectionContext* cntx) {
if (subcmd == "FLAGS" && args.size() > 2)
return ConfigCmd(args, cntx);
if (subcmd == "GC")
return GCCmd(cntx);
string err = absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd,
"'. Try SCRIPT HELP.");
cntx->SendError(err, kSyntaxErrType);
@ -122,7 +127,6 @@ void ScriptMgr::ExistsCmd(CmdArgList args, ConnectionContext* cntx) const {
for (uint8_t v : res) {
rb->SendLong(v);
}
return;
}
void ScriptMgr::FlushCmd(CmdArgList args, ConnectionContext* cntx) {
@ -207,6 +211,16 @@ void ScriptMgr::LatencyCmd(ConnectionContext* cntx) const {
}
}
void ScriptMgr::GCCmd(ConnectionContext* cntx) const {
auto cb = [](Interpreter* ir) {
ir->RunGC();
ThisFiber::Yield();
};
shard_set->pool()->AwaitFiberOnAll(
[cb](auto* pb) { ServerState::tlocal()->AlterInterpreters(cb); });
return cntx->SendOk();
}
// Check if script starts with shebang (#!lua). If present, look for flags parameter and truncate
// it.
io::Result<optional<ScriptMgr::ScriptParams>, GenericError> DeduceParams(string_view* body) {

View file

@ -68,6 +68,7 @@ class ScriptMgr {
void ConfigCmd(CmdArgList args, ConnectionContext* cntx);
void ListCmd(ConnectionContext* cntx) const;
void LatencyCmd(ConnectionContext* cntx) const;
void GCCmd(ConnectionContext* cntx) const;
void UpdateScriptCaches(ScriptKey sha, ScriptParams params) const;

View file

@ -189,6 +189,10 @@ void ServerState::ResetInterpreter() {
interpreter_mgr_.Reset();
}
void ServerState::AlterInterpreters(std::function<void(Interpreter*)> modf) {
interpreter_mgr_.Alter(std::move(modf));
}
ServerState* ServerState::SafeTLocal() {
// https://stackoverflow.com/a/75622732
asm volatile("");

View file

@ -187,6 +187,10 @@ class ServerState { // public struct - to allow initialization.
void ResetInterpreter();
// Invoke function on all free interpreters. They are marked atomically as
// used and the function is allowed to suspend.
void AlterInterpreters(std::function<void(Interpreter*)> modf);
// Returns sum of all requests in the last 6 seconds
// (not including the current one).
uint32_t MovingSum6() const {

View file

@ -339,3 +339,26 @@ async def test_migrate_close_connection(async_client: aioredis.Redis, df_server:
tasks = [asyncio.create_task(run()) for _ in range(50)]
await asyncio.gather(*tasks)
@pytest.mark.opt_only
@dfly_args({"proactor_threads": 4, "interpreter_per_thread": 4})
async def test_fill_memory_gc(async_client: aioredis.Redis):
SCRIPT = """
local res = {{}}
for j = 1, 100 do
for i = 1, 10000 do
table.insert(res, tostring(i) .. 'data')
end
end
"""
await asyncio.gather(*(async_client.eval(SCRIPT, 0) for _ in range(5)))
info = await async_client.info("memory")
# if this assert fails, we likely run gc after script invocations, remove this test
assert info["used_memory_lua"] > 50 * 1e6
await async_client.execute_command("SCRIPT GC")
info = await async_client.info("memory")
assert info["used_memory_lua"] < 10 * 1e6