1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix: keep the ttl bit when dense_set grows with expiring items (#3995)

Add a test that triggerred a corner case with assertion failure.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-10-25 22:29:54 +03:00 committed by GitHub
parent 408a8a33f2
commit c895b96274
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 147 additions and 54 deletions

View file

@ -101,6 +101,7 @@ DenseSet::~DenseSet() {
size_t DenseSet::PushFront(DenseSet::ChainVectorIterator it, void* data, bool has_ttl) {
// if this is an empty list assign the value to the empty placeholder pointer
DCHECK(!it->IsDisplaced());
if (it->IsEmpty()) {
it->SetObject(data);
} else {
@ -118,6 +119,7 @@ size_t DenseSet::PushFront(DenseSet::ChainVectorIterator it, void* data, bool ha
void DenseSet::PushFront(DenseSet::ChainVectorIterator it, DenseSet::DensePtr ptr) {
DVLOG(2) << "PushFront to " << distance(entries_.begin(), it) << ", "
<< ObjectAllocSize(ptr.GetObject());
DCHECK(!it->IsDisplaced());
if (it->IsEmpty()) {
it->SetObject(ptr.GetObject());
@ -159,30 +161,13 @@ auto DenseSet::PopPtrFront(DenseSet::ChainVectorIterator it) -> DensePtr {
it->Reset();
} else {
DCHECK(it->IsLink());
// since a DenseLinkKey could be at the end of a chain and have a nullptr for next
// avoid dereferencing a nullptr and just reset the pointer to this DenseLinkKey
if (it->Next() == nullptr) {
it->Reset();
} else {
*it = *it->Next();
}
DenseLinkKey* link = it->AsLink();
*it = link->next;
}
return front;
}
void* DenseSet::PopDataFront(DenseSet::ChainVectorIterator it) {
DensePtr front = PopPtrFront(it);
void* ret = front.GetObject();
if (front.IsLink()) {
FreeLink(front.AsLink());
}
return ret;
}
uint32_t DenseSet::ClearInternal(uint32_t start, uint32_t count) {
constexpr unsigned kArrLen = 32;
ClearItem arr[kArrLen];
@ -263,7 +248,10 @@ void DenseSet::CloneBatch(unsigned len, CloneItem* items, DenseSet* other) const
auto& dest = items[dest_id++];
DenseLinkKey* link = src.ptr.AsLink();
dest.obj = link->Raw();
dest.has_ttl = link->HasTtl();
DCHECK(!link->HasTtl());
// ttl is attached to the wrapping pointer.
dest.has_ttl = src.ptr.HasTtl();
dest.ptr = link->next;
PREFETCH_READ(dest.ptr.Raw());
PREFETCH_READ(dest.obj);
@ -297,8 +285,9 @@ void DenseSet::ClearBatch(unsigned len, ClearItem* items) {
} else {
auto& dest = items[dest_id++];
DenseLinkKey* link = src.ptr.AsLink();
DCHECK(!link->HasTtl());
dest.obj = link->Raw();
dest.has_ttl = link->HasTtl();
dest.has_ttl = src.ptr.HasTtl();
dest.ptr = link->next;
PREFETCH_READ(dest.ptr.Raw());
PREFETCH_READ(dest.obj);
@ -413,12 +402,24 @@ void DenseSet::Fill(DenseSet* other) const {
}
void DenseSet::Grow(size_t prev_size) {
// perform rehashing of items in the set
DensePtr first;
// Corner case. Usually elements are moved to higher buckets during rehashing.
// By moving upper elements first we make sure that there are no displaced elements
// when we move the lower elements.
// However the (displaced) elements at bucket_id=1 can move to bucket 0, and
// bucket 0 can host displaced elements from bucket 1. To avoid this situation, we
// stash the displaced element from bucket 0 and move it to the correct bucket at the end.
if (entries_.front().IsDisplaced()) {
first = PopPtrFront(entries_.begin());
}
// perform rehashing of items in the array, chain by chain.
for (long i = prev_size - 1; i >= 0; --i) {
DensePtr* curr = &entries_[i];
DensePtr* prev = nullptr;
while (true) {
do {
if (ExpireIfNeeded(prev, curr)) {
// if curr has disappeared due to expiry and prev was converted from Link to a
// regular DensePtr
@ -450,8 +451,6 @@ void DenseSet::Grow(size_t prev_size) {
DensePtr dptr = *curr;
if (curr->IsObject()) {
curr->Reset(); // reset the original placeholder (.next or root)
if (prev) {
DCHECK(prev->IsLink());
@ -461,28 +460,34 @@ void DenseSet::Grow(size_t prev_size) {
// we want to make *prev a DensePtr instead of DenseLink and we
// want to deallocate the link.
DensePtr tmp = DensePtr::From(plink);
// Important to transfer the ttl flag.
tmp.SetTtl(prev->HasTtl());
DCHECK(ObjectAllocSize(tmp.GetObject()));
FreeLink(plink);
// we deallocated the link, curr is invalid now.
curr = nullptr;
*prev = tmp;
} else {
// prev == nullptr
curr->Reset(); // reset the root placeholder.
}
} else {
// !curr.IsObject
*curr = *dptr.Next();
DCHECK(!curr->IsEmpty());
}
DVLOG(2) << " Pushing to " << bid << " " << dptr.GetObject();
DCHECK_EQ(BucketId(dptr.GetObject(), 0), bid);
PushFront(dest, dptr);
dest->ClearDisplaced();
break;
} // if IsObject
*curr = *dptr.Next();
DCHECK(!curr->IsEmpty());
DVLOG(2) << " Pushing to " << bid << " " << dptr.GetObject();
DCHECK_EQ(BucketId(dptr.GetObject(), 0), bid);
PushFront(dest, dptr);
dest->ClearDisplaced();
}
}
} while (curr);
}
if (!first.IsEmpty()) {
uint32_t bid = BucketId(first.GetObject(), 0);
PushFront(entries_.begin() + bid, first);
}
}
@ -670,7 +675,13 @@ void* DenseSet::PopInternal() {
// unlink the first node in the first non-empty chain
obj_malloc_used_ -= ObjectAllocSize(bucket_iter->GetObject());
void* ret = PopDataFront(bucket_iter);
DensePtr front = PopPtrFront(bucket_iter);
void* ret = front.GetObject();
if (front.IsLink()) {
FreeLink(front.AsLink());
}
--size_;
return ret;

View file

@ -367,7 +367,6 @@ class DenseSet {
size_t PushFront(ChainVectorIterator, void* obj, bool has_ttl);
void PushFront(ChainVectorIterator, DensePtr);
void* PopDataFront(ChainVectorIterator);
DensePtr PopPtrFront(ChainVectorIterator);
// ============ Pseudo Linked List in DenseSet end ==================

View file

@ -121,6 +121,58 @@ TEST_F(StringSetTest, StandardAddErase) {
EXPECT_TRUE(ss_->Erase("AAAAAAAAAAAAAAA@"));
}
TEST_F(StringSetTest, DisplacedBug) {
string_view vals[] = {"imY", "OVl", "NhH", "BCe", "YDL", "lpb",
"nhF", "xod", "zYR", "PSa", "hce", "cTR"};
ss_->AddMany(absl::MakeSpan(vals), UINT32_MAX);
ss_->Add("fIc");
ss_->Erase("YDL");
ss_->Add("fYs");
ss_->Erase("hce");
ss_->Erase("nhF");
ss_->Add("dye");
ss_->Add("xZT");
ss_->Add("LVK");
ss_->Erase("zYR");
ss_->Erase("fYs");
ss_->Add("ueB");
ss_->Erase("PSa");
ss_->Erase("OVl");
ss_->Add("cga");
ss_->Add("too");
ss_->Erase("ueB");
ss_->Add("HZe");
ss_->Add("oQn");
ss_->Erase("too");
ss_->Erase("HZe");
ss_->Erase("xZT");
ss_->Erase("cga");
ss_->Erase("cTR");
ss_->Erase("BCe");
ss_->Add("eua");
ss_->Erase("lpb");
ss_->Add("OXK");
ss_->Add("QmO");
ss_->Add("SzV");
ss_->Erase("QmO");
ss_->Add("jbe");
ss_->Add("BPN");
ss_->Add("OfH");
ss_->Add("Muf");
ss_->Add("CwP");
ss_->Erase("Muf");
ss_->Erase("xod");
ss_->Add("Cis");
ss_->Add("Xvd");
ss_->Erase("SzV");
ss_->Erase("eua");
ss_->Add("DGb");
ss_->Add("leD");
ss_->Add("MVX");
ss_->Add("HPq");
}
static string random_string(mt19937& rand, unsigned len) {
const string_view alpanum = "1234567890abcdefghijklmnopqrstuvwxyz";
string ret;
@ -135,27 +187,30 @@ static string random_string(mt19937& rand, unsigned len) {
TEST_F(StringSetTest, Resizing) {
constexpr size_t num_strs = 4096;
vector<string> strs;
unordered_set<string> strs;
while (strs.size() != num_strs) {
auto str = random_string(generator_, 10);
if (find(strs.begin(), strs.end(), str) != strs.end()) {
continue;
}
strs.push_back(random_string(generator_, 10));
strs.insert(str);
}
for (size_t i = 0; i < num_strs; ++i) {
EXPECT_TRUE(ss_->Add(strs[i]));
EXPECT_EQ(ss_->UpperBoundSize(), i + 1);
unsigned size = 0;
for (auto it = strs.begin(); it != strs.end(); ++it) {
const auto& str = *it;
EXPECT_TRUE(ss_->Add(str, 1));
EXPECT_EQ(ss_->UpperBoundSize(), size + 1);
// make sure we haven't lost any items after a grow
// which happens every power of 2
if (i != 0 && (ss_->UpperBoundSize() & (ss_->UpperBoundSize() - 1)) == 0) {
for (size_t j = 0; j < i; ++j) {
EXPECT_TRUE(ss_->Contains(strs[j]));
if ((size & (size - 1)) == 0) {
for (auto j = strs.begin(); j != it; ++j) {
const auto& str = *j;
auto it = ss_->Find(str);
ASSERT_TRUE(it != ss_->end());
EXPECT_TRUE(it.HasExpiry());
EXPECT_EQ(it.ExpiryTime(), ss_->time_now() + 1);
}
}
++size;
}
}
@ -235,7 +290,7 @@ TEST_F(StringSetTest, IntOnly) {
}
for (size_t i = 0; i < num_ints; ++i) {
EXPECT_FALSE(ss_->Add(to_string(i)));
ASSERT_FALSE(ss_->Add(to_string(i)));
}
size_t num_remove = generator_() % 4096;
@ -574,4 +629,32 @@ void BM_AddMany(benchmark::State& state) {
}
BENCHMARK(BM_AddMany);
void BM_Grow(benchmark::State& state) {
vector<string> strs;
mt19937 generator(0);
StringSet src;
unsigned elems = 1 << 18;
for (size_t i = 0; i < elems; ++i) {
src.Add(random_string(generator, 16), UINT32_MAX);
strs.push_back(random_string(generator, 16));
}
while (state.KeepRunning()) {
state.PauseTiming();
StringSet tmp;
src.Fill(&tmp);
CHECK_EQ(tmp.BucketCount(), elems);
state.ResumeTiming();
for (const auto& str : strs) {
tmp.Add(str);
if (tmp.BucketCount() > elems) {
break; // we grew
}
}
CHECK_GT(tmp.BucketCount(), elems);
}
}
BENCHMARK(BM_Grow);
} // namespace dfly