1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat(server): SORT command prototype (#311)

feat(server): Implement SORT

Fixes #287

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Co-authored-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2022-09-23 07:17:49 +03:00 committed by GitHub
parent 082ac36ac1
commit 0377481e28
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 514 additions and 140 deletions

View file

@ -6,6 +6,10 @@ String sizes are limited to 256MB.
Indices (say in GETRANGE and SETRANGE commands) should be signed 32 bit integers in range
[-2147483647, 2147483648].
### String handling.
SORT does not take any locale into account.
## Expiry ranges.
Expirations are limited to 4 years. For commands with millisecond precision like PEXPIRE or PSETEX,
expirations greater than 2^27ms are quietly rounded to the nearest second loosing precision of less than 0.001%.

View file

@ -262,6 +262,16 @@ size_t RobjWrapper::Size() const {
case OBJ_STRING:
DCHECK_EQ(OBJ_ENCODING_RAW, encoding_);
return sz_;
case OBJ_LIST:
return quicklistCount((quicklist*)inner_obj_);
case OBJ_ZSET: {
robj self{.type = type_,
.encoding = encoding_,
.lru = 0,
.refcount = OBJ_STATIC_REFCOUNT,
.ptr = inner_obj_};
return zsetLength(&self);
}
case OBJ_SET:
switch (encoding_) {
case kEncodingIntSet: {
@ -278,7 +288,7 @@ size_t RobjWrapper::Size() const {
}
default:
LOG(FATAL) << "Unexpected encoding " << encoding_;
}
};
default:;
}
return 0;

View file

@ -299,6 +299,10 @@ void RedisReplyBuilder::SendNullArray() {
SendRaw("*-1\r\n");
}
void RedisReplyBuilder::SendEmptyArray() {
StartArray(0);
}
void RedisReplyBuilder::SendStringArr(absl::Span<const std::string_view> arr) {
if (arr.empty()) {
SendRaw("*0\r\n");

View file

@ -128,7 +128,10 @@ class RedisReplyBuilder : public SinkReplyBuilder {
void SendError(OpStatus status);
virtual void SendSimpleStrArr(const std::string_view* arr, uint32_t count);
// Send *-1
virtual void SendNullArray();
// Send *0
virtual void SendEmptyArray();
virtual void SendStringArr(absl::Span<const std::string_view> arr);
virtual void SendStringArr(absl::Span<const std::string> arr);

View file

@ -12,7 +12,7 @@ add_library(dragonfly_lib channel_slice.cc command_registry.cc
list_family.cc main_service.cc rdb_load.cc rdb_save.cc replica.cc
snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc
set_family.cc stream_family.cc string_family.cc
zset_family.cc version.cc bitops_family.cc)
zset_family.cc version.cc bitops_family.cc container_utils.cc)
cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib
absl::random_random TRDP::jsoncons)

View file

@ -0,0 +1,161 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/container_utils.h"
#include "base/logging.h"
extern "C" {
#include "redis/intset.h"
#include "redis/listpack.h"
#include "redis/object.h"
#include "redis/redis_aux.h"
#include "redis/util.h"
#include "redis/zset.h"
}
namespace dfly::container_utils {
quicklistEntry QLEntry() {
quicklistEntry res{.quicklist = NULL,
.node = NULL,
.zi = NULL,
.value = NULL,
.longval = 0,
.sz = 0,
.offset = 0};
return res;
}
bool IterateList(const PrimeValue& pv, const IterateFunc& func, long start, long end) {
quicklist* ql = static_cast<quicklist*>(pv.RObjPtr());
long llen = quicklistCount(ql);
if (end < 0 || end >= llen)
end = llen - 1;
quicklistIter* qiter = quicklistGetIteratorAtIdx(ql, AL_START_HEAD, start);
quicklistEntry entry = QLEntry();
long lrange = end - start + 1;
bool success = true;
while (success && quicklistNext(qiter, &entry) && lrange-- > 0) {
if (entry.value) {
success = func(ContainerEntry{reinterpret_cast<char*>(entry.value), entry.sz});
} else {
success = func(ContainerEntry{nullptr, .longval=entry.longval});
}
}
quicklistReleaseIterator(qiter);
return success;
}
bool IterateSet(const PrimeValue& pv, const IterateFunc& func) {
bool success = true;
if (pv.Encoding() == kEncodingIntSet) {
intset* is = static_cast<intset*>(pv.RObjPtr());
int64_t ival;
int ii = 0;
while (success && intsetGet(is, ii++, &ival)) {
success = func(ContainerEntry{nullptr, .longval=ival});
}
} else {
if (pv.Encoding() == kEncodingStrMap2) {
for (sds ptr : *static_cast<StringSet*>(pv.RObjPtr())) {
if (!func(ContainerEntry{ptr, sdslen(ptr)})) {
success = false;
break;
}
}
} else {
dict* ds = static_cast<dict*>(pv.RObjPtr());
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while (success && (de = dictNext(di))) {
sds ptr = static_cast<sds>(de->key);
success = func(ContainerEntry{ptr, sdslen(ptr)});
}
dictReleaseIterator(di);
}
}
return success;
}
bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start, int32_t end,
bool reverse, bool use_score) {
unsigned long llen = zsetLength(zobj);
if (end < 0 || unsigned(end) >= llen)
end = llen - 1;
unsigned rangelen = unsigned(end - start) + 1;
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = static_cast<uint8_t*>(zobj->ptr);
uint8_t *eptr, *sptr;
uint8_t* vstr;
unsigned int vlen;
long long vlong;
double score = 0.0;
if (reverse) {
eptr = lpSeek(zl, -2 - long(2 * start));
} else {
eptr = lpSeek(zl, 2 * start);
}
DCHECK(eptr);
sptr = lpNext(zl, eptr);
bool success = true;
while (success && rangelen--) {
DCHECK(eptr != NULL && sptr != NULL);
vstr = lpGetValue(eptr, &vlen, &vlong);
// don't bother to extract the score if it's gonna be ignored.
if (use_score)
score = zzlGetScore(sptr);
if (vstr == NULL) {
success = func(ContainerEntry{nullptr, .longval=vlong}, score);
} else {
success = func(ContainerEntry{reinterpret_cast<const char*>(vstr), vlen}, score);
}
if (reverse) {
zzlPrev(zl, &eptr, &sptr);
} else {
zzlNext(zl, &eptr, &sptr);
};
}
return success;
} else {
CHECK_EQ(zobj->encoding, OBJ_ENCODING_SKIPLIST);
zset* zs = static_cast<zset*>(zobj->ptr);
zskiplist* zsl = zs->zsl;
zskiplistNode* ln;
/* Check if starting point is trivial, before doing log(N) lookup. */
if (reverse) {
ln = zsl->tail;
unsigned long llen = zsetLength(zobj);
if (start > 0)
ln = zslGetElementByRank(zsl, llen - start);
} else {
ln = zsl->header->level[0].forward;
if (start > 0)
ln = zslGetElementByRank(zsl, start + 1);
}
bool success = true;
while (success && rangelen--) {
DCHECK(ln != NULL);
success = func(ContainerEntry{ln->ele, sdslen(ln->ele)}, ln->score);
ln = reverse ? ln->backward : ln->level[0].forward;
}
return success;
}
return false;
}
} // namespace dfly::container_utils

View file

@ -0,0 +1,69 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include "core/compact_object.h"
#include "core/string_set.h"
#include "server/common.h"
#include "server/table.h"
extern "C" {
#include "redis/object.h"
#include "redis/quicklist.h"
}
#include <functional>
namespace dfly {
namespace container_utils {
// IsContainer returns true if the iterator points to a container type.
inline bool IsContainer(const PrimeValue& pv) {
unsigned type = pv.ObjType();
return (type == OBJ_LIST || type == OBJ_SET || type == OBJ_ZSET);
}
// Create empty quicklistEntry
quicklistEntry QLEntry();
// Stores either:
// - A single long long value (longval) when value = nullptr
// - A single char* (value) when value != nullptr
struct ContainerEntry {
const char* value;
union {
size_t length;
long long longval;
};
std::string ToString() {
if (value)
return {value, length};
else
return absl::StrCat(longval);
}
};
using IterateFunc = std::function<bool(ContainerEntry)>;
using IterateSortedFunc = std::function<bool(ContainerEntry, double)>;
// Iterate over all values and call func(val). Iteration stops as soon
// as func return false. Returns true if it successfully processed all elements
// without stopping.
bool IterateList(const PrimeValue& pv, const IterateFunc& func, long start = 0, long end = -1);
// Iterate over all values and call func(val). Iteration stops as soon
// as func return false. Returns true if it successfully processed all elements
// without stopping.
bool IterateSet(const PrimeValue& pv, const IterateFunc& func);
// Iterate over all values and call func(val). Iteration stops as soon
// as func return false. Returns true if it successfully processed all elements
// without stopping.
bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start = 0, int32_t end = -1,
bool reverse = false, bool use_score = false);
}; // namespace container_utils
} // namespace dfly

View file

@ -14,6 +14,7 @@ extern "C" {
#include "server/blocking_controller.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/transaction.h"
@ -208,8 +209,7 @@ bool ScanCb(const OpArgs& op_args, PrimeIterator it, const ScanOpts& opts, Strin
return true;
}
void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor,
StringVec* vec) {
void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, StringVec* vec) {
auto& db_slice = op_args.shard->db_slice();
DCHECK(db_slice.IsDbValid(op_args.db_ind));
@ -463,6 +463,170 @@ void GenericFamily::Stick(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendLong(match_cnt);
}
// Used to conditionally store double score
struct SortEntryScore {
double score;
};
// SortEntry stores all data required for sorting
template <bool ALPHA>
struct SortEntry
// Store score only if we need it
: public std::conditional_t<ALPHA, std::tuple<>, SortEntryScore> {
std::string key;
bool Parse(std::string&& item) {
if constexpr (!ALPHA) {
if (!absl::SimpleAtod(item, &this->score))
return false;
}
key = std::move(item);
return true;
}
bool Parse(int64_t item) {
if constexpr (!ALPHA) {
this->score = item;
}
key = absl::StrCat(item);
return true;
}
std::conditional_t<ALPHA, const std::string&, double> Cmp() const {
if constexpr (ALPHA) {
return key;
} else {
return this->score;
}
}
};
// std::variant of all possible vectors of SortEntries
using SortEntryList = std::variant<
// Used when sorting by double values
std::vector<SortEntry<false>>,
// Used when sorting by string values
std::vector<SortEntry<true>>>;
// Create SortEntryList based on runtime arguments
SortEntryList MakeSortEntryList(bool alpha) {
if (alpha)
return SortEntryList{std::vector<SortEntry<true>>{}};
else
return SortEntryList{std::vector<SortEntry<false>>{}};
}
// Iterate over container with generic function that accepts strings and ints
template <typename F> bool Iterate(const PrimeValue& pv, F&& func) {
auto cb = [&func](container_utils::ContainerEntry ce) {
if (ce.value)
return func(ce.ToString());
else
return func(ce.longval);
};
switch (pv.ObjType()) {
case OBJ_LIST:
return container_utils::IterateList(pv, cb);
case OBJ_SET:
return container_utils::IterateSet(pv, cb);
case OBJ_ZSET:
return container_utils::IterateSortedSet(
pv.AsRObj(), [&cb](container_utils::ContainerEntry ce, double) { return cb(ce); });
default:
return false;
}
}
// Create a SortEntryList from given key
OpResult<SortEntryList> OpFetchSortEntries(const OpArgs& op_args, std::string_view key,
bool alpha) {
using namespace container_utils;
auto [it, _] = op_args.shard->db_slice().FindExt(op_args.db_ind, key);
if (!IsValid(it) || !IsContainer(it->second)) {
return OpStatus::KEY_NOTFOUND;
}
auto result = MakeSortEntryList(alpha);
bool success = std::visit(
[&pv = it->second](auto& entries) {
entries.reserve(pv.Size());
return Iterate(pv, [&entries](auto&& val) {
return entries.emplace_back().Parse(std::forward<decltype(val)>(val));
});
},
result);
return success ? OpResult{std::move(result)} : OpStatus::WRONG_TYPE;
}
void GenericFamily::Sort(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1);
bool alpha = false;
bool reversed = false;
std::optional<std::pair<size_t, size_t>> bounds;
for (size_t i = 2; i < args.size(); i++) {
ToUpper(&args[i]);
std::string_view arg = ArgS(args, i);
if (arg == "ALPHA") {
alpha = true;
} else if (arg == "DESC") {
reversed = true;
} else if (arg == "LIMIT") {
int offset, limit;
if (i + 2 >= args.size()) {
return (*cntx)->SendError(kSyntaxErr);
}
if (!absl::SimpleAtoi(ArgS(args, i + 1), &offset) ||
!absl::SimpleAtoi(ArgS(args, i + 2), &limit)) {
return (*cntx)->SendError(kInvalidIntErr);
}
bounds = {offset, limit};
i += 2;
}
}
OpResult<SortEntryList> entries =
cntx->transaction->ScheduleSingleHopT([&](Transaction* t, EngineShard* shard) {
return OpFetchSortEntries(t->GetOpArgs(shard), key, alpha);
});
if (entries.status() == OpStatus::WRONG_TYPE)
return (*cntx)->SendError("One or more scores can't be converted into double");
if (!entries.ok())
return (*cntx)->SendEmptyArray();
auto sort_call = [cntx, bounds, reversed, key](auto& entries) {
if (bounds) {
auto sort_it = entries.begin() + std::min(bounds->first + bounds->second, entries.size());
std::partial_sort(entries.begin(), sort_it, entries.end(),
[reversed](const auto& lhs, const auto& rhs) {
return bool(lhs.Cmp() < rhs.Cmp()) ^ reversed;
});
} else {
std::sort(entries.begin(), entries.end(), [reversed](const auto& lhs, const auto& rhs) {
return bool(lhs.Cmp() < rhs.Cmp()) ^ reversed;
});
}
auto start_it = entries.begin();
auto end_it = entries.end();
if (bounds) {
start_it += std::min(bounds->first, entries.size());
end_it = entries.begin() + std::min(bounds->first + bounds->second, entries.size());
}
(*cntx)->StartArray(std::distance(start_it, end_it));
for (auto it = start_it; it != end_it; ++it) {
(*cntx)->SendBulkString(it->key);
}
};
std::visit(std::move(sort_call), entries.value());
}
void GenericFamily::Move(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
int64_t target_db;
@ -868,6 +1032,7 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, 1}.HFUNC(Type)
<< CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del)
<< CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick)
<< CI{"SORT", CO::READONLY, -2, 1, 1, 1}.HFUNC(Sort)
<< CI{"MOVE", CO::WRITE | CO::GLOBAL_TRANS, 3, 1, 1, 1}.HFUNC(Move);
}

View file

@ -49,6 +49,7 @@ class GenericFamily {
static void Keys(CmdArgList args, ConnectionContext* cntx);
static void PexpireAt(CmdArgList args, ConnectionContext* cntx);
static void Stick(CmdArgList args, ConnectionContext* cntx);
static void Sort(CmdArgList args, ConnectionContext* cntx);
static void Move(CmdArgList args, ConnectionContext* cntx);
static void Rename(CmdArgList args, ConnectionContext* cntx);

View file

@ -160,7 +160,7 @@ TEST_F(GenericFamilyTest, RenameNx) {
Run({"mset", "x", x_val, "b", b_val});
ASSERT_THAT(Run({"renamenx", "z", "b"}), ErrArg("no such key"));
ASSERT_THAT(Run({"renamenx", "x", "b"}), IntArg(0)); // b already exists
ASSERT_THAT(Run({"renamenx", "x", "b"}), IntArg(0)); // b already exists
ASSERT_THAT(Run({"renamenx", "x", "y"}), IntArg(1));
ASSERT_EQ(Run({"get", "y"}), x_val);
}
@ -169,7 +169,7 @@ TEST_F(GenericFamilyTest, Stick) {
// check stick returns zero on non-existent keys
ASSERT_THAT(Run({"stick", "a", "b"}), IntArg(0));
for (auto key: {"a", "b", "c", "d"}) {
for (auto key : {"a", "b", "c", "d"}) {
Run({"set", key, "."});
}
@ -276,4 +276,59 @@ TEST_F(GenericFamilyTest, Scan) {
EXPECT_THAT(vec, Each(StartsWith("zset")));
}
TEST_F(GenericFamilyTest, Sort) {
// Test list sort with params
Run({"del", "list-1"});
Run({"lpush", "list-1", "3.5", "1.2", "10.1", "2.20", "200"});
// numeric
ASSERT_THAT(Run({"sort", "list-1"}).GetVec(), ElementsAre("1.2", "2.20", "3.5", "10.1", "200"));
// string
ASSERT_THAT(Run({"sort", "list-1", "ALPHA"}).GetVec(), ElementsAre("1.2", "10.1", "2.20", "200", "3.5"));
// desc numeric
ASSERT_THAT(Run({"sort", "list-1", "DESC"}).GetVec(), ElementsAre("200", "10.1", "3.5", "2.20", "1.2"));
// desc strig
ASSERT_THAT(Run({"sort", "list-1", "DESC", "ALPHA"}).GetVec(), ElementsAre("3.5", "200", "2.20", "10.1", "1.2"));
// limits
ASSERT_THAT(Run({"sort", "list-1", "LIMIT", "0", "5"}).GetVec(), ElementsAre("1.2", "2.20", "3.5", "10.1", "200"));
ASSERT_THAT(Run({"sort", "list-1", "LIMIT", "0", "10"}).GetVec(), ElementsAre("1.2", "2.20", "3.5", "10.1", "200"));
ASSERT_THAT(Run({"sort", "list-1", "LIMIT", "2", "2"}).GetVec(), ElementsAre("3.5", "10.1"));
ASSERT_THAT(Run({"sort", "list-1", "LIMIT", "1", "1"}), "2.20");
ASSERT_THAT(Run({"sort", "list-1", "LIMIT", "4", "2"}), "200");
ASSERT_THAT(Run({"sort", "list-1", "LIMIT", "5", "2"}), ArrLen(0));
// limits desc
ASSERT_THAT(Run({"sort", "list-1", "DESC", "LIMIT", "0", "5"}).GetVec(), ElementsAre("200", "10.1", "3.5", "2.20", "1.2"));
ASSERT_THAT(Run({"sort", "list-1", "DESC", "LIMIT", "2", "2"}).GetVec(), ElementsAre("3.5", "2.20"));
ASSERT_THAT(Run({"sort", "list-1", "DESC", "LIMIT", "1", "1"}), "10.1");
ASSERT_THAT(Run({"sort", "list-1", "DESC", "LIMIT", "5", "2"}), ArrLen(0));
// Test set sort
Run({"del", "set-1"});
Run({"sadd", "set-1", "5.3", "4.4", "60", "99.9", "100", "9"});
ASSERT_THAT(Run({"sort", "set-1"}).GetVec(), ElementsAre("4.4", "5.3", "9", "60", "99.9", "100"));
ASSERT_THAT(Run({"sort", "set-1", "ALPHA"}).GetVec(), ElementsAre("100", "4.4", "5.3", "60", "9", "99.9"));
ASSERT_THAT(Run({"sort", "set-1", "DESC"}).GetVec(), ElementsAre("100", "99.9", "60", "9", "5.3", "4.4"));
ASSERT_THAT(Run({"sort", "set-1", "DESC", "ALPHA"}).GetVec(), ElementsAre("99.9", "9", "60", "5.3", "4.4", "100"));
// Test intset sort
Run({"del", "intset-1"});
Run({"sadd", "intset-1", "5", "4", "3", "2", "1"});
ASSERT_THAT(Run({"sort", "intset-1"}).GetVec(), ElementsAre("1", "2", "3", "4", "5"));
// Test sorted set sort
Run({"del", "zset-1"});
Run({"zadd", "zset-1", "0", "3.3", "0", "30.1", "0", "8.2"});
ASSERT_THAT(Run({"sort", "zset-1"}).GetVec(), ElementsAre("3.3", "8.2", "30.1"));
ASSERT_THAT(Run({"sort", "zset-1", "ALPHA"}).GetVec(), ElementsAre("3.3", "30.1", "8.2"));
ASSERT_THAT(Run({"sort", "zset-1", "DESC"}).GetVec(), ElementsAre("30.1", "8.2", "3.3"));
ASSERT_THAT(Run({"sort", "zset-1", "DESC", "ALPHA"}).GetVec(), ElementsAre("8.2", "30.1", "3.3"));
// Test sort with non existent key
Run({"del", "list-2"});
ASSERT_THAT(Run({"sort", "list-2"}), ArrLen(0));
// Test not convertible to double
Run({"lpush", "list-2", "NOTADOUBLE"});
ASSERT_THAT(Run({"sort", "list-2"}), ErrArg("One or more scores can't be converted into double"));
}
} // namespace dfly

View file

@ -19,6 +19,7 @@ extern "C" {
#include "server/error.h"
#include "server/server_state.h"
#include "server/transaction.h"
#include "server/container_utils.h"
/**
* The number of entries allowed per internal list node can be specified
@ -65,17 +66,6 @@ using absl::GetFlag;
namespace {
quicklistEntry QLEntry() {
quicklistEntry res{.quicklist = NULL,
.node = NULL,
.zi = NULL,
.value = NULL,
.longval = 0,
.sz = 0,
.offset = 0};
return res;
}
quicklist* GetQL(const PrimeValue& mv) {
return (quicklist*)mv.RObjPtr();
}
@ -358,7 +348,7 @@ OpResult<string> RPeek(const OpArgs& op_args, string_view key, bool fetch) {
return OpStatus::OK;
quicklist* ql = GetQL(it_res.value()->second);
quicklistEntry entry = QLEntry();
quicklistEntry entry = container_utils::QLEntry();
quicklistIter* iter = quicklistGetIterator(ql, AL_START_TAIL);
CHECK(quicklistNext(iter, &entry));
quicklistReleaseIterator(iter);
@ -845,7 +835,7 @@ OpResult<string> ListFamily::OpIndex(const OpArgs& op_args, std::string_view key
if (!res)
return res.status();
quicklist* ql = GetQL(res.value()->second);
quicklistEntry entry = QLEntry();
quicklistEntry entry = container_utils::QLEntry();
quicklistIter* iter = quicklistGetIteratorAtIdx(ql, AL_START_TAIL, index);
if (!iter)
return OpStatus::KEY_NOTFOUND;
@ -871,7 +861,7 @@ OpResult<int> ListFamily::OpInsert(const OpArgs& op_args, string_view key, strin
return it_res.status();
quicklist* ql = GetQL(it_res.value()->second);
quicklistEntry entry = QLEntry();
quicklistEntry entry = container_utils::QLEntry();
quicklistIter* qiter = quicklistGetIterator(ql, AL_START_HEAD);
bool found = false;
@ -1030,24 +1020,12 @@ OpResult<StringVec> ListFamily::OpRange(const OpArgs& op_args, std::string_view
return StringVec{};
}
if (end >= llen)
end = llen - 1;
unsigned lrange = end - start + 1;
quicklistIter* qiter = quicklistGetIteratorAtIdx(ql, AL_START_HEAD, start);
quicklistEntry entry = QLEntry();
StringVec str_vec;
unsigned cnt = 0;
while (cnt < lrange && quicklistNext(qiter, &entry)) {
if (entry.value)
str_vec.emplace_back(reinterpret_cast<char*>(entry.value), entry.sz);
else
str_vec.push_back(absl::StrCat(entry.longval));
++cnt;
}
quicklistReleaseIterator(qiter);
container_utils::IterateList(res.value()->second, [&str_vec](container_utils::ContainerEntry ce) {
str_vec.emplace_back(ce.ToString());
return true;
}, start, end);
return str_vec;
}

View file

@ -1439,7 +1439,7 @@ void ServerFamily::Latency(CmdArgList args, ConnectionContext* cntx) {
string_view sub_cmd = ArgS(args, 1);
if (sub_cmd == "LATEST") {
return (*cntx)->StartArray(0);
return (*cntx)->SendEmptyArray();
}
LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported";

View file

@ -21,6 +21,8 @@ extern "C" {
#include "server/error.h"
#include "server/transaction.h"
#include "server/container_utils.h"
ABSL_DECLARE_FLAG(bool, use_set2);
namespace dfly {
@ -146,27 +148,6 @@ void InitStrSet(CompactObj* set) {
}
}
// f receives a str object.
template <typename F> void FillFromStrSet(F&& f, void* ptr) {
string str;
if (GetFlag(FLAGS_use_set2)) {
for (sds ptr : *(StringSet*)ptr) {
str.assign(ptr, sdslen(ptr));
f(move(str));
}
} else {
dict* ds = (dict*)ptr;
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while ((de = dictNext(di))) {
str.assign((sds)de->key, sdslen((sds)de->key));
f(move(str));
}
dictReleaseIterator(di);
}
}
// returns (removed, isempty)
pair<unsigned, bool> RemoveSet(ArgSlice vals, CompactObj* set) {
bool isempty = false;
@ -493,22 +474,6 @@ OpStatus NoOpCb(Transaction* t, EngineShard* shard) {
return OpStatus::OK;
};
template <typename F> void FillSet(const SetType& set, F&& f) {
if (set.second == kEncodingIntSet) {
intset* is = (intset*)set.first;
int64_t ival;
int ii = 0;
char buf[32];
while (intsetGet(is, ii++, &ival)) {
char* next = absl::numbers_internal::FastIntToBuffer(ival, buf);
f(string{buf, size_t(next - buf)});
}
} else {
FillFromStrSet(move(f), set.first);
}
}
// if overwrite is true then OpAdd writes vals into the key and discards its previous value.
OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice vals,
bool overwrite) {
@ -713,8 +678,10 @@ OpResult<StringVec> OpUnion(const OpArgs& op_args, ArgSlice keys) {
for (std::string_view key : keys) {
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_SET);
if (find_res) {
SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()};
FillSet(st, [&uniques](string s) { uniques.emplace(move(s)); });
container_utils::IterateSet(find_res.value()->second, [&uniques](container_utils::ContainerEntry ce){
uniques.emplace(ce.ToString());
return true;
});
continue;
}
@ -738,9 +705,10 @@ OpResult<StringVec> OpDiff(const OpArgs& op_args, ArgSlice keys) {
}
absl::flat_hash_set<string> uniques;
SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()};
FillSet(st, [&uniques](string s) { uniques.insert(move(s)); });
container_utils::IterateSet(find_res.value()->second, [&uniques](container_utils::ContainerEntry ce) {
uniques.emplace(ce.ToString());
return true;
});
DCHECK(!uniques.empty()); // otherwise the key would not exist.
@ -786,9 +754,10 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
if (!find_res)
return find_res.status();
SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()};
FillSet(st, [&result](string s) { result.push_back(move(s)); });
container_utils::IterateSet(find_res.value()->second, [&result](container_utils::ContainerEntry ce) {
result.push_back(ce.ToString());
return true;
});
return result;
}
@ -1261,16 +1230,20 @@ OpResult<StringVec> SetFamily::OpPop(const OpArgs& op_args, std::string_view key
PrimeIterator it = find_res.value();
size_t slen = it->second.Size();
SetType st{it->second.RObjPtr(), it->second.Encoding()};
/* CASE 1:
* The number of requested elements is greater than or equal to
* the number of elements inside the set: simply return the whole set. */
if (count >= slen) {
FillSet(st, [&result](string s) { result.push_back(move(s)); });
container_utils::IterateSet(it->second, [&result](container_utils::ContainerEntry ce) {
result.push_back(ce.ToString());
return true;
});
/* Delete the set as it is now empty */
CHECK(db_slice.Del(op_args.db_ind, it));
} else {
SetType st{it->second.RObjPtr(), it->second.Encoding()};
db_slice.PreUpdate(op_args.db_ind, it);
if (st.second == kEncodingIntSet) {
intset* is = (intset*)st.first;

View file

@ -847,7 +847,7 @@ void StreamFamily::XRangeGeneric(CmdArgList args, bool is_rev, ConnectionContext
}
if (result.status() == OpStatus::KEY_NOTFOUND) {
return (*cntx)->StartArray(0);
return (*cntx)->SendEmptyArray();
}
return (*cntx)->SendError(result.status());
}

View file

@ -18,6 +18,7 @@ extern "C" {
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
#include "server/transaction.h"
#include "server/container_utils.h"
namespace dfly {
@ -244,60 +245,10 @@ void IntervalVisitor::operator()(const ZSetFamily::LexInterval& li) {
}
void IntervalVisitor::ActionRange(unsigned start, unsigned end) {
unsigned rangelen = (end - start) + 1;
if (zobj_->encoding == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = (uint8_t*)zobj_->ptr;
uint8_t *eptr, *sptr;
uint8_t* vstr;
unsigned int vlen;
long long vlong;
double score = 0.0;
if (params_.reverse)
eptr = lpSeek(zl, -2 - long(2 * start));
else
eptr = lpSeek(zl, 2 * start);
DCHECK(eptr);
sptr = lpNext(zl, eptr);
while (rangelen--) {
DCHECK(eptr != NULL && sptr != NULL);
vstr = lpGetValue(eptr, &vlen, &vlong);
if (params_.with_scores) /* don't bother to extract the score if it's gonna be ignored. */
score = zzlGetScore(sptr);
AddResult(vstr, vlen, vlong, score);
Next(zl, &eptr, &sptr);
}
} else {
CHECK_EQ(zobj_->encoding, OBJ_ENCODING_SKIPLIST);
zset* zs = (zset*)zobj_->ptr;
zskiplist* zsl = zs->zsl;
zskiplistNode* ln;
/* Check if starting point is trivial, before doing log(N) lookup. */
if (params_.reverse) {
ln = zsl->tail;
unsigned long llen = zsetLength(zobj_);
if (start > 0)
ln = zslGetElementByRank(zsl, llen - start);
} else {
ln = zsl->header->level[0].forward;
if (start > 0)
ln = zslGetElementByRank(zsl, start + 1);
}
while (rangelen--) {
DCHECK(ln != NULL);
sds ele = ln->ele;
result_.emplace_back(string(ele, sdslen(ele)), ln->score);
ln = Next(ln);
}
}
container_utils::IterateSortedSet(zobj_, [this](container_utils::ContainerEntry ce, double score){
result_.emplace_back(ce.ToString(), score);
return true;
}, start, end, params_.reverse, params_.with_scores);
}
void IntervalVisitor::ActionRange(const zrangespec& range) {