1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat(core): Added DenseSet and StringSet types (#268)

* feat(core): Added DenseSet & StringSet types with docs

- Improved documentation by adding labels to chain types & pointer tagging table
- Added potential improvements to the DenseSet types in the docs
- Added excalidraw save file for future editing
- Removed ambiguous overloading types
- Renamed iterators to be more clear


* feat(core): Cleaned up DenseSet and Docs
* feat(core): Made DenseSet more ergonomic
* feat(server): Integration of DenseSet into Server

- Integrated DenseSet with CompactObj and the Set Family commands

Signed-off-by: Braydn <braydn.moore@uwaterloo.ca>
This commit is contained in:
Braydn 2022-09-14 01:41:54 -04:00 committed by GitHub
parent ed83b07fad
commit b8d791961e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 3749 additions and 149 deletions

1985
docs/dense_set.excalidraw Normal file

File diff suppressed because it is too large Load diff

57
docs/dense_set.md Normal file
View file

@ -0,0 +1,57 @@
# DenseSet in Dragonfly
`DenseSet` uses [classic hashtable with separate chaining](https://en.wikipedia.org/wiki/Hash_table#Separate_chaining) similar to the Redis dictionary for lookup of items within the set.
The main optimization present in `DenseSet` is the ability for a pointer to **point to either an object or a link key**, removing the need to allocate a set entry for every entry. This is accomplished by using [pointer tagging](https://en.wikipedia.org/wiki/Tagged_pointer) exploiting the fact that the top 12 bits of any userspace address are not used and can be set to indicate if the current pointer points to nothing, a link key, or an object.
The following is what each bit in a pointer is used for
| Bit Index (from LSB) | Meaning |
| -------------------- |-------- |
| 0 - 52 | Memory address of data in the userspace |
| 53 | Indicates if this `DensePtr` points to data stored in the `DenseSet` or the next link in a chain |
| 54 | Displacement bit. Indicates if the current entry is in the correct list defined by the data's hash |
| 55 | Direction displaced, this only has meaning if the Displacement bit is set. 0 indicates the entry is to the left of its correct list, 1 indicates it is to the right of the correct list. |
| 56 - 63 | Unused |
Further, to reduce collisions items may be inserted into neighbors of the home chain (the chain determined by the hash) that are empty to reduce the number of unused spaces. These entries are then marked as displaced using pointer tagging.
An example of possible bucket configurations can be seen below.
![Dense Set Visualization](./dense_set.svg) *Created using [excalidraw](https://excalidraw.com)*
### Insertion
To insert an entry a `DenseSet` will take the following steps:
1. Check if the entry already exists in the set, if so return false
2. If the entry does not exist look for an empty chain at the hash index ± 1, prioritizing the home chain. If an empty entry is found the item will be inserted and return true
3. If step 2 fails and the growth prerequisites are met, increase the number of buckets in the table and repeat step 2
4. If step 3 fails, attempt to insert the entry in the home chain.
- If the home chain is not occupied by a displaced entry insert the new entry in the front of the list
- If the home chain is occupied by a displaced entry move the displaced entry to its home chain. This may cause a domino effect if the home chain of the displaced entry is occupied by a second displaced entry, resulting in up to `O(N)` "fixes"
### Searching
To find an entry in a `DenseSet`:
1. Check the first entry in the home and neighbour cells for matching entries
2. If step 1 fails iterate the home chain of the searched entry and check for equality
### Pending Improvements
Some further improvements to `DenseSet` include allowing entries to be inserted in their home chain without having to perform the current `O(N)` steps to fix displaced entries. By inserting an entry in their home chain after the displaced entry instead of fixing up displaced entries, searching incurs minimal added overhead and there is no domino effect in inserting a new entry. To move a displaced entry to its home chain eventually multiple heuristics may be implemented including:
- When an entry is erased if the chain becomes empty and there is a displaced entry in the neighbor chains move it to the now empty home chain
- If a displaced entry is found as a result of a search and is the root of a chain with multiple entries, the displaced node should be moved to its home bucket
## Benchmarks
At 100% utilization the Redis dictionary implementation uses approximately 32 bytes per record ([read the breakdown for more information](./dashtable.md#redis-dictionary))
In comparison using the neighbour cell optimization, `DenseSet` has ~21% of spaces unused at full utilization resulting in $N\*8 + 0.2\*16N \approx 11.2N$ or ~12 bytes per record, yielding ~20 byte savings. The number of bytes per record saved grows as utilization decreases.
Inserting 20M 10 byte strings into a set in chunks of 500 on an i5-8250U give the following results
| | Dragonfly (DenseSet) | Dragonfly (Redis Dictionary) | Redis 7 |
|-------------|----------------------|------------------------------|---------|
| Time | 44.1s | 46.9s | 50.3s |
| Memory used | 626.44MiB | 1.27G | 1.27G |

16
docs/dense_set.svg Normal file

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 42 KiB

View file

@ -1,9 +1,8 @@
add_library(dfly_core compact_object.cc dragonfly_core.cc extent_tree.cc
external_alloc.cc interpreter.cc mi_memory_resource.cc
segment_allocator.cc small_string.cc tx_queue.cc)
add_library(dfly_core compact_object.cc dragonfly_core.cc extent_tree.cc
external_alloc.cc interpreter.cc mi_memory_resource.cc
segment_allocator.cc small_string.cc tx_queue.cc dense_set.cc string_set.cc)
cxx_link(dfly_core base absl::flat_hash_map absl::str_format redis_lib TRDP::lua lua_modules
Boost::fiber crypto)
Boost::fiber crypto)
add_executable(dash_bench dash_bench.cc)
cxx_link(dash_bench dfly_core)
@ -15,3 +14,4 @@ cxx_test(external_alloc_test dfly_core LABELS DFLY)
cxx_test(dash_test dfly_core LABELS DFLY)
cxx_test(interpreter_test dfly_core LABELS DFLY)
cxx_test(json_test dfly_core TRDP::jsoncons LABELS DFLY)
cxx_test(string_set_test dfly_core LABELS DFLY)

View file

@ -19,8 +19,10 @@ extern "C" {
#include <absl/strings/str_cat.h>
#include "base/flags.h"
#include "base/logging.h"
#include "base/pod_array.h"
#include "core/string_set.h"
#if defined(__aarch64__)
#include "base/sse2neon.h"
@ -28,8 +30,11 @@ extern "C" {
#include <emmintrin.h>
#endif
ABSL_FLAG(bool, use_set2, true, "If true use DenseSet for an optimized set data structure");
namespace dfly {
using namespace std;
using absl::GetFlag;
namespace {
@ -55,6 +60,11 @@ inline void FreeObjSet(unsigned encoding, void* ptr, pmr::memory_resource* mr) {
dictRelease((dict*)ptr);
break;
}
case kEncodingStrMap2: {
delete (StringSet*)ptr;
break;
}
case kEncodingIntSet:
zfree((void*)ptr);
break;
@ -67,6 +77,10 @@ size_t MallocUsedSet(unsigned encoding, void* ptr) {
switch (encoding) {
case kEncodingStrMap /*OBJ_ENCODING_HT*/:
return 0; // TODO
case kEncodingStrMap2: {
StringSet* ss = (StringSet*)ptr;
return ss->ObjMallocUsed() + ss->SetMallocUsed();
}
case kEncodingIntSet:
return intsetBlobLen((intset*)ptr);
}
@ -101,7 +115,7 @@ size_t MallocUsedZSet(unsigned encoding, void* ptr) {
size_t MallocUsedStream(unsigned encoding, void* streamv) {
// stream* str_obj = (stream*)streamv;
return 0; // TODO
return 0; // TODO
}
inline void FreeObjHash(unsigned encoding, void* ptr) {
@ -258,6 +272,10 @@ size_t RobjWrapper::Size() const {
dict* d = (dict*)inner_obj_;
return dictSize(d);
}
case kEncodingStrMap2: {
StringSet* ss = (StringSet*)inner_obj_;
return ss->Size();
}
default:
LOG(FATAL) << "Unexpected encoding " << encoding_;
}
@ -382,8 +400,8 @@ void RobjWrapper::MakeInnerRoom(size_t current_cap, size_t desired, pmr::memory_
}
#if defined(__GNUC__) && !defined(__clang__)
#pragma GCC push_options
#pragma GCC optimize("Ofast")
#pragma GCC push_options
#pragma GCC optimize("Ofast")
#endif
// len must be at least 16
@ -463,7 +481,7 @@ bool compare_packed(const uint8_t* packed, const char* ascii, size_t ascii_len)
}
#if defined(__GNUC__) && !defined(__clang__)
#pragma GCC pop_options
#pragma GCC pop_options
#endif
} // namespace detail
@ -608,7 +626,7 @@ void CompactObj::ImportRObj(robj* o) {
if (o->encoding == OBJ_ENCODING_INTSET) {
enc = kEncodingIntSet;
} else {
enc = kEncodingStrMap;
enc = GetFlag(FLAGS_use_set2) ? kEncodingStrMap2 : kEncodingStrMap;
}
}
u_.r_obj.Init(type, enc, o->ptr);

View file

@ -17,6 +17,7 @@ namespace dfly {
constexpr unsigned kEncodingIntSet = 0;
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap2 = 2;
namespace detail {

405
src/core/dense_set.cc Normal file
View file

@ -0,0 +1,405 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "core/dense_set.h"
#include <absl/numeric/bits.h>
#include <cstddef>
#include <cstdint>
#include <stack>
#include <type_traits>
#include <vector>
#include "glog/logging.h"
extern "C" {
#include "redis/zmalloc.h"
}
namespace dfly {
using namespace std;
constexpr size_t kMinSizeShift = 2;
constexpr size_t kMinSize = 1 << kMinSizeShift;
constexpr bool kAllowDisplacements = true;
DenseSet::DenseSet(pmr::memory_resource* mr) : entries_(mr) {
}
DenseSet::~DenseSet() {
ClearInternal();
}
size_t DenseSet::PushFront(DenseSet::ChainVectorIterator it, void* data) {
// if this is an empty list assign the value to the empty placeholder pointer
if (it->IsEmpty()) {
it->SetObject(data);
return ObjectAllocSize(data);
}
// otherwise make a new link and connect it to the front of the list
it->SetLink(NewLink(data, *it));
return ObjectAllocSize(data);
}
void DenseSet::PushFront(DenseSet::ChainVectorIterator it, DenseSet::DensePtr ptr) {
if (it->IsEmpty()) {
it->SetObject(ptr.GetObject());
if (ptr.IsLink()) {
FreeLink(ptr);
}
} else if (ptr.IsLink()) {
// if the pointer is already a link then no allocation needed
*ptr.Next() = *it;
*it = ptr;
} else {
// allocate a new link if needed and copy the pointer to the new link
it->SetLink(NewLink(ptr.GetObject(), *it));
}
}
auto DenseSet::PopPtrFront(DenseSet::ChainVectorIterator it) -> DensePtr {
if (it->IsEmpty()) {
return DensePtr{};
}
DensePtr front = *it;
// if this is an object, then it's also the only record in this chain.
// therefore, we should just reset DensePtr.
if (it->IsObject()) {
it->Reset();
} else {
DCHECK(it->IsLink());
// since a DenseLinkKey could be at the end of a chain and have a nullptr for next
// avoid dereferencing a nullptr and just reset the pointer to this DenseLinkKey
if (it->Next() == nullptr) {
it->Reset();
} else {
*it = *it->Next();
}
}
return front;
}
void* DenseSet::PopDataFront(DenseSet::ChainVectorIterator it) {
DensePtr front = PopPtrFront(it);
void* ret = front.GetObject();
if (front.IsLink()) {
FreeLink(front);
}
return ret;
}
// updates *node with the next item
auto DenseSet::Unlink(DenseSet::DensePtr* node) -> DensePtr {
DensePtr ret = *node;
if (node->IsObject()) {
node->Reset();
} else {
*node = *node->Next();
}
return ret;
}
// updates *node with the next item
void* DenseSet::UnlinkAndFree(DenseSet::DensePtr* node) {
DensePtr unlinked = Unlink(node);
void* ret = unlinked.GetObject();
if (unlinked.IsLink()) {
FreeLink(unlinked);
}
return ret;
}
void DenseSet::ClearInternal() {
for (auto it = entries_.begin(); it != entries_.end(); ++it) {
while (!it->IsEmpty()) {
PopDataFront(it);
}
}
entries_.clear();
}
auto DenseSet::FindEmptyAround(uint32_t bid) -> ChainVectorIterator {
if (entries_[bid].IsEmpty()) {
return entries_.begin() + bid;
}
if (!kAllowDisplacements) {
return entries_.end();
}
if (bid + 1 < entries_.size() && entries_[bid + 1].IsEmpty()) {
return entries_.begin() + bid + 1;
}
if (bid && entries_[bid - 1].IsEmpty()) {
return entries_.begin() + bid - 1;
}
return entries_.end();
}
void DenseSet::Reserve(size_t sz) {
sz = std::min<size_t>(sz, kMinSize);
sz = absl::bit_ceil(sz);
capacity_log_ = absl::bit_width(sz);
entries_.reserve(sz);
}
void DenseSet::Grow() {
size_t prev_size = entries_.size();
entries_.resize(prev_size * 2);
++capacity_log_;
// perform rehashing of items in the set
for (long i = prev_size - 1; i >= 0; --i) {
DensePtr* curr = &entries_[i];
// curr != nullptr checks if we have reached the end of a chain
// while !curr->IsEmpty() checks that the current chain is not empty
while (curr != nullptr && !curr->IsEmpty()) {
void* ptr = curr->GetObject();
uint32_t bid = BucketId(ptr);
// if the item does not move from the current chain, ensure
// it is not marked as displaced and move to the next item in the chain
if (bid == i) {
curr->ClearDisplaced();
curr = curr->Next();
} else {
// if the entry is in the wrong chain remove it and
// add it to the correct chain. This will also correct
// displaced entries
DensePtr node = Unlink(curr);
PushFront(entries_.begin() + bid, node);
entries_[bid].ClearDisplaced();
}
}
}
}
bool DenseSet::AddInternal(void* ptr) {
uint64_t hc = Hash(ptr);
if (entries_.empty()) {
capacity_log_ = kMinSizeShift;
entries_.resize(kMinSize);
uint32_t bucket_id = BucketId(hc);
auto e = entries_.begin() + bucket_id;
obj_malloc_used_ += PushFront(e, ptr);
++size_;
++num_used_buckets_;
return true;
}
// if the value is already in the set exit early
uint32_t bucket_id = BucketId(hc);
if (Find(ptr, bucket_id) != nullptr) {
return false;
}
DCHECK_LT(bucket_id, entries_.size());
// Try insert into flat surface first. Also handle the grow case
// if utilization is too high.
for (unsigned j = 0; j < 2; ++j) {
ChainVectorIterator list = FindEmptyAround(bucket_id);
if (list != entries_.end()) {
obj_malloc_used_ += PushFront(list, ptr);
if (std::distance(entries_.begin(), list) != bucket_id) {
list->SetDisplaced(std::distance(entries_.begin() + bucket_id, list));
}
++num_used_buckets_;
++size_;
return true;
}
if (size_ < entries_.size()) {
break;
}
Grow();
bucket_id = BucketId(hc);
}
DCHECK(!entries_[bucket_id].IsEmpty());
/**
* Since the current entry is not empty, it is either a valid chain
* or there is a displaced node here. In the latter case it is best to
* move the displaced node to its correct bucket. However there could be
* a displaced node there and so forth. Keep to avoid having to keep a stack
* of displacements we can keep track of the current displaced node, add it
* to the correct chain, and if the correct chain contains a displaced node
* unlink it and repeat the steps
*/
DensePtr to_insert(ptr);
while (!entries_[bucket_id].IsEmpty() && entries_[bucket_id].IsDisplaced()) {
DensePtr unlinked = PopPtrFront(entries_.begin() + bucket_id);
PushFront(entries_.begin() + bucket_id, to_insert);
to_insert = unlinked;
bucket_id -= unlinked.GetDisplacedDirection();
}
if (!entries_[bucket_id].IsEmpty()) {
++num_chain_entries_;
}
ChainVectorIterator list = entries_.begin() + bucket_id;
PushFront(list, to_insert);
obj_malloc_used_ += ObjectAllocSize(ptr);
DCHECK(!entries_[bucket_id].IsDisplaced());
++size_;
return true;
}
auto DenseSet::Find(const void* ptr, uint32_t bid) const -> const DensePtr* {
const DensePtr* curr = &entries_[bid];
if (!curr->IsEmpty() && Equal(*curr, ptr)) {
return curr;
}
// first look for displaced nodes since this is quicker than iterating a potential long chain
if (bid && Equal(entries_[bid - 1], ptr)) {
return &entries_[bid - 1];
}
if (bid + 1 < entries_.size() && Equal(entries_[bid + 1], ptr)) {
return &entries_[bid + 1];
}
// if the node is not displaced, search the correct chain
curr = curr->Next();
while (curr != nullptr) {
if (Equal(*curr, ptr)) {
return curr;
}
curr = curr->Next();
}
// not in the Set
return nullptr;
}
// Same idea as FindAround but provide the const guarantee
bool DenseSet::ContainsInternal(const void* ptr) const {
uint32_t bid = BucketId(ptr);
return Find(ptr, bid) != nullptr;
}
void* DenseSet::EraseInternal(void* ptr) {
uint32_t bid = BucketId(ptr);
auto found = Find(ptr, bid);
if (found == nullptr) {
return nullptr;
}
if (found->IsLink()) {
--num_chain_entries_;
} else {
DCHECK(found->IsObject());
--num_used_buckets_;
}
obj_malloc_used_ -= ObjectAllocSize(ptr);
void* ret = UnlinkAndFree(found);
--size_;
return ret;
}
void* DenseSet::PopInternal() {
std::pmr::vector<DenseSet::DensePtr>::iterator bucket_iter = entries_.begin();
// find the first non-empty chain
while (bucket_iter != entries_.end() && bucket_iter->IsEmpty()) {
++bucket_iter;
}
// empty set
if (bucket_iter == entries_.end()) {
return nullptr;
}
if (bucket_iter->IsLink()) {
--num_chain_entries_;
} else {
DCHECK(bucket_iter->IsObject());
--num_used_buckets_;
}
// unlink the first node in the first non-empty chain
obj_malloc_used_ -= ObjectAllocSize(bucket_iter->GetObject());
void* ret = PopDataFront(bucket_iter);
--size_;
return ret;
}
/**
* stable scanning api. has the same guarantees as redis scan command.
* we avoid doing bit-reverse by using a different function to derive a bucket id
* from hash values. By using msb part of hash we make it "stable" with respect to
* rehashes. For example, with table log size 4 (size 16), entries in bucket id
* 1110 come from hashes 1110XXXXX.... When a table grows to log size 5,
* these entries can move either to 11100 or 11101. So if we traversed with our cursor
* range [0000-1110], it's guaranteed that in grown table we do not need to cover again
* [00000-11100]. Similarly with shrinkage, if a table is shrinked to log size 3,
* keys from 1110 and 1111 will move to bucket 111. Again, it's guaranteed that we
* covered the range [000-111] (all keys in that case).
* Returns: next cursor or 0 if reached the end of scan.
* cursor = 0 - initiates a new scan.
*/
uint32_t DenseSet::Scan(uint32_t cursor, const ItemCb& cb) const {
// empty set
if (capacity_log_ == 0) {
return 0;
}
uint32_t entries_idx = cursor >> (32 - capacity_log_);
// skip empty entries
while (entries_idx < entries_.size() && entries_[entries_idx].IsEmpty()) {
++entries_idx;
}
if (entries_idx >= entries_.size()) {
return 0;
}
const DensePtr* curr = &entries_[entries_idx];
// when scanning add all entries in a given chain
while (curr != nullptr && !curr->IsEmpty()) {
cb(curr->GetObject());
curr = curr->Next();
}
// move to the next index for the next scan and check if we are done
++entries_idx;
if (entries_idx >= entries_.size()) {
return 0;
}
return entries_idx << (32 - capacity_log_);
}
} // namespace dfly

417
src/core/dense_set.h Normal file
View file

@ -0,0 +1,417 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iterator>
#include <memory_resource>
#include <type_traits>
namespace dfly {
// DenseSet is a nice but over-optimized data-structure. Probably is not worth it in the first
// place but sometimes the OCD kicks in and one can not resist.
// The advantage of it over redis-dict is smaller meta-data waste.
// dictEntry is 24 bytes, i.e it uses at least 32N bytes where N is the expected length.
// dict requires to allocate dictEntry per each addition in addition to the supplied key.
// It also wastes space in case of a set because it stores a value pointer inside dictEntry.
// To summarize:
// 100% utilized dict uses N*24 + N*8 = 32N bytes not including the key space.
// for 75% utilization (1/0.75 buckets): N*1.33*8 + N*24 = 35N
//
// This class uses 8 bytes per bucket (similarly to dictEntry*) but it used it for both
// links and keys. For most cases, we remove the need for another redirection layer
// and just store the key, so no "dictEntry" allocations occur.
// For those cells that require chaining, the bucket is
// changed in run-time to represent a linked chain.
// Additional feature - in order to to reduce collisions, we insert items into
// neighbour cells but only if they are empty (not chains). This way we reduce the number of
// empty (unused) spaces at full utilization from 36% to ~21%.
// 100% utilized table requires: N*8 + 0.2N*16 = 11.2N bytes or ~20 bytes savings.
// 75% utilization: N*1.33*8 + 0.12N*16 = 13N or ~22 bytes savings per record.
// with potential replacements of hset/zset data structures.
// static_assert(sizeof(dictEntry) == 24);
class DenseSet {
public:
explicit DenseSet(std::pmr::memory_resource* mr = std::pmr::get_default_resource());
virtual ~DenseSet();
protected:
// Virtual functions to be implemented for generic data
virtual uint64_t Hash(const void*) const = 0;
virtual bool Equal(const void*, const void*) const = 0;
virtual size_t ObjectAllocSize(const void*) const = 0;
bool AddInternal(void*);
bool ContainsInternal(const void*) const;
void* EraseInternal(void*);
void* PopInternal();
// Note this does not free any dynamic allocations done by derived classes, that a DensePtr
// in the set may point to. This function only frees the allocated DenseLinkKeys created by
// DenseSet. All data allocated by a derived class should be freed before calling this
void ClearInternal();
private:
DenseSet(const DenseSet&) = delete;
DenseSet& operator=(DenseSet&) = delete;
struct DenseLinkKey;
// we can assume that high 12 bits of user address space
// can be used for tagging. At most 52 bits of address are reserved for
// some configurations, and usually it's 48 bits.
// https://www.kernel.org/doc/html/latest/arm64/memory.html
static constexpr size_t kLinkBit = 1ULL << 52;
static constexpr size_t kDisplaceBit = 1ULL << 53;
static constexpr size_t kDisplaceDirectionBit = 1ULL << 54;
static constexpr size_t kTagMask = 4095ULL << 51; // we reserve 12 high bits.
struct DensePtr {
explicit DensePtr(void* p = nullptr) : ptr_(p) {
}
uint64_t uptr() const {
return uint64_t(ptr_);
}
bool IsObject() const {
return (uptr() & kLinkBit) == 0;
}
bool IsLink() const {
return (uptr() & kLinkBit) == kLinkBit;
}
bool IsEmpty() const {
return ptr_ == nullptr;
}
void* Raw() const {
return (void*)(uptr() & ~kTagMask);
}
bool IsDisplaced() const {
return (uptr() & kDisplaceBit) == kDisplaceBit;
}
void SetLink(DenseLinkKey* lk) {
ptr_ = (void*)(uintptr_t(lk) | kLinkBit);
}
void SetDisplaced(int direction) {
ptr_ = (void*)(uptr() | kDisplaceBit);
if (direction == 1) {
ptr_ = (void*)(uptr() | kDisplaceDirectionBit);
}
}
void ClearDisplaced() {
ptr_ = (void*)(uptr() & ~(kDisplaceBit | kDisplaceDirectionBit));
}
// returns 1 if the displaced node is right of the correct bucket and -1 if it is left
int GetDisplacedDirection() const {
return (uptr() & kDisplaceDirectionBit) == kDisplaceDirectionBit ? 1 : -1;
}
void Reset() {
ptr_ = nullptr;
}
void* GetObject() const {
if (IsObject()) {
return Raw();
}
return AsLink()->Raw();
}
// Sets pointer but preserves tagging info
void SetObject(void* obj) {
ptr_ = (void*)((uptr() & kTagMask) | (uintptr_t(obj) & ~kTagMask));
}
DenseLinkKey* AsLink() {
return (DenseLinkKey*)Raw();
}
const DenseLinkKey* AsLink() const {
return (const DenseLinkKey*)Raw();
}
DensePtr* Next() {
if (!IsLink()) {
return nullptr;
}
return &AsLink()->next;
}
const DensePtr* Next() const {
if (!IsLink()) {
return nullptr;
}
return &AsLink()->next;
}
private:
void* ptr_ = nullptr; //
};
struct DenseLinkKey : public DensePtr {
DensePtr next; // could be LinkKey* or Object *.
};
static_assert(sizeof(DensePtr) == sizeof(uintptr_t));
static_assert(sizeof(DenseLinkKey) == 2 * sizeof(uintptr_t));
using LinkAllocator = std::pmr::polymorphic_allocator<DenseLinkKey>;
using ChainVectorIterator = std::pmr::vector<DensePtr>::iterator;
using ChainVectorConstIterator = std::pmr::vector<DensePtr>::const_iterator;
bool Equal(const DensePtr dptr, const void* ptr) const {
if (dptr.IsEmpty()) {
return false;
}
return Equal(dptr.GetObject(), ptr);
}
std::pmr::memory_resource* mr() {
return entries_.get_allocator().resource();
}
uint32_t BucketId(uint64_t hash) const {
return hash >> (64 - capacity_log_);
}
uint32_t BucketId(const void* ptr) const {
return BucketId(Hash(ptr));
}
// return a ChainVectorIterator (a.k.a iterator) or end if there is an empty chain found
ChainVectorIterator FindEmptyAround(uint32_t bid);
void Grow();
// ============ Pseudo Linked List Functions for interacting with Chains ==================
size_t PushFront(ChainVectorIterator, void*);
void PushFront(ChainVectorIterator, DensePtr);
void* PopDataFront(ChainVectorIterator);
DensePtr PopPtrFront(ChainVectorIterator);
// Note this function will modify the iterator passed to it
// to point to the next node in the chain
DensePtr Unlink(DensePtr* node);
// Note this will only free the encapsulaing DenseLinkKey and not
// the data it points to, this will be returned.
// This function will modify the iterator passed to it
// to point to the next node in the chain
void* UnlinkAndFree(DensePtr* node);
// ============ Pseudo Linked List in DenseSet end ==================
const DensePtr* Find(const void* ptr, uint32_t bid) const;
const DensePtr* Find(const void* ptr) const {
return Find(ptr, BucketId(ptr));
}
DensePtr* Find(const void* ptr, uint32_t bid) {
const DensePtr* ret = const_cast<const DenseSet*>(this)->Find(ptr, bid);
return const_cast<DensePtr*>(ret);
}
DensePtr* Find(const void* ptr) {
const DensePtr* ret = const_cast<const DenseSet*>(this)->Find(ptr);
return const_cast<DensePtr*>(ret);
}
inline DenseLinkKey* NewLink(void* data, DensePtr next) {
LinkAllocator la(mr());
DenseLinkKey* lk = la.allocate(1);
la.construct(lk);
lk->next = next;
lk->SetObject(data);
return lk;
}
inline void FreeLink(DensePtr link) {
// deallocate the link if it is no longer a link as it is now in an empty list
mr()->deallocate(link.AsLink(), sizeof(DenseLinkKey), alignof(DenseLinkKey));
}
std::pmr::vector<DensePtr> entries_;
size_t obj_malloc_used_ = 0;
uint32_t size_ = 0;
uint32_t num_chain_entries_ = 0;
uint32_t num_used_buckets_ = 0;
unsigned capacity_log_ = 0;
public:
size_t Size() const {
return size_;
}
bool Empty() const {
return size_ == 0;
}
size_t BucketCount() const {
return entries_.size();
}
// those that are chained to the entries stored inline in the bucket array.
size_t NumChainEntries() const {
return num_chain_entries_;
}
size_t NumUsedBuckets() const {
return num_used_buckets_;
}
size_t ObjMallocUsed() const {
return obj_malloc_used_;
}
size_t SetMallocUsed() const {
return (num_chain_entries_ + entries_.capacity()) * sizeof(DensePtr);
}
template <typename T> class iterator {
static_assert(std::is_pointer_v<T>, "Iterators can only return pointers");
public:
using iterator_category = std::forward_iterator_tag;
using value_type = T;
using pointer = value_type*;
using reference = value_type&;
iterator(DenseSet* set, ChainVectorIterator begin_list) : set_(set), curr_list_(begin_list) {
if (begin_list == set->entries_.end()) {
curr_entry_ = nullptr;
} else {
curr_entry_ = &*begin_list;
// find the first non null entry
if (curr_entry_ == nullptr || curr_entry_->IsEmpty()) {
++(*this);
}
}
}
iterator& operator++() {
curr_entry_ = curr_entry_->Next();
while (curr_list_ != set_->entries_.end() &&
(curr_entry_ == nullptr || curr_entry_->IsEmpty())) {
++curr_list_;
curr_entry_ = &*curr_list_;
}
return *this;
}
friend bool operator==(const iterator& a, const iterator& b) {
return a.curr_list_ == b.curr_list_;
}
friend bool operator!=(const iterator& a, const iterator& b) {
return !(a == b);
}
value_type operator*() {
return (value_type)curr_entry_->GetObject();
}
value_type operator->() {
return (value_type)curr_entry_->GetObject();
}
private:
DenseSet* set_;
ChainVectorIterator curr_list_;
DensePtr* curr_entry_;
};
template <typename T> class const_iterator {
static_assert(std::is_pointer_v<T>, "Iterators can only return pointer types");
public:
using iterator_category = std::input_iterator_tag;
using value_type = const T;
using pointer = value_type*;
using reference = value_type&;
const_iterator(const DenseSet* set, ChainVectorConstIterator begin_list)
: set_(set), curr_list_(begin_list) {
if (begin_list == set->entries_.end()) {
curr_entry_ = nullptr;
} else {
curr_entry_ = &*begin_list;
// find the first non null entry
if (curr_entry_ == nullptr || curr_entry_->IsEmpty()) {
++(*this);
}
}
}
const_iterator& operator++() {
curr_entry_ = curr_entry_->Next();
curr_entry_ = curr_entry_->Next();
while (curr_list_ != set_->entries_.end() &&
(curr_entry_ == nullptr || curr_entry_->IsEmpty())) {
++curr_list_;
curr_entry_ = &*curr_list_;
}
return *this;
}
friend bool operator==(const const_iterator& a, const const_iterator& b) {
return a.curr_list_ == b.curr_list_;
}
friend bool operator!=(const const_iterator& a, const const_iterator& b) {
return !(a == b);
}
value_type operator*() const {
return (value_type)curr_entry_->GetObject();
}
value_type operator->() const {
return (value_type)curr_entry_->GetObject();
}
private:
const DenseSet* set_;
ChainVectorConstIterator curr_list_;
const DensePtr* curr_entry_;
};
template <typename T> iterator<T> begin() {
return iterator<T>(this, entries_.begin());
}
template <typename T> iterator<T> end() {
return iterator<T>(this, entries_.end());
}
template <typename T> const_iterator<T> cbegin() const {
return const_iterator<T>(this, entries_.cbegin());
}
template <typename T> const_iterator<T> cend() const {
return const_iterator<T>(this, entries_.cend());
}
using ItemCb = std::function<void(const void*)>;
uint32_t Scan(uint32_t cursor, const ItemCb& cb) const;
void Reserve(size_t sz);
};
} // namespace dfly

102
src/core/string_set.cc Normal file
View file

@ -0,0 +1,102 @@
#include "core/string_set.h"
#include "core/compact_object.h"
#include "redis/sds.h"
extern "C" {
#include "redis/zmalloc.h"
}
namespace dfly {
uint64_t StringSet::Hash(const void* ptr) const {
sds s = (sds)ptr;
return CompactObj::HashCode(std::string_view{s, sdslen(s)});
}
bool StringSet::Equal(const void* ptr1, const void* ptr2) const {
sds s1 = (sds)ptr1;
sds s2 = (sds)ptr2;
if (sdslen(s1) != sdslen(s2)) {
return false;
}
return sdslen(s1) == 0 || memcmp(s1, s2, sdslen(s1)) == 0;
}
size_t StringSet::ObjectAllocSize(const void* s1) const {
return zmalloc_usable_size(sdsAllocPtr((sds)s1));
}
bool StringSet::AddSds(sds s1) {
return AddInternal(s1);
}
bool StringSet::Add(std::string_view s1) {
sds newsds = sdsnewlen(s1.data(), s1.size());
if (!AddInternal(newsds)) {
sdsfree(newsds);
return false;
}
return true;
}
bool StringSet::EraseSds(sds s1) {
void* ret = EraseInternal(s1);
if (ret == nullptr) {
return false;
} else {
sdsfree((sds)ret);
return true;
}
}
bool StringSet::Erase(std::string_view s1) {
sds to_erase = sdsnewlen(s1.data(), s1.size());
bool ret = EraseSds(to_erase);
sdsfree(to_erase);
return ret;
}
bool StringSet::ContainsSds(sds s1) const {
return ContainsInternal(s1);
}
bool StringSet::Contains(std::string_view s1) const {
sds to_search = sdsnewlen(s1.data(), s1.size());
bool ret = ContainsInternal(to_search);
sdsfree(to_search);
return ret;
}
void StringSet::Clear() {
for (auto it = begin(); it != end(); ++it) {
sdsfree((sds)*it);
}
ClearInternal();
}
std::optional<std::string> StringSet::Pop() {
sds str = (sds)PopInternal();
if (str == nullptr) {
return std::nullopt;
}
std::string ret{str, sdslen(str)};
sdsfree(str);
return ret;
}
sds StringSet::PopRaw() {
return (sds)PopInternal();
}
uint32_t StringSet::Scan(uint32_t cursor, const std::function<void(const sds)>& func) const {
return DenseSet::Scan(cursor, [func](const void* ptr) { func((sds)ptr); });
}
}; // namespace dfly

66
src/core/string_set.h Normal file
View file

@ -0,0 +1,66 @@
#pragma once
#include <cstdint>
#include <functional>
#include <optional>
#include "core/dense_set.h"
extern "C" {
#include "redis/sds.h"
}
namespace dfly {
class StringSet : public DenseSet {
public:
uint64_t Hash(const void* ptr) const override;
bool Equal(const void* ptr1, const void* ptr2) const override;
size_t ObjectAllocSize(const void* s1) const override;
bool Add(std::string_view s1);
bool AddSds(sds s1);
bool Erase(std::string_view s1);
bool EraseSds(sds s1);
bool Contains(std::string_view s1) const;
bool ContainsSds(sds s1) const;
void Clear();
std::optional<std::string> Pop();
sds PopRaw();
~StringSet() {
Clear();
}
StringSet(std::pmr::memory_resource* res = std::pmr::get_default_resource()) : DenseSet(res) {
}
iterator<sds> begin() {
return DenseSet::begin<sds>();
}
iterator<sds> end() {
return DenseSet::end<sds>();
}
const_iterator<sds> cbegin() const {
return DenseSet::cbegin<sds>();
}
const_iterator<sds> cend() const {
return DenseSet::cend<sds>();
}
uint32_t Scan(uint32_t, const std::function<void(sds)>&) const;
};
} // end namespace dfly

382
src/core/string_set_test.cc Normal file
View file

@ -0,0 +1,382 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "core/string_set.h"
#include <gtest/gtest.h>
#include <mimalloc.h>
#include <algorithm>
#include <cstddef>
#include <memory_resource>
#include <random>
#include <string>
#include <string_view>
#include <unordered_set>
#include <vector>
#include "core/compact_object.h"
#include "core/mi_memory_resource.h"
#include "glog/logging.h"
#include "redis/sds.h"
extern "C" {
#include "redis/zmalloc.h"
}
namespace dfly {
using namespace std;
class DenseSetAllocator : public std::pmr::memory_resource {
public:
bool all_freed() const {
return alloced_ == 0;
}
void* do_allocate(std::size_t bytes, std::size_t alignment) override {
alloced_ += bytes;
void* p = std::pmr::new_delete_resource()->allocate(bytes, alignment);
return p;
}
void do_deallocate(void* p, std::size_t bytes, std::size_t alignment) override {
alloced_ -= bytes;
return std::pmr::new_delete_resource()->deallocate(p, bytes, alignment);
}
bool do_is_equal(const std::pmr::memory_resource& other) const noexcept override {
return std::pmr::new_delete_resource()->is_equal(other);
}
private:
size_t alloced_ = 0;
};
class StringSetTest : public ::testing::Test {
protected:
static void SetUpTestSuite() {
auto* tlh = mi_heap_get_backing();
init_zmalloc_threadlocal(tlh);
}
static void TearDownTestSuite() {
}
void SetUp() override {
orig_resource_ = std::pmr::get_default_resource();
std::pmr::set_default_resource(&alloc_);
ss_ = new StringSet();
}
void TearDown() override {
delete ss_;
// ensure there are no memory leaks after every test
EXPECT_TRUE(alloc_.all_freed());
EXPECT_EQ(zmalloc_used_memory_tl, 0);
std::pmr::set_default_resource(orig_resource_);
}
StringSet* ss_;
std::pmr::memory_resource* orig_resource_;
DenseSetAllocator alloc_;
};
TEST_F(StringSetTest, Basic) {
EXPECT_TRUE(ss_->Add(std::string_view{"foo"}));
EXPECT_TRUE(ss_->Add(std::string_view{"bar"}));
EXPECT_FALSE(ss_->Add(std::string_view{"foo"}));
EXPECT_FALSE(ss_->Add(std::string_view{"bar"}));
EXPECT_TRUE(ss_->Contains(std::string_view{"foo"}));
EXPECT_TRUE(ss_->Contains(std::string_view{"bar"}));
EXPECT_EQ(2, ss_->Size());
}
TEST_F(StringSetTest, StandardAddErase) {
EXPECT_TRUE(ss_->Add("@@@@@@@@@@@@@@@@"));
EXPECT_TRUE(ss_->Add("A@@@@@@@@@@@@@@@"));
EXPECT_TRUE(ss_->Add("AA@@@@@@@@@@@@@@"));
EXPECT_TRUE(ss_->Add("AAA@@@@@@@@@@@@@"));
EXPECT_TRUE(ss_->Add("AAAAAAAAA@@@@@@@"));
EXPECT_TRUE(ss_->Add("AAAAAAAAAA@@@@@@"));
EXPECT_TRUE(ss_->Add("AAAAAAAAAAAAAAA@"));
EXPECT_TRUE(ss_->Add("AAAAAAAAAAAAAAAA"));
EXPECT_TRUE(ss_->Add("AAAAAAAAAAAAAAAD"));
EXPECT_TRUE(ss_->Add("BBBBBAAAAAAAAAAA"));
EXPECT_TRUE(ss_->Add("BBBBBBBBAAAAAAAA"));
EXPECT_TRUE(ss_->Add("CCCCCBBBBBBBBBBB"));
// Remove link in the middle of chain
EXPECT_TRUE(ss_->Erase("BBBBBBBBAAAAAAAA"));
// Remove start of a chain
EXPECT_TRUE(ss_->Erase("CCCCCBBBBBBBBBBB"));
// Remove end of link
EXPECT_TRUE(ss_->Erase("AAA@@@@@@@@@@@@@"));
// Remove only item in chain
EXPECT_TRUE(ss_->Erase("AA@@@@@@@@@@@@@@"));
EXPECT_TRUE(ss_->Erase("AAAAAAAAA@@@@@@@"));
EXPECT_TRUE(ss_->Erase("AAAAAAAAAA@@@@@@"));
EXPECT_TRUE(ss_->Erase("AAAAAAAAAAAAAAA@"));
}
static std::string random_string(std::mt19937& rand, unsigned len) {
const std::string_view alpanum = "1234567890abcdefghijklmnopqrstuvwxyz";
std::string ret;
ret.reserve(len);
for (size_t i = 0; i < len; ++i) {
ret += alpanum[rand() % alpanum.size()];
}
return ret;
}
TEST_F(StringSetTest, Resizing) {
constexpr size_t num_strs = 4096;
// pseudo random deterministic sequence with known seed should produce
// the same sequence on all systems
std::mt19937 rand(0);
std::vector<std::string> strs;
while (strs.size() != num_strs) {
auto str = random_string(rand, 10);
if (std::find(strs.begin(), strs.end(), str) != strs.end()) {
continue;
}
strs.push_back(random_string(rand, 10));
}
for (size_t i = 0; i < num_strs; ++i) {
EXPECT_TRUE(ss_->Add(strs[i]));
EXPECT_EQ(ss_->Size(), i + 1);
// make sure we haven't lost any items after a grow
// which happens every power of 2
if (i != 0 && (ss_->Size() & (ss_->Size() - 1)) == 0) {
for (size_t j = 0; j < i; ++j) {
EXPECT_TRUE(ss_->Contains(strs[j]));
}
}
}
}
TEST_F(StringSetTest, SimpleScan) {
std::unordered_set<std::string_view> info = {"foo", "bar"};
std::unordered_set<std::string_view> seen;
for (auto str : info) {
EXPECT_TRUE(ss_->Add(str));
}
uint32_t cursor = 0;
do {
cursor = ss_->Scan(cursor, [&](const sds ptr) {
sds s = (sds)ptr;
std::string_view str{s, sdslen(s)};
EXPECT_TRUE(info.count(str));
seen.insert(str);
});
} while (cursor != 0);
EXPECT_TRUE(seen.size() == info.size() && std::equal(seen.begin(), seen.end(), info.begin()));
}
// Ensure REDIS scan guarantees are met
TEST_F(StringSetTest, ScanGuarantees) {
std::unordered_set<std::string_view> to_be_seen = {"foo", "bar"};
std::unordered_set<std::string_view> not_be_seen = {"AAA", "BBB"};
std::unordered_set<std::string_view> maybe_seen = {"AA@@@@@@@@@@@@@@", "AAA@@@@@@@@@@@@@",
"AAAAAAAAA@@@@@@@", "AAAAAAAAAA@@@@@@"};
std::unordered_set<std::string_view> seen;
auto scan_callback = [&](const sds ptr) {
sds s = (sds)ptr;
std::string_view str{s, sdslen(s)};
EXPECT_TRUE(to_be_seen.count(str) || maybe_seen.count(str));
EXPECT_FALSE(not_be_seen.count(str));
if (to_be_seen.count(str)) {
seen.insert(str);
}
};
EXPECT_EQ(ss_->Scan(0, scan_callback), 0);
for (auto str : not_be_seen) {
EXPECT_TRUE(ss_->Add(str));
}
for (auto str : not_be_seen) {
EXPECT_TRUE(ss_->Erase(str));
}
for (auto str : to_be_seen) {
EXPECT_TRUE(ss_->Add(str));
}
// should reach at least the first item in the set
uint32_t cursor = ss_->Scan(0, scan_callback);
for (auto str : maybe_seen) {
EXPECT_TRUE(ss_->Add(str));
}
while (cursor != 0) {
cursor = ss_->Scan(cursor, scan_callback);
}
EXPECT_TRUE(seen.size() == to_be_seen.size());
}
TEST_F(StringSetTest, IntOnly) {
constexpr size_t num_ints = 8192;
std::unordered_set<unsigned int> numbers;
for (size_t i = 0; i < num_ints; ++i) {
numbers.insert(i);
EXPECT_TRUE(ss_->Add(std::to_string(i)));
}
for (size_t i = 0; i < num_ints; ++i) {
EXPECT_FALSE(ss_->Add(std::to_string(i)));
}
std::mt19937 generator(0);
size_t num_remove = generator() % 4096;
std::unordered_set<std::string> removed;
for (size_t i = 0; i < num_remove; ++i) {
auto remove_int = generator() % num_ints;
auto remove = std::to_string(remove_int);
if (numbers.count(remove_int)) {
EXPECT_TRUE(ss_->Contains(remove));
EXPECT_TRUE(ss_->Erase(remove));
numbers.erase(remove_int);
} else {
EXPECT_FALSE(ss_->Erase(remove));
}
EXPECT_FALSE(ss_->Contains(remove));
removed.insert(remove);
}
size_t expected_seen = 0;
auto scan_callback = [&](const sds ptr) {
sds s = (sds)ptr;
std::string_view str{s, sdslen(s)};
EXPECT_FALSE(removed.count(std::string(str)));
if (numbers.count(std::atoi(str.data()))) {
++expected_seen;
}
};
uint32_t cursor = 0;
do {
cursor = ss_->Scan(cursor, scan_callback);
// randomly throw in some new numbers
ss_->Add(std::to_string(generator()));
} while (cursor != 0);
EXPECT_TRUE(expected_seen + removed.size() == num_ints);
}
TEST_F(StringSetTest, XtremeScanGrow) {
std::unordered_set<std::string> to_see, force_grow, seen;
std::mt19937 generator(0);
while (to_see.size() != 8) {
to_see.insert(random_string(generator, 10));
}
while (force_grow.size() != 8192) {
std::string str = random_string(generator, 10);
if (to_see.count(str)) {
continue;
}
force_grow.insert(random_string(generator, 10));
}
for (auto& str : to_see) {
EXPECT_TRUE(ss_->Add(str));
}
auto scan_callback = [&](const sds ptr) {
sds s = (sds)ptr;
std::string_view str{s, sdslen(s)};
if (to_see.count(std::string(str))) {
seen.insert(std::string(str));
}
};
uint32_t cursor = ss_->Scan(0, scan_callback);
// force approx 10 grows
for (auto& s : force_grow) {
EXPECT_TRUE(ss_->Add(s));
}
while (cursor != 0) {
cursor = ss_->Scan(cursor, scan_callback);
}
EXPECT_EQ(seen.size(), to_see.size());
}
TEST_F(StringSetTest, Pop) {
constexpr size_t num_items = 8;
std::unordered_set<std::string> to_insert;
std::mt19937 generator(0);
while (to_insert.size() != num_items) {
auto str = random_string(generator, 10);
if (to_insert.count(str)) {
continue;
}
to_insert.insert(str);
EXPECT_TRUE(ss_->Add(str));
}
while (!ss_->Empty()) {
size_t size = ss_->Size();
auto str = ss_->Pop();
DCHECK(ss_->Size() == to_insert.size() - 1);
DCHECK(str.has_value());
DCHECK(to_insert.count(str.value()));
DCHECK_EQ(ss_->Size(), size - 1);
to_insert.erase(str.value());
}
DCHECK(ss_->Empty());
DCHECK(to_insert.empty());
}
TEST_F(StringSetTest, Iteration) {
constexpr size_t num_items = 8192;
std::unordered_set<std::string> to_insert;
std::mt19937 generator(0);
while (to_insert.size() != num_items) {
auto str = random_string(generator, 10);
if (to_insert.count(str)) {
continue;
}
to_insert.insert(str);
EXPECT_TRUE(ss_->Add(str));
}
for (const sds ptr : *ss_) {
std::string str{ptr, sdslen(ptr)};
EXPECT_TRUE(to_insert.count(str));
to_insert.erase(str);
}
EXPECT_EQ(to_insert.size(), 0);
}
} // namespace dfly

View file

@ -30,6 +30,7 @@ using absl::StrCat;
ABSL_DECLARE_FLAG(int32, list_compress_depth);
ABSL_DECLARE_FLAG(int32, list_max_listpack_size);
ABSL_DECLARE_FLAG(bool, use_set2);
namespace dfly {
@ -37,6 +38,11 @@ class RdbTest : public BaseFamilyTest {
protected:
protected:
io::FileSource GetSource(string name);
// disable usage of DenseSet until the RDB patches are added
RdbTest() : BaseFamilyTest() {
SetFlag(&FLAGS_use_set2, false);
}
};
inline const uint8_t* to_byte(const void* s) {

View file

@ -11,17 +11,22 @@ extern "C" {
#include "redis/util.h"
}
#include "base/flags.h"
#include "base/logging.h"
#include "base/stl_util.h"
#include "core/string_set.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/transaction.h"
ABSL_DECLARE_FLAG(bool, use_set2);
namespace dfly {
using namespace std;
using absl::GetFlag;
using ResultStringVec = vector<OpResult<vector<string>>>;
using ResultSetView = OpResult<absl::flat_hash_set<std::string_view>>;
@ -52,24 +57,116 @@ intset* IntsetAddSafe(string_view val, intset* is, bool* success, bool* added) {
return is;
}
bool dictContains(const dict* d, string_view key) {
uint64_t h = dictGenHashFunction(key.data(), key.size());
for (unsigned table = 0; table <= 1; table++) {
uint64_t idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
dictEntry* he = d->ht_table[table][idx];
while (he) {
sds dkey = (sds)he->key;
if (sdslen(dkey) == key.size() &&
(key.empty() || memcmp(dkey, key.data(), key.size()) == 0)) {
return true;
}
he = he->next;
}
if (!dictIsRehashing(d)) {
break;
}
}
return false;
}
pair<unsigned, bool> RemoveStrSet(ArgSlice vals, CompactObj* set) {
unsigned removed = 0;
bool isempty = false;
auto* shard = EngineShard::tlocal();
if (GetFlag(FLAGS_use_set2)) {
DCHECK_EQ(set->Encoding(), kEncodingStrMap2);
StringSet* ss = ((StringSet*)set->RObjPtr());
for (auto member : vals) {
shard->tmp_str1 = sdscpylen(shard->tmp_str1, member.data(), member.size());
removed += ss->EraseSds(shard->tmp_str1);
}
isempty = ss->Empty();
} else {
DCHECK_EQ(set->Encoding(), kEncodingStrMap);
dict* d = (dict*)set->RObjPtr();
for (auto member : vals) {
shard->tmp_str1 = sdscpylen(shard->tmp_str1, member.data(), member.size());
int result = dictDelete(d, shard->tmp_str1);
removed += (result == DICT_OK);
}
isempty = dictSize(d) == 0;
}
return make_pair(removed, isempty);
}
unsigned AddStrSet(ArgSlice vals, CompactObj* dest) {
unsigned res = 0;
dict* ds = (dict*)dest->RObjPtr();
auto* es = EngineShard::tlocal();
if (GetFlag(FLAGS_use_set2)) {
DCHECK_EQ(dest->Encoding(), kEncodingStrMap2);
StringSet* ss = (StringSet*)dest->RObjPtr();
for (auto member : vals) {
// the temporary variable in the engine shard is not used here since the
// SDS pointer allocated is stored in the set and will outlive this function
// call
res += ss->Add(member);
}
} else {
DCHECK_EQ(dest->Encoding(), kEncodingStrMap);
dict* ds = (dict*)dest->RObjPtr();
auto* es = EngineShard::tlocal();
for (auto member : vals) {
es->tmp_str1 = sdscpylen(es->tmp_str1, member.data(), member.size());
dictEntry* de = dictAddRaw(ds, es->tmp_str1, NULL);
if (de) {
de->key = sdsdup(es->tmp_str1);
++res;
for (auto member : vals) {
es->tmp_str1 = sdscpylen(es->tmp_str1, member.data(), member.size());
dictEntry* de = dictAddRaw(ds, es->tmp_str1, NULL);
if (de) {
de->key = sdsdup(es->tmp_str1);
++res;
}
}
}
return res;
}
void InitStrSet(CompactObj* set) {
if (GetFlag(FLAGS_use_set2)) {
StringSet* ss = new StringSet{CompactObj::memory_resource()};
set->InitRobj(OBJ_SET, kEncodingStrMap2, ss);
} else {
dict* ds = dictCreate(&setDictType);
set->InitRobj(OBJ_SET, kEncodingStrMap, ds);
}
}
// f receives a str object.
template <typename F> void FillFromStrSet(F&& f, void* ptr) {
string str;
if (GetFlag(FLAGS_use_set2)) {
for (sds ptr : *(StringSet*)ptr) {
str.assign(ptr, sdslen(ptr));
f(move(str));
}
} else {
dict* ds = (dict*)ptr;
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while ((de = dictNext(di))) {
str.assign((sds)de->key, sdslen((sds)de->key));
f(move(str));
}
dictReleaseIterator(di);
}
}
// returns (removed, isempty)
pair<unsigned, bool> RemoveSet(ArgSlice vals, CompactObj* set) {
bool isempty = false;
@ -91,14 +188,7 @@ pair<unsigned, bool> RemoveSet(ArgSlice vals, CompactObj* set) {
isempty = (intsetLen(is) == 0);
set->SetRObjPtr(is);
} else {
dict* d = (dict*)set->RObjPtr();
auto* shard = EngineShard::tlocal();
for (auto member : vals) {
shard->tmp_str1 = sdscpylen(shard->tmp_str1, member.data(), member.size());
int result = dictDelete(d, shard->tmp_str1);
removed += (result == DICT_OK);
}
isempty = (dictSize(d) == 0);
return RemoveStrSet(vals, set);
}
return make_pair(removed, isempty);
}
@ -118,8 +208,7 @@ void InitSet(ArgSlice vals, CompactObj* set) {
intset* is = intsetNew();
set->InitRobj(OBJ_SET, kEncodingIntSet, is);
} else {
dict* ds = dictCreate(&setDictType);
set->InitRobj(OBJ_SET, kEncodingStrMap, ds);
InitStrSet(set);
}
}
@ -132,42 +221,156 @@ void ScanCallback(void* privdata, const dictEntry* de) {
uint64_t ScanStrSet(const CompactObj& co, uint64_t curs, unsigned count, StringVec* res) {
long maxiterations = count * 10;
DCHECK_EQ(kEncodingStrMap, co.Encoding());
dict* ds = (dict*)co.RObjPtr();
do {
curs = dictScan(ds, curs, ScanCallback, NULL, res);
} while (curs && maxiterations-- && res->size() < count);
if (GetFlag(FLAGS_use_set2)) {
DCHECK_EQ(co.Encoding(), kEncodingStrMap2);
StringSet* set = (StringSet*)co.RObjPtr();
do {
curs = set->Scan(curs, [&](const sds ptr) { res->push_back(std::string(ptr, sdslen(ptr))); });
} while (curs && maxiterations-- && res->size() < count);
} else {
DCHECK_EQ(co.Encoding(), kEncodingStrMap);
dict* ds = (dict*)co.RObjPtr();
do {
curs = dictScan(ds, curs, ScanCallback, NULL, res);
} while (curs && maxiterations-- && res->size() < count);
}
return curs;
}
using SetType = pair<void*, unsigned>;
uint32_t SetTypeLen(const SetType& set) {
if (set.second == kEncodingIntSet) {
return intsetLen((const intset*)set.first);
}
if (GetFlag(FLAGS_use_set2)) {
DCHECK_EQ(set.second, kEncodingStrMap2);
return ((StringSet*)set.first)->Size();
} else {
DCHECK_EQ(set.second, kEncodingStrMap);
return dictSize((const dict*)set.first);
}
}
bool IsInSet(const SetType& st, int64_t val) {
if (st.second == kEncodingIntSet)
return intsetFind((intset*)st.first, val);
char buf[32];
char* next = absl::numbers_internal::FastIntToBuffer(val, buf);
string_view str{buf, size_t(next - buf)};
if (GetFlag(FLAGS_use_set2)) {
DCHECK_EQ(st.second, kEncodingStrMap2);
return ((StringSet*)st.first)->Contains(str);
} else {
DCHECK_EQ(st.second, kEncodingStrMap);
return dictContains((dict*)st.first, str);
}
}
bool IsInSet(const SetType& st, string_view member) {
if (st.second == kEncodingIntSet) {
long long llval;
if (!string2ll(member.data(), member.size(), &llval))
return false;
return intsetFind((intset*)st.first, llval);
}
if (GetFlag(FLAGS_use_set2)) {
DCHECK_EQ(st.second, kEncodingStrMap2);
return ((StringSet*)st.first)->Contains(member);
} else {
DCHECK_EQ(st.second, kEncodingStrMap);
return dictContains((dict*)st.first, member);
}
}
// Removes arg from result.
void DiffStrSet(const SetType& st, absl::flat_hash_set<string>* result) {
DCHECK_EQ(kEncodingStrMap, st.second);
dict* ds = (dict*)st.first;
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while ((de = dictNext(di))) {
sds key = (sds)de->key;
result->erase(string_view{key, sdslen(key)});
if (GetFlag(FLAGS_use_set2)) {
DCHECK_EQ(st.second, kEncodingStrMap2);
for (const sds ptr : *(StringSet*)st.first) {
result->erase(string_view{ptr, sdslen(ptr)});
}
} else {
DCHECK_EQ(st.second, kEncodingStrMap);
dict* ds = (dict*)st.first;
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while ((de = dictNext(di))) {
sds key = (sds)de->key;
result->erase(string_view{key, sdslen(key)});
}
dictReleaseIterator(di);
}
}
void InterStrSet(const vector<SetType>& vec, StringVec* result) {
if (GetFlag(FLAGS_use_set2)) {
DCHECK_EQ(vec.front().second, kEncodingStrMap2);
StringSet* set = (StringSet*)vec.front().first;
for (const sds ptr : *set) {
std::string_view str{ptr, sdslen(ptr)};
size_t j = 1;
for (j = 1; j < vec.size(); ++j) {
if (vec[j].first != set && !IsInSet(vec[j], str)) {
break;
}
}
if (j == vec.size()) {
result->push_back(std::string(str));
}
}
} else {
DCHECK_EQ(vec.front().second, kEncodingStrMap);
dict* ds = (dict*)vec.front().first;
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while ((de = dictNext(di))) {
size_t j = 1;
sds key = (sds)de->key;
string_view member{key, sdslen(key)};
for (j = 1; j < vec.size(); j++) {
if (vec[j].first != ds && !IsInSet(vec[j], member))
break;
}
/* Only take action when all vec contain the member */
if (j == vec.size()) {
result->push_back(string(member));
}
}
dictReleaseIterator(di);
}
dictReleaseIterator(di);
}
StringVec PopStrSet(unsigned count, const SetType& st) {
StringVec result;
dict* ds = (dict*)st.first;
string str;
dictIterator* di = dictGetSafeIterator(ds);
for (uint32_t i = 0; i < count; ++i) {
dictEntry* de = dictNext(di);
DCHECK(de);
result.emplace_back((sds)de->key, sdslen((sds)de->key));
dictDelete(ds, de->key);
if (GetFlag(FLAGS_use_set2)) {
DCHECK_EQ(st.second, kEncodingStrMap2);
StringSet* set = (StringSet*)st.first;
for (unsigned i = 0; i < count && !set->Empty(); ++i) {
result.push_back(set->Pop().value());
}
} else {
DCHECK_EQ(st.second, kEncodingStrMap);
dict* ds = (dict*)st.first;
string str;
dictIterator* di = dictGetSafeIterator(ds);
for (uint32_t i = 0; i < count; ++i) {
dictEntry* de = dictNext(di);
DCHECK(de);
result.emplace_back((sds)de->key, sdslen((sds)de->key));
dictDelete(ds, de->key);
}
dictReleaseIterator(di);
}
dictReleaseIterator(di);
return result;
}
@ -290,58 +493,6 @@ OpStatus NoOpCb(Transaction* t, EngineShard* shard) {
return OpStatus::OK;
};
using SetType = pair<void*, unsigned>;
uint32_t SetTypeLen(const SetType& set) {
if (set.second == kEncodingStrMap) {
return dictSize((const dict*)set.first);
}
DCHECK_EQ(set.second, kEncodingIntSet);
return intsetLen((const intset*)set.first);
};
bool dictContains(const dict* d, string_view key) {
uint64_t h = dictGenHashFunction(key.data(), key.size());
for (unsigned table = 0; table <= 1; table++) {
uint64_t idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
dictEntry* he = d->ht_table[table][idx];
while (he) {
sds dkey = (sds)he->key;
if (sdslen(dkey) == key.size() && (key.empty() || memcmp(dkey, key.data(), key.size()) == 0))
return true;
he = he->next;
}
if (!dictIsRehashing(d))
break;
}
return false;
}
bool IsInSet(const SetType& st, int64_t val) {
if (st.second == kEncodingIntSet)
return intsetFind((intset*)st.first, val);
DCHECK_EQ(st.second, kEncodingStrMap);
char buf[32];
char* next = absl::numbers_internal::FastIntToBuffer(val, buf);
return dictContains((dict*)st.first, string_view{buf, size_t(next - buf)});
}
bool IsInSet(const SetType& st, string_view member) {
if (st.second == kEncodingIntSet) {
long long llval;
if (!string2ll(member.data(), member.size(), &llval))
return false;
return intsetFind((intset*)st.first, llval);
}
DCHECK_EQ(st.second, kEncodingStrMap);
return dictContains((dict*)st.first, member);
}
template <typename F> void FillSet(const SetType& set, F&& f) {
if (set.second == kEncodingIntSet) {
intset* is = (intset*)set.first;
@ -354,15 +505,7 @@ template <typename F> void FillSet(const SetType& set, F&& f) {
f(string{buf, size_t(next - buf)});
}
} else {
dict* ds = (dict*)set.first;
string str;
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while ((de = dictNext(di))) {
str.assign((sds)de->key, sdslen((sds)de->key));
f(move(str));
}
dictReleaseIterator(di);
FillFromStrSet(move(f), set.first);
}
}
@ -428,7 +571,12 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
return OpStatus::OUT_OF_MEMORY;
}
// frees 'is' on a way.
co.InitRobj(OBJ_SET, kEncodingStrMap, tmp.ptr);
if (GetFlag(FLAGS_use_set2)) {
co.InitRobj(OBJ_SET, kEncodingStrMap2, tmp.ptr);
} else {
co.InitRobj(OBJ_SET, kEncodingStrMap, tmp.ptr);
}
inner_obj = co.RObjPtr();
break;
}
@ -691,25 +839,7 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
}
}
} else {
dict* ds = (dict*)sets.front().first;
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while ((de = dictNext(di))) {
size_t j = 1;
sds key = (sds)de->key;
string_view member{key, sdslen(key)};
for (j = 1; j < sets.size(); j++) {
if (sets[j].first != ds && !IsInSet(sets[j], member))
break;
}
/* Only take action when all sets contain the member */
if (j == sets.size()) {
result.push_back(string(member));
}
}
dictReleaseIterator(di);
InterStrSet(sets, &result);
}
return result;
@ -1193,25 +1323,40 @@ bool SetFamily::ConvertToStrSet(const intset* is, size_t expected_len, robj* des
char buf[32];
int ii = 0;
dict* ds = dictCreate(&setDictType);
if (expected_len) {
if (dictTryExpand(ds, expected_len) != DICT_OK) {
dictRelease(ds);
return false;
if (GetFlag(FLAGS_use_set2)) {
StringSet* ss = new StringSet{CompactObj::memory_resource()};
if (expected_len) {
ss->Reserve(expected_len);
}
while (intsetGet(const_cast<intset*>(is), ii++, &intele)) {
char* next = absl::numbers_internal::FastIntToBuffer(intele, buf);
string_view str{buf, size_t(next - buf)};
CHECK(ss->Add(str));
}
dest->ptr = ss;
dest->encoding = OBJ_ENCODING_HT;
} else {
dict* ds = dictCreate(&setDictType);
if (expected_len) {
if (dictTryExpand(ds, expected_len) != DICT_OK) {
dictRelease(ds);
return false;
}
}
/* To add the elements we extract integers and create redis objects */
while (intsetGet(const_cast<intset*>(is), ii++, &intele)) {
char* next = absl::numbers_internal::FastIntToBuffer(intele, buf);
sds s = sdsnewlen(buf, next - buf);
CHECK(dictAddRaw(ds, s, NULL));
}
dest->ptr = ds;
dest->encoding = OBJ_ENCODING_HT;
}
/* To add the elements we extract integers and create redis objects */
while (intsetGet(const_cast<intset*>(is), ii++, &intele)) {
char* next = absl::numbers_internal::FastIntToBuffer(intele, buf);
sds s = sdsnewlen(buf, next - buf);
CHECK(dictAddRaw(ds, s, NULL));
}
dest->ptr = ds;
dest->encoding = OBJ_ENCODING_HT;
return true;
}