diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc index b468fb2d4..60d3fe77e 100644 --- a/src/server/blocking_controller.cc +++ b/src/server/blocking_controller.cc @@ -114,20 +114,14 @@ void BlockingController::RunStep(Transaction* completed_t) { DbWatchTable& wt = *dbit->second; for (auto key : wt.awakened_keys) { string_view sv_key = static_cast(key); + DVLOG(1) << "Processing awakened key " << sv_key; // Double verify we still got the item. auto [it, exp_it] = owner_->db_slice().FindExt(index, sv_key); if (!IsValid(it) || it->second.ObjType() != OBJ_LIST) // Only LIST is allowed to block. continue; - auto w_it = wt.queue_map.find(sv_key); - CHECK(w_it != wt.queue_map.end()); - DVLOG(1) << "NotifyWatchQueue " << key; - WatchQueue* wq = w_it->second.get(); - NotifyWatchQueue(wq); - if (wq->items.empty()) { - wt.queue_map.erase(w_it); - } + NotifyWatchQueue(sv_key, &wt.queue_map); } wt.awakened_keys.clear(); @@ -155,33 +149,42 @@ void BlockingController::AddWatched(Transaction* trans) { res->second.reset(new WatchQueue); } + if (!res->second->items.empty()) { + Transaction* last = res->second->items.back().get(); + DCHECK_GT(last->use_count(), 0u); + + // Duplicate keys case. We push only once per key. + if (last == trans) + continue; + } + DVLOG(2) << "Emplace " << trans << " " << trans->DebugId() << " to watch " << key; res->second->items.emplace_back(trans); } } -// Runs in O(N) complexity. +// Runs in O(N) complexity in the worst case. void BlockingController::RemoveWatched(Transaction* trans) { VLOG(1) << "RemoveWatched [" << owner_->shard_id() << "] " << trans->DebugId(); auto dbit = watched_dbs_.find(trans->db_index()); - CHECK(dbit != watched_dbs_.end()); + if (dbit == watched_dbs_.end()) + return; DbWatchTable& wt = *dbit->second; auto args = trans->ShardArgsInShard(owner_->shard_id()); for (auto key : args) { auto watch_it = wt.queue_map.find(key); - CHECK(watch_it != wt.queue_map.end()); + if (watch_it == wt.queue_map.end()) + continue; // that can happen in case of duplicate keys WatchQueue& wq = *watch_it->second; - bool erased = false; for (auto items_it = wq.items.begin(); items_it != wq.items.end(); ++items_it) { if (items_it->trans == trans) { wq.items.erase(items_it); - erased = true; break; } } - CHECK(erased); + // again, we may not find trans if we searched for the same key several times. if (wq.items.empty()) { wt.RemoveEntry(watch_it); @@ -208,13 +211,18 @@ void BlockingController::AwakeWatched(DbIndex db_index, string_view db_key) { if (wt.AddAwakeEvent(WatchQueue::SUSPENDED, db_key)) { awakened_indices_.insert(db_index); + } else { + DVLOG(1) << "Skipped awakening " << db_index; } } -// Internal function called from ProcessAwakened(). +// Internal function called from RunStep(). // Marks the queue as active and notifies the first transaction in the queue. -void BlockingController::NotifyWatchQueue(WatchQueue* wq) { - VLOG(1) << "Notify WQ: [" << owner_->shard_id() << "]"; +void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* wqm) { + auto w_it = wqm->find(key); + CHECK(w_it != wqm->end()); + DVLOG(1) << "Notify WQ: [" << owner_->shard_id() << "] " << key; + WatchQueue* wq = w_it->second.get(); wq->state = WatchQueue::ACTIVE; @@ -224,6 +232,7 @@ void BlockingController::NotifyWatchQueue(WatchQueue* wq) { do { WatchItem& wi = queue.front(); Transaction* head = wi.get(); + DVLOG(2) << "Pop " << head << " from key " << key; queue.pop_front(); @@ -233,6 +242,10 @@ void BlockingController::NotifyWatchQueue(WatchQueue* wq) { break; } } while (!queue.empty()); + + if (wq->items.empty()) { + wqm->erase(w_it); + } } #if 0 @@ -288,4 +301,17 @@ size_t BlockingController::NumWatched(DbIndex db_indx) const { return it->second->queue_map.size(); } +vector BlockingController::GetWatchedKeys(DbIndex db_indx) const { + vector res; + auto it = watched_dbs_.find(db_indx); + + if (it != watched_dbs_.end()) { + for (const auto& k_v : it->second->queue_map) { + res.push_back(k_v.first); + } + } + + return res; +} + } // namespace dfly diff --git a/src/server/blocking_controller.h b/src/server/blocking_controller.h index 329ecaa52..f856321b6 100644 --- a/src/server/blocking_controller.h +++ b/src/server/blocking_controller.h @@ -44,11 +44,9 @@ class BlockingController { // Called from operations that create keys like lpush, rename etc. void AwakeWatched(DbIndex db_index, std::string_view db_key); - // void OnTxFinish(); - - // void RegisterAwaitForConverge(Transaction* t); - + // Used in tests and debugging functions. size_t NumWatched(DbIndex db_indx) const; + std::vector GetWatchedKeys(DbIndex db_indx) const; private: struct WatchQueue; @@ -56,9 +54,7 @@ class BlockingController { using WatchQueueMap = absl::flat_hash_map>; - /// Returns the notified transaction, - /// or null if all transactions in the queue have expired.. - void NotifyWatchQueue(WatchQueue* wq); + void NotifyWatchQueue(std::string_view key, WatchQueueMap* wqm); // void NotifyConvergence(Transaction* tx); diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 9a4efa922..c741348fd 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -10,6 +10,7 @@ #include #include "base/logging.h" +#include "server/blocking_controller.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/main_service.h" @@ -47,13 +48,11 @@ struct ObjInfo { unsigned bucket_id = 0; unsigned slot_id = 0; + enum LockStatus { NONE, S, X } lock_status = NONE; + int64_t ttl = INT64_MAX; bool has_sec_precision = false; - - ObjInfo(unsigned e, unsigned bid) : encoding(e), bucket_id(bid) { - } - - ObjInfo() = default; + bool found = false; }; void DoPopulateBatch(std::string_view prefix, size_t val_size, const SetCmd::SetParams& params, @@ -88,6 +87,7 @@ void DebugCmd::Run(CmdArgList args) { " Examples:", " * DEBUG RELOAD NOSAVE: replace the current database with the contents of an", " existing RDB file.", + "WATCHED", "POPULATE [] []", " Create string keys named key:. If is specified then", " it is used instead of the 'key' prefix.", @@ -106,6 +106,9 @@ void DebugCmd::Run(CmdArgList args) { if (subcmd == "RELOAD") { return Reload(args); } + if (subcmd == "WATCHED") { + return Watched(); + } if (subcmd == "LOAD" && args.size() == 3) { return Load(ArgS(args, 2)); @@ -283,43 +286,72 @@ void DebugCmd::Inspect(string_view key) { EngineShardSet& ess = *shard_set; ShardId sid = Shard(key, ess.size()); - auto cb = [&]() -> facade::OpResult { + auto cb = [&]() -> ObjInfo { auto& db_slice = EngineShard::tlocal()->db_slice(); auto [pt, exp_t] = db_slice.GetTables(cntx_->db_index()); PrimeIterator it = pt->Find(key); - if (!IsValid(it)) { - return OpStatus::KEY_NOTFOUND; + ObjInfo oinfo; + if (IsValid(it)) { + oinfo.found = true; + oinfo.encoding = it->second.Encoding(); + oinfo.bucket_id = it.bucket_id(); + oinfo.slot_id = it.slot_id(); + if (it->second.HasExpire()) { + ExpireIterator exp_it = exp_t->Find(it->first); + CHECK(!exp_it.is_done()); + + time_t exp_time = db_slice.ExpireTime(exp_it); + oinfo.ttl = exp_time - db_slice.Now(); + oinfo.has_sec_precision = exp_it->second.is_second_precision(); + } } - ObjInfo oinfo(it->second.Encoding(), it.bucket_id()); - oinfo.slot_id = it.slot_id(); - - if (it->second.HasExpire()) { - ExpireIterator exp_it = exp_t->Find(it->first); - CHECK(!exp_it.is_done()); - - time_t exp_time = db_slice.ExpireTime(exp_it); - oinfo.ttl = exp_time - db_slice.Now(); - oinfo.has_sec_precision = exp_it->second.is_second_precision(); + KeyLockArgs lock_args; + lock_args.args = ArgSlice{&key, 1}; + lock_args.db_index = cntx_->db_index(); + if (!db_slice.CheckLock(IntentLock::EXCLUSIVE, lock_args)) { + oinfo.lock_status = + db_slice.CheckLock(IntentLock::SHARED, lock_args) ? ObjInfo::S : ObjInfo::X; } return oinfo; }; - OpResult res = ess.Await(sid, cb); - if (res) { - string resp; - StrAppend(&resp, "encoding:", strEncoding(res->encoding), " bucket_id:", res->bucket_id); - StrAppend(&resp, " slot:", res->slot_id); + ObjInfo res = ess.Await(sid, cb); + string resp; - if (res->ttl != INT64_MAX) { - StrAppend(&resp, " ttl:", res->ttl, res->has_sec_precision ? "s" : "ms"); + if (res.found) { + StrAppend(&resp, "encoding:", strEncoding(res.encoding), " bucket_id:", res.bucket_id); + StrAppend(&resp, " slot:", res.slot_id, " shard:", sid); + + if (res.ttl != INT64_MAX) { + StrAppend(&resp, " ttl:", res.ttl, res.has_sec_precision ? "s" : "ms"); } - (*cntx_)->SendSimpleString(resp); - } else { - (*cntx_)->SendError(res.status()); } + + if (res.lock_status != ObjInfo::NONE) { + StrAppend(&resp, " lock:", res.lock_status == ObjInfo::X ? "x" : "s"); + } + (*cntx_)->SendSimpleString(resp); +} + +void DebugCmd::Watched() { + vector watched_keys; + boost::fibers::mutex mu; + + auto cb = [&](EngineShard* shard) { + auto* bc = shard->blocking_controller(); + if (bc) { + auto keys = bc->GetWatchedKeys(cntx_->db_index()); + + lock_guard lk(mu); + watched_keys.insert(watched_keys.end(), keys.begin(), keys.end()); + } + }; + + shard_set->RunBlockingInParallel(cb); + (*cntx_)->SendStringArr(watched_keys); } } // namespace dfly diff --git a/src/server/debugcmd.h b/src/server/debugcmd.h index 1eb577a79..7cbc9e17a 100644 --- a/src/server/debugcmd.h +++ b/src/server/debugcmd.h @@ -23,6 +23,7 @@ class DebugCmd { void Reload(CmdArgList args); void Load(std::string_view filename); void Inspect(std::string_view key); + void Watched(); ServerFamily& sf_; ConnectionContext* cntx_; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 46f16d6e3..2a2614e22 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -127,7 +127,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { if (trans_mask & Transaction::AWAKED_Q) { DCHECK(continuation_trans_ == nullptr); - CHECK_EQ(committed_txid_, trans->notify_txid()) << "TBD"; + CHECK_EQ(committed_txid_, trans->notify_txid()); bool keep = trans->RunInShard(this); if (keep) return; @@ -198,21 +198,14 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { DVLOG(1) << "Skipped TxQueue " << continuation_trans_ << " " << has_awaked_trans; } - // For SUSPENDED_Q - if transaction has not been notified, it will still be - // in the watch queue. We need to unlock an Execute by running a noop. - if (trans_mask & Transaction::SUSPENDED_Q) { - // This case happens when some other shard notified the transaction and now it - // runs FindFirst on all shards. - // TxId notify_txid = trans->notify_txid(); - // DCHECK(HasResultConverged(notify_txid)); - trans->RunNoop(this); - return; - } + // we need to run trans if it's OOO or when trans is blocked in this shard and should + // be treated here as noop. + // trans is OOO, it it locked keys that previous transactions have not locked yet. + bool should_run = trans_mask & (Transaction::OUT_OF_ORDER | Transaction::SUSPENDED_Q); - // If trans is out of order, i.e. locks keys that previous transactions have not locked. // It may be that there are other transactions that touch those keys but they necessary ordered // after trans in the queue, hence it's safe to run trans out of order. - if (trans && (trans_mask & Transaction::OUT_OF_ORDER)) { + if (trans && should_run) { DCHECK(trans != head); DCHECK(!trans->IsMulti()); // multi, global transactions can not be OOO. DCHECK(trans_mask & Transaction::ARMED); @@ -226,9 +219,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { bool keep = trans->RunInShard(this); DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep; - - // Should be enforced via Schedule(). TODO: to remove the check once the code is mature. - CHECK(!keep) << "multi-hop transactions can not be OOO."; } } diff --git a/src/server/list_family.cc b/src/server/list_family.cc index a93147a0b..1a07a973e 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -132,6 +132,7 @@ OpResult FindFirst(Transaction* trans) { auto cb = [&find_res](auto* t, EngineShard* shard) { auto args = t->ShardArgsInShard(shard->shard_id()); + OpResult> ff_res = shard->db_slice().FindFirst(t->db_index(), args); @@ -245,12 +246,16 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { result = FindFirst(t); // retry - must find something. } + // We got here if (!result) { - t->UnregisterWatch(); + // cleanups, locks removal etc. + auto cb = [this](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + t->Execute(std::move(cb), true); + return result.status(); } - VLOG(1) << "Popping an element"; + VLOG(1) << "Popping an element " << t->DebugId(); ff_result_ = move(result.value()); auto cb = [this](Transaction* t, EngineShard* shard) { return Pop(t, shard); }; diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index dc2b179f5..77f2394dd 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -30,6 +30,12 @@ class ListFamilyTest : public BaseFamilyTest { ListFamilyTest() { num_threads_ = 4; } + + void WaitForLocked(string_view key) { + do { + this_fiber::sleep_for(30us); + } while (!IsLocked(0, key)); + } }; const char kKey1[] = "x"; @@ -181,9 +187,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) { blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"}); }); - do { - this_fiber::sleep_for(30us); - } while (!IsLocked(0, kKey1)); + WaitForLocked(kKey1); auto p1_fb = pp_->at(1)->LaunchFiber([&] { for (unsigned i = 0; i < 100; ++i) { @@ -221,9 +225,7 @@ TEST_F(ListFamilyTest, BLPopSerialize) { blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"}); }); - do { - this_fiber::sleep_for(30us); - } while (!IsLocked(0, kKey1)); + WaitForLocked(kKey1); LOG(INFO) << "Starting multi"; @@ -293,9 +295,7 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) { blpop_resp = Run({"blpop", kKey1, "0"}); }); - do { - this_fiber::sleep_for(30us); - } while (!IsLocked(0, kKey1)); + WaitForLocked(kKey1); auto p1_fb = pp_->at(1)->LaunchFiber([&] { Run({"multi"}); @@ -320,11 +320,11 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) { auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"}); + auto watched = Run({"debug", "watched"}); + ASSERT_THAT(watched, ArrLen(0)); }); - do { - this_fiber::sleep_for(30us); - } while (!IsLocked(0, kKey1)); + WaitForLocked(kKey1); pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"})); }); pop_fb.join(); @@ -336,9 +336,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) { blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"}); }); - do { - this_fiber::sleep_for(30us); - } while (!IsLocked(0, kKey1)); + WaitForLocked(kKey1); pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey2, "bar"})); }); pop_fb.join(); @@ -347,6 +345,28 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) { EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey2, "bar")); } +TEST_F(ListFamilyTest, BPopTwoKeysSameShard) { + Run({"exists", "x", "y"}); + ASSERT_EQ(1, GetDebugInfo().shards_count); + RespExpr blpop_resp; + + auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + blpop_resp = Run({"blpop", "x", "y", "0"}); + auto watched = Run({"debug", "watched"}); + + EXPECT_FALSE(IsLocked(0, "y")); + ASSERT_THAT(watched, ArrLen(0)); + }); + + WaitForLocked("x"); + + pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", "x", "bar"})); }); + pop_fb.join(); + + ASSERT_THAT(blpop_resp, ArrLen(2)); + EXPECT_THAT(blpop_resp.GetVec(), ElementsAre("x", "bar")); +} + TEST_F(ListFamilyTest, BPopRename) { RespExpr blpop_resp; @@ -357,9 +377,7 @@ TEST_F(ListFamilyTest, BPopRename) { blpop_resp = Run({"blpop", kKey1, "0"}); }); - do { - this_fiber::sleep_for(30us); - } while (!IsLocked(0, kKey1)); + WaitForLocked(kKey1); pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", "a", "bar"})); @@ -377,9 +395,7 @@ TEST_F(ListFamilyTest, BPopFlush) { blpop_resp = Run({"blpop", kKey1, "0"}); }); - do { - this_fiber::sleep_for(30us); - } while (!IsLocked(0, kKey1)); + WaitForLocked(kKey1); pp_->at(1)->Await([&] { Run({"flushdb"}); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 13bb2663b..1fd567c38 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -284,6 +284,8 @@ void Transaction::SetExecCmd(const CommandId* cid) { } string Transaction::DebugId() const { + DCHECK_GT(use_count_.load(memory_order_relaxed), 0u); + return absl::StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")"); } @@ -305,8 +307,7 @@ bool Transaction::RunInShard(EngineShard* shard) { DCHECK(sd.local_mask & ARMED); sd.local_mask &= ~ARMED; - DCHECK_EQ(sd.local_mask & (SUSPENDED_Q | EXPIRED_Q), 0); - + bool was_suspended = sd.local_mask & SUSPENDED_Q; bool awaked_prerun = (sd.local_mask & AWAKED_Q) != 0; bool incremental_lock = multi_ && multi_->incremental; @@ -332,7 +333,8 @@ bool Transaction::RunInShard(EngineShard* shard) { /*************************************************************************/ // Actually running the callback. try { - OpStatus status = cb_(this, shard); + // if transaction is suspended (blocked in watched queue), then it's a noop. + OpStatus status = was_suspended ? OpStatus::OK : cb_(this, shard); if (unique_shard_cnt_ == 1) { cb_ = nullptr; // We can do it because only a single thread runs the callback. @@ -366,20 +368,25 @@ bool Transaction::RunInShard(EngineShard* shard) { // If it's a final hop we should release the locks. if (should_release) { - bool is_suspended = sd.local_mask & SUSPENDED_Q; + bool become_suspended = sd.local_mask & SUSPENDED_Q; if (IsGlobal()) { - DCHECK(!awaked_prerun && !is_suspended); // Global transactions can not be blocking. + DCHECK(!awaked_prerun && !become_suspended); // Global transactions can not be blocking. shard->shard_lock()->Release(Mode()); } else { // not global. KeyLockArgs largs = GetLockArgs(idx); + DCHECK(sd.local_mask & KEYLOCK_ACQUIRED); // If a transaction has been suspended, we keep the lock so that future transaction // touching those keys will be ordered via TxQueue. It's necessary because we preserve // the atomicity of awaked transactions by halting the TxQueue. - if (!is_suspended) { + if (was_suspended || !become_suspended) { shard->db_slice().Release(mode, largs); sd.local_mask &= ~KEYLOCK_ACQUIRED; + + if (was_suspended || (sd.local_mask & AWAKED_Q)) { + shard->blocking_controller()->RemoveWatched(this); + } } sd.local_mask &= ~OUT_OF_ORDER; @@ -398,37 +405,6 @@ bool Transaction::RunInShard(EngineShard* shard) { return !should_release; // keep } -void Transaction::RunNoop(EngineShard* shard) { - DVLOG(1) << "RunNoop " << DebugId(); - - unsigned idx = SidToId(shard->shard_id()); - auto& sd = shard_data_[idx]; - DCHECK(sd.local_mask & ARMED); - DCHECK(sd.local_mask & KEYLOCK_ACQUIRED); - DCHECK(!multi_); - DCHECK(!IsGlobal()); - - sd.local_mask &= ~ARMED; - - if (unique_shard_cnt_ == 1) { - cb_ = nullptr; - local_result_ = OpStatus::OK; - } - - if (coordinator_state_ & COORD_EXEC_CONCLUDING) { - KeyLockArgs largs = GetLockArgs(idx); - shard->db_slice().Release(Mode(), largs); - sd.local_mask &= ~KEYLOCK_ACQUIRED; - - if (sd.local_mask & SUSPENDED_Q) { - sd.local_mask |= EXPIRED_Q; - shard->blocking_controller()->RemoveWatched(this); - } - } - // Decrease run count after we update all the data in the transaction object. - CHECK_GE(DecreaseRunCnt(), 1u); -} - void Transaction::ScheduleInternal() { DCHECK(!shard_data_.empty()); DCHECK_EQ(0u, txid_); @@ -939,7 +915,8 @@ bool Transaction::WaitOnWatch(const time_point& tp) { DVLOG(1) << "WaitOnWatch AfterWait"; } else { DVLOG(1) << "WaitOnWatch TimeWait for " - << duration_cast(tp - time_point::clock::now()).count() << " ms"; + << duration_cast(tp - time_point::clock::now()).count() << " ms " + << DebugId(); status = blocking_ec_.await_until(move(wake_cb), tp); @@ -982,14 +959,6 @@ bool Transaction::WaitOnWatch(const time_point& tp) { return true; } -void Transaction::UnregisterWatch() { - auto cb = [](Transaction* t, EngineShard* shard) { - t->RemoveFromWatchedShardCb(shard); - return OpStatus::OK; - }; - Execute(std::move(cb), true); -} - // Runs only in the shard thread. OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) { ShardId idx = SidToId(shard->shard_id()); @@ -1005,25 +974,6 @@ OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) { return OpStatus::OK; } -// Runs only in the shard thread. -// Quadratic complexity in number of arguments and queue length. -bool Transaction::RemoveFromWatchedShardCb(EngineShard* shard) { - ShardId idx = SidToId(shard->shard_id()); - auto& sd = shard_data_[idx]; - - constexpr uint16_t kQueueMask = - Transaction::SUSPENDED_Q | Transaction::AWAKED_Q | Transaction::EXPIRED_Q; - - if ((sd.local_mask & kQueueMask) == 0) - return false; - - sd.local_mask &= ~kQueueMask; - - shard->blocking_controller()->RemoveWatched(this); - - return true; -} - void Transaction::ExpireShardCb(EngineShard* shard) { auto lock_args = GetLockArgs(shard->shard_id()); shard->db_slice().Release(Mode(), lock_args); @@ -1171,7 +1121,6 @@ void Transaction::BreakOnClose() { } } - OpResult DetermineKeys(const CommandId* cid, CmdArgList args) { DCHECK_EQ(0u, cid->opt_mask() & CO::GLOBAL_TRANS); diff --git a/src/server/transaction.h b/src/server/transaction.h index 8f5b98aad..b7c49eaaa 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -21,14 +21,15 @@ namespace dfly { -class DbSlice; -class EngineShardSet; class EngineShard; +class BlockingController; using facade::OpStatus; using facade::OpResult; class Transaction { + friend class BlockingController; + Transaction(const Transaction&); void operator=(const Transaction&) = delete; @@ -161,7 +162,6 @@ class Transaction { // Expects that the transaction had been scheduled before, and uses Execute(.., true) to register. // Returns false if timeout ocurred, true if was notified by one of the keys. bool WaitOnWatch(const time_point& tp); - void UnregisterWatch(); // Returns true if transaction is awaked, false if it's timed-out and can be removed from the // blocking queue. NotifySuspended may be called from (multiple) shard threads and @@ -175,8 +175,6 @@ class Transaction { // Returns true if transaction should be kept in the queue. bool RunInShard(EngineShard* shard); - void RunNoop(EngineShard* shard); - //! Returns locking arguments needed for DbSlice to Acquire/Release transactional locks. //! Runs in the shard thread. KeyLockArgs GetLockArgs(ShardId sid) const; @@ -215,7 +213,7 @@ class Transaction { // Shard callbacks used within Execute calls OpStatus AddToWatchedShardCb(EngineShard* shard); - bool RemoveFromWatchedShardCb(EngineShard* shard); + void ExpireShardCb(EngineShard* shard); void UnlockMultiShardCb(const std::vector& sharded_keys, EngineShard* shard); diff --git a/tests/pytest/test_dragonfly.py b/tests/pytest/test_dragonfly.py index 50c298d21..fb0ec393c 100644 --- a/tests/pytest/test_dragonfly.py +++ b/tests/pytest/test_dragonfly.py @@ -1,19 +1,18 @@ - import pytest import redis from threading import Thread - - +from redis.client import NEVER_DECODE @pytest.fixture def client(): pool = redis.ConnectionPool(decode_responses=True) - return redis.Redis(connection_pool=pool) - + client = redis.Redis(connection_pool=pool) + return client class BLPopWorkerThread: def __init__(self): self.result = None + self.thread = None def async_blpop(self, client: redis.Redis): self.result = None @@ -22,22 +21,26 @@ class BLPopWorkerThread: self.result = client.blpop( ['list1{t}', 'list2{t}', 'list2{t}', 'list1{t}'], 0.5) - result = Thread(target=blpop_task, args=(self, client)) - result.start() - return result + self.thread = Thread(target=blpop_task, args=(self, client)) + self.thread.start() + + def wait(self, timeout): + self.thread.join(timeout) + return not self.thread.is_alive() -@pytest.mark.parametrize('execution_number', range(5)) -def test_blpop_multiple_keys(client, execution_number): +@pytest.mark.parametrize('index', range(50)) +def test_blpop_multiple_keys(client : redis.Redis, index): wt_blpop = BLPopWorkerThread() - thread = wt_blpop.async_blpop(client) - client.lpush('list1{t}', 'a') - thread.join(timeout=2) - assert not thread.is_alive() - assert wt_blpop.result[1] == 'a' + wt_blpop.async_blpop(client) - thread = wt_blpop.async_blpop(client) + client.lpush('list1{t}', 'a') + assert wt_blpop.wait(2) + assert wt_blpop.result[1] == 'a' + watched = client.execute_command('DEBUG WATCHED') + assert watched == [] + + wt_blpop.async_blpop(client) client.lpush('list2{t}', 'b') - thread.join(timeout=2) - assert not thread.is_alive() + assert wt_blpop.wait(2) assert wt_blpop.result[1] == 'b'