1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

Introduce TxQueue and IntentLock for VLL algo

This commit is contained in:
Roman Gershman 2021-12-20 22:46:51 +02:00
parent ac2eb7d45c
commit 7fe07f8e4f
19 changed files with 391 additions and 33 deletions

View file

@ -24,4 +24,5 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR})
include_directories(helio)
add_subdirectory(helio)
add_subdirectory(core)
add_subdirectory(server)

4
core/CMakeLists.txt Normal file
View file

@ -0,0 +1,4 @@
add_library(dfly_core tx_queue.cc dragonfly_core.cc)
cxx_link(dfly_core base absl::flat_hash_map)
cxx_test(dfly_core_test dfly_core)

58
core/dfly_core_test.cc Normal file
View file

@ -0,0 +1,58 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "core/tx_queue.h"
#include "base/gtest.h"
#include "core/intent_lock.h"
namespace dfly {
class TxQueueTest : public ::testing::Test {
protected:
TxQueueTest() {
}
uint64_t Pop() {
if (pq_.Empty())
return uint64_t(-1);
TxQueue::ValueType val = pq_.Front();
pq_.PopFront();
return std::get<uint64_t>(val);
}
TxQueue pq_;
};
TEST_F(TxQueueTest, Basic) {
pq_.Insert(4);
pq_.Insert(3);
pq_.Insert(2);
ASSERT_EQ(2, Pop());
ASSERT_EQ(3, Pop());
ASSERT_EQ(4, Pop());
ASSERT_TRUE(pq_.Empty());
pq_.Insert(10);
ASSERT_EQ(10, Pop());
}
class IntentLockTest : public ::testing::Test {
protected:
IntentLock lk_;
};
TEST_F(IntentLockTest, Basic) {
ASSERT_TRUE(lk_.Acquire(IntentLock::SHARED));
ASSERT_FALSE(lk_.Acquire(IntentLock::EXCLUSIVE));
lk_.Release(IntentLock::EXCLUSIVE);
ASSERT_FALSE(lk_.Check(IntentLock::EXCLUSIVE));
lk_.Release(IntentLock::SHARED);
ASSERT_TRUE(lk_.Check(IntentLock::EXCLUSIVE));
}
} // namespace dfly

View file

@ -2,9 +2,9 @@
// See LICENSE for licensing terms.
//
#include "server/resp_expr.h"
#include "base/logging.h"
#include "core/intent_lock.h"
#include "core/resp_expr.h"
namespace dfly {
@ -26,10 +26,25 @@ const char* RespExpr::TypeName(Type t) {
ABSL_INTERNAL_UNREACHABLE;
}
const char* IntentLock::ModeName(Mode m) {
switch (m) {
case IntentLock::SHARED:
return "SHARED";
case IntentLock::EXCLUSIVE:
return "EXCLUSIVE";
}
ABSL_INTERNAL_UNREACHABLE;
}
void IntentLock::VerifyDebug() {
constexpr uint32_t kMsb = 1ULL << (sizeof(cnt_[0]) * 8 - 1);
DCHECK_EQ(0u, cnt_[0] & kMsb);
DCHECK_EQ(0u, cnt_[1] & kMsb);
}
} // namespace dfly
namespace std {
ostream& operator<<(ostream& os, const dfly::RespExpr& e) {
using dfly::RespExpr;
using dfly::ToSV;
@ -39,7 +54,7 @@ ostream& operator<<(ostream& os, const dfly::RespExpr& e) {
os << "i" << get<int64_t>(e.u);
break;
case RespExpr::STRING:
os << "'" << ToSV(e.GetBuf()) << "'";
os << "'" << ToSV(get<RespExpr::Buffer>(e.u)) << "'";
break;
case RespExpr::NIL:
os << "nil";
@ -51,7 +66,7 @@ ostream& operator<<(ostream& os, const dfly::RespExpr& e) {
os << dfly::RespSpan{*get<RespExpr::Vec*>(e.u)};
break;
case RespExpr::ERROR:
os << "e(" << ToSV(e.GetBuf()) << ")";
os << "e(" << ToSV(get<RespExpr::Buffer>(e.u)) << ")";
break;
}
@ -71,4 +86,4 @@ ostream& operator<<(ostream& os, dfly::RespSpan ras) {
return os;
}
} // namespace std
} // namespace std

54
core/intent_lock.h Normal file
View file

@ -0,0 +1,54 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include <assert.h>
#pragma once
namespace dfly {
// SHARED - can be acquired multiple times as long as other intents are absent.
// EXCLUSIVE - is acquired only if it's the only lock recorded.
// BLOCKED_READY - can not be acquired - it's recorded for intent purposes.
// Transactions at the head of tx-queue are considered to be the ones that acquired the lock
class IntentLock {
public:
enum Mode { SHARED = 0, EXCLUSIVE = 1 };
// Returns true if lock was acquired. In any case, the intent is recorded.
bool Acquire(Mode m) {
++cnt_[m];
if (cnt_[1 ^ int(m)])
return false;
return m == SHARED || cnt_[EXCLUSIVE] == 1;
}
bool Check(Mode m) const {
unsigned s = cnt_[EXCLUSIVE];
if (s)
return false;
return (m == SHARED) ? true : IsFree();
}
void Release(Mode m, unsigned val = 1) {
assert(cnt_[m] >= val);
cnt_[m] -= val;
// return cnt_[m] == 0 ? cnt_[1 ^ int(m)] : 0;
}
bool IsFree() const {
return (cnt_[0] | cnt_[1]) == 0;
}
static const char* ModeName(Mode m);
void VerifyDebug();
private:
unsigned cnt_[2] = {0, 0};
};
} // namespace dfly

115
core/tx_queue.cc Normal file
View file

@ -0,0 +1,115 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "core/tx_queue.h"
#include "base/logging.h"
namespace dfly {
TxQueue::TxQueue(std::function<uint64_t(const Transaction*)> sf)
: score_fun_(sf), vec_(32) {
for (size_t i = 0; i < vec_.size(); ++i) {
vec_[i].next = i + 1;
}
}
auto TxQueue::Insert(Transaction* t) -> Iterator {
if (next_free_ >= vec_.size()) {
Grow();
}
DCHECK_LT(next_free_, vec_.size());
DCHECK_EQ(FREE_TAG, vec_[next_free_].tag);
Iterator res = next_free_;
vec_[next_free_].u.trans = t;
vec_[next_free_].tag = TRANS_TAG;
DVLOG(1) << "Insert " << next_free_ << " " << t;
LinkFree(score_fun_(t));
return res;
}
auto TxQueue::Insert(uint64_t val) -> Iterator {
if (next_free_ >= vec_.size()) {
Grow();
}
DCHECK_LT(next_free_, vec_.size());
Iterator res = next_free_;
vec_[next_free_].u.uval = val;
vec_[next_free_].tag = UINT_TAG;
LinkFree(val);
return res;
}
void TxQueue::LinkFree(uint64_t weight) {
uint32_t taken = next_free_;
next_free_ = vec_[taken].next;
if (size_ == 0) {
head_ = taken;
vec_[head_].next = vec_[head_].prev = head_;
} else {
uint32_t cur = vec_[head_].prev;
while (true) {
if (Rank(vec_[cur]) < weight) {
Link(cur, taken);
break;
}
if (cur == head_) {
Link(vec_[head_].prev, taken);
head_ = taken;
break;
}
cur = vec_[cur].prev;
}
}
++size_;
}
void TxQueue::Grow() {
size_t start = vec_.size();
DVLOG(1) << "Grow from " << start << " to " << start * 2;
vec_.resize(start * 2);
for (size_t i = start; i < vec_.size(); ++i) {
vec_[i].next = i + 1;
}
}
void TxQueue::Remove(Iterator it) {
DCHECK_GT(size_, 0u);
DCHECK_LT(it, vec_.size());
DCHECK_NE(FREE_TAG, vec_[it].tag);
DVLOG(1) << "Remove " << it << " " << vec_[it].u.trans;
Iterator next = kEnd;
if (size_ > 1) {
Iterator prev = vec_[it].prev;
next = vec_[it].next;
vec_[prev].next = next;
vec_[next].prev = prev;
}
--size_;
vec_[it].next = next_free_;
vec_[it].tag = FREE_TAG;
next_free_ = it;
if (head_ == it) {
head_ = next;
}
}
uint64_t TxQueue::Rank(const QRecord& r) const {
switch (r.tag) {
case UINT_TAG:
return r.u.uval;
case TRANS_TAG:
return score_fun_(r.u.trans);
}
return 0;
}
} // namespace dfly

111
core/tx_queue.h Normal file
View file

@ -0,0 +1,111 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <cstdint>
#include <functional>
#include <variant>
#include <vector>
namespace dfly {
class Transaction;
class TxQueue {
void Link(uint32_t p, uint32_t n) {
uint32_t next = vec_[p].next;
vec_[n].next = next;
vec_[n].prev = p;
vec_[p].next = n;
vec_[next].prev = n;
}
public:
// uint64_t is used for unit-tests.
using ValueType = std::variant<Transaction*, uint64_t>;
using Iterator = uint32_t;
enum { kEnd = Iterator(-1) };
TxQueue(std::function<uint64_t(const Transaction*)> score_fun = nullptr);
// returns iterator to that item the list
Iterator Insert(Transaction* t);
Iterator Insert(uint64_t val);
void Remove(Iterator);
ValueType At(Iterator it) const {
switch (vec_[it].tag) {
case TRANS_TAG:
return vec_[it].u.trans;
case UINT_TAG:
return vec_[it].u.uval;
}
return 0u;
}
ValueType Front() const {
return At(head_);
}
void PopFront() {
Remove(head_);
}
size_t size() const {
return size_;
}
bool Empty() const {
return size_ == 0;
}
//! returns the score of the tail record. Can be called only if !Empty().
uint64_t TailScore() const {
return Rank(vec_[vec_[head_].prev]);
}
//! returns the score of the head record. Can be called only if !Empty().
uint64_t HeadScore() const {
return Rank(vec_[head_]);
}
//! Can be called only if !Empty().
Iterator Head() const {
return head_;
}
private:
enum { TRANS_TAG = 0, UINT_TAG = 11, FREE_TAG = 12 };
void Grow();
void LinkFree(uint64_t rank);
struct QRecord {
union {
Transaction* trans;
uint64_t uval;
} u;
uint32_t tag : 8;
uint32_t next : 24;
uint32_t prev;
QRecord() : tag(FREE_TAG), prev(kEnd) {
}
};
static_assert(sizeof(QRecord) == 16, "");
uint64_t Rank(const QRecord& r) const;
std::function<uint64_t(const Transaction*)> score_fun_;
std::vector<QRecord> vec_;
uint32_t next_free_ = 0, head_ = 0;
size_t size_ = 0;
TxQueue(const TxQueue&) = delete;
};
} // namespace dfly

View file

@ -5,9 +5,9 @@ add_library(dragonfly_lib command_registry.cc common.cc config_flags.cc
conn_context.cc db_slice.cc debugcmd.cc dragonfly_listener.cc
dragonfly_connection.cc engine_shard_set.cc
main_service.cc memcache_parser.cc
redis_parser.cc resp_expr.cc reply_builder.cc string_family.cc)
redis_parser.cc reply_builder.cc string_family.cc)
cxx_link(dragonfly_lib uring_fiber_lib
cxx_link(dragonfly_lib dfly_core uring_fiber_lib
fibers_ext strings_lib http_server_lib tls_lib)
add_library(dfly_test_lib test_utils.cc)

View file

@ -12,9 +12,17 @@
namespace dfly {
enum class Protocol : uint8_t {
MEMCACHE = 1,
REDIS = 2
};
using DbIndex = uint16_t;
using ShardId = uint16_t;
using TxId = uint64_t;
using TxClock = uint64_t;
using ArgSlice = absl::Span<const std::string_view>;
using MutableStrSpan = absl::Span<char>;
using CmdArgList = absl::Span<MutableStrSpan>;
using CmdArgVec = std::vector<MutableStrSpan>;
@ -23,8 +31,15 @@ constexpr DbIndex kInvalidDbId = DbIndex(-1);
constexpr ShardId kInvalidSid = ShardId(-1);
class CommandId;
class Transaction;
class EngineShard;
struct KeyLockArgs {
DbIndex db_index;
ArgSlice args;
unsigned key_step;
};
inline std::string_view ArgS(CmdArgList args, size_t i) {
auto arg = args[i];
return std::string_view(arg.data(), arg.size());

View file

@ -4,8 +4,8 @@
#pragma once
#include "server/reply_builder.h"
#include "server/common_types.h"
#include "server/reply_builder.h"
namespace dfly {
@ -35,6 +35,7 @@ class ConnectionContext : public ReplyBuilder {
ConnectionContext(::io::Sink* stream, Connection* owner);
// TODO: to introduce proper accessors.
Transaction* transaction = nullptr;
const CommandId* cid = nullptr;
EngineShardSet* shard_set = nullptr;

View file

@ -5,7 +5,7 @@
#pragma once
#include "server/common_types.h"
#include "server/op_status.h"
#include "core/op_status.h"
#include "server/table.h"
namespace util {

View file

@ -1,16 +0,0 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <cstdint>
namespace dfly {
enum class Protocol : uint8_t {
MEMCACHE = 1,
REDIS = 2
};
} // namespace dfly

View file

@ -9,9 +9,8 @@
#include <deque>
#include "base/io_buf.h"
#include "core/resp_expr.h"
#include "server/common_types.h"
#include "server/dfly_protocol.h"
#include "server/resp_expr.h"
#include "util/connection.h"
#include "util/fibers/event_count.h"

View file

@ -5,7 +5,7 @@
#pragma once
#include "util/listener_interface.h"
#include "server/dfly_protocol.h"
#include "server/common_types.h"
typedef struct ssl_ctx_st SSL_CTX;

View file

@ -13,8 +13,9 @@
#include "base/logging.h"
#include "server/conn_context.h"
#include "server/debugcmd.h"
#include "util/metrics/metrics.h"
#include "server/error.h"
#include "server/string_family.h"
#include "util/metrics/metrics.h"
#include "util/uring/uring_fiber_algo.h"
#include "util/varz.h"

View file

@ -5,7 +5,7 @@
#include <absl/container/inlined_vector.h>
#include "resp_expr.h"
#include "core/resp_expr.h"
namespace dfly {

View file

@ -3,9 +3,9 @@
//
#include <string_view>
#include "core/op_status.h"
#include "io/sync_stream_interface.h"
#include "server/dfly_protocol.h"
#include "server/op_status.h"
#include "server/common_types.h"
namespace dfly {