1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-15 17:51:06 +00:00

chore: Adding a mpsc intrusive queue based on Vyukov's design (#562)

feat(server): Speed up rdb load by using object pool for parsing objects.

1. Add a mpsc intrusive queue based on Vyukov's design.
2. Use it as a object pool when we pull from the queue to reuse the existing object
   and push into it in order to return the object back to the pool.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-12-16 17:54:55 +02:00 committed by GitHub
parent d4cad11f86
commit adc89c7592
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 109 additions and 4 deletions

View file

@ -0,0 +1,93 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <atomic>
#include <cstddef>
// TODO: to move to helio
namespace dfly {
namespace detail {
// a MPSC queue where multiple threads push and a single thread pops.
//
// Requires global functions for T:
//
// T* MPSC_intrusive_load_next(const T& src)
// void MPSC_intrusive_store_next(T* next, T* dest);
// based on the design from here:
// https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
template <typename T> class MPSCIntrusiveQueue {
private:
static constexpr size_t cache_alignment = 64;
static constexpr size_t cacheline_length = 64;
alignas(cache_alignment) typename std::aligned_storage<sizeof(T), alignof(T)>::type storage_{};
T* dummy_;
alignas(cache_alignment) std::atomic<T*> head_;
alignas(cache_alignment) T* tail_;
char pad_[cacheline_length];
public:
MPSCIntrusiveQueue()
: dummy_{reinterpret_cast<T*>(std::addressof(storage_))}, head_{dummy_}, tail_{dummy_} {
MPSC_intrusive_store_next(dummy_, nullptr);
}
MPSCIntrusiveQueue(MPSCIntrusiveQueue const&) = delete;
MPSCIntrusiveQueue& operator=(MPSCIntrusiveQueue const&) = delete;
void Push(T* ctx) noexcept {
// ctx becomes a new head.
MPSC_intrusive_store_next(ctx, nullptr);
T* prev = head_.exchange(ctx, std::memory_order_acq_rel);
MPSC_intrusive_store_next(prev, ctx);
}
T* Pop() noexcept;
};
template <typename T> T* MPSCIntrusiveQueue<T>::Pop() noexcept {
T* tail = tail_;
// tail->next_.load(std::memory_order_acquire);
T* next = MPSC_intrusive_load_next(*tail);
if (dummy_ == tail) {
if (nullptr == next) {
// empty
return nullptr;
}
tail_ = next;
tail = next;
next = MPSC_intrusive_load_next(*next);
}
if (nullptr != next) {
// non-empty
tail_ = next;
return tail;
}
T* head = head_.load(std::memory_order_acquire);
if (tail != head) {
// non-empty, retry is in order: we are in the middle of push.
return nullptr;
}
Push(dummy_);
next = MPSC_intrusive_load_next(*tail);
if (nullptr != next) {
tail_ = next;
return tail;
}
// non-empty, retry is in order: we are still adding.
return nullptr;
}
} // namespace detail
} // namespace dfly

View file

@ -1644,6 +1644,12 @@ RdbLoader::RdbLoader(ScriptMgr* script_mgr) : script_mgr_(script_mgr) {
}
RdbLoader::~RdbLoader() {
while (true) {
Item* item = item_queue_.Pop();
if (item == nullptr)
break;
delete item;
}
}
error_code RdbLoader::Load(io::Source* src) {
@ -2058,7 +2064,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
}
for (auto* item : ib) {
delete item;
item_queue_.Push(item);
}
}
@ -2068,14 +2074,18 @@ void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) {
}
error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
Item* item = new Item;
// We return the item in LoadItemsBuffer.
Item* item = item_queue_.Pop();
if (item == nullptr) {
item = new Item;
}
// Read key
// We free item in LoadItemsBuffer.
SET_OR_RETURN(ReadKey(), item->key);
// Read value
error_code ec = ReadObj(type, &item->val);
if (ec) {
VLOG(1) << "ReadObj error " << ec << " for key " << item->key;
return ec;

View file

@ -12,6 +12,7 @@ extern "C" {
#include "base/io_buf.h"
#include "base/pod_array.h"
#include "core/mpsc_intrusive_queue.h"
#include "io/io.h"
#include "server/common.h"
@ -217,6 +218,7 @@ class RdbLoader : protected RdbLoaderBase {
// Callback when receiving RDB_OPCODE_FULLSYNC_END
std::function<void()> full_sync_cut_cb;
detail::MPSCIntrusiveQueue<Item> item_queue_;
};
} // namespace dfly