1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat(server): Support brpoplpush for two shard case.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-01-10 08:47:49 +02:00 committed by Roman Gershman
parent 9ca636e49d
commit 16a3e557ae
4 changed files with 186 additions and 118 deletions

View file

@ -280,8 +280,8 @@ void EngineShard::DestroyThreadLocal() {
// Is called by Transaction::ExecuteAsync in order to run transaction tasks.
// Only runs in its own thread.
void EngineShard::PollExecution(const char* context, Transaction* trans) {
VLOG(2) << "PollExecution " << context << " " << (trans ? trans->DebugId() : "") << " "
<< txq_.size() << " " << continuation_trans_;
DVLOG(2) << "PollExecution " << context << " " << (trans ? trans->DebugId() : "") << " "
<< txq_.size() << " " << continuation_trans_;
ShardId sid = shard_id();

View file

@ -381,66 +381,10 @@ OpResult<string> Peek(const OpArgs& op_args, string_view key, ListDir dir, bool
return absl::StrCat(entry.longval);
}
BPopPusher::BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir)
: pop_key_(pop_key), push_key_(push_key), popdir_(popdir), pushdir_(pushdir) {
}
OpResult<string> BPopPusher::Run(Transaction* t, unsigned msec) {
time_point tp =
msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max();
t->Schedule();
if (t->unique_shard_cnt() == 1) {
return RunSingle(t, tp);
}
return RunPair(t, tp);
}
OpResult<string> BPopPusher::RunSingle(Transaction* t, time_point tp) {
OpResult<string> op_res;
bool is_multi = t->IsMulti();
auto cb_move = [&](Transaction* t, EngineShard* shard) {
op_res = OpMoveSingleShard(t->GetOpArgs(shard), pop_key_, push_key_, popdir_, pushdir_);
return OpStatus::OK;
};
t->Execute(cb_move, false);
if (is_multi || op_res.status() != OpStatus::KEY_NOTFOUND) {
if (op_res.status() == OpStatus::KEY_NOTFOUND) {
op_res = OpStatus::TIMED_OUT;
}
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
t->Execute(std::move(cb), true);
return op_res;
}
auto* stats = ServerState::tl_connection_stats();
auto wcb = [&](Transaction* t, EngineShard* shard) {
ArgSlice keys{&this->pop_key_, 1};
return t->WatchInShard(keys, shard);
};
// Block
++stats->num_blocked_clients;
bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb));
--stats->num_blocked_clients;
if (!wait_succeeded)
return OpStatus::TIMED_OUT;
t->Execute(cb_move, true);
return op_res;
}
OpResult<string> BPopPusher::RunPair(Transaction* t, time_point tp) {
return OpStatus::TIMED_OUT;
}
OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
bool skip_notexist, absl::Span<std::string_view> vals) {
DVLOG(1) << "OpPush " << key;
EngineShard* es = op_args.shard;
PrimeIterator it;
bool new_key = false;
@ -530,6 +474,58 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, u
return res;
}
OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view dest,
ListDir src_dir, ListDir dest_dir, bool conclude_on_error) {
DCHECK_EQ(2u, trans->unique_shard_cnt());
OpResult<string> find_res[2];
OpResult<string> result;
// Transaction is comprised of 2 hops:
// 1 - check for entries existence, their types and if possible -
// read the value we may move from the source list.
// 2. If everything is ok, pop from source and push the peeked value into
// the destination.
//
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
DCHECK_EQ(1u, args.size());
bool is_dest = args.front() == dest;
find_res[is_dest] = Peek(t->GetOpArgs(shard), args.front(), src_dir, !is_dest);
return OpStatus::OK;
};
trans->Execute(move(cb), false);
if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) {
result = find_res[0] ? find_res[1] : find_res[0];
if (conclude_on_error) {
auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
trans->Execute(move(cb), true);
}
} else {
// Everything is ok, lets proceed with the mutations.
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
bool is_dest = args.front() == dest;
OpArgs op_args = t->GetOpArgs(shard);
if (is_dest) {
string_view val{find_res[0].value()};
absl::Span<string_view> span{&val, 1};
OpPush(op_args, args.front(), dest_dir, false, span);
} else {
OpPop(op_args, args.front(), src_dir, 1, false);
}
return OpStatus::OK;
};
trans->Execute(move(cb), true);
result = std::move(find_res[0].value());
}
return result;
}
OpResult<uint32_t> OpLen(const OpArgs& op_args, std::string_view key) {
auto res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST);
if (!res)
@ -783,56 +779,6 @@ OpResult<StringVec> OpRange(const OpArgs& op_args, std::string_view key, long st
return str_vec;
}
OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view dest,
ListDir src_dir, ListDir dest_dir) {
DCHECK_EQ(2u, trans->unique_shard_cnt());
OpResult<string> find_res[2];
OpResult<string> result;
// Transaction is comprised of 2 hops:
// 1 - check for entries existence, their types and if possible -
// read the value we may move from the source list.
// 2. If everything is ok, pop from source and push the peeked value into
// the destination.
//
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
DCHECK_EQ(1u, args.size());
bool is_dest = args.front() == dest;
find_res[is_dest] = Peek(t->GetOpArgs(shard), args.front(), src_dir, !is_dest);
return OpStatus::OK;
};
trans->Execute(move(cb), false);
if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) {
auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
trans->Execute(move(cb), true);
result = find_res[0] ? find_res[1] : find_res[0];
} else {
// Everything is ok, lets proceed with the mutations.
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
bool is_dest = args.front() == dest;
OpArgs op_args = t->GetOpArgs(shard);
if (is_dest) {
string_view val{find_res[0].value()};
absl::Span<string_view> span{&val, 1};
OpPush(op_args, args.front(), dest_dir, false, span);
} else {
OpPop(op_args, args.front(), src_dir, 1, false);
}
return OpStatus::OK;
};
trans->Execute(move(cb), true);
result = std::move(find_res[0].value());
}
return result;
}
void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, ListDir src_dir,
ListDir dest_dir) {
OpResult<string> result;
@ -845,7 +791,7 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis
result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
} else {
cntx->transaction->Schedule();
result = MoveTwoShards(cntx->transaction, src, dest, src_dir, dest_dir);
result = MoveTwoShards(cntx->transaction, src, dest, src_dir, dest_dir, true);
}
if (result) {
@ -902,6 +848,93 @@ void BRPopLPush(CmdArgList args, ConnectionContext* cntx) {
}
}
BPopPusher::BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir)
: pop_key_(pop_key), push_key_(push_key), popdir_(popdir), pushdir_(pushdir) {
}
OpResult<string> BPopPusher::Run(Transaction* t, unsigned msec) {
time_point tp =
msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max();
t->Schedule();
if (t->unique_shard_cnt() == 1) {
return RunSingle(t, tp);
}
return RunPair(t, tp);
}
OpResult<string> BPopPusher::RunSingle(Transaction* t, time_point tp) {
OpResult<string> op_res;
bool is_multi = t->IsMulti();
auto cb_move = [&](Transaction* t, EngineShard* shard) {
op_res = OpMoveSingleShard(t->GetOpArgs(shard), pop_key_, push_key_, popdir_, pushdir_);
return OpStatus::OK;
};
t->Execute(cb_move, false);
if (is_multi || op_res.status() != OpStatus::KEY_NOTFOUND) {
if (op_res.status() == OpStatus::KEY_NOTFOUND) {
op_res = OpStatus::TIMED_OUT;
}
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
t->Execute(std::move(cb), true);
return op_res;
}
auto* stats = ServerState::tl_connection_stats();
auto wcb = [&](Transaction* t, EngineShard* shard) {
ArgSlice keys{&this->pop_key_, 1};
return t->WatchInShard(keys, shard);
};
// Block
++stats->num_blocked_clients;
bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb));
--stats->num_blocked_clients;
if (!wait_succeeded)
return OpStatus::TIMED_OUT;
t->Execute(cb_move, true);
return op_res;
}
OpResult<string> BPopPusher::RunPair(Transaction* t, time_point tp) {
bool is_multi = t->IsMulti();
OpResult<string> op_res = MoveTwoShards(t, pop_key_, push_key_, popdir_, pushdir_, false);
if (is_multi || op_res.status() != OpStatus::KEY_NOTFOUND) {
if (op_res.status() == OpStatus::KEY_NOTFOUND) {
op_res = OpStatus::TIMED_OUT;
}
return op_res;
}
auto* stats = ServerState::tl_connection_stats();
// a hack: we watch in both shards for pop_key but only in the source shard it's relevant.
// Therefore we follow the regular flow of watching the key but for the destination shard it
// will never be triggerred.
// This allows us to run Transaction::Execute on watched transactions in both shards.
auto wcb = [&](Transaction* t, EngineShard* shard) {
ArgSlice keys{&this->pop_key_, 1};
return t->WatchInShard(keys, shard);
};
++stats->num_blocked_clients;
bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb));
--stats->num_blocked_clients;
if (!wait_succeeded)
return OpStatus::TIMED_OUT;
return MoveTwoShards(t, pop_key_, push_key_, popdir_, pushdir_, true);
}
} // namespace
void ListFamily::LPush(CmdArgList args, ConnectionContext* cntx) {

View file

@ -100,7 +100,8 @@ TEST_F(ListFamilyTest, BLPopBlocking) {
});
fibers_ext::SleepFor(30us);
pp_->at(1)->Await([&] { Run("B1", {"lpush", "x", "2", "1"}); });
RespExpr resp = pp_->at(1)->Await([&] { return Run("B1", {"lpush", "x", "2", "1"}); });
ASSERT_THAT(resp, IntArg(2));
fb0.Join();
fb1.Join();
@ -679,25 +680,61 @@ TEST_F(ListFamilyTest, BRPopLPushSingleShard) {
Run({"brpoplpush", "y", "x", "0"});
RespExpr resp = Run({"exec"});
EXPECT_THAT(resp, ArgType(RespExpr::NIL));
ASSERT_FALSE(IsLocked(0, "x"));
ASSERT_FALSE(IsLocked(0, "y"));
}
TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) {
RespExpr resp0, resp1;
RespExpr resp;
// Run the fiber at creation.
auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
resp0 = Run({"brpoplpush", "x", "y", "0"});
resp = Run({"brpoplpush", "x", "y", "0"});
});
fibers_ext::SleepFor(30us);
pp_->at(1)->Await([&] { Run("B1", {"lpush", "y", "2"}); });
pp_->at(1)->Await([&] { Run("B1", {"lpush", "x", "1"}); });
fb0.Join();
ASSERT_EQ(resp0, "1");
ASSERT_EQ(resp, "1");
ASSERT_FALSE(IsLocked(0, "x"));
ASSERT_FALSE(IsLocked(0, "y"));
}
TEST_F(ListFamilyTest, BRPopLPushTwoShards) {
RespExpr resp;
EXPECT_THAT(Run({"brpoplpush", "x", "z", "0.05"}), ArgType(RespExpr::NIL));
Run({"lpush", "x", "val"});
EXPECT_EQ(Run({"brpoplpush", "x", "z", "0"}), "val");
resp = Run({"lrange", "z", "0", "-1"});
ASSERT_EQ(resp, "val");
Run({"del", "z"});
// Run the fiber at creation.
auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
resp = Run({"brpoplpush", "x", "z", "0"});
});
fibers_ext::SleepFor(30us);
RespExpr resp_push = pp_->at(1)->Await([&] { return Run("B1", {"lpush", "z", "val2"}); });
ASSERT_THAT(resp_push, IntArg(1));
resp_push = pp_->at(1)->Await([&] { return Run("B1", {"lpush", "x", "val1"}); });
ASSERT_THAT(resp_push, IntArg(1));
fb0.Join();
// Result of brpoplpush above.
ASSERT_EQ(resp, "val1");
resp = Run({"lrange", "z", "0", "-1"});
ASSERT_THAT(resp, ArrLen(2));
ASSERT_THAT(resp.GetVec(), ElementsAre("val1", "val2"));
ASSERT_FALSE(IsLocked(0, "x"));
ASSERT_FALSE(IsLocked(0, "z"));
// TODO: there is a bug here.
// we do not wake the dest shard, when source is awaked which prevents
// the atomicity and causes the first bug as well.
}
} // namespace dfly

View file

@ -350,9 +350,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
// if transaction is suspended (blocked in watched queue), then it's a noop.
OpStatus status = OpStatus::OK;
if (!was_suspended) {
status = cb_(this, shard);
}
status = cb_(this, shard);
if (unique_shard_cnt_ == 1) {
cb_ = nullptr; // We can do it because only a single thread runs the callback.