diff --git a/server/dragonfly_test.cc b/server/dragonfly_test.cc index e011a4b68..c849ddd32 100644 --- a/server/dragonfly_test.cc +++ b/server/dragonfly_test.cc @@ -28,8 +28,12 @@ namespace { constexpr unsigned kPoolThreadCount = 4; -} // namespace +const char kKey1[] = "x"; +const char kKey2[] = "b"; +const char kKey3[] = "c"; +const char kKey4[] = "y"; +} // namespace // This test is responsible for server and main service // (connection, transaction etc) families. @@ -44,10 +48,10 @@ TEST_F(DflyEngineTest, Multi) { RespVec resp = Run({"multi"}); ASSERT_THAT(resp, RespEq("OK")); - resp = Run({"get", "x"}); + resp = Run({"get", kKey1}); ASSERT_THAT(resp, RespEq("QUEUED")); - resp = Run({"get", "y"}); + resp = Run({"get", kKey4}); ASSERT_THAT(resp, RespEq("QUEUED")); resp = Run({"exec"}); @@ -61,11 +65,94 @@ TEST_F(DflyEngineTest, Multi) { }); EXPECT_TRUE(tx_empty); - resp = Run({"get", "y"}); + resp = Run({"get", kKey4}); ASSERT_THAT(resp, ElementsAre(ArgType(RespExpr::NIL))); - ASSERT_FALSE(service_->IsLocked(0, "x")); - ASSERT_FALSE(service_->IsLocked(0, "y")); + ASSERT_FALSE(service_->IsLocked(0, kKey1)); + ASSERT_FALSE(service_->IsLocked(0, kKey4)); +} + + +TEST_F(DflyEngineTest, MultiEmpty) { + RespVec resp = Run({"multi"}); + ASSERT_THAT(resp, RespEq("OK")); + resp = Run({"exec"}); + + ASSERT_THAT(resp[0], ArrLen(0)); +} + +TEST_F(DflyEngineTest, MultiSeq) { + RespVec resp = Run({"multi"}); + ASSERT_THAT(resp, RespEq("OK")); + + resp = Run({"set", kKey1, absl::StrCat(1)}); + ASSERT_THAT(resp, RespEq("QUEUED")); + resp = Run({"get", kKey1}); + ASSERT_THAT(resp, RespEq("QUEUED")); + resp = Run({"mget", kKey1, kKey4}); + ASSERT_THAT(resp, RespEq("QUEUED")); + resp = Run({"exec"}); + + ASSERT_FALSE(service_->IsLocked(0, kKey1)); + ASSERT_FALSE(service_->IsLocked(0, kKey4)); + + EXPECT_THAT(resp, ElementsAre(StrArg("OK"), StrArg("1"), ArrLen(2))); + const RespExpr::Vec& arr = *get(resp[2].u); + ASSERT_THAT(arr, ElementsAre("1", ArgType(RespExpr::NIL))); +} + +TEST_F(DflyEngineTest, MultiConsistent) { + auto mset_fb = pp_->at(0)->LaunchFiber([&] { + for (size_t i = 1; i < 10; ++i) { + string base = StrCat(i * 900); + RespVec resp = Run({"mset", kKey1, base, kKey4, base}); + ASSERT_THAT(resp, RespEq("OK")); + } + }); + + auto fb = pp_->at(1)->LaunchFiber([&] { + RespVec resp = Run({"multi"}); + ASSERT_THAT(resp, RespEq("OK")); + this_fiber::sleep_for(1ms); + + resp = Run({"get", kKey1}); + ASSERT_THAT(resp, RespEq("QUEUED")); + + resp = Run({"get", kKey4}); + ASSERT_THAT(resp, RespEq("QUEUED")); + + resp = Run({"mget", kKey4, kKey1}); + ASSERT_THAT(resp, RespEq("QUEUED")); + + resp = Run({"exec"}); + + EXPECT_THAT(resp, ElementsAre(ArgType(RespExpr::STRING), ArgType(RespExpr::STRING), + ArgType(RespExpr::ARRAY))); + ASSERT_EQ(resp[0].GetBuf(), resp[1].GetBuf()); + const RespVec& arr = *get(resp[2].u); + EXPECT_THAT(arr, ElementsAre(ArgType(RespExpr::STRING), ArgType(RespExpr::STRING))); + EXPECT_EQ(arr[0].GetBuf(), arr[1].GetBuf()); + EXPECT_EQ(arr[0].GetBuf(), resp[0].GetBuf()); + }); + + mset_fb.join(); + fb.join(); + ASSERT_FALSE(service_->IsLocked(0, kKey1)); + ASSERT_FALSE(service_->IsLocked(0, kKey4)); +} + +TEST_F(DflyEngineTest, MultiRename) { + RespVec resp = Run({"multi"}); + ASSERT_THAT(resp, RespEq("OK")); + Run({"set", kKey1, "1"}); + + resp = Run({"rename", kKey1, kKey4}); + ASSERT_THAT(resp, RespEq("QUEUED")); + resp = Run({"exec"}); + + EXPECT_THAT(resp, ElementsAre(StrArg("OK"), StrArg("OK"))); + ASSERT_FALSE(service_->IsLocked(0, kKey1)); + ASSERT_FALSE(service_->IsLocked(0, kKey4)); } } // namespace dfly diff --git a/server/generic_family.cc b/server/generic_family.cc index 38c3d676b..737ba1769 100644 --- a/server/generic_family.cc +++ b/server/generic_family.cc @@ -40,7 +40,7 @@ class Renamer { DbIndex db_indx_; ShardId src_sid_; - std::pair find_res_[2]; + pair find_res_[2]; uint64_t expire_; MainValue src_val_; @@ -76,7 +76,7 @@ void Renamer::SwapValues(EngineShard* shard, const ArgSlice& args) { shard->db_slice().Expire(db_indx_, dest_it, expire_); } else { // we just add the key to destination with the source object. - std::string_view key = args.front(); // from key + string_view key = args.front(); // from key shard->db_slice().AddNew(db_indx_, key, std::move(src_val_), expire_); } } @@ -152,7 +152,7 @@ void GenericFamily::Ping(CmdArgList args, ConnectionContext* cntx) { if (args.size() == 1) { return cntx->SendSimpleRespString("PONG"); } else { - std::string_view arg = ArgS(args, 1); + string_view arg = ArgS(args, 1); DVLOG(2) << "Ping " << arg; return cntx->SendBulkString(arg); @@ -180,8 +180,8 @@ void GenericFamily::Exists(CmdArgList args, ConnectionContext* cntx) { } void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) { - std::string_view key = ArgS(args, 1); - std::string_view sec = ArgS(args, 2); + string_view key = ArgS(args, 1); + string_view sec = ArgS(args, 2); int64_t int_arg; if (!absl::SimpleAtoi(sec, &int_arg)) { @@ -194,14 +194,14 @@ void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { return OpExpire(OpArgs{shard, t->db_index()}, key, params); }; - OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); + OpStatus status = cntx->transaction->ScheduleSingleHop(move(cb)); cntx->SendLong(status == OpStatus::OK); } void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) { - std::string_view key = ArgS(args, 1); - std::string_view sec = ArgS(args, 2); + string_view key = ArgS(args, 1); + string_view sec = ArgS(args, 2); int64_t int_arg; if (!absl::SimpleAtoi(sec, &int_arg)) { @@ -231,7 +231,7 @@ void GenericFamily::Pttl(CmdArgList args, ConnectionContext* cntx) { } void GenericFamily::TtlGeneric(CmdArgList args, ConnectionContext* cntx, TimeUnit unit) { - std::string_view key = ArgS(args, 1); + string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { return OpTtl(t, shard, key); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -251,7 +251,7 @@ void GenericFamily::TtlGeneric(CmdArgList args, ConnectionContext* cntx, TimeUni } void GenericFamily::Select(CmdArgList args, ConnectionContext* cntx) { - std::string_view key = ArgS(args, 1); + string_view key = ArgS(args, 1); int64_t index; if (!absl::SimpleAtoi(key, &index)) { return cntx->SendError(kInvalidDbIndErr); @@ -272,7 +272,7 @@ void GenericFamily::Select(CmdArgList args, ConnectionContext* cntx) { OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_dest, ConnectionContext* cntx) { - std::string_view key[2] = {ArgS(args, 1), ArgS(args, 2)}; + string_view key[2] = {ArgS(args, 1), ArgS(args, 2)}; Transaction* transaction = cntx->transaction; @@ -305,11 +305,11 @@ OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des } void GenericFamily::Echo(CmdArgList args, ConnectionContext* cntx) { - std::string_view key = ArgS(args, 1); + string_view key = ArgS(args, 1); return cntx->SendBulkString(key); } -OpStatus GenericFamily::OpExpire(const OpArgs& op_args, std::string_view key, +OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key, const ExpireParams& params) { auto& db_slice = op_args.shard->db_slice(); auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key); @@ -333,7 +333,7 @@ OpStatus GenericFamily::OpExpire(const OpArgs& op_args, std::string_view key, return OpStatus::OK; } -OpResult GenericFamily::OpTtl(Transaction* t, EngineShard* shard, std::string_view key) { +OpResult GenericFamily::OpTtl(Transaction* t, EngineShard* shard, string_view key) { auto& db_slice = shard->db_slice(); auto [it, expire] = db_slice.FindExt(t->db_index(), key); if (!IsValid(it)) @@ -375,25 +375,37 @@ OpResult GenericFamily::OpExists(const OpArgs& op_args, ArgSlice keys) return res; } -OpResult GenericFamily::OpRen(const OpArgs& op_args, std::string_view from, - std::string_view to, bool skip_exists) { +OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from, + string_view to, bool skip_exists) { auto& db_slice = op_args.shard->db_slice(); auto [from_it, expire_it] = db_slice.FindExt(op_args.db_ind, from); if (!IsValid(from_it)) return OpStatus::KEY_NOTFOUND; - auto to_de = db_slice.FindExt(op_args.db_ind, to); - if (IsValid(to_de.first)) { + auto [to_it, to_expire] = db_slice.FindExt(op_args.db_ind, to); + if (IsValid(to_it)) { if (skip_exists) return OpStatus::KEY_EXISTS; - - CHECK(db_slice.Del(op_args.db_ind, to_de.first)); } uint64_t exp_ts = IsValid(expire_it) ? expire_it->second : 0; - db_slice.AddNew(op_args.db_ind, to, std::move(from_it->second), exp_ts); - CHECK(db_slice.Del(op_args.db_ind, from_it)); + if (IsValid(to_it)) { + to_it->second = std::move(from_it->second); + from_it->second.SetExpire(IsValid(expire_it)); + if (IsValid(to_expire)) { + to_it->second.SetExpire(true); + to_expire->second = exp_ts; + } else { + to_it->second.SetExpire(false); + db_slice.Expire(op_args.db_ind, to_it, exp_ts); + } + } else { + db_slice.AddNew(op_args.db_ind, to, std::move(from_it->second), exp_ts); + // Need search again since the container might invalidate the iterators. + from_it = db_slice.FindExt(op_args.db_ind, from).first; + } + CHECK(db_slice.Del(op_args.db_ind, from_it)); return OpStatus::OK; }