1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix(transaction): Fix rare deadlock case - fixes #150. (#155)

In rare cases a scheduled transaction is not scheduled correctly and we need
to remove it from the tx-queue in order to re-schedule. When we pull it from tx-queue
and it has been located at the head, we must poll-execute the next txs in the queue.

1. Fix the bug.
2. Improve verbosity loggings to make it easier to follow up on tx flow in release mode.
3. Introduce /txz handler that shows currently pending transactions in the queue.
4. Fix a typo in xdel() function.
5. Add a py-script that reproduces the bug.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-06-15 16:53:27 +03:00 committed by GitHub
parent f739300415
commit 4ec2538204
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 179 additions and 25 deletions

View file

@ -76,6 +76,10 @@ class TxQueue {
return head_;
}
Iterator Next(Iterator it) const {
return vec_[it].next;
}
private:
enum { TRANS_TAG = 0, UINT_TAG = 11, FREE_TAG = 12 };

View file

@ -53,7 +53,7 @@ class ConnectionContext {
virtual void OnClose() {}
std::string GetContextInfo() const { return std::string{}; }
virtual std::string GetContextInfo() const { return std::string{}; }
private:
Connection* owner_;

View file

@ -349,7 +349,9 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
// After the client disconnected.
cc_->conn_closing = true; // Signal dispatch to close.
evc_.notify();
VLOG(1) << "Before dispatch_fb.join()";
dispatch_fb.join();
VLOG(1) << "After dispatch_fb.join()";
cc_->OnClose();
stats->read_buf_capacity -= io_buf_.Capacity();

View file

@ -10,7 +10,7 @@ add_library(dragonfly_lib blocking_controller.cc channel_slice.cc command_regist
set_family.cc stream_family.cc string_family.cc table.cc tiered_storage.cc
transaction.cc zset_family.cc version.cc)
cxx_link(dragonfly_lib dfly_core dfly_facade redis_lib strings_lib)
cxx_link(dragonfly_lib dfly_core dfly_facade redis_lib strings_lib html_lib)
add_library(dfly_test_lib test_utils.cc)
cxx_link(dfly_test_lib dragonfly_lib facade_test gtest_main_ext)

View file

@ -232,4 +232,16 @@ void ConnectionContext::OnClose() {
}
}
string ConnectionContext::GetContextInfo() const {
char buf[16] = {0};
unsigned index = 0;
if (async_dispatch)
buf[index++] = 'a';
if (conn_closing)
buf[index++] = 't';
return index ? absl::StrCat("flags:", buf) : string();
}
} // namespace dfly

View file

@ -101,6 +101,8 @@ class ConnectionContext : public facade::ConnectionContext {
bool is_replicating = false;
std::string GetContextInfo() const override;
private:
void SendSubscriptionChangedResponse(std::string_view action,
std::optional<std::string_view> topic,

View file

@ -132,7 +132,9 @@ void EngineShard::DestroyThreadLocal() {
// Is called by Transaction::ExecuteAsync in order to run transaction tasks.
// Only runs in its own thread.
void EngineShard::PollExecution(const char* context, Transaction* trans) {
DVLOG(1) << "PollExecution " << context << " " << (trans ? trans->DebugId() : "");
VLOG(2) << "PollExecution " << context << " " << (trans ? trans->DebugId() : "")
<< " " << txq_.size() << " " << continuation_trans_;
ShardId sid = shard_id();
uint16_t trans_mask = trans ? trans->GetLocalMask(sid) : 0;
@ -170,7 +172,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
// The fact that Tx is in the queue, already means that coordinator fiber will not progress,
// hence here it's enough to test for run_count and check local_mask.
bool is_armed = head->IsArmedInShard(sid);
DVLOG(2) << "Considering head " << head->DebugId() << " isarmed: " << is_armed;
VLOG(2) << "Considering head " << head->DebugId() << " isarmed: " << is_armed;
if (!is_armed)
break;

View file

@ -33,7 +33,7 @@ extern "C" {
#include "server/transaction.h"
#include "server/version.h"
#include "server/zset_family.h"
#include "util/metrics/metrics.h"
#include "util/html/sorted_table.h"
#include "util/uring/uring_fiber_algo.h"
#include "util/varz.h"
@ -63,13 +63,13 @@ namespace this_fiber = ::boost::this_fiber;
using absl::GetFlag;
using absl::StrCat;
using namespace facade;
namespace h2 = boost::beast::http;
namespace {
DEFINE_VARZ(VarzMapAverage, request_latency_usec);
std::optional<VarzFunction> engine_varz;
metrics::CounterFamily cmd_req("requests_total", "Number of served redis requests");
constexpr size_t kMaxThreadSize = 1024;
@ -300,6 +300,48 @@ bool EvalValidator(CmdArgList args, ConnectionContext* cntx) {
return true;
}
void TxTable(const http::QueryArgs& args, HttpContext* send) {
using html::SortedTable;
http::StringResponse resp = http::MakeStringResponse(h2::status::ok);
resp.body() = SortedTable::HtmlStart();
SortedTable::StartTable({"ShardId", "TID", "TxId", "Armed"}, &resp.body());
if (shard_set) {
vector<string> rows(shard_set->size());
shard_set->RunBriefInParallel([&](EngineShard* shard) {
ShardId sid = shard->shard_id();
absl::AlphaNum tid(gettid());
absl::AlphaNum sid_an(sid);
string& mine = rows[sid];
TxQueue* queue = shard->txq();
if (!queue->Empty()) {
auto cur = queue->Head();
do {
auto value = queue->At(cur);
Transaction* trx = std::get<Transaction*>(value);
absl::AlphaNum an2(trx->txid());
absl::AlphaNum an3(trx->IsArmedInShard(sid));
SortedTable::Row({sid_an.Piece(), tid.Piece(), an2.Piece(), an3.Piece()}, &mine);
cur = queue->Next(cur);
} while (cur != queue->Head());
}
});
for (const auto& s : rows) {
resp.body().append(s);
}
}
SortedTable::EndTable(&resp.body());
send->Invoke(std::move(resp));
}
} // namespace
Service::Service(ProactorPool* pp) : pp_(*pp), server_family_(this) {
@ -334,7 +376,6 @@ void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_i
StringFamily::Init(&pp_);
GenericFamily::Init(&pp_);
server_family_.Init(acceptor, main_interface);
cmd_req.Init(&pp_, {"type"});
}
void Service::Shutdown() {
@ -352,7 +393,6 @@ void Service::Shutdown() {
StringFamily::Shutdown();
GenericFamily::Shutdown();
cmd_req.Shutdown();
shard_set->Shutdown();
// wait for all the pending callbacks to stop.
@ -503,7 +543,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
}
dfly_cntx->cid = cid;
cmd_req.Inc({cmd_name});
cid->Invoke(args, dfly_cntx);
end_usec = ProactorBase::GetMonotonicTimeNs();
@ -856,6 +896,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
return rb->SendError("-EXECABORT Transaction discarded because of previous errors");
}
VLOG(1) << "StartExec " << cntx->conn_state.exec_body.size();
rb->StartArray(cntx->conn_state.exec_body.size());
if (!cntx->conn_state.exec_body.empty()) {
CmdArgVec str_list;
@ -961,9 +1002,9 @@ void Service::Subscribe(CmdArgList args, ConnectionContext* cntx) {
void Service::Unsubscribe(CmdArgList args, ConnectionContext* cntx) {
args.remove_prefix(1);
if (args.size() == 0){
if (args.size() == 0) {
cntx->UnsubscribeAll(true);
}else{
} else {
cntx->ChangeSubscription(false, true, std::move(args));
}
}
@ -1026,6 +1067,7 @@ GlobalState Service::SwitchState(GlobalState from, GlobalState to) {
void Service::ConfigureHttpHandlers(util::HttpListenerBase* base) {
server_family_.ConfigureMetrics(base);
base->RegisterCb("/txz", TxTable);
}
using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx);

View file

@ -629,7 +629,8 @@ void StreamFamily::XDel(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
args.remove_prefix(2);
absl::InlinedVector<streamID, 8> ids(ids.size());
absl::InlinedVector<streamID, 8> ids(args.size());
for (size_t i = 0; i < args.size(); ++i) {
ParsedStreamId parsed_id;
string_view str_id = ArgS(args, i);

View file

@ -299,7 +299,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
// because Scheduling is done before multi-exec batch is executed. Therefore we
// lock keys right before the execution of each statement.
DVLOG(1) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id();
VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id();
unsigned idx = SidToId(shard->shard_id());
auto& sd = shard_data_[idx];
@ -462,20 +462,45 @@ void Transaction::ScheduleInternal() {
DCHECK(!span_all);
coordinator_state_ |= COORD_OOO;
}
DVLOG(1) << "Scheduled " << DebugId()
<< " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO);
VLOG(2) << "Scheduled " << DebugId()
<< " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO)
<< " num_shards: " << num_shards;
coordinator_state_ |= COORD_SCHED;
break;
}
DVLOG(1) << "Cancelling " << DebugId();
VLOG(2) << "Cancelling " << DebugId();
atomic_bool should_poll_execution{false};
auto cancel = [&](EngineShard* shard) {
success.fetch_sub(CancelShardCb(shard), memory_order_relaxed);
bool res = CancelShardCb(shard);
if (res) {
should_poll_execution.store(true, memory_order_relaxed);
}
};
shard_set->RunBriefInParallel(std::move(cancel), is_active);
CHECK_EQ(0u, success.load(memory_order_relaxed));
// We must follow up with PollExecution because in rare cases with multi-trans
// that follows this one, we may find the next transaction in the queue that is never
// trigerred. Which leads to deadlock. I could solve this by adding PollExecution to
// CancelShardCb above but then we would need to use the shard_set queue since PollExecution
// is blocking. I wanted to avoid the additional latency for the general case of running
// CancelShardCb because of the very rate case below. Therefore, I decided to just fetch the
// indication that we need to follow up with PollExecution and then send it to shard_set queue.
// We do not need to wait for this callback to finish - just make sure it will eventually run.
// See https://github.com/dragonflydb/dragonfly/issues/150 for more info.
if (should_poll_execution.load(memory_order_relaxed)) {
for (uint32_t i = 0; i < shard_set->size(); ++i) {
if (!is_active(i))
continue;
shard_set->Add(i, [] {
EngineShard::tlocal()->PollExecution("cancel_cleanup", nullptr);
});
}
}
}
if (IsOOO()) {
@ -855,11 +880,12 @@ bool Transaction::CancelShardCb(EngineShard* shard) {
sd.pq_pos = TxQueue::kEnd;
TxQueue* pq = shard->txq();
auto val = pq->At(pos);
TxQueue* txq = shard->txq();
TxQueue::Iterator head = txq->Head();
auto val = txq->At(pos);
Transaction* trans = absl::get<Transaction*>(val);
DCHECK(trans == this) << "Pos " << pos << ", pq size " << pq->size() << ", trans " << trans;
pq->Remove(pos);
DCHECK(trans == this) << "Pos " << pos << ", txq size " << txq->size() << ", trans " << trans;
txq->Remove(pos);
if (sd.local_mask & KEYLOCK_ACQUIRED) {
auto mode = Mode();
@ -867,7 +893,12 @@ bool Transaction::CancelShardCb(EngineShard* shard) {
shard->db_slice().Release(mode, lock_args);
sd.local_mask &= ~KEYLOCK_ACQUIRED;
}
return true;
if (pos == head && !txq->Empty()) {
return true;
}
return false;
}
// runs in engine-shard thread.

View file

@ -208,7 +208,7 @@ class Transaction {
/// Runs in the shard thread.
std::pair<bool, bool> ScheduleInShard(EngineShard* shard);
// Returns true if operation was cancelled for this shard. Runs in the shard thread.
// Returns true if we need to follow up with PollExecution on this shard.
bool CancelShardCb(EngineShard* shard);
// Shard callbacks used within Execute calls
@ -332,7 +332,7 @@ class Transaction {
};
inline uint16_t trans_id(const Transaction* ptr) {
return intptr_t(ptr) & 0xFFFF;
return (intptr_t(ptr) >> 8) & 0xFFFF;
}
OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args);

58
tests/async.py Executable file
View file

@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
This is the script that helped to reproduce https://github.com/dragonflydb/dragonfly/issues/150
The outcome - stalled code with all its connections deadlocked.
Reproduced only with dragonfly in release mode on multi-core machine.
"""
import asyncio
import aioredis
from loguru import logger as log
import sys
import random
connection_pool = aioredis.ConnectionPool(host="localhost", port=6379,
db=1, decode_responses=True, max_connections=16)
key_index = 1
async def post_to_redis(sem, db_name, index):
global key_index
async with sem:
results = None
try:
redis_client = aioredis.Redis(connection_pool=connection_pool)
async with redis_client.pipeline(transaction=True) as pipe:
for i in range(1, 15):
pipe.hsetnx(name=f"key_{key_index}", key="name", value="bla")
key_index += 1
#log.info(f"after first half {key_index}")
for i in range(1, 15):
pipe.hsetnx(name=f"bla_{key_index}", key="name2", value="bla")
key_index += 1
assert len(pipe.command_stack) > 0
log.info(f"before pipe.execute {key_index}")
results = await pipe.execute()
log.info(f"after pipe.execute {key_index}")
finally:
# log.info(f"before close {index}")
await redis_client.close()
#log.info(f"after close {index} {len(results)}")
async def do_concurrent(db_name):
tasks = []
sem = asyncio.Semaphore(10)
for i in range(1, 3000):
tasks.append(post_to_redis(sem, db_name, i))
res = await asyncio.gather(*tasks)
if __name__ == '__main__':
log.remove()
log.add(sys.stdout, enqueue=True, level='INFO')
loop = asyncio.get_event_loop()
loop.run_until_complete(do_concurrent("my_db"))