From fc63eec1b60ed957a01b61a57caeb60e41d02dcd Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Wed, 12 Jan 2022 08:58:07 +0200
Subject: [PATCH] Add BLPOP support

---
 server/engine_shard_set.cc | 344 ++++++++++++++++++++++++++++-----
 server/engine_shard_set.h  |  52 +++++
 server/list_family.cc      | 135 +++++++++++++
 server/list_family.h       |   1 +
 server/list_family_test.cc | 160 +++++++++++++++-
 server/transaction.cc      | 377 +++++++++++++++++++++++++++++++++++--
 server/transaction.h       |  79 ++++++--
 7 files changed, 1070 insertions(+), 78 deletions(-)

diff --git a/server/engine_shard_set.cc b/server/engine_shard_set.cc
index 11e820575..59a217fe9 100644
--- a/server/engine_shard_set.cc
+++ b/server/engine_shard_set.cc
@@ -19,6 +19,26 @@ namespace fibers = ::boost::fibers;
 thread_local EngineShard* EngineShard::shard_ = nullptr;
 constexpr size_t kQueueLen = 64;
 
+struct WatchItem {
+  ::boost::intrusive_ptr<Transaction> trans;
+
+  WatchItem(Transaction* t) : trans(t) {
+  }
+};
+
+struct EngineShard::WatchQueue {
+  deque<WatchItem> items;
+  TxId notify_txid = UINT64_MAX;
+
+  // Updated by both coordinator and shard threads but at different times.
+  enum State { SUSPENDED, ACTIVE } state = SUSPENDED;
+
+  void Suspend() {
+    state = SUSPENDED;
+    notify_txid = UINT64_MAX;
+  }
+};
+
 EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time)
     : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }),
       db_slice_(pb->GetIndex(), this) {
@@ -70,6 +90,16 @@ void EngineShard::PollExecution(Transaction* trans) {
   DVLOG(1) << "PollExecution " << (trans ? trans->DebugId() : "");
   ShardId sid = shard_id();
 
+  uint16_t trans_mask = trans ? trans->GetLocalMask(sid) : 0;
+  if (trans_mask & Transaction::AWAKED_Q) {
+    DCHECK(continuation_trans_ == nullptr);
+
+    CHECK_EQ(committed_txid_, trans->notify_txid()) << "TBD";
+    bool keep = trans->RunInShard(this);
+    if (keep)
+      return;
+  }
+
   if (continuation_trans_) {
     if (trans == continuation_trans_)
       trans = nullptr;
@@ -79,77 +109,83 @@ void EngineShard::PollExecution(Transaction* trans) {
     DVLOG(1) << "RunContTrans: " << continuation_trans_->DebugId() << " keep: " << to_keep;
     if (!to_keep) {
       continuation_trans_ = nullptr;
+      OnTxFinish();
     }
-
-    if (continuation_trans_) {
-      // Once we start executing transaction we do not continue until it's finished.
-      // This preserves atomicity property of multi-hop transactions.
-      return;
-    }
   }
 
-  DCHECK(!continuation_trans_);
-
+  bool has_awaked_trans = HasAwakedTransaction();
   Transaction* head = nullptr;
   string dbg_id;
 
-  while (!txq_.Empty()) {
-    auto val = txq_.Front();
-    head = absl::get<Transaction*>(val);
+  if (continuation_trans_ == nullptr && !has_awaked_trans) {
+    while (!txq_.Empty()) {
+      auto val = txq_.Front();
+      head = absl::get<Transaction*>(val);
 
-    // The fact that Tx is in the queue, already means that coordinator fiber will not progress,
-    // hence here it's enough to test for run_count and check local_mask.
-    bool is_armed = head->IsArmedInShard(sid);
-    if (!is_armed)
-      break;
+      // The fact that Tx is in the queue already means that the coordinator fiber will not
+      // progress, hence here it's enough to test for run_count and check local_mask.
+      bool is_armed = head->IsArmedInShard(sid);
+      if (!is_armed)
+        break;
 
-    // It could be that head is processed and unblocks multi-hop transaction .
-    // The transaction will schedule again and will arm another callback.
-    // Then we will reach invalid state by running trans after this loop,
-    // which is not what we want.
-    // This function should not process 2 different callbacks for the same transaction.
-    // Hence we make sure to reset trans if it has been processed via tx-queue.
-    if (head == trans)
-      trans = nullptr;
-    TxId txid = head->txid();
+      // It could be that head is processed and unblocks a multi-hop transaction.
+      // The transaction will schedule again and will arm another callback.
+      // Then we would reach an invalid state by running trans after this loop,
+      // which is not what we want.
+      // This function should not process 2 different callbacks for the same transaction.
+      // Hence we make sure to reset trans if it has been processed via the tx-queue.
+      if (head == trans)
+        trans = nullptr;
+      TxId txid = head->txid();
 
-    DCHECK_LT(committed_txid_, txid);
+      // committed_txid_ is strictly increasing when processed via TxQueue.
+      DCHECK_LT(committed_txid_, txid);
 
-    // We update committed_txid_ before calling RunInShard() to avoid cases where
-    // a transaction stalls the execution with IO while another fiber queries this shard for
-    // committed_txid_ (for example during the scheduling).
-    committed_txid_ = txid;
-    if (VLOG_IS_ON(2)) {
-      dbg_id = head->DebugId();
-    }
+      // We update committed_txid_ before calling RunInShard() to avoid cases where
+      // a transaction stalls the execution with IO while another fiber queries this shard for
+      // committed_txid_ (for example during the scheduling).
+      committed_txid_ = txid;
+      if (VLOG_IS_ON(2)) {
+        dbg_id = head->DebugId();
+      }
 
-    bool keep = head->RunInShard(this);
-    // We should not access head from this point since RunInShard callback decrements refcount.
-    DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep;
+      bool keep = head->RunInShard(this);
+      // We should not access head from this point since the RunInShard callback decrements
+      // the refcount.
+      DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep;
 
-    if (keep) {
-      continuation_trans_ = head;
-      break;
-    }
-  }
+      if (keep) {
+        continuation_trans_ = head;
+        break;
+      }
 
-  if (!trans)
+      OnTxFinish();
+    }  // while(!txq_.Empty())
+  }    // if (!has_awaked_trans)
+
+  // For SUSPENDED_Q - if the transaction has not been notified, it will still be
+  // in the watch queue. We need to unblock an Execute by running a noop.
+  if (trans_mask & Transaction::SUSPENDED_Q) {
+    TxId notify_txid = trans->notify_txid();
+    DCHECK(HasResultConverged(notify_txid));
+    trans->RunNoop(this);
     return;
-
-  uint16_t local_mask = trans->GetLocalMask(sid);
+  }
 
   // If trans is out of order, i.e. locks keys that previous transactions have not locked.
   // It may be that there are other transactions that touch those keys, but they are necessarily
   // ordered after trans in the queue, hence it's safe to run trans out of order.
-  if (local_mask & Transaction::OUT_OF_ORDER) {
+  if (trans && trans_mask & Transaction::OUT_OF_ORDER) {
     DCHECK(trans != head);
+    DCHECK(!trans->IsMulti());  // multi, global transactions can not be OOO.
+    DCHECK(trans_mask & Transaction::ARMED);
 
     dbg_id.clear();
     if (VLOG_IS_ON(1)) {
       dbg_id = trans->DebugId();
     }
+    ++stats_.ooo_runs;
 
     bool keep = trans->RunInShard(this);
     DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep;
@@ -159,12 +195,230 @@ void EngineShard::PollExecution(Transaction* trans) {
   }
 }
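The rewritten PollExecution now arbitrates between four sources of work. As a reading aid (not part of the patch), here is a minimal, self-contained sketch of the priority order it implements; the types and names are simplified stand-ins for the real EngineShard state:

```cpp
#include <cstdio>

// Simplified stand-in for the decision PollExecution makes on each call.
enum class Next { AwakedTrans, Continuation, TxQueueHead, OutOfOrder, Nothing };

Next PickNext(bool trans_awaked, bool has_continuation, bool head_armed, bool trans_ooo) {
  if (trans_awaked)
    return Next::AwakedTrans;   // a notified blocking transaction preempts everything
  if (has_continuation)
    return Next::Continuation;  // an unfinished multi-hop transaction keeps the shard
  if (head_armed)
    return Next::TxQueueHead;   // otherwise, serve the transaction queue in FIFO order
  if (trans_ooo)
    return Next::OutOfOrder;    // uncontended transactions may run ahead of the queue
  return Next::Nothing;
}

int main() {
  // A continuation blocks the queue head from running:
  std::printf("%d\n", static_cast<int>(PickNext(false, true, true, false)));  // prints 1
}
```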
+// Internal function called from ProcessAwakened().
+// Marks the queue as active and notifies the first transaction in the queue.
+Transaction* EngineShard::NotifyWatchQueue(WatchQueue* wq) {
+  wq->state = WatchQueue::ACTIVE;
+
+  auto& q = wq->items;
+  ShardId sid = shard_id();
+
+  do {
+    const WatchItem& wi = q.front();
+    Transaction* head = wi.trans.get();
+
+    if (head->NotifySuspended(committed_txid_, sid)) {
+      wq->notify_txid = committed_txid_;
+      return head;
+    }
+
+    q.pop_front();
+  } while (!q.empty());
+
+  return nullptr;
+}
+
+// Processes potentially awakened keys and verifies that these are indeed
+// awakened to eliminate false positives.
+void EngineShard::ProcessAwakened(Transaction* completed_t) {
+  for (DbIndex index : awakened_indices_) {
+    WatchTable& wt = watch_map_[index];
+    for (auto key : wt.awakened_keys) {
+      string_view sv_key = static_cast<string_view>(key);
+      auto [it, exp_it] = db_slice_.FindExt(index, sv_key);  // Double verify we still got the item.
+      if (IsValid(it)) {
+        auto w_it = wt.queue_map.find(sv_key);
+        CHECK(w_it != wt.queue_map.end());
+        DVLOG(1) << "NotifyWatchQueue " << key;
+        Transaction* t2 = NotifyWatchQueue(w_it->second.get());
+        if (t2) {
+          awakened_transactions_.insert(t2);
+        }
+      }
+    }
+    wt.awakened_keys.clear();
+  }
+  awakened_indices_.clear();
+
+  if (!completed_t)
+    return;
+
+  auto& wt = watch_map_[completed_t->db_index()];
+  KeyLockArgs lock_args = completed_t->GetLockArgs(shard_id());
+  for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
+    string_view key = lock_args.args[i];
+    auto w_it = wt.queue_map.find(key);
+
+    if (w_it == wt.queue_map.end() || w_it->second->state != WatchQueue::ACTIVE)
+      continue;
+    WatchQueue& wq = *w_it->second;
+
+    DCHECK_LE(wq.notify_txid, committed_txid_);
+    auto& queue = wq.items;
+    DCHECK(!queue.empty());  // since it's active
+    if (queue.front().trans == completed_t) {
+      queue.pop_front();
+
+      while (!queue.empty()) {
+        const WatchItem& bi = queue.front();
+        Transaction* head = bi.trans.get();
+
+        if (head->NotifySuspended(wq.notify_txid, shard_id()))
+          break;
+        queue.pop_front();
+      }
+
+      if (queue.empty()) {
+        wt.queue_map.erase(w_it);
+      }
+    }
+  }
+  awakened_transactions_.erase(completed_t);
+}
+
+void EngineShard::AddWatched(string_view key, Transaction* me) {
+  WatchTable& wt = watch_map_[me->db_index()];
+  auto [res, inserted] = wt.queue_map.emplace(key, nullptr);
+  if (inserted) {
+    res->second.reset(new WatchQueue);
+  }
+
+  res->second->items.emplace_back(me);
+}
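AddWatched and the helpers that follow maintain one FIFO queue of waiters per key. A minimal model of that discipline, assuming simplified types (a plain struct in place of Transaction), is sketched below; it mirrors how NotifyWatchQueue drops expired entries and notifies, but does not pop, the first live waiter:

```cpp
#include <cstdio>
#include <deque>

struct Waiter {
  int id;
  bool expired = false;  // stands in for the EXPIRED_Q local-mask bit
};

// Drops timed-out waiters from the front, then returns the first live one.
// The live head stays queued: it is popped only once it completes running.
int NotifyFront(std::deque<Waiter>& q) {
  while (!q.empty()) {
    if (!q.front().expired)
      return q.front().id;
    q.pop_front();
  }
  return -1;  // queue drained: nobody left to wake
}

int main() {
  std::deque<Waiter> q{{1, true}, {2, false}};
  std::printf("woke %d\n", NotifyFront(q));  // woke 2
}
```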
+// Runs in O(N) complexity in the length of the watch queue.
+bool EngineShard::RemovedWatched(string_view key, Transaction* me) {
+  WatchTable& wt = watch_map_[me->db_index()];
+  auto watch_it = wt.queue_map.find(key);
+  CHECK(watch_it != wt.queue_map.end());
+
+  WatchQueue& wq = *watch_it->second;
+  for (auto j = wq.items.begin(); j != wq.items.end(); ++j) {
+    if (j->trans == me) {
+      wq.items.erase(j);
+      if (wq.items.empty()) {
+        wt.queue_map.erase(watch_it);
+      }
+      return true;
+    }
+  }
+
+  LOG(FATAL) << "should not happen";
+
+  return false;
+}
+
+void EngineShard::GCWatched(const KeyLockArgs& largs) {
+  auto& queue_map = watch_map_[largs.db_index].queue_map;
+
+  for (size_t i = 0; i < largs.args.size(); i += largs.key_step) {
+    auto key = largs.args[i];
+    auto watch_it = queue_map.find(key);
+    CHECK(watch_it != queue_map.end());
+    WatchQueue& wq = *watch_it->second;
+    DCHECK(!wq.items.empty());
+    do {
+      auto local_mask = wq.items.front().trans->GetLocalMask(shard_id());
+      if ((local_mask & Transaction::EXPIRED_Q) == 0) {
+        break;
+      }
+      wq.items.pop_front();
+    } while (!wq.items.empty());
+
+    if (wq.items.empty()) {
+      queue_map.erase(watch_it);
+    }
+  }
+}
+
+// Called from commands like lpush.
+void EngineShard::AwakeWatched(DbIndex db_index, const MainIterator& main_it) {
+  auto it = watch_map_.find(db_index);
+  if (it == watch_map_.end())
+    return;
+
+  WatchTable& wt = it->second;
+  if (wt.queue_map.empty()) {  /// No blocked transactions.
+    return;
+  }
+
+  string tmp;
+  string_view db_key = main_it->first;
+  auto wit = wt.queue_map.find(db_key);
+
+  if (wit == wt.queue_map.end())
+    return;  /// Similarly, nobody watches this key.
+
+  string_view key = wit->first;
+
+  // Already awakened this key.
+  if (wt.awakened_keys.find(key) != wt.awakened_keys.end())
+    return;
+
+  wt.awakened_keys.insert(wit->first);
+  awakened_indices_.insert(db_index);
+}
+
 void EngineShard::ShutdownMulti(Transaction* multi) {
   if (continuation_trans_ == multi) {
     continuation_trans_ = nullptr;
   }
 }
 
+void EngineShard::WaitForConvergence(TxId notifyid, Transaction* t) {
+  DVLOG(1) << "ConvergeNotification " << shard_id();
+  waiting_convergence_.emplace(notifyid, t);
+}
+
+void EngineShard::OnTxFinish() {
+  DCHECK(continuation_trans_ == nullptr);  // By definition of OnTxFinish.
+
+  if (waiting_convergence_.empty())
+    return;
+
+  if (txq_.Empty()) {
+    for (const auto& k_v : waiting_convergence_) {
+      NotifyConvergence(k_v.second);
+    }
+    waiting_convergence_.clear();
+    return;
+  }
+
+  TxId txq_score = txq_.HeadScore();
+  do {
+    auto tx_waiting = waiting_convergence_.begin();
+
+    // Instead of taking the map key, we use the up-to-date notify_txid,
+    // which could have improved in the meantime. Not important though.
+    TxId notifyid = tx_waiting->second->notify_txid();
+    if (notifyid > committed_txid_ && txq_score <= tx_waiting->first)
+      break;
+    auto nh = waiting_convergence_.extract(tx_waiting);
+    NotifyConvergence(nh.mapped());
+  } while (!waiting_convergence_.empty());
+}
+
+void EngineShard::NotifyConvergence(Transaction* tx) {
+  LOG(FATAL) << "TBD";
+}
+
+// There are several cases that contain proof of convergence for this shard:
+// 1. txq_ is empty - anything that is going to be scheduled will be scheduled
+//    with txid > notifyid.
+// 2. committed_txid_ > notifyid - similarly, this shard can not affect the result with
+//    timestamp notifyid.
+// 3. committed_txid_ == notifyid - if a transaction is in progress (continuation_trans_ != NULL),
+//    then it can still affect the result, hence we require that continuation_trans_ is null,
+//    in which case the shard has converged to the result at notifyid.
+// 4. Finally, with committed_txid_ < notifyid and continuation_trans_ == nullptr,
+//    we can check if the next in line (HeadScore) is after notifyid, in which case we can also
+//    conclude regarding the result convergence for this shard.
+bool EngineShard::HasResultConverged(TxId notifyid) const {
+  return txq_.Empty() || committed_txid_ > notifyid ||
+         (continuation_trans_ == nullptr &&
+          (committed_txid_ == notifyid || txq_.HeadScore() > notifyid));
+}
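Since HasResultConverged packs the four cases above into a single expression, a standalone restatement can make them individually testable. The following is an illustration only (simplified types, with std::optional modeling an empty queue), not code from the patch:

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

using TxId = uint64_t;

bool Converged(std::optional<TxId> head_score, TxId committed, bool in_progress, TxId notifyid) {
  if (!head_score)             // case 1: empty queue - future txids can only be larger
    return true;
  if (committed > notifyid)    // case 2: the shard already moved past notifyid
    return true;
  // Cases 3 and 4 additionally require that no transaction is mid-run.
  return !in_progress && (committed == notifyid || *head_score > notifyid);
}

int main() {
  assert(Converged(std::nullopt, 3, true, 5));   // case 1
  assert(Converged(TxId{4}, 9, true, 8));        // case 2
  assert(Converged(TxId{9}, 8, false, 8));       // case 3
  assert(!Converged(TxId{9}, 8, true, 8));       // case 3, blocked by a continuation
  assert(Converged(TxId{9}, 7, false, 8));       // case 4: next in line is already past 8
  assert(!Converged(TxId{8}, 7, false, 8));      // the head may still commit at ts 8
  return 0;
}
```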
 void EngineShardSet::Init(uint32_t sz) {
   CHECK_EQ(0u, size());
 
diff --git a/server/engine_shard_set.h b/server/engine_shard_set.h
index 7a729e8d6..02603ced3 100644
--- a/server/engine_shard_set.h
+++ b/server/engine_shard_set.h
@@ -8,8 +8,11 @@ extern "C" {
 #include "redis/sds.h"
 }
 
+#include <absl/container/btree_map.h>
+#include <absl/container/flat_hash_map.h>
 #include
 
+#include "base/string_view_sso.h"
 #include "core/tx_queue.h"
 #include "server/db_slice.h"
 #include "util/fibers/fiberqueue_threadpool.h"
@@ -73,8 +76,34 @@ class EngineShard {
     return &shard_lock_;
   }
 
+  // Iterates over awakened key candidates in each db and verifies them to
+  // eliminate false positives.
+  // It has 2 responsibilities.
+  // 1: to go over potential awakened keys, verify them and activate watch queues.
+  // 2: if t is awaked and finished running - to remove it from the head
+  //    of the queue and notify the next one.
+  // If t is null then the second part is omitted.
+  void ProcessAwakened(Transaction* t);
+
+  // Blocking API
+  // TODO: consider moving all watched functions to
+  // EngineShard with a separate per-db map.
+  //! AddWatched adds a transaction to the blocking queue.
+  void AddWatched(std::string_view key, Transaction* me);
+  bool RemovedWatched(std::string_view key, Transaction* me);
+  void GCWatched(const KeyLockArgs& lock_args);
+
+  void AwakeWatched(DbIndex db_index, const MainIterator& it);
+
+  bool HasAwakedTransaction() const {
+    return !awakened_transactions_.empty();
+  }
+
   // TODO: Awkward interface. I should solve it somehow.
   void ShutdownMulti(Transaction* multi);
+  void WaitForConvergence(TxId notifyid, Transaction* t);
+  bool HasResultConverged(TxId notifyid) const;
 
   void IncQuickRun() {
     stats_.quick_runs++;
@@ -90,6 +119,29 @@ class EngineShard {
  private:
   EngineShard(util::ProactorBase* pb, bool update_db_time);
 
+  struct WatchQueue;
+
+  void OnTxFinish();
+  void NotifyConvergence(Transaction* tx);
+
+  /// Returns the notified transaction,
+  /// or null if all transactions in the queue have expired.
+  Transaction* NotifyWatchQueue(WatchQueue* wq);
+
+  struct WatchTable {
+    absl::flat_hash_map<base::string_view_sso, std::unique_ptr<WatchQueue>> queue_map;
+
+    // Awakened keys that point to blocked entries that can potentially be unblocked.
+    // These reference the watched keys.
+    absl::flat_hash_set<base::string_view_sso> awakened_keys;
+  };
+
+  absl::flat_hash_map<DbIndex, WatchTable> watch_map_;
+  absl::flat_hash_set<DbIndex> awakened_indices_;
+  absl::flat_hash_set<Transaction*> awakened_transactions_;
+
+  absl::btree_multimap<TxId, Transaction*> waiting_convergence_;
+
   ::util::fibers_ext::FiberQueue queue_;
   ::boost::fibers::fiber fiber_q_;
 
diff --git a/server/list_family.cc b/server/list_family.cc
index e0b8bcc54..e925967a9 100644
--- a/server/list_family.cc
+++ b/server/list_family.cc
@@ -97,6 +97,101 @@ OpResult<string> ListPop(DbIndex db_ind, const MainValue& mv, ListDir dir) {
   return res;
 }
 
+class BPopper {
+ public:
+  explicit BPopper();
+
+  // Returns WRONG_TYPE or OK.
+  // If OK is returned then use result() to fetch the value.
+  OpStatus Run(Transaction* t, unsigned msec);
+
+  const std::pair<std::string_view, std::string_view> result() const {
+    return std::pair<std::string_view, std::string_view>(key_, value_);
+  }
+
+  bool found() const {
+    return found_;
+  }
+
+ private:
+  OpStatus Pop(Transaction* t, EngineShard* shard);
+
+  bool found_ = false;
+  MainIterator find_it_;
+  ShardId find_sid_ = std::numeric_limits<ShardId>::max();
+
+  std::string key_, value_;
+};
+
+BPopper::BPopper() {
+}
+
+OpStatus BPopper::Run(Transaction* t, unsigned msec) {
+  OpResult<Transaction::FindFirstResult> result;
+  using time_point = Transaction::time_point;
+
+  time_point tp =
+      msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max();
+  bool is_multi = t->IsMulti();
+  if (!is_multi) {
+    t->Schedule();
+  }
+
+  while (true) {
+    result = t->FindFirst();
+
+    if (result)
+      break;
+
+    if (result.status() != OpStatus::KEY_NOTFOUND) {  // Some error occurred.
+      // We could be registered in the queue due to previous iterations.
+      t->UnregisterWatch();
+
+      return result.status();
+    }
+
+    if (is_multi) {
+      auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
+      t->Execute(std::move(cb), true);
+      return OpStatus::TIMED_OUT;
+    }
+
+    if (!t->WaitOnWatch(tp)) {
+      return OpStatus::TIMED_OUT;
+    }
+  }
+
+  DCHECK_EQ(OpStatus::OK, result.status());
+
+  VLOG(1) << "Popping an element";
+  find_sid_ = result->sid;
+  find_it_ = result->find_res;
+  found_ = true;
+
+  auto cb = [this](Transaction* t, EngineShard* shard) { return Pop(t, shard); };
+  t->Execute(std::move(cb), true);
+
+  return OpStatus::OK;
+}
+
+OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
+  DCHECK(found());
+
+  if (shard->shard_id() == find_sid_) {
+    key_ = find_it_->first;
+
+    OpResult<std::string> res = ListPop(t->db_index(), find_it_->second, ListDir::LEFT);
+    CHECK(res.ok());
+    value_ = std::move(res.value());
+
+    quicklist* ql = GetQL(find_it_->second);
+    if (quicklistCount(ql) == 0) {
+      CHECK(shard->db_slice().Del(t->db_index(), find_it_));
+    }
+  }
+  return OpStatus::OK;
+}
+
 }  // namespace
 
 void ListFamily::LPush(CmdArgList args, ConnectionContext* cntx) {
@@ -150,6 +245,42 @@ void ListFamily::LIndex(CmdArgList args, ConnectionContext* cntx) {
   }
 }
 
+void ListFamily::BLPop(CmdArgList args, ConnectionContext* cntx) {
+  DCHECK_GE(args.size(), 3u);
+
+  float timeout;
+  auto timeout_str = ArgS(args, args.size() - 1);
+  if (!absl::SimpleAtof(timeout_str, &timeout)) {
+    return cntx->SendError("timeout is not a float or out of range");
+  }
+  if (timeout < 0) {
+    return cntx->SendError("timeout is negative");
+  }
+  VLOG(1) << "BLPop start " << timeout;
+
+  Transaction* transaction = cntx->transaction;
+  BPopper popper;
+  OpStatus result = popper.Run(transaction, unsigned(timeout * 1000));
+
+  switch (result) {
+    case OpStatus::WRONG_TYPE:
+      return cntx->SendError(kWrongTypeErr);
+    case OpStatus::OK:
+      break;
+    case OpStatus::TIMED_OUT:
+      return cntx->SendNullArray();
+    default:
+      LOG(FATAL) << "Unexpected error " << result;
+  }
+
+  CHECK(popper.found());
+  VLOG(1) << "BLPop returned ";
+
+  auto res = popper.result();
+  std::string_view str_arr[2] = {res.first, res.second};
+  return cntx->SendStringArr(str_arr);
+}
+
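Note that BPopper stores find_it_ (a MainIterator) during the find hop and dereferences it in the pop hop; the transaction.h diff later in this patch marks this as a TODO because iterators may not survive table growth between hops. A toy illustration of the hazard using standard containers (not the real DashTable):

```cpp
#include <cstdio>
#include <string>
#include <unordered_map>

int main() {
  std::unordered_map<std::string, int> table{{"x", 1}};
  auto it = table.find("x");           // "iterator found in hop 1"
  for (int i = 0; i < 1000; ++i)       // concurrent writes grow the table...
    table.emplace("k" + std::to_string(i), i);
  // ...which may rehash, so 'it' can now dangle. Re-resolving by key is safe:
  std::printf("%d\n", table.find("x")->second);
}
```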
LOG(FATAL) << "Unexpected error " << result; + } + + CHECK(popper.found()); + VLOG(1) << "BLPop returned "; + + auto res = popper.result(); + std::string_view str_arr[2] = {res.first, res.second}; + return cntx->SendStringArr(str_arr); +} + void ListFamily::PushGeneric(ListDir dir, const CmdArgList& args, ConnectionContext* cntx) { std::string_view key = ArgS(args, 1); vector vals(args.size() - 2); @@ -219,6 +350,9 @@ OpResult ListFamily::OpPush(const OpArgs& op_args, std::string_view ke quicklistPush(ql, es->tmp_str, sdslen(es->tmp_str), pos); } + if (new_key) { + es->AwakeWatched(op_args.db_ind, it); + } return quicklistCount(ql); } @@ -275,6 +409,7 @@ void ListFamily::Register(CommandRegistry* registry) { << CI{"LPOP", CO::WRITE | CO::FAST | CO::DENYOOM, 2, 1, 1, 1}.HFUNC(LPop) << CI{"RPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPush) << CI{"RPOP", CO::WRITE | CO::FAST | CO::DENYOOM, 2, 1, 1, 1}.HFUNC(RPop) + << CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop) << CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen) << CI{"LINDEX", CO::READONLY, 3, 1, 1, 1}.HFUNC(LIndex); } diff --git a/server/list_family.h b/server/list_family.h index 151c66e06..54c713e92 100644 --- a/server/list_family.h +++ b/server/list_family.h @@ -22,6 +22,7 @@ class ListFamily { static void RPush(CmdArgList args, ConnectionContext* cntx); static void LPop(CmdArgList args, ConnectionContext* cntx); static void RPop(CmdArgList args, ConnectionContext* cntx); + static void BLPop(CmdArgList args, ConnectionContext* cntx); static void LLen(CmdArgList args, ConnectionContext* cntx); static void LIndex(CmdArgList args, ConnectionContext* cntx); diff --git a/server/list_family_test.cc b/server/list_family_test.cc index e422a361c..826d73d69 100644 --- a/server/list_family_test.cc +++ b/server/list_family_test.cc @@ -19,16 +19,21 @@ using namespace testing; using namespace std; using namespace util; -using namespace boost; +namespace this_fiber = ::boost::this_fiber; +namespace fibers = ::boost::fibers; namespace dfly { class ListFamilyTest : public BaseFamilyTest { protected: + ListFamilyTest() { + num_threads_ = 4; + } }; const char* kKey1 = "x"; const char* kKey2 = "b"; +const char* kKey3 = "c"; TEST_F(ListFamilyTest, Basic) { auto resp = Run({"lpush", kKey1, "1"}); @@ -53,4 +58,157 @@ TEST_F(ListFamilyTest, Expire) { EXPECT_THAT(resp[0], IntArg(1)); } + +TEST_F(ListFamilyTest, BLPopUnblocking) { + auto resp = Run({"lpush", kKey1, "1"}); + EXPECT_THAT(resp[0], IntArg(1)); + resp = Run({"lpush", kKey2, "2"}); + ASSERT_THAT(resp, ElementsAre(IntArg(1))); + + resp = Run({"blpop", kKey1, kKey2}); // missing "0" delimiter. + ASSERT_THAT(resp[0], ErrArg("timeout is not a float")); + + resp = Run({"blpop", kKey1, kKey2, "0"}); + ASSERT_EQ(2, GetDebugInfo().shards_count); + EXPECT_THAT(resp, ElementsAre(kKey1, "1")); + + resp = Run({"blpop", kKey1, kKey2, "0"}); + EXPECT_THAT(resp, ElementsAre(kKey2, "2")); + + Run({"set", "z", "1"}); + + resp = Run({"blpop", "z", "0"}); + ASSERT_THAT(resp[0], ErrArg("WRONGTYPE ")); + + ASSERT_FALSE(IsLocked(0, "x")); + ASSERT_FALSE(IsLocked(0, "y")); + ASSERT_FALSE(IsLocked(0, "z")); +} + +TEST_F(ListFamilyTest, BLPopBlocking) { + RespVec resp0, resp1; + + // Run the fiber at creation. 
+  auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
+    resp0 = Run({"blpop", "x", "0"});
+    LOG(INFO) << "pop0";
+  });
+
+  this_fiber::sleep_for(50us);
+  /*auto fb1 = pp_->at(1)->LaunchFiber([&] {
+    resp1 = Run({"blpop", "x", "0"});
+    LOG(INFO) << "pop1";
+  });*/
+  this_fiber::sleep_for(30us);
+  pp_->at(1)->AwaitBlocking([&] { Run({"lpush", "x", "2", "1"}); });
+
+  fb0.join();
+  // fb1.join();
+
+  // fb0 should start first and be the first transaction blocked. Therefore, it should pop '1'.
+  // Sometimes the order is switched; need to think how to fix it.
+  int64_t epoch0 = GetDebugInfo("IO0").clock;
+  int64_t epoch1 = GetDebugInfo("IO1").clock;
+  ASSERT_LT(epoch0, epoch1);
+
+  EXPECT_THAT(resp0, ElementsAre("x", "1"));
+  ASSERT_FALSE(IsLocked(0, "x"));
+}
+
+TEST_F(ListFamilyTest, BLPopMultiple) {
+  RespVec resp0, resp1;
+
+  resp0 = Run({"blpop", kKey1, kKey2, "0.01"});  // timeout
+  EXPECT_THAT(resp0, ElementsAre(ArgType(RespExpr::NIL_ARRAY)));
+  ASSERT_EQ(2, GetDebugInfo().shards_count);
+
+  ASSERT_FALSE(IsLocked(0, kKey1));
+  ASSERT_FALSE(IsLocked(0, kKey2));
+
+  auto fb1 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
+    resp0 = Run({"blpop", kKey1, kKey2, "0"});
+  });
+
+  pp_->at(1)->AwaitBlocking([&] { Run({"lpush", kKey1, "1", "2", "3"}); });
+  fb1.join();
+  EXPECT_THAT(resp0, ElementsAre(StrArg(kKey1), StrArg("3")));
+  ASSERT_FALSE(IsLocked(0, kKey1));
+  ASSERT_FALSE(IsLocked(0, kKey2));
+  ess_->RunBriefInParallel([](EngineShard* es) { ASSERT_FALSE(es->HasAwakedTransaction()); });
+}
+
+TEST_F(ListFamilyTest, BLPopTimeout) {
+  RespVec resp = Run({"blpop", kKey1, kKey2, kKey3, "0.01"});
+  EXPECT_THAT(resp[0], ArgType(RespExpr::NIL_ARRAY));
+  EXPECT_EQ(3, GetDebugInfo().shards_count);
+  ASSERT_FALSE(service_->IsLocked(0, kKey1));
+
+  // Under Multi
+  resp = Run({"multi"});
+  ASSERT_THAT(resp, RespEq("OK"));
+
+  Run({"blpop", kKey1, "0"});
+  resp = Run({"exec"});
+
+  EXPECT_THAT(resp, ElementsAre(ArgType(RespExpr::NIL_ARRAY)));
+  ASSERT_FALSE(service_->IsLocked(0, kKey1));
+}
+
+TEST_F(ListFamilyTest, BLPopSerialize) {
+  RespVec blpop_resp;
+
+  auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
+    blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
+  });
+
+  do {
+    this_fiber::sleep_for(30us);
+  } while (!IsLocked(0, kKey1));
+
+  LOG(INFO) << "Starting multi";
+
+  TxClock cl1, cl2;
+  unsigned key1_len1 = 0, key1_len2 = 0;
+
+  auto p1_fb = pp_->at(1)->LaunchFiber([&] {
+    auto resp = Run({"multi"});  // We use multi to assign a ts to lpush.
+    ASSERT_THAT(resp, RespEq("OK"));
+    Run({"lpush", kKey1, "A"});
+    resp = Run({"exec"});
+
+    // Either this lpush has run first or the one below.
+    // In any case it must be that between the 2 invocations of lpush (wrapped in multi)
+    // blpop will be triggered and it will empty the list again. Hence, in any case
+    // lpush kKey1 here and below should return 1.
+    EXPECT_THAT(resp, ElementsAre(IntArg(1)));
+    key1_len1 = get<int64_t>(resp[0].u);
+    cl1 = GetDebugInfo("IO1").clock;
+    LOG(INFO) << "push1 ts: " << cl1;
+  });
+
+  auto p2_fb = pp_->at(2)->LaunchFiber([&] {
+    auto resp = Run({"multi"});  // We use multi to assign a ts to lpush.
+    ASSERT_THAT(resp, RespEq("OK"));
+    Run({"lpush", kKey1, "B"});
+    Run({"lpush", kKey2, "C"});
+    resp = Run({"exec"});
+    EXPECT_THAT(resp, ElementsAre(IntArg(1), IntArg(1)));
+    key1_len2 = get<int64_t>(resp[0].u);
+    cl2 = GetDebugInfo("IO2").clock;
+    LOG(INFO) << "push2 ts: " << cl2;
+  });
+
+  p1_fb.join();
+  p2_fb.join();
+
+  pop_fb.join();
+  EXPECT_THAT(blpop_resp, ElementsAre(StrArg(kKey1), ArgType(RespExpr::STRING)));
+
+  if (cl2 < cl1) {
+    EXPECT_EQ(blpop_resp[1], "B");
+  } else {
+    EXPECT_EQ(blpop_resp[1], "A");
+  }
+}
+
 }  // namespace dfly
diff --git a/server/transaction.cc b/server/transaction.cc
index c6a455d62..751ae1b2c 100644
--- a/server/transaction.cc
+++ b/server/transaction.cc
@@ -24,6 +24,72 @@ std::atomic_uint64_t op_seq{1};
 
 }  // namespace
 
+struct Transaction::FindFirstProcessor {
+ public:
+  FindFirstProcessor(TxId notify, unsigned size)
+      : find_res_(size, OpStatus::KEY_NOTFOUND), notify_txid_(notify) {
+  }
+
+  void Find(Transaction* t);
+
+  OpResult<FindFirstResult> Process(Transaction* t);
+
+ private:
+  OpStatus RunInShard(Transaction* t, EngineShard* shard);
+
+  // Holds Find results: (iterator to a found key, and its index in the passed arguments).
+  // See DbSlice::FindFirst for more details.
+  // Spans all the shards for now.
+  std::vector<OpResult<std::pair<MainIterator, unsigned>>> find_res_;
+  TxId notify_txid_;
+};
+
+void Transaction::FindFirstProcessor::Find(Transaction* t) {
+  VLOG(2) << "FindFirst::Find " << t->DebugId();
+
+  t->Execute([this](auto* t, auto* s) { return RunInShard(t, s); }, false);
+}
+
+OpStatus Transaction::FindFirstProcessor::RunInShard(Transaction* t, EngineShard* shard) {
+  if (notify_txid_ == kuint64max || shard->committed_txid() == notify_txid_) {
+    // TODO: to add timestamp logic that provides consistency guarantees for blocking
+    // transactions.
+    auto args = t->ShardArgsInShard(shard->shard_id());
+    find_res_[shard->shard_id()] = shard->db_slice().FindFirst(t->db_index(), args);
+  }
+  return OpStatus::OK;
+}
+
+OpResult<Transaction::FindFirstResult> Transaction::FindFirstProcessor::Process(Transaction* t) {
+  uint32_t min_arg_indx = UINT32_MAX;
+
+  FindFirstResult result;
+  for (size_t sid = 0; sid < find_res_.size(); ++sid) {
+    const auto& fr = find_res_[sid];
+    auto status = fr.status();
+    if (status == OpStatus::KEY_NOTFOUND)
+      continue;
+
+    if (status == OpStatus::WRONG_TYPE) {
+      return status;
+    }
+
+    DCHECK(fr && IsValid(fr->first));
+    const auto& it_pos = fr.value();
+
+    size_t arg_indx = t->ReverseArgIndex(sid, it_pos.second);
+    if (arg_indx < min_arg_indx) {
+      min_arg_indx = arg_indx;
+      result.sid = sid;
+      result.find_res = it_pos.first;
+    }
+  }
+
+  if (result.sid == kInvalidSid) {
+    return OpStatus::KEY_NOTFOUND;
+  }
+  return result;
+}
+
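Process merges the per-shard find results by minimal argument index rather than by shard id, so BLPOP observes keys in the order the client listed them. A small self-contained model of that merge (hypothetical types, not the patch's API):

```cpp
#include <cstdio>
#include <optional>
#include <vector>

// One entry per shard: the argument index of the first non-empty key found
// there, or nullopt if that shard found nothing.
std::optional<unsigned> PickShard(const std::vector<std::optional<unsigned>>& hits) {
  std::optional<unsigned> best_sid;
  unsigned best_idx = ~0u;
  for (unsigned sid = 0; sid < hits.size(); ++sid) {
    if (hits[sid] && *hits[sid] < best_idx) {
      best_idx = *hits[sid];
      best_sid = sid;  // this shard holds the earliest-listed ready key
    }
  }
  return best_sid;
}

int main() {
  // BLPOP k0 k1 k2: shard 0 found k2 (index 2), shard 1 found k1 (index 1).
  auto sid = PickShard({std::optional<unsigned>{2}, std::optional<unsigned>{1}});
  std::printf("shard %u wins\n", *sid);  // shard 1 wins, because k1 precedes k2
}
```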
 IntentLock::Mode Transaction::Mode() const {
   return (trans_options_ & CO::READONLY) ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
 }
@@ -164,7 +230,7 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
   }
 
   CHECK(next_arg == args_.end());
-  DVLOG(1) << "InitByArgs " << DebugId();
+  DVLOG(1) << "InitByArgs " << DebugId() << " " << args_.front();
 
   if (unique_shard_cnt_ == 1) {
     PerShardData* sd;
@@ -224,18 +290,25 @@ bool Transaction::RunInShard(EngineShard* shard) {
   DCHECK(sd.local_mask & ARMED);
   sd.local_mask &= ~ARMED;
 
+  DCHECK_EQ(sd.local_mask & (SUSPENDED_Q | EXPIRED_Q), 0);
+
+  bool awaked_prerun = (sd.local_mask & AWAKED_Q) != 0;
+
   // For multi we unlock the transaction (i.e. its keys) in the UnlockMulti() call.
   // Therefore we differentiate between concluding, which says that this specific
   // runnable concludes the current operation, and should_release, which tells
   // whether we should unlock the keys. should_release is false for multi and
   // equal to concluding otherwise.
-  bool should_release = is_concluding_cb_ && !multi_;
+  bool should_release = (coordinator_state_ & COORD_EXEC_CONCLUDING) && !multi_;
+  IntentLock::Mode mode = Mode();
 
   // We make sure that we lock exactly once for each (multi-hop) transaction inside
   // multi-transactions.
   if (multi_ && ((sd.local_mask & KEYLOCK_ACQUIRED) == 0)) {
+    DCHECK(!awaked_prerun);  // we should not have a blocking transaction inside a multi block.
+
     sd.local_mask |= KEYLOCK_ACQUIRED;
-    shard->db_slice().Acquire(Mode(), GetLockArgs(idx));
+    shard->db_slice().Acquire(mode, GetLockArgs(idx));
   }
 
   DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED));
@@ -264,7 +337,10 @@ bool Transaction::RunInShard(EngineShard* shard) {
 
   // If it's a final hop we should release the locks.
   if (should_release) {
+    bool is_suspended = sd.local_mask & SUSPENDED_Q;
+
     if (IsGlobal()) {
+      DCHECK(!awaked_prerun && !is_suspended);  // Global transactions can not be blocking.
       shard->shard_lock()->Release(Mode());
     } else {  // not global.
       KeyLockArgs largs = GetLockArgs(idx);
@@ -272,9 +348,16 @@ bool Transaction::RunInShard(EngineShard* shard) {
       // If a transaction has been suspended, we keep the lock so that future transactions
       // touching those keys will be ordered via TxQueue. It's necessary because we preserve
       // the atomicity of awaked transactions by halting the TxQueue.
-      shard->db_slice().Release(Mode(), largs);
-      sd.local_mask &= ~KEYLOCK_ACQUIRED;
+      if (!is_suspended) {
+        shard->db_slice().Release(mode, largs);
+        sd.local_mask &= ~KEYLOCK_ACQUIRED;
+      }
       sd.local_mask &= ~OUT_OF_ORDER;
+      // It has 2 responsibilities.
+      // 1: to go over potential awakened keys, verify them and activate watch queues.
+      // 2: if this transaction was notified and finished running - to remove it from the head
+      //    of the queue and notify the next one.
+      shard->ProcessAwakened(awaked_prerun ? this : nullptr);
     }
   }
 
@@ -284,11 +367,43 @@
   return !should_release;  // keep
 }
 
-void Transaction::ScheduleInternal(bool single_hop) {
+void Transaction::RunNoop(EngineShard* shard) {
+  DVLOG(1) << "RunNoop " << DebugId();
+
+  unsigned idx = SidToId(shard->shard_id());
+  auto& sd = shard_data_[idx];
+  DCHECK(sd.local_mask & ARMED);
+  DCHECK(sd.local_mask & KEYLOCK_ACQUIRED);
+  DCHECK(!multi_);
+  DCHECK(!IsGlobal());
+
+  sd.local_mask &= ~ARMED;
+
+  if (unique_shard_cnt_ == 1) {
+    cb_ = nullptr;
+    local_result_ = OpStatus::OK;
+  }
+
+  if (coordinator_state_ & COORD_EXEC_CONCLUDING) {
+    KeyLockArgs largs = GetLockArgs(idx);
+    shard->db_slice().Release(Mode(), largs);
+    sd.local_mask &= ~KEYLOCK_ACQUIRED;
+
+    if (sd.local_mask & SUSPENDED_Q) {
+      sd.local_mask |= EXPIRED_Q;
+      shard->GCWatched(largs);
+    }
+  }
+  // Decrease the run count after we update all the data in the transaction object.
+  CHECK_GE(DecreaseRunCnt(), 1u);
+}
+
+void Transaction::ScheduleInternal() {
   DCHECK_EQ(0u, txid_);
+  DCHECK_EQ(0, coordinator_state_ & (COORD_SCHED | COORD_OOO));
 
   bool span_all = IsGlobal();
-  bool out_of_order = false;
+  bool single_hop = (coordinator_state_ & COORD_EXEC_CONCLUDING);
 
   uint32_t num_shards;
   std::function<bool(uint32_t)> is_active;
 
   if (span_all) {
     is_active = [](uint32_t) { return true; };
     num_shards = ess_->size();
+
     // Lock shards
     auto cb = [mode](EngineShard* shard) { shard->shard_lock()->Acquire(mode); };
     ess_->RunBriefInParallel(std::move(cb));
@@ -331,10 +447,11 @@
     // OOO can not happen with span-all transactions. We ensure it in ScheduleInShard when we
     // refuse to acquire locks for these transactions.
     DCHECK(!span_all);
-    out_of_order = true;
+    coordinator_state_ |= COORD_OOO;
   }
-  DVLOG(1) << "Scheduled " << DebugId() << " OutOfOrder: " << out_of_order;
-
+  DVLOG(1) << "Scheduled " << DebugId()
+           << " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO);
+  coordinator_state_ |= COORD_SCHED;
   break;
 }
@@ -348,7 +465,7 @@
   CHECK_EQ(0u, success.load(memory_order_relaxed));
 }
 
-  if (out_of_order) {
+  if (IsOOO()) {
     for (auto& sd : shard_data_) {
       sd.local_mask |= OUT_OF_ORDER;
     }
@@ -363,6 +480,9 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
 
   cb_ = std::move(cb);
 
+  // single hop -> concluding.
+  coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING);
+
   bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !multi_;
   if (schedule_fast) {  // Single shard (local) optimization.
     // We never resize shard_data because that would affect MULTI transaction correctness.
@@ -395,8 +515,8 @@
   } else {
     // Transaction spans multiple shards or it's global (like flushdb) or multi.
     if (!multi_)
-      ScheduleInternal(true);
-    ExecuteAsync(true);
+      ScheduleInternal();
+    ExecuteAsync();
   }
 
   DVLOG(1) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load();
@@ -454,6 +574,9 @@ void Transaction::UnlockMulti() {
     }
 
     shard->ShutdownMulti(this);
+
+    // notify awakened transactions.
+    shard->ProcessAwakened(nullptr);
     shard->PollExecution(nullptr);
 
     this->DecreaseRunCnt();
@@ -474,8 +597,15 @@
 // Runs in coordinator thread.
 void Transaction::Execute(RunnableType cb, bool conclude) {
   cb_ = std::move(cb);
+  coordinator_state_ |= COORD_EXEC;
 
-  ExecuteAsync(conclude);
+  if (conclude) {
+    coordinator_state_ |= COORD_EXEC_CONCLUDING;
+  } else {
+    coordinator_state_ &= ~COORD_EXEC_CONCLUDING;
+  }
+
+  ExecuteAsync();
 
   DVLOG(1) << "Wait on Exec " << DebugId();
   WaitForShardCallbacks();
@@ -485,10 +615,8 @@
 }
 
 // Runs in coordinator thread.
-void Transaction::ExecuteAsync(bool concluding_cb) {
-  DVLOG(1) << "ExecuteAsync " << DebugId() << " concluding " << concluding_cb;
-
-  is_concluding_cb_ = concluding_cb;
+void Transaction::ExecuteAsync() {
+  DVLOG(1) << "ExecuteAsync " << DebugId();
 
   DCHECK_GT(unique_shard_cnt_, 0u);
   DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
@@ -584,6 +712,52 @@ void Transaction::RunQuickie(EngineShard* shard) {
   cb_ = nullptr;  // We can do it because only a single shard runs the callback.
 }
 
+// Runs in the coordinator thread.
+// Marks the transaction as expired but does not remove it from the watch queues.
+void Transaction::ExpireBlocking() {
+  DVLOG(1) << "ExpireBlocking " << DebugId();
+  DCHECK(!IsGlobal());
+
+  run_count_.store(unique_shard_cnt_, memory_order_release);
+
+  auto expire_cb = [this] {
+    EngineShard* shard = EngineShard::tlocal();
+
+    auto lock_args = GetLockArgs(shard->shard_id());
+    shard->db_slice().Release(Mode(), lock_args);
+
+    unsigned sd_idx = SidToId(shard->shard_id());
+    auto& sd = shard_data_[sd_idx];
+    sd.local_mask |= EXPIRED_Q;
+    sd.local_mask &= ~KEYLOCK_ACQUIRED;
+
+    // Need to see why I decided to call this.
+    // My guess - probably to trigger the run of stalled transactions in case
+    // this shard concurrently awoke this transaction and stalled the processing
+    // of TxQueue.
+    shard->PollExecution(nullptr);
+
+    CHECK_GE(DecreaseRunCnt(), 1u);
+  };
+
+  if (unique_shard_cnt_ == 1) {
+    DCHECK_LT(unique_shard_id_, ess_->size());
+    ess_->Add(unique_shard_id_, std::move(expire_cb));
+  } else {
+    for (ShardId i = 0; i < shard_data_.size(); ++i) {
+      auto& sd = shard_data_[i];
+      DCHECK_EQ(0, sd.local_mask & ARMED);
+      if (sd.arg_count == 0)
+        continue;
+      ess_->Add(i, expire_cb);
+    }
+  }
+
+  // Wait for all callbacks to conclude.
+  WaitForShardCallbacks();
+  DVLOG(1) << "ExpireBlocking finished " << DebugId();
+}
+
 const char* Transaction::Name() const {
   return cid_->name();
 }
@@ -744,6 +918,123 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
   return reverse_index_[shard_data_[shard_id].arg_start + arg_index];
 }
 
+bool Transaction::WaitOnWatch(const time_point& tp) {
+  // Assumes that the transaction is pending and scheduled. TODO: to verify it with a state
+  // machine.
+  VLOG(2) << "WaitOnWatch Start use_count(" << use_count() << ")";
+  using namespace chrono;
+
+  // wake_txid_.store(kuint64max, std::memory_order_relaxed);
+  Execute([](auto* t, auto* shard) { return t->AddToWatchedShardCb(shard); }, true);
+  coordinator_state_ |= COORD_BLOCKED;
+  bool res = true;  // returns false if a timeout occurs.
+
+  auto wake_cb = [this] {
+    return (coordinator_state_ & COORD_CANCELLED) ||
+           notify_txid_.load(memory_order_relaxed) != kuint64max;
+  };
+  cv_status status = cv_status::no_timeout;
+
+  if (tp == time_point::max()) {
+    DVLOG(1) << "WaitOnWatch foreva " << DebugId();
+    blocking_ec_.await(move(wake_cb));
+    DVLOG(1) << "WaitOnWatch AfterWait";
+  } else {
+    DVLOG(1) << "WaitOnWatch TimeWait for "
+             << duration_cast<milliseconds>(tp - time_point::clock::now()).count() << " ms";
+
+    status = blocking_ec_.await_until(move(wake_cb), tp);
+
+    DVLOG(1) << "WaitOnWatch await_until " << int(status);
+  }
+
+  if ((coordinator_state_ & COORD_CANCELLED) || status == cv_status::timeout) {
+    ExpireBlocking();
+    coordinator_state_ &= ~COORD_BLOCKED;
+    return false;
+  }
+
+  // We were notified by a shard, so let's make sure that our notifications converged to a
+  // stable form.
+  if (unique_shard_cnt_ > 1) {
+    run_count_.store(unique_shard_cnt_, memory_order_release);
+
+    auto converge_cb = [this] {
+      EngineShard* shard = EngineShard::tlocal();
+      auto& sd = shard_data_[shard->shard_id()];
+
+      TxId notify = notify_txid();
+      if ((sd.local_mask & AWAKED_Q) || shard->HasResultConverged(notify)) {
+        CHECK_GE(DecreaseRunCnt(), 1u);
+        return;
+      }
+      shard->WaitForConvergence(notify, this);
+    };
+
+    for (ShardId i = 0; i < shard_data_.size(); ++i) {
+      auto& sd = shard_data_[i];
+      DCHECK_EQ(0, sd.local_mask & ARMED);
+      if (sd.arg_count == 0)
+        continue;
+      ess_->Add(i, converge_cb);
+    }
+
+    // Wait for all callbacks to conclude.
+    WaitForShardCallbacks();
+    DVLOG(1) << "Convergence finished " << DebugId();
+  }
+
+  // Lift the blocking mask.
+  coordinator_state_ &= ~COORD_BLOCKED;
+
+  return res;
+}
+
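Stripped of the shard bookkeeping, WaitOnWatch's contract is: block until cancelled or until some shard lowers notify_txid_, with an optional deadline. An analogue using standard primitives may clarify it; the real code uses a fiber EventCount, so this is illustrative only:

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

std::mutex mu;
std::condition_variable cv;
uint64_t notify_txid = UINT64_MAX;  // UINT64_MAX means "not notified yet"
bool cancelled = false;

// Returns true when notified, false on timeout or cancellation, mirroring
// the boolean that WaitOnWatch hands back to BPopper::Run.
bool WaitOnWatchModel(std::chrono::steady_clock::time_point deadline) {
  std::unique_lock<std::mutex> lk(mu);
  bool woke = cv.wait_until(lk, deadline,
                            [] { return cancelled || notify_txid != UINT64_MAX; });
  return woke && !cancelled;
}

int main() {
  // With an already-expired deadline and no notification, this times out.
  return WaitOnWatchModel(std::chrono::steady_clock::now()) ? 1 : 0;
}
```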
+void Transaction::UnregisterWatch() {
+  auto cb = [](Transaction* t, EngineShard* shard) {
+    t->RemoveFromWatchedShardCb(shard);
+    return OpStatus::OK;
+  };
+  Execute(std::move(cb), true);
+}
+
+// Runs only in the shard thread.
+OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) {
+  ShardId sid = SidToId(shard->shard_id());
+
+  auto& sd = shard_data_[sid];
+  CHECK_EQ(0, sd.local_mask & SUSPENDED_Q);
+  DCHECK_EQ(0, sd.local_mask & ARMED);
+
+  auto args = ShardArgsInShard(shard->shard_id());
+  for (auto s : args) {
+    shard->AddWatched(s, this);
+  }
+  sd.local_mask |= SUSPENDED_Q;
+
+  return OpStatus::OK;
+}
+
+// Runs only in the shard thread.
+// Quadratic complexity in the number of arguments and the queue length.
+bool Transaction::RemoveFromWatchedShardCb(EngineShard* shard) {
+  ShardId sid = SidToId(shard->shard_id());
+  auto& sd = shard_data_[sid];
+
+  constexpr uint16_t kQueueMask =
+      Transaction::SUSPENDED_Q | Transaction::AWAKED_Q | Transaction::EXPIRED_Q;
+
+  if ((sd.local_mask & kQueueMask) == 0)
+    return false;
+
+  sd.local_mask &= ~kQueueMask;
+
+  // TODO: what if args have keys and values?
+  auto args = ShardArgsInShard(shard->shard_id());
+  for (auto s : args) {
+    shard->RemovedWatched(s, this);
+  }
+  return true;
+}
+
 inline uint32_t Transaction::DecreaseRunCnt() {
   // to protect against cases where Transaction is destroyed before run_ec_.notify
   // finishes running. We can not put it inside the (res == 1) block because then it's too late.
@@ -751,7 +1042,6 @@ inline uint32_t Transaction::DecreaseRunCnt() {
 
   // We use release so that no stores will be reordered after.
   uint32_t res = run_count_.fetch_sub(1, std::memory_order_release);
-
   if (res == 1) {
     run_ec_.notify();
   }
@@ -762,4 +1052,53 @@
 bool Transaction::IsGlobal() const {
   return (trans_options_ & CO::GLOBAL_TRANS) != 0;
 }
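NotifySuspended below can race with notifications arriving from other shards, each trying to lower notify_txid_. The pattern is a lock-free "monotonic minimum"; isolated into a free function, it looks like this (illustrative and simplified, not the patch's code):

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>

std::atomic<uint64_t> min_txid{UINT64_MAX};

void ImproveMin(uint64_t candidate) {
  uint64_t cur = min_txid.load(std::memory_order_relaxed);
  // Retry while we would still improve the minimum; on CAS failure 'cur' is
  // reloaded with the competing writer's value, so the loop re-evaluates.
  while (candidate < cur) {
    if (min_txid.compare_exchange_weak(cur, candidate, std::memory_order_relaxed)) {
      // We lowered the minimum; the real code notifies the waiter here.
      break;
    }
  }
}

int main() {
  ImproveMin(42);
  ImproveMin(7);
  ImproveMin(100);  // no effect: 7 is already smaller
  std::printf("%llu\n", static_cast<unsigned long long>(min_txid.load()));
}
```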
+// Runs only in the shard thread.
+bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
+  unsigned sd_id = SidToId(sid);
+  auto& sd = shard_data_[sd_id];
+  unsigned local_mask = sd.local_mask;
+  CHECK_NE(0u, local_mask & SUSPENDED_Q);
+  DVLOG(1) << "NotifyBlocked " << DebugId() << ", local_mask: " << local_mask;
+
+  if (local_mask & Transaction::EXPIRED_Q) {
+    return false;
+  }
+
+  if (local_mask & SUSPENDED_Q) {
+    DCHECK_EQ(0u, local_mask & AWAKED_Q);
+
+    sd.local_mask &= ~SUSPENDED_Q;
+    sd.local_mask |= AWAKED_Q;
+
+    TxId notify_id = notify_txid_.load(memory_order_relaxed);
+
+    while (committed_txid < notify_id) {
+      if (notify_txid_.compare_exchange_weak(notify_id, committed_txid, memory_order_relaxed)) {
+        // if we improved notify_txid_ - break.
+        blocking_ec_.notify();  // release barrier.
+        break;
+      }
+    }
+    return true;
+  }
+
+  CHECK(sd.local_mask & AWAKED_Q);
+  return true;
+}
+
+void Transaction::BreakOnClose() {
+  if (coordinator_state_ & COORD_BLOCKED) {
+    coordinator_state_ |= COORD_CANCELLED;
+    blocking_ec_.notify();
+  }
+}
+
+auto Transaction::FindFirst() -> OpResult<FindFirstResult> {
+  FindFirstProcessor processor(notify_txid_.load(memory_order_relaxed), ess_->size());
+
+  processor.Find(this);
+
+  return processor.Process(this);
+}
+
 }  // namespace dfly
diff --git a/server/transaction.h b/server/transaction.h
index 5a4c00a08..8a587e261 100644
--- a/server/transaction.h
+++ b/server/transaction.h
@@ -51,6 +51,9 @@ class Transaction {
     ARMED = 1,  // Transaction was armed with the callback
     OUT_OF_ORDER = 2,
     KEYLOCK_ACQUIRED = 4,
+    SUSPENDED_Q = 0x10,  // added by the coordination flow (via WaitOnWatch()).
+    AWAKED_Q = 0x20,     // awaked by a condition (lpush etc.)
+    EXPIRED_Q = 0x40,    // timed out and should be garbage collected from the blocking queue.
   };
 
   Transaction(const CommandId* cid, EngineShardSet* ess);
@@ -96,7 +99,7 @@ class Transaction {
   // Schedules a transaction. Usually used for multi-hop transactions like Rename or BLPOP.
   // For single hop, use ScheduleSingleHop instead.
   void Schedule() {
-    ScheduleInternal(false);
+    ScheduleInternal();
   }
 
   // if conclude is true, removes the transaction from the pending queue.
@@ -138,6 +141,10 @@ class Transaction {
     return unique_shard_cnt_;
   }
 
+  TxId notify_txid() const {
+    return notify_txid_.load(std::memory_order_relaxed);
+  }
+
   bool IsMulti() const {
     return bool(multi_);
   }
@@ -145,25 +152,58 @@ class Transaction {
   bool IsGlobal() const;
 
   bool IsOOO() const {
-    return false;
+    return coordinator_state_ & COORD_OOO;
   }
 
   EngineShardSet* shard_set() {
     return ess_;
   }
 
+  // Registers the transaction into the watched queue and blocks until either
+  // a) a notification is received or b) tp is reached. If tp is time_point::max()
+  // then it waits indefinitely.
+  // Expects that the transaction had been scheduled before, and uses Execute(.., true)
+  // to register.
+  // Returns false if a timeout occurred, true if it was notified by one of the keys.
+  bool WaitOnWatch(const time_point& tp);
+  void UnregisterWatch();
+
+  // Returns true if the transaction is awaked, false if it has timed out and can be removed
+  // from the blocking queue. NotifySuspended may be called from (multiple) shard threads,
+  // with each call potentially improving the minimal wake_txid at which
+  // this transaction has been awaked.
+  bool NotifySuspended(TxId committed_ts, ShardId sid);
+
+  void BreakOnClose();
+
   // Called by EngineShard when performing Execute over the tx queue.
   // Returns true if transaction should be kept in the queue.
   bool RunInShard(EngineShard* shard);
 
+  void RunNoop(EngineShard* shard);
+
+  //! Returns locking arguments needed for DbSlice to Acquire/Release transactional locks.
+  //! Runs in the shard thread.
+  KeyLockArgs GetLockArgs(ShardId sid) const;
+
+  // TODO: iterators do not survive between hops.
+  // It could happen that FindFirst returns a result but then a different transaction
+  // grows the table and invalidates find_res. We should return a key, unfortunately,
+  // and not the iterator.
+  struct FindFirstResult {
+    MainIterator find_res;
+    ShardId sid = kInvalidSid;
+  };
+
+  OpResult<FindFirstResult> FindFirst();
+
  private:
   unsigned SidToId(ShardId sid) const {
     return sid < shard_data_.size() ? sid : 0;
   }
-  void ScheduleInternal(bool single_hop);
+  void ScheduleInternal();
 
-  void ExecuteAsync(bool concluding_cb);
+  void ExpireBlocking();
+  void ExecuteAsync();
 
   // Optimized version of RunInShard for single shard uncontended cases.
   void RunQuickie(EngineShard* shard);
@@ -180,9 +220,9 @@ class Transaction {
   // Returns true if operation was cancelled for this shard. Runs in the shard thread.
   bool CancelInShard(EngineShard* shard);
 
-  //! Returns locking arguments needed for DbSlice to Acquire/Release transactional locks.
-  //! Runs in the shard thread.
-  KeyLockArgs GetLockArgs(ShardId sid) const;
+  // Shard callbacks used within Execute calls.
+  OpStatus AddToWatchedShardCb(EngineShard* shard);
+  bool RemoveFromWatchedShardCb(EngineShard* shard);
 
   void WaitForShardCallbacks() {
     run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); });
@@ -199,6 +239,8 @@ class Transaction {
     return use_count_.load(std::memory_order_relaxed);
   }
 
+  struct FindFirstProcessor;
+
   struct PerShardData {
     uint32_t arg_start = 0;  // Indices into args_ array.
     uint16_t arg_count = 0;
@@ -249,6 +291,7 @@ class Transaction {
   const CommandId* cid_;
   EngineShardSet* ess_;
   TxId txid_{0};
+  std::atomic<TxId> notify_txid_{kuint64max};
 
   std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0};
@@ -258,16 +301,26 @@
   uint32_t trans_options_ = 0;
 
   ShardId unique_shard_id_{kInvalidSid};
-
   DbIndex db_index_ = 0;
 
-  // For single-hop transactions with unique_shards_ == 1, hence no data-race.
+  // Used for single-hop transactions with unique_shards_ == 1, hence no data-race.
   OpStatus local_result_ = OpStatus::OK;
 
-  // NOTE: to move to bitmask if it grows.
-  // Written by coordinator thread, read by shard threads but not concurrently.
-  // Says whether the current callback function is concluding for this operation.
-  bool is_concluding_cb_{true};
+  enum CoordinatorState : uint8_t {
+    COORD_SCHED = 1,
+    COORD_EXEC = 2,
+
+    // We are running the last execution step in a multi-hop operation.
+    COORD_EXEC_CONCLUDING = 4,
+    COORD_BLOCKED = 8,
+    COORD_CANCELLED = 0x10,
+    COORD_OOO = 0x20,
+  };
+
+  // Transaction coordinator state, written and read by the coordinator thread.
+  // Can be read by shard threads as long as we respect ordering rules, i.e. when
+  // they read this variable the coordinator thread is stalled and can not cause data races.
+  // If COORD_XXX has been set, it means we passed or crossed stage XXX.
+  uint8_t coordinator_state_ = 0;
 
   struct PerShardCache {
     std::vector<std::string_view> args;