diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 5914c4465..2bb42665d 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -280,8 +280,8 @@ void EngineShard::DestroyThreadLocal() { // Is called by Transaction::ExecuteAsync in order to run transaction tasks. // Only runs in its own thread. void EngineShard::PollExecution(const char* context, Transaction* trans) { - VLOG(2) << "PollExecution " << context << " " << (trans ? trans->DebugId() : "") << " " - << txq_.size() << " " << continuation_trans_; + DVLOG(2) << "PollExecution " << context << " " << (trans ? trans->DebugId() : "") << " " + << txq_.size() << " " << continuation_trans_; ShardId sid = shard_id(); diff --git a/src/server/list_family.cc b/src/server/list_family.cc index a4b28c3a9..3d023d841 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -381,66 +381,10 @@ OpResult Peek(const OpArgs& op_args, string_view key, ListDir dir, bool return absl::StrCat(entry.longval); } -BPopPusher::BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir) - : pop_key_(pop_key), push_key_(push_key), popdir_(popdir), pushdir_(pushdir) { -} - -OpResult BPopPusher::Run(Transaction* t, unsigned msec) { - time_point tp = - msec ?
chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max(); - - t->Schedule(); - - if (t->unique_shard_cnt() == 1) { - return RunSingle(t, tp); - } - - return RunPair(t, tp); -} - -OpResult BPopPusher::RunSingle(Transaction* t, time_point tp) { - OpResult op_res; - bool is_multi = t->IsMulti(); - auto cb_move = [&](Transaction* t, EngineShard* shard) { - op_res = OpMoveSingleShard(t->GetOpArgs(shard), pop_key_, push_key_, popdir_, pushdir_); - return OpStatus::OK; - }; - t->Execute(cb_move, false); - - if (is_multi || op_res.status() != OpStatus::KEY_NOTFOUND) { - if (op_res.status() == OpStatus::KEY_NOTFOUND) { - op_res = OpStatus::TIMED_OUT; - } - auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - t->Execute(std::move(cb), true); - return op_res; - } - - auto* stats = ServerState::tl_connection_stats(); - auto wcb = [&](Transaction* t, EngineShard* shard) { - ArgSlice keys{&this->pop_key_, 1}; - return t->WatchInShard(keys, shard); - }; - - // Block - ++stats->num_blocked_clients; - - bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb)); - --stats->num_blocked_clients; - - if (!wait_succeeded) - return OpStatus::TIMED_OUT; - - t->Execute(cb_move, true); - return op_res; -} - -OpResult BPopPusher::RunPair(Transaction* t, time_point tp) { - return OpStatus::TIMED_OUT; -} - OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir dir, bool skip_notexist, absl::Span vals) { + DVLOG(1) << "OpPush " << key; + EngineShard* es = op_args.shard; PrimeIterator it; bool new_key = false; @@ -530,6 +474,58 @@ OpResult OpPop(const OpArgs& op_args, string_view key, ListDir dir, u return res; } +OpResult MoveTwoShards(Transaction* trans, string_view src, string_view dest, + ListDir src_dir, ListDir dest_dir, bool conclude_on_error) { + DCHECK_EQ(2u, trans->unique_shard_cnt()); + + OpResult find_res[2]; + OpResult result; + + // Transaction is comprised of 2 hops: + // 1 - check for entries existence, their types and if
possible - + // read the value we may move from the source list. + // 2. If everything is ok, pop from source and push the peeked value into + // the destination. + // + auto cb = [&](Transaction* t, EngineShard* shard) { + auto args = t->ShardArgsInShard(shard->shard_id()); + DCHECK_EQ(1u, args.size()); + bool is_dest = args.front() == dest; + find_res[is_dest] = Peek(t->GetOpArgs(shard), args.front(), src_dir, !is_dest); + return OpStatus::OK; + }; + + trans->Execute(move(cb), false); + + if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) { + result = find_res[0] ? find_res[1] : find_res[0]; + if (conclude_on_error) { + auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + trans->Execute(move(cb), true); + } + } else { + // Everything is ok, lets proceed with the mutations. + auto cb = [&](Transaction* t, EngineShard* shard) { + auto args = t->ShardArgsInShard(shard->shard_id()); + bool is_dest = args.front() == dest; + OpArgs op_args = t->GetOpArgs(shard); + + if (is_dest) { + string_view val{find_res[0].value()}; + absl::Span span{&val, 1}; + OpPush(op_args, args.front(), dest_dir, false, span); + } else { + OpPop(op_args, args.front(), src_dir, 1, false); + } + return OpStatus::OK; + }; + trans->Execute(move(cb), true); + result = std::move(find_res[0].value()); + } + + return result; +} + OpResult OpLen(const OpArgs& op_args, std::string_view key) { auto res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST); if (!res) @@ -783,56 +779,6 @@ OpResult OpRange(const OpArgs& op_args, std::string_view key, long st return str_vec; } -OpResult MoveTwoShards(Transaction* trans, string_view src, string_view dest, - ListDir src_dir, ListDir dest_dir) { - DCHECK_EQ(2u, trans->unique_shard_cnt()); - - OpResult find_res[2]; - OpResult result; - - // Transaction is comprised of 2 hops: - // 1 - check for entries existence, their types and if possible - - // read the value we may move from the source list. - // 2.
If everything is ok, pop from source and push the peeked value into - // the destination. - // - auto cb = [&](Transaction* t, EngineShard* shard) { - auto args = t->ShardArgsInShard(shard->shard_id()); - DCHECK_EQ(1u, args.size()); - bool is_dest = args.front() == dest; - find_res[is_dest] = Peek(t->GetOpArgs(shard), args.front(), src_dir, !is_dest); - return OpStatus::OK; - }; - - trans->Execute(move(cb), false); - - if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) { - auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - trans->Execute(move(cb), true); - result = find_res[0] ? find_res[1] : find_res[0]; - } else { - // Everything is ok, lets proceed with the mutations. - auto cb = [&](Transaction* t, EngineShard* shard) { - auto args = t->ShardArgsInShard(shard->shard_id()); - bool is_dest = args.front() == dest; - OpArgs op_args = t->GetOpArgs(shard); - - if (is_dest) { - string_view val{find_res[0].value()}; - absl::Span span{&val, 1}; - OpPush(op_args, args.front(), dest_dir, false, span); - } else { - OpPop(op_args, args.front(), src_dir, 1, false); - } - return OpStatus::OK; - }; - trans->Execute(move(cb), true); - result = std::move(find_res[0].value()); - } - - return result; -} - void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, ListDir src_dir, ListDir dest_dir) { OpResult result; @@ -845,7 +791,7 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); } else { cntx->transaction->Schedule(); - result = MoveTwoShards(cntx->transaction, src, dest, src_dir, dest_dir); + result = MoveTwoShards(cntx->transaction, src, dest, src_dir, dest_dir, true); } if (result) { @@ -902,6 +848,101 @@ void BRPopLPush(CmdArgList args, ConnectionContext* cntx) { } } +BPopPusher::BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir) + : pop_key_(pop_key), push_key_(push_key),
popdir_(popdir), pushdir_(pushdir) { +} + +OpResult BPopPusher::Run(Transaction* t, unsigned msec) { + time_point tp = + msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max(); + + t->Schedule(); + + if (t->unique_shard_cnt() == 1) { + return RunSingle(t, tp); + } + + return RunPair(t, tp); +} + +OpResult BPopPusher::RunSingle(Transaction* t, time_point tp) { + OpResult op_res; + bool is_multi = t->IsMulti(); + auto cb_move = [&](Transaction* t, EngineShard* shard) { + op_res = OpMoveSingleShard(t->GetOpArgs(shard), pop_key_, push_key_, popdir_, pushdir_); + return OpStatus::OK; + }; + t->Execute(cb_move, false); + + if (is_multi || op_res.status() != OpStatus::KEY_NOTFOUND) { + if (op_res.status() == OpStatus::KEY_NOTFOUND) { + op_res = OpStatus::TIMED_OUT; + } + auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + t->Execute(std::move(cb), true); + return op_res; + } + + auto* stats = ServerState::tl_connection_stats(); + auto wcb = [&](Transaction* t, EngineShard* shard) { + ArgSlice keys{&this->pop_key_, 1}; + return t->WatchInShard(keys, shard); + }; + + // Block + ++stats->num_blocked_clients; + + bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb)); + --stats->num_blocked_clients; + + if (!wait_succeeded) + return OpStatus::TIMED_OUT; + + t->Execute(cb_move, true); + return op_res; +} + +// Blocking move between two shards. Attempts MoveTwoShards immediately; only when a non-multi +// transaction gets KEY_NOTFOUND does it watch pop_key_ until tp and then retry the move. +OpResult BPopPusher::RunPair(Transaction* t, time_point tp) { + bool is_multi = t->IsMulti(); + OpResult op_res = MoveTwoShards(t, pop_key_, push_key_, popdir_, pushdir_, false); + + if (is_multi || op_res.status() != OpStatus::KEY_NOTFOUND) { + if (!op_res) { + // The first MoveTwoShards call ran with conclude_on_error=false, so on failure the + // transaction is still open. Conclude it with an empty hop, as RunSingle does. + auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + t->Execute(std::move(cb), true); + } + if (op_res.status() == OpStatus::KEY_NOTFOUND) { + op_res = OpStatus::TIMED_OUT; + } + return op_res; + } + + auto* stats = ServerState::tl_connection_stats(); + + // a hack: we watch in both shards for pop_key but only in the source shard it's relevant. + // Therefore we follow the regular flow of watching the key but for the destination shard it + // will never be triggered.
+ // This allows us to run Transaction::Execute on watched transactions in both shards. + auto wcb = [&](Transaction* t, EngineShard* shard) { + ArgSlice keys{&this->pop_key_, 1}; + return t->WatchInShard(keys, shard); + }; + + ++stats->num_blocked_clients; + + bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb)); + --stats->num_blocked_clients; + + if (!wait_succeeded) + return OpStatus::TIMED_OUT; + + return MoveTwoShards(t, pop_key_, push_key_, popdir_, pushdir_, true); +} + } // namespace void ListFamily::LPush(CmdArgList args, ConnectionContext* cntx) { diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index ee5feddef..b396fbc31 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -100,7 +100,8 @@ TEST_F(ListFamilyTest, BLPopBlocking) { }); fibers_ext::SleepFor(30us); - pp_->at(1)->Await([&] { Run("B1", {"lpush", "x", "2", "1"}); }); + RespExpr resp = pp_->at(1)->Await([&] { return Run("B1", {"lpush", "x", "2", "1"}); }); + ASSERT_THAT(resp, IntArg(2)); fb0.Join(); fb1.Join(); @@ -679,25 +680,61 @@ TEST_F(ListFamilyTest, BRPopLPushSingleShard) { Run({"brpoplpush", "y", "x", "0"}); RespExpr resp = Run({"exec"}); EXPECT_THAT(resp, ArgType(RespExpr::NIL)); + ASSERT_FALSE(IsLocked(0, "x")); + ASSERT_FALSE(IsLocked(0, "y")); } TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) { - RespExpr resp0, resp1; + RespExpr resp; // Run the fiber at creation.
auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { - resp0 = Run({"brpoplpush", "x", "y", "0"}); + resp = Run({"brpoplpush", "x", "y", "0"}); }); fibers_ext::SleepFor(30us); pp_->at(1)->Await([&] { Run("B1", {"lpush", "y", "2"}); }); pp_->at(1)->Await([&] { Run("B1", {"lpush", "x", "1"}); }); fb0.Join(); - ASSERT_EQ(resp0, "1"); + ASSERT_EQ(resp, "1"); + ASSERT_FALSE(IsLocked(0, "x")); + ASSERT_FALSE(IsLocked(0, "y")); } TEST_F(ListFamilyTest, BRPopLPushTwoShards) { + RespExpr resp; + EXPECT_THAT(Run({"brpoplpush", "x", "z", "0.05"}), ArgType(RespExpr::NIL)); + Run({"lpush", "x", "val"}); + EXPECT_EQ(Run({"brpoplpush", "x", "z", "0"}), "val"); + resp = Run({"lrange", "z", "0", "-1"}); + ASSERT_EQ(resp, "val"); + Run({"del", "z"}); + + // Run the fiber at creation. + auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + resp = Run({"brpoplpush", "x", "z", "0"}); + }); + + fibers_ext::SleepFor(30us); + RespExpr resp_push = pp_->at(1)->Await([&] { return Run("B1", {"lpush", "z", "val2"}); }); + ASSERT_THAT(resp_push, IntArg(1)); + + resp_push = pp_->at(1)->Await([&] { return Run("B1", {"lpush", "x", "val1"}); }); + ASSERT_THAT(resp_push, IntArg(1)); + fb0.Join(); + + // Result of brpoplpush above. + ASSERT_EQ(resp, "val1"); + + resp = Run({"lrange", "z", "0", "-1"}); + ASSERT_THAT(resp, ArrLen(2)); + ASSERT_THAT(resp.GetVec(), ElementsAre("val1", "val2")); + ASSERT_FALSE(IsLocked(0, "x")); + ASSERT_FALSE(IsLocked(0, "z")); + // TODO: there is a bug here. + // We do not wake the dest shard when the source is awakened, which prevents + // atomicity and causes the first bug as well. } } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 77c5b7466..719ff450d 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -350,9 +350,7 @@ bool Transaction::RunInShard(EngineShard* shard) { - // if transaction is suspended (blocked in watched queue), then it's a noop. + // The callback runs even if the transaction is suspended (blocked in watched queue).
OpStatus status = OpStatus::OK; - if (!was_suspended) { - status = cb_(this, shard); - } + status = cb_(this, shard); if (unique_shard_cnt_ == 1) { cb_ = nullptr; // We can do it because only a single thread runs the callback.