1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat(zset_family): support zscan match and count optional params issue (#891)

* feat(zset_family): support zscan match and count optional params

---------

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2023-03-01 16:59:49 +02:00 committed by GitHub
parent edd93a086c
commit eb5fd2867f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 11 deletions

View file

@ -1599,12 +1599,15 @@ void ZSetFamily::ZScan(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendError("invalid cursor");
}
if (args.size() > 3) {
return (*cntx)->SendError("scan options are not supported yet");
OpResult<ScanOpts> ops = ScanOpts::TryFrom(args.subspan(3));
if (!ops) {
DVLOG(1) << "Scan invalid args - return " << ops << " to the user";
return (*cntx)->SendError(ops.status());
}
ScanOpts scan_op = ops.value();
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpScan(t->GetOpArgs(shard), key, &cursor);
return OpScan(t->GetOpArgs(shard), key, &cursor, scan_op);
};
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
@ -1783,7 +1786,7 @@ void ZSetFamily::ZPopMinMax(CmdArgList args, bool reverse, ConnectionContext* cn
}
OpResult<StringVec> ZSetFamily::OpScan(const OpArgs& op_args, std::string_view key,
uint64_t* cursor) {
uint64_t* cursor, const ScanOpts& scan_op) {
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
if (!find_res)
@ -1796,22 +1799,24 @@ OpResult<StringVec> ZSetFamily::OpScan(const OpArgs& op_args, std::string_view k
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
RangeParams params;
params.with_scores = true;
IntervalVisitor iv{Action::RANGE, params, zobj};
iv(IndexInterval{0, kuint32max});
ScoredArray arr = iv.PopResult();
res.resize(arr.size() * 2);
for (size_t i = 0; i < arr.size(); ++i) {
if (!scan_op.Matches(arr[i].first)) {
continue;
}
res.emplace_back(std::move(arr[i].first));
char* str = RedisReplyBuilder::FormatDouble(arr[i].second, buf, sizeof(buf));
res[2 * i] = std::move(arr[i].first);
res[2 * i + 1].assign(str);
res.emplace_back(str);
}
*cursor = 0;
} else {
CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), zobj->encoding);
uint32_t count = 20;
uint32_t count = scan_op.limit;
zset* zs = (zset*)zobj->ptr;
dict* ht = zs->dict;
@ -1820,12 +1825,17 @@ OpResult<StringVec> ZSetFamily::OpScan(const OpArgs& op_args, std::string_view k
struct ScanArgs {
char* sbuf;
StringVec* res;
} sargs = {buf, &res};
const ScanOpts* scan_op;
} sargs = {buf, &res, &scan_op};
auto scanCb = [](void* privdata, const dictEntry* de) {
ScanArgs* sargs = (ScanArgs*)privdata;
sds key = (sds)de->key;
if (!sargs->scan_op->Matches(key)) {
return;
}
double score = *(double*)dictGetVal(de);
sargs->res->emplace_back(key, sdslen(key));

View file

@ -91,7 +91,8 @@ class ZSetFamily {
static void ZRankGeneric(CmdArgList args, bool reverse, ConnectionContext* cntx);
static bool ParseRangeByScoreParams(CmdArgList args, RangeParams* params);
static void ZPopMinMax(CmdArgList args, bool reverse, ConnectionContext* cntx);
static OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor);
static OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor,
const ScanOpts& scan_op);
static OpResult<unsigned> OpRem(const OpArgs& op_args, std::string_view key, ArgSlice members);
static OpResult<double> OpScore(const OpArgs& op_args, std::string_view key,

View file

@ -270,6 +270,19 @@ TEST_F(ZSetFamilyTest, ZScan) {
} while (cursor != 0);
EXPECT_EQ(100 * 2, scan_len);
// Check scan with count and match params
scan_len = 0;
do {
auto resp = Run({"zscan", "key", absl::StrCat(cursor), "count", "5", "match", "*0"});
ASSERT_THAT(resp, ArgType(RespExpr::ARRAY));
ASSERT_THAT(resp.GetVec(), ElementsAre(ArgType(RespExpr::STRING), ArgType(RespExpr::ARRAY)));
string_view token = ToSV(resp.GetVec()[0].GetBuf());
ASSERT_TRUE(absl::SimpleAtoi(token, &cursor));
auto sub_arr = resp.GetVec()[1].GetVec();
scan_len += sub_arr.size();
} while (cursor != 0);
EXPECT_EQ(10 * 2, scan_len); // expected members a0,a10,a20..,a90
}
TEST_F(ZSetFamilyTest, ZUnionError) {