From 105c2bd761c96fb426830aeef850613d02af1b0a Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Wed, 25 Sep 2024 09:45:20 +0300 Subject: [PATCH] fix: bitop do not add dst key if result is empty (#3751) * fix bitiop creating the dst key if result is empty * fix replicating dst with the wrong type * make bitop a blind update (similar to set command) --------- Signed-off-by: kostas --- src/server/bitops_family.cc | 63 +++++++++++++++++++++++------ src/server/bitops_family_test.cc | 11 ++++- tests/dragonfly/replication_test.py | 9 +++++ 3 files changed, 70 insertions(+), 13 deletions(-) diff --git a/src/server/bitops_family.cc b/src/server/bitops_family.cc index 9b5414155..3ee455177 100644 --- a/src/server/bitops_family.cc +++ b/src/server/bitops_family.cc @@ -285,11 +285,16 @@ class ElementAccess { EngineShard* shard_ = nullptr; mutable DbSlice::AutoUpdater post_updater_; + void SetFields(EngineShard* shard, DbSlice::AddOrFindResult res); + public: ElementAccess(std::string_view key, const OpArgs& args) : key_{key}, context_{args.db_cntx} { } OpStatus Find(EngineShard* shard); + // Still finds the element even if it's WRONG_TYPE. This is used for blind updates. + // See BITOP operation. + OpStatus FindAllowWrongType(EngineShard* shard); bool IsNewEntry() const { CHECK_NOTNULL(shard_); @@ -317,6 +322,13 @@ std::optional ElementAccess::Exists(EngineShard* shard) { return res.status() != OpStatus::KEY_NOTFOUND; } +void ElementAccess::SetFields(EngineShard* shard, DbSlice::AddOrFindResult res) { + element_iter_ = res.it; + added_ = res.is_new; + shard_ = shard; + post_updater_ = std::move(res.post_updater); +} + OpStatus ElementAccess::Find(EngineShard* shard) { auto op_res = context_.GetDbSlice(shard->shard_id()).AddOrFind(context_, key_); RETURN_ON_BAD_STATUS(op_res); @@ -325,10 +337,17 @@ OpStatus ElementAccess::Find(EngineShard* shard) { if (!add_res.is_new && add_res.it->second.ObjType() != OBJ_STRING) { return OpStatus::WRONG_TYPE; } - element_iter_ = add_res.it; - added_ = add_res.is_new; - shard_ = shard; - post_updater_ = std::move(add_res.post_updater); + + SetFields(shard, std::move(add_res)); + return OpStatus::OK; +} + +OpStatus ElementAccess::FindAllowWrongType(EngineShard* shard) { + auto op_res = context_.GetDbSlice(shard->shard_id()).AddOrFind(context_, key_); + RETURN_ON_BAD_STATUS(op_res); + auto& add_res = *op_res; + + SetFields(shard, std::move(add_res)); return OpStatus::OK; } @@ -343,8 +362,18 @@ std::string ElementAccess::Value() const { void ElementAccess::Commit(std::string_view new_value) const { if (shard_) { - element_iter_->second.SetString(new_value); - post_updater_.Run(); + if (new_value.empty()) { + if (!IsNewEntry()) { + post_updater_.Run(); + } else { + // No need to run, it was a new entry and it got removed + post_updater_.Cancel(); + } + context_.GetDbSlice(shard_->shard_id()).Del(context_, element_iter_); + } else { + element_iter_->second.SetString(new_value); + post_updater_.Run(); + } } } @@ -1159,7 +1188,7 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) { cntx->transaction->Execute(std::move(shard_bitop), false); // we still have more work to do // All result from each shard const auto joined_results = CombineResultOp(result_set, op); - // Second phase - save to targe key if successful + // Second phase - save to target key if successful if (!joined_results) { cntx->transaction->Conclude(); cntx->SendError(joined_results.status()); @@ -1169,14 +1198,24 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) { auto store_cb = [&](Transaction* t, EngineShard* shard) { if (shard->shard_id() == dest_shard) { ElementAccess operation{dest_key, t->GetOpArgs(shard)}; - auto find_res = operation.Find(shard); + auto find_res = operation.FindAllowWrongType(shard); - if (find_res == OpStatus::OK) { + // BITOP command acts as a blind update. If the key existed and its type + // was not a string we still want to Commit with the new value. + if (find_res == OpStatus::OK || find_res == OpStatus::WRONG_TYPE) { operation.Commit(op_result); - } - if (shard->journal()) { - RecordJournal(t->GetOpArgs(shard), "SET", {dest_key, op_result}); + if (shard->journal()) { + if (op_result.empty()) { + // We need to delete it if the key exists. If it doesn't, we just + // skip it and do not send it to the replica at all. + if (!operation.IsNewEntry()) { + RecordJournal(t->GetOpArgs(shard), "DEL", {dest_key}); + } + } else { + RecordJournal(t->GetOpArgs(shard), "SET", {dest_key, op_result}); + } + } } } return OpStatus::OK; diff --git a/src/server/bitops_family_test.cc b/src/server/bitops_family_test.cc index c67e5b277..c654ff57b 100644 --- a/src/server/bitops_family_test.cc +++ b/src/server/bitops_family_test.cc @@ -427,7 +427,16 @@ TEST_F(BitOpsFamilyTest, BitOpsNot) { // Make sure that this works with none existing key as well EXPECT_EQ(0, CheckedInt({"bitop", "NOT", "bit-op-not-none-existing-key-results", "this-key-do-not-exists"})); - EXPECT_EQ(Run({"get", "bit-op-not-none-existing-key-results"}), ""); + ASSERT_THAT(Run({"get", "bit-op-not-none-existing-key-results"}), ArgType(RespExpr::Type::NIL)); + + EXPECT_EQ(Run({"set", "foo", "bar"}), "OK"); + EXPECT_EQ(0, CheckedInt({"bitop", "NOT", "foo", "this-key-do-not-exists"})); + ASSERT_THAT(Run({"get", "foo"}), ArgType(RespExpr::Type::NIL)); + + // Change the type of foo. Bitops is similar to set command. It's a blind update. + ASSERT_THAT(Run({"hset", "foo", "bar", "val"}), IntArg(1)); + EXPECT_EQ(0, CheckedInt({"bitop", "NOT", "foo", "this-key-do-not-exists"})); + ASSERT_THAT(Run({"get", "foo"}), ArgType(RespExpr::Type::NIL)); // test bitop not resp = Run({"set", KEY_VALUES_BIT_OP[0].first, KEY_VALUES_BIT_OP[0].second}); diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index dfc63bcad..c3c6f2c89 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -673,6 +673,15 @@ async def test_rewrites(df_factory): await skip_cmd() # Check BITOP turns into SET await check("BITOP OR kdest k1 k2", r"SET kdest 1100") + # See gh issue #3528 + await c_master.execute_command(f"HSET foo bar val") + await skip_cmd() + await check("BITOP NOT foo tmp", r"DEL foo") + await c_master.execute_command(f"HSET foo bar val") + await skip_cmd() + await c_master.set("k3", "-") + await skip_cmd() + await check("BITOP NOT foo k3", r"SET foo \\xd2") # Check there is no rewrite for LMOVE on single shard await c_master.lpush("list", "v1", "v2", "v3", "v4")