1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

Adding expiry functionality.

1. Major refactoring
2. LICENSE is updated with commons clause.
3. Server is built as "dragonfly"
This commit is contained in:
Roman Gershman 2021-12-20 11:42:55 +02:00
parent a69c967321
commit ac2eb7d45c
19 changed files with 565 additions and 154 deletions

25
LICENSE
View file

@ -1,4 +1,27 @@
Apache License
Dragonfly memory store
Copyright (c) 2021 Roman Gershman
---------------------------------------------------------------------------
1. GENERAL SOFTWARE LICENSE INFORMATION AND MOTIVATION
You may use, distribute, and modify this code under the terms of the Apache License 2.0 (with the "Commons Clause" [commonsclause.com] - See 2. below). As long as you have no intention of selling the software [as is], you are free to use it under the terms of the Apache License 2.0. To anyone wishing to sell this Commons Clause licensed software, as is, an action that the Commons Clause license prohibits, it appears "proprietary", in the sense that it would be necessary to negotiate a license to do that with the owner of the Commons Clause software.
---------------------------------------------------------------------------
2. “Commons Clause” License Condition v1.0 (commonsclause.com)
The Software is provided to you by the Licensor under the Apache License, defined below, subject to the following condition.
Without limiting other conditions in the License, the grant of rights under the License will not include, and the License does not grant to you, the right to Sell the Software.
For purposes of the foregoing, “Sell” means practicing any or all of the rights granted to you under the License to provide to third parties, for a fee or other consideration (including without limitation fees for hosting or consulting/support services related to the Software), a product or service whose value derives, entirely or substantially, from the functionality of the Software. Any license notice or attribution required by the License must also include this Commons Clause License Condition notice.
Software: Dragonfly memory store
License: Apache License 2.0
Licensor: Roman Gershman
---------------------------------------------------------------------------
3. Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/

View file

@ -1,7 +1,6 @@
# midi-redis
# Dragonfly
A toy memory store that supports basic commands like `SET` and `GET` for both memcached and redis protocols.
In addition, it supports redis `PING` command.
A toy memory store that supports basic commands like `SET` and `GET` for both memcached and redis protocols. In addition, it supports redis `PING` command.
Demo features include:
1. High throughput reaching millions of QPS on a single node.
@ -13,16 +12,16 @@ I've tested the build on Ubuntu 21.04+.
```
git clone --recursive https://github.com/romange/midi-redis
cd midi-redis && ./helio/blaze.sh -release
cd build-opt && ninja midi-redis
git clone --recursive https://github.com/romange/dragonfly
cd dragonfly && ./helio/blaze.sh -release
cd build-opt && ninja dragonfly
```
## Running
```
./midi-redis --logtostderr
./dragonfly --logtostderr
```
for more options, run `./midi-redis --help`
for more options, run `./dragonfly --help`

View file

@ -1,11 +1,11 @@
add_executable(midi-redis dfly_main.cc)
cxx_link(midi-redis base dragonfly_lib)
add_executable(dragonfly dfly_main.cc)
cxx_link(dragonfly base dragonfly_lib)
add_library(dragonfly_lib command_registry.cc common_types.cc config_flags.cc
add_library(dragonfly_lib command_registry.cc common.cc config_flags.cc
conn_context.cc db_slice.cc debugcmd.cc dragonfly_listener.cc
dragonfly_connection.cc engine_shard_set.cc
main_service.cc memcache_parser.cc
redis_parser.cc resp_expr.cc reply_builder.cc)
dragonfly_connection.cc engine_shard_set.cc
main_service.cc memcache_parser.cc
redis_parser.cc resp_expr.cc reply_builder.cc string_family.cc)
cxx_link(dragonfly_lib uring_fiber_lib
fibers_ext strings_lib http_server_lib tls_lib)

View file

@ -7,6 +7,7 @@
#include <absl/strings/str_cat.h>
#include "base/logging.h"
#include "server/error.h"
namespace dfly {
@ -18,15 +19,8 @@ string WrongNumArgsError(std::string_view cmd) {
}
const char kSyntaxErr[] = "syntax error";
const char kWrongTypeErr[] = "-WRONGTYPE Operation against a key holding the wrong kind of value";
const char kKeyNotFoundErr[] = "no such key";
const char kInvalidIntErr[] = "value is not an integer or out of range";
const char kUintErr[] = "value is out of range, must be positive";
const char kInvalidFloatErr[] = "value is not a valid float";
const char kInvalidScoreErr[] = "resulting score is not a number (NaN)";
const char kDbIndOutOfRangeErr[] = "DB index is out of range";
const char kInvalidDbIndErr[] = "invalid DB index";
const char kSameObjErr[] = "source and destination objects are the same";
} // namespace dfly

View file

@ -4,10 +4,8 @@
#pragma once
#include <absl/container/flat_hash_map.h>
#include <absl/strings/ascii.h>
#include <absl/types/span.h>
#include <xxhash.h>
#include <string_view>
#include <vector>
@ -24,32 +22,7 @@ using CmdArgVec = std::vector<MutableStrSpan>;
constexpr DbIndex kInvalidDbId = DbIndex(-1);
constexpr ShardId kInvalidSid = ShardId(-1);
struct ConnectionState {
enum Mask : uint32_t {
ASYNC_DISPATCH = 1, // whether a command is handled via async dispatch.
CONN_CLOSING = 2, // could be because of unrecoverable error or planned action.
};
uint32_t mask = 0; // A bitmask of Mask values.
bool IsClosing() const {
return mask & CONN_CLOSING;
}
bool IsRunViaDispatch() const {
return mask & ASYNC_DISPATCH;
}
};
template <typename View> inline ShardId Shard(const View& v, ShardId shard_num) {
XXH64_hash_t hash = XXH64(v.data(), v.size(), 120577);
return hash % shard_num;
}
using MainValue = std::string;
using MainTable = absl::flat_hash_map<std::string, MainValue>;
using MainIterator = MainTable::iterator;
class CommandId;
class EngineShard;
inline std::string_view ArgS(CmdArgList args, size_t i) {
@ -57,21 +30,19 @@ inline std::string_view ArgS(CmdArgList args, size_t i) {
return std::string_view(arg.data(), arg.size());
}
inline MutableStrSpan ToMSS(absl::Span<uint8_t> span) {
return MutableStrSpan{reinterpret_cast<char*>(span.data()), span.size()};
}
inline void ToUpper(const MutableStrSpan* val) {
for (auto& c : *val) {
c = absl::ascii_toupper(c);
}
}
inline MutableStrSpan ToMSS(absl::Span<uint8_t> span) {
return MutableStrSpan{reinterpret_cast<char*>(span.data()), span.size()};
}
std::string WrongNumArgsError(std::string_view cmd);
} // namespace dfly
namespace std {
ostream& operator<<(ostream& os, dfly::CmdArgList args);
} // namespace std
} // namespace std

View file

@ -13,6 +13,23 @@ class Connection;
class EngineShardSet;
class CommandId;
struct ConnectionState {
enum Mask : uint32_t {
ASYNC_DISPATCH = 1, // whether a command is handled via async dispatch.
CONN_CLOSING = 2, // could be because of unrecoverable error or planned action.
};
uint32_t mask = 0; // A bitmask of Mask values.
bool IsClosing() const {
return mask & CONN_CLOSING;
}
bool IsRunViaDispatch() const {
return mask & ASYNC_DISPATCH;
}
};
class ConnectionContext : public ReplyBuilder {
public:
ConnectionContext(::io::Sink* stream, Connection* owner);

View file

@ -41,19 +41,42 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) {
}
auto DbSlice::Find(DbIndex db_index, std::string_view key) const -> OpResult<MainIterator> {
DCHECK_LT(db_index, db_arr_.size());
DCHECK(db_arr_[db_index].main_table);
auto [it, expire_it] = FindExt(db_index, key);
auto& db = db_arr_[db_index];
MainIterator it = db.main_table->find(key);
if (it == db.main_table->end()) {
if (it == MainIterator{})
return OpStatus::KEY_NOTFOUND;
}
return it;
}
pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, std::string_view key) const {
DCHECK_LT(db_ind, db_arr_.size());
DCHECK(db_arr_[db_ind].main_table);
auto& db = db_arr_[db_ind];
MainIterator it = db.main_table->find(key);
if (it == MainIterator{}) {
return make_pair(it, ExpireIterator{});
}
ExpireIterator expire_it;
if (it->second.HasExpire()) { // check expiry state
expire_it = db.expire_table->find(it->first);
CHECK(expire_it != ExpireIterator{});
if (expire_it->second <= now_ms_) {
db.expire_table->erase(expire_it);
db.stats.obj_memory_usage -= (it->first.capacity() + it->second.str.capacity());
db.main_table->erase(it);
return make_pair(MainIterator{}, ExpireIterator{});
}
}
return make_pair(it, expire_it);
}
auto DbSlice::AddOrFind(DbIndex db_index, std::string_view key) -> pair<MainIterator, bool> {
DCHECK_LT(db_index, db_arr_.size());
DCHECK(db_arr_[db_index].main_table);
@ -80,9 +103,30 @@ void DbSlice::CreateDbRedis(unsigned index) {
auto& db = db_arr_[index];
if (!db.main_table) {
db.main_table.reset(new MainTable);
db.expire_table.reset(new ExpireTable);
}
}
// Returns true if a state has changed, false otherwise.
bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) {
auto& db = db_arr_[db_ind];
if (at == 0 && it->second.HasExpire()) {
CHECK_EQ(1u, db.expire_table->erase(it->first));
it->second.SetExpire(false);
return true;
}
if (!it->second.HasExpire() && at) {
CHECK(db.expire_table->emplace(it->first, at).second);
it->second.SetExpire(true);
return true;
}
return false;
}
void DbSlice::AddNew(DbIndex db_ind, std::string_view key, MainValue obj, uint64_t expire_at_ms) {
CHECK(AddIfNotExist(db_ind, key, std::move(obj), expire_at_ms));
}
@ -95,10 +139,11 @@ bool DbSlice::AddIfNotExist(DbIndex db_ind, std::string_view key, MainValue obj,
if (!success)
return false; // in this case obj won't be moved and will be destroyed during unwinding.
db.stats.obj_memory_usage += (new_entry->first.capacity() + new_entry->second.capacity());
db.stats.obj_memory_usage += (new_entry->first.capacity() + new_entry->second.str.capacity());
if (expire_at_ms) {
// TODO
new_entry->second.SetExpire(true);
CHECK(db.expire_table->emplace(new_entry->first, expire_at_ms).second);
}
return true;

View file

@ -6,6 +6,7 @@
#include "server/common_types.h"
#include "server/op_status.h"
#include "server/table.h"
namespace util {
class ProactorBase;
@ -23,15 +24,33 @@ class DbSlice {
DbSlice(uint32_t index, EngineShard* owner);
~DbSlice();
// Activates `db_ind` database if it does not exist (see ActivateDb below).
void Reserve(DbIndex db_ind, size_t key_size);
//! UpdateExpireClock updates the expire clock for this db slice.
//! Must be a wall clock so we could replicate it betweeen machines.
void UpdateExpireClock(uint64_t now_ms) {
now_ms_ = now_ms;
}
// returns wall clock in millis as it has been set via UpdateExpireClock.
uint64_t Now() const {
return now_ms_;
}
OpResult<MainIterator> Find(DbIndex db_index, std::string_view key) const;
// Returns (value, expire) dict entries if key exists, null if it does not exist or has expired.
std::pair<MainIterator, ExpireIterator> FindExt(DbIndex db_ind, std::string_view key) const;
// Return .second=true if insertion ocurred, false if we return the existing key.
std::pair<MainIterator, bool> AddOrFind(DbIndex db_ind, std::string_view key);
// Either adds or removes (if at == 0) expiry. Returns true if a change was made.
// Does not change expiry if at != 0 and expiry already exists.
bool Expire(DbIndex db_ind, MainIterator main_it, uint64_t at);
// Adds a new entry. Requires: key does not exist in this slice.
void AddNew(DbIndex db_ind, std::string_view key, MainValue obj, uint64_t expire_at_ms);
@ -42,6 +61,8 @@ class DbSlice {
// Creates a database with index `db_ind`. If such database exists does nothing.
void ActivateDb(DbIndex db_ind);
ShardId shard_id() const { return shard_id_;}
size_t db_array_size() const {
return db_arr_.size();
}
@ -53,8 +74,6 @@ class DbSlice {
// Returns existing keys count in the db.
size_t DbSize(DbIndex db_ind) const;
ShardId shard_id() const { return shard_id_;}
private:
void CreateDbRedis(unsigned index);
@ -63,8 +82,12 @@ class DbSlice {
EngineShard* owner_;
uint64_t now_ms_ = 0; // Used for expire logic, represents a real clock.
struct DbRedis {
std::unique_ptr<MainTable> main_table;
std::unique_ptr<ExpireTable> expire_table;
mutable InternalDbStats stats;
};

View file

@ -7,22 +7,26 @@
#include "base/logging.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/string_family.h"
namespace dfly {
using namespace boost;
using namespace std;
static const char kUintErr[] = "value is out of range, must be positive";
struct PopulateBatch {
DbIndex dbid;
uint64_t index[32];
uint64_t sz = 0;
PopulateBatch(DbIndex id) : dbid(id) {
}
};
void DoPopulateBatch(std::string_view prefix, size_t val_size, const PopulateBatch& ps) {
EngineShard* es = EngineShard::tlocal();
DbSlice& db_slice = es->db_slice;
void DoPopulateBatch(std::string_view prefix, size_t val_size,
const SetCmd::SetParams& params, const PopulateBatch& ps) {
SetCmd sg(&EngineShard::tlocal()->db_slice());
for (unsigned i = 0; i < ps.sz; ++i) {
string key = absl::StrCat(prefix, ":", ps.index[i]);
@ -31,10 +35,7 @@ void DoPopulateBatch(std::string_view prefix, size_t val_size, const PopulateBat
if (val.size() < val_size) {
val.resize(val_size, 'x');
}
auto [it, res] = db_slice.AddOrFind(0, key);
if (res) {
it->second = std::move(val);
}
sg.Set(params, key, val);
}
}
@ -98,11 +99,12 @@ void DebugCmd::Populate(CmdArgList args) {
}
ranges.emplace_back(from, total_count - from);
auto distribute_cb = [this, val_size, prefix](
uint64_t from, uint64_t len) {
auto distribute_cb = [this, val_size, prefix](uint64_t from, uint64_t len) {
string key = absl::StrCat(prefix, ":");
size_t prefsize = key.size();
std::vector<PopulateBatch> ps(ess_->size(), PopulateBatch{});
DbIndex db_indx = 0; // TODO
std::vector<PopulateBatch> ps(ess_->size(), PopulateBatch{db_indx});
SetCmd::SetParams params{db_indx};
for (uint64_t i = from; i < from + len; ++i) {
absl::StrAppend(&key, i);
@ -113,7 +115,7 @@ void DebugCmd::Populate(CmdArgList args) {
pops.index[pops.sz++] = i;
if (pops.sz == 32) {
ess_->Add(sid, [=, p = pops] {
DoPopulateBatch(prefix, val_size, p);
DoPopulateBatch(prefix, val_size, params, p);
if (i % 100 == 0) {
this_fiber::yield();
}
@ -125,7 +127,9 @@ void DebugCmd::Populate(CmdArgList args) {
}
ess_->RunBriefInParallel(
[&](EngineShard* shard) { DoPopulateBatch(prefix, val_size, ps[shard->shard_id()]); });
[&](EngineShard* shard) {
DoPopulateBatch(prefix, val_size, params, ps[shard->shard_id()]);
});
};
vector<fibers::fiber> fb_arr(ranges.size());
for (size_t i = 0; i < ranges.size(); ++i) {

View file

@ -11,35 +11,46 @@
namespace dfly {
using namespace std;
using namespace boost;
using util::FiberProps;
using namespace util;
namespace fibers = ::boost::fibers;
namespace this_fiber = ::boost::this_fiber;
thread_local EngineShard* EngineShard::shard_ = nullptr;
constexpr size_t kQueueLen = 64;
EngineShard::EngineShard(ShardId index)
: db_slice(index, this), queue_(kQueueLen) {
fiber_q_ = fibers::fiber([this, index] {
EngineShard::EngineShard(util::ProactorBase* pb)
: queue_(kQueueLen), db_slice_(pb->GetIndex(), this) {
fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] {
this_fiber::properties<FiberProps>().set_name(absl::StrCat("shard_queue", index));
queue_.Run();
});
periodic_task_ = pb->AddPeriodic(1, [] {
auto* shard = EngineShard::tlocal();
DCHECK(shard);
// absl::GetCurrentTimeNanos() returns current time since the Unix Epoch.
shard->db_slice().UpdateExpireClock(absl::GetCurrentTimeNanos() / 1000000);
});
}
EngineShard::~EngineShard() {
queue_.Shutdown();
fiber_q_.join();
if (periodic_task_) {
ProactorBase::me()->CancelPeriodic(periodic_task_);
}
}
void EngineShard::InitThreadLocal(ShardId index) {
CHECK(shard_ == nullptr) << index;
shard_ = new EngineShard(index);
void EngineShard::InitThreadLocal(ProactorBase* pb) {
CHECK(shard_ == nullptr) << pb->GetIndex();
shard_ = new EngineShard(pb);
}
void EngineShard::DestroyThreadLocal() {
if (!shard_)
return;
uint32_t index = shard_->db_slice.shard_id();
uint32_t index = shard_->db_slice_.shard_id();
delete shard_;
shard_ = nullptr;
@ -52,9 +63,10 @@ void EngineShardSet::Init(uint32_t sz) {
shard_queue_.resize(sz);
}
void EngineShardSet::InitThreadLocal(ShardId index) {
EngineShard::InitThreadLocal(index);
shard_queue_[index] = EngineShard::tlocal()->GetQueue();
void EngineShardSet::InitThreadLocal(ProactorBase* pb) {
EngineShard::InitThreadLocal(pb);
EngineShard* es = EngineShard::tlocal();
shard_queue_[es->shard_id()] = es->GetQueue();
}
} // namespace dfly

View file

@ -4,6 +4,9 @@
#pragma once
#include <xxhash.h>
#include "server/db_slice.h"
#include "util/fibers/fibers_ext.h"
#include "util/fibers/fiberqueue_threadpool.h"
@ -11,16 +14,13 @@
namespace dfly {
using ShardId = uint16_t;
class EngineShard {
public:
DbSlice db_slice;
//EngineShard() is private down below.
~EngineShard();
static void InitThreadLocal(ShardId index);
static void InitThreadLocal(util::ProactorBase* pb);
static void DestroyThreadLocal();
static EngineShard* tlocal() {
@ -28,7 +28,15 @@ class EngineShard {
}
ShardId shard_id() const {
return db_slice.shard_id();
return db_slice_.shard_id();
}
DbSlice& db_slice() {
return db_slice_;
}
const DbSlice& db_slice() const {
return db_slice_;
}
::util::fibers_ext::FiberQueue* GetQueue() {
@ -36,11 +44,14 @@ class EngineShard {
}
private:
EngineShard(ShardId index);
EngineShard(util::ProactorBase* pb);
::util::fibers_ext::FiberQueue queue_;
::boost::fibers::fiber fiber_q_;
DbSlice db_slice_;
uint32_t periodic_task_ = 0;
static thread_local EngineShard* shard_;
};
@ -58,7 +69,7 @@ class EngineShardSet {
}
void Init(uint32_t size);
void InitThreadLocal(ShardId index);
void InitThreadLocal(util::ProactorBase* pb);
template <typename F> auto Await(ShardId sid, F&& f) {
return shard_queue_[sid]->Await(std::forward<F>(f));
@ -70,6 +81,7 @@ class EngineShardSet {
}
template <typename U> void RunBriefInParallel(U&& func);
template <typename U> void RunBlockingInParallel(U&& func);
private:
util::ProactorPool* pp_;
@ -95,4 +107,22 @@ template <typename U> void EngineShardSet::RunBriefInParallel(U&& func) {
bc.Wait();
}
template <typename U> void EngineShardSet::RunBlockingInParallel(U&& func) {
util::fibers_ext::BlockingCounter bc{size()};
for (uint32_t i = 0; i < size(); ++i) {
util::ProactorBase* dest = pp_->at(i);
dest->AsyncFiber([f = std::forward<U>(func), bc]() mutable {
f(EngineShard::tlocal());
bc.Dec();
});
}
bc.Wait();
}
template <typename View> inline ShardId Shard(const View& v, ShardId shard_num) {
XXH64_hash_t hash = XXH64(v.data(), v.size(), 120577240643ULL);
return hash % shard_num;
}
} // namespace dfly

27
server/error.h Normal file
View file

@ -0,0 +1,27 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <atomic>
namespace dfly {
std::string WrongNumArgsError(std::string_view cmd);
extern const char kSyntaxErr[];
extern const char kInvalidIntErr[];
extern const char kUintErr[];
#ifndef RETURN_ON_ERR
#define RETURN_ON_ERR(x) \
do { \
auto ec = (x); \
if (ec) \
return ec; \
} while (0)
#endif
} // namespace dfly

View file

@ -14,6 +14,7 @@
#include "server/conn_context.h"
#include "server/debugcmd.h"
#include "util/metrics/metrics.h"
#include "server/string_family.h"
#include "util/uring/uring_fiber_algo.h"
#include "util/varz.h"
@ -33,7 +34,6 @@ namespace {
DEFINE_VARZ(VarzMapAverage, request_latency_usec);
DEFINE_VARZ(VarzQps, ping_qps);
DEFINE_VARZ(VarzQps, set_qps);
std::optional<VarzFunction> engine_varz;
metrics::CounterFamily cmd_req("requests_total", "Number of served redis requests");
@ -55,13 +55,13 @@ void Service::Init(util::AcceptServer* acceptor) {
pp_.AwaitOnAll([&](uint32_t index, ProactorBase* pb) {
if (index < shard_count()) {
shard_set_.InitThreadLocal(index);
shard_set_.InitThreadLocal(pb);
}
});
request_latency_usec.Init(&pp_);
ping_qps.Init(&pp_);
set_qps.Init(&pp_);
StringFamily::Init(&pp_);
cmd_req.Init(&pp_, {"type"});
}
@ -71,9 +71,9 @@ void Service::Shutdown() {
engine_varz.reset();
request_latency_usec.Shutdown();
ping_qps.Shutdown();
set_qps.Shutdown();
StringFamily::Shutdown();
shard_set_.RunBriefInParallel([&](EngineShard*) { EngineShard::DestroyThreadLocal(); });
shard_set_.RunBlockingInParallel([&](EngineShard*) { EngineShard::DestroyThreadLocal(); });
}
void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
@ -166,47 +166,6 @@ void Service::Ping(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendSimpleRespString(arg);
}
void Service::Set(CmdArgList args, ConnectionContext* cntx) {
set_qps.Inc();
string_view key = ArgS(args, 1);
string_view val = ArgS(args, 2);
VLOG(2) << "Set " << key << " " << val;
ShardId sid = Shard(key, shard_count());
shard_set_.Await(sid, [&] {
EngineShard* es = EngineShard::tlocal();
auto [it, res] = es->db_slice.AddOrFind(0, key);
it->second = val;
});
cntx->SendStored();
}
void Service::Get(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
ShardId sid = Shard(key, shard_count());
OpResult<string> opres;
shard_set_.Await(sid, [&] {
EngineShard* es = EngineShard::tlocal();
OpResult<MainIterator> res = es->db_slice.Find(0, key);
if (res) {
opres.value() = res.value()->second;
} else {
opres = res.status();
}
});
if (opres) {
cntx->SendGetReply(key, 0, opres.value());
} else if (opres.status() == OpStatus::KEY_NOTFOUND) {
cntx->SendGetNotFound();
}
cntx->EndMultilineReply();
}
void Service::Debug(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[1]);
@ -219,7 +178,7 @@ VarzValue::Map Service::GetVarzStats() {
VarzValue::Map res;
atomic_ulong num_keys{0};
shard_set_.RunBriefInParallel([&](EngineShard* es) { num_keys += es->db_slice.DbSize(0); });
shard_set_.RunBriefInParallel([&](EngineShard* es) { num_keys += es->db_slice().DbSize(0); });
res.emplace_back("keys", VarzValue::FromInt(num_keys.load()));
return res;
@ -236,9 +195,8 @@ void Service::RegisterCommands() {
using CI = CommandId;
registry_ << CI{"PING", CO::STALE | CO::FAST, -1, 0, 0, 0}.HFUNC(Ping)
<< CI{"SET", CO::WRITE | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(Set)
<< CI{"GET", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Get)
<< CI{"DEBUG", CO::RANDOM | CO::READONLY, -2, 0, 0, 0}.HFUNC(Debug);
StringFamily::Register(&registry_);
}
} // namespace dfly

View file

@ -47,8 +47,6 @@ class Service {
private:
void Ping(CmdArgList args, ConnectionContext* cntx);
void Set(CmdArgList args, ConnectionContext* cntx);
void Get(CmdArgList args, ConnectionContext* cntx);
void Debug(CmdArgList args, ConnectionContext* cntx);
void RegisterCommands();

View file

@ -11,6 +11,7 @@ namespace dfly {
enum class OpStatus : uint16_t {
OK,
KEY_NOTFOUND,
SKIPPED,
};
class OpResultBase {
@ -88,4 +89,4 @@ inline std::ostream& operator<<(std::ostream& os, const dfly::OpStatus op) {
return os;
}
} // namespace std
} // namespace std

View file

@ -103,6 +103,10 @@ class ReplyBuilder {
// This one is prefixed with + and with clrf added automatically to each item..
void SendSimpleStrArr(const std::string_view* arr, uint32_t count);
void SendNull() {
as_resp()->SendNull();
}
private:
RespSerializer* as_resp() {
return static_cast<RespSerializer*>(serializer_.get());

208
server/string_family.cc Normal file
View file

@ -0,0 +1,208 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/string_family.h"
#include <absl/container/inlined_vector.h>
#include "base/logging.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "util/varz.h"
namespace dfly {
namespace {
using namespace std;
using CI = CommandId;
DEFINE_VARZ(VarzQps, set_qps);
DEFINE_VARZ(VarzQps, get_qps);
} // namespace
SetCmd::SetCmd(DbSlice* db_slice) : db_slice_(db_slice) {
}
SetCmd::~SetCmd() {
}
OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::string_view value) {
DCHECK_LT(params.db_index, db_slice_->db_array_size());
DCHECK(db_slice_->IsDbValid(params.db_index));
VLOG(2) << "Set (" << db_slice_->shard_id() << ") ";
auto [it, expire_it] = db_slice_->FindExt(params.db_index, key);
uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice_->Now() : 0;
if (it != MainIterator{}) { // existing
if (params.how == SET_IF_NOTEXIST)
return OpStatus::SKIPPED;
if (params.prev_val) {
params.prev_val->emplace(it->second.str);
}
return SetExisting(params.db_index, value, at_ms, it, expire_it);
}
if (params.how == SET_IF_EXISTS)
return OpStatus::SKIPPED;
db_slice_->AddNew(params.db_index, key, value, at_ms);
return OpStatus::OK;
}
OpResult<void> SetCmd::SetExisting(DbIndex db_ind, std::string_view value, uint64_t expire_at_ms,
MainIterator dest, ExpireIterator exp_it) {
if (exp_it != ExpireIterator{} && expire_at_ms) {
exp_it->second = expire_at_ms;
} else {
db_slice_->Expire(db_ind, dest, expire_at_ms);
}
dest->second = value;
return OpStatus::OK;
}
void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
set_qps.Inc();
std::string_view key = ArgS(args, 1);
std::string_view value = ArgS(args, 2);
VLOG(2) << "Set " << key << " " << value;
SetCmd::SetParams sparams{0}; // TODO: db_index.
int64_t int_arg;
for (size_t i = 3; i < args.size(); ++i) {
ToUpper(&args[i]);
std::string_view cur_arg = ArgS(args, i);
if (cur_arg == "EX" || cur_arg == "PX") {
bool is_ms = (cur_arg == "PX");
++i;
if (i == args.size()) {
cntx->SendError(kSyntaxErr);
}
std::string_view ex = ArgS(args, i);
if (!absl::SimpleAtoi(ex, &int_arg)) {
return cntx->SendError(kInvalidIntErr);
}
if (int_arg <= 0 || (!is_ms && int_arg >= 500000000)) {
return cntx->SendError("invalid expire time in set");
}
if (!is_ms) {
int_arg *= 1000;
}
sparams.expire_after_ms = int_arg;
} else if (cur_arg == "NX") {
sparams.how = SetCmd::SET_IF_NOTEXIST;
} else if (cur_arg == "XX") {
sparams.how = SetCmd::SET_IF_EXISTS;
} else if (cur_arg == "KEEPTTL") {
sparams.keep_expire = true;
} else {
return cntx->SendError(kSyntaxErr);
}
}
ShardId sid = Shard(key, cntx->shard_set->size());
OpResult<void> result = cntx->shard_set->Await(sid, [&] {
EngineShard* es = EngineShard::tlocal();
SetCmd cmd(&es->db_slice());
return cmd.Set(sparams, key, value);
});
if (result == OpStatus::OK) {
return cntx->SendStored();
}
CHECK_EQ(result, OpStatus::SKIPPED); // in case of NX option
return cntx->SendNull();
}
void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
get_qps.Inc();
std::string_view key = ArgS(args, 1);
ShardId sid = Shard(key, cntx->shard_set->size());
OpResult<string> result = cntx->shard_set->Await(sid, [&] {
EngineShard* es = EngineShard::tlocal();
OpResult<MainIterator> opres_it = es->db_slice().Find(0, key);
OpResult<string> res;
if (opres_it) {
res = opres_it.value()->second.str;
} else {
res = opres_it.status();
}
return res;
});
if (result) {
DVLOG(1) << "GET "
<< ": " << key << " " << result.value();
cntx->SendGetReply(key, 0, result.value());
} else {
DVLOG(1) << "GET " << key << " nil";
cntx->SendGetNotFound();
}
}
void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1);
std::string_view value = ArgS(args, 2);
std::optional<string> prev_val;
SetCmd::SetParams sparams{0};
sparams.prev_val = &prev_val;
ShardId sid = Shard(key, cntx->shard_set->size());
OpResult<void> result = cntx->shard_set->Await(sid, [&] {
EngineShard* es = EngineShard::tlocal();
SetCmd cmd(&es->db_slice());
return cmd.Set(sparams, key, value);
});
if (!result) {
cntx->SendError(result.status());
return;
}
if (prev_val) {
cntx->SendGetReply(key, 0, *prev_val);
return;
}
return cntx->SendNull();
}
void StringFamily::Init(util::ProactorPool* pp) {
set_qps.Init(pp);
get_qps.Init(pp);
}
void StringFamily::Shutdown() {
set_qps.Shutdown();
get_qps.Shutdown();
}
#define HFUNC(x) SetHandler(&StringFamily::x)
void StringFamily::Register(CommandRegistry* registry) {
*registry << CI{"SET", CO::WRITE | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(Set)
<< CI{"GET", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Get)
<< CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(GetSet);
}
} // namespace dfly

59
server/string_family.h Normal file
View file

@ -0,0 +1,59 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include "server/engine_shard_set.h"
#include "server/common_types.h"
#include "util/proactor_pool.h"
namespace dfly {
class ConnectionContext;
class CommandRegistry;
class SetCmd {
DbSlice* db_slice_;
public:
explicit SetCmd(DbSlice* db_slice);
~SetCmd();
enum SetHow { SET_ALWAYS, SET_IF_NOTEXIST, SET_IF_EXISTS };
struct SetParams {
SetHow how = SET_ALWAYS;
DbIndex db_index;
// Relative value based on now. 0 means no expiration.
uint64_t expire_after_ms = 0;
mutable std::optional<std::string>* prev_val = nullptr; // GETSET option
bool keep_expire = false; // KEEPTTL - TODO: to implement it.
explicit SetParams(DbIndex dib) : db_index(dib) {
}
};
OpResult<void> Set(const SetParams& params, std::string_view key, std::string_view value);
private:
OpResult<void> SetExisting(DbIndex db_ind, std::string_view value, uint64_t expire_at_ms,
MainIterator dest, ExpireIterator exp_it);
};
class StringFamily {
public:
static void Init(util::ProactorPool* pp);
static void Shutdown();
static void Register(CommandRegistry* registry);
private:
static void Set(CmdArgList args, ConnectionContext* cntx);
static void Get(CmdArgList args, ConnectionContext* cntx);
static void GetSet(CmdArgList args, ConnectionContext* cntx);
};
} // namespace dfly

38
server/table.h Normal file
View file

@ -0,0 +1,38 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <absl/container/flat_hash_map.h>
namespace dfly {
struct MainValue {
std::string str;
MainValue() = default;
MainValue(std::string_view s) : str(s) {
}
bool HasExpire() const {
return has_expire_;
}
void SetExpire(bool b) {
has_expire_ = b;
}
private:
bool has_expire_ = false;
};
using MainTable = absl::flat_hash_map<std::string, MainValue>;
using ExpireTable = absl::flat_hash_map<std::string, uint64_t>;
/// Iterators are invalidated when new keys are added to the table or some entries are deleted.
/// Iterators are still valid if a different entry in the table was mutated.
using MainIterator = MainTable::iterator;
using ExpireIterator = ExpireTable::iterator;
} // namespace dfly