diff --git a/src/core/tx_queue.h b/src/core/tx_queue.h index 73fc138a8..0df6d00f5 100644 --- a/src/core/tx_queue.h +++ b/src/core/tx_queue.h @@ -76,6 +76,10 @@ class TxQueue { return head_; } + Iterator Next(Iterator it) const { + return vec_[it].next; + } + private: enum { TRANS_TAG = 0, UINT_TAG = 11, FREE_TAG = 12 }; diff --git a/src/facade/conn_context.h b/src/facade/conn_context.h index 7bd63020d..542c3492b 100644 --- a/src/facade/conn_context.h +++ b/src/facade/conn_context.h @@ -53,7 +53,7 @@ class ConnectionContext { virtual void OnClose() {} - std::string GetContextInfo() const { return std::string{}; } + virtual std::string GetContextInfo() const { return std::string{}; } private: Connection* owner_; diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index d426ef35e..77ac104ed 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -349,7 +349,9 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { // After the client disconnected. cc_->conn_closing = true; // Signal dispatch to close. evc_.notify(); + VLOG(1) << "Before dispatch_fb.join()"; dispatch_fb.join(); + VLOG(1) << "After dispatch_fb.join()"; cc_->OnClose(); stats->read_buf_capacity -= io_buf_.Capacity(); diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 5af007820..4ac3a88e6 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -10,7 +10,7 @@ add_library(dragonfly_lib blocking_controller.cc channel_slice.cc command_regist set_family.cc stream_family.cc string_family.cc table.cc tiered_storage.cc transaction.cc zset_family.cc version.cc) -cxx_link(dragonfly_lib dfly_core dfly_facade redis_lib strings_lib) +cxx_link(dragonfly_lib dfly_core dfly_facade redis_lib strings_lib html_lib) add_library(dfly_test_lib test_utils.cc) cxx_link(dfly_test_lib dragonfly_lib facade_test gtest_main_ext) diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index fa05d2232..def9ae96c 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -232,4 +232,16 @@ void ConnectionContext::OnClose() { } } +string ConnectionContext::GetContextInfo() const { + char buf[16] = {0}; + unsigned index = 0; + if (async_dispatch) + buf[index++] = 'a'; + + if (conn_closing) + buf[index++] = 't'; + + return index ? absl::StrCat("flags:", buf) : string(); +} + } // namespace dfly diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 57536b364..187ccbbcc 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -101,6 +101,8 @@ class ConnectionContext : public facade::ConnectionContext { bool is_replicating = false; + std::string GetContextInfo() const override; + private: void SendSubscriptionChangedResponse(std::string_view action, std::optional topic, diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index fe9141b9b..c8bd6770c 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -132,7 +132,9 @@ void EngineShard::DestroyThreadLocal() { // Is called by Transaction::ExecuteAsync in order to run transaction tasks. // Only runs in its own thread. void EngineShard::PollExecution(const char* context, Transaction* trans) { - DVLOG(1) << "PollExecution " << context << " " << (trans ? trans->DebugId() : ""); + VLOG(2) << "PollExecution " << context << " " << (trans ? trans->DebugId() : "") + << " " << txq_.size() << " " << continuation_trans_; + ShardId sid = shard_id(); uint16_t trans_mask = trans ? trans->GetLocalMask(sid) : 0; @@ -170,7 +172,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { // The fact that Tx is in the queue, already means that coordinator fiber will not progress, // hence here it's enough to test for run_count and check local_mask. bool is_armed = head->IsArmedInShard(sid); - DVLOG(2) << "Considering head " << head->DebugId() << " isarmed: " << is_armed; + VLOG(2) << "Considering head " << head->DebugId() << " isarmed: " << is_armed; if (!is_armed) break; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 11002fe76..07fc8e8ee 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -33,7 +33,7 @@ extern "C" { #include "server/transaction.h" #include "server/version.h" #include "server/zset_family.h" -#include "util/metrics/metrics.h" +#include "util/html/sorted_table.h" #include "util/uring/uring_fiber_algo.h" #include "util/varz.h" @@ -63,13 +63,13 @@ namespace this_fiber = ::boost::this_fiber; using absl::GetFlag; using absl::StrCat; using namespace facade; +namespace h2 = boost::beast::http; namespace { DEFINE_VARZ(VarzMapAverage, request_latency_usec); std::optional engine_varz; -metrics::CounterFamily cmd_req("requests_total", "Number of served redis requests"); constexpr size_t kMaxThreadSize = 1024; @@ -300,6 +300,48 @@ bool EvalValidator(CmdArgList args, ConnectionContext* cntx) { return true; } +void TxTable(const http::QueryArgs& args, HttpContext* send) { + using html::SortedTable; + + http::StringResponse resp = http::MakeStringResponse(h2::status::ok); + resp.body() = SortedTable::HtmlStart(); + SortedTable::StartTable({"ShardId", "TID", "TxId", "Armed"}, &resp.body()); + + if (shard_set) { + vector rows(shard_set->size()); + + shard_set->RunBriefInParallel([&](EngineShard* shard) { + ShardId sid = shard->shard_id(); + + absl::AlphaNum tid(gettid()); + absl::AlphaNum sid_an(sid); + + string& mine = rows[sid]; + TxQueue* queue = shard->txq(); + + if (!queue->Empty()) { + auto cur = queue->Head(); + do { + auto value = queue->At(cur); + Transaction* trx = std::get(value); + + absl::AlphaNum an2(trx->txid()); + absl::AlphaNum an3(trx->IsArmedInShard(sid)); + SortedTable::Row({sid_an.Piece(), tid.Piece(), an2.Piece(), an3.Piece()}, &mine); + cur = queue->Next(cur); + } while (cur != queue->Head()); + } + }); + + for (const auto& s : rows) { + resp.body().append(s); + } + } + + SortedTable::EndTable(&resp.body()); + send->Invoke(std::move(resp)); +} + } // namespace Service::Service(ProactorPool* pp) : pp_(*pp), server_family_(this) { @@ -334,7 +376,6 @@ void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_i StringFamily::Init(&pp_); GenericFamily::Init(&pp_); server_family_.Init(acceptor, main_interface); - cmd_req.Init(&pp_, {"type"}); } void Service::Shutdown() { @@ -352,7 +393,6 @@ void Service::Shutdown() { StringFamily::Shutdown(); GenericFamily::Shutdown(); - cmd_req.Shutdown(); shard_set->Shutdown(); // wait for all the pending callbacks to stop. @@ -503,7 +543,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) } dfly_cntx->cid = cid; - cmd_req.Inc({cmd_name}); + cid->Invoke(args, dfly_cntx); end_usec = ProactorBase::GetMonotonicTimeNs(); @@ -856,6 +896,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { return rb->SendError("-EXECABORT Transaction discarded because of previous errors"); } + VLOG(1) << "StartExec " << cntx->conn_state.exec_body.size(); rb->StartArray(cntx->conn_state.exec_body.size()); if (!cntx->conn_state.exec_body.empty()) { CmdArgVec str_list; @@ -961,9 +1002,9 @@ void Service::Subscribe(CmdArgList args, ConnectionContext* cntx) { void Service::Unsubscribe(CmdArgList args, ConnectionContext* cntx) { args.remove_prefix(1); - if (args.size() == 0){ + if (args.size() == 0) { cntx->UnsubscribeAll(true); - }else{ + } else { cntx->ChangeSubscription(false, true, std::move(args)); } } @@ -1026,6 +1067,7 @@ GlobalState Service::SwitchState(GlobalState from, GlobalState to) { void Service::ConfigureHttpHandlers(util::HttpListenerBase* base) { server_family_.ConfigureMetrics(base); + base->RegisterCb("/txz", TxTable); } using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx); diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 9d041b0ad..4cd2804b4 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -629,7 +629,8 @@ void StreamFamily::XDel(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); args.remove_prefix(2); - absl::InlinedVector ids(ids.size()); + absl::InlinedVector ids(args.size()); + for (size_t i = 0; i < args.size(); ++i) { ParsedStreamId parsed_id; string_view str_id = ArgS(args, i); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 81a3c50b3..786302a21 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -299,7 +299,7 @@ bool Transaction::RunInShard(EngineShard* shard) { // because Scheduling is done before multi-exec batch is executed. Therefore we // lock keys right before the execution of each statement. - DVLOG(1) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id(); + VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id(); unsigned idx = SidToId(shard->shard_id()); auto& sd = shard_data_[idx]; @@ -462,20 +462,45 @@ void Transaction::ScheduleInternal() { DCHECK(!span_all); coordinator_state_ |= COORD_OOO; } - DVLOG(1) << "Scheduled " << DebugId() - << " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO); + VLOG(2) << "Scheduled " << DebugId() + << " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO) + << " num_shards: " << num_shards; coordinator_state_ |= COORD_SCHED; break; } - DVLOG(1) << "Cancelling " << DebugId(); + VLOG(2) << "Cancelling " << DebugId(); + + atomic_bool should_poll_execution{false}; auto cancel = [&](EngineShard* shard) { - success.fetch_sub(CancelShardCb(shard), memory_order_relaxed); + bool res = CancelShardCb(shard); + if (res) { + should_poll_execution.store(true, memory_order_relaxed); + } }; shard_set->RunBriefInParallel(std::move(cancel), is_active); - CHECK_EQ(0u, success.load(memory_order_relaxed)); + + // We must follow up with PollExecution because in rare cases with multi-trans + // that follows this one, we may find the next transaction in the queue that is never + // trigerred. Which leads to deadlock. I could solve this by adding PollExecution to + // CancelShardCb above but then we would need to use the shard_set queue since PollExecution + // is blocking. I wanted to avoid the additional latency for the general case of running + // CancelShardCb because of the very rate case below. Therefore, I decided to just fetch the + // indication that we need to follow up with PollExecution and then send it to shard_set queue. + // We do not need to wait for this callback to finish - just make sure it will eventually run. + // See https://github.com/dragonflydb/dragonfly/issues/150 for more info. + if (should_poll_execution.load(memory_order_relaxed)) { + for (uint32_t i = 0; i < shard_set->size(); ++i) { + if (!is_active(i)) + continue; + + shard_set->Add(i, [] { + EngineShard::tlocal()->PollExecution("cancel_cleanup", nullptr); + }); + } + } } if (IsOOO()) { @@ -855,11 +880,12 @@ bool Transaction::CancelShardCb(EngineShard* shard) { sd.pq_pos = TxQueue::kEnd; - TxQueue* pq = shard->txq(); - auto val = pq->At(pos); + TxQueue* txq = shard->txq(); + TxQueue::Iterator head = txq->Head(); + auto val = txq->At(pos); Transaction* trans = absl::get(val); - DCHECK(trans == this) << "Pos " << pos << ", pq size " << pq->size() << ", trans " << trans; - pq->Remove(pos); + DCHECK(trans == this) << "Pos " << pos << ", txq size " << txq->size() << ", trans " << trans; + txq->Remove(pos); if (sd.local_mask & KEYLOCK_ACQUIRED) { auto mode = Mode(); @@ -867,7 +893,12 @@ bool Transaction::CancelShardCb(EngineShard* shard) { shard->db_slice().Release(mode, lock_args); sd.local_mask &= ~KEYLOCK_ACQUIRED; } - return true; + + if (pos == head && !txq->Empty()) { + return true; + } + + return false; } // runs in engine-shard thread. diff --git a/src/server/transaction.h b/src/server/transaction.h index b7c49eaaa..8983586f5 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -208,7 +208,7 @@ class Transaction { /// Runs in the shard thread. std::pair ScheduleInShard(EngineShard* shard); - // Returns true if operation was cancelled for this shard. Runs in the shard thread. + // Returns true if we need to follow up with PollExecution on this shard. bool CancelShardCb(EngineShard* shard); // Shard callbacks used within Execute calls @@ -332,7 +332,7 @@ class Transaction { }; inline uint16_t trans_id(const Transaction* ptr) { - return intptr_t(ptr) & 0xFFFF; + return (intptr_t(ptr) >> 8) & 0xFFFF; } OpResult DetermineKeys(const CommandId* cid, CmdArgList args); diff --git a/tests/async.py b/tests/async.py new file mode 100755 index 000000000..f4a31c893 --- /dev/null +++ b/tests/async.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 + +""" +This is the script that helped to reproduce https://github.com/dragonflydb/dragonfly/issues/150 +The outcome - stalled code with all its connections deadlocked. +Reproduced only with dragonfly in release mode on multi-core machine. +""" + +import asyncio +import aioredis + +from loguru import logger as log +import sys +import random + +connection_pool = aioredis.ConnectionPool(host="localhost", port=6379, + db=1, decode_responses=True, max_connections=16) + + +key_index = 1 + +async def post_to_redis(sem, db_name, index): + global key_index + async with sem: + results = None + try: + redis_client = aioredis.Redis(connection_pool=connection_pool) + async with redis_client.pipeline(transaction=True) as pipe: + for i in range(1, 15): + pipe.hsetnx(name=f"key_{key_index}", key="name", value="bla") + key_index += 1 + #log.info(f"after first half {key_index}") + for i in range(1, 15): + pipe.hsetnx(name=f"bla_{key_index}", key="name2", value="bla") + key_index += 1 + assert len(pipe.command_stack) > 0 + log.info(f"before pipe.execute {key_index}") + results = await pipe.execute() + log.info(f"after pipe.execute {key_index}") + finally: + # log.info(f"before close {index}") + await redis_client.close() + #log.info(f"after close {index} {len(results)}") + + +async def do_concurrent(db_name): + tasks = [] + sem = asyncio.Semaphore(10) + for i in range(1, 3000): + tasks.append(post_to_redis(sem, db_name, i)) + res = await asyncio.gather(*tasks) + + +if __name__ == '__main__': + log.remove() + log.add(sys.stdout, enqueue=True, level='INFO') + loop = asyncio.get_event_loop() + loop.run_until_complete(do_concurrent("my_db"))