1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix(server): client pause fix on pipeline squash (#2180)

* fix(server): client pause fix on pipeline squash

allow squashing commands on pause
move await on client pause inside InvokeCommand - this way all flows of command invoke will read pause state

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2023-11-16 13:30:02 +02:00 committed by GitHub
parent b61d07d2c1
commit c95f4961be
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 11 additions and 20 deletions

View file

@ -1067,14 +1067,6 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
<< " in dbid=" << dfly_cntx->conn_state.db_index;
}
string_view cmd_name(cid->name());
bool is_write = (cid->opt_mask() & CO::WRITE) || cmd_name == "PUBLISH" || cmd_name == "EVAL" ||
cmd_name == "EVALSHA";
if (cmd_name == "EXEC" && dfly_cntx->conn_state.exec_info.is_write) {
is_write = true;
}
etl.AwaitPauseState(is_write);
etl.RecordCmd();
if (auto err = VerifyCommandState(cid, args_no_cmd, *dfly_cntx); err) {
@ -1195,10 +1187,19 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
return true; // return false only for internal error aborts
}
ServerState& etl = *ServerState::tlocal();
string_view cmd_name(cid->name());
bool is_write = (cid->opt_mask() & CO::WRITE) || cmd_name == "PUBLISH" || cmd_name == "EVAL" ||
cmd_name == "EVALSHA";
if (cmd_name == "EXEC" && cntx->conn_state.exec_info.is_write) {
is_write = true;
}
etl.AwaitPauseState(is_write);
// We are not sending any admin command in the monitor, and we do not want to
// do any processing if we don't have any waiting connections with monitor
// enabled on them - see https://redis.io/commands/monitor/
ServerState& etl = *ServerState::tlocal();
const MonitorsRepo& monitors = etl.Monitors();
if (!monitors.Empty() && (cid->opt_mask() & CO::ADMIN) == 0) {
DispatchMonitor(cntx, cid, tail_args);
@ -1264,9 +1265,8 @@ void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
// invocations, we can potentially execute multiple eval in parallel, which is very powerful
// paired with shardlocal eval
const bool is_eval = CO::IsEvalKind(ArgS(args, 0));
const bool is_pause = dfly::ServerState::tlocal()->IsPaused();
if (!is_multi && !is_eval && cid != nullptr && !is_pause) {
if (!is_multi && !is_eval && cid != nullptr) {
stored_cmds.reserve(args_list.size());
stored_cmds.emplace_back(cid, tail_args);
continue;

View file

@ -119,10 +119,6 @@ void ServerState::SetPauseState(ClientPause state, bool start) {
}
}
bool ServerState::IsPaused() const {
return client_pauses_[0] || client_pauses_[1];
}
void ServerState::AwaitPauseState(bool is_write) {
client_pause_ec_.await([is_write, this]() {
if (client_pauses_[int(ClientPause::ALL)]) {

View file

@ -227,9 +227,6 @@ class ServerState { // public struct - to allow initialization.
// whether this is starting or ending the pause.
void SetPauseState(ClientPause state, bool start);
// Returns whether any type of commands is paused.
bool IsPaused() const;
// Awaits until the pause is over and the command can execute.
// @is_write controls whether the command is a write command or not.
void AwaitPauseState(bool is_write);

View file

@ -1763,8 +1763,6 @@ async def test_search(df_local_factory):
].id == "k0"
# @pytest.mark.slow
@pytest.mark.skip(reason="Client pause command bug with pipeline squashing")
@pytest.mark.asyncio
async def test_client_pause_with_replica(df_local_factory, df_seeder_factory):
master = df_local_factory.create(proactor_threads=4)