1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

Add SET command and thread local db slice

This commit is contained in:
Roman Gershman 2021-11-17 12:51:47 +02:00
parent 48589604fc
commit ec78c8a2af
10 changed files with 501 additions and 5 deletions

View file

@ -1,7 +1,8 @@
add_executable(dragonfly dfly_main.cc)
cxx_link(dragonfly base dragonfly_lib)
add_library(dragonfly_lib dragonfly_listener.cc dragonfly_connection.cc main_service.cc
add_library(dragonfly_lib db_slice.cc dragonfly_listener.cc dragonfly_connection.cc
main_service.cc engine_shard_set.cc
redis_parser.cc resp_expr.cc)
cxx_link(dragonfly_lib uring_fiber_lib

116
server/db_slice.cc Normal file
View file

@ -0,0 +1,116 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#include "server/db_slice.h"
#include <boost/fiber/fiber.hpp>
#include <boost/fiber/operations.hpp>
#include "base/logging.h"
#include "server/engine_shard_set.h"
#include "util/fiber_sched_algo.h"
#include "util/proactor_base.h"
namespace dfly {
using namespace boost;
using namespace std;
using namespace util;
DbSlice::DbSlice(uint32_t index, EngineShard* owner) : shard_id_(index), owner_(owner) {
db_arr_.emplace_back();
CreateDbRedis(0);
}
DbSlice::~DbSlice() {
for (auto& db : db_arr_) {
if (!db.main_table)
continue;
db.main_table.reset();
}
}
void DbSlice::Reserve(DbIndex db_ind, size_t key_size) {
ActivateDb(db_ind);
auto& db = db_arr_[db_ind];
DCHECK(db.main_table);
db.main_table->reserve(key_size);
}
auto DbSlice::Find(DbIndex db_index, std::string_view key) const -> OpResult<MainIterator> {
DCHECK_LT(db_index, db_arr_.size());
DCHECK(db_arr_[db_index].main_table);
auto& db = db_arr_[db_index];
MainIterator it = db.main_table->find(key);
if (it == db.main_table->end()) {
return OpStatus::KEY_NOTFOUND;
}
return it;
}
auto DbSlice::AddOrFind(DbIndex db_index, std::string_view key) -> pair<MainIterator, bool> {
DCHECK_LT(db_index, db_arr_.size());
DCHECK(db_arr_[db_index].main_table);
auto& db = db_arr_[db_index];
pair<MainIterator, bool> res = db.main_table->emplace(key, MainValue{});
if (res.second) { // new entry
db.stats.obj_memory_usage += res.first->first.capacity();
return make_pair(res.first, true);
}
return res;
}
void DbSlice::ActivateDb(DbIndex db_ind) {
if (db_arr_.size() <= db_ind)
db_arr_.resize(db_ind + 1);
CreateDbRedis(db_ind);
}
void DbSlice::CreateDbRedis(unsigned index) {
auto& db = db_arr_[index];
if (!db.main_table) {
db.main_table.reset(new MainTable);
}
}
void DbSlice::AddNew(DbIndex db_ind, std::string_view key, MainValue obj, uint64_t expire_at_ms) {
CHECK(AddIfNotExist(db_ind, key, std::move(obj), expire_at_ms));
}
bool DbSlice::AddIfNotExist(DbIndex db_ind, std::string_view key, MainValue obj,
uint64_t expire_at_ms) {
auto& db = db_arr_[db_ind];
auto [new_entry, success] = db.main_table->emplace(key, obj);
if (!success)
return false; // in this case obj won't be moved and will be destroyed during unwinding.
db.stats.obj_memory_usage += (new_entry->first.capacity() + new_entry->second.capacity());
if (expire_at_ms) {
// TODO
}
return true;
}
size_t DbSlice::DbSize(DbIndex db_ind) const {
DCHECK_LT(db_ind, db_array_size());
if (IsDbValid(db_ind)) {
return db_arr_[db_ind].main_table->size();
}
return 0;
}
} // namespace dfly

84
server/db_slice.h Normal file
View file

@ -0,0 +1,84 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#pragma once
#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include "server/op_status.h"
namespace util {
class ProactorBase;
}
namespace dfly {
class EngineShard;
class DbSlice {
struct InternalDbStats {
// Object memory usage besides hash-table capacity.
size_t obj_memory_usage = 0;
};
public:
using MainValue = std::string;
using MainTable = absl::flat_hash_map<std::string, MainValue>;
using MainIterator = MainTable::iterator;
using ShardId = uint16_t;
using DbIndex = uint16_t;
DbSlice(uint32_t index, EngineShard* owner);
~DbSlice();
// Activates `db_ind` database if it does not exist (see ActivateDb below).
void Reserve(DbIndex db_ind, size_t key_size);
OpResult<MainIterator> Find(DbIndex db_index, std::string_view key) const;
// Return .second=true if insertion ocurred, false if we return the existing key.
std::pair<MainIterator, bool> AddOrFind(DbIndex db_ind, std::string_view key);
// Adds a new entry. Requires: key does not exist in this slice.
void AddNew(DbIndex db_ind, std::string_view key, MainValue obj, uint64_t expire_at_ms);
// Adds a new entry if a key does not exists. Returns true if insertion took place,
// false otherwise. expire_at_ms equal to 0 - means no expiry.
bool AddIfNotExist(DbIndex db_ind, std::string_view key, MainValue obj, uint64_t expire_at_ms);
// Creates a database with index `db_ind`. If such database exists does nothing.
void ActivateDb(DbIndex db_ind);
size_t db_array_size() const {
return db_arr_.size();
}
bool IsDbValid(DbIndex id) const {
return bool(db_arr_[id].main_table);
}
// Returns existing keys count in the db.
size_t DbSize(DbIndex db_ind) const;
ShardId shard_id() const { return shard_id_;}
private:
void CreateDbRedis(unsigned index);
ShardId shard_id_;
EngineShard* owner_;
struct DbRedis {
std::unique_ptr<MainTable> main_table;
mutable InternalDbStats stats;
};
std::vector<DbRedis> db_arr_;
};
} // namespace dfly

View file

@ -132,6 +132,10 @@ auto Connection::ParseRedis(base::IoBuf* io_buf, util::FiberSocketBase* peer) ->
string_view sv = ToSV(first.GetBuf());
if (sv == "PING") {
ec = peer->Write(io::Buffer("PONG\r\n"));
} else if (sv == "SET") {
CHECK_EQ(3u, args.size());
service_->Set(ToSV(args[1].GetBuf()), ToSV(args[2].GetBuf()));
ec = peer->Write(io::Buffer("OK\r\n"));
}
}
io_buf->ConsumeInput(consumed);

View file

@ -0,0 +1,60 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#include "server/engine_shard_set.h"
#include "base/logging.h"
#include "util/fiber_sched_algo.h"
#include "util/varz.h"
namespace dfly {
using namespace std;
using namespace boost;
using util::FiberProps;
thread_local EngineShard* EngineShard::shard_ = nullptr;
constexpr size_t kQueueLen = 64;
EngineShard::EngineShard(ShardId index)
: db_slice(index, this), queue_(kQueueLen) {
fiber_q_ = fibers::fiber([this, index] {
this_fiber::properties<FiberProps>().set_name(absl::StrCat("shard_queue", index));
queue_.Run();
});
}
EngineShard::~EngineShard() {
queue_.Shutdown();
fiber_q_.join();
}
void EngineShard::InitThreadLocal(ShardId index) {
CHECK(shard_ == nullptr) << index;
shard_ = new EngineShard(index);
}
void EngineShard::DestroyThreadLocal() {
if (!shard_)
return;
uint32_t index = shard_->db_slice.shard_id();
delete shard_;
shard_ = nullptr;
DVLOG(1) << "Shard reset " << index;
}
void EngineShardSet::Init(uint32_t sz) {
CHECK_EQ(0u, size());
shard_queue_.resize(sz);
}
void EngineShardSet::InitThreadLocal(ShardId index) {
EngineShard::InitThreadLocal(index);
shard_queue_[index] = EngineShard::tlocal()->GetQueue();
}
} // namespace dfly

98
server/engine_shard_set.h Normal file
View file

@ -0,0 +1,98 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#pragma once
#include "server/db_slice.h"
#include "util/fibers/fibers_ext.h"
#include "util/fibers/fiberqueue_threadpool.h"
#include "util/proactor_pool.h"
namespace dfly {
using ShardId = uint16_t;
class EngineShard {
public:
DbSlice db_slice;
//EngineShard() is private down below.
~EngineShard();
static void InitThreadLocal(ShardId index);
static void DestroyThreadLocal();
static EngineShard* tlocal() {
return shard_;
}
ShardId shard_id() const {
return db_slice.shard_id();
}
::util::fibers_ext::FiberQueue* GetQueue() {
return &queue_;
}
private:
EngineShard(ShardId index);
::util::fibers_ext::FiberQueue queue_;
::boost::fibers::fiber fiber_q_;
static thread_local EngineShard* shard_;
};
class EngineShardSet {
public:
explicit EngineShardSet(util::ProactorPool* pp) : pp_(pp) {
}
uint32_t size() const {
return uint32_t(shard_queue_.size());
}
util::ProactorPool* pool() {
return pp_;
}
void Init(uint32_t size);
void InitThreadLocal(ShardId index);
template <typename F> auto Await(ShardId sid, F&& f) {
return shard_queue_[sid]->Await(std::forward<F>(f));
}
template <typename F> auto Add(ShardId sid, F&& f) {
assert(sid < shard_queue_.size());
return shard_queue_[sid]->Add(std::forward<F>(f));
}
template <typename U> void RunBriefInParallel(U&& func);
private:
util::ProactorPool* pp_;
std::vector<util::fibers_ext::FiberQueue*> shard_queue_;
};
/**
* @brief
*
* @tparam U - a function that receives EngineShard* argument and returns void.
* @param func
*/
template <typename U> void EngineShardSet::RunBriefInParallel(U&& func) {
util::fibers_ext::BlockingCounter bc{size()};
for (uint32_t i = 0; i < size(); ++i) {
util::ProactorBase* dest = pp_->at(i);
dest->AsyncBrief([f = std::forward<U>(func), bc]() mutable {
f(EngineShard::tlocal());
bc.Dec();
});
}
bc.Wait();
}
} // namespace dfly

View file

@ -6,6 +6,7 @@
#include <boost/fiber/operations.hpp>
#include <filesystem>
#include <xxhash.h>
#include "base/logging.h"
#include "util/uring/uring_fiber_algo.h"
@ -29,34 +30,63 @@ DEFINE_VARZ(VarzMapAverage, request_latency_usec);
std::optional<VarzFunction> engine_varz;
inline ShardId Shard(string_view sv, ShardId shard_num) {
XXH64_hash_t hash = XXH64(sv.data(), sv.size(), 24061983);
return hash % shard_num;
}
} // namespace
Service::Service(ProactorPool* pp)
: pp_(*pp) {
: shard_set_(pp), pp_(*pp) {
CHECK(pp);
engine_varz.emplace("engine", [this] { return GetVarzStats(); });
}
Service::~Service() {
engine_varz.reset();
}
void Service::Init(util::AcceptServer* acceptor) {
uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
shard_set_.Init(shard_num);
pp_.AwaitOnAll([&](uint32_t index, ProactorBase* pb) {
if (index < shard_count()) {
shard_set_.InitThreadLocal(index);
}
});
request_latency_usec.Init(&pp_);
}
void Service::Shutdown() {
engine_varz.reset();
request_latency_usec.Shutdown();
shard_set_.RunBriefInParallel([&](EngineShard*) { EngineShard::DestroyThreadLocal(); });
}
void Service::RegisterHttp(HttpListenerBase* listener) {
CHECK_NOTNULL(listener);
}
void Service::Set(std::string_view key, std::string_view val) {
ShardId sid = Shard(key, shard_count());
shard_set_.Await(sid, [&] {
EngineShard* es = EngineShard::tlocal();
auto [it, res] = es->db_slice.AddOrFind(0, key);
it->second = val;
});
}
VarzValue::Map Service::GetVarzStats() {
VarzValue::Map res;
res.emplace_back("keys", VarzValue::FromInt(0));
atomic_ulong num_keys{0};
shard_set_.RunBriefInParallel([&](EngineShard* es) {
num_keys += es->db_slice.DbSize(0);
});
res.emplace_back("keys", VarzValue::FromInt(num_keys.load()));
return res;
}

View file

@ -6,6 +6,7 @@
#include "base/varz_value.h"
#include "util/http/http_handler.h"
#include "server/engine_shard_set.h"
namespace util {
class AcceptServer;
@ -26,14 +27,24 @@ class Service {
void Shutdown();
uint32_t shard_count() const {
return shard_set_.size();
}
EngineShardSet& shard_set() {
return shard_set_;
}
util::ProactorPool& proactor_pool() {
return pp_;
}
void Set(std::string_view key, std::string_view val);
private:
base::VarzValue::Map GetVarzStats();
EngineShardSet shard_set_;
util::ProactorPool& pp_;
};

91
server/op_status.h Normal file
View file

@ -0,0 +1,91 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#pragma once
#include <ostream>
namespace dfly {
enum class OpStatus : uint16_t {
OK,
KEY_NOTFOUND,
};
class OpResultBase {
public:
OpResultBase(OpStatus st = OpStatus::OK) : st_(st) {
}
constexpr explicit operator bool() const {
return st_ == OpStatus::OK;
}
OpStatus status() const {
return st_;
}
bool operator==(OpStatus st) const {
return st_ == st;
}
bool ok() const {
return st_ == OpStatus::OK;
}
private:
OpStatus st_;
};
template <typename V> class OpResult : public OpResultBase {
public:
OpResult(V v) : v_(std::move(v)) {
}
using OpResultBase::OpResultBase;
const V& value() const {
return v_;
}
V& value() {
return v_;
}
V value_or(V v) const {
return status() == OpStatus::OK ? v_ : v;
}
const V* operator->() const {
return &v_;
}
private:
V v_;
};
template <> class OpResult<void> : public OpResultBase {
public:
using OpResultBase::OpResultBase;
};
inline bool operator==(OpStatus st, const OpResultBase& ob) {
return ob.operator==(st);
}
} // namespace dfly
namespace std {
template <typename T> std::ostream& operator<<(std::ostream& os, const dfly::OpResult<T>& res) {
os << int(res.status());
return os;
}
inline std::ostream& operator<<(std::ostream& os, const dfly::OpStatus op) {
os << int(op);
return os;
}
} // namespace std

View file

@ -46,6 +46,7 @@ inline std::string_view ToSV(const absl::Span<uint8_t>& s) {
} // namespace dfly
namespace std {
ostream& operator<<(ostream& os, const dfly::RespExpr& e);
ostream& operator<<(ostream& os, dfly::RespSpan rspan);