mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
feat(cluser): support GETSLOTINFO (#1217)
feat(cluser): support GETSLOTINFO Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
7f547151bf
commit
8ae01dbea5
7 changed files with 111 additions and 2 deletions
|
@ -61,6 +61,7 @@ class ClusterConfig {
|
|||
// Returns true if `new_config` is valid and internal state was changed. Returns false and changes
|
||||
// nothing otherwise.
|
||||
bool SetConfig(const ClusterShards& new_config);
|
||||
static constexpr SlotId kMaxSlotNum = 0x3FFF;
|
||||
|
||||
private:
|
||||
struct SlotEntry {
|
||||
|
@ -71,7 +72,6 @@ class ClusterConfig {
|
|||
bool IsConfigValid(const ClusterShards& new_config);
|
||||
|
||||
static bool cluster_enabled;
|
||||
static constexpr SlotId kMaxSlotNum = 0x3FFF;
|
||||
|
||||
const std::string my_id_;
|
||||
|
||||
|
|
|
@ -55,6 +55,13 @@ void PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, EngineShard* s
|
|||
if (pv.ObjType() == OBJ_STRING)
|
||||
stats.strval_memory_usage -= value_heap_size;
|
||||
|
||||
if (ClusterConfig::IsClusterEnabled()) {
|
||||
string tmp;
|
||||
string_view key = del_it->first.GetSlice(&tmp);
|
||||
SlotId sid = ClusterConfig::KeySlot(key);
|
||||
table->slots_stats[sid].key_count -= 1;
|
||||
}
|
||||
|
||||
table->prime.Erase(del_it);
|
||||
}
|
||||
|
||||
|
@ -266,6 +273,11 @@ auto DbSlice::GetStats() const -> Stats {
|
|||
return s;
|
||||
}
|
||||
|
||||
SlotStats DbSlice::GetSlotStats(SlotId sid) const {
|
||||
CHECK(db_arr_[0]);
|
||||
return db_arr_[0]->slots_stats[sid];
|
||||
}
|
||||
|
||||
void DbSlice::Reserve(DbIndex db_ind, size_t key_size) {
|
||||
ActivateDb(db_ind);
|
||||
|
||||
|
@ -419,7 +431,10 @@ tuple<PrimeIterator, ExpireIterator, bool> DbSlice::AddOrFind2(const Context& cn
|
|||
|
||||
it.SetVersion(NextVersion());
|
||||
memory_budget_ = evp.mem_budget() + evicted_obj_bytes;
|
||||
|
||||
if (ClusterConfig::IsClusterEnabled()) {
|
||||
SlotId sid = ClusterConfig::KeySlot(key);
|
||||
db.slots_stats[sid].key_count += 1;
|
||||
}
|
||||
return make_tuple(it, ExpireIterator{}, true);
|
||||
}
|
||||
|
||||
|
|
|
@ -110,6 +110,9 @@ class DbSlice {
|
|||
// Returns statistics for the whole db slice. A bit heavy operation.
|
||||
Stats GetStats() const;
|
||||
|
||||
// Returns slot statistics for db 0.
|
||||
SlotStats GetSlotStats(SlotId sid) const;
|
||||
|
||||
void UpdateExpireBase(uint64_t now, unsigned generation) {
|
||||
expire_base_[generation & 1] = now;
|
||||
}
|
||||
|
|
|
@ -120,6 +120,10 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
|
|||
return ReplicaOffset(args, cntx);
|
||||
}
|
||||
|
||||
if (sub_cmd == "CLUSTER" && args.size() > 2) {
|
||||
return ClusterManagmentCmd(args, cntx);
|
||||
}
|
||||
|
||||
rb->SendError(kSyntaxErr);
|
||||
}
|
||||
|
||||
|
@ -366,6 +370,69 @@ void DflyCmd::ReplicaOffset(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void DflyCmd::ClusterManagmentCmd(CmdArgList args, ConnectionContext* cntx) {
|
||||
if (!ClusterConfig::IsClusterEnabled()) {
|
||||
return (*cntx)->SendError("DFLY CLUSTER commands requires --cluster_mode=yes");
|
||||
}
|
||||
// TODO check admin port
|
||||
ToUpper(&args[1]);
|
||||
string_view sub_cmd = ArgS(args, 1);
|
||||
if (sub_cmd == "GETSLOTINFO") {
|
||||
return ClusterGetSlotInfo(args, cntx);
|
||||
}
|
||||
|
||||
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "DFLY CLUSTER"), kSyntaxErrType);
|
||||
}
|
||||
|
||||
void DflyCmd::ClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx) {
|
||||
if (args.size() == 3) {
|
||||
return (*cntx)->SendError(facade::WrongNumArgsError("DFLY CLUSTER GETSLOTINFO"),
|
||||
kSyntaxErrType);
|
||||
}
|
||||
ToUpper(&args[2]);
|
||||
string_view slots_str = ArgS(args, 2);
|
||||
if (slots_str != "SLOTS") {
|
||||
return (*cntx)->SendError(kSyntaxErr, kSyntaxErrType);
|
||||
}
|
||||
|
||||
vector<std::pair<SlotId, SlotStats>> slots_stats;
|
||||
for (size_t i = 3; i < args.size(); ++i) {
|
||||
string_view slot_str = ArgS(args, i);
|
||||
uint32_t sid;
|
||||
if (!absl::SimpleAtoi(slot_str, &sid)) {
|
||||
return (*cntx)->SendError(kInvalidIntErr);
|
||||
}
|
||||
if (sid > ClusterConfig::kMaxSlotNum) {
|
||||
return (*cntx)->SendError("Invalid slot id");
|
||||
}
|
||||
slots_stats.push_back(make_pair(sid, SlotStats{}));
|
||||
}
|
||||
|
||||
Mutex mu;
|
||||
|
||||
auto cb = [&](auto*) {
|
||||
EngineShard* shard = EngineShard::tlocal();
|
||||
if (shard == nullptr)
|
||||
return;
|
||||
|
||||
lock_guard lk(mu);
|
||||
for (auto& [slot, data] : slots_stats) {
|
||||
data += shard->db_slice().GetSlotStats(slot);
|
||||
}
|
||||
};
|
||||
|
||||
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
|
||||
|
||||
(*cntx)->StartArray(slots_stats.size());
|
||||
|
||||
for (const auto& slot_data : slots_stats) {
|
||||
(*cntx)->StartArray(3);
|
||||
(*cntx)->SendBulkString(absl::StrCat(slot_data.first));
|
||||
(*cntx)->SendBulkString("key_count");
|
||||
(*cntx)->SendBulkString(absl::StrCat(slot_data.second.key_count));
|
||||
}
|
||||
}
|
||||
|
||||
OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
|
||||
DCHECK(!flow->full_sync_fb.IsJoinable());
|
||||
|
||||
|
|
|
@ -163,6 +163,12 @@ class DflyCmd {
|
|||
// Return journal records num sent for each flow of replication.
|
||||
void ReplicaOffset(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
// Runs DFLY CLUSTER sub commands
|
||||
void ClusterManagmentCmd(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
// CLUSTER GETSLOTINFO command
|
||||
void ClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
// Start full sync in thread. Start FullSyncFb. Called for each flow.
|
||||
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
|
||||
|
||||
|
|
|
@ -33,10 +33,21 @@ DbTableStats& DbTableStats::operator+=(const DbTableStats& o) {
|
|||
return *this;
|
||||
}
|
||||
|
||||
SlotStats& SlotStats::operator+=(const SlotStats& o) {
|
||||
constexpr size_t kDbSz = sizeof(SlotStats);
|
||||
static_assert(kDbSz == 8);
|
||||
|
||||
ADD(key_count);
|
||||
return *this;
|
||||
}
|
||||
|
||||
DbTable::DbTable(std::pmr::memory_resource* mr)
|
||||
: prime(kInitSegmentLog, detail::PrimeTablePolicy{}, mr),
|
||||
expire(0, detail::ExpireTablePolicy{}, mr), mcflag(0, detail::ExpireTablePolicy{}, mr),
|
||||
top_keys({.enabled = absl::GetFlag(FLAGS_enable_top_keys_tracking)}) {
|
||||
if (ClusterConfig::IsClusterEnabled()) {
|
||||
slots_stats.resize(ClusterConfig::kMaxSlotNum + 1);
|
||||
}
|
||||
}
|
||||
|
||||
DbTable::~DbTable() {
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
|
||||
#include "core/expire_period.h"
|
||||
#include "core/intent_lock.h"
|
||||
#include "server/cluster/cluster_config.h"
|
||||
#include "server/conn_context.h"
|
||||
#include "server/detail/table.h"
|
||||
#include "server/top_keys.h"
|
||||
|
@ -36,6 +37,11 @@ inline bool IsValid(ExpireIterator it) {
|
|||
return !it.is_done();
|
||||
}
|
||||
|
||||
struct SlotStats {
|
||||
uint64_t key_count = 0;
|
||||
SlotStats& operator+=(const SlotStats& o);
|
||||
};
|
||||
|
||||
struct DbTableStats {
|
||||
// Number of inline keys.
|
||||
uint64_t inline_keys = 0;
|
||||
|
@ -70,6 +76,7 @@ struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_coun
|
|||
absl::flat_hash_map<std::string, std::vector<ConnectionState::ExecInfo*>> watched_keys;
|
||||
|
||||
mutable DbTableStats stats;
|
||||
std::vector<SlotStats> slots_stats;
|
||||
ExpireTable::Cursor expire_cursor;
|
||||
PrimeTable::Cursor prime_cursor;
|
||||
|
||||
|
|
Loading…
Reference in a new issue