1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore(refactoring): header clean ups (#3943)

Move privately used header code to cc files. Remove redunandant includes.
No functional changes.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-10-18 12:47:26 +03:00 committed by GitHub
parent 866c82a3fa
commit 5ab32b97d9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 406 additions and 425 deletions

View file

@ -8,6 +8,7 @@
#include "core/heap_size.h"
#include "facade/acl_commands_def.h"
#include "server/acl/acl_commands_def.h"
#include "server/channel_store.h"
#include "server/command_registry.h"
#include "server/engine_shard_set.h"
#include "server/server_family.h"

View file

@ -731,6 +731,139 @@ OpResult<uint32_t> OpStick(const OpArgs& op_args, const ShardArgs& keys) {
return res;
}
OpResult<uint64_t> OpExpireTime(Transaction* t, EngineShard* shard, string_view key) {
auto& db_slice = t->GetDbSlice(shard->shard_id());
auto [it, expire_it] = db_slice.FindReadOnly(t->GetDbContext(), key);
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
if (!IsValid(expire_it))
return OpStatus::SKIPPED;
int64_t ttl_ms = db_slice.ExpireTime(expire_it);
DCHECK_GT(ttl_ms, 0); // Otherwise FindReadOnly would return null.
return ttl_ms;
}
// OpMove touches multiple databases (op_args.db_idx, target_db), so it assumes it runs
// as a global transaction.
// TODO: Allow running OpMove without a global transaction.
OpStatus OpMove(const OpArgs& op_args, string_view key, DbIndex target_db) {
auto& db_slice = op_args.GetDbSlice();
// Fetch value at key in current db.
auto from_res = db_slice.FindMutable(op_args.db_cntx, key);
if (!IsValid(from_res.it))
return OpStatus::KEY_NOTFOUND;
// Fetch value at key in target db.
DbContext target_cntx = op_args.db_cntx;
target_cntx.db_index = target_db;
auto to_res = db_slice.FindReadOnly(target_cntx, key);
if (IsValid(to_res.it))
return OpStatus::KEY_EXISTS;
// Ensure target database exists.
db_slice.ActivateDb(target_db);
bool sticky = from_res.it->first.IsSticky();
uint64_t exp_ts = db_slice.ExpireTime(from_res.exp_it);
from_res.post_updater.Run();
PrimeValue from_obj = std::move(from_res.it->second);
// Restore expire flag after std::move.
from_res.it->second.SetExpire(IsValid(from_res.exp_it));
CHECK(db_slice.Del(op_args.db_cntx, from_res.it));
auto op_result = db_slice.AddNew(target_cntx, key, std::move(from_obj), exp_ts);
RETURN_ON_BAD_STATUS(op_result);
auto& add_res = *op_result;
add_res.it->first.SetSticky(sticky);
auto bc = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
if (add_res.it->second.ObjType() == OBJ_LIST && bc) {
bc->AwakeWatched(target_db, key);
}
return OpStatus::OK;
}
OpResult<void> OpRen(const OpArgs& op_args, string_view from_key, string_view to_key,
bool destination_should_not_exist) {
auto* es = op_args.shard;
auto& db_slice = op_args.GetDbSlice();
auto from_res = db_slice.FindMutable(op_args.db_cntx, from_key);
if (!IsValid(from_res.it))
return OpStatus::KEY_NOTFOUND;
if (from_key == to_key)
return destination_should_not_exist ? OpStatus::KEY_EXISTS : OpStatus::OK;
bool is_prior_list = false;
auto to_res = db_slice.FindMutable(op_args.db_cntx, to_key);
if (IsValid(to_res.it)) {
if (destination_should_not_exist)
return OpStatus::KEY_EXISTS;
op_args.shard->search_indices()->RemoveDoc(to_key, op_args.db_cntx, to_res.it->second);
is_prior_list = (to_res.it->second.ObjType() == OBJ_LIST);
}
bool sticky = from_res.it->first.IsSticky();
uint64_t exp_ts = db_slice.ExpireTime(from_res.exp_it);
// we keep the value we want to move.
PrimeValue from_obj = std::move(from_res.it->second);
// Restore the expire flag on 'from' so we could delete it from expire table.
from_res.it->second.SetExpire(IsValid(from_res.exp_it));
if (IsValid(to_res.it)) {
to_res.it->second = std::move(from_obj);
to_res.it->second.SetExpire(IsValid(to_res.exp_it)); // keep the expire flag on 'to'.
// It is guaranteed that UpdateExpire() call does not erase the element because then
// from_it would be invalid. Therefore, UpdateExpire does not invalidate any iterators,
// therefore we can delete 'from_it'.
db_slice.UpdateExpire(op_args.db_cntx.db_index, to_res.it, exp_ts);
to_res.it->first.SetSticky(sticky);
to_res.post_updater.Run();
from_res.post_updater.Run();
CHECK(db_slice.Del(op_args.db_cntx, from_res.it));
} else {
// Here we first delete from_it because AddNew below could invalidate from_it.
// On the other hand, AddNew does not rely on the iterators - this is why we keep
// the value in `from_obj`.
from_res.post_updater.Run();
CHECK(db_slice.Del(op_args.db_cntx, from_res.it));
auto op_result = db_slice.AddNew(op_args.db_cntx, to_key, std::move(from_obj), exp_ts);
RETURN_ON_BAD_STATUS(op_result);
to_res = std::move(*op_result);
to_res.it->first.SetSticky(sticky);
}
op_args.shard->search_indices()->AddDoc(to_key, op_args.db_cntx, to_res.it->second);
auto bc = op_args.db_cntx.ns->GetBlockingController(es->shard_id());
if (!is_prior_list && to_res.it->second.ObjType() == OBJ_LIST && bc) {
bc->AwakeWatched(op_args.db_cntx.db_index, to_key);
}
return OpStatus::OK;
}
OpResult<uint64_t> OpTtl(Transaction* t, EngineShard* shard, string_view key) {
auto opExpireTimeResult = OpExpireTime(t, shard, key);
if (opExpireTimeResult) {
int64_t ttl_ms = opExpireTimeResult.value() - t->GetDbContext().time_now_ms;
DCHECK_GT(ttl_ms, 0); // Otherwise FindReadOnly would return null.
return ttl_ms;
} else {
return opExpireTimeResult;
}
}
} // namespace
OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) {
@ -1582,33 +1715,6 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
}
}
OpResult<uint64_t> GenericFamily::OpExpireTime(Transaction* t, EngineShard* shard,
string_view key) {
auto& db_slice = t->GetDbSlice(shard->shard_id());
auto [it, expire_it] = db_slice.FindReadOnly(t->GetDbContext(), key);
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
if (!IsValid(expire_it))
return OpStatus::SKIPPED;
int64_t ttl_ms = db_slice.ExpireTime(expire_it);
DCHECK_GT(ttl_ms, 0); // Otherwise FindReadOnly would return null.
return ttl_ms;
}
OpResult<uint64_t> GenericFamily::OpTtl(Transaction* t, EngineShard* shard, string_view key) {
auto opExpireTimeResult = OpExpireTime(t, shard, key);
if (opExpireTimeResult) {
int64_t ttl_ms = opExpireTimeResult.value() - t->GetDbContext().time_now_ms;
DCHECK_GT(ttl_ms, 0); // Otherwise FindReadOnly would return null.
return ttl_ms;
} else {
return opExpireTimeResult;
}
}
OpResult<uint32_t> GenericFamily::OpExists(const OpArgs& op_args, const ShardArgs& keys) {
DVLOG(1) << "Exists: " << keys.Front();
auto& db_slice = op_args.GetDbSlice();
@ -1621,113 +1727,6 @@ OpResult<uint32_t> GenericFamily::OpExists(const OpArgs& op_args, const ShardArg
return res;
}
OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, string_view to_key,
bool destination_should_not_exist) {
auto* es = op_args.shard;
auto& db_slice = op_args.GetDbSlice();
auto from_res = db_slice.FindMutable(op_args.db_cntx, from_key);
if (!IsValid(from_res.it))
return OpStatus::KEY_NOTFOUND;
if (from_key == to_key)
return destination_should_not_exist ? OpStatus::KEY_EXISTS : OpStatus::OK;
bool is_prior_list = false;
auto to_res = db_slice.FindMutable(op_args.db_cntx, to_key);
if (IsValid(to_res.it)) {
if (destination_should_not_exist)
return OpStatus::KEY_EXISTS;
op_args.shard->search_indices()->RemoveDoc(to_key, op_args.db_cntx, to_res.it->second);
is_prior_list = (to_res.it->second.ObjType() == OBJ_LIST);
}
bool sticky = from_res.it->first.IsSticky();
uint64_t exp_ts = db_slice.ExpireTime(from_res.exp_it);
// we keep the value we want to move.
PrimeValue from_obj = std::move(from_res.it->second);
// Restore the expire flag on 'from' so we could delete it from expire table.
from_res.it->second.SetExpire(IsValid(from_res.exp_it));
if (IsValid(to_res.it)) {
to_res.it->second = std::move(from_obj);
to_res.it->second.SetExpire(IsValid(to_res.exp_it)); // keep the expire flag on 'to'.
// It is guaranteed that UpdateExpire() call does not erase the element because then
// from_it would be invalid. Therefore, UpdateExpire does not invalidate any iterators,
// therefore we can delete 'from_it'.
db_slice.UpdateExpire(op_args.db_cntx.db_index, to_res.it, exp_ts);
to_res.it->first.SetSticky(sticky);
to_res.post_updater.Run();
from_res.post_updater.Run();
CHECK(db_slice.Del(op_args.db_cntx, from_res.it));
} else {
// Here we first delete from_it because AddNew below could invalidate from_it.
// On the other hand, AddNew does not rely on the iterators - this is why we keep
// the value in `from_obj`.
from_res.post_updater.Run();
CHECK(db_slice.Del(op_args.db_cntx, from_res.it));
auto op_result = db_slice.AddNew(op_args.db_cntx, to_key, std::move(from_obj), exp_ts);
RETURN_ON_BAD_STATUS(op_result);
to_res = std::move(*op_result);
to_res.it->first.SetSticky(sticky);
}
op_args.shard->search_indices()->AddDoc(to_key, op_args.db_cntx, to_res.it->second);
auto bc = op_args.db_cntx.ns->GetBlockingController(es->shard_id());
if (!is_prior_list && to_res.it->second.ObjType() == OBJ_LIST && bc) {
bc->AwakeWatched(op_args.db_cntx.db_index, to_key);
}
return OpStatus::OK;
}
// OpMove touches multiple databases (op_args.db_idx, target_db), so it assumes it runs
// as a global transaction.
// TODO: Allow running OpMove without a global transaction.
OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex target_db) {
auto& db_slice = op_args.GetDbSlice();
// Fetch value at key in current db.
auto from_res = db_slice.FindMutable(op_args.db_cntx, key);
if (!IsValid(from_res.it))
return OpStatus::KEY_NOTFOUND;
// Fetch value at key in target db.
DbContext target_cntx = op_args.db_cntx;
target_cntx.db_index = target_db;
auto to_res = db_slice.FindReadOnly(target_cntx, key);
if (IsValid(to_res.it))
return OpStatus::KEY_EXISTS;
// Ensure target database exists.
db_slice.ActivateDb(target_db);
bool sticky = from_res.it->first.IsSticky();
uint64_t exp_ts = db_slice.ExpireTime(from_res.exp_it);
from_res.post_updater.Run();
PrimeValue from_obj = std::move(from_res.it->second);
// Restore expire flag after std::move.
from_res.it->second.SetExpire(IsValid(from_res.exp_it));
CHECK(db_slice.Del(op_args.db_cntx, from_res.it));
auto op_result = db_slice.AddNew(target_cntx, key, std::move(from_obj), exp_ts);
RETURN_ON_BAD_STATUS(op_result);
auto& add_res = *op_result;
add_res.it->first.SetSticky(sticky);
auto bc = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
if (add_res.it->second.ObjType() == OBJ_LIST && bc) {
bc->AwakeWatched(target_db, key);
}
return OpStatus::OK;
}
void GenericFamily::RandomKey(CmdArgList args, ConnectionContext* cntx) {
const static size_t kMaxAttempts = 3;
@ -1782,6 +1781,7 @@ using CI = CommandId;
#define HFUNC(x) SetHandler(&GenericFamily::x)
namespace acl {
constexpr uint32_t kDel = KEYSPACE | WRITE | SLOW;
constexpr uint32_t kPing = FAST | CONNECTION;
constexpr uint32_t kEcho = FAST | CONNECTION;

View file

@ -7,14 +7,10 @@
#include "base/flags.h"
#include "facade/facade_types.h"
#include "server/common.h"
#include "server/table.h"
#include "server/tx_base.h"
ABSL_DECLARE_FLAG(uint32_t, dbnum);
namespace util {
class ProactorPool;
} // namespace util
namespace dfly {
using facade::ErrorReply;
@ -78,12 +74,6 @@ class GenericFamily {
static void ExpireTimeGeneric(CmdArgList args, ConnectionContext* cntx, TimeUnit unit);
static void TtlGeneric(CmdArgList args, ConnectionContext* cntx, TimeUnit unit);
static OpResult<uint64_t> OpExpireTime(Transaction* t, EngineShard* shard, std::string_view key);
static OpResult<uint64_t> OpTtl(Transaction* t, EngineShard* shard, std::string_view key);
static OpResult<void> OpRen(const OpArgs& op_args, std::string_view from, std::string_view to,
bool destination_should_not_exist);
static OpStatus OpMove(const OpArgs& op_args, std::string_view key, DbIndex target_db);
};
} // namespace dfly

View file

@ -37,6 +37,7 @@ extern "C" {
#include "server/acl/validator.h"
#include "server/bitops_family.h"
#include "server/bloom_family.h"
#include "server/channel_store.h"
#include "server/cluster/cluster_family.h"
#include "server/cluster/cluster_utility.h"
#include "server/conn_context.h"

View file

@ -306,82 +306,6 @@ std::optional<cron::cronexpr> InferSnapshotCronExpr() {
return std::nullopt;
}
void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, dfly::Service& service,
std::string_view sub_cmd) {
size_t requested_slow_log_length = UINT32_MAX;
size_t argc = args.size();
if (argc >= 3) {
cntx->SendError(facade::UnknownSubCmd(sub_cmd, "SLOWLOG"), facade::kSyntaxErrType);
return;
} else if (argc == 2) {
string_view length = facade::ArgS(args, 1);
int64_t num;
if ((!absl::SimpleAtoi(length, &num)) || (num < -1)) {
cntx->SendError("count should be greater than or equal to -1");
return;
}
if (num >= 0) {
requested_slow_log_length = num;
}
}
// gather all the individual slowlogs from all the fibers and sort them by their timestamp
std::vector<boost::circular_buffer<SlowLogEntry>> entries(service.proactor_pool().size());
service.proactor_pool().AwaitFiberOnAll([&](auto index, auto* context) {
auto shard_entries = ServerState::tlocal()->GetSlowLog().Entries();
entries[index] = shard_entries;
});
std::vector<std::pair<SlowLogEntry, unsigned>> merged_slow_log;
for (size_t i = 0; i < entries.size(); ++i) {
for (const auto& log_item : entries[i]) {
merged_slow_log.emplace_back(log_item, i);
}
}
std::sort(merged_slow_log.begin(), merged_slow_log.end(), [](const auto& e1, const auto& e2) {
return e1.first.unix_ts_usec > e2.first.unix_ts_usec;
});
requested_slow_log_length = std::min(merged_slow_log.size(), requested_slow_log_length);
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx->reply_builder());
rb->StartArray(requested_slow_log_length);
for (size_t i = 0; i < requested_slow_log_length; ++i) {
const auto& entry = merged_slow_log[i].first;
const auto& args = entry.cmd_args;
rb->StartArray(6);
rb->SendLong(entry.entry_id * service.proactor_pool().size() + merged_slow_log[i].second);
rb->SendLong(entry.unix_ts_usec / 1000000);
rb->SendLong(entry.exec_time_usec);
// if we truncated the args, there is one pseudo-element containing the number of truncated
// args that we must add, so the result length is increased by 1
size_t len = args.size() + int(args.size() < entry.original_length);
rb->StartArray(len);
for (const auto& arg : args) {
if (arg.second > 0) {
auto suffix = absl::StrCat("... (", arg.second, " more bytes)");
auto cmd_arg = arg.first.substr(0, kMaximumSlowlogArgLength - suffix.length());
rb->SendBulkString(absl::StrCat(cmd_arg, suffix));
} else {
rb->SendBulkString(arg.first);
}
}
// if we truncated arguments - add a special string to indicate that.
if (args.size() < entry.original_length) {
rb->SendBulkString(
absl::StrCat("... (", entry.original_length - args.size(), " more arguments)"));
}
rb->SendBulkString(entry.client_ip);
rb->SendBulkString(entry.client_name);
}
}
void ClientSetName(CmdArgList args, ConnectionContext* cntx) {
if (args.size() == 1) {
cntx->conn()->SetName(string{ArgS(args, 0)});
@ -708,6 +632,83 @@ optional<ReplicaOfArgs> ReplicaOfArgs::FromCmdArgs(CmdArgList args, ConnectionCo
} // namespace
void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, std::string_view sub_cmd,
util::ProactorPool* pp) {
size_t requested_slow_log_length = UINT32_MAX;
size_t argc = args.size();
if (argc >= 3) {
cntx->SendError(facade::UnknownSubCmd(sub_cmd, "SLOWLOG"), facade::kSyntaxErrType);
return;
} else if (argc == 2) {
string_view length = facade::ArgS(args, 1);
int64_t num;
if ((!absl::SimpleAtoi(length, &num)) || (num < -1)) {
cntx->SendError("count should be greater than or equal to -1");
return;
}
if (num >= 0) {
requested_slow_log_length = num;
}
}
// gather all the individual slowlogs from all the fibers and sort them by their timestamp
std::vector<boost::circular_buffer<SlowLogEntry>> entries(pp->size());
pp->AwaitFiberOnAll([&](auto index, auto* context) {
auto shard_entries = ServerState::tlocal()->GetSlowLog().Entries();
entries[index] = shard_entries;
});
std::vector<std::pair<SlowLogEntry, unsigned>> merged_slow_log;
for (size_t i = 0; i < entries.size(); ++i) {
for (const auto& log_item : entries[i]) {
merged_slow_log.emplace_back(log_item, i);
}
}
std::sort(merged_slow_log.begin(), merged_slow_log.end(), [](const auto& e1, const auto& e2) {
return e1.first.unix_ts_usec > e2.first.unix_ts_usec;
});
requested_slow_log_length = std::min(merged_slow_log.size(), requested_slow_log_length);
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx->reply_builder());
rb->StartArray(requested_slow_log_length);
for (size_t i = 0; i < requested_slow_log_length; ++i) {
const auto& entry = merged_slow_log[i].first;
const auto& args = entry.cmd_args;
rb->StartArray(6);
rb->SendLong(entry.entry_id * pp->size() + merged_slow_log[i].second);
rb->SendLong(entry.unix_ts_usec / 1000000);
rb->SendLong(entry.exec_time_usec);
// if we truncated the args, there is one pseudo-element containing the number of truncated
// args that we must add, so the result length is increased by 1
size_t len = args.size() + int(args.size() < entry.original_length);
rb->StartArray(len);
for (const auto& arg : args) {
if (arg.second > 0) {
auto suffix = absl::StrCat("... (", arg.second, " more bytes)");
auto cmd_arg = arg.first.substr(0, kMaximumSlowlogArgLength - suffix.length());
rb->SendBulkString(absl::StrCat(cmd_arg, suffix));
} else {
rb->SendBulkString(arg.first);
}
}
// if we truncated arguments - add a special string to indicate that.
if (args.size() < entry.original_length) {
rb->SendBulkString(
absl::StrCat("... (", entry.original_length - args.size(), " more arguments)"));
}
rb->SendBulkString(entry.client_ip);
rb->SendBulkString(entry.client_name);
}
}
std::optional<fb2::Fiber> Pause(std::vector<facade::Listener*> listeners, Namespace* ns,
facade::Connection* conn, ClientPause pause_state,
std::function<bool()> is_pause_in_progress) {
@ -3088,7 +3089,7 @@ void ServerFamily::SlowLog(CmdArgList args, ConnectionContext* cntx) {
}
if (sub_cmd == "GET") {
return SlowLogGet(args, cntx, service_, sub_cmd);
return SlowLogGet(args, cntx, sub_cmd, &service_.proactor_pool());
}
cntx->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType);
}

View file

@ -7,11 +7,7 @@
#include <optional>
#include <string>
#include "facade/conn_context.h"
#include "facade/dragonfly_listener.h"
#include "facade/redis_parser.h"
#include "facade/reply_builder.h"
#include "server/channel_store.h"
#include "server/detail/save_stages_controller.h"
#include "server/dflycmd.h"
#include "server/engine_shard_set.h"
@ -21,9 +17,6 @@
#include "util/fibers/fiberqueue_threadpool.h"
#include "util/fibers/future.h"
void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, dfly::Service& service,
std::string_view sub_cmd);
namespace util {
class AcceptServer;
@ -45,6 +38,7 @@ std::string GetPassword();
namespace journal {
class Journal;
} // namespace journal
namespace cluster {
class ClusterFamily;
}
@ -379,4 +373,7 @@ std::optional<util::fb2::Fiber> Pause(std::vector<facade::Listener*> listeners,
facade::Connection* conn, ClientPause pause_state,
std::function<bool()> is_pause_in_progress);
void SlowLogGet(CmdArgList args, ConnectionContext* cntx, std::string_view sub_cmd,
util::ProactorPool* pp);
} // namespace dfly

View file

@ -48,6 +48,82 @@ enum class ExpT { EX, PX, EXAT, PXAT };
constexpr uint32_t kMaxStrLen = 1 << 28;
// Stores a string, the pending result of a tiered read or nothing
struct StringValue {
StringValue() : v_{} {
}
StringValue(std::string s) : v_{std::move(s)} {
}
StringValue(util::fb2::Future<std::string> f) : v_{std::move(f)} {
}
// Get and consume value. If backed by a future, blocks until resolved.
std::string Get() &&;
// If no value is stored
bool IsEmpty() const;
// Read string from prime value - either from memory or issue tiered storage read
static StringValue Read(DbIndex dbid, std::string_view key, const PrimeValue& pv,
EngineShard* es);
bool IsFuturized() const {
return std::holds_alternative<util::fb2::Future<std::string>>(v_);
}
private:
std::variant<std::monostate, std::string, util::fb2::Future<std::string>> v_;
};
// Helper for performing SET operations with various options
class SetCmd {
public:
explicit SetCmd(OpArgs op_args, bool manual_journal)
: op_args_(op_args), manual_journal_{manual_journal} {
}
enum SetFlags {
SET_ALWAYS = 0,
SET_IF_NOTEXIST = 1 << 0, /* NX: Set if key not exists. */
SET_IF_EXISTS = 1 << 1, /* XX: Set if key exists. */
SET_KEEP_EXPIRE = 1 << 2, /* KEEPTTL: Set and keep the ttl */
SET_GET = 1 << 3, /* GET: Set if want to get key before set */
SET_EXPIRE_AFTER_MS = 1 << 4, /* EX,PX,EXAT,PXAT: Expire after ms. */
SET_STICK = 1 << 5, /* Set STICK flag */
};
struct SetParams {
uint16_t flags = SET_ALWAYS;
uint32_t memcache_flags = 0;
uint64_t expire_after_ms = 0; // Relative value based on now. 0 means no expiration.
StringValue* prev_val = nullptr; // If set, previous value is stored at pointer
constexpr bool IsConditionalSet() const {
return flags & SET_IF_NOTEXIST || flags & SET_IF_EXISTS;
}
};
OpStatus Set(const SetParams& params, std::string_view key, std::string_view value);
private:
OpStatus SetExisting(const SetParams& params, DbSlice::Iterator it, DbSlice::ExpIterator e_it,
std::string_view key, std::string_view value);
void AddNew(const SetParams& params, DbSlice::Iterator it, DbSlice::ExpIterator e_it,
std::string_view key, std::string_view value);
// Called at the end of AddNew of SetExisting
void PostEdit(const SetParams& params, std::string_view key, std::string_view value,
PrimeValue* pv);
void RecordJournal(const SetParams& params, std::string_view key, std::string_view value);
OpStatus CachePrevIfNeeded(const SetParams& params, DbSlice::Iterator it);
const OpArgs op_args_;
bool manual_journal_;
};
void CopyValueToBuffer(const PrimeValue& pv, char* dest) {
DCHECK_EQ(pv.ObjType(), OBJ_STRING);
DCHECK(!pv.IsExternal());
@ -569,6 +645,115 @@ struct GetReplies {
RedisReplyBuilder* rb;
};
void ExtendGeneric(CmdArgList args, bool prepend, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view value = ArgS(args, 1);
VLOG(2) << "ExtendGeneric(" << key << ", " << value << ")";
if (cntx->protocol() == Protocol::REDIS) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExtend(t->GetOpArgs(shard), key, value, prepend);
};
auto res = cntx->transaction->ScheduleSingleHopT(cb);
if (!res)
return cntx->SendError(res.status());
cntx->SendLong(GetResult(std::move(res.value())));
} else {
// Memcached skips if key is missing
DCHECK(cntx->protocol() == Protocol::MEMCACHE);
auto cb = [&](Transaction* t, EngineShard* shard) {
return ExtendOrSkip(t->GetOpArgs(shard), key, value, prepend);
};
OpResult<bool> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
SinkReplyBuilder* builder = cntx->reply_builder();
if (result.value_or(false)) {
return builder->SendStored();
}
builder->SendSetSkipped();
}
}
// Wrapper to call SetCmd::Set in ScheduleSingleHop
OpStatus SetGeneric(ConnectionContext* cntx, const SetCmd::SetParams& sparams, string_view key,
string_view value) {
DCHECK(cntx->transaction);
bool manual_journal = cntx->cid->opt_mask() & CO::NO_AUTOJOURNAL;
return cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* shard) {
return SetCmd(t->GetOpArgs(shard), manual_journal).Set(sparams, key, value);
});
}
/// (P)SETEX key seconds value
void SetExGeneric(bool seconds, CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view ex = ArgS(args, 1);
string_view value = ArgS(args, 2);
int64_t unit_vals;
if (!absl::SimpleAtoi(ex, &unit_vals)) {
return cntx->SendError(kInvalidIntErr, kSyntaxErrType);
}
if (unit_vals < 1) {
return cntx->SendError(InvalidExpireTime(cntx->cid->name()));
}
DbSlice::ExpireParams expiry{
.value = unit_vals,
.unit = seconds ? TimeUnit::SEC : TimeUnit::MSEC,
.absolute = false,
};
int64_t now_ms = GetCurrentTimeMs();
auto [rel_ms, abs_ms] = expiry.Calculate(now_ms, false);
if (abs_ms < 0)
return cntx->SendError(InvalidExpireTime("set"));
SetCmd::SetParams sparams;
sparams.flags |= SetCmd::SET_EXPIRE_AFTER_MS;
sparams.expire_after_ms = expiry.Calculate(now_ms, true).first;
cntx->SendError(SetGeneric(cntx, sparams, key, value));
}
void IncrByGeneric(string_view key, int64_t val, ConnectionContext* cntx) {
bool skip_on_missing = cntx->protocol() == Protocol::MEMCACHE;
auto cb = [&](Transaction* t, EngineShard* shard) {
OpResult<int64_t> res = OpIncrBy(t->GetOpArgs(shard), key, val, skip_on_missing);
return res;
};
OpResult<int64_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
auto* builder = cntx->reply_builder();
DVLOG(2) << "IncrByGeneric " << key << "/" << result.value();
switch (result.status()) {
case OpStatus::OK:
builder->SendLong(result.value());
break;
case OpStatus::INVALID_VALUE:
cntx->SendError(kInvalidIntErr);
break;
case OpStatus::OUT_OF_RANGE:
cntx->SendError(kIncrOverflow);
break;
case OpStatus::KEY_NOTFOUND: // Relevant only for MC
reinterpret_cast<MCReplyBuilder*>(builder)->SendNotFound();
break;
default:
reinterpret_cast<RedisReplyBuilder*>(builder)->SendError(result.status());
break;
}
}
} // namespace
StringValue StringValue::Read(DbIndex dbid, string_view key, const PrimeValue& pv,
@ -747,17 +932,6 @@ OpStatus SetCmd::CachePrevIfNeeded(const SetCmd::SetParams& params, DbSlice::Ite
return OpStatus::OK;
}
// Wrapper to call SetCmd::Set in ScheduleSingleHop
OpStatus SetGeneric(ConnectionContext* cntx, const SetCmd::SetParams& sparams, string_view key,
string_view value) {
DCHECK(cntx->transaction);
bool manual_journal = cntx->cid->opt_mask() & CO::NO_AUTOJOURNAL;
return cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* shard) {
return SetCmd(t->GetOpArgs(shard), manual_journal).Set(sparams, key, value);
});
}
void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
facade::CmdArgParser parser{args};
@ -943,39 +1117,6 @@ void StringFamily::Prepend(CmdArgList args, ConnectionContext* cntx) {
ExtendGeneric(args, true, cntx);
}
void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view value = ArgS(args, 1);
VLOG(2) << "ExtendGeneric(" << key << ", " << value << ")";
if (cntx->protocol() == Protocol::REDIS) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExtend(t->GetOpArgs(shard), key, value, prepend);
};
auto res = cntx->transaction->ScheduleSingleHopT(cb);
if (!res)
return cntx->SendError(res.status());
cntx->SendLong(GetResult(std::move(res.value())));
} else {
// Memcached skips if key is missing
DCHECK(cntx->protocol() == Protocol::MEMCACHE);
auto cb = [&](Transaction* t, EngineShard* shard) {
return ExtendOrSkip(t->GetOpArgs(shard), key, value, prepend);
};
OpResult<bool> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
SinkReplyBuilder* builder = cntx->reply_builder();
if (result.value_or(false)) {
return builder->SendStored();
}
builder->SendSetSkipped();
}
}
void StringFamily::GetEx(CmdArgList args, ConnectionContext* cntx) {
CmdArgParser parser{args};
string_view key = parser.Next();
@ -1103,71 +1244,6 @@ void StringFamily::DecrBy(CmdArgList args, ConnectionContext* cntx) {
return IncrByGeneric(key, -val, cntx);
}
void StringFamily::IncrByGeneric(string_view key, int64_t val, ConnectionContext* cntx) {
bool skip_on_missing = cntx->protocol() == Protocol::MEMCACHE;
auto cb = [&](Transaction* t, EngineShard* shard) {
OpResult<int64_t> res = OpIncrBy(t->GetOpArgs(shard), key, val, skip_on_missing);
return res;
};
OpResult<int64_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
auto* builder = cntx->reply_builder();
DVLOG(2) << "IncrByGeneric " << key << "/" << result.value();
switch (result.status()) {
case OpStatus::OK:
builder->SendLong(result.value());
break;
case OpStatus::INVALID_VALUE:
cntx->SendError(kInvalidIntErr);
break;
case OpStatus::OUT_OF_RANGE:
cntx->SendError(kIncrOverflow);
break;
case OpStatus::KEY_NOTFOUND: // Relevant only for MC
reinterpret_cast<MCReplyBuilder*>(builder)->SendNotFound();
break;
default:
reinterpret_cast<RedisReplyBuilder*>(builder)->SendError(result.status());
break;
}
}
/// (P)SETEX key seconds value
void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view ex = ArgS(args, 1);
string_view value = ArgS(args, 2);
int64_t unit_vals;
if (!absl::SimpleAtoi(ex, &unit_vals)) {
return cntx->SendError(kInvalidIntErr, kSyntaxErrType);
}
if (unit_vals < 1) {
return cntx->SendError(InvalidExpireTime(cntx->cid->name()));
}
DbSlice::ExpireParams expiry{
.value = unit_vals,
.unit = seconds ? TimeUnit::SEC : TimeUnit::MSEC,
.absolute = false,
};
int64_t now_ms = GetCurrentTimeMs();
auto [rel_ms, abs_ms] = expiry.Calculate(now_ms, false);
if (abs_ms < 0)
return cntx->SendError(InvalidExpireTime("set"));
SetCmd::SetParams sparams;
sparams.flags |= SetCmd::SET_EXPIRE_AFTER_MS;
sparams.expire_after_ms = expiry.Calculate(now_ms, true).first;
cntx->SendError(SetGeneric(cntx, sparams, key, value));
}
void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
DCHECK_GE(args.size(), 1U);
Transaction* transaction = cntx->transaction;

View file

@ -4,94 +4,14 @@
#pragma once
#include "server/common.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "util/fibers/future.h"
#include "util/proactor_pool.h"
#include "facade/facade_types.h"
namespace dfly {
class ConnectionContext;
class CommandRegistry;
using facade::OpResult;
using facade::OpStatus;
// Stores a string, the pending result of a tiered read or nothing
struct StringValue {
StringValue() : v_{} {
}
StringValue(std::string s) : v_{std::move(s)} {
}
StringValue(util::fb2::Future<std::string> f) : v_{std::move(f)} {
}
// Get and consume value. If backed by a future, blocks until resolved.
std::string Get() &&;
// If no value is stored
bool IsEmpty() const;
// Read string from prime value - either from memory or issue tiered storage read
static StringValue Read(DbIndex dbid, std::string_view key, const PrimeValue& pv,
EngineShard* es);
bool IsFuturized() const {
return std::holds_alternative<util::fb2::Future<std::string>>(v_);
}
private:
std::variant<std::monostate, std::string, util::fb2::Future<std::string>> v_;
};
// Helper for performing SET operations with various options
class SetCmd {
public:
explicit SetCmd(OpArgs op_args, bool manual_journal)
: op_args_(op_args), manual_journal_{manual_journal} {
}
enum SetFlags {
SET_ALWAYS = 0,
SET_IF_NOTEXIST = 1 << 0, /* NX: Set if key not exists. */
SET_IF_EXISTS = 1 << 1, /* XX: Set if key exists. */
SET_KEEP_EXPIRE = 1 << 2, /* KEEPTTL: Set and keep the ttl */
SET_GET = 1 << 3, /* GET: Set if want to get key before set */
SET_EXPIRE_AFTER_MS = 1 << 4, /* EX,PX,EXAT,PXAT: Expire after ms. */
SET_STICK = 1 << 5, /* Set STICK flag */
};
struct SetParams {
uint16_t flags = SET_ALWAYS;
uint32_t memcache_flags = 0;
uint64_t expire_after_ms = 0; // Relative value based on now. 0 means no expiration.
StringValue* prev_val = nullptr; // If set, previous value is stored at pointer
constexpr bool IsConditionalSet() const {
return flags & SET_IF_NOTEXIST || flags & SET_IF_EXISTS;
}
};
OpStatus Set(const SetParams& params, std::string_view key, std::string_view value);
private:
OpStatus SetExisting(const SetParams& params, DbSlice::Iterator it, DbSlice::ExpIterator e_it,
std::string_view key, std::string_view value);
void AddNew(const SetParams& params, DbSlice::Iterator it, DbSlice::ExpIterator e_it,
std::string_view key, std::string_view value);
// Called at the end of AddNew of SetExisting
void PostEdit(const SetParams& params, std::string_view key, std::string_view value,
PrimeValue* pv);
void RecordJournal(const SetParams& params, std::string_view key, std::string_view value);
OpStatus CachePrevIfNeeded(const SetParams& params, DbSlice::Iterator it);
const OpArgs op_args_;
bool manual_journal_;
};
using facade::CmdArgList;
class StringFamily {
public:
@ -122,11 +42,6 @@ class StringFamily {
static void PSetEx(CmdArgList args, ConnectionContext* cntx);
static void ClThrottle(CmdArgList args, ConnectionContext* cntx);
// These functions are used internally, they do not implement any specific command
static void IncrByGeneric(std::string_view key, int64_t val, ConnectionContext* cntx);
static void ExtendGeneric(CmdArgList args, bool prepend, ConnectionContext* cntx);
static void SetExGeneric(bool seconds, CmdArgList args, ConnectionContext* cntx);
};
} // namespace dfly