1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: pass SinkReplyBuilder and Transaction explicitly. Part2 (#3954)

This pr follows #3946

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-10-23 11:54:24 +03:00 committed by GitHub
parent c5a8008348
commit e24f697bb3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 275 additions and 279 deletions

View file

@ -109,6 +109,14 @@ std::ostream& operator<<(std::ostream& os, const GlobalState& state);
enum class TimeUnit : uint8_t { SEC, MSEC };
enum ExpireFlags {
EXPIRE_ALWAYS = 0,
EXPIRE_NX = 1 << 0, // Set expiry only when key has no expiry
EXPIRE_XX = 1 << 2, // Set expiry only when the key has expiry
EXPIRE_GT = 1 << 3, // GT: Set expiry only when the new expiry is greater than current one
EXPIRE_LT = 1 << 4, // LT: Set expiry only when the new expiry is less than current one
};
bool ParseHumanReadableBytes(std::string_view str, int64_t* num_bytes);
bool ParseDouble(std::string_view src, double* value);

View file

@ -13,7 +13,6 @@
#include "server/cluster/cluster_defs.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/generic_family.h"
#include "server/journal/journal.h"
#include "server/server_state.h"
#include "server/tiered_storage.h"

View file

@ -616,7 +616,7 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys
do {
auto cb = [&] {
OpArgs op_args{EngineShard::tlocal(), 0, db_cntx};
OpArgs op_args{EngineShard::tlocal(), nullptr, db_cntx};
OpScan(op_args, scan_opts, &cursor, keys);
};
@ -863,6 +863,99 @@ OpResult<uint64_t> OpTtl(Transaction* t, EngineShard* shard, string_view key) {
}
}
ErrorReply RenameGeneric(CmdArgList args, bool destination_should_not_exist, Transaction* tx) {
string_view key[2] = {ArgS(args, 0), ArgS(args, 1)};
if (tx->GetUniqueShardCnt() == 1) {
tx->ReviveAutoJournal(); // Safe to use RENAME with single shard
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRen(t->GetOpArgs(shard), key[0], key[1], destination_should_not_exist);
};
OpResult<void> result = tx->ScheduleSingleHopT(std::move(cb));
return result.status();
}
Renamer renamer{tx, key[0], key[1], shard_set->size()};
return renamer.Rename(destination_should_not_exist);
}
void ExpireTimeGeneric(CmdArgList args, TimeUnit unit, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) { return OpExpireTime(t, shard, key); };
OpResult<uint64_t> result = tx->ScheduleSingleHopT(std::move(cb));
if (result) {
long ttl = (unit == TimeUnit::SEC) ? (result.value() + 500) / 1000 : result.value();
builder->SendLong(ttl);
return;
}
switch (result.status()) {
case OpStatus::KEY_NOTFOUND:
builder->SendLong(-2);
break;
default:
LOG_IF(ERROR, result.status() != OpStatus::SKIPPED)
<< "Unexpected status " << result.status();
builder->SendLong(-1);
break;
}
}
void TtlGeneric(CmdArgList args, TimeUnit unit, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) { return OpTtl(t, shard, key); };
OpResult<uint64_t> result = tx->ScheduleSingleHopT(std::move(cb));
if (result) {
long ttl = (unit == TimeUnit::SEC) ? (result.value() + 500) / 1000 : result.value();
builder->SendLong(ttl);
return;
}
switch (result.status()) {
case OpStatus::KEY_NOTFOUND:
builder->SendLong(-2);
break;
default:
LOG_IF(ERROR, result.status() != OpStatus::SKIPPED)
<< "Unexpected status " << result.status();
builder->SendLong(-1);
break;
}
}
std::optional<int32_t> ParseExpireOptionsOrReply(const CmdArgList args, SinkReplyBuilder* builder) {
int32_t flags = ExpireFlags::EXPIRE_ALWAYS;
for (auto& arg : args) {
string arg_sv = absl::AsciiStrToUpper(ToSV(arg));
if (arg_sv == "NX") {
flags |= ExpireFlags::EXPIRE_NX;
} else if (arg_sv == "XX") {
flags |= ExpireFlags::EXPIRE_XX;
} else if (arg_sv == "GT") {
flags |= ExpireFlags::EXPIRE_GT;
} else if (arg_sv == "LT") {
flags |= ExpireFlags::EXPIRE_LT;
} else {
builder->SendError(absl::StrCat("Unsupported option: ", arg_sv));
return nullopt;
}
}
if ((flags & ExpireFlags::EXPIRE_NX) && (flags & ~ExpireFlags::EXPIRE_NX)) {
builder->SendError("NX and XX, GT or LT options at the same time are not compatible");
return nullopt;
}
if ((flags & ExpireFlags::EXPIRE_GT) && (flags & ExpireFlags::EXPIRE_LT)) {
builder->SendError("GT and LT options at the same time are not compatible");
return nullopt;
}
return flags;
}
} // namespace
OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) {
@ -882,12 +975,11 @@ OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs&
return res;
}
void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) {
Transaction* transaction = cntx->transaction;
void GenericFamily::Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
VLOG(1) << "Del " << ArgS(args, 0);
atomic_uint32_t result{0};
bool is_mc = cntx->protocol() == Protocol::MEMCACHE;
bool is_mc = (builder->type() == SinkReplyBuilder::MC);
auto cb = [&result](const Transaction* t, EngineShard* shard) {
ShardArgs args = t->GetShardArgs(shard->shard_id());
@ -897,15 +989,15 @@ void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK;
};
OpStatus status = transaction->ScheduleSingleHop(std::move(cb));
OpStatus status = tx->ScheduleSingleHop(std::move(cb));
CHECK_EQ(OpStatus::OK, status);
DVLOG(2) << "Del ts " << transaction->txid();
DVLOG(2) << "Del ts " << tx->txid();
uint32_t del_cnt = result.load(memory_order_relaxed);
if (is_mc) {
using facade::MCReplyBuilder;
MCReplyBuilder* mc_builder = static_cast<MCReplyBuilder*>(cntx->reply_builder());
MCReplyBuilder* mc_builder = static_cast<MCReplyBuilder*>(builder);
if (del_cnt == 0) {
mc_builder->SendNotFound();
@ -913,17 +1005,19 @@ void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) {
mc_builder->SendSimpleString("DELETED");
}
} else {
cntx->SendLong(del_cnt);
builder->SendLong(del_cnt);
}
}
void GenericFamily::Ping(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Ping(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (args.size() > 1) {
return cntx->SendError(facade::WrongNumArgsError("ping"), kSyntaxErrType);
return builder->SendError(facade::WrongNumArgsError("ping"), kSyntaxErrType);
}
string_view msg;
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
// If a client in the subscribe state and in resp2 mode, it returns an array for some reason.
if (cntx->conn_state.subscribe_info && !rb->IsResp3()) {
@ -936,18 +1030,16 @@ void GenericFamily::Ping(CmdArgList args, ConnectionContext* cntx) {
}
if (args.size() == 0) {
return cntx->SendSimpleString("PONG");
return builder->SendSimpleString("PONG");
} else {
msg = ArgS(args, 0);
DVLOG(2) << "Ping " << msg;
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
return rb->SendBulkString(msg);
}
}
void GenericFamily::Exists(CmdArgList args, ConnectionContext* cntx) {
Transaction* transaction = cntx->transaction;
void GenericFamily::Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
VLOG(1) << "Exists " << ArgS(args, 0);
atomic_uint32_t result{0};
@ -960,59 +1052,28 @@ void GenericFamily::Exists(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK;
};
OpStatus status = transaction->ScheduleSingleHop(std::move(cb));
OpStatus status = tx->ScheduleSingleHop(std::move(cb));
CHECK_EQ(OpStatus::OK, status);
return cntx->SendLong(result.load(memory_order_acquire));
return builder->SendLong(result.load(memory_order_acquire));
}
void GenericFamily::Persist(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Persist(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) { return OpPersist(t->GetOpArgs(shard), key); };
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
if (status == OpStatus::OK)
cntx->SendLong(1);
else
cntx->SendLong(0);
OpStatus status = tx->ScheduleSingleHop(std::move(cb));
builder->SendLong(status == OpStatus::OK);
}
std::optional<int32_t> ParseExpireOptionsOrReply(const CmdArgList args, ConnectionContext* cntx) {
int32_t flags = ExpireFlags::EXPIRE_ALWAYS;
for (auto& arg : args) {
string arg_sv = absl::AsciiStrToUpper(ToSV(arg));
if (arg_sv == "NX") {
flags |= ExpireFlags::EXPIRE_NX;
} else if (arg_sv == "XX") {
flags |= ExpireFlags::EXPIRE_XX;
} else if (arg_sv == "GT") {
flags |= ExpireFlags::EXPIRE_GT;
} else if (arg_sv == "LT") {
flags |= ExpireFlags::EXPIRE_LT;
} else {
cntx->SendError(absl::StrCat("Unsupported option: ", arg_sv));
return nullopt;
}
}
if ((flags & ExpireFlags::EXPIRE_NX) && (flags & ~ExpireFlags::EXPIRE_NX)) {
cntx->SendError("NX and XX, GT or LT options at the same time are not compatible");
return nullopt;
}
if ((flags & ExpireFlags::EXPIRE_GT) && (flags & ExpireFlags::EXPIRE_LT)) {
cntx->SendError("GT and LT options at the same time are not compatible");
return nullopt;
}
return flags;
}
void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Expire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
string_view sec = ArgS(args, 1);
int64_t int_arg;
if (!absl::SimpleAtoi(sec, &int_arg)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
}
int_arg = std::max<int64_t>(int_arg, -1);
@ -1022,7 +1083,7 @@ void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) {
int_arg = kMaxExpireDeadlineSec;
}
auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cntx);
auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder);
if (!expire_options) {
return;
}
@ -1032,21 +1093,21 @@ void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) {
return OpExpire(t->GetOpArgs(shard), key, params);
};
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
cntx->SendLong(status == OpStatus::OK);
OpStatus status = tx->ScheduleSingleHop(std::move(cb));
builder->SendLong(status == OpStatus::OK);
}
void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::ExpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
string_view sec = ArgS(args, 1);
int64_t int_arg;
if (!absl::SimpleAtoi(sec, &int_arg)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
}
int_arg = std::max<int64_t>(int_arg, 0L);
auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cntx);
auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder);
if (!expire_options) {
return;
}
@ -1056,16 +1117,17 @@ void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExpire(t->GetOpArgs(shard), key, params);
};
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
OpStatus status = tx->ScheduleSingleHop(std::move(cb));
if (status == OpStatus::OUT_OF_RANGE) {
return cntx->SendError(kExpiryOutOfRange);
} else {
cntx->SendLong(status == OpStatus::OK);
return builder->SendError(kExpiryOutOfRange);
}
builder->SendLong(status == OpStatus::OK);
}
void GenericFamily::Keys(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Keys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string_view pattern(ArgS(args, 0));
uint64_t cursor = 0;
@ -1083,23 +1145,24 @@ void GenericFamily::Keys(CmdArgList args, ConnectionContext* cntx) {
cursor = ScanGeneric(cursor, scan_opts, &keys, cntx);
} while (cursor != 0 && keys.size() < output_limit);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(keys.size());
for (const auto& k : keys) {
rb->SendBulkString(k);
}
}
void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::PexpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
string_view msec = ArgS(args, 1);
int64_t int_arg;
if (!absl::SimpleAtoi(msec, &int_arg)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
}
int_arg = std::max<int64_t>(int_arg, 0L);
auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cntx);
auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder);
if (!expire_options) {
return;
}
@ -1111,22 +1174,22 @@ void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExpire(t->GetOpArgs(shard), key, params);
};
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
OpStatus status = tx->ScheduleSingleHop(std::move(cb));
if (status == OpStatus::OUT_OF_RANGE) {
return cntx->SendError(kExpiryOutOfRange);
return builder->SendError(kExpiryOutOfRange);
} else {
cntx->SendLong(status == OpStatus::OK);
builder->SendLong(status == OpStatus::OK);
}
}
void GenericFamily::Pexpire(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Pexpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
string_view msec = ArgS(args, 1);
int64_t int_arg;
if (!absl::SimpleAtoi(msec, &int_arg)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
}
int_arg = std::max<int64_t>(int_arg, -1);
@ -1135,7 +1198,7 @@ void GenericFamily::Pexpire(CmdArgList args, ConnectionContext* cntx) {
int_arg = kMaxExpireDeadlineMs;
}
auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cntx);
auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder);
if (!expire_options) {
return;
}
@ -1145,17 +1208,16 @@ void GenericFamily::Pexpire(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExpire(t->GetOpArgs(shard), key, params);
};
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
OpStatus status = tx->ScheduleSingleHop(std::move(cb));
if (status == OpStatus::OUT_OF_RANGE) {
return cntx->SendError(kExpiryOutOfRange);
} else {
cntx->SendLong(status == OpStatus::OK);
return builder->SendError(kExpiryOutOfRange);
}
builder->SendLong(status == OpStatus::OK);
}
void GenericFamily::Stick(CmdArgList args, ConnectionContext* cntx) {
Transaction* transaction = cntx->transaction;
void GenericFamily::Stick(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
Transaction* transaction = tx;
VLOG(1) << "Stick " << ArgS(args, 0);
atomic_uint32_t result{0};
@ -1174,7 +1236,7 @@ void GenericFamily::Stick(CmdArgList args, ConnectionContext* cntx) {
DVLOG(2) << "Stick ts " << transaction->txid();
uint32_t match_cnt = result.load(memory_order_relaxed);
cntx->SendLong(match_cnt);
builder->SendLong(match_cnt);
}
// Used to conditionally store double score
@ -1295,7 +1357,7 @@ OpResultTyped<SortEntryList> OpFetchSortEntries(const OpArgs& op_args, std::stri
return success ? res : OpStatus::INVALID_NUMERIC_RESULT;
}
void GenericFamily::Sort(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Sort(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
std::string_view key = ArgS(args, 0);
bool alpha = false;
bool reversed = false;
@ -1312,37 +1374,37 @@ void GenericFamily::Sort(CmdArgList args, ConnectionContext* cntx) {
} else if (arg == "LIMIT") {
int offset, limit;
if (i + 2 >= args.size()) {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}
if (!absl::SimpleAtoi(ArgS(args, i + 1), &offset) ||
!absl::SimpleAtoi(ArgS(args, i + 2), &limit)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
}
bounds = {offset, limit};
i += 2;
} else {
LOG_EVERY_T(ERROR, 1) << "Unsupported option " << arg;
return cntx->SendError(kSyntaxErr, kSyntaxErrType);
return builder->SendError(kSyntaxErr, kSyntaxErrType);
}
}
OpResultTyped<SortEntryList> fetch_result =
cntx->transaction->ScheduleSingleHopT([&](Transaction* t, EngineShard* shard) {
tx->ScheduleSingleHopT([&](Transaction* t, EngineShard* shard) {
return OpFetchSortEntries(t->GetOpArgs(shard), key, alpha);
});
if (fetch_result == OpStatus::WRONG_TYPE)
return cntx->SendError(fetch_result.status());
return builder->SendError(fetch_result.status());
if (fetch_result.status() == OpStatus::INVALID_NUMERIC_RESULT)
return cntx->SendError("One or more scores can't be converted into double");
return builder->SendError("One or more scores can't be converted into double");
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (!fetch_result.ok())
return rb->SendEmptyArray();
auto result_type = fetch_result.type();
auto sort_call = [cntx, bounds, reversed, result_type](auto& entries) {
auto sort_call = [builder, bounds, reversed, result_type](auto& entries) {
using value_t = typename std::decay_t<decltype(entries)>::value_type;
auto cmp = reversed ? &value_t::greater : &value_t::less;
if (bounds) {
@ -1360,7 +1422,7 @@ void GenericFamily::Sort(CmdArgList args, ConnectionContext* cntx) {
}
bool is_set = (result_type == OBJ_SET || result_type == OBJ_ZSET);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartCollection(std::distance(start_it, end_it),
is_set ? RedisReplyBuilder::SET : RedisReplyBuilder::ARRAY);
@ -1372,21 +1434,21 @@ void GenericFamily::Sort(CmdArgList args, ConnectionContext* cntx) {
std::visit(std::move(sort_call), fetch_result.value());
}
void GenericFamily::Restore(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Restore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
std::string_view key = ArgS(args, 0);
std::string_view serialized_value = ArgS(args, 2);
auto rdb_version = GetRdbVersion(serialized_value);
if (!rdb_version) {
return cntx->SendError(kInvalidDumpValueErr);
return builder->SendError(kInvalidDumpValueErr);
}
OpResult<RestoreArgs> restore_args = RestoreArgs::TryFrom(args);
if (!restore_args) {
if (restore_args.status() == OpStatus::OUT_OF_RANGE) {
return cntx->SendError("Invalid IDLETIME value, must be >= 0");
return builder->SendError("Invalid IDLETIME value, must be >= 0");
} else {
return cntx->SendError(restore_args.status());
return builder->SendError(restore_args.status());
}
}
@ -1395,28 +1457,28 @@ void GenericFamily::Restore(CmdArgList args, ConnectionContext* cntx) {
rdb_version.value());
};
OpResult<bool> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpResult<bool> result = tx->ScheduleSingleHopT(std::move(cb));
if (result) {
if (result.value()) {
return cntx->SendOk();
return builder->SendOk();
} else {
return cntx->SendError("Bad data format");
return builder->SendError("Bad data format");
}
} else if (result.status() == OpStatus::KEY_EXISTS) {
return cntx->SendError("BUSYKEY: key name already exists.");
return builder->SendError("BUSYKEY: key name already exists.");
} else {
return cntx->SendError(result.status());
return builder->SendError(result.status());
}
}
void GenericFamily::FieldExpire(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::FieldExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
string_view key = parser.Next();
string_view ttl_str = parser.Next();
uint32_t ttl_sec;
if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
}
CmdArgList fields = parser.Tail();
@ -1424,8 +1486,8 @@ void GenericFamily::FieldExpire(CmdArgList args, ConnectionContext* cntx) {
return OpFieldExpire(t->GetOpArgs(shard), key, ttl_sec, fields);
};
OpResult<vector<long>> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
OpResult<vector<long>> result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (result) {
rb->StartArray(result->size());
const auto& array = result.value();
@ -1433,43 +1495,43 @@ void GenericFamily::FieldExpire(CmdArgList args, ConnectionContext* cntx) {
rb->SendLong(v);
}
} else {
cntx->SendError(result.status());
builder->SendError(result.status());
}
}
// Returns -2 if key not found, WRONG_TYPE if key is not a set or hash
// -1 if the field does not have associated TTL on it, and -3 if field is not found.
void GenericFamily::FieldTtl(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::FieldTtl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
string_view field = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) { return OpFieldTtl(t, shard, key, field); };
OpResult<long> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpResult<long> result = tx->ScheduleSingleHopT(std::move(cb));
if (result) {
cntx->SendLong(*result);
builder->SendLong(*result);
return;
}
cntx->SendError(result.status());
builder->SendError(result.status());
}
void GenericFamily::Move(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
string_view target_db_sv = ArgS(args, 1);
int64_t target_db;
int32_t target_db;
if (!absl::SimpleAtoi(target_db_sv, &target_db)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
}
if (target_db < 0 || target_db >= absl::GetFlag(FLAGS_dbnum)) {
return cntx->SendError(kDbIndOutOfRangeErr);
if (target_db < 0 || uint32_t(target_db) >= absl::GetFlag(FLAGS_dbnum)) {
return builder->SendError(kDbIndOutOfRangeErr);
}
if (target_db == cntx->db_index()) {
return cntx->SendError("source and destination objects are the same");
if (target_db == tx->GetDbIndex()) {
return builder->SendError("source and destination objects are the same");
}
OpStatus res = OpStatus::SKIPPED;
@ -1488,115 +1550,67 @@ void GenericFamily::Move(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK;
};
cntx->transaction->ScheduleSingleHop(std::move(cb));
tx->ScheduleSingleHop(std::move(cb));
// Exactly one shard will call OpMove.
DCHECK(res != OpStatus::SKIPPED);
cntx->SendLong(res == OpStatus::OK);
builder->SendLong(res == OpStatus::OK);
}
void GenericFamily::Rename(CmdArgList args, ConnectionContext* cntx) {
auto reply = RenameGeneric(args, false, cntx);
cntx->SendError(reply);
void GenericFamily::Rename(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
auto reply = RenameGeneric(args, false, tx);
builder->SendError(reply);
}
void GenericFamily::RenameNx(CmdArgList args, ConnectionContext* cntx) {
auto reply = RenameGeneric(args, true, cntx);
void GenericFamily::RenameNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
auto reply = RenameGeneric(args, true, tx);
if (!reply.status) {
cntx->SendError(reply);
builder->SendError(reply);
return;
}
OpStatus st = reply.status.value();
if (st == OpStatus::OK) {
cntx->SendLong(1);
builder->SendLong(1);
} else if (st == OpStatus::KEY_EXISTS) {
cntx->SendLong(0);
builder->SendLong(0);
} else {
cntx->SendError(reply);
builder->SendError(reply);
}
}
void GenericFamily::ExpireTime(CmdArgList args, ConnectionContext* cntx) {
ExpireTimeGeneric(args, cntx, TimeUnit::SEC);
void GenericFamily::ExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
ExpireTimeGeneric(args, TimeUnit::SEC, tx, builder);
}
void GenericFamily::PExpireTime(CmdArgList args, ConnectionContext* cntx) {
ExpireTimeGeneric(args, cntx, TimeUnit::MSEC);
void GenericFamily::PExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
ExpireTimeGeneric(args, TimeUnit::MSEC, tx, builder);
}
void GenericFamily::ExpireTimeGeneric(CmdArgList args, ConnectionContext* cntx, TimeUnit unit) {
string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) { return OpExpireTime(t, shard, key); };
OpResult<uint64_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
long ttl = (unit == TimeUnit::SEC) ? (result.value() + 500) / 1000 : result.value();
cntx->SendLong(ttl);
return;
}
switch (result.status()) {
case OpStatus::KEY_NOTFOUND:
cntx->SendLong(-2);
break;
default:
LOG_IF(ERROR, result.status() != OpStatus::SKIPPED)
<< "Unexpected status " << result.status();
cntx->SendLong(-1);
break;
}
void GenericFamily::Ttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
TtlGeneric(args, TimeUnit::SEC, tx, builder);
}
void GenericFamily::Ttl(CmdArgList args, ConnectionContext* cntx) {
TtlGeneric(args, cntx, TimeUnit::SEC);
void GenericFamily::Pttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
TtlGeneric(args, TimeUnit::MSEC, tx, builder);
}
void GenericFamily::Pttl(CmdArgList args, ConnectionContext* cntx) {
TtlGeneric(args, cntx, TimeUnit::MSEC);
}
void GenericFamily::TtlGeneric(CmdArgList args, ConnectionContext* cntx, TimeUnit unit) {
string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) { return OpTtl(t, shard, key); };
OpResult<uint64_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
long ttl = (unit == TimeUnit::SEC) ? (result.value() + 500) / 1000 : result.value();
cntx->SendLong(ttl);
return;
}
switch (result.status()) {
case OpStatus::KEY_NOTFOUND:
cntx->SendLong(-2);
break;
default:
LOG_IF(ERROR, result.status() != OpStatus::SKIPPED)
<< "Unexpected status " << result.status();
cntx->SendLong(-1);
break;
}
}
void GenericFamily::Select(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Select(CmdArgList args, Transaction*, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
int64_t index;
if (!absl::SimpleAtoi(key, &index)) {
return cntx->SendError(kInvalidDbIndErr);
return builder->SendError(kInvalidDbIndErr);
}
if (cluster::IsClusterEnabled() && index != 0) {
return cntx->SendError("SELECT is not allowed in cluster mode");
return builder->SendError("SELECT is not allowed in cluster mode");
}
if (index < 0 || index >= absl::GetFlag(FLAGS_dbnum)) {
return cntx->SendError(kDbIndOutOfRangeErr);
return builder->SendError(kDbIndOutOfRangeErr);
}
cntx->conn_state.db_index = index;
auto cb = [cntx, index](EngineShard* shard) {
CHECK(cntx->ns != nullptr);
auto& db_slice = cntx->ns->GetDbSlice(shard->shard_id());
auto cb = [ns = cntx->ns, index](EngineShard* shard) {
auto& db_slice = ns->GetDbSlice(shard->shard_id());
db_slice.ActivateDb(index);
return OpStatus::OK;
};
@ -1605,28 +1619,26 @@ void GenericFamily::Select(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendOk();
}
void GenericFamily::Dump(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Dump(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
std::string_view key = ArgS(args, 0);
DVLOG(1) << "Dumping before ::ScheduleSingleHopT " << key;
auto cb = [&](Transaction* t, EngineShard* shard) { return OpDump(t->GetOpArgs(shard), key); };
Transaction* trans = cntx->transaction;
OpResult<string> result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
OpResult<string> result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (result) {
DVLOG(1) << "Dump " << trans->DebugId() << ": " << key << ", dump size "
<< result.value().size();
DVLOG(1) << "Dump " << tx->DebugId() << ": " << key << ", dump size " << result.value().size();
rb->SendBulkString(*result);
} else {
rb->SendNull();
}
}
void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Type(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
std::string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<CompactObjType> {
auto& db_slice = cntx->ns->GetDbSlice(shard->shard_id());
auto& db_slice = t->GetDbSlice(shard->shard_id());
auto it = db_slice.FindReadOnly(t->GetDbContext(), key).it;
if (!it.is_done()) {
return it->second.ObjType();
@ -1634,67 +1646,48 @@ void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::KEY_NOTFOUND;
}
};
OpResult<CompactObjType> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpResult<CompactObjType> result = tx->ScheduleSingleHopT(std::move(cb));
if (!result) {
cntx->SendSimpleString("none");
builder->SendSimpleString("none");
} else {
cntx->SendSimpleString(ObjTypeToString(result.value()));
builder->SendSimpleString(ObjTypeToString(result.value()));
}
}
void GenericFamily::Time(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Time(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
uint64_t now_usec;
if (cntx->transaction) {
now_usec = cntx->transaction->GetDbContext().time_now_ms * 1000;
if (tx) {
now_usec = tx->GetDbContext().time_now_ms * 1000;
} else {
now_usec = absl::GetCurrentTimeNanos() / 1000;
}
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(2);
rb->SendLong(now_usec / 1000000);
rb->SendLong(now_usec % 1000000);
}
ErrorReply GenericFamily::RenameGeneric(CmdArgList args, bool destination_should_not_exist,
ConnectionContext* cntx) {
string_view key[2] = {ArgS(args, 0), ArgS(args, 1)};
Transaction* transaction = cntx->transaction;
if (transaction->GetUniqueShardCnt() == 1) {
transaction->ReviveAutoJournal(); // Safe to use RENAME with single shard
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRen(t->GetOpArgs(shard), key[0], key[1], destination_should_not_exist);
};
OpResult<void> result = transaction->ScheduleSingleHopT(std::move(cb));
return result.status();
}
Renamer renamer{transaction, key[0], key[1], shard_set->size()};
return renamer.Rename(destination_should_not_exist);
}
void GenericFamily::Echo(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Echo(CmdArgList args, Transaction*, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
return rb->SendBulkString(key);
}
// SCAN cursor [MATCH <glob>] [TYPE <type>] [COUNT <count>] [BUCKET <bucket_id>]
void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Scan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string_view token = ArgS(args, 0);
uint64_t cursor = 0;
if (!absl::SimpleAtoi(token, &cursor)) {
return cntx->SendError("invalid cursor");
return builder->SendError("invalid cursor");
}
OpResult<ScanOpts> ops = ScanOpts::TryFrom(args.subspan(1));
if (!ops) {
DVLOG(1) << "Scan invalid args - return " << ops << " to the user";
return cntx->SendError(ops.status());
return builder->SendError(ops.status());
}
ScanOpts scan_op = ops.value();
@ -1702,7 +1695,7 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
StringVec keys;
cursor = ScanGeneric(cursor, scan_op, &keys, cntx);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(2);
rb->SendBulkString(absl::StrCat(cursor));
rb->StartArray(keys.size());
@ -1723,7 +1716,8 @@ OpResult<uint32_t> GenericFamily::OpExists(const OpArgs& op_args, const ShardArg
return res;
}
void GenericFamily::RandomKey(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::RandomKey(CmdArgList args, Transaction*, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
const static size_t kMaxAttempts = 3;
absl::BitGen bitgen;
@ -1761,7 +1755,7 @@ void GenericFamily::RandomKey(CmdArgList args, ConnectionContext* cntx) {
auto candidates_count = candidates_counter.load(memory_order_relaxed);
std::optional<string> random_key = std::nullopt;
auto random_idx = absl::Uniform<size_t>(bitgen, 0, candidates_count);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
for (const auto& candidate : candidates_collection) {
if (random_idx >= candidate.size()) {
random_idx -= candidate.size();

View file

@ -6,28 +6,22 @@
#include "base/flags.h"
#include "facade/facade_types.h"
#include "server/common.h"
#include "server/tx_base.h"
ABSL_DECLARE_FLAG(uint32_t, dbnum);
namespace facade {
class SinkReplyBuilder;
};
namespace dfly {
using facade::ErrorReply;
using facade::CmdArgList;
using facade::OpResult;
using facade::OpStatus;
class ConnectionContext;
class CommandRegistry;
class EngineShard;
enum ExpireFlags {
EXPIRE_ALWAYS = 0,
EXPIRE_NX = 1 << 0, // Set expiry only when key has no expiry
EXPIRE_XX = 1 << 2, // Set expiry only when the key has expiry
EXPIRE_GT = 1 << 3, // GT: Set expiry only when the new expiry is greater than current one
EXPIRE_LT = 1 << 4, // LT: Set expiry only when the new expiry is less than current one
};
class Transaction;
class GenericFamily {
public:
@ -38,42 +32,43 @@ class GenericFamily {
static OpResult<uint32_t> OpDel(const OpArgs& op_args, const ShardArgs& keys);
private:
static void Del(CmdArgList args, ConnectionContext* cntx);
static void Ping(CmdArgList args, ConnectionContext* cntx);
static void Exists(CmdArgList args, ConnectionContext* cntx);
static void Expire(CmdArgList args, ConnectionContext* cntx);
static void ExpireAt(CmdArgList args, ConnectionContext* cntx);
static void Persist(CmdArgList args, ConnectionContext* cntx);
static void Keys(CmdArgList args, ConnectionContext* cntx);
static void PexpireAt(CmdArgList args, ConnectionContext* cntx);
static void Pexpire(CmdArgList args, ConnectionContext* cntx);
static void Stick(CmdArgList args, ConnectionContext* cntx);
static void Sort(CmdArgList args, ConnectionContext* cntx);
static void Move(CmdArgList args, ConnectionContext* cntx);
using SinkReplyBuilder = facade::SinkReplyBuilder;
static void Rename(CmdArgList args, ConnectionContext* cntx);
static void RenameNx(CmdArgList args, ConnectionContext* cntx);
static void ExpireTime(CmdArgList args, ConnectionContext* cntx);
static void PExpireTime(CmdArgList args, ConnectionContext* cntx);
static void Ttl(CmdArgList args, ConnectionContext* cntx);
static void Pttl(CmdArgList args, ConnectionContext* cntx);
static void Echo(CmdArgList args, ConnectionContext* cntx);
static void Select(CmdArgList args, ConnectionContext* cntx);
static void Scan(CmdArgList args, ConnectionContext* cntx);
static void Time(CmdArgList args, ConnectionContext* cntx);
static void Type(CmdArgList args, ConnectionContext* cntx);
static void Dump(CmdArgList args, ConnectionContext* cntx);
static void Restore(CmdArgList args, ConnectionContext* cntx);
static void RandomKey(CmdArgList args, ConnectionContext* cntx);
static void FieldTtl(CmdArgList args, ConnectionContext* cntx);
static void FieldExpire(CmdArgList args, ConnectionContext* cntx);
static ErrorReply RenameGeneric(CmdArgList args, bool destination_should_not_exist,
static void Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Ping(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
static void Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Expire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void ExpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Persist(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Keys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
static void PexpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Pexpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Stick(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Sort(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void ExpireTimeGeneric(CmdArgList args, ConnectionContext* cntx, TimeUnit unit);
static void TtlGeneric(CmdArgList args, ConnectionContext* cntx, TimeUnit unit);
static void Rename(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void RenameNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void ExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void PExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Ttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Pttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Echo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Select(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
static void Scan(CmdArgList args, Transaction*, SinkReplyBuilder* builder,
ConnectionContext* cntx);
static void Time(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Type(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Dump(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Restore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void RandomKey(CmdArgList args, Transaction*, SinkReplyBuilder* builder,
ConnectionContext* cntx);
static void FieldTtl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void FieldExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
};
} // namespace dfly