1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix: call NotifyPending only from tx queue invocations (#1439)

* fix: call NotifyPending only from tx queue invocations

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2023-06-21 10:26:22 +03:00 committed by GitHub
parent f25098bb98
commit 6f78ae5073
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 61 additions and 11 deletions

View file

@ -196,6 +196,9 @@ void BlockingController::FinalizeWatched(ArgSlice args, Transaction* tx) {
}
void BlockingController::NotifyPending() {
const Transaction* tx = owner_->GetContTx();
CHECK(tx == nullptr) << tx->DebugId();
DbContext context;
context.time_now_ms = GetCurrentTimeMs();

View file

@ -402,8 +402,8 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
}
}
void EngineShard::ShutdownMulti(Transaction* multi) {
if (continuation_trans_ == multi) {
void EngineShard::RemoveContTx(Transaction* tx) {
if (continuation_trans_ == tx) {
continuation_trans_ = nullptr;
}
}

View file

@ -98,8 +98,8 @@ class EngineShard {
return &shard_lock_;
}
// TODO: Awkward interface. I should solve it somehow.
void ShutdownMulti(Transaction* multi);
// Remove current continuation trans if its equal to tx.
void RemoveContTx(Transaction* tx);
void IncQuickRun() {
stats_.quick_runs++;
@ -153,6 +153,10 @@ class EngineShard {
return is_replica_;
}
const Transaction* GetContTx() const {
return continuation_trans_;
}
void TEST_EnableHeartbeat();
private:

View file

@ -827,4 +827,39 @@ TEST_F(ListFamilyTest, BLPopUnwakesInScript) {
f2.Join();
}
TEST_F(ListFamilyTest, OtherMultiWakesBLpop) {
const string_view SCRIPT = R"(
redis.call('LPUSH', 'l', 'bad')
for i = 1, 1000 do
redis.call('MGET', 'a', 'b', 'c', 'd')
end
redis.call('LPUSH', 'l', 'good')
)";
const string_view SCRIPT_SHORT = R"(
redis.call('GET', KEYS[1])
)";
// Start BLPOP with infinite timeout
auto f1 = pp_->at(1)->LaunchFiber(Launch::dispatch, [&]() {
auto resp = Run("blpop", {"BLPOP", "l", "0"});
// blpop should only be awakened after the script has completed, so the
// last element added in the script should be returned.
EXPECT_THAT(resp, ArgType(RespExpr::ARRAY));
EXPECT_THAT(resp.GetVec(), ElementsAre("l", "good"));
});
// Start long running script that accesses the list, but should wake up blpop only after it
// finished
auto f2 = pp_->at(2)->LaunchFiber(Launch::dispatch, [&]() {
Run("script", {"EVAL", SCRIPT, "5", "a", "b", "c", "d", "l"});
});
// Run quick multi transaction that concludes after one hop
Run({"EVAL", SCRIPT_SHORT, "1", "y"});
f1.Join();
f2.Join();
}
} // namespace dfly

View file

@ -522,16 +522,24 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
sd.local_mask &= ~OUT_OF_ORDER;
}
// This is the last hop, so clear cont_trans if its held by the current tx
shard->RemoveContTx(this);
// It has 2 responsibilities.
// 1: to go over potential wakened keys, verify them and activate watch queues.
// 2: if this transaction was notified and finished running - to remove it from the head
// of the queue and notify the next one.
// RunStep is also called for global transactions because of commands like MOVE.
if (shard->blocking_controller()) {
if (auto* bcontroller = shard->blocking_controller(); bcontroller) {
if (awaked_prerun || was_suspended) {
shard->blocking_controller()->FinalizeWatched(largs, this);
bcontroller->FinalizeWatched(largs, this);
}
// Wake only if no tx queue head is currently running
// Note: RemoveContTx might have no effect above if this tx had no continuations
if (shard->GetContTx() == nullptr) {
bcontroller->NotifyPending();
}
shard->blocking_controller()->NotifyPending();
}
}
@ -1232,12 +1240,12 @@ void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, E
sd.pq_pos = TxQueue::kEnd;
}
shard->ShutdownMulti(this);
shard->RemoveContTx(this);
// notify awakened transactions, not sure we need it here because it's done after
// each operation
if (shard->blocking_controller())
// Wake only if no tx queue head is currently running
if (shard->blocking_controller() && shard->GetContTx() == nullptr)
shard->blocking_controller()->NotifyPending();
shard->PollExecution("unlockmulti", nullptr);
this->DecreaseRunCnt();