mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
fix: bitop do not add dst key if result is empty (#3751)
* fix bitiop creating the dst key if result is empty * fix replicating dst with the wrong type * make bitop a blind update (similar to set command) --------- Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
parent
987e6feaa5
commit
105c2bd761
3 changed files with 70 additions and 13 deletions
|
@ -285,11 +285,16 @@ class ElementAccess {
|
|||
EngineShard* shard_ = nullptr;
|
||||
mutable DbSlice::AutoUpdater post_updater_;
|
||||
|
||||
void SetFields(EngineShard* shard, DbSlice::AddOrFindResult res);
|
||||
|
||||
public:
|
||||
ElementAccess(std::string_view key, const OpArgs& args) : key_{key}, context_{args.db_cntx} {
|
||||
}
|
||||
|
||||
OpStatus Find(EngineShard* shard);
|
||||
// Still finds the element even if it's WRONG_TYPE. This is used for blind updates.
|
||||
// See BITOP operation.
|
||||
OpStatus FindAllowWrongType(EngineShard* shard);
|
||||
|
||||
bool IsNewEntry() const {
|
||||
CHECK_NOTNULL(shard_);
|
||||
|
@ -317,6 +322,13 @@ std::optional<bool> ElementAccess::Exists(EngineShard* shard) {
|
|||
return res.status() != OpStatus::KEY_NOTFOUND;
|
||||
}
|
||||
|
||||
void ElementAccess::SetFields(EngineShard* shard, DbSlice::AddOrFindResult res) {
|
||||
element_iter_ = res.it;
|
||||
added_ = res.is_new;
|
||||
shard_ = shard;
|
||||
post_updater_ = std::move(res.post_updater);
|
||||
}
|
||||
|
||||
OpStatus ElementAccess::Find(EngineShard* shard) {
|
||||
auto op_res = context_.GetDbSlice(shard->shard_id()).AddOrFind(context_, key_);
|
||||
RETURN_ON_BAD_STATUS(op_res);
|
||||
|
@ -325,10 +337,17 @@ OpStatus ElementAccess::Find(EngineShard* shard) {
|
|||
if (!add_res.is_new && add_res.it->second.ObjType() != OBJ_STRING) {
|
||||
return OpStatus::WRONG_TYPE;
|
||||
}
|
||||
element_iter_ = add_res.it;
|
||||
added_ = add_res.is_new;
|
||||
shard_ = shard;
|
||||
post_updater_ = std::move(add_res.post_updater);
|
||||
|
||||
SetFields(shard, std::move(add_res));
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
OpStatus ElementAccess::FindAllowWrongType(EngineShard* shard) {
|
||||
auto op_res = context_.GetDbSlice(shard->shard_id()).AddOrFind(context_, key_);
|
||||
RETURN_ON_BAD_STATUS(op_res);
|
||||
auto& add_res = *op_res;
|
||||
|
||||
SetFields(shard, std::move(add_res));
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
|
@ -343,8 +362,18 @@ std::string ElementAccess::Value() const {
|
|||
|
||||
void ElementAccess::Commit(std::string_view new_value) const {
|
||||
if (shard_) {
|
||||
element_iter_->second.SetString(new_value);
|
||||
post_updater_.Run();
|
||||
if (new_value.empty()) {
|
||||
if (!IsNewEntry()) {
|
||||
post_updater_.Run();
|
||||
} else {
|
||||
// No need to run, it was a new entry and it got removed
|
||||
post_updater_.Cancel();
|
||||
}
|
||||
context_.GetDbSlice(shard_->shard_id()).Del(context_, element_iter_);
|
||||
} else {
|
||||
element_iter_->second.SetString(new_value);
|
||||
post_updater_.Run();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1159,7 +1188,7 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
|
|||
cntx->transaction->Execute(std::move(shard_bitop), false); // we still have more work to do
|
||||
// All result from each shard
|
||||
const auto joined_results = CombineResultOp(result_set, op);
|
||||
// Second phase - save to targe key if successful
|
||||
// Second phase - save to target key if successful
|
||||
if (!joined_results) {
|
||||
cntx->transaction->Conclude();
|
||||
cntx->SendError(joined_results.status());
|
||||
|
@ -1169,14 +1198,24 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
|
|||
auto store_cb = [&](Transaction* t, EngineShard* shard) {
|
||||
if (shard->shard_id() == dest_shard) {
|
||||
ElementAccess operation{dest_key, t->GetOpArgs(shard)};
|
||||
auto find_res = operation.Find(shard);
|
||||
auto find_res = operation.FindAllowWrongType(shard);
|
||||
|
||||
if (find_res == OpStatus::OK) {
|
||||
// BITOP command acts as a blind update. If the key existed and its type
|
||||
// was not a string we still want to Commit with the new value.
|
||||
if (find_res == OpStatus::OK || find_res == OpStatus::WRONG_TYPE) {
|
||||
operation.Commit(op_result);
|
||||
}
|
||||
|
||||
if (shard->journal()) {
|
||||
RecordJournal(t->GetOpArgs(shard), "SET", {dest_key, op_result});
|
||||
if (shard->journal()) {
|
||||
if (op_result.empty()) {
|
||||
// We need to delete it if the key exists. If it doesn't, we just
|
||||
// skip it and do not send it to the replica at all.
|
||||
if (!operation.IsNewEntry()) {
|
||||
RecordJournal(t->GetOpArgs(shard), "DEL", {dest_key});
|
||||
}
|
||||
} else {
|
||||
RecordJournal(t->GetOpArgs(shard), "SET", {dest_key, op_result});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return OpStatus::OK;
|
||||
|
|
|
@ -427,7 +427,16 @@ TEST_F(BitOpsFamilyTest, BitOpsNot) {
|
|||
// Make sure that this works with none existing key as well
|
||||
EXPECT_EQ(0, CheckedInt({"bitop", "NOT", "bit-op-not-none-existing-key-results",
|
||||
"this-key-do-not-exists"}));
|
||||
EXPECT_EQ(Run({"get", "bit-op-not-none-existing-key-results"}), "");
|
||||
ASSERT_THAT(Run({"get", "bit-op-not-none-existing-key-results"}), ArgType(RespExpr::Type::NIL));
|
||||
|
||||
EXPECT_EQ(Run({"set", "foo", "bar"}), "OK");
|
||||
EXPECT_EQ(0, CheckedInt({"bitop", "NOT", "foo", "this-key-do-not-exists"}));
|
||||
ASSERT_THAT(Run({"get", "foo"}), ArgType(RespExpr::Type::NIL));
|
||||
|
||||
// Change the type of foo. Bitops is similar to set command. It's a blind update.
|
||||
ASSERT_THAT(Run({"hset", "foo", "bar", "val"}), IntArg(1));
|
||||
EXPECT_EQ(0, CheckedInt({"bitop", "NOT", "foo", "this-key-do-not-exists"}));
|
||||
ASSERT_THAT(Run({"get", "foo"}), ArgType(RespExpr::Type::NIL));
|
||||
|
||||
// test bitop not
|
||||
resp = Run({"set", KEY_VALUES_BIT_OP[0].first, KEY_VALUES_BIT_OP[0].second});
|
||||
|
|
|
@ -673,6 +673,15 @@ async def test_rewrites(df_factory):
|
|||
await skip_cmd()
|
||||
# Check BITOP turns into SET
|
||||
await check("BITOP OR kdest k1 k2", r"SET kdest 1100")
|
||||
# See gh issue #3528
|
||||
await c_master.execute_command(f"HSET foo bar val")
|
||||
await skip_cmd()
|
||||
await check("BITOP NOT foo tmp", r"DEL foo")
|
||||
await c_master.execute_command(f"HSET foo bar val")
|
||||
await skip_cmd()
|
||||
await c_master.set("k3", "-")
|
||||
await skip_cmd()
|
||||
await check("BITOP NOT foo k3", r"SET foo \\xd2")
|
||||
|
||||
# Check there is no rewrite for LMOVE on single shard
|
||||
await c_master.lpush("list", "v1", "v2", "v3", "v4")
|
||||
|
|
Loading…
Reference in a new issue