1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-15 17:51:06 +00:00

Code simplifications.

1. Sets do not use anymore redis encoding constants in our codebase.
2. rdb_save serializes now expiry information as well.
3. db_slice provides a simplified api to translate from absolute time to ExpirePeriod.
4. common_types.h is renamed to common.h. cached memory usage variables are moved there.
This commit is contained in:
Roman Gershman 2022-04-07 11:01:10 +03:00
parent fa70267729
commit d03cea5e36
29 changed files with 205 additions and 196 deletions

View file

@ -371,12 +371,6 @@ void RobjWrapper::MakeInnerRoom(size_t current_cap, size_t desired, pmr::memory_
inner_obj_ = newp;
}
quicklist* RobjWrapper::GetQL() const {
CHECK_EQ(type(), OBJ_LIST);
CHECK_EQ(encoding(), OBJ_ENCODING_QUICKLIST);
return (quicklist*)inner_obj_;
}
#pragma GCC push_options
#pragma GCC optimize("Ofast")
@ -580,11 +574,6 @@ unsigned CompactObj::Encoding() const {
}
}
quicklist* CompactObj::GetQL() const {
CHECK_EQ(taglen_, ROBJ_TAG);
return u_.r_obj.GetQL();
}
// Takes ownership over o.
void CompactObj::ImportRObj(robj* o) {
CHECK(1 == o->refcount || o->refcount == OBJ_STATIC_REFCOUNT);
@ -620,12 +609,7 @@ robj* CompactObj::AsRObj() const {
res->type = u_.r_obj.type();
if (res->type == OBJ_SET) {
if (kEncodingIntSet == u_.r_obj.encoding()) {
enc = OBJ_ENCODING_INTSET;
} else {
DCHECK_EQ(kEncodingStrMap, u_.r_obj.encoding());
enc = OBJ_ENCODING_HT;
}
LOG(FATAL) << "Should not call AsRObj for sets";
}
res->encoding = enc;
res->lru = 0; // u_.r_obj.unneeded;
@ -648,12 +632,13 @@ void CompactObj::SyncRObj() {
unsigned enc = obj->encoding;
if (obj->type == OBJ_SET) {
if (OBJ_ENCODING_INTSET == enc) {
LOG(FATAL) << "Should not reach";
/*if (OBJ_ENCODING_INTSET == enc) {
enc = kEncodingIntSet;
} else {
DCHECK_EQ(OBJ_ENCODING_HT, enc);
enc = kEncodingStrMap;
}
}*/
}
u_.r_obj.Init(obj->type, enc, obj->ptr);
}

View file

@ -12,7 +12,6 @@
#include "core/small_string.h"
typedef struct redisObject robj;
typedef struct quicklist quicklist;
namespace dfly {
@ -39,7 +38,6 @@ class RobjWrapper {
unsigned type() const { return type_; }
unsigned encoding() const { return encoding_; }
quicklist* GetQL() const;
void* inner_obj() const { return inner_obj_;}
std::string_view AsView() const {
@ -198,8 +196,6 @@ class CompactObj {
unsigned Encoding() const;
unsigned ObjType() const;
quicklist* GetQL() const;
void* RObjPtr() const {
return u_.r_obj.inner_obj();
}

View file

@ -40,13 +40,13 @@ inline void ExpirePeriod::Set(uint64_t ms) {
if (ms < kBarrier) {
val_ = ms;
precision_ = 0;
precision_ = 0; // ms
return;
}
precision_ = 1;
if (ms < kBarrier << 10) {
ms = (ms + 500) / 1000;
ms = (ms + 500) / 1000; // seconds
}
val_ = ms >= kBarrier ? kBarrier - 1 : ms;
}

View file

@ -12,10 +12,13 @@ Server server;
void InitRedisTables() {
crc64_init();
server.page_size = sysconf(_SC_PAGESIZE);
// been used by t_zset routines that convert listpack to skiplist for cases
// above these thresholds.
server.zset_max_listpack_entries = 128;
server.zset_max_listpack_value = 64;
// Present so that redis code compiles. However, we ignore this field and instead check against
// Present so that redis code compiles. However, we ignore this field and instead check against
// listpack total size in hset_family.cc
server.hash_max_listpack_entries = 512;
server.hash_max_listpack_value = 32; // decreased from redis default 64.

View file

@ -10,7 +10,7 @@
#include <functional>
#include "base/function2.hpp"
#include "server/common_types.h"
#include "server/common.h"
namespace dfly {

View file

@ -1,20 +1,24 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/common.h"
#include <absl/strings/str_cat.h>
#include "base/logging.h"
#include "server/common_types.h"
#include "server/error.h"
#include "server/server_state.h"
namespace dfly {
using std::string;
using namespace std;
thread_local ServerState ServerState::state_;
atomic_uint64_t used_mem_peak(0);
atomic_uint64_t used_mem_current(0);
ServerState::ServerState() {
}

View file

@ -9,6 +9,7 @@
#include <string_view>
#include <vector>
#include "facade/facade_types.h"
namespace dfly {
@ -71,4 +72,8 @@ inline void ToLower(const MutableSlice* val) {
bool ParseHumanReadableBytes(std::string_view str, int64_t* num_bytes);
// Cached values, updated frequently to represent the correct state of the system.
extern std::atomic_uint64_t used_mem_peak;
extern std::atomic_uint64_t used_mem_current;
} // namespace dfly

View file

@ -7,7 +7,7 @@
#include <absl/container/flat_hash_set.h>
#include "facade/conn_context.h"
#include "server/common_types.h"
#include "server/common.h"
#include "util/fibers/fibers_ext.h"
namespace dfly {

View file

@ -1,4 +1,4 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
@ -9,7 +9,7 @@
#include "core/intent_lock.h"
#include "facade/op_status.h"
#include "server/common_types.h"
#include "server/common.h"
#include "server/table.h"
namespace util {
@ -101,8 +101,13 @@ class DbSlice {
memory_budget_ = budget;
}
uint64_t expire_base() const {
return expire_base_[0];
// returns absolute time of the expiration.
uint64_t ExpireTime(ExpireIterator it) const {
return it.is_done() ? 0 : expire_base_[0] + it->second.duration();
}
ExpirePeriod FromAbsoluteTime(uint64_t time_ms) const {
return ExpirePeriod{time_ms - expire_base_[0]};
}
// returns wall clock in millis as it has been set via UpdateExpireClock.

View file

@ -74,7 +74,7 @@ void Renamer::Find(Transaction* t) {
res->found = IsValid(it);
if (IsValid(it)) {
res->ref_val = it->second.AsRef();
res->expire_ts = IsValid(exp_it) ? db_slice.expire_base() + exp_it->second.duration() : 0;
res->expire_ts = db_slice.ExpireTime(exp_it);
}
return OpStatus::OK;
};
@ -535,7 +535,7 @@ OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key,
if (rel_msec <= 0) {
CHECK(db_slice.Del(op_args.db_ind, it));
} else if (IsValid(expire_it)) {
expire_it->second.Set(rel_msec + now_msec - db_slice.expire_base());
expire_it->second = db_slice.FromAbsoluteTime(now_msec + rel_msec);
} else {
db_slice.Expire(op_args.db_ind, it, rel_msec + now_msec);
}
@ -545,14 +545,14 @@ OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key,
OpResult<uint64_t> GenericFamily::OpTtl(Transaction* t, EngineShard* shard, string_view key) {
auto& db_slice = shard->db_slice();
auto [it, expire] = db_slice.FindExt(t->db_index(), key);
auto [it, expire_it] = db_slice.FindExt(t->db_index(), key);
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
if (!IsValid(expire))
if (!IsValid(expire_it))
return OpStatus::SKIPPED;
int64_t ttl_ms = db_slice.expire_base() + expire->second.duration() - db_slice.Now();
int64_t ttl_ms = db_slice.ExpireTime(expire_it) - db_slice.Now();
DCHECK_GT(ttl_ms, 0); // Otherwise FindExt would return null.
return ttl_ms;
}
@ -601,8 +601,7 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key,
is_prior_list = (to_it->second.ObjType() == OBJ_LIST);
}
uint64_t exp_ts =
IsValid(from_expire) ? db_slice.expire_base() + from_expire->second.duration() : 0;
uint64_t exp_ts = db_slice.ExpireTime(from_expire);
// we keep the value we want to move.
PrimeValue from_obj = std::move(from_it->second);

View file

@ -1,11 +1,11 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include "facade/op_status.h"
#include "server/common_types.h"
#include "server/common.h"
#include "server/table.h"

View file

@ -7,7 +7,7 @@
#include <optional>
#include "facade/op_status.h"
#include "server/common_types.h"
#include "server/common.h"
namespace dfly {

View file

@ -74,7 +74,7 @@ quicklistEntry QLEntry() {
}
quicklist* GetQL(const PrimeValue& mv) {
return mv.GetQL();
return (quicklist*)mv.RObjPtr();
}
void* listPopSaver(unsigned char* data, size_t sz) {
@ -583,7 +583,7 @@ OpResult<uint32_t> ListFamily::OpRem(const OpArgs& op_args, std::string_view key
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!res)
return res.status();
quicklist* ql = res.value()->second.GetQL();
quicklist* ql = GetQL(res.value()->second);
int iter_direction = AL_START_HEAD;
long long index = 0;
@ -620,7 +620,7 @@ OpStatus ListFamily::OpSet(const OpArgs& op_args, std::string_view key, std::str
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!res)
return res.status();
quicklist* ql = res.value()->second.GetQL();
quicklist* ql = GetQL(res.value()->second);
int replaced = quicklistReplaceAtIndex(ql, index, elem.data(), elem.size());
if (!replaced) {
@ -633,7 +633,7 @@ OpStatus ListFamily::OpTrim(const OpArgs& op_args, std::string_view key, long st
auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST);
if (!res)
return res.status();
quicklist* ql = res.value()->second.GetQL();
quicklist* ql = GetQL(res.value()->second);
long llen = quicklistCount(ql);
/* convert negative indexes */
@ -673,7 +673,7 @@ OpResult<StringVec> ListFamily::OpRange(const OpArgs& op_args, std::string_view
if (!res)
return res.status();
quicklist* ql = res.value()->second.GetQL();
quicklist* ql = GetQL(res.value()->second);
long llen = quicklistCount(ql);
/* convert negative indexes */

View file

@ -5,7 +5,7 @@
#pragma once
#include "facade/op_status.h"
#include "server/common_types.h"
#include "server/common.h"
namespace dfly {

View file

@ -9,7 +9,7 @@ extern "C" {
#include "base/io_buf.h"
#include "io/io.h"
#include "server/common_types.h"
#include "server/common.h"
namespace dfly {

View file

@ -116,30 +116,30 @@ inline unsigned SerializeLen(uint64_t len, uint8_t* buf) {
return 1 + 8;
}
uint8_t RdbObjectType(const robj* o) {
switch (o->type) {
uint8_t RdbObjectType(unsigned type, unsigned encoding) {
switch (type) {
case OBJ_STRING:
return RDB_TYPE_STRING;
case OBJ_LIST:
if (o->encoding == OBJ_ENCODING_QUICKLIST)
if (encoding == OBJ_ENCODING_QUICKLIST)
return RDB_TYPE_LIST_QUICKLIST;
break;
case OBJ_SET:
if (o->encoding == OBJ_ENCODING_INTSET)
if (encoding == kEncodingIntSet)
return RDB_TYPE_SET_INTSET;
else if (o->encoding == OBJ_ENCODING_HT)
else if (encoding == kEncodingStrMap)
return RDB_TYPE_SET;
break;
case OBJ_ZSET:
if (o->encoding == OBJ_ENCODING_LISTPACK)
if (encoding == OBJ_ENCODING_LISTPACK)
return RDB_TYPE_ZSET_ZIPLIST; // we save using the old ziplist encoding.
else if (o->encoding == OBJ_ENCODING_SKIPLIST)
else if (encoding == OBJ_ENCODING_SKIPLIST)
return RDB_TYPE_ZSET_2;
break;
case OBJ_HASH:
if (o->encoding == OBJ_ENCODING_LISTPACK)
if (encoding == OBJ_ENCODING_LISTPACK)
return RDB_TYPE_HASH_ZIPLIST;
else if (o->encoding == OBJ_ENCODING_HT)
else if (encoding == OBJ_ENCODING_HT)
return RDB_TYPE_HASH;
break;
case OBJ_STREAM:
@ -147,7 +147,7 @@ uint8_t RdbObjectType(const robj* o) {
case OBJ_MODULE:
return RDB_TYPE_MODULE_2;
}
LOG(FATAL) << "Unknown encoding " << o->encoding << " for type " << o->type;
LOG(FATAL) << "Unknown encoding " << encoding << " for type " << type;
return 0; /* avoid warning */
}
@ -159,7 +159,8 @@ RdbSerializer::RdbSerializer(io::Sink* s) : sink_(s), mem_buf_{4_KB}, tmp_buf_(n
RdbSerializer::~RdbSerializer() {
}
error_code RdbSerializer::SaveKeyVal(string_view key, const robj* val, uint64_t expire_ms) {
// Called by snapshot
error_code RdbSerializer::SaveEntry(PrimeIterator it, uint64_t expire_ms) {
uint8_t buf[16];
/* Save the expire time */
@ -169,73 +170,56 @@ error_code RdbSerializer::SaveKeyVal(string_view key, const robj* val, uint64_t
RETURN_ON_ERR(WriteRaw(Bytes{buf, 9}));
}
uint8_t rdb_type = RdbObjectType(val);
RETURN_ON_ERR(WriteOpcode(rdb_type));
RETURN_ON_ERR(SaveString(key));
const PrimeKey& pk = it->first;
const PrimeValue& pv = it->second;
return SaveObject(val);
}
error_code RdbSerializer::SaveKeyVal(string_view key, string_view value, uint64_t expire_ms) {
uint8_t buf[16];
/* Save the expire time */
if (expire_ms > 0) {
buf[0] = RDB_OPCODE_EXPIRETIME_MS;
absl::little_endian::Store64(buf + 1, expire_ms);
RETURN_ON_ERR(WriteRaw(Bytes{buf, 9}));
}
string_view key = pk.GetSlice(&tmp_str_);
unsigned obj_type = pv.ObjType();
unsigned encoding = pv.Encoding();
uint8_t rdb_type = RdbObjectType(obj_type, encoding);
DVLOG(2) << "Saving keyval start " << key;
RETURN_ON_ERR(WriteOpcode(RDB_TYPE_STRING));
++type_freq_map_[rdb_type];
RETURN_ON_ERR(WriteOpcode(rdb_type));
RETURN_ON_ERR(SaveString(key));
RETURN_ON_ERR(SaveString(value));
if (obj_type == OBJ_STRING) {
auto opt_int = pv.TryGetInt();
if (opt_int) {
return SaveLongLongAsString(*opt_int);
}
return SaveString(pv.GetSlice(&tmp_str_));
}
return error_code{};
return SaveObject(pv);
}
error_code RdbSerializer::SaveObject(const robj* o) {
if (o->type == OBJ_STRING) {
/* Save a string value */
return SaveStringObject(o);
error_code RdbSerializer::SaveObject(const PrimeValue& pv) {
unsigned obj_type = pv.ObjType();
CHECK_NE(obj_type, OBJ_STRING);
if (obj_type == OBJ_LIST) {
return SaveListObject(pv.AsRObj());
}
if (o->type == OBJ_LIST) {
return SaveListObject(o);
if (obj_type == OBJ_SET) {
return SaveSetObject(pv);
}
if (o->type == OBJ_SET) {
return SaveSetObject(o);
if (obj_type == OBJ_HASH) {
return SaveHSetObject(pv.AsRObj());
}
if (o->type == OBJ_HASH) {
return SaveHSetObject(o);
if (obj_type == OBJ_ZSET) {
return SaveZSetObject(pv.AsRObj());
}
if (o->type == OBJ_ZSET) {
return SaveZSetObject(o);
}
LOG(FATAL) << "Not implemented " << o->type;
LOG(FATAL) << "Not implemented " << obj_type;
return error_code{};
}
error_code RdbSerializer::SaveStringObject(const robj* obj) {
/* Avoid to decode the object, then encode it again, if the
* object is already integer encoded. */
if (obj->encoding == OBJ_ENCODING_INT) {
return SaveLongLongAsString(long(obj->ptr));
}
CHECK(sdsEncodedObject(obj));
sds s = reinterpret_cast<sds>(obj->ptr);
return SaveString(std::string_view{s, sdslen(s)});
}
error_code RdbSerializer::SaveListObject(const robj* obj) {
/* Save a list value */
DCHECK_EQ(OBJ_ENCODING_QUICKLIST, obj->encoding);
@ -285,9 +269,9 @@ error_code RdbSerializer::SaveListObject(const robj* obj) {
return error_code{};
}
error_code RdbSerializer::SaveSetObject(const robj* obj) {
if (obj->encoding == OBJ_ENCODING_HT) {
dict* set = (dict*)obj->ptr;
error_code RdbSerializer::SaveSetObject(const PrimeValue& obj) {
if (obj.Encoding() == kEncodingStrMap) {
dict* set = (dict*)obj.RObjPtr();
RETURN_ON_ERR(SaveLen(dictSize(set)));
@ -300,12 +284,12 @@ error_code RdbSerializer::SaveSetObject(const robj* obj) {
RETURN_ON_ERR(SaveString(string_view{ele, sdslen(ele)}));
}
} else if (obj->encoding == OBJ_ENCODING_INTSET) {
size_t len = intsetBlobLen((intset*)obj->ptr);
RETURN_ON_ERR(SaveString(string_view{(char*)obj->ptr, len}));
} else {
LOG(FATAL) << "Unknown set encoding " << obj->encoding;
CHECK_EQ(obj.Encoding(), kEncodingIntSet);
intset* is = (intset*)obj.RObjPtr();
size_t len = intsetBlobLen(is);
RETURN_ON_ERR(SaveString(string_view{(char*)is, len}));
}
return error_code{};
@ -516,6 +500,7 @@ error_code RdbSerializer::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_
}
struct RdbSaver::Impl {
// used for serializing non-body components in the calling fiber.
RdbSerializer serializer;
SliceSnapshot::StringChannel channel;
vector<unique_ptr<SliceSnapshot>> shard_snapshots;
@ -547,7 +532,7 @@ std::error_code RdbSaver::SaveHeader() {
return error_code{};
}
error_code RdbSaver::SaveBody() {
error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
RETURN_ON_ERR(impl_->serializer.FlushMem());
VLOG(1) << "SaveBody";
@ -580,6 +565,14 @@ error_code RdbSaver::SaveBody() {
RETURN_ON_ERR(SaveEpilog());
if (freq_map) {
freq_map->clear();
for (auto& ptr : impl_->shard_snapshots) {
const RdbTypeFreqMap& src_map = ptr->serializer()->type_freq_map();
for (const auto& k_v : src_map)
(*freq_map)[k_v.first] += k_v.second;
}
}
return error_code{};
}
@ -604,7 +597,7 @@ error_code RdbSaver::SaveAux() {
RETURN_ON_ERR(SaveAuxFieldStrInt("ctime", time(NULL)));
// TODO: to implement used-mem caching.
RETURN_ON_ERR(SaveAuxFieldStrInt("used-mem", 666666666));
RETURN_ON_ERR(SaveAuxFieldStrInt("used-mem", used_mem_current.load(memory_order_relaxed)));
RETURN_ON_ERR(SaveAuxFieldStrInt("aof-preamble", aof_preamble));

View file

@ -3,6 +3,8 @@
//
#pragma once
#include <absl/container/flat_hash_map.h>
extern "C" {
#include "redis/lzfP.h"
#include "redis/object.h"
@ -10,65 +12,27 @@ extern "C" {
#include "base/io_buf.h"
#include "io/io.h"
#include "server/table.h"
namespace dfly {
class EngineShardSet;
class EngineShard;
class RdbSerializer {
public:
RdbSerializer(::io::Sink* s = nullptr);
~RdbSerializer();
// The ownership stays with the caller.
void set_sink(::io::Sink* s) {
sink_ = s;
}
std::error_code WriteOpcode(uint8_t opcode) {
return WriteRaw(::io::Bytes{&opcode, 1});
}
std::error_code SaveKeyVal(std::string_view key, const robj* val, uint64_t expire_ms);
std::error_code SaveKeyVal(std::string_view key, std::string_view value, uint64_t expire_ms);
std::error_code WriteRaw(const ::io::Bytes& buf);
std::error_code SaveString(std::string_view val);
std::error_code SaveString(const uint8_t* buf, size_t len) {
return SaveString(std::string_view{reinterpret_cast<const char*>(buf), len});
}
std::error_code SaveLen(size_t len);
std::error_code FlushMem();
private:
std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len);
std::error_code SaveObject(const robj* o);
std::error_code SaveStringObject(const robj* obj);
std::error_code SaveListObject(const robj* obj);
std::error_code SaveSetObject(const robj* obj);
std::error_code SaveHSetObject(const robj* obj);
std::error_code SaveZSetObject(const robj* obj);
std::error_code SaveLongLongAsString(int64_t value);
std::error_code SaveBinaryDouble(double val);
std::error_code SaveListPackAsZiplist(uint8_t* lp);
::io::Sink* sink_ = nullptr;
std::unique_ptr<LZF_HSLOT[]> lzf_;
base::IoBuf mem_buf_;
base::PODArray<uint8_t> tmp_buf_;
};
// keys are RDB_TYPE_xxx constants.
using RdbTypeFreqMap = absl::flat_hash_map<unsigned, size_t>;
class RdbSaver {
public:
RdbSaver(EngineShardSet* ess, ::io::Sink* sink);
~RdbSaver();
std::error_code SaveHeader();
std::error_code SaveBody();
// Writes the RDB file into sink. Waits for the serialization to finish.
// Fills freq_map with the histogram of rdb types.
// freq_map can optionally be null.
std::error_code SaveBody(RdbTypeFreqMap* freq_map);
// Initiates the serialization in the shard's thread.
void StartSnapshotInShard(EngineShard* shard);
private:
@ -85,4 +49,57 @@ class RdbSaver {
std::unique_ptr<Impl> impl_;
};
class RdbSerializer {
public:
RdbSerializer(::io::Sink* s = nullptr);
~RdbSerializer();
// The ownership stays with the caller.
void set_sink(::io::Sink* s) {
sink_ = s;
}
std::error_code WriteOpcode(uint8_t opcode) {
return WriteRaw(::io::Bytes{&opcode, 1});
}
// Must be called in the thread to which `it` belongs.
std::error_code SaveEntry(PrimeIterator it, uint64_t expire_ms);
std::error_code WriteRaw(const ::io::Bytes& buf);
std::error_code SaveString(std::string_view val);
std::error_code SaveString(const uint8_t* buf, size_t len) {
return SaveString(std::string_view{reinterpret_cast<const char*>(buf), len});
}
std::error_code SaveLen(size_t len);
std::error_code FlushMem();
const RdbTypeFreqMap& type_freq_map() const {
return type_freq_map_;
}
private:
std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len);
std::error_code SaveObject(const PrimeValue& pv);
std::error_code SaveListObject(const robj* obj);
std::error_code SaveSetObject(const PrimeValue& pv);
std::error_code SaveHSetObject(const robj* obj);
std::error_code SaveZSetObject(const robj* obj);
std::error_code SaveLongLongAsString(int64_t value);
std::error_code SaveBinaryDouble(double val);
std::error_code SaveListPackAsZiplist(uint8_t* lp);
::io::Sink* sink_ = nullptr;
std::unique_ptr<LZF_HSLOT[]> lzf_;
base::IoBuf mem_buf_;
base::PODArray<uint8_t> tmp_buf_;
std::string tmp_str_;
RdbTypeFreqMap type_freq_map_;
};
} // namespace dfly

View file

@ -101,7 +101,11 @@ TEST_F(RdbTest, Save) {
Run({"rpush", "list_key2", "head", string(511, 'a'), string(500, 'b'), "tail"});
Run({"zadd", "zs1", "1.1", "a", "-1.1", "b"});
Run({"zadd", "zs2", "1.1", string(510, 'a'), "-1.1", string(502, 'b')});
Run({"save"});
}
TEST_F(RdbTest, Load) {
}
} // namespace dfly

View file

@ -75,9 +75,6 @@ error_code CreateDirs(fs::path dir_path) {
} // namespace
atomic_uint64_t used_mem_peak(0);
atomic_uint64_t used_mem_current(0);
ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service->shard_set()) {
start_time_ = time(NULL);
last_save_.store(start_time_, memory_order_release);
@ -327,12 +324,14 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
cntx->transaction->ScheduleSingleHop(std::move(cb));
// perform snapshot serialization, block the current fiber until it completes.
ec = saver.SaveBody();
}
RdbTypeFreqMap freq_map;
ec = saver.SaveBody(&freq_map);
if (ec) {
(*cntx)->SendError(res.error().message());
return;
// TODO: needs protection from reads.
last_save_freq_map_.clear();
for (const auto& k_v : freq_map) {
last_save_freq_map_.push_back(k_v);
}
}
pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::IDLE); });

View file

@ -1,4 +1,4 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
@ -95,6 +95,8 @@ class ServerFamily {
GlobalState global_state_;
time_t start_time_ = 0; // in seconds, epoch time.
// RDB_TYPE_xxx -> count mapping.
std::vector<std::pair<unsigned, size_t>> last_save_freq_map_;
};
} // namespace dfly

View file

@ -1,4 +1,4 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
@ -8,7 +8,7 @@
#include <vector>
#include "core/interpreter.h"
#include "server/common_types.h"
#include "server/common.h"
#include "server/global_state.h"
#include "util/sliding_counter.h"

View file

@ -1046,8 +1046,9 @@ OpResult<StringVec> SetFamily::OpInter(const Transaction* t, EngineShard* es, bo
OpResult<PrimeIterator> find_res = es->db_slice().Find(t->db_index(), keys[i], OBJ_SET);
if (!find_res)
return find_res.status();
robj* sobj = find_res.value()->second.AsRObj();
sets[i] = make_pair(sobj->ptr, unsigned(sobj->encoding));
const PrimeValue& pv = find_res.value()->second;
void* ptr = pv.RObjPtr();
sets[i] = make_pair(ptr, pv.Encoding());
}
auto comp = [](const SetType& left, const SetType& right) {

View file

@ -5,7 +5,7 @@
#pragma once
#include "facade/op_status.h"
#include "server/common_types.h"
#include "server/common.h"
namespace dfly {

View file

@ -25,7 +25,7 @@ namespace this_fiber = ::boost::this_fiber;
using boost::fibers::fiber;
SliceSnapshot::SliceSnapshot(PrimeTable* prime, ExpireTable* et, StringChannel* dest)
: prime_table_(prime), dest_(dest) {
: prime_table_(prime), expire_tbl_(et), dest_(dest) {
}
SliceSnapshot::~SliceSnapshot() {
@ -71,19 +71,13 @@ void SliceSnapshot::Join() {
static_assert(sizeof(PrimeTable::const_iterator) == 16);
void SliceSnapshot::SerializeSingleEntry(PrimeIterator it) {
error_code ec;
string tmp;
auto key = it->first.GetSlice(&tmp);
// TODO: fetch expire.
if (it->second.ObjType() == OBJ_STRING) {
ec = rdb_serializer_->SaveKeyVal(key, it->second.ToString(), 0);
} else {
robj* obj = it->second.AsRObj();
ec = rdb_serializer_->SaveKeyVal(key, obj, 0);
uint64_t expire_time = 0;
if (it->second.HasExpire()) {
auto eit = expire_tbl_->Find(it->first);
expire_time = db_slice_->ExpireTime(eit);
}
error_code ec = rdb_serializer_->SaveEntry(it, expire_time);
CHECK(!ec); // we write to StringFile.
++serialized_;
}

View file

@ -30,6 +30,7 @@ class SliceSnapshot {
return snapshot_version_;
}
RdbSerializer* serializer() { return rdb_serializer_.get(); }
private:
void FiberFunc();
bool FlushSfile(bool force);
@ -50,6 +51,7 @@ class SliceSnapshot {
// version upper bound for entries that should be saved (not included).
uint64_t snapshot_version_ = 0;
PrimeTable* prime_table_;
ExpireTable* expire_tbl_;
DbSlice* db_slice_ = nullptr;
StringChannel* dest_;

View file

@ -63,7 +63,7 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
}
if (IsValid(expire_it) && at_ms) {
expire_it->second.Set(at_ms - db_slice_->expire_base());
expire_it->second = db_slice_->FromAbsoluteTime(at_ms);
} else {
bool changed = db_slice_->Expire(params.db_index, it, at_ms);
if (changed && at_ms == 0) // erased.

View file

@ -4,7 +4,7 @@
#pragma once
#include "server/common_types.h"
#include "server/common.h"
#include "server/engine_shard_set.h"
#include "util/proactor_pool.h"

View file

@ -15,7 +15,7 @@
#include "core/intent_lock.h"
#include "core/tx_queue.h"
#include "facade/op_status.h"
#include "server/common_types.h"
#include "server/common.h"
#include "server/table.h"
#include "util/fibers/fibers_ext.h"

View file

@ -7,7 +7,7 @@
#include <variant>
#include "facade/op_status.h"
#include "server/common_types.h"
#include "server/common.h"
namespace dfly {