mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-15 17:51:06 +00:00
chore(server): Introduce StringSetWrapper (#3347)
chore: use StringSetWrapper Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
4b1574b5c8
commit
3ffd30f193
2 changed files with 86 additions and 133 deletions
|
@ -48,6 +48,7 @@ class StringSet : public DenseSet {
|
|||
class iterator : private IteratorBase {
|
||||
public:
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using value_type = sds;
|
||||
using pointer = sds*;
|
||||
using reference = sds&;
|
||||
|
|
|
@ -74,58 +74,74 @@ intset* IntsetAddSafe(string_view val, intset* is, bool* success, bool* added) {
|
|||
return is;
|
||||
}
|
||||
|
||||
pair<unsigned, bool> RemoveStrSet(uint32_t now_sec, facade::ArgRange vals, CompactObj* set) {
|
||||
unsigned removed = 0;
|
||||
bool isempty = false;
|
||||
DCHECK(IsDenseEncoding(*set));
|
||||
|
||||
if (true) {
|
||||
StringSet* ss = ((StringSet*)set->RObjPtr());
|
||||
ss->set_time(now_sec);
|
||||
|
||||
for (string_view member : vals) {
|
||||
removed += ss->Erase(member);
|
||||
}
|
||||
|
||||
isempty = ss->Empty();
|
||||
struct StringSetWrapper {
|
||||
StringSetWrapper(const CompactObj& obj, const DbContext& db_cntx)
|
||||
: StringSetWrapper(obj.RObjPtr(), db_cntx.time_now_ms) {
|
||||
DCHECK(IsDenseEncoding(obj));
|
||||
}
|
||||
|
||||
return make_pair(removed, isempty);
|
||||
}
|
||||
StringSetWrapper(const SetType& st, const DbContext& db_cntx)
|
||||
: StringSetWrapper(st.first, db_cntx.time_now_ms) {
|
||||
DCHECK_EQ(st.second, kEncodingStrMap2);
|
||||
}
|
||||
|
||||
unsigned AddStrSet(const DbContext& db_context, const NewEntries& vals, uint32_t ttl_sec,
|
||||
CompactObj* dest) {
|
||||
unsigned res = 0;
|
||||
DCHECK(IsDenseEncoding(*dest));
|
||||
static void Init(CompactObj* obj) {
|
||||
obj->InitRobj(OBJ_SET, kEncodingStrMap2, CompactObj::AllocateMR<StringSet>());
|
||||
}
|
||||
|
||||
if (true) {
|
||||
StringSet* ss = (StringSet*)dest->RObjPtr();
|
||||
uint32_t time_now = MemberTimeSeconds(db_context.time_now_ms);
|
||||
|
||||
ss->set_time(time_now);
|
||||
|
||||
for (auto member : EntriesRange(vals)) {
|
||||
unsigned Add(const NewEntries& entries, uint32_t ttl_sec) const {
|
||||
unsigned res = 0;
|
||||
for (string_view member : EntriesRange(entries))
|
||||
res += ss->Add(member, ttl_sec);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
pair<unsigned, bool> Remove(const facade::ArgRange& entries) const {
|
||||
unsigned removed = 0;
|
||||
for (string_view member : entries)
|
||||
removed += ss->Erase(member);
|
||||
return {removed, ss->Empty()};
|
||||
}
|
||||
|
||||
void InitStrSet(CompactObj* set) {
|
||||
set->InitRobj(OBJ_SET, kEncodingStrMap2, CompactObj::AllocateMR<StringSet>());
|
||||
}
|
||||
uint64_t Scan(uint64_t curs, const ScanOpts& scan_op, StringVec* res) const {
|
||||
uint32_t count = scan_op.limit;
|
||||
long maxiterations = count * 10;
|
||||
|
||||
do {
|
||||
auto scan_callback = [&](const sds ptr) {
|
||||
if (string_view str{ptr, sdslen(ptr)}; scan_op.Matches(str))
|
||||
res->emplace_back(str);
|
||||
};
|
||||
curs = ss->Scan(curs, scan_callback);
|
||||
} while (curs && maxiterations-- && res->size() < count);
|
||||
return curs;
|
||||
}
|
||||
|
||||
StringSet* operator->() const {
|
||||
return ss;
|
||||
}
|
||||
|
||||
auto Range() const {
|
||||
auto transform = [](const sds ptr) { return string_view{ptr, sdslen(ptr)}; };
|
||||
return base::it::Transform(transform, base::it::Range(ss->begin(), ss->end()));
|
||||
}
|
||||
|
||||
private:
|
||||
StringSetWrapper(void* robj_ptr, uint64_t now_ms) : ss(static_cast<StringSet*>(robj_ptr)) {
|
||||
ss->set_time(MemberTimeSeconds(now_ms));
|
||||
}
|
||||
|
||||
StringSet* const ss;
|
||||
};
|
||||
|
||||
// returns (removed, isempty)
|
||||
pair<unsigned, bool> RemoveSet(const DbContext& db_context, facade::ArgRange vals,
|
||||
CompactObj* set) {
|
||||
bool isempty = false;
|
||||
unsigned removed = 0;
|
||||
|
||||
if (set->Encoding() == kEncodingIntSet) {
|
||||
intset* is = (intset*)set->RObjPtr();
|
||||
long long llval;
|
||||
|
||||
unsigned removed = 0;
|
||||
for (string_view val : vals) {
|
||||
if (!string2ll(val.data(), val.size(), &llval)) {
|
||||
continue;
|
||||
|
@ -135,12 +151,12 @@ pair<unsigned, bool> RemoveSet(const DbContext& db_context, facade::ArgRange val
|
|||
is = intsetRemove(is, llval, &is_removed);
|
||||
removed += is_removed;
|
||||
}
|
||||
isempty = (intsetLen(is) == 0);
|
||||
set->SetRObjPtr(is);
|
||||
|
||||
return {removed, intsetLen(is) == 0};
|
||||
} else {
|
||||
return RemoveStrSet(MemberTimeSeconds(db_context.time_now_ms), vals, set);
|
||||
return StringSetWrapper{*set, db_context}.Remove(vals);
|
||||
}
|
||||
return make_pair(removed, isempty);
|
||||
}
|
||||
|
||||
void InitSet(const NewEntries& vals, CompactObj* set) {
|
||||
|
@ -158,44 +174,15 @@ void InitSet(const NewEntries& vals, CompactObj* set) {
|
|||
intset* is = intsetNew();
|
||||
set->InitRobj(OBJ_SET, kEncodingIntSet, is);
|
||||
} else {
|
||||
InitStrSet(set);
|
||||
StringSetWrapper::Init(set);
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t ScanStrSet(const DbContext& db_context, const CompactObj& co, uint64_t curs,
|
||||
const ScanOpts& scan_op, StringVec* res) {
|
||||
uint32_t count = scan_op.limit;
|
||||
long maxiterations = count * 10;
|
||||
DCHECK(IsDenseEncoding(co));
|
||||
|
||||
if (true) {
|
||||
StringSet* set = (StringSet*)co.RObjPtr();
|
||||
set->set_time(MemberTimeSeconds(db_context.time_now_ms));
|
||||
|
||||
do {
|
||||
auto scan_callback = [&](const sds ptr) {
|
||||
string_view str{ptr, sdslen(ptr)};
|
||||
if (scan_op.Matches(str)) {
|
||||
res->push_back(std::string(str));
|
||||
}
|
||||
};
|
||||
|
||||
curs = set->Scan(curs, scan_callback);
|
||||
|
||||
} while (curs && maxiterations-- && res->size() < count);
|
||||
}
|
||||
return curs;
|
||||
}
|
||||
|
||||
uint32_t SetTypeLen(const DbContext& db_context, const SetType& set) {
|
||||
if (set.second == kEncodingIntSet) {
|
||||
return intsetLen((const intset*)set.first);
|
||||
}
|
||||
|
||||
if (true) {
|
||||
StringSet* ss = (StringSet*)set.first;
|
||||
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
|
||||
return ss->UpperBoundSize();
|
||||
} else {
|
||||
return StringSetWrapper(set, db_context)->UpperBoundSize();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -207,11 +194,7 @@ bool IsInSet(const DbContext& db_context, const SetType& st, int64_t val) {
|
|||
char* next = absl::numbers_internal::FastIntToBuffer(val, buf);
|
||||
string_view str{buf, size_t(next - buf)};
|
||||
|
||||
if (true) {
|
||||
StringSet* ss = (StringSet*)st.first;
|
||||
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
|
||||
return ss->Contains(str);
|
||||
}
|
||||
return StringSetWrapper(st, db_context)->Contains(str);
|
||||
}
|
||||
|
||||
bool IsInSet(const DbContext& db_context, const SetType& st, string_view member) {
|
||||
|
@ -221,13 +204,8 @@ bool IsInSet(const DbContext& db_context, const SetType& st, string_view member)
|
|||
return false;
|
||||
|
||||
return intsetFind((intset*)st.first, llval);
|
||||
}
|
||||
|
||||
if (true) {
|
||||
StringSet* ss = (StringSet*)st.first;
|
||||
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
|
||||
|
||||
return ss->Contains(member);
|
||||
} else {
|
||||
return StringSetWrapper(st, db_context)->Contains(member);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -239,12 +217,8 @@ int32_t GetExpiry(const DbContext& db_context, const SetType& st, string_view me
|
|||
return -3;
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (true) {
|
||||
StringSet* ss = (StringSet*)st.first;
|
||||
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
|
||||
|
||||
} else {
|
||||
StringSetWrapper ss{st, db_context};
|
||||
auto it = ss->Find(member);
|
||||
if (it == ss->end())
|
||||
return -3;
|
||||
|
@ -264,31 +238,21 @@ void FindInSet(StringVec& memberships, const DbContext& db_context, const SetTyp
|
|||
// Removes arg from result.
|
||||
void DiffStrSet(const DbContext& db_context, const SetType& st,
|
||||
absl::flat_hash_set<string>* result) {
|
||||
if (true) {
|
||||
StringSet* ss = (StringSet*)st.first;
|
||||
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
|
||||
for (sds ptr : *ss) {
|
||||
result->erase(string_view{ptr, sdslen(ptr)});
|
||||
}
|
||||
}
|
||||
for (string_view entry : StringSetWrapper{st, db_context}.Range())
|
||||
result->erase(entry);
|
||||
}
|
||||
|
||||
void InterStrSet(const DbContext& db_context, const vector<SetType>& vec, StringVec* result) {
|
||||
if (true) {
|
||||
StringSet* ss = (StringSet*)vec.front().first;
|
||||
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
|
||||
for (const sds ptr : *ss) {
|
||||
std::string_view str{ptr, sdslen(ptr)};
|
||||
size_t j = 1;
|
||||
for (j = 1; j < vec.size(); ++j) {
|
||||
if (vec[j].first != ss && !IsInSet(db_context, vec[j], str)) {
|
||||
break;
|
||||
}
|
||||
for (string_view str : StringSetWrapper{vec.front(), db_context}.Range()) {
|
||||
size_t j = 1;
|
||||
for (j = 1; j < vec.size(); ++j) {
|
||||
if (vec[j].first != vec.front().first && !IsInSet(db_context, vec[j], str)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (j == vec.size()) {
|
||||
result->push_back(std::string(str));
|
||||
}
|
||||
if (j == vec.size()) {
|
||||
result->emplace_back(str);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -305,22 +269,14 @@ StringVec RandMemberStrSet(const DbContext& db_context, const CompactObj& co,
|
|||
StringVec result;
|
||||
result.reserve(picks_count);
|
||||
|
||||
StringSet* ss = static_cast<StringSet*>(co.RObjPtr());
|
||||
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
|
||||
|
||||
std::uint32_t ss_entry_index = 0;
|
||||
container_utils::IterateSet(
|
||||
co, [&result, ×_index_is_picked, &ss_entry_index](container_utils::ContainerEntry ce) {
|
||||
auto it = times_index_is_picked.find(ss_entry_index++);
|
||||
if (it != times_index_is_picked.end()) {
|
||||
std::uint32_t t = it->second;
|
||||
while (t--) {
|
||||
result.emplace_back(ce.ToString());
|
||||
}
|
||||
}
|
||||
return true;
|
||||
});
|
||||
|
||||
for (string_view str : StringSetWrapper{co, db_context}.Range()) {
|
||||
auto it = times_index_is_picked.find(ss_entry_index++);
|
||||
if (it != times_index_is_picked.end()) {
|
||||
while (it->second--)
|
||||
result.emplace_back(str);
|
||||
}
|
||||
}
|
||||
/* Equal elements in the result are always successive. So, it is necessary to shuffle them */
|
||||
absl::BitGen gen;
|
||||
std::shuffle(result.begin(), result.end(), gen);
|
||||
|
@ -504,11 +460,10 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE
|
|||
InitSet(vals, &co);
|
||||
}
|
||||
|
||||
void* inner_obj = co.RObjPtr();
|
||||
uint32_t res = 0;
|
||||
|
||||
if (co.Encoding() == kEncodingIntSet) {
|
||||
intset* is = (intset*)inner_obj;
|
||||
intset* is = (intset*)co.RObjPtr();
|
||||
bool success = true;
|
||||
|
||||
for (auto val : vals_it) {
|
||||
|
@ -526,7 +481,6 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE
|
|||
|
||||
// frees 'is' on a way.
|
||||
co.InitRobj(OBJ_SET, kEncodingStrMap2, ss);
|
||||
inner_obj = co.RObjPtr();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -536,7 +490,7 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE
|
|||
}
|
||||
|
||||
if (co.Encoding() != kEncodingIntSet) {
|
||||
res = AddStrSet(op_args.db_cntx, vals, UINT32_MAX, &co);
|
||||
res = StringSetWrapper{co, op_args.db_cntx}.Add(vals, UINT32_MAX);
|
||||
}
|
||||
|
||||
if (journal_update && op_args.shard->journal()) {
|
||||
|
@ -563,7 +517,7 @@ OpResult<uint32_t> OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_
|
|||
CompactObj& co = add_res.it->second;
|
||||
|
||||
if (add_res.is_new) {
|
||||
InitStrSet(&co);
|
||||
StringSetWrapper::Init(&co);
|
||||
} else {
|
||||
// for non-overwrite case it must be set.
|
||||
if (co.ObjType() != OBJ_SET)
|
||||
|
@ -582,9 +536,7 @@ OpResult<uint32_t> OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_
|
|||
CHECK(IsDenseEncoding(co));
|
||||
}
|
||||
|
||||
uint32_t res = AddStrSet(op_args.db_cntx, vals, ttl_sec, &co);
|
||||
|
||||
return res;
|
||||
return StringSetWrapper{co, op_args.db_cntx}.Add(vals, ttl_sec);
|
||||
}
|
||||
|
||||
OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, facade::ArgRange vals,
|
||||
|
@ -982,7 +934,7 @@ OpResult<StringVec> OpScan(const OpArgs& op_args, string_view key, uint64_t* cur
|
|||
}
|
||||
*cursor = 0;
|
||||
} else {
|
||||
*cursor = ScanStrSet(op_args.db_cntx, it->second, *cursor, scan_op, &res);
|
||||
*cursor = StringSetWrapper{it->second, op_args.db_cntx}.Scan(*cursor, scan_op, &res);
|
||||
}
|
||||
|
||||
return res;
|
||||
|
|
Loading…
Reference in a new issue