1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix(transaction): Don't set continuation for blocking (#3540)

This commit is contained in:
Vladislav 2024-08-21 20:48:23 +03:00 committed by GitHub
parent 8266c8d026
commit 1bf6d73aa3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 53 additions and 2 deletions

View file

@ -486,14 +486,18 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
return;
if (trans_mask & Transaction::AWAKED_Q) {
CHECK(continuation_trans_ == nullptr || continuation_trans_ == trans)
CHECK(trans->GetNamespace().GetBlockingController(shard_id_)->HasAwakedTransaction());
CHECK(continuation_trans_ == nullptr)
<< continuation_trans_->DebugId() << " when polling " << trans->DebugId()
<< "cont_mask: " << continuation_trans_->DEBUG_GetLocalMask(sid) << " vs "
<< trans->DEBUG_GetLocalMask(sid);
// Commands like BRPOPLPUSH don't conclude immediately
if (trans->RunInShard(this, false)) {
continuation_trans_ = trans;
// execution is blocked while HasAwakedTransaction() returns true, so no need to set
// continuation_trans_. Moreover, setting it for wakened multi-hop transactions may lead to
// inconcistency, see BLMoveSimultaneously test.
// continuation_trans_ = trans;
return;
}

View file

@ -765,6 +765,53 @@ TEST_F(ListFamilyTest, BLMove) {
ASSERT_THAT(resp.GetVec(), ElementsAre("val1", "val2"));
}
TEST_F(ListFamilyTest, BLMoveSimultaneously) {
EXPECT_EQ(Shard("src1", shard_set->size()),
Shard("src10", shard_set->size())); // wake on same shard
EXPECT_NE(Shard("dest110", shard_set->size()),
Shard("src1", shard_set->size())); // Trigger MoveTwoShards
auto f1 = pp_->at(1)->LaunchFiber([this]() {
Run("c1", {"blmove", "src1", "dest110", "LEFT", "RIGHT", "0"});
});
auto f2 = pp_->at(1)->LaunchFiber([this]() {
Run("c2", {"blmove", "src10", "dest110", "LEFT", "RIGHT", "0"});
});
ThisFiber::SleepFor(5ms);
Run({"multi"});
Run({"rpush", "src1", "v1"});
Run({"rpush", "src10", "v2"});
Run({"exec"});
f1.Join();
f2.Join();
auto res = Run({"lrange", "dest110", "0", "-1"});
EXPECT_THAT(res.GetVec(), UnorderedElementsAre("v1", "v2"));
}
TEST_F(ListFamilyTest, BLMoveRings) {
vector<fb2::Fiber> fibers;
for (int i = 0; i < 10; i++) {
auto key1 = to_string(i);
auto key2 = to_string(i + 1);
fibers.emplace_back(pp_->at(i % 3)->LaunchFiber([=]() {
Run(key1, {"blmove", key1, key2, "LEFT", "RIGHT", "0"});
}));
}
ThisFiber::SleepFor(5ms);
Run({"lpush", "0", "v1"});
for (auto& fiber : fibers)
fiber.Join();
for (int i = 0; i < 10; i++)
EXPECT_THAT(Run({"llen", to_string(i)}), IntArg(0));
EXPECT_EQ(Run({"lrange", "10", "0", "-1"}), "v1");
}
TEST_F(ListFamilyTest, LPushX) {
// No push for 'lpushx' on nonexisting key.
EXPECT_THAT(Run({"lpushx", kKey1, "val1"}), IntArg(0));