mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-15 17:51:06 +00:00
feat: allow reading offloaded strings without loading to the store (#2449)
* feat: allow reading offloaded strings without loading to the store
* Add disk stats to IoMgr

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
b777d31a98
commit
1074bcb30b
9 changed files with 93 additions and 21 deletions
|
@ -231,10 +231,18 @@ void TriggerJournalWriteToSink() {
|
|||
|
||||
#define ADD(x) (x) += o.x
|
||||
|
||||
TieredStats& TieredStats::operator+=(const TieredStats& o) {
|
||||
static_assert(sizeof(TieredStats) == 56);
|
||||
// Adds every counter of `rhs` to the matching counter of this object and
// returns a reference to this object, so per-source stats can be merged with
// `total += part`.
IoMgrStats& IoMgrStats::operator+=(const IoMgrStats& rhs) {
|
||||
// Compile-time check that the struct is exactly 16 bytes.
// NOTE(review): presumably a reminder to extend this operator whenever a
// field is added to IoMgrStats - confirm.
static_assert(sizeof(IoMgrStats) == 16);
|
||||
|
||||
read_total += rhs.read_total;
|
||||
read_delay_usec += rhs.read_delay_usec;
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
TieredStats& TieredStats::operator+=(const TieredStats& o) {
|
||||
static_assert(sizeof(TieredStats) == 48);
|
||||
|
||||
ADD(tiered_reads);
|
||||
ADD(tiered_writes);
|
||||
ADD(storage_capacity);
|
||||
ADD(storage_reserved);
|
||||
|
|
|
@ -122,8 +122,14 @@ void RecordExpiry(DbIndex dbid, std::string_view key);
|
|||
// Must be called from shard thread of journal to sink.
|
||||
void TriggerJournalWriteToSink();
|
||||
|
||||
// Read counters kept by IoMgr. Both fields start at zero.
struct IoMgrStats {
|
||||
// Number of backing-file reads; IoMgr::Read increments it once per read.
uint64_t read_total = 0;
|
||||
// Accumulated time spent in backing-file reads, in microseconds
// (IoMgr::Read adds the monotonic-clock delta in nanoseconds divided by 1000).
uint64_t read_delay_usec = 0;
|
||||
|
||||
// Adds the counters of `rhs` to this object; defined out of line.
IoMgrStats& operator+=(const IoMgrStats& rhs);
|
||||
};
|
||||
|
||||
struct TieredStats {
|
||||
uint64_t tiered_reads = 0;
|
||||
uint64_t tiered_writes = 0;
|
||||
|
||||
size_t storage_capacity = 0;
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
#include "base/flags.h"
|
||||
#include "base/logging.h"
|
||||
#include "facade/facade_types.h"
|
||||
#include "util/fibers/uring_proactor.h"
|
||||
|
||||
ABSL_FLAG(bool, backing_file_direct, false, "If true uses O_DIRECT to open backing files");
|
||||
|
||||
|
@ -45,23 +46,23 @@ error_code IoMgr::Open(const string& path) {
|
|||
if (absl::GetFlag(FLAGS_backing_file_direct)) {
|
||||
kFlags |= O_DIRECT;
|
||||
}
|
||||
auto res = OpenLinux(path, kFlags, 0666);
|
||||
auto res = fb2::OpenLinux(path, kFlags, 0666);
|
||||
if (!res)
|
||||
return res.error();
|
||||
backing_file_ = std::move(res.value());
|
||||
Proactor* proactor = (Proactor*)ProactorBase::me();
|
||||
{
|
||||
FiberCall fc(proactor);
|
||||
fb2::FiberCall fc(proactor);
|
||||
fc->PrepFallocate(backing_file_->fd(), 0, 0, kInitialSize);
|
||||
FiberCall::IoResult io_res = fc.Get();
|
||||
fb2::FiberCall::IoResult io_res = fc.Get();
|
||||
if (io_res < 0) {
|
||||
return error_code{-io_res, system_category()};
|
||||
}
|
||||
}
|
||||
{
|
||||
FiberCall fc(proactor);
|
||||
fb2::FiberCall fc(proactor);
|
||||
fc->PrepFadvise(backing_file_->fd(), 0, 0, POSIX_FADV_RANDOM);
|
||||
FiberCall::IoResult io_res = fc.Get();
|
||||
fb2::FiberCall::IoResult io_res = fc.Get();
|
||||
if (io_res < 0) {
|
||||
return error_code{-io_res, system_category()};
|
||||
}
|
||||
|
@ -118,7 +119,13 @@ error_code IoMgr::Read(size_t offset, io::MutableBytes dest) {
|
|||
|
||||
uint8_t* space = (uint8_t*)mi_malloc_aligned(space_needed, 4096);
|
||||
iovec v{.iov_base = space, .iov_len = space_needed};
|
||||
uint64_t from_ts = ProactorBase::GetMonotonicTimeNs();
|
||||
error_code ec = backing_file_->Read(&v, 1, read_offs, 0);
|
||||
uint64_t end_ts = ProactorBase::GetMonotonicTimeNs();
|
||||
|
||||
stats_.read_delay_usec += (end_ts - from_ts) / 1000;
|
||||
++stats_.read_total;
|
||||
|
||||
if (ec) {
|
||||
mi_free(space);
|
||||
return ec;
|
||||
|
@ -130,7 +137,13 @@ error_code IoMgr::Read(size_t offset, io::MutableBytes dest) {
|
|||
}
|
||||
|
||||
iovec v{.iov_base = dest.data(), .iov_len = dest.size()};
|
||||
return backing_file_->Read(&v, 1, offset, 0);
|
||||
uint64_t from_ts = ProactorBase::GetMonotonicTimeNs();
|
||||
auto ec = backing_file_->Read(&v, 1, offset, 0);
|
||||
uint64_t end_ts = ProactorBase::GetMonotonicTimeNs();
|
||||
|
||||
stats_.read_delay_usec += (end_ts - from_ts) / 1000;
|
||||
++stats_.read_total;
|
||||
return ec;
|
||||
}
|
||||
|
||||
void IoMgr::Shutdown() {
|
||||
|
|
|
@ -7,7 +7,8 @@
|
|||
#include <functional>
|
||||
#include <string>
|
||||
|
||||
#include "core/uring.h"
|
||||
#include "server/common.h"
|
||||
#include "util/fibers/uring_file.h"
|
||||
|
||||
namespace dfly {
|
||||
|
||||
|
@ -46,8 +47,12 @@ class IoMgr {
|
|||
return flags.grow_progress;
|
||||
}
|
||||
|
||||
// Returns a read-only reference to the read statistics held in stats_.
const IoMgrStats& GetStats() const {
|
||||
return stats_;
|
||||
}
|
||||
|
||||
private:
|
||||
std::unique_ptr<LinuxFile> backing_file_;
|
||||
std::unique_ptr<util::fb2::LinuxFile> backing_file_;
|
||||
size_t sz_ = 0;
|
||||
|
||||
union {
|
||||
|
@ -56,6 +61,8 @@ class IoMgr {
|
|||
uint8_t grow_progress : 1;
|
||||
} flags;
|
||||
};
|
||||
|
||||
IoMgrStats stats_;
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -1076,11 +1076,19 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
|
|||
break;
|
||||
}
|
||||
|
||||
AppendMetricValue(kReplyLatency, stats.total_duration / 1'000'000, {"type"}, {type},
|
||||
AppendMetricValue(kReplyLatency, double(stats.total_duration) * 1e-6, {"type"}, {type},
|
||||
&send_latency_metrics);
|
||||
AppendMetricValue(kReplyCount, stats.count, {"type"}, {type}, &send_count_metrics);
|
||||
}
|
||||
|
||||
// Tiered metrics.
|
||||
if (m.disk_stats.read_total > 0) {
|
||||
AppendMetricWithoutLabels("tiered_reads_total", "", m.disk_stats.read_total,
|
||||
MetricType::COUNTER, &resp->body());
|
||||
AppendMetricWithoutLabels("tiered_reads_latency_seconds", "",
|
||||
double(m.disk_stats.read_delay_usec) * 1e-6, MetricType::COUNTER,
|
||||
&resp->body());
|
||||
}
|
||||
absl::StrAppend(&resp->body(), send_latency_metrics);
|
||||
absl::StrAppend(&resp->body(), send_count_metrics);
|
||||
}
|
||||
|
@ -1143,7 +1151,7 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
|
|||
AppendMetricWithoutLabels("fiber_longrun_total", "", m.fiber_longrun_cnt, MetricType::COUNTER,
|
||||
&resp->body());
|
||||
double longrun_seconds = m.fiber_longrun_usec * 1e-6;
|
||||
AppendMetricWithoutLabels("fiber_longrun_seconds_total", "", longrun_seconds, MetricType::COUNTER,
|
||||
AppendMetricWithoutLabels("fiber_longrun_seconds", "", longrun_seconds, MetricType::COUNTER,
|
||||
&resp->body());
|
||||
AppendMetricWithoutLabels("tx_queue_len", "", m.tx_queue_len, MetricType::GAUGE, &resp->body());
|
||||
|
||||
|
@ -1674,8 +1682,11 @@ Metrics ServerFamily::GetMetrics() const {
|
|||
MergeDbSliceStats(shard->db_slice().GetStats(), &result);
|
||||
result.shard_stats += shard->stats();
|
||||
|
||||
if (shard->tiered_storage())
|
||||
if (shard->tiered_storage()) {
|
||||
result.tiered_stats += shard->tiered_storage()->GetStats();
|
||||
result.disk_stats += shard->tiered_storage()->GetDiskStats();
|
||||
}
|
||||
|
||||
if (shard->search_indices())
|
||||
result.search_stats += shard->search_indices()->GetStats();
|
||||
|
||||
|
@ -1877,7 +1888,8 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (should_enter("TIERED", true)) {
|
||||
append("tiered_entries", total.tiered_entries);
|
||||
append("tiered_bytes", total.tiered_size);
|
||||
append("tiered_reads", m.tiered_stats.tiered_reads);
|
||||
append("tiered_reads", m.disk_stats.read_total);
|
||||
append("tiered_read_latency_usec", m.disk_stats.read_delay_usec);
|
||||
append("tiered_writes", m.tiered_stats.tiered_writes);
|
||||
append("tiered_reserved", m.tiered_stats.storage_reserved);
|
||||
append("tiered_capacity", m.tiered_stats.storage_capacity);
|
||||
|
|
|
@ -75,6 +75,7 @@ struct Metrics {
|
|||
|
||||
facade::FacadeStats facade_stats; // client stats and buffer sizes
|
||||
TieredStats tiered_stats; // stats for tiered storage
|
||||
IoMgrStats disk_stats; // disk stats for io_mgr
|
||||
SearchStats search_stats;
|
||||
ServerState::Stats coordinator_stats; // stats on transaction running
|
||||
PeakStats peak_stats;
|
||||
|
|
|
@ -16,8 +16,8 @@ extern "C" {
|
|||
#include <cstdint>
|
||||
#include <tuple>
|
||||
|
||||
#include "base/flags.h"
|
||||
#include "base/logging.h"
|
||||
#include "redis/util.h"
|
||||
#include "server/acl/acl_commands_def.h"
|
||||
#include "server/command_registry.h"
|
||||
#include "server/conn_context.h"
|
||||
|
@ -27,6 +27,10 @@ extern "C" {
|
|||
#include "server/tiered_storage.h"
|
||||
#include "server/transaction.h"
|
||||
|
||||
// Testing/development switch. When set (and the shard has tiered storage), GET
// reads an offloaded string directly from disk instead of loading it back into
// the in-memory store.
// The help text is built from adjacent string literals, which the compiler
// concatenates verbatim, so the first literal must end with a space.
ABSL_FLAG(bool, tiered_skip_prefetch, false,
|
||||
"If true, does not load offloaded string back to in-memory store during GET command. "
|
||||
"For testing/development purposes only.");
|
||||
|
||||
namespace dfly {
|
||||
|
||||
namespace {
|
||||
|
@ -847,11 +851,28 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
|
|||
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<string> {
|
||||
auto op_args = t->GetOpArgs(shard);
|
||||
DbSlice& db_slice = op_args.shard->db_slice();
|
||||
auto res = db_slice.FindAndFetchReadOnly(op_args.db_cntx, key, OBJ_STRING);
|
||||
|
||||
OpResult<PrimeConstIterator> res;
|
||||
|
||||
// Temporary code that allows running dragonfly without filling up the memory store
|
||||
// when reading data from disk.
|
||||
if (TieredStorage* tiered = shard->tiered_storage();
|
||||
tiered && absl::GetFlag(FLAGS_tiered_skip_prefetch)) {
|
||||
res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STRING);
|
||||
if (res && (*res)->second.IsExternal()) {
|
||||
auto [offset, size] = (*res)->second.GetExternalSlice();
|
||||
string blob(size, '\0');
|
||||
auto ec = tiered->Read(offset, size, blob.data());
|
||||
CHECK(!ec) << "TBD";
|
||||
return blob;
|
||||
}
|
||||
} else {
|
||||
res = db_slice.FindAndFetchReadOnly(op_args.db_cntx, key, OBJ_STRING);
|
||||
}
|
||||
|
||||
if (!res) {
|
||||
return res.status();
|
||||
}
|
||||
|
||||
return GetString((*res)->second);
|
||||
};
|
||||
|
||||
|
|
|
@ -327,7 +327,6 @@ error_code TieredStorage::Open(const string& base) {
|
|||
}
|
||||
|
||||
std::error_code TieredStorage::Read(size_t offset, size_t len, char* dest) {
|
||||
stats_.tiered_reads++;
|
||||
DVLOG(1) << "Read " << offset << " " << len;
|
||||
|
||||
return io_mgr_.Read(offset, io::MutableBytes{reinterpret_cast<uint8_t*>(dest), len});
|
||||
|
|
|
@ -43,8 +43,14 @@ class TieredStorage {
|
|||
|
||||
TieredStats GetStats() const;
|
||||
|
||||
// Returns the read statistics of the underlying IoMgr by forwarding to
// io_mgr_.GetStats().
const IoMgrStats& GetDiskStats() const {
|
||||
return io_mgr_.GetStats();
|
||||
}
|
||||
|
||||
void CancelAllIos(DbIndex db_index);
|
||||
|
||||
std::error_code Read(size_t offset, size_t len, char* dest);
|
||||
|
||||
private:
|
||||
class InflightWriteRequest;
|
||||
|
||||
|
@ -61,7 +67,6 @@ class TieredStorage {
|
|||
|
||||
void FinishIoRequest(int io_res, InflightWriteRequest* req);
|
||||
void SetExternal(DbIndex db_index, size_t item_offset, PrimeValue* dest);
|
||||
std::error_code Read(size_t offset, size_t len, char* dest);
|
||||
|
||||
DbSlice& db_slice_;
|
||||
IoMgr io_mgr_;
|
||||
|
|
Loading…
Reference in a new issue