diff --git a/docs/api_status.md b/docs/api_status.md index 6fd2af08a..01386a991 100644 --- a/docs/api_status.md +++ b/docs/api_status.md @@ -40,6 +40,7 @@ with respect to Memcached and Redis APIs. - [X] EXPIRE - [X] EXPIREAT - [X] KEYS + - [X] MOVE - [X] PING - [X] RENAME - [X] RENAMENX @@ -105,7 +106,6 @@ with respect to Memcached and Redis APIs. - [ ] BGREWRITEAOF - [ ] MONITOR - [ ] RANDOMKEY - - [ ] MOVE ### API 2 - [X] List Family diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index c77259894..60550bbeb 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -463,6 +463,38 @@ void GenericFamily::Stick(CmdArgList args, ConnectionContext* cntx) { (*cntx)->SendLong(match_cnt); } +void GenericFamily::Move(CmdArgList args, ConnectionContext* cntx) { + string_view key = ArgS(args, 1); + int64_t target_db; + + if (!absl::SimpleAtoi(ArgS(args, 2), &target_db)) { + return (*cntx)->SendError(kInvalidIntErr); + } + + if (target_db < 0 || target_db >= absl::GetFlag(FLAGS_dbnum)) { + return (*cntx)->SendError(kDbIndOutOfRangeErr); + } + + if (target_db == cntx->db_index()) { + return (*cntx)->SendError("source and destination objects are the same"); + } + + OpStatus res = OpStatus::SKIPPED; + ShardId target_shard = Shard(key, shard_set->size()); + auto cb = [&](Transaction* t, EngineShard* shard) { + // MOVE runs as a global transaction and is therefore scheduled on every shard. + if (target_shard == shard->shard_id()) { + res = OpMove(t->GetOpArgs(shard), key, target_db); + } + return OpStatus::OK; + }; + + cntx->transaction->ScheduleSingleHop(std::move(cb)); + // Exactly one shard will call OpMove. + DCHECK(res != OpStatus::SKIPPED); + (*cntx)->SendLong(res == OpStatus::OK); +} + void GenericFamily::Rename(CmdArgList args, ConnectionContext* cntx) { OpResult st = RenameGeneric(args, false, cntx); (*cntx)->SendError(st.status()); @@ -771,6 +803,43 @@ OpResult GenericFamily::OpStick(const OpArgs& op_args, ArgSlice keys) return res; } +// OpMove touches multiple databases (op_args.db_idx, target_db), so it assumes it runs +// as a global transaction. +// TODO: Allow running OpMove without a global transaction. +OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex target_db) { + auto& db_slice = op_args.shard->db_slice(); + + // Fetch value at key in current db. + auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, key); + if (!IsValid(from_it)) + return OpStatus::KEY_NOTFOUND; + + // Fetch value at key in target db. + auto [to_it, _] = db_slice.FindExt(target_db, key); + if (IsValid(to_it)) + return OpStatus::KEY_EXISTS; + + // Ensure target database exists. + db_slice.ActivateDb(target_db); + + bool sticky = from_it->first.IsSticky(); + uint64_t exp_ts = db_slice.ExpireTime(from_expire); + PrimeValue from_obj = std::move(from_it->second); + + // Restore expire flag after std::move. + from_it->second.SetExpire(IsValid(from_expire)); + + CHECK(db_slice.Del(op_args.db_ind, from_it)); + to_it = db_slice.AddNew(target_db, key, std::move(from_obj), exp_ts); + to_it->first.SetSticky(sticky); + + if (to_it->second.ObjType() == OBJ_LIST && op_args.shard->blocking_controller()) { + op_args.shard->blocking_controller()->AwakeWatched(target_db, key); + } + + return OpStatus::OK; +} + using CI = CommandId; #define HFUNC(x) SetHandler(&GenericFamily::x) @@ -798,7 +867,8 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"PTTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Pttl) << CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, 1}.HFUNC(Type) << CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del) - << CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick); + << CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick) + << CI{"MOVE", CO::WRITE | CO::GLOBAL_TRANS, 3, 1, 1, 1}.HFUNC(Move); } } // namespace dfly diff --git a/src/server/generic_family.h b/src/server/generic_family.h index 50c43a8bb..470e754fe 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -49,6 +49,7 @@ class GenericFamily { static void Keys(CmdArgList args, ConnectionContext* cntx); static void PexpireAt(CmdArgList args, ConnectionContext* cntx); static void Stick(CmdArgList args, ConnectionContext* cntx); + static void Move(CmdArgList args, ConnectionContext* cntx); static void Rename(CmdArgList args, ConnectionContext* cntx); static void RenameNx(CmdArgList args, ConnectionContext* cntx); @@ -71,6 +72,7 @@ class GenericFamily { static OpResult OpRen(const OpArgs& op_args, std::string_view from, std::string_view to, bool skip_exists); static OpResult OpStick(const OpArgs& op_args, ArgSlice keys); + static OpStatus OpMove(const OpArgs& op_args, std::string_view key, DbIndex target_db); }; } // namespace dfly diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 1fff6a855..e008d0893 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -201,6 +201,51 @@ TEST_F(GenericFamilyTest, Stick) { ASSERT_THAT(Run({"stick", "b"}), IntArg(0)); } +TEST_F(GenericFamilyTest, Move) { + // Check MOVE returns 0 on non-existent keys + ASSERT_THAT(Run({"move", "a", "1"}), IntArg(0)); + + // Check MOVE catches non-existent database indices + ASSERT_THAT(Run({"move", "a", "-1"}), ArgType(RespExpr::ERROR)); + ASSERT_THAT(Run({"move", "a", "100500"}), ArgType(RespExpr::ERROR)); + + // Check MOVE moves value & expiry & stickyness + Run({"set", "a", "test"}); + Run({"expire", "a", "1000"}); + Run({"stick", "a"}); + ASSERT_THAT(Run({"move", "a", "1"}), IntArg(1)); + Run({"select", "1"}); + ASSERT_THAT(Run({"get", "a"}), "test"); + ASSERT_THAT(Run({"ttl", "a"}), testing::Not(IntArg(-1))); + ASSERT_THAT(Run({"stick", "a"}), IntArg(0)); + + // Check MOVE doesn't move if key exists + Run({"select", "1"}); + Run({"set", "a", "test"}); + Run({"select", "0"}); + Run({"set", "a", "another test"}); + ASSERT_THAT(Run({"move", "a", "1"}), IntArg(0)); // exists from test case above + Run({"select", "1"}); + ASSERT_THAT(Run({"get", "a"}), "test"); + + // Check MOVE awakes blocking operations + auto fb_blpop = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + Run({"select", "1"}); + auto resp = Run({"blpop", "l", "0"}); + ASSERT_THAT(resp, ArgType(RespExpr::ARRAY)); + EXPECT_THAT(resp.GetVec(), ElementsAre("l", "TestItem")); + }); + + WaitUntilLocked(1, "l"); + + pp_->at(1)->Await([&] { + Run({"select", "0"}); + Run({"lpush", "l", "TestItem"}); + Run({"move", "l", "1"}); + }); + + fb_blpop.join(); +} using testing::AnyOf; using testing::Each; diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index ba09b888b..83206304a 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -30,12 +30,6 @@ class ListFamilyTest : public BaseFamilyTest { ListFamilyTest() { num_threads_ = 4; } - - void WaitForLocked(string_view key) { - do { - this_fiber::sleep_for(30us); - } while (!IsLocked(0, key)); - } }; const char kKey1[] = "x"; @@ -187,7 +181,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) { blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"}); }); - WaitForLocked(kKey1); + WaitUntilLocked(0, kKey1); auto p1_fb = pp_->at(1)->LaunchFiber([&] { for (unsigned i = 0; i < 100; ++i) { @@ -225,7 +219,7 @@ TEST_F(ListFamilyTest, BLPopSerialize) { blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"}); }); - WaitForLocked(kKey1); + WaitUntilLocked(0, kKey1); LOG(INFO) << "Starting multi"; @@ -295,7 +289,7 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) { blpop_resp = Run({"blpop", kKey1, "0"}); }); - WaitForLocked(kKey1); + WaitUntilLocked(0, kKey1); auto p1_fb = pp_->at(1)->LaunchFiber([&] { Run({"multi"}); @@ -324,7 +318,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) { ASSERT_THAT(watched, ArrLen(0)); }); - WaitForLocked(kKey1); + WaitUntilLocked(0, kKey1); pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"})); }); pop_fb.join(); @@ -336,7 +330,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) { blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"}); }); - WaitForLocked(kKey1); + WaitUntilLocked(0, kKey1); pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey2, "bar"})); }); pop_fb.join(); @@ -358,7 +352,7 @@ TEST_F(ListFamilyTest, BPopTwoKeysSameShard) { ASSERT_THAT(watched, ArrLen(0)); }); - WaitForLocked("x"); + WaitUntilLocked(0, "x"); pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", "x", "bar"})); }); pop_fb.join(); @@ -377,7 +371,7 @@ TEST_F(ListFamilyTest, BPopRename) { blpop_resp = Run({"blpop", kKey1, "0"}); }); - WaitForLocked(kKey1); + WaitUntilLocked(0, kKey1); pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", "a", "bar"})); @@ -395,7 +389,7 @@ TEST_F(ListFamilyTest, BPopFlush) { blpop_resp = Run({"blpop", kKey1, "0"}); }); - WaitForLocked(kKey1); + WaitUntilLocked(0, kKey1); pp_->at(1)->Await([&] { Run({"flushdb"}); diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index df234dafb..f88ce5e53 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -157,6 +157,16 @@ void BaseFamilyTest::UpdateTime(uint64_t ms) { shard_set->RunBriefInParallel(cb); } +void BaseFamilyTest::WaitUntilLocked(DbIndex db_index, string_view key, double timeout) { + auto step = 50us; + auto timeout_micro = chrono::duration_cast (1000ms * timeout); + int64_t steps = timeout_micro.count() / step.count(); + do { + ::boost::this_fiber::sleep_for(step); + } while (!IsLocked(db_index, key) && --steps > 0); + CHECK(IsLocked(db_index, key)); +} + RespExpr BaseFamilyTest::Run(ArgSlice list) { if (!ProactorBase::IsProactorThread()) { return pp_->at(0)->Await([&] { return this->Run(list); }); diff --git a/src/server/test_utils.h b/src/server/test_utils.h index d10ba9f71..bb963c42a 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -71,6 +71,9 @@ class BaseFamilyTest : public ::testing::Test { // ts is ms void UpdateTime(uint64_t ms); + // Wait for a locked key to unlock. Aborts after timeout seconds passed. + void WaitUntilLocked(DbIndex db_index, std::string_view key, double timeout = 3); + std::string GetId() const; size_t SubscriberMessagesLen(std::string_view conn_id) const; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 09881009a..c792d1e4a 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -398,13 +398,14 @@ bool Transaction::RunInShard(EngineShard* shard) { } } sd.local_mask &= ~OUT_OF_ORDER; - - // It has 2 responsibilities. - // 1: to go over potential wakened keys, verify them and activate watch queues. - // 2: if this transaction was notified and finished running - to remove it from the head - // of the queue and notify the next one. - if (shard->blocking_controller()) - shard->blocking_controller()->RunStep(awaked_prerun ? this : nullptr); + } + // It has 2 responsibilities. + // 1: to go over potential wakened keys, verify them and activate watch queues. + // 2: if this transaction was notified and finished running - to remove it from the head + // of the queue and notify the next one. + // RunStep is also called for global transactions because of commands like MOVE. + if (shard->blocking_controller()) { + shard->blocking_controller()->RunStep(awaked_prerun ? this : nullptr); } }