mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
feat: implement ZINTERCARD (#1197)
* fix: ZINTERSTORE bug When a shard only contains the dest key, it returned an empty map which causes the resulting intersection to be empty * chore(vscode): Add gdb launch setting * feat: Implement ZINTERCARD Initial implementation without LIMIT * feat: Implement limit for ZINTERCARD * feat: Handle sets in ZINTER* commands
This commit is contained in:
parent
eec09a13c7
commit
cb80b3fd1f
4 changed files with 136 additions and 10 deletions
31
.vscode/launch.json
vendored
Normal file
31
.vscode/launch.json
vendored
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
{
|
||||||
|
"version": "0.2.0",
|
||||||
|
"configurations": [
|
||||||
|
{
|
||||||
|
"name": "Dragonfly GDB",
|
||||||
|
"type": "cppdbg",
|
||||||
|
"request": "launch",
|
||||||
|
"program": "${workspaceFolder}/build-dbg/dragonfly",
|
||||||
|
"args": [
|
||||||
|
"--alsologtostderr"
|
||||||
|
],
|
||||||
|
"stopAtEntry": false,
|
||||||
|
"cwd": "${workspaceFolder}/build-dbg",
|
||||||
|
"environment": [],
|
||||||
|
"externalConsole": false,
|
||||||
|
"MIMode": "gdb",
|
||||||
|
"setupCommands": [
|
||||||
|
{
|
||||||
|
"description": "Enable pretty-printing for gdb",
|
||||||
|
"text": "-enable-pretty-printing",
|
||||||
|
"ignoreFailures": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"description": "Set Disassembly Flavor to Intel",
|
||||||
|
"text": "-gdb-set disassembly-flavor intel",
|
||||||
|
"ignoreFailures": true
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
|
@ -620,7 +620,7 @@ void SendAtLeastOneKeyError(ConnectionContext* cntx) {
|
||||||
(*cntx)->SendError(absl::StrCat("at least 1 input key is needed for ", name));
|
(*cntx)->SendError(absl::StrCat("at least 1 input key is needed for ", name));
|
||||||
}
|
}
|
||||||
|
|
||||||
enum class AggType : uint8_t { SUM, MIN, MAX };
|
enum class AggType : uint8_t { SUM, MIN, MAX, NOOP };
|
||||||
using ScoredMap = absl::flat_hash_map<std::string, double>;
|
using ScoredMap = absl::flat_hash_map<std::string, double>;
|
||||||
|
|
||||||
ScoredMap FromObject(const CompactObj& co, double weight) {
|
ScoredMap FromObject(const CompactObj& co, double weight) {
|
||||||
|
@ -650,6 +650,8 @@ double Aggregate(double v1, double v2, AggType atype) {
|
||||||
return max(v1, v2);
|
return max(v1, v2);
|
||||||
case AggType::MIN:
|
case AggType::MIN:
|
||||||
return min(v1, v2);
|
return min(v1, v2);
|
||||||
|
case AggType::NOOP:
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -766,6 +768,15 @@ OpResult<ScoredMap> OpUnion(EngineShard* shard, Transaction* t, string_view dest
|
||||||
return UnionShardKeysWithScore(key_weight_vec, agg_type);
|
return UnionShardKeysWithScore(key_weight_vec, agg_type);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ScoredMap ZSetFromSet(const PrimeValue& pv, double weight) {
|
||||||
|
ScoredMap result;
|
||||||
|
container_utils::IterateSet(pv, [&result, weight](container_utils::ContainerEntry ce) {
|
||||||
|
result.emplace(ce.ToString(), weight);
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
OpResult<ScoredMap> OpInter(EngineShard* shard, Transaction* t, string_view dest, AggType agg_type,
|
OpResult<ScoredMap> OpInter(EngineShard* shard, Transaction* t, string_view dest, AggType agg_type,
|
||||||
const vector<double>& weights, bool store) {
|
const vector<double>& weights, bool store) {
|
||||||
ArgSlice keys = t->GetShardArgs(shard->shard_id());
|
ArgSlice keys = t->GetShardArgs(shard->shard_id());
|
||||||
|
@ -787,7 +798,7 @@ OpResult<ScoredMap> OpInter(EngineShard* shard, Transaction* t, string_view dest
|
||||||
// In case ONLY the destination key is hosted in this shard no work on this shard should be
|
// In case ONLY the destination key is hosted in this shard no work on this shard should be
|
||||||
// done in this step
|
// done in this step
|
||||||
if (keys.empty()) {
|
if (keys.empty()) {
|
||||||
return OpStatus::OK;
|
return OpStatus::SKIPPED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -797,15 +808,17 @@ OpResult<ScoredMap> OpInter(EngineShard* shard, Transaction* t, string_view dest
|
||||||
return OpStatus::SKIPPED; // return noop
|
return OpStatus::SKIPPED; // return noop
|
||||||
|
|
||||||
for (unsigned j = 0; j < keys.size(); ++j) {
|
for (unsigned j = 0; j < keys.size(); ++j) {
|
||||||
auto it_res = db_slice.Find(t->GetDbContext(), keys[j], OBJ_ZSET);
|
auto it_res = db_slice.FindExt(t->GetDbContext(), keys[j]).first;
|
||||||
if (it_res == OpStatus::WRONG_TYPE) // TODO: support sets with default score 1.
|
if (!IsValid(it_res))
|
||||||
return it_res.status();
|
|
||||||
|
|
||||||
if (!it_res)
|
|
||||||
continue; // we exit in the next loop
|
continue; // we exit in the next loop
|
||||||
|
|
||||||
it_arr[j] = {*it_res, GetKeyWeight(t, shard->shard_id(), weights, j + removed_keys,
|
// sets are supported for ZINTER* commands:
|
||||||
cmdargs_keys_offset)};
|
auto obj_type = it_res->second.ObjType();
|
||||||
|
if (obj_type != OBJ_ZSET && obj_type != OBJ_SET)
|
||||||
|
return OpStatus::WRONG_TYPE;
|
||||||
|
|
||||||
|
it_arr[j] = {
|
||||||
|
it_res, GetKeyWeight(t, shard->shard_id(), weights, j + removed_keys, cmdargs_keys_offset)};
|
||||||
}
|
}
|
||||||
|
|
||||||
ScoredMap result;
|
ScoredMap result;
|
||||||
|
@ -814,7 +827,12 @@ OpResult<ScoredMap> OpInter(EngineShard* shard, Transaction* t, string_view dest
|
||||||
return ScoredMap{};
|
return ScoredMap{};
|
||||||
}
|
}
|
||||||
|
|
||||||
ScoredMap sm = FromObject(it->first->second, it->second);
|
ScoredMap sm;
|
||||||
|
if (it->first->second.ObjType() == OBJ_ZSET)
|
||||||
|
sm = FromObject(it->first->second, it->second);
|
||||||
|
else
|
||||||
|
sm = ZSetFromSet(it->first->second, it->second);
|
||||||
|
|
||||||
if (result.empty())
|
if (result.empty())
|
||||||
result.swap(sm);
|
result.swap(sm);
|
||||||
else
|
else
|
||||||
|
@ -1340,6 +1358,54 @@ void ZSetFamily::ZInterStore(CmdArgList args, ConnectionContext* cntx) {
|
||||||
(*cntx)->SendLong(smvec.size());
|
(*cntx)->SendLong(smvec.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ZSetFamily::ZInterCard(CmdArgList args, ConnectionContext* cntx) {
|
||||||
|
unsigned num_keys;
|
||||||
|
if (!absl::SimpleAtoi(ArgS(args, 0), &num_keys)) {
|
||||||
|
return (*cntx)->SendError(OpStatus::SYNTAX_ERR);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t limit = 0;
|
||||||
|
if (args.size() == (1 + num_keys + 2) && ArgS(args, 1 + num_keys) == "LIMIT") {
|
||||||
|
if (!absl::SimpleAtoi(ArgS(args, 1 + num_keys + 1), &limit)) {
|
||||||
|
return (*cntx)->SendError("limit value is not a positive integer", kSyntaxErrType);
|
||||||
|
}
|
||||||
|
} else if (args.size() != 1 + num_keys) {
|
||||||
|
return (*cntx)->SendError(kSyntaxErr);
|
||||||
|
}
|
||||||
|
|
||||||
|
vector<OpResult<ScoredMap>> maps(shard_set->size(), OpStatus::SKIPPED);
|
||||||
|
|
||||||
|
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||||
|
maps[shard->shard_id()] = OpInter(shard, t, "", AggType::NOOP, {}, false);
|
||||||
|
return OpStatus::OK;
|
||||||
|
};
|
||||||
|
|
||||||
|
cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||||
|
|
||||||
|
ScoredMap result;
|
||||||
|
for (auto& op_res : maps) {
|
||||||
|
if (op_res.status() == OpStatus::SKIPPED)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (!op_res)
|
||||||
|
return (*cntx)->SendError(op_res.status());
|
||||||
|
|
||||||
|
if (result.empty()) {
|
||||||
|
result.swap(op_res.value());
|
||||||
|
} else {
|
||||||
|
InterScoredMap(&result, &op_res.value(), AggType::NOOP);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result.empty())
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (0 < limit && limit < result.size()) {
|
||||||
|
return (*cntx)->SendLong(limit);
|
||||||
|
}
|
||||||
|
(*cntx)->SendLong(result.size());
|
||||||
|
}
|
||||||
|
|
||||||
void ZSetFamily::ZPopMax(CmdArgList args, ConnectionContext* cntx) {
|
void ZSetFamily::ZPopMax(CmdArgList args, ConnectionContext* cntx) {
|
||||||
ZPopMinMax(std::move(args), true, cntx);
|
ZPopMinMax(std::move(args), true, cntx);
|
||||||
}
|
}
|
||||||
|
@ -2134,6 +2200,8 @@ void ZSetFamily::Register(CommandRegistry* registry) {
|
||||||
<< CI{"ZCOUNT", CO::FAST | CO::READONLY, 4, 1, 1, 1}.HFUNC(ZCount)
|
<< CI{"ZCOUNT", CO::FAST | CO::READONLY, 4, 1, 1, 1}.HFUNC(ZCount)
|
||||||
<< CI{"ZINCRBY", CO::FAST | CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(ZIncrBy)
|
<< CI{"ZINCRBY", CO::FAST | CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(ZIncrBy)
|
||||||
<< CI{"ZINTERSTORE", kStoreMask, -4, 3, 3, 1}.HFUNC(ZInterStore)
|
<< CI{"ZINTERSTORE", kStoreMask, -4, 3, 3, 1}.HFUNC(ZInterStore)
|
||||||
|
<< CI{"ZINTERCARD", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -3, 2, 2, 1}
|
||||||
|
.HFUNC(ZInterCard)
|
||||||
<< CI{"ZLEXCOUNT", CO::READONLY, 4, 1, 1, 1}.HFUNC(ZLexCount)
|
<< CI{"ZLEXCOUNT", CO::READONLY, 4, 1, 1, 1}.HFUNC(ZLexCount)
|
||||||
<< CI{"ZPOPMAX", CO::FAST | CO::WRITE, -2, 1, 1, 1}.HFUNC(ZPopMax)
|
<< CI{"ZPOPMAX", CO::FAST | CO::WRITE, -2, 1, 1, 1}.HFUNC(ZPopMax)
|
||||||
<< CI{"ZPOPMIN", CO::FAST | CO::WRITE, -2, 1, 1, 1}.HFUNC(ZPopMin)
|
<< CI{"ZPOPMIN", CO::FAST | CO::WRITE, -2, 1, 1, 1}.HFUNC(ZPopMin)
|
||||||
|
|
|
@ -60,6 +60,7 @@ class ZSetFamily {
|
||||||
static void ZCount(CmdArgList args, ConnectionContext* cntx);
|
static void ZCount(CmdArgList args, ConnectionContext* cntx);
|
||||||
static void ZIncrBy(CmdArgList args, ConnectionContext* cntx);
|
static void ZIncrBy(CmdArgList args, ConnectionContext* cntx);
|
||||||
static void ZInterStore(CmdArgList args, ConnectionContext* cntx);
|
static void ZInterStore(CmdArgList args, ConnectionContext* cntx);
|
||||||
|
static void ZInterCard(CmdArgList args, ConnectionContext* cntx);
|
||||||
static void ZLexCount(CmdArgList args, ConnectionContext* cntx);
|
static void ZLexCount(CmdArgList args, ConnectionContext* cntx);
|
||||||
static void ZPopMax(CmdArgList args, ConnectionContext* cntx);
|
static void ZPopMax(CmdArgList args, ConnectionContext* cntx);
|
||||||
static void ZPopMin(CmdArgList args, ConnectionContext* cntx);
|
static void ZPopMin(CmdArgList args, ConnectionContext* cntx);
|
||||||
|
|
|
@ -421,6 +421,32 @@ TEST_F(ZSetFamilyTest, ZInterStore) {
|
||||||
EXPECT_EQ(1, CheckedInt({"zinterstore", "a", "2", "z1", "z2"}));
|
EXPECT_EQ(1, CheckedInt({"zinterstore", "a", "2", "z1", "z2"}));
|
||||||
resp = Run({"zrange", "a", "0", "-1", "withscores"});
|
resp = Run({"zrange", "a", "0", "-1", "withscores"});
|
||||||
EXPECT_THAT(resp.GetVec(), ElementsAre("b", "4"));
|
EXPECT_THAT(resp.GetVec(), ElementsAre("b", "4"));
|
||||||
|
|
||||||
|
// support for sets
|
||||||
|
EXPECT_EQ(2, CheckedInt({"sadd", "s2", "b", "c"}));
|
||||||
|
EXPECT_EQ(1, CheckedInt({"zinterstore", "b", "2", "z1", "s2"}));
|
||||||
|
resp = Run({"zrange", "b", "0", "-1", "withscores"});
|
||||||
|
EXPECT_THAT(resp.GetVec(), ElementsAre("b", "3"));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ZSetFamilyTest, ZInterCard) {
|
||||||
|
EXPECT_EQ(3, CheckedInt({"zadd", "z1", "1", "a", "2", "b", "3", "c"}));
|
||||||
|
EXPECT_EQ(3, CheckedInt({"zadd", "z2", "2", "b", "3", "c", "4", "d"}));
|
||||||
|
RespExpr resp;
|
||||||
|
|
||||||
|
EXPECT_EQ(2, CheckedInt({"zintercard", "2", "z1", "z2"}));
|
||||||
|
EXPECT_EQ(1, CheckedInt({"zintercard", "2", "z1", "z2", "LIMIT", "1"}));
|
||||||
|
|
||||||
|
resp = Run({"zintercard", "2", "z1", "z2", "LIM"});
|
||||||
|
EXPECT_THAT(resp, ErrArg("syntax error"));
|
||||||
|
resp = Run({"zintercard", "2", "z1", "z2", "LIMIT"});
|
||||||
|
EXPECT_THAT(resp, ErrArg("syntax error"));
|
||||||
|
resp = Run({"zintercard", "2", "z1", "z2", "LIMIT", "a"});
|
||||||
|
EXPECT_THAT(resp, ErrArg("limit value is not a positive integer"));
|
||||||
|
|
||||||
|
// support for sets
|
||||||
|
EXPECT_EQ(3, CheckedInt({"sadd", "s2", "b", "c", "d"}));
|
||||||
|
EXPECT_EQ(2, CheckedInt({"zintercard", "2", "z1", "s2"}));
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(ZSetFamilyTest, ZAddBug148) {
|
TEST_F(ZSetFamilyTest, ZAddBug148) {
|
||||||
|
|
Loading…
Reference in a new issue