From aa2136a406610b1b175cc0acb908cc94c9ed93c4 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 13 Jan 2022 10:49:25 +0200 Subject: [PATCH] Add CompactObject that will represent all the possible dragonfly types Specifically, it act as union for int,string, set, zset and other types that exist in Redis API. In order to be more memory friendly, CompactObject incorporates inline storage that will be used for SSO. --- core/CMakeLists.txt | 7 +- core/compact_object.cc | 447 ++++++++++++++++++++++++++++++++++++ core/compact_object.h | 242 +++++++++++++++++++ core/compact_object_test.cc | 68 ++++++ 4 files changed, 761 insertions(+), 3 deletions(-) create mode 100644 core/compact_object.cc create mode 100644 core/compact_object.h create mode 100644 core/compact_object_test.cc diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 3891ec4ea..6503c4175 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -1,4 +1,5 @@ -add_library(dfly_core tx_queue.cc dragonfly_core.cc) -cxx_link(dfly_core base absl::flat_hash_map) +add_library(dfly_core compact_object.cc dragonfly_core.cc tx_queue.cc) +cxx_link(dfly_core base absl::flat_hash_map redis_lib) -cxx_test(dfly_core_test dfly_core) \ No newline at end of file +cxx_test(dfly_core_test dfly_core) +cxx_test(compact_object_test dfly_core) \ No newline at end of file diff --git a/core/compact_object.cc b/core/compact_object.cc new file mode 100644 index 000000000..5532d5303 --- /dev/null +++ b/core/compact_object.cc @@ -0,0 +1,447 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "core/compact_object.h" + +// #define XXH_INLINE_ALL +// #include + +extern "C" { +#include "redis/object.h" +#include "redis/util.h" +#include "redis/zmalloc.h" // for non-string objects. +} + +#include + +#include "base/logging.h" + +namespace dfly { +using namespace std; + +namespace { + +size_t QlUsedSize(quicklist* ql) { + size_t res = ql->len * sizeof(quicklistNode) + znallocx(sizeof(quicklist)); + quicklistNode* ptr = ql->head; + while (ptr) { + res += ptr->sz; + ptr = ptr->next; + } + return res; +} + +thread_local robj tmp_robj{ + .type = 0, .encoding = 0, .lru = 0, .refcount = OBJ_STATIC_REFCOUNT, .ptr = nullptr}; +} // namespace + +static_assert(sizeof(CompactObj) == 18); + +namespace detail { + +CompactBlob::CompactBlob(std::string_view s, pmr::memory_resource* mr) + : ptr_(nullptr), sz(s.size()) { + if (sz) { + ptr_ = mr->allocate(sz); + memcpy(ptr_, s.data(), s.size()); + } +} + +void CompactBlob::Assign(std::string_view s, std::pmr::memory_resource* mr) { + if (s.size() > sz) { + size_t cur_cap = capacity(); + if (s.size() > cur_cap) + MakeRoom(cur_cap, s.size(), mr); + } + memcpy(ptr_, s.data(), s.size()); + sz = s.size(); +} + +void CompactBlob::Free(pmr::memory_resource* mr) { + mr->deallocate(ptr_, 0); // we do not keep the allocated size. + sz = 0; + ptr_ = nullptr; +} + +void CompactBlob::MakeRoom(size_t current_cap, size_t desired, std::pmr::memory_resource* mr) { + if (current_cap * 2 > desired) { + if (desired < SDS_MAX_PREALLOC) + desired *= 2; + else + desired += SDS_MAX_PREALLOC; + } + void* newp = mr->allocate(desired); + if (sz) { + memcpy(newp, ptr_, sz); + } + if (current_cap) { + mr->deallocate(ptr_, current_cap); + } + ptr_ = newp; +} + +// here we break pmr model since we use non-pmr api of fetching usable size based on pointer. +size_t CompactBlob::capacity() const { + return malloc_usable_size(ptr_); +} + +size_t RobjWrapper::MallocUsed() const { + void* ptr = blob.ptr(); + if (!ptr) + return 0; + + switch (type) { + case OBJ_STRING: + DVLOG(2) << "Freeing string object"; + CHECK_EQ(OBJ_ENCODING_RAW, encoding); + return blob.capacity(); + break; + case OBJ_LIST: + CHECK_EQ(encoding, OBJ_ENCODING_QUICKLIST); + return QlUsedSize((quicklist*)ptr); + default: + LOG(FATAL) << "Not supported " << type; + } + + return 0; +} + +size_t RobjWrapper::Size() const { + switch (type) { + case OBJ_STRING: + DVLOG(2) << "Freeing string object"; + DCHECK_EQ(OBJ_ENCODING_RAW, encoding); + return blob.size(); + break; + default:; + } + return 0; +} + +void RobjWrapper::Free(std::pmr::memory_resource* mr) { + void* ptr = blob.ptr(); + if (!ptr) + return; + + switch (type) { + case OBJ_STRING: + DVLOG(2) << "Freeing string object"; + if (encoding == OBJ_ENCODING_RAW) { + blob.Free(mr); + } else { + CHECK_EQ(OBJ_ENCODING_INT, encoding); + } + break; + case OBJ_LIST: + CHECK_EQ(encoding, OBJ_ENCODING_QUICKLIST); + quicklistRelease((quicklist*)ptr); + break; + + case OBJ_SET: + LOG(FATAL) << "TBD"; + break; + case OBJ_ZSET: + LOG(FATAL) << "TBD"; + break; + case OBJ_HASH: + LOG(FATAL) << "Unsupported HASH type"; + break; + case OBJ_MODULE: + LOG(FATAL) << "Unsupported OBJ_MODULE type"; + break; + case OBJ_STREAM: + LOG(FATAL) << "Unsupported OBJ_STREAM type"; + break; + default: + LOG(FATAL) << "Unknown object type"; + break; + } + blob.Set(nullptr, 0); +} + +bool RobjWrapper::Equal(const RobjWrapper& ow) const { + if (ow.type != type || ow.encoding != encoding) + return false; + if (type == OBJ_STRING) { + DCHECK_EQ(OBJ_ENCODING_RAW, encoding); + return blob.AsView() == ow.blob.AsView(); + } + LOG(FATAL) << "Unsupported type " << type; + return false; +} + +bool RobjWrapper::Equal(std::string_view sv) const { + if (type != OBJ_STRING) + return false; + + DCHECK_EQ(OBJ_ENCODING_RAW, encoding); + return blob.AsView() == sv; +} + +} // namespace detail + +using namespace std; + +CompactObj::~CompactObj() { + if (HasAllocated()) { + Free(); + } +} + +CompactObj& CompactObj::operator=(CompactObj&& o) noexcept { + SetMeta(o.taglen_, o.mask_); // Frees underlying resources if needed. + memcpy(&u_, &o.u_, sizeof(u_)); + + // SetMeta deallocates the object and we only want reset it. + o.taglen_ = 0; + o.mask_ = 0; + + return *this; +} + +size_t CompactObj::StrSize() const { + if (IsInline()) { + return taglen_; + } + + if (taglen_ == ROBJ_TAG) { + return u_.r_obj.Size(); + } + + LOG(DFATAL) << "Should not reach " << int(taglen_); + return 0; +} + +unsigned CompactObj::ObjType() const { + if (IsInline() || taglen_ == INT_TAG) + return OBJ_STRING; + + if (taglen_ == ROBJ_TAG) + return u_.r_obj.type; + + LOG(FATAL) << "TBD " << taglen_; + return 0; +} + +unsigned CompactObj::Encoding() const { + switch (taglen_) { + case ROBJ_TAG: + return u_.r_obj.encoding; + case INT_TAG: + return OBJ_ENCODING_INT; + default: + return OBJ_ENCODING_RAW; + } +} + +quicklist* CompactObj::GetQL() const { + CHECK_EQ(taglen_, ROBJ_TAG); + CHECK_EQ(u_.r_obj.type, OBJ_LIST); + CHECK_EQ(u_.r_obj.encoding, OBJ_ENCODING_QUICKLIST); + + return (quicklist*)u_.r_obj.blob.ptr(); +} + +// Takes ownership over o. +void CompactObj::ImportRObj(robj* o) { + CHECK(1 == o->refcount || o->refcount == OBJ_STATIC_REFCOUNT); + CHECK_NE(o->encoding, OBJ_ENCODING_EMBSTR); // need regular one + + SetMeta(ROBJ_TAG); + + u_.r_obj.type = o->type; + u_.r_obj.encoding = o->encoding; + u_.r_obj.lru_unneeded = o->lru; + + if (o->type == OBJ_STRING) { + std::string_view src((char*)o->ptr, sdslen((sds)o->ptr)); + u_.r_obj.blob.Assign(src, pmr::get_default_resource()); + decrRefCount(o); + } else { // Non-string objects we move as is and release Robj wrapper. + u_.r_obj.blob.Set(o->ptr, 0); + if (o->refcount == 1) + zfree(o); + } +} + +robj* CompactObj::AsRObj() const { + CHECK_EQ(ROBJ_TAG, taglen_); + + tmp_robj.encoding = u_.r_obj.encoding; + tmp_robj.type = u_.r_obj.type; + tmp_robj.lru = u_.r_obj.lru_unneeded; + tmp_robj.ptr = u_.r_obj.blob.ptr(); + + return &tmp_robj; +} + +void CompactObj::SyncRObj() { + CHECK_EQ(ROBJ_TAG, taglen_); + CHECK_EQ(u_.r_obj.type, tmp_robj.type); + + u_.r_obj.encoding = tmp_robj.encoding; + u_.r_obj.blob.Set(tmp_robj.ptr, 0); +} + +void CompactObj::SetInt(int64_t val) { + if (INT_TAG != taglen_) { + SetMeta(INT_TAG); + } + + u_.ival = val; +} + +std::optional CompactObj::TryGetInt() const { + if (taglen_ != INT_TAG) + return std::nullopt; + int64_t val = u_.ival; + return val; +} + +void CompactObj::SetString(std::string_view str) { + // Trying auto-detection heuristics first. + if (str.size() <= 20) { // TODO: to move OBJ_ENCODING_INT out of ROBJ logic. + long long ival; + static_assert(sizeof(long long) == 8); + + // We use redis string2ll to be compatible with Redis. + if (string2ll(str.data(), str.size(), &ival)) { + SetMeta(INT_TAG); + u_.ival = ival; + + return; + } + + if (str.size() <= kInlineLen) { + SetMeta(str.size()); + + memcpy(u_.inline_str, str.data(), str.size()); + return; + } + } + + std::string_view input = str; + + if (str.size() <= kInlineLen) { + SetMeta(str.size(), 0); + return; + } + + if (taglen_ != ROBJ_TAG || u_.r_obj.type != OBJ_STRING) { + SetMeta(ROBJ_TAG); + u_.r_obj.type = OBJ_STRING; + u_.r_obj.encoding = OBJ_ENCODING_RAW; + } + + DCHECK(taglen_ == ROBJ_TAG && u_.r_obj.type == OBJ_STRING); + CHECK_EQ(OBJ_ENCODING_RAW, u_.r_obj.encoding); + u_.r_obj.blob.Assign(input, pmr::get_default_resource()); +} + +std::string_view CompactObj::GetSlice(std::string* scratch) const { + if (IsInline()) { + return std::string_view{u_.inline_str, taglen_}; + } + + if (taglen_ == ROBJ_TAG) { + CHECK_EQ(OBJ_STRING, u_.r_obj.type); + DCHECK_EQ(OBJ_ENCODING_RAW, u_.r_obj.encoding); + return u_.r_obj.blob.AsView(); + } + + if (taglen_ == INT_TAG) { + absl::AlphaNum an(u_.ival); + scratch->assign(an.Piece()); + + return *scratch; + } + + LOG(FATAL) << "Bad tag " << int(taglen_); + + return std::string_view{}; +} + +bool CompactObj::HasAllocated() const { + if (IsRef() || taglen_ == INT_TAG || IsInline() || + (taglen_ == ROBJ_TAG && u_.r_obj.blob.ptr() == nullptr)) + return false; + + DCHECK(taglen_ == ROBJ_TAG); + return true; +} + +void CompactObj::GetString(string* res) const { + std::string_view slice = GetSlice(res); + if (res->data() != slice.data()) { + res->assign(slice); + } +} + +void CompactObj::Reset() { + if (HasAllocated()) { + Free(); + } + taglen_ = 0; + mask_ = 0; +} + +// Frees all resources if owns. +void CompactObj::Free() { + DCHECK(HasAllocated()); + + if (taglen_ == ROBJ_TAG) { + u_.r_obj.Free(pmr::get_default_resource()); + } else { + LOG(FATAL) << "Bad compact object type " << int(taglen_); + } + + memset(u_.inline_str, 0, kInlineLen); +} + +size_t CompactObj::MallocUsed() const { + if (!HasAllocated()) + return 0; + + if (taglen_ == ROBJ_TAG) { + return u_.r_obj.MallocUsed(); + } + + LOG(FATAL) << "TBD"; + return 0; +} + +bool CompactObj::operator==(const CompactObj& o) const { + if (taglen_ == ROBJ_TAG || o.taglen_ == ROBJ_TAG) { + if (o.taglen_ != taglen_) + return false; + return u_.r_obj.Equal(o.u_.r_obj); + } + + if (taglen_ != o.taglen_) + return false; + if (taglen_ == INT_TAG) + return u_.ival == o.u_.ival; + DCHECK(IsInline() && o.IsInline()); + + if (memcmp(u_.inline_str, o.u_.inline_str, taglen_) != 0) + return false; + + return true; +} + +bool CompactObj::EqualNonInline(std::string_view sv) const { + switch (taglen_) { + case INT_TAG: { + absl::AlphaNum an(u_.ival); + return sv == an.Piece(); + } + case ROBJ_TAG: + return u_.r_obj.Equal(sv); + default: + break; + } + return false; +} + +} // namespace dfly diff --git a/core/compact_object.h b/core/compact_object.h new file mode 100644 index 000000000..224d1f1af --- /dev/null +++ b/core/compact_object.h @@ -0,0 +1,242 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +#include +#include + +typedef struct redisObject robj; +typedef struct quicklist quicklist; + +namespace dfly { + +namespace detail { + +class CompactBlob { + void* ptr_; + uint32_t sz; + + public: + CompactBlob() : ptr_(nullptr), sz(0) { + } + + explicit CompactBlob(std::string_view s, std::pmr::memory_resource* mr); + + void Assign(std::string_view s, std::pmr::memory_resource* mr); + + void Set(void* p, uint32_t s) { + ptr_ = p; + sz = s; + } + + void Free(std::pmr::memory_resource* mr); + + size_t size() const { + return sz; + } + + size_t capacity() const; + + void* ptr() const { + return ptr_; + } + + std::string_view AsView() const { + return std::string_view{reinterpret_cast(ptr_), sz}; + } + + void MakeRoom(size_t current_cap, size_t desired, std::pmr::memory_resource* mr); +} __attribute__((packed)); + +static_assert(sizeof(CompactBlob) == 12, ""); + +// Objects/blobs of upto 4GB size. +struct RobjWrapper { + size_t MallocUsed() const; + + bool Equal(const RobjWrapper& ow) const; + bool Equal(std::string_view sv) const; + size_t Size() const; + void Free(std::pmr::memory_resource* mr); + + CompactBlob blob; + static_assert(sizeof(blob) == 12); + + uint32_t type : 4; + uint32_t encoding : 4; + uint32_t lru_unneeded : 24; + RobjWrapper() { + } +} __attribute__((packed)); + +} // namespace detail + +class CompactObj { + static constexpr unsigned kInlineLen = 16; + + void operator=(const CompactObj&) = delete; + CompactObj(const CompactObj&) = delete; + + // 0-16 is reserved for inline lengths of string type. + enum TagEnum { + INT_TAG = 17, + SMALL_TAG = 18, // TBD + ROBJ_TAG = 19, + }; + + enum MaskBit { + REF_BIT = 1, + EXPIRE_BIT = 2, + }; + + public: + using PrefixArray = std::vector; + + CompactObj() { // By default - empty string. + } + + explicit CompactObj(robj* o) { + ImportRObj(o); + } + + explicit CompactObj(std::string_view str) { + SetString(str); + } + + CompactObj(CompactObj&& cs) noexcept { + operator=(std::move(cs)); + }; + + ~CompactObj(); + + CompactObj& operator=(CompactObj&& o) noexcept; + + size_t StrSize() const; + + // TODO: We don't use c++ constructs (ctor, dtor, =) in objects of U, + // because we use memcpy here. + CompactObj AsRef() const { + CompactObj res; + memcpy(&res.u_, &u_, sizeof(u_)); + res.taglen_ = taglen_; + res.mask_ = mask_ | REF_BIT; + + return res; + } + + std::string_view GetSlice(std::string* scratch) const; + + std::string ToString() const { + std::string res; + GetString(&res); + return res; + } + + bool operator==(const CompactObj& o) const; + + bool operator==(std::string_view sl) const; + + friend bool operator!=(const CompactObj& lhs, const CompactObj& rhs) { + return !(lhs == rhs); + } + + friend bool operator==(std::string_view sl, const CompactObj& o) { + return o.operator==(sl); + } + + bool HasExpire() const { + return mask_ & EXPIRE_BIT; + } + + void SetExpire(bool e) { + if (e) { + mask_ |= EXPIRE_BIT; + } else { + mask_ &= ~EXPIRE_BIT; + } + } + + unsigned Encoding() const; + unsigned ObjType() const; + quicklist* GetQL() const; + + // Takes ownership over o. + void ImportRObj(robj* o); + + robj* AsRObj() const; + + // Syncs 'this' instance with the object that was previously returned by AsRObj(). + // Requires: AsRObj() has been called before in the same thread in fiber-atomic section. + void SyncRObj(); + + void SetInt(int64_t val); + std::optional TryGetInt() const; + + void SetString(std::string_view str); + + void GetString(std::string* res) const; + + size_t MallocUsed() const; + + // Resets the object to empty state. + void Reset(); + + bool IsInline() const { + return taglen_ <= kInlineLen; + } + + static constexpr unsigned InlineLen() { + return kInlineLen; + } + + private: + bool EqualNonInline(std::string_view sv) const; + + // Requires: HasAllocated() - true. + void Free(); + + bool HasAllocated() const; + + bool IsRef() const { + return mask_ & REF_BIT; + } + + void SetMeta(uint8_t taglen, uint8_t mask = 0) { + if (HasAllocated()) { + Free(); + } else { + memset(u_.inline_str, 0, kInlineLen); + } + taglen_ = taglen; + mask_ = mask; + } + + // My main data structure. Union of representations. + union U { + char inline_str[kInlineLen]; + + detail::RobjWrapper r_obj; + int64_t ival __attribute__((packed)); + + U() : r_obj() { + } + } u_; + + static_assert(sizeof(u_) == 16, ""); + + mutable uint8_t mask_ = 0; + uint8_t taglen_ = 0; +}; + +inline bool CompactObj::operator==(std::string_view sv) const { + if (IsInline()) { + return std::string_view{u_.inline_str, taglen_} == sv; + } + return EqualNonInline(sv); +} + +} // namespace dfly diff --git a/core/compact_object_test.cc b/core/compact_object_test.cc new file mode 100644 index 000000000..a81664755 --- /dev/null +++ b/core/compact_object_test.cc @@ -0,0 +1,68 @@ +// Copyright 2021, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "core/compact_object.h" +#include "base/gtest.h" + +extern "C" { +#include "redis/object.h" +} + +namespace dfly { +using namespace std; + +void PrintTo(const CompactObj& cobj, std::ostream* os) { + if (cobj.ObjType() == OBJ_STRING) { + *os << "'" << cobj.ToString() << "' "; + return; + } + *os << "cobj: [" << cobj.ObjType() << "]"; +} + +class CompactObjectTest : public ::testing::Test { + protected: + static void SetUpTestCase() { + } + + CompactObj cs_; + string tmp_; +}; + +TEST_F(CompactObjectTest, Basic) { + robj* rv = createRawStringObject("foo", 3); + cs_.ImportRObj(rv); + + CompactObj a; + + a.SetString("val"); + string res; + a.GetString(&res); + EXPECT_EQ("val", res); + + CompactObj b("vala"); + EXPECT_NE(a, b); + + CompactObj c = a.AsRef(); + EXPECT_EQ(a, c); +} + +TEST_F(CompactObjectTest, Int) { + cs_.SetString("0"); + EXPECT_EQ(0, cs_.TryGetInt()); + EXPECT_EQ(cs_, "0"); + EXPECT_EQ("0", cs_.GetSlice(&tmp_)); + EXPECT_EQ(OBJ_STRING, cs_.ObjType()); + cs_.SetString("42"); + EXPECT_EQ(OBJ_ENCODING_INT, cs_.Encoding()); +} + +TEST_F(CompactObjectTest, MediumString) { + CompactObj obj; + string tmp(512, 'b'); + obj.SetString(tmp); + obj.SetString(tmp); + obj.Reset(); +} + +} // namespace dfly