mirror of
synced 2024-12-14 11:58:02 +00:00
fix: fix interpreter acquisition with MULTI (#2549)
* fix: fix interpreter acquisition with MULTI --------- Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
8 changed files with 114 additions and 34 deletions
@ -885,12 +885,15 @@ Interpreter* InterpreterManager::Get() {
waker_.await([this]() { return available_.size() > 0; });
Interpreter* ir = available_.back();
return ir;
void InterpreterManager::Return(Interpreter* ir) {
DCHECK_LE(storage_.data(), ir); // ensure the pointer
DCHECK_GE(storage_.data() + storage_.size(), ir); // belongs to storage_
@ -146,9 +146,9 @@ class InterpreterManager {
// Borrow interpreter. Always return it after usage.
Interpreter* Get();
void Return(Interpreter*);
// Clear all interpreters, keeps capacity. Waits until all are returned.
void Reset();
@ -252,6 +252,7 @@ size_t ConnectionContext::UsedMemory() const {
void ConnectionState::ExecInfo::Clear() {
DCHECK(!preborrowed_interpreter); // Must have been released properly
is_write = false;
@ -128,20 +128,22 @@ constexpr size_t kMaxThreadSize = 1024;
// Unwatch all keys for a connection and unregister from DbSlices.
// Used by UNWATCH, DICARD and EXEC.
void UnwatchAllKeys(ConnectionContext* cntx) {
auto& exec_info = cntx->conn_state.exec_info;
if (!exec_info.watched_keys.empty()) {
auto cb = [&](EngineShard* shard) {
void UnwatchAllKeys(ConnectionState::ExecInfo* exec_info) {
if (!exec_info->watched_keys.empty()) {
auto cb = [&](EngineShard* shard) { shard->db_slice().UnregisterConnectionWatches(exec_info); };
void MultiCleanup(ConnectionContext* cntx) {
auto& exec_info = cntx->conn_state.exec_info;
if (auto* borrowed = exec_info.preborrowed_interpreter; borrowed) {
exec_info.preborrowed_interpreter = nullptr;
void DeactivateMonitoring(ConnectionContext* server_ctx) {
@ -691,13 +693,24 @@ string CreateExecDescriptor(const std::vector<StoredCmd>& stored_cmds, unsigned
return result;
// Either take the interpreter from the preborrowed multi exec transaction or borrow one.
// Ensures availability of an interpreter for EVAL-like commands and it's automatic release.
// If it's part of MULTI, the preborrowed interpreter is returned, otherwise a new is acquired.
struct BorrowedInterpreter {
explicit BorrowedInterpreter(ConnectionContext* cntx) {
// Ensure squashing ignores EVAL. We can't run on a stub context, because it doesn't have our
// preborrowed interpreter (which can't be shared on multiple threads).
if (auto borrowed = cntx->conn_state.exec_info.preborrowed_interpreter; borrowed) {
DCHECK_EQ(cntx->conn_state.exec_info.state, ConnectionState::ExecInfo::EXEC_RUNNING);
// Ensure a preborrowed interpreter is only set for an already running MULTI transaction.
CHECK_EQ(cntx->conn_state.exec_info.state, ConnectionState::ExecInfo::EXEC_RUNNING);
interpreter_ = borrowed;
} else {
// A scheduled transaction occupies a place in the transaction queue and holds locks,
// preventing other transactions from progressing. Blocking below can deadlock!
interpreter_ = ServerState::tlocal()->BorrowInterpreter();
owned_ = true;
@ -708,6 +721,13 @@ struct BorrowedInterpreter {
// Give up ownership of the interpreter, it must be returned manually.
Interpreter* Release() && {
owned_ = false;
return interpreter_;
operator Interpreter*() {
return interpreter_;
@ -1538,7 +1558,7 @@ void Service::Watch(CmdArgList args, ConnectionContext* cntx) {
void Service::Unwatch(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendOk();
@ -2007,14 +2027,16 @@ void StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo*
void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto& exec_info = cntx->conn_state.exec_info;
// Clean the context no matter the outcome
absl::Cleanup exec_clear = [&cntx] { MultiCleanup(cntx); };
if (!cntx->conn_state.exec_info.IsCollecting()) {
// Check basic invariants
if (!exec_info.IsCollecting()) {
return rb->SendError("EXEC without MULTI");
auto& exec_info = cntx->conn_state.exec_info;
if (IsWatchingOtherDbs(cntx->db_index(), exec_info)) {
return rb->SendError("Dragonfly does not allow WATCH and EXEC on different databases");
@ -2028,12 +2050,17 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
cntx->last_command_debug.exec_body_len = exec_info.body.size();
const CommandId* const exec_cid = cntx->cid;
CmdArgVec arg_vec;
// The transaction can contain scripts, determine their presence ahead to customize logic below.
ExecEvalState state = DetermineEvalPresense(exec_info.body);
// We adjust the atomicity level of multi transaction inside StartMultiExec. i.e if multi mode is
// lock ahead and we run global script in the transaction then multi mode will be global.
// We borrow a single interpreter for all the EVALs inside. Returned by MultiCleanup
if (state != ExecEvalState::NONE) {
exec_info.preborrowed_interpreter = BorrowedInterpreter(cntx).Release();
// Determine according multi mode, not only only flag, but based on presence of global commands
// and scripts
optional<Transaction::MultiMode> multi_mode = DeduceExecMode(state, exec_info, *script_mgr());
if (!multi_mode)
return rb->SendError(
@ -2059,9 +2086,6 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
SinkReplyBuilder::ReplyAggregator agg(rb);
if (state != ExecEvalState::NONE)
exec_info.preborrowed_interpreter = ServerState::tlocal()->BorrowInterpreter();
if (!exec_info.body.empty()) {
if (GetFlag(FLAGS_track_exec_frequencies)) {
string descr = CreateExecDescriptor(exec_info.body, cntx->transaction->GetUniqueShardCnt());
@ -2071,6 +2095,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
if (absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE) {
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), cntx, this);
} else {
CmdArgVec arg_vec;
for (auto& scmd : exec_info.body) {
VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs();
@ -2097,19 +2122,12 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
if (exec_info.preborrowed_interpreter) {
// Use SafeTLocal() to avoid accessing the wrong thread local instance
exec_info.preborrowed_interpreter = nullptr;
if (scheduled) {
VLOG(1) << "Exec unlocking " << exec_info.body.size() << " commands";
cntx->cid = exec_cid;
cntx->cid = exec_cid_;
VLOG(1) << "Exec completed";
@ -2413,7 +2431,7 @@ void Service::OnClose(facade::ConnectionContext* cntx) {
@ -1868,6 +1868,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total);
append("reply_count", reply_stats.send_stats.count);
append("reply_latency_usec", reply_stats.send_stats.total_duration);
append("blocked_on_interpreter", m.coordinator_stats.blocked_on_interpreter);
if (should_enter("TIERED", true)) {
@ -48,7 +48,7 @@ auto ServerState::Stats::operator=(Stats&& other) -> Stats& {
ServerState::Stats& ServerState::Stats::Add(unsigned num_shards, const ServerState::Stats& other) {
static_assert(sizeof(Stats) == 12 * 8, "Stats size mismatch");
static_assert(sizeof(Stats) == 13 * 8, "Stats size mismatch");
for (int i = 0; i < NUM_TX_TYPES; ++i) {
this->tx_type_cnt[i] += other.tx_type_cnt[i];
@ -63,6 +63,8 @@ ServerState::Stats& ServerState::Stats::Add(unsigned num_shards, const ServerSta
this->multi_squash_exec_hop_usec += other.multi_squash_exec_hop_usec;
this->multi_squash_exec_reply_usec += other.multi_squash_exec_reply_usec;
this->blocked_on_interpreter += other.blocked_on_interpreter;
if (this->tx_width_freq_arr == nullptr) {
this->tx_width_freq_arr = new uint64_t[num_shards];
std::copy_n(other.tx_width_freq_arr, num_shards, this->tx_width_freq_arr);
@ -174,7 +176,10 @@ bool ServerState::IsPaused() const {
Interpreter* ServerState::BorrowInterpreter() {
return interpreter_mgr_.Get();
auto* ptr = interpreter_mgr_.Get();
return ptr;
void ServerState::ReturnInterpreter(Interpreter* ir) {
@ -106,6 +106,8 @@ class ServerState { // public struct - to allow initialization.
uint64_t multi_squash_exec_hop_usec = 0;
uint64_t multi_squash_exec_reply_usec = 0;
uint64_t blocked_on_interpreter = 0;
// Array of size of number of shards.
// Each entry is how many transactions we had with this width (unique_shard_cnt).
uint64_t* tx_width_freq_arr = nullptr;
@ -175,7 +177,8 @@ class ServerState { // public struct - to allow initialization.
bool AllowInlineScheduling() const;
// Borrow interpreter from internal manager. Return int with ReturnInterpreter.
// Borrow interpreter from interpreter pool, return it with ReturnInterpreter.
// Will block if no interpreters are aviable. Use with caution!
Interpreter* BorrowInterpreter();
// Return interpreter to internal manager to be re-used.
@ -1,10 +1,13 @@
import asyncio
import async_timeout
from redis import asyncio as aioredis
import time
import json
import pytest
import random
import itertools
import random
import string
from . import dfly_args, dfly_multi_test_args
@ -264,3 +267,49 @@ async def test_lua_auto_async(async_client: aioredis.Redis):
flushes = (await async_client.info("transaction"))["eval_squashed_flushes"]
assert 1 <= flushes <= 3 # all 100 commands are executed in at most 3 batches
Ensure liveness even with only a single interpreter in scenarios where EVAL and EVAL inside multi run concurrently while also contending for keys
@dfly_args({"proactor_threads": 3, "interpreter_per_thread": 1})
async def test_one_interpreter(async_client: aioredis.Redis):
sha = await async_client.script_load("redis.call('GET', KEYS[1])")
all_keys = [string.ascii_lowercase[i] for i in range(5)]
total_commands = 100
async def run_multi():
for _ in range(total_commands):
p = async_client.pipeline(transaction=True)
pkeys = random.choices(all_keys, k=3)
for key in pkeys:
p.evalsha(sha, 1, key)
await p.execute()
async def run_single():
for _ in range(total_commands):
await async_client.evalsha(sha, 1, random.choice(all_keys))
max_blocked = 0
async def measure_blocked():
nonlocal max_blocked
while True:
max_blocked = max(
max_blocked, (await async_client.info("STATS"))["blocked_on_interpreter"]
await asyncio.sleep(0.01)
tm = [asyncio.create_task(run_multi()) for _ in range(5)]
ts = [asyncio.create_task(run_single()) for _ in range(5)]
block_measure = asyncio.create_task(measure_blocked())
async with async_timeout.timeout(5):
await asyncio.gather(*(tm + ts))
# At least some of the commands were seen blocking on the interpreter
assert max_blocked > 3
Reference in a new issue