1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: grow.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-10-25 00:17:36 +03:00
parent 7b5fc9a244
commit e7e4399d7a
No known key found for this signature in database
GPG key ID: 6568CCAB9736B618
4 changed files with 273 additions and 39 deletions

View file

@ -4,6 +4,7 @@
#include "core/dense_set.h"
#include <absl/container/inlined_vector.h>
#include <absl/numeric/bits.h>
#include <cstddef>
@ -102,6 +103,7 @@ size_t DenseSet::PushFront(DenseSet::ChainVectorIterator it, void* data, bool ha
if (it->IsEmpty()) {
it->SetObject(data);
} else {
DCHECK(!it->IsDisplaced());
// otherwise make a new link and connect it to the front of the list
it->SetLink(NewLink(data, *it));
}
@ -116,6 +118,7 @@ size_t DenseSet::PushFront(DenseSet::ChainVectorIterator it, void* data, bool ha
void DenseSet::PushFront(DenseSet::ChainVectorIterator it, DenseSet::DensePtr ptr) {
DVLOG(2) << "PushFront to " << distance(entries_.begin(), it) << ", "
<< ObjectAllocSize(ptr.GetObject());
DCHECK(!it->IsDisplaced());
if (it->IsEmpty()) {
it->SetObject(ptr.GetObject());
@ -261,7 +264,10 @@ void DenseSet::CloneBatch(unsigned len, CloneItem* items, DenseSet* other) const
auto& dest = items[dest_id++];
DenseLinkKey* link = src.ptr.AsLink();
dest.obj = link->Raw();
dest.has_ttl = link->HasTtl();
DCHECK(!link->HasTtl());
// ttl is attached to the wrapping pointer.
dest.has_ttl = src.ptr.HasTtl();
dest.ptr = link->next;
PREFETCH_READ(dest.ptr.Raw());
PREFETCH_READ(dest.obj);
@ -295,8 +301,9 @@ void DenseSet::ClearBatch(unsigned len, ClearItem* items) {
} else {
auto& dest = items[dest_id++];
DenseLinkKey* link = src.ptr.AsLink();
DCHECK(!link->HasTtl());
dest.obj = link->Raw();
dest.has_ttl = link->HasTtl();
dest.has_ttl = src.ptr.HasTtl();
dest.ptr = link->next;
PREFETCH_READ(dest.ptr.Raw());
PREFETCH_READ(dest.obj);
@ -308,6 +315,112 @@ void DenseSet::ClearBatch(unsigned len, ClearItem* items) {
len = dest_id;
}
}
// Rehashes a batch of bucket-chain cursors after entries_ has been enlarged.
//
// `items[0..len)` each describe a position (prev/curr) inside one chain. The batch is processed
// in rounds: in every round each item performs a single step, and items that still have work
// left are compacted to the front of `items` (dest_id never exceeds i, so the in-place copy is
// safe). The outer loop stops once a round leaves no items.
//
// An item goes through up to three states:
//   1. obj == nullptr              - the object pointer has not been loaded yet.
//   2. new_bid == UINT32_MAX       - the object is loaded but its destination bucket is unknown.
//   3. new_bid != UINT32_MAX       - the destination is known and the entry is moved there.
// Each state issues a prefetch for the memory the next state is going to read.
//
// Entries whose destination bucket is already occupied are not linked in immediately; they are
// collected in `delayed_pushes` and inserted with PushFront once all rounds are done.
void DenseSet::GrowBatch(unsigned len, GrowItem* items) {
absl::InlinedVector<GrowItem, 32> delayed_pushes;
while (len) {
// Number of items carried over to the next round.
unsigned dest_id = 0;
for (unsigned i = 0; i < len; ++i) {
auto& src = items[i];
// State 1: curr is a link; fetch the object it references and revisit the item next round.
if (!src.obj) {
DCHECK(src.chain.curr->IsLink());
src.obj = src.chain.curr->GetObject();
src.new_bid = UINT32_MAX;
items[dest_id++] = src;
PREFETCH_READ(src.obj);
continue;
}
// State 2: the object is available, decide whether it has to move.
if (src.new_bid == UINT32_MAX) {
if (ExpireIfNeeded(src.chain.prev, src.chain.curr)) {
src.obj = nullptr;
// NOTE(review): the item is kept only when prev exists and is still a link; in every
// other case (including prev == nullptr) the chain is dropped from the batch. Confirm
// against ExpireIfNeeded that a root bucket whose head expired cannot still hold entries.
if (src.chain.prev && src.chain.prev->IsLink()) {
// curr has been updated by ExpireIfNeeded with the next in chain.
src.obj = src.chain.curr->GetObject();
items[dest_id++] = src;
PREFETCH_READ(src.obj);
}
continue;
}
uint32_t bid = BucketId(src.obj, 0);
DCHECK(src.chain.curr->GetObject() == src.obj);
if (src.bid != bid) {
// The entry belongs to another bucket: remember it and prefetch the destination slot.
src.new_bid = bid;
items[dest_id++] = src;
/*if (src.chain.curr->IsLink()) {
PREFETCH_READ(src.chain.curr->AsLink()->Raw());
}*/
PREFETCH_READ(&entries_[bid]);
continue;
}
// Edge case when curr was the first in the chain and
// was displaced by one but now it's actually correct place.
src.chain.curr->ClearDisplaced();
// The entry stays in place; advance to the next element of the chain if there is one,
// otherwise the chain is finished and the item is dropped.
if (src.chain.curr->IsLink()) {
src.chain.Shift();
src.SyncObj();
PREFETCH_READ(src.chain.curr->Raw());
items[dest_id++] = src;
}
continue;
}
// State 3: the destination bucket is known.
DCHECK_NE(src.new_bid, UINT32_MAX);
// Move curr to src.new_bid
DVLOG(2) << " Pushing to " << src.new_bid << " " << src.obj;
DCHECK_EQ(BucketId(src.obj, 0), src.new_bid);
// Read the ttl flag before curr is modified below.
bool has_ttl = src.chain.curr->HasTtl();
auto& next = entries_[src.new_bid];
if (next.IsEmpty()) {
next.SetObject(src.obj);
if (has_ttl) {
next.SetTtl(true);
}
} else {
// The destination is occupied: postpone the insertion until the batch is done.
// Writing has_ttl overwrites part of the copied chain (they share a union), which is fine
// because the delayed copy only needs new_bid, obj and has_ttl.
GrowItem delayed = src;
delayed.has_ttl = has_ttl;
delayed_pushes.push_back(std::move(delayed));
}
// Unlink the moved entry from its source chain.
if (src.chain.curr->IsObject()) {
// curr was the last element: the chain ends here, so the item is not carried over.
src.obj = nullptr;
DenseLinkKey* plink = src.chain.Trim();
if (plink) {
FreeLink(plink);
}
continue;
}
DCHECK(src.chain.curr->IsLink());
// curr was a link: splice it out so that curr now holds the next element, then free the link.
DenseLinkKey* plink = src.chain.PullCurrentLink();
src.new_bid = UINT32_MAX;
src.SyncObj();
PREFETCH_READ(src.chain.curr->Raw());
FreeLink(plink);
items[dest_id++] = src; // continue with this chain.
} // for
// update the length of the batch for the next iteration.
len = dest_id;
}
// Apply the insertions that were postponed because their destination was occupied.
for (auto& item : delayed_pushes) {
PushFront(entries_.begin() + item.new_bid, item.obj, item.has_ttl);
}
}
bool DenseSet::NoItemBelongsBucket(uint32_t bid) const {
auto& entries = const_cast<DenseSet*>(this)->entries_;
DensePtr* curr = &entries[bid];
@ -410,13 +523,41 @@ void DenseSet::Fill(DenseSet* other) const {
CloneBatch(len, arr, other);
}
#if 0
void DenseSet::Grow(size_t prev_size) {
constexpr unsigned kArrLen = 32;
GrowItem arr[kArrLen];
unsigned len = 0;
// perform rehashing of items in the set
for (long i = prev_size - 1; i >= 0; --i) {
DensePtr* curr = &entries_[i];
if (curr->IsEmpty())
continue;
auto& item = arr[len++];
item.Init(i, curr);
PREFETCH_READ(curr->Raw());
if (len == kArrLen) {
GrowBatch(kArrLen, arr);
len = 0;
}
}
GrowBatch(len, arr);
}
#else
void DenseSet::Grow(size_t prev_size) {
vector<pair<uint32_t, DensePtr> > delayed_pushes;
// perform rehashing of items in the set
// for (long i = prev_size - 1; i >= 0; --i) {
for (size_t i = 0; i < prev_size; ++i) {
DensePtr* curr = &entries_[i];
DensePtr* prev = nullptr;
while (true) {
do {
if (ExpireIfNeeded(prev, curr)) {
// if curr has disappeared due to expiry and prev was converted from Link to a
// regular DensePtr
@ -448,8 +589,6 @@ void DenseSet::Grow(size_t prev_size) {
DensePtr dptr = *curr;
if (curr->IsObject()) {
curr->Reset(); // reset the original placeholder (.next or root)
if (prev) {
DCHECK(prev->IsLink());
@ -459,31 +598,41 @@ void DenseSet::Grow(size_t prev_size) {
// we want to make *prev a DensePtr instead of DenseLink and we
// want to deallocate the link.
DensePtr tmp = DensePtr::From(plink);
// Important to transfer the ttl flag.
tmp.SetTtl(prev->HasTtl());
DCHECK(ObjectAllocSize(tmp.GetObject()));
FreeLink(plink);
FreeLink(plink); // we deallocated the link, curr is invalid now.
curr = nullptr;
*prev = tmp;
} else {
curr->Reset(); // reset the root placeholder.
}
} else { // if IsObject
*curr = *dptr.Next();
DCHECK(!curr->IsEmpty());
}
if (dest->IsDisplaced()) {
delayed_pushes.emplace_back(bid, dptr);
} else {
DVLOG(2) << " Pushing to " << bid << " " << dptr.GetObject();
DCHECK_EQ(BucketId(dptr.GetObject(), 0), bid);
PushFront(dest, dptr);
dest->ClearDisplaced();
break;
} // if IsObject
*curr = *dptr.Next();
DCHECK(!curr->IsEmpty());
PushFront(dest, dptr);
dest->ClearDisplaced();
}
}
}
} while (curr);
}
// DCHECK_LT(delayed_pushes.size(), 5);
for (auto [bid, dptr] : delayed_pushes) {
DVLOG(2) << " Pushing to " << bid << " " << dptr.GetObject();
DCHECK_EQ(BucketId(dptr.GetObject(), 0), bid);
PushFront(entries_.begin() + bid, dptr);
}
}
#endif
// Assumes that the object does not exist in the set.
void DenseSet::AddUnique(void* obj, bool has_ttl, uint64_t hashcode) {
if (entries_.empty()) {

View file

@ -342,6 +342,64 @@ class DenseSet {
using ClearItem = CloneItem;
void ClearBatch(unsigned len, ClearItem* items);
// Cursor over a single bucket chain, used by GrowBatch while rehashing.
//
// While the chain is being walked the `chain` member of the union is active. When an entry is
// queued for a delayed push, GrowBatch stores its ttl flag in `has_ttl`, which shares storage
// with `chain`; from that point only obj, new_bid and has_ttl of that copy are meaningful.
struct GrowItem {
  union {
    struct {
      DensePtr* curr = nullptr;  // entry currently being examined
      DensePtr* prev = nullptr;  // pointer that precedes curr in the chain, null at the root

      // Advances one step along the chain. curr must be a link.
      void Shift() {
        prev = curr;
        curr = &curr->AsLink()->next;
      }

      // Discards current and updates prev to be the end
      // of the chain.
      // Returns the link that prev used to wrap so that the caller can free it,
      // or nullptr when curr was the root of the chain.
      DenseLinkKey* Trim() {
        curr->Reset();
        if (!prev)
          return nullptr;

        DenseLinkKey* plink = prev->AsLink();

        // we want to make *prev a DensePtr instead of DenseLink and we
        // want to deallocate the link.
        DensePtr tmp = DensePtr::From(plink);

        // The ttl flag is kept on the wrapping pointer, not on the link, so it must be
        // carried over explicitly - the same way the non-batched Grow does it.
        tmp.SetTtl(prev->HasTtl());
        *prev = tmp;
        return plink;
      }

      // Replaces curr (a link) with the element that follows it and returns the detached
      // link; the caller is responsible for freeing it.
      DenseLinkKey* PullCurrentLink() {
        DenseLinkKey* plink = curr->AsLink();
        *curr = plink->next;
        return plink;
      }
    } chain;

    bool has_ttl;
  };

  GrowItem() {
    chain.prev = chain.curr = nullptr;
  }

  // Points the cursor at the root `p` of bucket `b`. obj is loaded right away only when the
  // root is a plain object; for a link it stays null until GrowBatch fetches it.
  void Init(uint32_t b, DensePtr* p) {
    chain.prev = nullptr;
    chain.curr = p;
    bid = b;
    new_bid = UINT32_MAX;
    obj = p->IsObject() ? p->Raw() : nullptr;
  }

  void* obj = nullptr;            // object referenced by chain.curr, null if not loaded yet
  uint32_t bid = 0;               // bucket the chain currently lives in
  uint32_t new_bid = UINT32_MAX;  // destination bucket, UINT32_MAX while not computed

  // Refreshes obj after chain.curr has changed.
  void SyncObj() {
    obj = chain.curr->IsObject() ? chain.curr->Raw() : nullptr;
  }
};
void GrowBatch(unsigned len, GrowItem* items);
// Returns the memory resource used by the allocator of entries_.
MemoryResource* mr() {
return entries_.get_allocator().resource();
}

View file

@ -158,15 +158,14 @@ TEST_F(StringMapTest, Bug3973) {
}
for (unsigned i = 100; i < 1000; i++) {
EXPECT_TRUE(sm_->AddOrUpdate(to_string(i), "val"));
}
// make sure the first 8 keys have expiry set
for (unsigned i = 0; i < 8; i++) {
auto k = sm_->Find(to_string(i));
ASSERT_TRUE(k.HasExpiry());
for (unsigned j = 0; j < 8; j++) {
auto k = sm_->Find(to_string(j));
ASSERT_TRUE(k.HasExpiry()) << j << " " << i;
EXPECT_EQ(k.ExpiryTime(), 1);
}
}
}
unsigned total_wasted_memory = 0;

View file

@ -135,27 +135,27 @@ static string random_string(mt19937& rand, unsigned len) {
TEST_F(StringSetTest, Resizing) {
constexpr size_t num_strs = 4096;
vector<string> strs;
unordered_set<string> strs;
while (strs.size() != num_strs) {
auto str = random_string(generator_, 10);
if (find(strs.begin(), strs.end(), str) != strs.end()) {
continue;
strs.insert(str);
}
strs.push_back(random_string(generator_, 10));
}
for (size_t i = 0; i < num_strs; ++i) {
EXPECT_TRUE(ss_->Add(strs[i]));
EXPECT_EQ(ss_->UpperBoundSize(), i + 1);
unsigned size = 0;
for (auto it = strs.begin(); it != strs.end(); ++it) {
const auto& str = *it;
EXPECT_TRUE(ss_->Add(str));
EXPECT_EQ(ss_->UpperBoundSize(), size + 1);
// make sure we haven't lost any items after a grow
// which happens every power of 2
if (i != 0 && (ss_->UpperBoundSize() & (ss_->UpperBoundSize() - 1)) == 0) {
for (size_t j = 0; j < i; ++j) {
EXPECT_TRUE(ss_->Contains(strs[j]));
if ((size & (size - 1)) == 0) {
for (auto j = strs.begin(); j != it; ++j) {
const auto& str = *j;
ASSERT_TRUE(ss_->Contains(str)) << size << " " << str;
}
}
++size;
}
}
@ -235,7 +235,7 @@ TEST_F(StringSetTest, IntOnly) {
}
for (size_t i = 0; i < num_ints; ++i) {
EXPECT_FALSE(ss_->Add(to_string(i)));
ASSERT_FALSE(ss_->Add(to_string(i)));
}
size_t num_remove = generator_() % 4096;
@ -574,4 +574,32 @@ void BM_AddMany(benchmark::State& state) {
}
BENCHMARK(BM_AddMany);
// Measures how long it takes to push a StringSet holding 2^18 elements over its growth
// threshold. Building the set is excluded from the timing; only the insertions up to and
// including the one that increases the bucket count are measured.
void BM_Grow(benchmark::State& state) {
  const unsigned kNumElems = 1 << 18;

  mt19937 generator(0);
  StringSet base_set;
  vector<string> extra_keys;

  // Alternate between seeding the base set and collecting a key for the timed phase; the
  // order of the two random_string calls is kept so the generated data stays the same.
  for (size_t n = 0; n < kNumElems; ++n) {
    base_set.Add(random_string(generator, 16), UINT32_MAX);
    extra_keys.push_back(random_string(generator, 16));
  }

  while (state.KeepRunning()) {
    // Untimed: prepare a fresh set populated from base_set.
    state.PauseTiming();
    StringSet scratch;
    base_set.Fill(&scratch);
    CHECK_EQ(scratch.BucketCount(), kNumElems);
    state.ResumeTiming();

    // Timed: insert until the bucket array has been enlarged.
    for (const string& key : extra_keys) {
      scratch.Add(key);
      const bool grew = scratch.BucketCount() > kNumElems;
      if (grew) {
        break;  // we grew
      }
    }

    CHECK_GT(scratch.BucketCount(), kNumElems);
  }
}
BENCHMARK(BM_Grow);
} // namespace dfly