diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 000000000..b2e4a7160 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,31 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Dragonfly GDB", + "type": "cppdbg", + "request": "launch", + "program": "${workspaceFolder}/build-dbg/dragonfly", + "args": [ + "--alsologtostderr" + ], + "stopAtEntry": false, + "cwd": "${workspaceFolder}/build-dbg", + "environment": [], + "externalConsole": false, + "MIMode": "gdb", + "setupCommands": [ + { + "description": "Enable pretty-printing for gdb", + "text": "-enable-pretty-printing", + "ignoreFailures": true + }, + { + "description": "Set Disassembly Flavor to Intel", + "text": "-gdb-set disassembly-flavor intel", + "ignoreFailures": true + } + ] + } + ] +} diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 85fe7fa1d..2e3b48c9b 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -620,7 +620,7 @@ void SendAtLeastOneKeyError(ConnectionContext* cntx) { (*cntx)->SendError(absl::StrCat("at least 1 input key is needed for ", name)); } -enum class AggType : uint8_t { SUM, MIN, MAX }; +enum class AggType : uint8_t { SUM, MIN, MAX, NOOP }; using ScoredMap = absl::flat_hash_map; ScoredMap FromObject(const CompactObj& co, double weight) { @@ -650,6 +650,8 @@ double Aggregate(double v1, double v2, AggType atype) { return max(v1, v2); case AggType::MIN: return min(v1, v2); + case AggType::NOOP: + return 0; } return 0; } @@ -766,6 +768,15 @@ OpResult OpUnion(EngineShard* shard, Transaction* t, string_view dest return UnionShardKeysWithScore(key_weight_vec, agg_type); } +ScoredMap ZSetFromSet(const PrimeValue& pv, double weight) { + ScoredMap result; + container_utils::IterateSet(pv, [&result, weight](container_utils::ContainerEntry ce) { + result.emplace(ce.ToString(), weight); + return true; + }); + return result; +} + OpResult OpInter(EngineShard* shard, Transaction* t, string_view dest, AggType agg_type, const vector& weights, bool store) { ArgSlice keys = t->GetShardArgs(shard->shard_id()); @@ -787,7 +798,7 @@ OpResult OpInter(EngineShard* shard, Transaction* t, string_view dest // In case ONLY the destination key is hosted in this shard no work on this shard should be // done in this step if (keys.empty()) { - return OpStatus::OK; + return OpStatus::SKIPPED; } } @@ -797,15 +808,17 @@ OpResult OpInter(EngineShard* shard, Transaction* t, string_view dest return OpStatus::SKIPPED; // return noop for (unsigned j = 0; j < keys.size(); ++j) { - auto it_res = db_slice.Find(t->GetDbContext(), keys[j], OBJ_ZSET); - if (it_res == OpStatus::WRONG_TYPE) // TODO: support sets with default score 1. - return it_res.status(); - - if (!it_res) + auto it_res = db_slice.FindExt(t->GetDbContext(), keys[j]).first; + if (!IsValid(it_res)) continue; // we exit in the next loop - it_arr[j] = {*it_res, GetKeyWeight(t, shard->shard_id(), weights, j + removed_keys, - cmdargs_keys_offset)}; + // sets are supported for ZINTER* commands: + auto obj_type = it_res->second.ObjType(); + if (obj_type != OBJ_ZSET && obj_type != OBJ_SET) + return OpStatus::WRONG_TYPE; + + it_arr[j] = { + it_res, GetKeyWeight(t, shard->shard_id(), weights, j + removed_keys, cmdargs_keys_offset)}; } ScoredMap result; @@ -814,7 +827,12 @@ OpResult OpInter(EngineShard* shard, Transaction* t, string_view dest return ScoredMap{}; } - ScoredMap sm = FromObject(it->first->second, it->second); + ScoredMap sm; + if (it->first->second.ObjType() == OBJ_ZSET) + sm = FromObject(it->first->second, it->second); + else + sm = ZSetFromSet(it->first->second, it->second); + if (result.empty()) result.swap(sm); else @@ -1340,6 +1358,54 @@ void ZSetFamily::ZInterStore(CmdArgList args, ConnectionContext* cntx) { (*cntx)->SendLong(smvec.size()); } +void ZSetFamily::ZInterCard(CmdArgList args, ConnectionContext* cntx) { + unsigned num_keys; + if (!absl::SimpleAtoi(ArgS(args, 0), &num_keys)) { + return (*cntx)->SendError(OpStatus::SYNTAX_ERR); + } + + uint64_t limit = 0; + if (args.size() == (1 + num_keys + 2) && ArgS(args, 1 + num_keys) == "LIMIT") { + if (!absl::SimpleAtoi(ArgS(args, 1 + num_keys + 1), &limit)) { + return (*cntx)->SendError("limit value is not a positive integer", kSyntaxErrType); + } + } else if (args.size() != 1 + num_keys) { + return (*cntx)->SendError(kSyntaxErr); + } + + vector> maps(shard_set->size(), OpStatus::SKIPPED); + + auto cb = [&](Transaction* t, EngineShard* shard) { + maps[shard->shard_id()] = OpInter(shard, t, "", AggType::NOOP, {}, false); + return OpStatus::OK; + }; + + cntx->transaction->ScheduleSingleHop(std::move(cb)); + + ScoredMap result; + for (auto& op_res : maps) { + if (op_res.status() == OpStatus::SKIPPED) + continue; + + if (!op_res) + return (*cntx)->SendError(op_res.status()); + + if (result.empty()) { + result.swap(op_res.value()); + } else { + InterScoredMap(&result, &op_res.value(), AggType::NOOP); + } + + if (result.empty()) + break; + } + + if (0 < limit && limit < result.size()) { + return (*cntx)->SendLong(limit); + } + (*cntx)->SendLong(result.size()); +} + void ZSetFamily::ZPopMax(CmdArgList args, ConnectionContext* cntx) { ZPopMinMax(std::move(args), true, cntx); } @@ -2134,6 +2200,8 @@ void ZSetFamily::Register(CommandRegistry* registry) { << CI{"ZCOUNT", CO::FAST | CO::READONLY, 4, 1, 1, 1}.HFUNC(ZCount) << CI{"ZINCRBY", CO::FAST | CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(ZIncrBy) << CI{"ZINTERSTORE", kStoreMask, -4, 3, 3, 1}.HFUNC(ZInterStore) + << CI{"ZINTERCARD", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -3, 2, 2, 1} + .HFUNC(ZInterCard) << CI{"ZLEXCOUNT", CO::READONLY, 4, 1, 1, 1}.HFUNC(ZLexCount) << CI{"ZPOPMAX", CO::FAST | CO::WRITE, -2, 1, 1, 1}.HFUNC(ZPopMax) << CI{"ZPOPMIN", CO::FAST | CO::WRITE, -2, 1, 1, 1}.HFUNC(ZPopMin) diff --git a/src/server/zset_family.h b/src/server/zset_family.h index 765262498..48e497957 100644 --- a/src/server/zset_family.h +++ b/src/server/zset_family.h @@ -60,6 +60,7 @@ class ZSetFamily { static void ZCount(CmdArgList args, ConnectionContext* cntx); static void ZIncrBy(CmdArgList args, ConnectionContext* cntx); static void ZInterStore(CmdArgList args, ConnectionContext* cntx); + static void ZInterCard(CmdArgList args, ConnectionContext* cntx); static void ZLexCount(CmdArgList args, ConnectionContext* cntx); static void ZPopMax(CmdArgList args, ConnectionContext* cntx); static void ZPopMin(CmdArgList args, ConnectionContext* cntx); diff --git a/src/server/zset_family_test.cc b/src/server/zset_family_test.cc index a17ca5be9..e205bb0a5 100644 --- a/src/server/zset_family_test.cc +++ b/src/server/zset_family_test.cc @@ -421,6 +421,32 @@ TEST_F(ZSetFamilyTest, ZInterStore) { EXPECT_EQ(1, CheckedInt({"zinterstore", "a", "2", "z1", "z2"})); resp = Run({"zrange", "a", "0", "-1", "withscores"}); EXPECT_THAT(resp.GetVec(), ElementsAre("b", "4")); + + // support for sets + EXPECT_EQ(2, CheckedInt({"sadd", "s2", "b", "c"})); + EXPECT_EQ(1, CheckedInt({"zinterstore", "b", "2", "z1", "s2"})); + resp = Run({"zrange", "b", "0", "-1", "withscores"}); + EXPECT_THAT(resp.GetVec(), ElementsAre("b", "3")); +} + +TEST_F(ZSetFamilyTest, ZInterCard) { + EXPECT_EQ(3, CheckedInt({"zadd", "z1", "1", "a", "2", "b", "3", "c"})); + EXPECT_EQ(3, CheckedInt({"zadd", "z2", "2", "b", "3", "c", "4", "d"})); + RespExpr resp; + + EXPECT_EQ(2, CheckedInt({"zintercard", "2", "z1", "z2"})); + EXPECT_EQ(1, CheckedInt({"zintercard", "2", "z1", "z2", "LIMIT", "1"})); + + resp = Run({"zintercard", "2", "z1", "z2", "LIM"}); + EXPECT_THAT(resp, ErrArg("syntax error")); + resp = Run({"zintercard", "2", "z1", "z2", "LIMIT"}); + EXPECT_THAT(resp, ErrArg("syntax error")); + resp = Run({"zintercard", "2", "z1", "z2", "LIMIT", "a"}); + EXPECT_THAT(resp, ErrArg("limit value is not a positive integer")); + + // support for sets + EXPECT_EQ(3, CheckedInt({"sadd", "s2", "b", "c", "d"})); + EXPECT_EQ(2, CheckedInt({"zintercard", "2", "z1", "s2"})); } TEST_F(ZSetFamilyTest, ZAddBug148) {