1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore(tiering): Range functions + small refactoring (#3207)

This commit is contained in:
Vladislav 2024-07-22 18:36:11 +03:00 committed by GitHub
parent 7df6771eaa
commit f81a893368
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 161 additions and 179 deletions

View file

@ -851,12 +851,19 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddNew(const Context& cntx, string_view
.it = res.it, .exp_it = res.exp_it, .post_updater = std::move(res.post_updater)};
}
pair<int64_t, int64_t> DbSlice::ExpireParams::Calculate(int64_t now_ms) const {
int64_t DbSlice::ExpireParams::Cap(int64_t value, TimeUnit unit) {
return unit == TimeUnit::SEC ? min(value, kMaxExpireDeadlineSec)
: min(value, kMaxExpireDeadlineMs);
}
pair<int64_t, int64_t> DbSlice::ExpireParams::Calculate(uint64_t now_ms, bool cap) const {
if (persist)
return {0, 0};
int64_t msec = (unit == TimeUnit::SEC) ? value * 1000 : value;
int64_t now_msec = now_ms;
int64_t rel_msec = absolute ? msec - now_msec : msec;
if (cap)
rel_msec = Cap(rel_msec, TimeUnit::MSEC);
return make_pair(rel_msec, now_msec + rel_msec);
}
@ -872,7 +879,7 @@ OpResult<int64_t> DbSlice::UpdateExpire(const Context& cntx, Iterator prime_it,
}
auto [rel_msec, abs_msec] = params.Calculate(cntx.time_now_ms);
if (rel_msec > kMaxExpireDeadlineSec * 1000) {
if (rel_msec > kMaxExpireDeadlineMs) {
return OpStatus::OUT_OF_RANGE;
}

View file

@ -205,19 +205,27 @@ class DbSlice {
std::function<void(std::string_view, const Context&, const PrimeValue& pv)>;
struct ExpireParams {
int64_t value = INT64_MIN; // undefined
bool absolute = false;
TimeUnit unit = TimeUnit::SEC;
bool persist = false;
int32_t expire_options = 0; // ExpireFlags
bool IsDefined() const {
return persist || value > INT64_MIN;
}
static int64_t Cap(int64_t value, TimeUnit unit);
// Calculate relative and absolue timepoints.
std::pair<int64_t, int64_t> Calculate(int64_t now_msec) const;
std::pair<int64_t, int64_t> Calculate(uint64_t now_msec, bool cap = false) const;
// Return true if relative expiration is in the past
bool IsExpired(uint64_t now_msec) const {
return Calculate(now_msec, false).first < 0;
}
public:
int64_t value = INT64_MIN; // undefined
TimeUnit unit = TimeUnit::SEC;
bool absolute = false;
bool persist = false;
int32_t expire_options = 0; // ExpireFlags
};
DbSlice(uint32_t index, bool caching_mode, EngineShard* owner);

View file

@ -941,8 +941,8 @@ void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) {
return;
}
DbSlice::ExpireParams params{.value = int_arg,
.absolute = true,
.unit = TimeUnit::MSEC,
.absolute = true,
.expire_options = expire_options.value()};
auto cb = [&](Transaction* t, EngineShard* shard) {

View file

@ -63,7 +63,15 @@ string GetString(const PrimeValue& pv) {
return res;
}
template <typename T> T GetResult(std::variant<T, util::fb2::Future<T>> v) {
size_t SetRange(std::string* value, size_t start, std::string_view range) {
value->resize(max(value->size(), start + range.size()));
memcpy(value->data() + start, range.data(), range.size());
return value->size();
}
template <typename T> using TResult = std::variant<T, util::fb2::Future<T>>;
template <typename T> T GetResult(TResult<T> v) {
Overloaded ov{
[](T&& t) { return t; },
[](util::fb2::Future<T>&& future) { return future.Get(); },
@ -71,73 +79,93 @@ template <typename T> T GetResult(std::variant<T, util::fb2::Future<T>> v) {
return std::visit(ov, std::move(v));
}
OpResult<uint32_t> OpSetRange(const OpArgs& op_args, string_view key, size_t start,
string_view value) {
VLOG(2) << "SetRange(" << key << ", " << start << ", " << value << ")";
OpResult<TResult<size_t>> OpStrLen(const OpArgs& op_args, string_view key) {
auto& db_slice = op_args.GetDbSlice();
size_t range_len = start + value.size();
auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STRING);
RETURN_ON_BAD_STATUS(it_res);
if (range_len == 0) {
auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STRING);
if (it_res) {
return it_res.value()->second.Size();
} else {
return it_res.status();
}
// For external entries we have to enqueue reads because modify operations like append could be
// already pending.
// TODO: Optimize to return co.Size() if no modify operations are present
if (const auto& co = it_res.value()->second; co.IsExternal()) {
util::fb2::Future<size_t> fut;
op_args.shard->tiered_storage()->Read(
op_args.db_cntx.db_index, key, co,
[fut](const std::string& s) mutable { fut.Resolve(s.size()); });
return {std::move(fut)};
} else {
return {co.Size()};
}
}
OpResult<TResult<size_t>> OpSetRange(const OpArgs& op_args, string_view key, size_t start,
string_view range) {
VLOG(2) << "SetRange(" << key << ", " << start << ", " << range << ")";
auto& db_slice = op_args.GetDbSlice();
if (start + range.size() == 0) {
return OpStrLen(op_args, key);
}
auto op_res = db_slice.AddOrFind(op_args.db_cntx, key);
RETURN_ON_BAD_STATUS(op_res);
auto& res = *op_res;
string s;
if (res.is_new) {
s.resize(range_len);
if (res.it->second.IsExternal()) {
return {op_args.shard->tiered_storage()->Modify<size_t>(
op_args.db_cntx.db_index, key, res.it->second,
[start = start, range = string(range)](std::string* s) {
return SetRange(s, start, range);
})};
} else {
if (res.it->second.ObjType() != OBJ_STRING)
string value;
if (!res.is_new && res.it->second.ObjType() != OBJ_STRING)
return OpStatus::WRONG_TYPE;
s = GetString(res.it->second);
if (s.size() < range_len)
s.resize(range_len);
}
if (!res.is_new)
value = GetString(res.it->second);
memcpy(s.data() + start, value.data(), value.size());
res.it->second.SetString(s);
return res.it->second.Size();
size_t len = SetRange(&value, start, range);
res.it->second.SetString(value);
return {len};
}
}
OpResult<string> OpGetRange(const OpArgs& op_args, string_view key, int32_t start, int32_t end) {
OpResult<StringValue> OpGetRange(const OpArgs& op_args, string_view key, int32_t start,
int32_t end) {
auto read = [start, end](std::string_view slice) mutable -> string_view {
int32_t strlen = slice.size();
if (start < 0)
start = strlen + start;
if (end < 0)
end = strlen + end;
start = max(start, 0);
end = max(end, 0);
if (strlen == 0 || start > end || start >= strlen)
return "";
end = min(end, strlen - 1);
return slice.substr(start, end - start + 1);
};
auto& db_slice = op_args.GetDbSlice();
auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STRING);
if (!it_res.ok())
return it_res.status();
RETURN_ON_BAD_STATUS(it_res);
const CompactObj& co = it_res.value()->second;
size_t strlen = co.Size();
if (start < 0)
start = strlen + start;
if (end < 0)
end = strlen + end;
if (start < 0)
start = 0;
if (end < 0)
end = 0;
if (strlen == 0 || start > end || size_t(start) >= strlen) {
return OpStatus::OK;
if (const CompactObj& co = it_res.value()->second; co.IsExternal()) {
util::fb2::Future<std::string> fut;
op_args.shard->tiered_storage()->Read(
op_args.db_cntx.db_index, key, co,
[read, fut](const std::string& s) mutable { fut.Resolve(string{read(s)}); });
return {std::move(fut)};
} else {
string tmp;
string_view slice = co.GetSlice(&tmp);
return {string{read(slice)}};
}
if (size_t(end) >= strlen)
end = strlen - 1;
string tmp;
string_view slice = co.GetSlice(&tmp);
return string(slice.substr(start, end - start + 1));
};
size_t ExtendExisting(DbSlice::Iterator it, string_view key, string_view val, bool prepend) {
@ -250,20 +278,6 @@ OpResult<int64_t> OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr,
return new_val;
}
int64_t AbsExpiryToTtl(int64_t abs_expiry_time, bool as_milli) {
using std::chrono::duration_cast;
using std::chrono::milliseconds;
using std::chrono::seconds;
using std::chrono::system_clock;
if (as_milli) {
return abs_expiry_time -
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
} else {
return abs_expiry_time - duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
}
}
// Returns true if keys were set, false otherwise.
OpStatus OpMSet(const OpArgs& op_args, const ShardArgs& args) {
DCHECK(!args.Empty() && args.Size() % 2 == 0);
@ -475,10 +489,8 @@ SinkReplyBuilder::MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, bool f
// Extend key with value, either prepend or append. Return size of stored string
// after modification
OpResult<variant<size_t, util::fb2::Future<size_t>>> OpExtend(const OpArgs& op_args,
std::string_view key,
std::string_view value,
bool prepend) {
OpResult<TResult<size_t>> OpExtend(const OpArgs& op_args, std::string_view key,
std::string_view value, bool prepend) {
auto* shard = op_args.shard;
auto it_res = op_args.GetDbSlice().AddOrFind(op_args.db_cntx, key);
RETURN_ON_BAD_STATUS(it_res);
@ -509,7 +521,7 @@ struct GetReplies {
DCHECK(dynamic_cast<RedisReplyBuilder*>(rb));
}
void Send(OpResult<StringValue>&& res) const {
template <typename T> void Send(OpResult<T>&& res) const {
switch (res.status()) {
case OpStatus::OK:
return Send(std::move(res.value()));
@ -520,6 +532,10 @@ struct GetReplies {
}
}
void Send(TResult<size_t>&& val) const {
rb->SendLong(GetResult(std::move(val)));
}
void Send(StringValue&& val) const {
if (val.IsEmpty()) {
rb->SendNull();
@ -750,32 +766,17 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendError(InvalidExpireTime("set"));
}
bool is_ms = (opt[0] == 'P');
DbSlice::ExpireParams expiry{
.value = int_arg,
.unit = (opt[0] == 'P') ? TimeUnit::MSEC : TimeUnit::SEC,
.absolute = absl::EndsWith(opt, "AT"),
};
// for []AT we need to take expiration time as absolute from the value
// given check here and if the time is in the past, return OK but don't
// set it Note that the time pass here for PXAT is in milliseconds, we
// must not change it!
if (absl::EndsWith(opt, "AT")) {
int_arg = AbsExpiryToTtl(int_arg, is_ms);
if (int_arg < 0) {
// this happened in the past, just return, for some reason Redis
// reports OK in this case
return builder->SendStored();
}
}
// Redis reports just OK in this case
if (expiry.IsExpired(GetCurrentTimeMs()))
return builder->SendStored();
if (is_ms) {
if (int_arg > kMaxExpireDeadlineMs) {
int_arg = kMaxExpireDeadlineMs;
}
} else {
if (int_arg > kMaxExpireDeadlineSec) {
int_arg = kMaxExpireDeadlineSec;
}
int_arg *= 1000;
}
sparams.expire_after_ms = int_arg;
tie(sparams.expire_after_ms, ignore) = expiry.Calculate(GetCurrentTimeMs(), true);
} else if (parser.Check("_MCFLAGS").ExpectTail(1)) {
sparams.memcache_flags = parser.Next<uint16_t>();
} else {
@ -942,42 +943,26 @@ void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContex
}
void StringFamily::GetEx(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
CmdArgParser parser{args};
string_view key = parser.Next();
DbSlice::ExpireParams exp_params;
int64_t int_arg = 0;
for (size_t i = 1; i < args.size(); i++) {
ToUpper(&args[i]);
string_view cur_arg = ArgS(args, i);
if (cur_arg == "EX" || cur_arg == "PX" || cur_arg == "EXAT" || cur_arg == "PXAT") {
i++;
if (i >= args.size()) {
return cntx->SendError(kSyntaxErr);
}
string_view ex = ArgS(args, i);
if (!absl::SimpleAtoi(ex, &int_arg)) {
return cntx->SendError(kInvalidIntErr);
while (parser.ToUpper().HasNext()) {
if (base::_in(parser.Peek(), {"EX", "PX", "EXAT", "PXAT"})) {
auto [ex, int_arg] = parser.Next<string_view, int64_t>();
if (auto err = parser.Error(); err) {
return cntx->SendError(err->MakeReply());
}
if (int_arg <= 0) {
return cntx->SendError(InvalidExpireTime("getex"));
}
if (cur_arg == "EXAT" || cur_arg == "PXAT") {
exp_params.absolute = true;
}
exp_params.absolute = base::_in(ex, {"EXAT", "PXAT"});
exp_params.value = int_arg;
if (cur_arg == "EX" || cur_arg == "EXAT") {
exp_params.unit = TimeUnit::SEC;
} else {
exp_params.unit = TimeUnit::MSEC;
}
} else if (cur_arg == "PERSIST") {
exp_params.unit = ex[0] == 'P' ? TimeUnit::MSEC : TimeUnit::SEC;
} else if (parser.Check("PERSIST")) {
exp_params.persist = true;
} else {
return cntx->SendError(kSyntaxErr);
@ -1125,17 +1110,8 @@ void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext
SetCmd::SetParams sparams;
sparams.flags |= SetCmd::SET_EXPIRE_AFTER_MS;
if (seconds) {
if (unit_vals > kMaxExpireDeadlineSec) {
unit_vals = kMaxExpireDeadlineSec;
}
sparams.expire_after_ms = uint64_t(unit_vals) * 1000;
} else {
if (unit_vals > kMaxExpireDeadlineMs) {
unit_vals = kMaxExpireDeadlineMs;
}
sparams.expire_after_ms = unit_vals;
}
sparams.expire_after_ms =
DbSlice::ExpireParams::Cap(unit_vals * (seconds ? 1000 : 1), TimeUnit::MSEC);
cntx->SendError(SetGeneric(cntx, sparams, key, value));
}
@ -1261,24 +1237,10 @@ void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) {
}
void StringFamily::StrLen(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<size_t> {
auto it_res = t->GetDbSlice(shard->shard_id()).FindReadOnly(t->GetDbContext(), key, OBJ_STRING);
if (!it_res.ok())
return it_res.status();
return it_res.value()->second.Size();
auto cb = [key = ArgS(args, 0)](Transaction* t, EngineShard* shard) {
return OpStrLen(t->GetOpArgs(shard), key);
};
Transaction* trans = cntx->transaction;
OpResult<size_t> result = trans->ScheduleSingleHopT(std::move(cb));
if (result.status() == OpStatus::WRONG_TYPE) {
cntx->SendError(result.status());
} else {
cntx->SendLong(result.value());
}
GetReplies{cntx->reply_builder()}.Send(cntx->transaction->ScheduleSingleHopT(cb));
}
void StringFamily::GetRange(CmdArgList args, ConnectionContext* cntx) {
@ -1295,15 +1257,7 @@ void StringFamily::GetRange(CmdArgList args, ConnectionContext* cntx) {
return OpGetRange(t->GetOpArgs(shard), key, start, end);
};
Transaction* trans = cntx->transaction;
OpResult<string> result = trans->ScheduleSingleHopT(std::move(cb));
if (result.status() == OpStatus::WRONG_TYPE) {
cntx->SendError(result.status());
} else {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
rb->SendBulkString(result.value());
}
GetReplies{cntx->reply_builder()}.Send(cntx->transaction->ScheduleSingleHopT(cb));
}
void StringFamily::SetRange(CmdArgList args, ConnectionContext* cntx) {
@ -1320,23 +1274,20 @@ void StringFamily::SetRange(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendError("offset is out of range");
}
size_t min_size = start + value.size();
if (min_size > kMaxStrLen) {
if (size_t min_size = start + value.size(); min_size > kMaxStrLen) {
return cntx->SendError("string exceeds maximum allowed size");
}
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<uint32_t> {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpSetRange(t->GetOpArgs(shard), key, start, value);
};
auto res = cntx->transaction->ScheduleSingleHopT(cb);
Transaction* trans = cntx->transaction;
OpResult<uint32_t> result = trans->ScheduleSingleHopT(std::move(cb));
if (!result.ok()) {
cntx->SendError(result.status());
} else {
cntx->SendLong(result.value());
}
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
if (res.ok())
rb->SendLong(GetResult(std::move(*res)));
else
rb->SendError(res.status());
}
/* CL.THROTTLE <key> <max_burst> <count per period> <period> [<quantity>] */

View file

@ -100,7 +100,7 @@ TEST_F(TieredStorageTest, SimpleGetSet) {
TEST_F(TieredStorageTest, MGET) {
vector<string> command = {"MGET"}, values = {};
for (char key = 'A'; key <= 'B'; key++) {
for (char key = 'A'; key <= 'Z'; key++) {
command.emplace_back(1, key);
values.emplace_back(3000, key);
Run({"SET", command.back(), values.back()});
@ -127,6 +127,22 @@ TEST_F(TieredStorageTest, SimpleAppend) {
}
}
TEST_F(TieredStorageTest, Ranges) {
Run({"SET", "key", string(3000, 'a')});
ExpectConditionWithinTimeout([this] { return GetMetrics().tiered_stats.total_stashes >= 1; });
Run({"SETRANGE", "key", "1000", string(1000, 'b')});
auto resp = Run({"GET", "key"});
EXPECT_EQ(resp, string(1000, 'a') + string(1000, 'b') + string(1000, 'a'));
Run({"DEL", "key"});
Run({"SET", "key", string(1500, 'c') + string(1500, 'd')});
ExpectConditionWithinTimeout([this] { return GetMetrics().tiered_stats.total_stashes >= 2; });
resp = Run({"GETRANGE", "key", "1000", "1999"});
EXPECT_EQ(resp, string(500, 'c') + string(500, 'd'));
}
TEST_F(TieredStorageTest, MultiDb) {
for (size_t i = 0; i < 10; i++) {
Run({"SELECT", absl::StrCat(i)});