mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
feat: Memory stats (#2162)
This commit is contained in:
parent
4b685aa809
commit
5ca2be1185
9 changed files with 226 additions and 35 deletions
2
helio
2
helio
|
@ -1 +1 @@
|
||||||
Subproject commit 7d206c88fcbed68ce899971cba802ba17b49ceae
|
Subproject commit 88514b9270edc194ad9ea018e614e2d1b17b3962
|
75
src/core/size_tracking_channel.h
Normal file
75
src/core/size_tracking_channel.h
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
// Copyright 2023, DragonflyDB authors. All rights reserved.
|
||||||
|
// See LICENSE for licensing terms.
|
||||||
|
//
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
|
||||||
|
#include "core/fibers.h"
|
||||||
|
|
||||||
|
namespace dfly {
|
||||||
|
|
||||||
|
// SimpleQueue-like interface, but also keeps track over the size of Ts it owns.
|
||||||
|
// It has a slightly less efficient TryPush() API as it forces construction of Ts even if they are
|
||||||
|
// not pushed.
|
||||||
|
// T must have a .size() method, which should return the heap-allocated size of T, excluding
|
||||||
|
// anything included in sizeof(T). We could generalize this in the future.
|
||||||
|
template <typename T, typename Queue = folly::ProducerConsumerQueue<T>> class SizeTrackingChannel {
|
||||||
|
public:
|
||||||
|
SizeTrackingChannel(size_t n, unsigned num_producers = 1) : queue_(n, num_producers) {
|
||||||
|
}
|
||||||
|
|
||||||
|
// Here and below, we must accept a T instead of building it from variadic args, as we need to
|
||||||
|
// know its size in case it is added.
|
||||||
|
void Push(T t) noexcept {
|
||||||
|
size_.fetch_add(t.size(), std::memory_order_relaxed);
|
||||||
|
queue_.Push(std::move(t));
|
||||||
|
}
|
||||||
|
|
||||||
|
bool TryPush(T t) noexcept {
|
||||||
|
const size_t size = t.size();
|
||||||
|
if (queue_.TryPush(std::move(t))) {
|
||||||
|
size_.fetch_add(size, std::memory_order_relaxed);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Pop(T& dest) {
|
||||||
|
if (queue_.Pop(dest)) {
|
||||||
|
size_.fetch_sub(dest.size(), std::memory_order_relaxed);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void StartClosing() {
|
||||||
|
queue_.StartClosing();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool TryPop(T& dest) {
|
||||||
|
if (queue_.TryPop(dest)) {
|
||||||
|
size_.fetch_sub(dest.size(), std::memory_order_relaxed);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool IsClosing() const {
|
||||||
|
return queue_.IsClosing();
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t GetSize() const {
|
||||||
|
return queue_.Capacity() * sizeof(T) + size_.load(std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
SimpleChannel<T, Queue> queue_;
|
||||||
|
std::atomic<size_t> size_ = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace dfly
|
|
@ -187,6 +187,10 @@ class Connection : public util::Connection {
|
||||||
return name_;
|
return name_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
base::IoBuf::MemoryUsage GetMemoryUsage() const {
|
||||||
|
return io_buf_.GetMemoryUsage();
|
||||||
|
}
|
||||||
|
|
||||||
ConnectionContext* cntx();
|
ConnectionContext* cntx();
|
||||||
|
|
||||||
// Requests that at some point, this connection will be migrated to `dest` thread.
|
// Requests that at some point, this connection will be migrated to `dest` thread.
|
||||||
|
|
|
@ -7,9 +7,13 @@
|
||||||
#include <absl/strings/str_cat.h>
|
#include <absl/strings/str_cat.h>
|
||||||
#include <mimalloc.h>
|
#include <mimalloc.h>
|
||||||
|
|
||||||
|
#include "base/io_buf.h"
|
||||||
|
#include "facade/dragonfly_connection.h"
|
||||||
#include "facade/error.h"
|
#include "facade/error.h"
|
||||||
#include "server/engine_shard_set.h"
|
#include "server/engine_shard_set.h"
|
||||||
|
#include "server/server_family.h"
|
||||||
#include "server/server_state.h"
|
#include "server/server_state.h"
|
||||||
|
#include "server/snapshot.h"
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace facade;
|
using namespace facade;
|
||||||
|
@ -75,7 +79,7 @@ size_t MemoryUsage(PrimeIterator it) {
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
MemoryCmd::MemoryCmd(ServerFamily* owner, ConnectionContext* cntx) : cntx_(cntx) {
|
MemoryCmd::MemoryCmd(ServerFamily* owner, ConnectionContext* cntx) : cntx_(cntx), owner_(owner) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void MemoryCmd::Run(CmdArgList args) {
|
void MemoryCmd::Run(CmdArgList args) {
|
||||||
|
@ -84,6 +88,8 @@ void MemoryCmd::Run(CmdArgList args) {
|
||||||
if (sub_cmd == "HELP") {
|
if (sub_cmd == "HELP") {
|
||||||
string_view help_arr[] = {
|
string_view help_arr[] = {
|
||||||
"MEMORY <subcommand> [<arg> ...]. Subcommands are:",
|
"MEMORY <subcommand> [<arg> ...]. Subcommands are:",
|
||||||
|
"STATS",
|
||||||
|
" Shows breakdown of memory.",
|
||||||
"MALLOC-STATS [BACKING] [thread-id]",
|
"MALLOC-STATS [BACKING] [thread-id]",
|
||||||
" Show malloc stats for a heap residing in specified thread-id. 0 by default.",
|
" Show malloc stats for a heap residing in specified thread-id. 0 by default.",
|
||||||
" If BACKING is specified, show stats for the backing heap.",
|
" If BACKING is specified, show stats for the backing heap.",
|
||||||
|
@ -95,6 +101,10 @@ void MemoryCmd::Run(CmdArgList args) {
|
||||||
return (*cntx_)->SendSimpleStrArr(help_arr);
|
return (*cntx_)->SendSimpleStrArr(help_arr);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if (sub_cmd == "STATS") {
|
||||||
|
return Stats();
|
||||||
|
}
|
||||||
|
|
||||||
if (sub_cmd == "USAGE" && args.size() > 1) {
|
if (sub_cmd == "USAGE" && args.size() > 1) {
|
||||||
string_view key = ArgS(args, 1);
|
string_view key = ArgS(args, 1);
|
||||||
return Usage(key);
|
return Usage(key);
|
||||||
|
@ -143,6 +153,100 @@ void MemoryCmd::Run(CmdArgList args) {
|
||||||
return (*cntx_)->SendError(err, kSyntaxErrType);
|
return (*cntx_)->SendError(err, kSyntaxErrType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
struct ConnectionMemoryUsage {
|
||||||
|
size_t connection_count = 0;
|
||||||
|
size_t pipelined_bytes = 0;
|
||||||
|
base::IoBuf::MemoryUsage connections_memory;
|
||||||
|
|
||||||
|
size_t replication_connection_count = 0;
|
||||||
|
base::IoBuf::MemoryUsage replication_memory;
|
||||||
|
};
|
||||||
|
|
||||||
|
ConnectionMemoryUsage GetConnectionMemoryUsage(ServerFamily* server) {
|
||||||
|
Mutex mu;
|
||||||
|
ConnectionMemoryUsage mem ABSL_GUARDED_BY(mu);
|
||||||
|
|
||||||
|
for (auto* listener : server->GetListeners()) {
|
||||||
|
listener->TraverseConnections([&](unsigned thread_index, util::Connection* conn) {
|
||||||
|
auto* dfly_conn = static_cast<facade::Connection*>(conn);
|
||||||
|
auto* cntx = static_cast<ConnectionContext*>(dfly_conn->cntx());
|
||||||
|
lock_guard lock(mu);
|
||||||
|
|
||||||
|
if (cntx->replication_flow == nullptr) {
|
||||||
|
mem.connection_count++;
|
||||||
|
mem.connections_memory += dfly_conn->GetMemoryUsage();
|
||||||
|
} else {
|
||||||
|
mem.replication_connection_count++;
|
||||||
|
mem.replication_memory += dfly_conn->GetMemoryUsage();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cntx != nullptr) {
|
||||||
|
mem.pipelined_bytes += cntx->conn_state.exec_info.body.capacity() * sizeof(StoredCmd);
|
||||||
|
for (const auto& pipeline : cntx->conn_state.exec_info.body) {
|
||||||
|
mem.pipelined_bytes += pipeline.UsedHeapMemory();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return mem;
|
||||||
|
}
|
||||||
|
|
||||||
|
void PushMemoryUsageStats(const base::IoBuf::MemoryUsage& mem, string_view prefix, size_t total,
|
||||||
|
vector<pair<string, size_t>>* stats) {
|
||||||
|
stats->push_back({absl::StrCat(prefix, ".total_bytes"), total});
|
||||||
|
stats->push_back({absl::StrCat(prefix, ".consumed_bytes"), mem.consumed});
|
||||||
|
stats->push_back({absl::StrCat(prefix, ".pending_input_bytes"), mem.input_length});
|
||||||
|
stats->push_back({absl::StrCat(prefix, ".pending_output_bytes"), mem.append_length});
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
void MemoryCmd::Stats() {
|
||||||
|
vector<pair<string, size_t>> stats;
|
||||||
|
stats.reserve(25);
|
||||||
|
auto server_metrics = owner_->GetMetrics();
|
||||||
|
|
||||||
|
// RSS
|
||||||
|
stats.push_back({"rss_bytes", rss_mem_current.load(memory_order_relaxed)});
|
||||||
|
stats.push_back({"rss_peak_bytes", rss_mem_peak.load(memory_order_relaxed)});
|
||||||
|
|
||||||
|
// Used by DbShards and DashTable
|
||||||
|
stats.push_back({"data_bytes", used_mem_current.load(memory_order_relaxed)});
|
||||||
|
stats.push_back({"data_peak_bytes", used_mem_peak.load(memory_order_relaxed)});
|
||||||
|
|
||||||
|
ConnectionMemoryUsage connection_memory = GetConnectionMemoryUsage(owner_);
|
||||||
|
|
||||||
|
// Connection stats, excluding replication connections
|
||||||
|
stats.push_back({"connections.count", connection_memory.connection_count});
|
||||||
|
PushMemoryUsageStats(
|
||||||
|
connection_memory.connections_memory, "connections",
|
||||||
|
connection_memory.connections_memory.GetTotalSize() + connection_memory.pipelined_bytes,
|
||||||
|
&stats);
|
||||||
|
stats.push_back({"connections.pipeline_bytes", connection_memory.pipelined_bytes});
|
||||||
|
|
||||||
|
// Replication connection stats
|
||||||
|
stats.push_back(
|
||||||
|
{"replication.connections_count", connection_memory.replication_connection_count});
|
||||||
|
PushMemoryUsageStats(connection_memory.replication_memory, "replication",
|
||||||
|
connection_memory.replication_memory.GetTotalSize(), &stats);
|
||||||
|
|
||||||
|
atomic<size_t> serialization_memory = 0;
|
||||||
|
shard_set->pool()->AwaitFiberOnAll(
|
||||||
|
[&](auto*) { serialization_memory.fetch_add(SliceSnapshot::GetThreadLocalMemoryUsage()); });
|
||||||
|
|
||||||
|
// Serialization stats, including both replication-related serialization and saving to RDB files.
|
||||||
|
stats.push_back({"serialization", serialization_memory.load()});
|
||||||
|
|
||||||
|
(*cntx_)->StartCollection(stats.size(), RedisReplyBuilder::MAP);
|
||||||
|
for (const auto& [k, v] : stats) {
|
||||||
|
(*cntx_)->SendBulkString(k);
|
||||||
|
(*cntx_)->SendLong(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void MemoryCmd::Usage(std::string_view key) {
|
void MemoryCmd::Usage(std::string_view key) {
|
||||||
ShardId sid = Shard(key, shard_set->size());
|
ShardId sid = Shard(key, shard_set->size());
|
||||||
ssize_t memory_usage = shard_set->pool()->at(sid)->AwaitBrief([key, this]() -> ssize_t {
|
ssize_t memory_usage = shard_set->pool()->at(sid)->AwaitBrief([key, this]() -> ssize_t {
|
||||||
|
|
|
@ -17,9 +17,11 @@ class MemoryCmd {
|
||||||
void Run(CmdArgList args);
|
void Run(CmdArgList args);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
void Stats();
|
||||||
void Usage(std::string_view key);
|
void Usage(std::string_view key);
|
||||||
|
|
||||||
ConnectionContext* cntx_;
|
ConnectionContext* cntx_;
|
||||||
|
ServerFamily* owner_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
|
@ -950,10 +950,6 @@ class RdbSaver::Impl {
|
||||||
// Multi entry compression is available only on df snapshot, this will
|
// Multi entry compression is available only on df snapshot, this will
|
||||||
// make snapshot size smaller and opreation faster.
|
// make snapshot size smaller and opreation faster.
|
||||||
CompressionMode compression_mode_;
|
CompressionMode compression_mode_;
|
||||||
|
|
||||||
struct Stats {
|
|
||||||
std::atomic<size_t> pulled_bytes{0};
|
|
||||||
} stats_;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// We pass K=sz to say how many producers are pushing data in order to maintain
|
// We pass K=sz to say how many producers are pushing data in order to maintain
|
||||||
|
@ -1043,8 +1039,6 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
DVLOG(2) << "Pulled " << record->id;
|
DVLOG(2) << "Pulled " << record->id;
|
||||||
stats_.pulled_bytes.fetch_add(record->value.size(), memory_order_relaxed);
|
|
||||||
|
|
||||||
io_error = sink_->Write(io::Buffer(record->value));
|
io_error = sink_->Write(io::Buffer(record->value));
|
||||||
if (io_error) {
|
if (io_error) {
|
||||||
break;
|
break;
|
||||||
|
@ -1052,17 +1046,12 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
|
||||||
} while ((record = records_popper.TryPop()));
|
} while ((record = records_popper.TryPop()));
|
||||||
} // while (records_popper.Pop())
|
} // while (records_popper.Pop())
|
||||||
|
|
||||||
size_t pushed_bytes = 0;
|
|
||||||
for (auto& ptr : shard_snapshots_) {
|
for (auto& ptr : shard_snapshots_) {
|
||||||
ptr->Join();
|
ptr->Join();
|
||||||
pushed_bytes += ptr->pushed_bytes();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DCHECK(!record.has_value() || !channel_.TryPop(*record));
|
DCHECK(!record.has_value() || !channel_.TryPop(*record));
|
||||||
|
|
||||||
VLOG(1) << "Channel pulled bytes: " << stats_.pulled_bytes.load(memory_order_relaxed)
|
|
||||||
<< " pushed bytes: " << pushed_bytes;
|
|
||||||
|
|
||||||
return io_error;
|
return io_error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1105,13 +1094,12 @@ void RdbSaver::Impl::Cancel() {
|
||||||
// This function is called from connection thread when info command is invoked.
|
// This function is called from connection thread when info command is invoked.
|
||||||
// All accessed variableds must be thread safe, as they are fetched not from the rdb saver thread.
|
// All accessed variableds must be thread safe, as they are fetched not from the rdb saver thread.
|
||||||
size_t RdbSaver::Impl::GetTotalBuffersSize() const {
|
size_t RdbSaver::Impl::GetTotalBuffersSize() const {
|
||||||
std::atomic<size_t> pushed_bytes{0};
|
std::atomic<size_t> channel_bytes{0};
|
||||||
std::atomic<size_t> serializer_bytes{0};
|
std::atomic<size_t> serializer_bytes{0};
|
||||||
size_t pulled_bytes = stats_.pulled_bytes.load(memory_order_relaxed);
|
|
||||||
|
|
||||||
auto cb = [this, &pushed_bytes, &serializer_bytes](ShardId sid) {
|
auto cb = [this, &channel_bytes, &serializer_bytes](ShardId sid) {
|
||||||
auto& snapshot = shard_snapshots_[sid];
|
auto& snapshot = shard_snapshots_[sid];
|
||||||
pushed_bytes.fetch_add(snapshot->pushed_bytes(), memory_order_relaxed);
|
channel_bytes.fetch_add(snapshot->GetTotalChannelCapacity(), memory_order_relaxed);
|
||||||
serializer_bytes.store(snapshot->GetTotalBufferCapacity(), memory_order_relaxed);
|
serializer_bytes.store(snapshot->GetTotalBufferCapacity(), memory_order_relaxed);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1119,17 +1107,11 @@ size_t RdbSaver::Impl::GetTotalBuffersSize() const {
|
||||||
cb(0);
|
cb(0);
|
||||||
} else {
|
} else {
|
||||||
shard_set->RunBriefInParallel([&](EngineShard* es) { cb(es->shard_id()); });
|
shard_set->RunBriefInParallel([&](EngineShard* es) { cb(es->shard_id()); });
|
||||||
// Note that pushed bytes and pulled bytes values are fetched at different times, as we need to
|
|
||||||
// calc the pushed bytes using RunBriefInParallel.
|
|
||||||
// pulled bytes might be higher untill we return here from RunBriefInParallel.
|
|
||||||
}
|
}
|
||||||
size_t total_bytes = pushed_bytes.load(memory_order_relaxed) +
|
|
||||||
serializer_bytes.load(memory_order_relaxed) - pulled_bytes;
|
|
||||||
VLOG(2) << "pushed_bytes:" << pushed_bytes.load(memory_order_relaxed)
|
|
||||||
<< " serializer_bytes: " << serializer_bytes.load(memory_order_relaxed)
|
|
||||||
<< " pulled_bytes: " << pulled_bytes << " total_bytes:" << total_bytes;
|
|
||||||
|
|
||||||
return total_bytes;
|
VLOG(2) << "channel_bytes:" << channel_bytes.load(memory_order_relaxed)
|
||||||
|
<< " serializer_bytes: " << serializer_bytes.load(memory_order_relaxed);
|
||||||
|
return channel_bytes.load(memory_order_relaxed) + serializer_bytes.load(memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service) {
|
RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service) {
|
||||||
|
|
|
@ -182,6 +182,10 @@ class ServerFamily {
|
||||||
return dfly_cmd_.get();
|
return dfly_cmd_.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const std::vector<facade::Listener*>& GetListeners() const {
|
||||||
|
return listeners_;
|
||||||
|
}
|
||||||
|
|
||||||
bool HasReplica() const;
|
bool HasReplica() const;
|
||||||
std::optional<Replica::Info> GetReplicaInfo() const;
|
std::optional<Replica::Info> GetReplicaInfo() const;
|
||||||
std::string GetReplicaMasterId() const;
|
std::string GetReplicaMasterId() const;
|
||||||
|
|
|
@ -25,12 +25,26 @@ using namespace std;
|
||||||
using namespace util;
|
using namespace util;
|
||||||
using namespace chrono_literals;
|
using namespace chrono_literals;
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
thread_local absl::flat_hash_set<SliceSnapshot*> tl_slice_snapshots;
|
||||||
|
} // namespace
|
||||||
|
|
||||||
SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode)
|
SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode)
|
||||||
: db_slice_(slice), dest_(dest), compression_mode_(compression_mode) {
|
: db_slice_(slice), dest_(dest), compression_mode_(compression_mode) {
|
||||||
db_array_ = slice->databases();
|
db_array_ = slice->databases();
|
||||||
|
tl_slice_snapshots.insert(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
SliceSnapshot::~SliceSnapshot() {
|
SliceSnapshot::~SliceSnapshot() {
|
||||||
|
tl_slice_snapshots.erase(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t SliceSnapshot::GetThreadLocalMemoryUsage() {
|
||||||
|
size_t mem = 0;
|
||||||
|
for (SliceSnapshot* snapshot : tl_slice_snapshots) {
|
||||||
|
mem += snapshot->GetTotalBufferCapacity() + snapshot->GetTotalChannelCapacity();
|
||||||
|
}
|
||||||
|
return mem;
|
||||||
}
|
}
|
||||||
|
|
||||||
void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) {
|
void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) {
|
||||||
|
@ -274,8 +288,6 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
|
||||||
if (serialized == 0)
|
if (serialized == 0)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
stats_.pushed_bytes += serialized;
|
|
||||||
|
|
||||||
auto id = rec_id_++;
|
auto id = rec_id_++;
|
||||||
DVLOG(2) << "Pushed " << id;
|
DVLOG(2) << "Pushed " << id;
|
||||||
DbRecord db_rec{.id = id, .value = std::move(sfile.val)};
|
DbRecord db_rec{.id = id, .value = std::move(sfile.val)};
|
||||||
|
@ -335,4 +347,8 @@ size_t SliceSnapshot::GetTotalBufferCapacity() const {
|
||||||
return serializer_->GetTotalBufferCapacity();
|
return serializer_->GetTotalBufferCapacity();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t SliceSnapshot::GetTotalChannelCapacity() const {
|
||||||
|
return dest_->GetSize();
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
|
@ -8,6 +8,7 @@
|
||||||
#include <bitset>
|
#include <bitset>
|
||||||
|
|
||||||
#include "base/pod_array.h"
|
#include "base/pod_array.h"
|
||||||
|
#include "core/size_tracking_channel.h"
|
||||||
#include "io/file.h"
|
#include "io/file.h"
|
||||||
#include "server/db_slice.h"
|
#include "server/db_slice.h"
|
||||||
#include "server/rdb_save.h"
|
#include "server/rdb_save.h"
|
||||||
|
@ -50,13 +51,20 @@ class SliceSnapshot {
|
||||||
struct DbRecord {
|
struct DbRecord {
|
||||||
uint64_t id;
|
uint64_t id;
|
||||||
std::string value;
|
std::string value;
|
||||||
|
|
||||||
|
size_t size() const {
|
||||||
|
constexpr size_t kSmallStringOptSize = 15;
|
||||||
|
return value.capacity() > kSmallStringOptSize ? value.capacity() : 0UL;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
using RecordChannel = SimpleChannel<DbRecord, base::mpmc_bounded_queue<DbRecord>>;
|
using RecordChannel = SizeTrackingChannel<DbRecord, base::mpmc_bounded_queue<DbRecord>>;
|
||||||
|
|
||||||
SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode);
|
SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode);
|
||||||
~SliceSnapshot();
|
~SliceSnapshot();
|
||||||
|
|
||||||
|
static size_t GetThreadLocalMemoryUsage();
|
||||||
|
|
||||||
// Initialize snapshot, start bucket iteration fiber, register listeners.
|
// Initialize snapshot, start bucket iteration fiber, register listeners.
|
||||||
// In journal streaming mode it needs to be stopped by either Stop or Cancel.
|
// In journal streaming mode it needs to be stopped by either Stop or Cancel.
|
||||||
void Start(bool stream_journal, const Cancellation* cll);
|
void Start(bool stream_journal, const Cancellation* cll);
|
||||||
|
@ -114,15 +122,12 @@ class SliceSnapshot {
|
||||||
return snapshot_version_;
|
return snapshot_version_;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t pushed_bytes() const {
|
|
||||||
return stats_.pushed_bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
const RdbTypeFreqMap& freq_map() const {
|
const RdbTypeFreqMap& freq_map() const {
|
||||||
return type_freq_map_;
|
return type_freq_map_;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t GetTotalBufferCapacity() const;
|
size_t GetTotalBufferCapacity() const; // In bytes
|
||||||
|
size_t GetTotalChannelCapacity() const; // In bytes
|
||||||
|
|
||||||
private:
|
private:
|
||||||
DbSlice* db_slice_;
|
DbSlice* db_slice_;
|
||||||
|
@ -148,7 +153,6 @@ class SliceSnapshot {
|
||||||
uint64_t rec_id_ = 0;
|
uint64_t rec_id_ = 0;
|
||||||
|
|
||||||
struct Stats {
|
struct Stats {
|
||||||
size_t pushed_bytes = 0;
|
|
||||||
size_t loop_serialized = 0, skipped = 0, side_saved = 0;
|
size_t loop_serialized = 0, skipped = 0, side_saved = 0;
|
||||||
size_t savecb_calls = 0;
|
size_t savecb_calls = 0;
|
||||||
} stats_;
|
} stats_;
|
||||||
|
|
Loading…
Reference in a new issue