1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: change the interface of hll, generic and list commands (#4227)

This commit is contained in:
Roman Gershman 2024-11-29 14:02:51 +02:00 committed by GitHub
parent 3ad5b387bd
commit a4b3724929
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 244 additions and 254 deletions

View file

@ -995,10 +995,11 @@ OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs&
return res; return res;
} }
void GenericFamily::Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) {
VLOG(1) << "Del " << ArgS(args, 0); VLOG(1) << "Del " << ArgS(args, 0);
atomic_uint32_t result{0}; atomic_uint32_t result{0};
auto* builder = cmd_cntx.rb;
bool is_mc = (builder->GetProtocol() == Protocol::MEMCACHE); bool is_mc = (builder->GetProtocol() == Protocol::MEMCACHE);
auto cb = [&result](const Transaction* t, EngineShard* shard) { auto cb = [&result](const Transaction* t, EngineShard* shard) {
@ -1009,10 +1010,10 @@ void GenericFamily::Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
return OpStatus::OK; return OpStatus::OK;
}; };
OpStatus status = tx->ScheduleSingleHop(std::move(cb)); OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
CHECK_EQ(OpStatus::OK, status); CHECK_EQ(OpStatus::OK, status);
DVLOG(2) << "Del ts " << tx->txid(); DVLOG(2) << "Del ts " << cmd_cntx.tx->txid();
uint32_t del_cnt = result.load(memory_order_relaxed); uint32_t del_cnt = result.load(memory_order_relaxed);
if (is_mc) { if (is_mc) {
@ -1029,18 +1030,16 @@ void GenericFamily::Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
} }
} }
void GenericFamily::Ping(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void GenericFamily::Ping(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) { auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (args.size() > 1) { if (args.size() > 1) {
return builder->SendError(facade::WrongNumArgsError("ping"), kSyntaxErrType); return rb->SendError(facade::WrongNumArgsError("ping"), kSyntaxErrType);
} }
string_view msg; string_view msg;
auto* rb = static_cast<RedisReplyBuilder*>(builder);
// If a client in the subscribe state and in resp2 mode, it returns an array for some reason. // If a client in the subscribe state and in resp2 mode, it returns an array for some reason.
if (cntx->conn_state.subscribe_info && !rb->IsResp3()) { if (cmd_cntx.conn_cntx->conn_state.subscribe_info && !rb->IsResp3()) {
if (args.size() == 1) { if (args.size() == 1) {
msg = ArgS(args, 0); msg = ArgS(args, 0);
} }
@ -1050,16 +1049,16 @@ void GenericFamily::Ping(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
} }
if (args.size() == 0) { if (args.size() == 0) {
return builder->SendSimpleString("PONG"); return rb->SendSimpleString("PONG");
} else {
msg = ArgS(args, 0);
DVLOG(2) << "Ping " << msg;
return rb->SendBulkString(msg);
} }
msg = ArgS(args, 0);
DVLOG(2) << "Ping " << msg;
return rb->SendBulkString(msg);
} }
void GenericFamily::Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Exists(CmdArgList args, const CommandContext& cmd_cntx) {
VLOG(1) << "Exists " << ArgS(args, 0); VLOG(1) << "Exists " << ArgS(args, 0);
atomic_uint32_t result{0}; atomic_uint32_t result{0};
@ -1072,28 +1071,28 @@ void GenericFamily::Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* b
return OpStatus::OK; return OpStatus::OK;
}; };
OpStatus status = tx->ScheduleSingleHop(std::move(cb)); OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
CHECK_EQ(OpStatus::OK, status); CHECK_EQ(OpStatus::OK, status);
return builder->SendLong(result.load(memory_order_acquire)); return cmd_cntx.rb->SendLong(result.load(memory_order_acquire));
} }
void GenericFamily::Persist(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Persist(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) { return OpPersist(t->GetOpArgs(shard), key); }; auto cb = [&](Transaction* t, EngineShard* shard) { return OpPersist(t->GetOpArgs(shard), key); };
OpStatus status = tx->ScheduleSingleHop(std::move(cb)); OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
builder->SendLong(status == OpStatus::OK); cmd_cntx.rb->SendLong(status == OpStatus::OK);
} }
void GenericFamily::Expire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Expire(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view sec = ArgS(args, 1); string_view sec = ArgS(args, 1);
int64_t int_arg; int64_t int_arg;
if (!absl::SimpleAtoi(sec, &int_arg)) { if (!absl::SimpleAtoi(sec, &int_arg)) {
return builder->SendError(kInvalidIntErr); return cmd_cntx.rb->SendError(kInvalidIntErr);
} }
int_arg = std::max<int64_t>(int_arg, -1); int_arg = std::max<int64_t>(int_arg, -1);
@ -1103,7 +1102,7 @@ void GenericFamily::Expire(CmdArgList args, Transaction* tx, SinkReplyBuilder* b
int_arg = kMaxExpireDeadlineSec; int_arg = kMaxExpireDeadlineSec;
} }
auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder); auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cmd_cntx.rb);
if (!expire_options) { if (!expire_options) {
return; return;
} }
@ -1113,21 +1112,21 @@ void GenericFamily::Expire(CmdArgList args, Transaction* tx, SinkReplyBuilder* b
return OpExpire(t->GetOpArgs(shard), key, params); return OpExpire(t->GetOpArgs(shard), key, params);
}; };
OpStatus status = tx->ScheduleSingleHop(std::move(cb)); OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
builder->SendLong(status == OpStatus::OK); cmd_cntx.rb->SendLong(status == OpStatus::OK);
} }
void GenericFamily::ExpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::ExpireAt(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view sec = ArgS(args, 1); string_view sec = ArgS(args, 1);
int64_t int_arg; int64_t int_arg;
if (!absl::SimpleAtoi(sec, &int_arg)) { if (!absl::SimpleAtoi(sec, &int_arg)) {
return builder->SendError(kInvalidIntErr); return cmd_cntx.rb->SendError(kInvalidIntErr);
} }
int_arg = std::max<int64_t>(int_arg, 0L); int_arg = std::max<int64_t>(int_arg, 0L);
auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder); auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cmd_cntx.rb);
if (!expire_options) { if (!expire_options) {
return; return;
} }
@ -1137,17 +1136,16 @@ void GenericFamily::ExpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder*
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExpire(t->GetOpArgs(shard), key, params); return OpExpire(t->GetOpArgs(shard), key, params);
}; };
OpStatus status = tx->ScheduleSingleHop(std::move(cb)); OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
if (status == OpStatus::OUT_OF_RANGE) { if (status == OpStatus::OUT_OF_RANGE) {
return builder->SendError(kExpiryOutOfRange); return cmd_cntx.rb->SendError(kExpiryOutOfRange);
} }
builder->SendLong(status == OpStatus::OK); cmd_cntx.rb->SendLong(status == OpStatus::OK);
} }
void GenericFamily::Keys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void GenericFamily::Keys(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) {
string_view pattern(ArgS(args, 0)); string_view pattern(ArgS(args, 0));
uint64_t cursor = 0; uint64_t cursor = 0;
@ -1162,27 +1160,27 @@ void GenericFamily::Keys(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
auto output_limit = absl::GetFlag(FLAGS_keys_output_limit); auto output_limit = absl::GetFlag(FLAGS_keys_output_limit);
do { do {
cursor = ScanGeneric(cursor, scan_opts, &keys, cntx); cursor = ScanGeneric(cursor, scan_opts, &keys, cmd_cntx.conn_cntx);
} while (cursor != 0 && keys.size() < output_limit); } while (cursor != 0 && keys.size() < output_limit);
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
rb->StartArray(keys.size()); rb->StartArray(keys.size());
for (const auto& k : keys) { for (const auto& k : keys) {
rb->SendBulkString(k); rb->SendBulkString(k);
} }
} }
void GenericFamily::PexpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::PexpireAt(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view msec = ArgS(args, 1); string_view msec = ArgS(args, 1);
int64_t int_arg; int64_t int_arg;
if (!absl::SimpleAtoi(msec, &int_arg)) { if (!absl::SimpleAtoi(msec, &int_arg)) {
return builder->SendError(kInvalidIntErr); return cmd_cntx.rb->SendError(kInvalidIntErr);
} }
int_arg = std::max<int64_t>(int_arg, 0L); int_arg = std::max<int64_t>(int_arg, 0L);
auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder); auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cmd_cntx.rb);
if (!expire_options) { if (!expire_options) {
return; return;
} }
@ -1194,22 +1192,22 @@ void GenericFamily::PexpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExpire(t->GetOpArgs(shard), key, params); return OpExpire(t->GetOpArgs(shard), key, params);
}; };
OpStatus status = tx->ScheduleSingleHop(std::move(cb)); OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
if (status == OpStatus::OUT_OF_RANGE) { if (status == OpStatus::OUT_OF_RANGE) {
return builder->SendError(kExpiryOutOfRange); return cmd_cntx.rb->SendError(kExpiryOutOfRange);
} else { } else {
builder->SendLong(status == OpStatus::OK); cmd_cntx.rb->SendLong(status == OpStatus::OK);
} }
} }
void GenericFamily::Pexpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Pexpire(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view msec = ArgS(args, 1); string_view msec = ArgS(args, 1);
int64_t int_arg; int64_t int_arg;
if (!absl::SimpleAtoi(msec, &int_arg)) { if (!absl::SimpleAtoi(msec, &int_arg)) {
return builder->SendError(kInvalidIntErr); return cmd_cntx.rb->SendError(kInvalidIntErr);
} }
int_arg = std::max<int64_t>(int_arg, -1); int_arg = std::max<int64_t>(int_arg, -1);
@ -1218,7 +1216,7 @@ void GenericFamily::Pexpire(CmdArgList args, Transaction* tx, SinkReplyBuilder*
int_arg = kMaxExpireDeadlineMs; int_arg = kMaxExpireDeadlineMs;
} }
auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder); auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cmd_cntx.rb);
if (!expire_options) { if (!expire_options) {
return; return;
} }
@ -1228,16 +1226,16 @@ void GenericFamily::Pexpire(CmdArgList args, Transaction* tx, SinkReplyBuilder*
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExpire(t->GetOpArgs(shard), key, params); return OpExpire(t->GetOpArgs(shard), key, params);
}; };
OpStatus status = tx->ScheduleSingleHop(std::move(cb)); OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
if (status == OpStatus::OUT_OF_RANGE) { if (status == OpStatus::OUT_OF_RANGE) {
return builder->SendError(kExpiryOutOfRange); return cmd_cntx.rb->SendError(kExpiryOutOfRange);
} }
builder->SendLong(status == OpStatus::OK); cmd_cntx.rb->SendLong(status == OpStatus::OK);
} }
void GenericFamily::Stick(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Stick(CmdArgList args, const CommandContext& cmd_cntx) {
Transaction* transaction = tx; Transaction* transaction = cmd_cntx.tx;
VLOG(1) << "Stick " << ArgS(args, 0); VLOG(1) << "Stick " << ArgS(args, 0);
atomic_uint32_t result{0}; atomic_uint32_t result{0};
@ -1256,7 +1254,7 @@ void GenericFamily::Stick(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
DVLOG(2) << "Stick ts " << transaction->txid(); DVLOG(2) << "Stick ts " << transaction->txid();
uint32_t match_cnt = result.load(memory_order_relaxed); uint32_t match_cnt = result.load(memory_order_relaxed);
builder->SendLong(match_cnt); cmd_cntx.rb->SendLong(match_cnt);
} }
// Used to conditionally store double score // Used to conditionally store double score
@ -1377,12 +1375,12 @@ OpResultTyped<SortEntryList> OpFetchSortEntries(const OpArgs& op_args, std::stri
return success ? res : OpStatus::INVALID_NUMERIC_RESULT; return success ? res : OpStatus::INVALID_NUMERIC_RESULT;
} }
void GenericFamily::Sort(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Sort(CmdArgList args, const CommandContext& cmd_cntx) {
std::string_view key = ArgS(args, 0); std::string_view key = ArgS(args, 0);
bool alpha = false; bool alpha = false;
bool reversed = false; bool reversed = false;
std::optional<std::pair<size_t, size_t>> bounds; std::optional<std::pair<size_t, size_t>> bounds;
auto* builder = cmd_cntx.rb;
for (size_t i = 1; i < args.size(); i++) { for (size_t i = 1; i < args.size(); i++) {
string arg = absl::AsciiStrToUpper(ArgS(args, i)); string arg = absl::AsciiStrToUpper(ArgS(args, i));
if (arg == "ALPHA") { if (arg == "ALPHA") {
@ -1409,7 +1407,7 @@ void GenericFamily::Sort(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
} }
OpResultTyped<SortEntryList> fetch_result = OpResultTyped<SortEntryList> fetch_result =
tx->ScheduleSingleHopT([&](Transaction* t, EngineShard* shard) { cmd_cntx.tx->ScheduleSingleHopT([&](Transaction* t, EngineShard* shard) {
return OpFetchSortEntries(t->GetOpArgs(shard), key, alpha); return OpFetchSortEntries(t->GetOpArgs(shard), key, alpha);
}); });
@ -1454,11 +1452,12 @@ void GenericFamily::Sort(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
std::visit(std::move(sort_call), fetch_result.value()); std::visit(std::move(sort_call), fetch_result.value());
} }
void GenericFamily::Restore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Restore(CmdArgList args, const CommandContext& cmd_cntx) {
std::string_view key = ArgS(args, 0); std::string_view key = ArgS(args, 0);
std::string_view serialized_value = ArgS(args, 2); std::string_view serialized_value = ArgS(args, 2);
auto rdb_version = GetRdbVersion(serialized_value); auto rdb_version = GetRdbVersion(serialized_value);
auto* builder = cmd_cntx.rb;
if (!rdb_version) { if (!rdb_version) {
return builder->SendError(kInvalidDumpValueErr); return builder->SendError(kInvalidDumpValueErr);
} }
@ -1477,7 +1476,7 @@ void GenericFamily::Restore(CmdArgList args, Transaction* tx, SinkReplyBuilder*
rdb_version.value()); rdb_version.value());
}; };
OpResult<bool> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<bool> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
if (result.value()) { if (result.value()) {
@ -1492,13 +1491,14 @@ void GenericFamily::Restore(CmdArgList args, Transaction* tx, SinkReplyBuilder*
} }
} }
void GenericFamily::FieldExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::FieldExpire(CmdArgList args, const CommandContext& cmd_cntx) {
CmdArgParser parser{args}; CmdArgParser parser{args};
string_view key = parser.Next(); string_view key = parser.Next();
string_view ttl_str = parser.Next(); string_view ttl_str = parser.Next();
uint32_t ttl_sec; uint32_t ttl_sec;
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) { if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) {
return builder->SendError(kInvalidIntErr); return rb->SendError(kInvalidIntErr);
} }
CmdArgList fields = parser.Tail(); CmdArgList fields = parser.Tail();
@ -1506,8 +1506,8 @@ void GenericFamily::FieldExpire(CmdArgList args, Transaction* tx, SinkReplyBuild
return OpFieldExpire(t->GetOpArgs(shard), key, ttl_sec, fields); return OpFieldExpire(t->GetOpArgs(shard), key, ttl_sec, fields);
}; };
OpResult<vector<long>> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<vector<long>> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (result) { if (result) {
rb->StartArray(result->size()); rb->StartArray(result->size());
const auto& array = result.value(); const auto& array = result.value();
@ -1515,33 +1515,33 @@ void GenericFamily::FieldExpire(CmdArgList args, Transaction* tx, SinkReplyBuild
rb->SendLong(v); rb->SendLong(v);
} }
} else { } else {
builder->SendError(result.status()); rb->SendError(result.status());
} }
} }
// Returns -2 if key not found, WRONG_TYPE if key is not a set or hash // Returns -2 if key not found, WRONG_TYPE if key is not a set or hash
// -1 if the field does not have associated TTL on it, and -3 if field is not found. // -1 if the field does not have associated TTL on it, and -3 if field is not found.
void GenericFamily::FieldTtl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::FieldTtl(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view field = ArgS(args, 1); string_view field = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) { return OpFieldTtl(t, shard, key, field); }; auto cb = [&](Transaction* t, EngineShard* shard) { return OpFieldTtl(t, shard, key, field); };
OpResult<long> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<long> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
builder->SendLong(*result); cmd_cntx.rb->SendLong(*result);
return; return;
} }
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
void GenericFamily::Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Move(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view target_db_sv = ArgS(args, 1); string_view target_db_sv = ArgS(args, 1);
int32_t target_db; int32_t target_db;
auto* builder = cmd_cntx.rb;
if (!absl::SimpleAtoi(target_db_sv, &target_db)) { if (!absl::SimpleAtoi(target_db_sv, &target_db)) {
return builder->SendError(kInvalidIntErr); return builder->SendError(kInvalidIntErr);
} }
@ -1550,7 +1550,7 @@ void GenericFamily::Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
return builder->SendError(kDbIndOutOfRangeErr); return builder->SendError(kDbIndOutOfRangeErr);
} }
if (target_db == tx->GetDbIndex()) { if (target_db == cmd_cntx.tx->GetDbIndex()) {
return builder->SendError("source and destination objects are the same"); return builder->SendError("source and destination objects are the same");
} }
@ -1570,20 +1570,20 @@ void GenericFamily::Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
return OpStatus::OK; return OpStatus::OK;
}; };
tx->ScheduleSingleHop(std::move(cb)); cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
// Exactly one shard will call OpMove. // Exactly one shard will call OpMove.
DCHECK(res != OpStatus::SKIPPED); DCHECK(res != OpStatus::SKIPPED);
builder->SendLong(res == OpStatus::OK); builder->SendLong(res == OpStatus::OK);
} }
void GenericFamily::Rename(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Rename(CmdArgList args, const CommandContext& cmd_cntx) {
auto reply = RenameGeneric(args, false, tx); auto reply = RenameGeneric(args, false, cmd_cntx.tx);
builder->SendError(reply); cmd_cntx.rb->SendError(reply);
} }
void GenericFamily::RenameNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::RenameNx(CmdArgList args, const CommandContext& cmd_cntx) {
auto reply = RenameGeneric(args, true, tx); auto reply = RenameGeneric(args, true, cmd_cntx.tx);
auto* builder = cmd_cntx.rb;
if (!reply.status) { if (!reply.status) {
builder->SendError(reply); builder->SendError(reply);
return; return;
@ -1599,26 +1599,26 @@ void GenericFamily::RenameNx(CmdArgList args, Transaction* tx, SinkReplyBuilder*
} }
} }
void GenericFamily::ExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::ExpireTime(CmdArgList args, const CommandContext& cmd_cntx) {
ExpireTimeGeneric(args, TimeUnit::SEC, tx, builder); ExpireTimeGeneric(args, TimeUnit::SEC, cmd_cntx.tx, cmd_cntx.rb);
} }
void GenericFamily::PExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::PExpireTime(CmdArgList args, const CommandContext& cmd_cntx) {
ExpireTimeGeneric(args, TimeUnit::MSEC, tx, builder); ExpireTimeGeneric(args, TimeUnit::MSEC, cmd_cntx.tx, cmd_cntx.rb);
} }
void GenericFamily::Ttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Ttl(CmdArgList args, const CommandContext& cmd_cntx) {
TtlGeneric(args, TimeUnit::SEC, tx, builder); TtlGeneric(args, TimeUnit::SEC, cmd_cntx.tx, cmd_cntx.rb);
} }
void GenericFamily::Pttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Pttl(CmdArgList args, const CommandContext& cmd_cntx) {
TtlGeneric(args, TimeUnit::MSEC, tx, builder); TtlGeneric(args, TimeUnit::MSEC, cmd_cntx.tx, cmd_cntx.rb);
} }
void GenericFamily::Select(CmdArgList args, Transaction*, SinkReplyBuilder* builder, void GenericFamily::Select(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
int64_t index; int64_t index;
auto* builder = cmd_cntx.rb;
if (!absl::SimpleAtoi(key, &index)) { if (!absl::SimpleAtoi(key, &index)) {
return builder->SendError(kInvalidDbIndErr); return builder->SendError(kInvalidDbIndErr);
} }
@ -1628,7 +1628,7 @@ void GenericFamily::Select(CmdArgList args, Transaction*, SinkReplyBuilder* buil
if (index < 0 || index >= absl::GetFlag(FLAGS_dbnum)) { if (index < 0 || index >= absl::GetFlag(FLAGS_dbnum)) {
return builder->SendError(kDbIndOutOfRangeErr); return builder->SendError(kDbIndOutOfRangeErr);
} }
auto* cntx = cmd_cntx.conn_cntx;
if (cntx->conn_state.db_index == index) { if (cntx->conn_state.db_index == index) {
// accept a noop. // accept a noop.
return builder->SendOk(); return builder->SendOk();
@ -1649,22 +1649,23 @@ void GenericFamily::Select(CmdArgList args, Transaction*, SinkReplyBuilder* buil
return builder->SendOk(); return builder->SendOk();
} }
void GenericFamily::Dump(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Dump(CmdArgList args, const CommandContext& cmd_cntx) {
std::string_view key = ArgS(args, 0); std::string_view key = ArgS(args, 0);
DVLOG(1) << "Dumping before ::ScheduleSingleHopT " << key; DVLOG(1) << "Dumping before ::ScheduleSingleHopT " << key;
auto cb = [&](Transaction* t, EngineShard* shard) { return OpDump(t->GetOpArgs(shard), key); }; auto cb = [&](Transaction* t, EngineShard* shard) { return OpDump(t->GetOpArgs(shard), key); };
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
OpResult<string> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
OpResult<string> result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (result) { if (result) {
DVLOG(1) << "Dump " << tx->DebugId() << ": " << key << ", dump size " << result.value().size(); DVLOG(1) << "Dump " << cmd_cntx.tx->DebugId() << ": " << key << ", dump size "
<< result.value().size();
rb->SendBulkString(*result); rb->SendBulkString(*result);
} else { } else {
rb->SendNull(); rb->SendNull();
} }
} }
void GenericFamily::Type(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Type(CmdArgList args, const CommandContext& cmd_cntx) {
std::string_view key = ArgS(args, 0); std::string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<CompactObjType> { auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<CompactObjType> {
@ -1676,40 +1677,39 @@ void GenericFamily::Type(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
return OpStatus::KEY_NOTFOUND; return OpStatus::KEY_NOTFOUND;
} }
}; };
OpResult<CompactObjType> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<CompactObjType> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (!result) { if (!result) {
builder->SendSimpleString("none"); cmd_cntx.rb->SendSimpleString("none");
} else { } else {
builder->SendSimpleString(ObjTypeToString(result.value())); cmd_cntx.rb->SendSimpleString(ObjTypeToString(result.value()));
} }
} }
void GenericFamily::Time(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void GenericFamily::Time(CmdArgList args, const CommandContext& cmd_cntx) {
uint64_t now_usec; uint64_t now_usec;
if (tx) { if (cmd_cntx.tx) {
now_usec = tx->GetDbContext().time_now_ms * 1000; now_usec = cmd_cntx.tx->GetDbContext().time_now_ms * 1000;
} else { } else {
now_usec = absl::GetCurrentTimeNanos() / 1000; now_usec = absl::GetCurrentTimeNanos() / 1000;
} }
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
rb->StartArray(2); rb->StartArray(2);
rb->SendLong(now_usec / 1000000); rb->SendLong(now_usec / 1000000);
rb->SendLong(now_usec % 1000000); rb->SendLong(now_usec % 1000000);
} }
void GenericFamily::Echo(CmdArgList args, Transaction*, SinkReplyBuilder* builder) { void GenericFamily::Echo(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
return rb->SendBulkString(key); return rb->SendBulkString(key);
} }
// SCAN cursor [MATCH <glob>] [TYPE <type>] [COUNT <count>] [BUCKET <bucket_id>] // SCAN cursor [MATCH <glob>] [TYPE <type>] [COUNT <count>] [BUCKET <bucket_id>]
void GenericFamily::Scan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void GenericFamily::Scan(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) {
string_view token = ArgS(args, 0); string_view token = ArgS(args, 0);
uint64_t cursor = 0; uint64_t cursor = 0;
auto* builder = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (!absl::SimpleAtoi(token, &cursor)) { if (!absl::SimpleAtoi(token, &cursor)) {
return builder->SendError("invalid cursor"); return builder->SendError("invalid cursor");
} }
@ -1723,14 +1723,13 @@ void GenericFamily::Scan(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
ScanOpts scan_op = ops.value(); ScanOpts scan_op = ops.value();
StringVec keys; StringVec keys;
cursor = ScanGeneric(cursor, scan_op, &keys, cntx); cursor = ScanGeneric(cursor, scan_op, &keys, cmd_cntx.conn_cntx);
auto* rb = static_cast<RedisReplyBuilder*>(builder); builder->StartArray(2);
rb->StartArray(2); builder->SendBulkString(absl::StrCat(cursor));
rb->SendBulkString(absl::StrCat(cursor)); builder->StartArray(keys.size());
rb->StartArray(keys.size());
for (const auto& k : keys) { for (const auto& k : keys) {
rb->SendBulkString(k); builder->SendBulkString(k);
} }
} }
@ -1746,12 +1745,12 @@ OpResult<uint32_t> GenericFamily::OpExists(const OpArgs& op_args, const ShardArg
return res; return res;
} }
void GenericFamily::RandomKey(CmdArgList args, Transaction*, SinkReplyBuilder* builder, void GenericFamily::RandomKey(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) {
const static size_t kMaxAttempts = 3; const static size_t kMaxAttempts = 3;
absl::BitGen bitgen; absl::BitGen bitgen;
atomic_size_t candidates_counter{0}; atomic_size_t candidates_counter{0};
auto* cntx = cmd_cntx.conn_cntx;
DbContext db_cntx{cntx->ns, cntx->conn_state.db_index, GetCurrentTimeMs()}; DbContext db_cntx{cntx->ns, cntx->conn_state.db_index, GetCurrentTimeMs()};
ScanOpts scan_opts; ScanOpts scan_opts;
scan_opts.limit = 3; // number of entries per shard scan_opts.limit = 3; // number of entries per shard
@ -1785,7 +1784,7 @@ void GenericFamily::RandomKey(CmdArgList args, Transaction*, SinkReplyBuilder* b
auto candidates_count = candidates_counter.load(memory_order_relaxed); auto candidates_count = candidates_counter.load(memory_order_relaxed);
std::optional<string> random_key = std::nullopt; std::optional<string> random_key = std::nullopt;
auto random_idx = absl::Uniform<size_t>(bitgen, 0, candidates_count); auto random_idx = absl::Uniform<size_t>(bitgen, 0, candidates_count);
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
for (const auto& candidate : candidates_collection) { for (const auto& candidate : candidates_collection) {
if (random_idx >= candidate.size()) { if (random_idx >= candidate.size()) {
random_idx -= candidate.size(); random_idx -= candidate.size();

View file

@ -22,6 +22,7 @@ using facade::OpResult;
class ConnectionContext; class ConnectionContext;
class CommandRegistry; class CommandRegistry;
class Transaction; class Transaction;
struct CommandContext;
class GenericFamily { class GenericFamily {
public: public:
@ -34,41 +35,36 @@ class GenericFamily {
private: private:
using SinkReplyBuilder = facade::SinkReplyBuilder; using SinkReplyBuilder = facade::SinkReplyBuilder;
static void Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Del(CmdArgList args, const CommandContext& cmd_cntx);
static void Ping(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, static void Ping(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx); static void Exists(CmdArgList args, const CommandContext& cmd_cntx);
static void Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Expire(CmdArgList args, const CommandContext& cmd_cntx);
static void Expire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void ExpireAt(CmdArgList args, const CommandContext& cmd_cntx);
static void ExpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Persist(CmdArgList args, const CommandContext& cmd_cntx);
static void Persist(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Keys(CmdArgList args, const CommandContext& cmd_cntx);
static void Keys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, static void PexpireAt(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx); static void Pexpire(CmdArgList args, const CommandContext& cmd_cntx);
static void PexpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Stick(CmdArgList args, const CommandContext& cmd_cntx);
static void Pexpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Sort(CmdArgList args, const CommandContext& cmd_cntx);
static void Stick(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Move(CmdArgList args, const CommandContext& cmd_cntx);
static void Sort(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Rename(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Rename(CmdArgList args, const CommandContext& cmd_cntx);
static void RenameNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void RenameNx(CmdArgList args, const CommandContext& cmd_cntx);
static void ExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void ExpireTime(CmdArgList args, const CommandContext& cmd_cntx);
static void PExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void PExpireTime(CmdArgList args, const CommandContext& cmd_cntx);
static void Ttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Ttl(CmdArgList args, const CommandContext& cmd_cntx);
static void Pttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Pttl(CmdArgList args, const CommandContext& cmd_cntx);
static void Echo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Echo(CmdArgList args, const CommandContext& cmd_cntx);
static void Select(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, static void Select(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx); static void Scan(CmdArgList args, const CommandContext& cmd_cntx);
static void Scan(CmdArgList args, Transaction*, SinkReplyBuilder* builder, static void Time(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx); static void Type(CmdArgList args, const CommandContext& cmd_cntx);
static void Time(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Dump(CmdArgList args, const CommandContext& cmd_cntx);
static void Type(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void Restore(CmdArgList args, const CommandContext& cmd_cntx);
static void Dump(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void RandomKey(CmdArgList args, const CommandContext& cmd_cntx);
static void Restore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void FieldTtl(CmdArgList args, const CommandContext& cmd_cntx);
static void RandomKey(CmdArgList args, Transaction*, SinkReplyBuilder* builder, static void FieldExpire(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx);
static void FieldTtl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void FieldExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
}; };
} // namespace dfly } // namespace dfly

View file

@ -125,7 +125,7 @@ OpResult<int> AddToHll(const OpArgs& op_args, string_view key, CmdArgList values
return std::min(updated, 1); return std::min(updated, 1);
} }
void PFAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void PFAdd(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
args.remove_prefix(1); args.remove_prefix(1);
@ -133,8 +133,8 @@ void PFAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return AddToHll(t->GetOpArgs(shard), key, args); return AddToHll(t->GetOpArgs(shard), key, args);
}; };
OpResult<int> res = tx->ScheduleSingleHopT(std::move(cb)); OpResult<int> res = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder); HandleOpValueResult(res, cmd_cntx.rb);
} }
OpResult<int64_t> CountHllsSingle(const OpArgs& op_args, string_view key) { OpResult<int64_t> CountHllsSingle(const OpArgs& op_args, string_view key) {
@ -204,7 +204,7 @@ vector<HllBufferPtr> ConvertShardVector(const vector<vector<string>>& hlls) {
return ptrs; return ptrs;
} }
OpResult<int64_t> PFCountMulti(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { OpResult<int64_t> PFCountMulti(CmdArgList args, const CommandContext& cmd_cntx) {
vector<vector<string>> hlls; vector<vector<string>> hlls;
hlls.resize(shard_set->size()); hlls.resize(shard_set->size());
@ -218,7 +218,7 @@ OpResult<int64_t> PFCountMulti(CmdArgList args, Transaction* tx, SinkReplyBuilde
return result.status(); return result.status();
}; };
tx->ScheduleSingleHop(std::move(cb)); cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
vector<HllBufferPtr> ptrs = ConvertShardVector(hlls); vector<HllBufferPtr> ptrs = ConvertShardVector(hlls);
int64_t pf_count = pfcountMulti(ptrs.data(), ptrs.size()); int64_t pf_count = pfcountMulti(ptrs.data(), ptrs.size());
@ -229,17 +229,17 @@ OpResult<int64_t> PFCountMulti(CmdArgList args, Transaction* tx, SinkReplyBuilde
} }
} }
void PFCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void PFCount(CmdArgList args, const CommandContext& cmd_cntx) {
if (args.size() == 1) { if (args.size() == 1) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return CountHllsSingle(t->GetOpArgs(shard), key); return CountHllsSingle(t->GetOpArgs(shard), key);
}; };
OpResult<int64_t> res = tx->ScheduleSingleHopT(std::move(cb)); OpResult<int64_t> res = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder); HandleOpValueResult(res, cmd_cntx.rb);
} else { } else {
HandleOpValueResult(PFCountMulti(args, tx, builder), builder); HandleOpValueResult(PFCountMulti(args, cmd_cntx), cmd_cntx.rb);
} }
} }

View file

@ -909,19 +909,18 @@ void MoveGeneric(string_view src, string_view dest, ListDir src_dir, ListDir des
} }
} }
void RPopLPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void RPopLPush(CmdArgList args, const CommandContext& cmd_cntx) {
string_view src = ArgS(args, 0); string_view src = ArgS(args, 0);
string_view dest = ArgS(args, 1); string_view dest = ArgS(args, 1);
MoveGeneric(src, dest, ListDir::RIGHT, ListDir::LEFT, tx, builder); MoveGeneric(src, dest, ListDir::RIGHT, ListDir::LEFT, cmd_cntx.tx, cmd_cntx.rb);
} }
void BRPopLPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void BRPopLPush(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) {
facade::CmdArgParser parser{args}; facade::CmdArgParser parser{args};
auto [src, dest] = parser.Next<string_view, string_view>(); auto [src, dest] = parser.Next<string_view, string_view>();
float timeout = parser.Next<float>(); float timeout = parser.Next<float>();
auto* builder = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (auto err = parser.Error(); err) if (auto err = parser.Error(); err)
return builder->SendError(err->MakeReply()); return builder->SendError(err->MakeReply());
@ -929,17 +928,17 @@ void BRPopLPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
return builder->SendError("timeout is negative"); return builder->SendError("timeout is negative");
BPopPusher bpop_pusher(src, dest, ListDir::RIGHT, ListDir::LEFT); BPopPusher bpop_pusher(src, dest, ListDir::RIGHT, ListDir::LEFT);
OpResult<string> op_res = bpop_pusher.Run(unsigned(timeout * 1000), tx, cntx); OpResult<string> op_res =
bpop_pusher.Run(unsigned(timeout * 1000), cmd_cntx.tx, cmd_cntx.conn_cntx);
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (op_res) { if (op_res) {
return rb->SendBulkString(*op_res); return builder->SendBulkString(*op_res);
} }
switch (op_res.status()) { switch (op_res.status()) {
case OpStatus::CANCELLED: case OpStatus::CANCELLED:
case OpStatus::TIMED_OUT: case OpStatus::TIMED_OUT:
return rb->SendNull(); return builder->SendNull();
break; break;
default: default:
@ -948,13 +947,13 @@ void BRPopLPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
} }
} }
void BLMove(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { void BLMove(CmdArgList args, const CommandContext& cmd_cntx) {
facade::CmdArgParser parser{args}; facade::CmdArgParser parser{args};
auto [src, dest] = parser.Next<string_view, string_view>(); auto [src, dest] = parser.Next<string_view, string_view>();
ListDir src_dir = ParseDir(&parser); ListDir src_dir = ParseDir(&parser);
ListDir dest_dir = ParseDir(&parser); ListDir dest_dir = ParseDir(&parser);
float timeout = parser.Next<float>(); float timeout = parser.Next<float>();
auto* builder = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (auto err = parser.Error(); err) if (auto err = parser.Error(); err)
return builder->SendError(err->MakeReply()); return builder->SendError(err->MakeReply());
@ -962,17 +961,17 @@ void BLMove(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, Connect
return builder->SendError("timeout is negative"); return builder->SendError("timeout is negative");
BPopPusher bpop_pusher(src, dest, src_dir, dest_dir); BPopPusher bpop_pusher(src, dest, src_dir, dest_dir);
OpResult<string> op_res = bpop_pusher.Run(unsigned(timeout * 1000), tx, cntx); OpResult<string> op_res =
bpop_pusher.Run(unsigned(timeout * 1000), cmd_cntx.tx, cmd_cntx.conn_cntx);
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (op_res) { if (op_res) {
return rb->SendBulkString(*op_res); return builder->SendBulkString(*op_res);
} }
switch (op_res.status()) { switch (op_res.status()) {
case OpStatus::CANCELLED: case OpStatus::CANCELLED:
case OpStatus::TIMED_OUT: case OpStatus::TIMED_OUT:
return rb->SendNull(); return builder->SendNull();
break; break;
default: default:
@ -1176,44 +1175,44 @@ void BPopGeneric(ListDir dir, CmdArgList args, Transaction* tx, SinkReplyBuilder
} // namespace } // namespace
void ListFamily::LPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::LPush(CmdArgList args, const CommandContext& cmd_cntx) {
return PushGeneric(ListDir::LEFT, false, std::move(args), tx, builder); return PushGeneric(ListDir::LEFT, false, std::move(args), cmd_cntx.tx, cmd_cntx.rb);
} }
void ListFamily::LPushX(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::LPushX(CmdArgList args, const CommandContext& cmd_cntx) {
return PushGeneric(ListDir::LEFT, true, std::move(args), tx, builder); return PushGeneric(ListDir::LEFT, true, std::move(args), cmd_cntx.tx, cmd_cntx.rb);
} }
void ListFamily::LPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::LPop(CmdArgList args, const CommandContext& cmd_cntx) {
return PopGeneric(ListDir::LEFT, std::move(args), tx, builder); return PopGeneric(ListDir::LEFT, std::move(args), cmd_cntx.tx, cmd_cntx.rb);
} }
void ListFamily::RPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::RPush(CmdArgList args, const CommandContext& cmd_cntx) {
return PushGeneric(ListDir::RIGHT, false, std::move(args), tx, builder); return PushGeneric(ListDir::RIGHT, false, std::move(args), cmd_cntx.tx, cmd_cntx.rb);
} }
void ListFamily::RPushX(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::RPushX(CmdArgList args, const CommandContext& cmd_cntx) {
return PushGeneric(ListDir::RIGHT, true, std::move(args), tx, builder); return PushGeneric(ListDir::RIGHT, true, std::move(args), cmd_cntx.tx, cmd_cntx.rb);
} }
void ListFamily::RPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::RPop(CmdArgList args, const CommandContext& cmd_cntx) {
return PopGeneric(ListDir::RIGHT, std::move(args), tx, builder); return PopGeneric(ListDir::RIGHT, std::move(args), cmd_cntx.tx, cmd_cntx.rb);
} }
void ListFamily::LLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::LLen(CmdArgList args, const CommandContext& cmd_cntx) {
auto key = ArgS(args, 0); auto key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) { return OpLen(t->GetOpArgs(shard), key); }; auto cb = [&](Transaction* t, EngineShard* shard) { return OpLen(t->GetOpArgs(shard), key); };
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
builder->SendLong(result.value()); cmd_cntx.rb->SendLong(result.value());
} else if (result.status() == OpStatus::KEY_NOTFOUND) { } else if (result.status() == OpStatus::KEY_NOTFOUND) {
builder->SendLong(0); cmd_cntx.rb->SendLong(0);
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
void ListFamily::LPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::LPos(CmdArgList args, const CommandContext& cmd_cntx) {
facade::CmdArgParser parser{args}; facade::CmdArgParser parser{args};
auto [key, elem] = parser.Next<string_view, string_view>(); auto [key, elem] = parser.Next<string_view, string_view>();
@ -1242,26 +1241,26 @@ void ListFamily::LPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
parser.Skip(1); parser.Skip(1);
} }
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (rank == 0) if (rank == 0)
return builder->SendError(kInvalidIntErr); return rb->SendError(kInvalidIntErr);
if (auto err = parser.Error(); err) if (auto err = parser.Error(); err)
return builder->SendError(err->MakeReply()); return rb->SendError(err->MakeReply());
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpPos(t->GetOpArgs(shard), key, elem, rank, count, max_len); return OpPos(t->GetOpArgs(shard), key, elem, rank, count, max_len);
}; };
Transaction* trans = tx; Transaction* trans = cmd_cntx.tx;
OpResult<vector<uint32_t>> result = trans->ScheduleSingleHopT(std::move(cb)); OpResult<vector<uint32_t>> result = trans->ScheduleSingleHopT(std::move(cb));
if (result.status() == OpStatus::WRONG_TYPE) { if (result.status() == OpStatus::WRONG_TYPE) {
return builder->SendError(result.status()); return rb->SendError(result.status());
} else if (result.status() == OpStatus::INVALID_VALUE) { } else if (result.status() == OpStatus::INVALID_VALUE) {
return builder->SendError(result.status()); return rb->SendError(result.status());
} }
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (skip_count) { if (skip_count) {
if (result->empty()) { if (result->empty()) {
rb->SendNull(); rb->SendNull();
@ -1269,7 +1268,7 @@ void ListFamily::LPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
rb->SendLong((*result)[0]); rb->SendLong((*result)[0]);
} }
} else { } else {
SinkReplyBuilder::ReplyAggregator agg(builder); SinkReplyBuilder::ReplyAggregator agg(rb);
rb->StartArray(result->size()); rb->StartArray(result->size());
const auto& array = result.value(); const auto& array = result.value();
for (const auto& v : array) { for (const auto& v : array) {
@ -1278,12 +1277,14 @@ void ListFamily::LPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
} }
} }
void ListFamily::LIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::LIndex(CmdArgList args, const CommandContext& cmd_cntx) {
std::string_view key = ArgS(args, 0); std::string_view key = ArgS(args, 0);
std::string_view index_str = ArgS(args, 1); std::string_view index_str = ArgS(args, 1);
int32_t index; int32_t index;
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (!absl::SimpleAtoi(index_str, &index)) { if (!absl::SimpleAtoi(index_str, &index)) {
builder->SendError(kInvalidIntErr); rb->SendError(kInvalidIntErr);
return; return;
} }
@ -1291,26 +1292,25 @@ void ListFamily::LIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
return OpIndex(t->GetOpArgs(shard), key, index); return OpIndex(t->GetOpArgs(shard), key, index);
}; };
auto* rb = static_cast<RedisReplyBuilder*>(builder); OpResult<string> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
OpResult<string> result = tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
rb->SendBulkString(result.value()); rb->SendBulkString(result.value());
} else if (result.status() == OpStatus::WRONG_TYPE) { } else if (result.status() == OpStatus::WRONG_TYPE) {
builder->SendError(result.status()); rb->SendError(result.status());
} else { } else {
rb->SendNull(); rb->SendNull();
} }
} }
/* LINSERT <key> (BEFORE|AFTER) <pivot> <element> */ /* LINSERT <key> (BEFORE|AFTER) <pivot> <element> */
void ListFamily::LInsert(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::LInsert(CmdArgList args, const CommandContext& cmd_cntx) {
facade::CmdArgParser parser{args}; facade::CmdArgParser parser{args};
string_view key = parser.Next(); string_view key = parser.Next();
InsertParam where = parser.MapNext("AFTER", INSERT_AFTER, "BEFORE", INSERT_BEFORE); InsertParam where = parser.MapNext("AFTER", INSERT_AFTER, "BEFORE", INSERT_BEFORE);
auto [pivot, elem] = parser.Next<string_view, string_view>(); auto [pivot, elem] = parser.Next<string_view, string_view>();
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (auto err = parser.Error(); err) if (auto err = parser.Error(); err)
return builder->SendError(err->MakeReply()); return rb->SendError(err->MakeReply());
DCHECK(pivot.data() && elem.data()); DCHECK(pivot.data() && elem.data());
@ -1318,42 +1318,43 @@ void ListFamily::LInsert(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
return OpInsert(t->GetOpArgs(shard), key, pivot, elem, where); return OpInsert(t->GetOpArgs(shard), key, pivot, elem, where);
}; };
OpResult<int> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<int> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result || result == OpStatus::KEY_NOTFOUND) { if (result || result == OpStatus::KEY_NOTFOUND) {
return builder->SendLong(result.value_or(0)); return rb->SendLong(result.value_or(0));
} }
builder->SendError(result.status()); rb->SendError(result.status());
} }
void ListFamily::LTrim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::LTrim(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view s_str = ArgS(args, 1); string_view s_str = ArgS(args, 1);
string_view e_str = ArgS(args, 2); string_view e_str = ArgS(args, 2);
int32_t start, end; int32_t start, end;
if (!absl::SimpleAtoi(s_str, &start) || !absl::SimpleAtoi(e_str, &end)) { if (!absl::SimpleAtoi(s_str, &start) || !absl::SimpleAtoi(e_str, &end)) {
builder->SendError(kInvalidIntErr); cmd_cntx.rb->SendError(kInvalidIntErr);
return; return;
} }
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpTrim(t->GetOpArgs(shard), key, start, end); return OpTrim(t->GetOpArgs(shard), key, start, end);
}; };
OpStatus st = tx->ScheduleSingleHop(std::move(cb)); OpStatus st = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
if (st == OpStatus::KEY_NOTFOUND) if (st == OpStatus::KEY_NOTFOUND)
st = OpStatus::OK; st = OpStatus::OK;
builder->SendError(st); cmd_cntx.rb->SendError(st);
} }
void ListFamily::LRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::LRange(CmdArgList args, const CommandContext& cmd_cntx) {
std::string_view key = ArgS(args, 0); std::string_view key = ArgS(args, 0);
std::string_view s_str = ArgS(args, 1); std::string_view s_str = ArgS(args, 1);
std::string_view e_str = ArgS(args, 2); std::string_view e_str = ArgS(args, 2);
int32_t start, end; int32_t start, end;
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (!absl::SimpleAtoi(s_str, &start) || !absl::SimpleAtoi(e_str, &end)) { if (!absl::SimpleAtoi(s_str, &start) || !absl::SimpleAtoi(e_str, &end)) {
builder->SendError(kInvalidIntErr); rb->SendError(kInvalidIntErr);
return; return;
} }
@ -1361,79 +1362,76 @@ void ListFamily::LRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
return OpRange(t->GetOpArgs(shard), key, start, end); return OpRange(t->GetOpArgs(shard), key, start, end);
}; };
auto res = tx->ScheduleSingleHopT(std::move(cb)); auto res = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (!res && res.status() != OpStatus::KEY_NOTFOUND) { if (!res && res.status() != OpStatus::KEY_NOTFOUND) {
return builder->SendError(res.status()); return rb->SendError(res.status());
} }
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendBulkStrArr(*res); rb->SendBulkStrArr(*res);
} }
// lrem key 5 foo, will remove foo elements from the list if exists at most 5 times. // lrem key 5 foo, will remove foo elements from the list if exists at most 5 times.
void ListFamily::LRem(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::LRem(CmdArgList args, const CommandContext& cmd_cntx) {
std::string_view key = ArgS(args, 0); std::string_view key = ArgS(args, 0);
std::string_view index_str = ArgS(args, 1); std::string_view index_str = ArgS(args, 1);
std::string_view elem = ArgS(args, 2); std::string_view elem = ArgS(args, 2);
int32_t count; int32_t count;
if (!absl::SimpleAtoi(index_str, &count)) { if (!absl::SimpleAtoi(index_str, &count)) {
builder->SendError(kInvalidIntErr); cmd_cntx.rb->SendError(kInvalidIntErr);
return; return;
} }
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRem(t->GetOpArgs(shard), key, elem, count); return OpRem(t->GetOpArgs(shard), key, elem, count);
}; };
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result || result == OpStatus::KEY_NOTFOUND) { if (result || result == OpStatus::KEY_NOTFOUND) {
return builder->SendLong(result.value_or(0)); return cmd_cntx.rb->SendLong(result.value_or(0));
} }
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
void ListFamily::LSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::LSet(CmdArgList args, const CommandContext& cmd_cntx) {
std::string_view key = ArgS(args, 0); std::string_view key = ArgS(args, 0);
std::string_view index_str = ArgS(args, 1); std::string_view index_str = ArgS(args, 1);
std::string_view elem = ArgS(args, 2); std::string_view elem = ArgS(args, 2);
int32_t count; int32_t count;
if (!absl::SimpleAtoi(index_str, &count)) { if (!absl::SimpleAtoi(index_str, &count)) {
builder->SendError(kInvalidIntErr); cmd_cntx.rb->SendError(kInvalidIntErr);
return; return;
} }
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpSet(t->GetOpArgs(shard), key, elem, count); return OpSet(t->GetOpArgs(shard), key, elem, count);
}; };
OpResult<void> result = tx->ScheduleSingleHop(std::move(cb)); OpResult<void> result = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
if (result) { if (result) {
builder->SendOk(); cmd_cntx.rb->SendOk();
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
void ListFamily::BLPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void ListFamily::BLPop(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) { BPopGeneric(ListDir::LEFT, std::move(args), cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx);
BPopGeneric(ListDir::LEFT, std::move(args), tx, builder, cntx);
} }
void ListFamily::BRPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void ListFamily::BRPop(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) { BPopGeneric(ListDir::RIGHT, std::move(args), cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx);
BPopGeneric(ListDir::RIGHT, std::move(args), tx, builder, cntx);
} }
void ListFamily::LMove(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void ListFamily::LMove(CmdArgList args, const CommandContext& cmd_cntx) {
facade::CmdArgParser parser{args}; facade::CmdArgParser parser{args};
auto [src, dest] = parser.Next<string_view, string_view>(); auto [src, dest] = parser.Next<string_view, string_view>();
ListDir src_dir = ParseDir(&parser); ListDir src_dir = ParseDir(&parser);
ListDir dest_dir = ParseDir(&parser); ListDir dest_dir = ParseDir(&parser);
if (auto err = parser.Error(); err) if (auto err = parser.Error(); err)
return builder->SendError(err->MakeReply()); return cmd_cntx.rb->SendError(err->MakeReply());
MoveGeneric(src, dest, src_dir, dest_dir, tx, builder); MoveGeneric(src, dest, src_dir, dest_dir, cmd_cntx.tx, cmd_cntx.rb);
} }
using CI = CommandId; using CI = CommandId;

View file

@ -15,9 +15,8 @@ namespace dfly {
using facade::OpResult; using facade::OpResult;
class ConnectionContext;
class CommandRegistry; class CommandRegistry;
class Transaction; struct CommandContext;
class ListFamily { class ListFamily {
public: public:
@ -26,25 +25,23 @@ class ListFamily {
private: private:
using SinkReplyBuilder = facade::SinkReplyBuilder; using SinkReplyBuilder = facade::SinkReplyBuilder;
static void LPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void LPush(CmdArgList args, const CommandContext& cmd_cntx);
static void LPushX(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void LPushX(CmdArgList args, const CommandContext& cmd_cntx);
static void RPush(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void RPush(CmdArgList args, const CommandContext& cmd_cntx);
static void RPushX(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void RPushX(CmdArgList args, const CommandContext& cmd_cntx);
static void LPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void LPop(CmdArgList args, const CommandContext& cmd_cntx);
static void RPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void RPop(CmdArgList args, const CommandContext& cmd_cntx);
static void BLPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, static void BLPop(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx); static void BRPop(CmdArgList args, const CommandContext& cmd_cntx);
static void BRPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, static void LLen(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx); static void LPos(CmdArgList args, const CommandContext& cmd_cntx);
static void LLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void LIndex(CmdArgList args, const CommandContext& cmd_cntx);
static void LPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void LInsert(CmdArgList args, const CommandContext& cmd_cntx);
static void LIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void LTrim(CmdArgList args, const CommandContext& cmd_cntx);
static void LInsert(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void LRange(CmdArgList args, const CommandContext& cmd_cntx);
static void LTrim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void LRem(CmdArgList args, const CommandContext& cmd_cntx);
static void LRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void LSet(CmdArgList args, const CommandContext& cmd_cntx);
static void LRem(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void LMove(CmdArgList args, const CommandContext& cmd_cntx);
static void LSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void LMove(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
}; };
} // namespace dfly } // namespace dfly