From e24f697bb3117381ed3a86e5238e7e655e6f3d14 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Wed, 23 Oct 2024 11:54:24 +0300 Subject: [PATCH] chore: pass SinkReplyBuilder and Transaction explicitly. Part2 (#3954) This pr follows #3946 Signed-off-by: Roman Gershman --- src/server/common.h | 8 + src/server/db_slice.cc | 1 - src/server/generic_family.cc | 460 +++++++++++++++++------------------ src/server/generic_family.h | 85 +++---- 4 files changed, 275 insertions(+), 279 deletions(-) diff --git a/src/server/common.h b/src/server/common.h index bad819ca7..efe66d6ee 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -109,6 +109,14 @@ std::ostream& operator<<(std::ostream& os, const GlobalState& state); enum class TimeUnit : uint8_t { SEC, MSEC }; +enum ExpireFlags { + EXPIRE_ALWAYS = 0, + EXPIRE_NX = 1 << 0, // Set expiry only when key has no expiry + EXPIRE_XX = 1 << 2, // Set expiry only when the key has expiry + EXPIRE_GT = 1 << 3, // GT: Set expiry only when the new expiry is greater than current one + EXPIRE_LT = 1 << 4, // LT: Set expiry only when the new expiry is less than current one +}; + bool ParseHumanReadableBytes(std::string_view str, int64_t* num_bytes); bool ParseDouble(std::string_view src, double* value); diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 87198b6c3..805fb81fa 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -13,7 +13,6 @@ #include "server/cluster/cluster_defs.h" #include "server/engine_shard_set.h" #include "server/error.h" -#include "server/generic_family.h" #include "server/journal/journal.h" #include "server/server_state.h" #include "server/tiered_storage.h" diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 722f07e84..4deb09016 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -616,7 +616,7 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys do { auto cb = [&] { - OpArgs op_args{EngineShard::tlocal(), 0, db_cntx}; + OpArgs op_args{EngineShard::tlocal(), nullptr, db_cntx}; OpScan(op_args, scan_opts, &cursor, keys); }; @@ -863,6 +863,99 @@ OpResult OpTtl(Transaction* t, EngineShard* shard, string_view key) { } } +ErrorReply RenameGeneric(CmdArgList args, bool destination_should_not_exist, Transaction* tx) { + string_view key[2] = {ArgS(args, 0), ArgS(args, 1)}; + + if (tx->GetUniqueShardCnt() == 1) { + tx->ReviveAutoJournal(); // Safe to use RENAME with single shard + auto cb = [&](Transaction* t, EngineShard* shard) { + return OpRen(t->GetOpArgs(shard), key[0], key[1], destination_should_not_exist); + }; + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + + return result.status(); + } + + Renamer renamer{tx, key[0], key[1], shard_set->size()}; + return renamer.Rename(destination_should_not_exist); +} + +void ExpireTimeGeneric(CmdArgList args, TimeUnit unit, Transaction* tx, SinkReplyBuilder* builder) { + string_view key = ArgS(args, 0); + + auto cb = [&](Transaction* t, EngineShard* shard) { return OpExpireTime(t, shard, key); }; + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + + if (result) { + long ttl = (unit == TimeUnit::SEC) ? (result.value() + 500) / 1000 : result.value(); + builder->SendLong(ttl); + return; + } + + switch (result.status()) { + case OpStatus::KEY_NOTFOUND: + builder->SendLong(-2); + break; + default: + LOG_IF(ERROR, result.status() != OpStatus::SKIPPED) + << "Unexpected status " << result.status(); + builder->SendLong(-1); + break; + } +} + +void TtlGeneric(CmdArgList args, TimeUnit unit, Transaction* tx, SinkReplyBuilder* builder) { + string_view key = ArgS(args, 0); + + auto cb = [&](Transaction* t, EngineShard* shard) { return OpTtl(t, shard, key); }; + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + + if (result) { + long ttl = (unit == TimeUnit::SEC) ? (result.value() + 500) / 1000 : result.value(); + builder->SendLong(ttl); + return; + } + + switch (result.status()) { + case OpStatus::KEY_NOTFOUND: + builder->SendLong(-2); + break; + default: + LOG_IF(ERROR, result.status() != OpStatus::SKIPPED) + << "Unexpected status " << result.status(); + builder->SendLong(-1); + break; + } +} + +std::optional ParseExpireOptionsOrReply(const CmdArgList args, SinkReplyBuilder* builder) { + int32_t flags = ExpireFlags::EXPIRE_ALWAYS; + for (auto& arg : args) { + string arg_sv = absl::AsciiStrToUpper(ToSV(arg)); + if (arg_sv == "NX") { + flags |= ExpireFlags::EXPIRE_NX; + } else if (arg_sv == "XX") { + flags |= ExpireFlags::EXPIRE_XX; + } else if (arg_sv == "GT") { + flags |= ExpireFlags::EXPIRE_GT; + } else if (arg_sv == "LT") { + flags |= ExpireFlags::EXPIRE_LT; + } else { + builder->SendError(absl::StrCat("Unsupported option: ", arg_sv)); + return nullopt; + } + } + if ((flags & ExpireFlags::EXPIRE_NX) && (flags & ~ExpireFlags::EXPIRE_NX)) { + builder->SendError("NX and XX, GT or LT options at the same time are not compatible"); + return nullopt; + } + if ((flags & ExpireFlags::EXPIRE_GT) && (flags & ExpireFlags::EXPIRE_LT)) { + builder->SendError("GT and LT options at the same time are not compatible"); + return nullopt; + } + return flags; +} + } // namespace OpResult GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) { @@ -882,12 +975,11 @@ OpResult GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& return res; } -void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) { - Transaction* transaction = cntx->transaction; +void GenericFamily::Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { VLOG(1) << "Del " << ArgS(args, 0); atomic_uint32_t result{0}; - bool is_mc = cntx->protocol() == Protocol::MEMCACHE; + bool is_mc = (builder->type() == SinkReplyBuilder::MC); auto cb = [&result](const Transaction* t, EngineShard* shard) { ShardArgs args = t->GetShardArgs(shard->shard_id()); @@ -897,15 +989,15 @@ void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) { return OpStatus::OK; }; - OpStatus status = transaction->ScheduleSingleHop(std::move(cb)); + OpStatus status = tx->ScheduleSingleHop(std::move(cb)); CHECK_EQ(OpStatus::OK, status); - DVLOG(2) << "Del ts " << transaction->txid(); + DVLOG(2) << "Del ts " << tx->txid(); uint32_t del_cnt = result.load(memory_order_relaxed); if (is_mc) { using facade::MCReplyBuilder; - MCReplyBuilder* mc_builder = static_cast(cntx->reply_builder()); + MCReplyBuilder* mc_builder = static_cast(builder); if (del_cnt == 0) { mc_builder->SendNotFound(); @@ -913,17 +1005,19 @@ void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) { mc_builder->SendSimpleString("DELETED"); } } else { - cntx->SendLong(del_cnt); + builder->SendLong(del_cnt); } } -void GenericFamily::Ping(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::Ping(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { if (args.size() > 1) { - return cntx->SendError(facade::WrongNumArgsError("ping"), kSyntaxErrType); + return builder->SendError(facade::WrongNumArgsError("ping"), kSyntaxErrType); } string_view msg; - auto* rb = static_cast(cntx->reply_builder()); + + auto* rb = static_cast(builder); // If a client in the subscribe state and in resp2 mode, it returns an array for some reason. if (cntx->conn_state.subscribe_info && !rb->IsResp3()) { @@ -936,18 +1030,16 @@ void GenericFamily::Ping(CmdArgList args, ConnectionContext* cntx) { } if (args.size() == 0) { - return cntx->SendSimpleString("PONG"); + return builder->SendSimpleString("PONG"); } else { msg = ArgS(args, 0); DVLOG(2) << "Ping " << msg; - auto* rb = static_cast(cntx->reply_builder()); return rb->SendBulkString(msg); } } -void GenericFamily::Exists(CmdArgList args, ConnectionContext* cntx) { - Transaction* transaction = cntx->transaction; +void GenericFamily::Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { VLOG(1) << "Exists " << ArgS(args, 0); atomic_uint32_t result{0}; @@ -960,59 +1052,28 @@ void GenericFamily::Exists(CmdArgList args, ConnectionContext* cntx) { return OpStatus::OK; }; - OpStatus status = transaction->ScheduleSingleHop(std::move(cb)); + OpStatus status = tx->ScheduleSingleHop(std::move(cb)); CHECK_EQ(OpStatus::OK, status); - return cntx->SendLong(result.load(memory_order_acquire)); + return builder->SendLong(result.load(memory_order_acquire)); } -void GenericFamily::Persist(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::Persist(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) { return OpPersist(t->GetOpArgs(shard), key); }; - OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); - if (status == OpStatus::OK) - cntx->SendLong(1); - else - cntx->SendLong(0); + OpStatus status = tx->ScheduleSingleHop(std::move(cb)); + builder->SendLong(status == OpStatus::OK); } -std::optional ParseExpireOptionsOrReply(const CmdArgList args, ConnectionContext* cntx) { - int32_t flags = ExpireFlags::EXPIRE_ALWAYS; - for (auto& arg : args) { - string arg_sv = absl::AsciiStrToUpper(ToSV(arg)); - if (arg_sv == "NX") { - flags |= ExpireFlags::EXPIRE_NX; - } else if (arg_sv == "XX") { - flags |= ExpireFlags::EXPIRE_XX; - } else if (arg_sv == "GT") { - flags |= ExpireFlags::EXPIRE_GT; - } else if (arg_sv == "LT") { - flags |= ExpireFlags::EXPIRE_LT; - } else { - cntx->SendError(absl::StrCat("Unsupported option: ", arg_sv)); - return nullopt; - } - } - if ((flags & ExpireFlags::EXPIRE_NX) && (flags & ~ExpireFlags::EXPIRE_NX)) { - cntx->SendError("NX and XX, GT or LT options at the same time are not compatible"); - return nullopt; - } - if ((flags & ExpireFlags::EXPIRE_GT) && (flags & ExpireFlags::EXPIRE_LT)) { - cntx->SendError("GT and LT options at the same time are not compatible"); - return nullopt; - } - return flags; -} - -void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::Expire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view sec = ArgS(args, 1); int64_t int_arg; if (!absl::SimpleAtoi(sec, &int_arg)) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } int_arg = std::max(int_arg, -1); @@ -1022,7 +1083,7 @@ void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) { int_arg = kMaxExpireDeadlineSec; } - auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cntx); + auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder); if (!expire_options) { return; } @@ -1032,21 +1093,21 @@ void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) { return OpExpire(t->GetOpArgs(shard), key, params); }; - OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); - cntx->SendLong(status == OpStatus::OK); + OpStatus status = tx->ScheduleSingleHop(std::move(cb)); + builder->SendLong(status == OpStatus::OK); } -void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::ExpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view sec = ArgS(args, 1); int64_t int_arg; if (!absl::SimpleAtoi(sec, &int_arg)) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } int_arg = std::max(int_arg, 0L); - auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cntx); + auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder); if (!expire_options) { return; } @@ -1056,16 +1117,17 @@ void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { return OpExpire(t->GetOpArgs(shard), key, params); }; - OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); + OpStatus status = tx->ScheduleSingleHop(std::move(cb)); if (status == OpStatus::OUT_OF_RANGE) { - return cntx->SendError(kExpiryOutOfRange); - } else { - cntx->SendLong(status == OpStatus::OK); + return builder->SendError(kExpiryOutOfRange); } + + builder->SendLong(status == OpStatus::OK); } -void GenericFamily::Keys(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::Keys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { string_view pattern(ArgS(args, 0)); uint64_t cursor = 0; @@ -1083,23 +1145,24 @@ void GenericFamily::Keys(CmdArgList args, ConnectionContext* cntx) { cursor = ScanGeneric(cursor, scan_opts, &keys, cntx); } while (cursor != 0 && keys.size() < output_limit); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->StartArray(keys.size()); for (const auto& k : keys) { rb->SendBulkString(k); } } -void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::PexpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view msec = ArgS(args, 1); int64_t int_arg; if (!absl::SimpleAtoi(msec, &int_arg)) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } + int_arg = std::max(int_arg, 0L); - auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cntx); + auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder); if (!expire_options) { return; } @@ -1111,22 +1174,22 @@ void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { return OpExpire(t->GetOpArgs(shard), key, params); }; - OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); + OpStatus status = tx->ScheduleSingleHop(std::move(cb)); if (status == OpStatus::OUT_OF_RANGE) { - return cntx->SendError(kExpiryOutOfRange); + return builder->SendError(kExpiryOutOfRange); } else { - cntx->SendLong(status == OpStatus::OK); + builder->SendLong(status == OpStatus::OK); } } -void GenericFamily::Pexpire(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::Pexpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view msec = ArgS(args, 1); int64_t int_arg; if (!absl::SimpleAtoi(msec, &int_arg)) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } int_arg = std::max(int_arg, -1); @@ -1135,7 +1198,7 @@ void GenericFamily::Pexpire(CmdArgList args, ConnectionContext* cntx) { int_arg = kMaxExpireDeadlineMs; } - auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), cntx); + auto expire_options = ParseExpireOptionsOrReply(args.subspan(2), builder); if (!expire_options) { return; } @@ -1145,17 +1208,16 @@ void GenericFamily::Pexpire(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { return OpExpire(t->GetOpArgs(shard), key, params); }; - OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); + OpStatus status = tx->ScheduleSingleHop(std::move(cb)); if (status == OpStatus::OUT_OF_RANGE) { - return cntx->SendError(kExpiryOutOfRange); - } else { - cntx->SendLong(status == OpStatus::OK); + return builder->SendError(kExpiryOutOfRange); } + builder->SendLong(status == OpStatus::OK); } -void GenericFamily::Stick(CmdArgList args, ConnectionContext* cntx) { - Transaction* transaction = cntx->transaction; +void GenericFamily::Stick(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { + Transaction* transaction = tx; VLOG(1) << "Stick " << ArgS(args, 0); atomic_uint32_t result{0}; @@ -1174,7 +1236,7 @@ void GenericFamily::Stick(CmdArgList args, ConnectionContext* cntx) { DVLOG(2) << "Stick ts " << transaction->txid(); uint32_t match_cnt = result.load(memory_order_relaxed); - cntx->SendLong(match_cnt); + builder->SendLong(match_cnt); } // Used to conditionally store double score @@ -1295,7 +1357,7 @@ OpResultTyped OpFetchSortEntries(const OpArgs& op_args, std::stri return success ? res : OpStatus::INVALID_NUMERIC_RESULT; } -void GenericFamily::Sort(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::Sort(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { std::string_view key = ArgS(args, 0); bool alpha = false; bool reversed = false; @@ -1312,37 +1374,37 @@ void GenericFamily::Sort(CmdArgList args, ConnectionContext* cntx) { } else if (arg == "LIMIT") { int offset, limit; if (i + 2 >= args.size()) { - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } if (!absl::SimpleAtoi(ArgS(args, i + 1), &offset) || !absl::SimpleAtoi(ArgS(args, i + 2), &limit)) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } bounds = {offset, limit}; i += 2; } else { LOG_EVERY_T(ERROR, 1) << "Unsupported option " << arg; - return cntx->SendError(kSyntaxErr, kSyntaxErrType); + return builder->SendError(kSyntaxErr, kSyntaxErrType); } } OpResultTyped fetch_result = - cntx->transaction->ScheduleSingleHopT([&](Transaction* t, EngineShard* shard) { + tx->ScheduleSingleHopT([&](Transaction* t, EngineShard* shard) { return OpFetchSortEntries(t->GetOpArgs(shard), key, alpha); }); if (fetch_result == OpStatus::WRONG_TYPE) - return cntx->SendError(fetch_result.status()); + return builder->SendError(fetch_result.status()); if (fetch_result.status() == OpStatus::INVALID_NUMERIC_RESULT) - return cntx->SendError("One or more scores can't be converted into double"); + return builder->SendError("One or more scores can't be converted into double"); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); if (!fetch_result.ok()) return rb->SendEmptyArray(); auto result_type = fetch_result.type(); - auto sort_call = [cntx, bounds, reversed, result_type](auto& entries) { + auto sort_call = [builder, bounds, reversed, result_type](auto& entries) { using value_t = typename std::decay_t::value_type; auto cmp = reversed ? &value_t::greater : &value_t::less; if (bounds) { @@ -1360,7 +1422,7 @@ void GenericFamily::Sort(CmdArgList args, ConnectionContext* cntx) { } bool is_set = (result_type == OBJ_SET || result_type == OBJ_ZSET); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->StartCollection(std::distance(start_it, end_it), is_set ? RedisReplyBuilder::SET : RedisReplyBuilder::ARRAY); @@ -1372,21 +1434,21 @@ void GenericFamily::Sort(CmdArgList args, ConnectionContext* cntx) { std::visit(std::move(sort_call), fetch_result.value()); } -void GenericFamily::Restore(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::Restore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { std::string_view key = ArgS(args, 0); std::string_view serialized_value = ArgS(args, 2); auto rdb_version = GetRdbVersion(serialized_value); if (!rdb_version) { - return cntx->SendError(kInvalidDumpValueErr); + return builder->SendError(kInvalidDumpValueErr); } OpResult restore_args = RestoreArgs::TryFrom(args); if (!restore_args) { if (restore_args.status() == OpStatus::OUT_OF_RANGE) { - return cntx->SendError("Invalid IDLETIME value, must be >= 0"); + return builder->SendError("Invalid IDLETIME value, must be >= 0"); } else { - return cntx->SendError(restore_args.status()); + return builder->SendError(restore_args.status()); } } @@ -1395,28 +1457,28 @@ void GenericFamily::Restore(CmdArgList args, ConnectionContext* cntx) { rdb_version.value()); }; - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); if (result) { if (result.value()) { - return cntx->SendOk(); + return builder->SendOk(); } else { - return cntx->SendError("Bad data format"); + return builder->SendError("Bad data format"); } } else if (result.status() == OpStatus::KEY_EXISTS) { - return cntx->SendError("BUSYKEY: key name already exists."); + return builder->SendError("BUSYKEY: key name already exists."); } else { - return cntx->SendError(result.status()); + return builder->SendError(result.status()); } } -void GenericFamily::FieldExpire(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::FieldExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view ttl_str = parser.Next(); uint32_t ttl_sec; if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } CmdArgList fields = parser.Tail(); @@ -1424,8 +1486,8 @@ void GenericFamily::FieldExpire(CmdArgList args, ConnectionContext* cntx) { return OpFieldExpire(t->GetOpArgs(shard), key, ttl_sec, fields); }; - OpResult> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + OpResult> result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); if (result) { rb->StartArray(result->size()); const auto& array = result.value(); @@ -1433,43 +1495,43 @@ void GenericFamily::FieldExpire(CmdArgList args, ConnectionContext* cntx) { rb->SendLong(v); } } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } // Returns -2 if key not found, WRONG_TYPE if key is not a set or hash // -1 if the field does not have associated TTL on it, and -3 if field is not found. -void GenericFamily::FieldTtl(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::FieldTtl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view field = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { return OpFieldTtl(t, shard, key, field); }; - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); if (result) { - cntx->SendLong(*result); + builder->SendLong(*result); return; } - cntx->SendError(result.status()); + builder->SendError(result.status()); } -void GenericFamily::Move(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view target_db_sv = ArgS(args, 1); - int64_t target_db; + int32_t target_db; if (!absl::SimpleAtoi(target_db_sv, &target_db)) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } - if (target_db < 0 || target_db >= absl::GetFlag(FLAGS_dbnum)) { - return cntx->SendError(kDbIndOutOfRangeErr); + if (target_db < 0 || uint32_t(target_db) >= absl::GetFlag(FLAGS_dbnum)) { + return builder->SendError(kDbIndOutOfRangeErr); } - if (target_db == cntx->db_index()) { - return cntx->SendError("source and destination objects are the same"); + if (target_db == tx->GetDbIndex()) { + return builder->SendError("source and destination objects are the same"); } OpStatus res = OpStatus::SKIPPED; @@ -1488,115 +1550,67 @@ void GenericFamily::Move(CmdArgList args, ConnectionContext* cntx) { return OpStatus::OK; }; - cntx->transaction->ScheduleSingleHop(std::move(cb)); + tx->ScheduleSingleHop(std::move(cb)); // Exactly one shard will call OpMove. DCHECK(res != OpStatus::SKIPPED); - cntx->SendLong(res == OpStatus::OK); + builder->SendLong(res == OpStatus::OK); } -void GenericFamily::Rename(CmdArgList args, ConnectionContext* cntx) { - auto reply = RenameGeneric(args, false, cntx); - cntx->SendError(reply); +void GenericFamily::Rename(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { + auto reply = RenameGeneric(args, false, tx); + builder->SendError(reply); } -void GenericFamily::RenameNx(CmdArgList args, ConnectionContext* cntx) { - auto reply = RenameGeneric(args, true, cntx); +void GenericFamily::RenameNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { + auto reply = RenameGeneric(args, true, tx); if (!reply.status) { - cntx->SendError(reply); + builder->SendError(reply); return; } OpStatus st = reply.status.value(); if (st == OpStatus::OK) { - cntx->SendLong(1); + builder->SendLong(1); } else if (st == OpStatus::KEY_EXISTS) { - cntx->SendLong(0); + builder->SendLong(0); } else { - cntx->SendError(reply); + builder->SendError(reply); } } -void GenericFamily::ExpireTime(CmdArgList args, ConnectionContext* cntx) { - ExpireTimeGeneric(args, cntx, TimeUnit::SEC); +void GenericFamily::ExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { + ExpireTimeGeneric(args, TimeUnit::SEC, tx, builder); } -void GenericFamily::PExpireTime(CmdArgList args, ConnectionContext* cntx) { - ExpireTimeGeneric(args, cntx, TimeUnit::MSEC); +void GenericFamily::PExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { + ExpireTimeGeneric(args, TimeUnit::MSEC, tx, builder); } -void GenericFamily::ExpireTimeGeneric(CmdArgList args, ConnectionContext* cntx, TimeUnit unit) { - string_view key = ArgS(args, 0); - - auto cb = [&](Transaction* t, EngineShard* shard) { return OpExpireTime(t, shard, key); }; - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); - - if (result) { - long ttl = (unit == TimeUnit::SEC) ? (result.value() + 500) / 1000 : result.value(); - cntx->SendLong(ttl); - return; - } - - switch (result.status()) { - case OpStatus::KEY_NOTFOUND: - cntx->SendLong(-2); - break; - default: - LOG_IF(ERROR, result.status() != OpStatus::SKIPPED) - << "Unexpected status " << result.status(); - cntx->SendLong(-1); - break; - } +void GenericFamily::Ttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { + TtlGeneric(args, TimeUnit::SEC, tx, builder); } -void GenericFamily::Ttl(CmdArgList args, ConnectionContext* cntx) { - TtlGeneric(args, cntx, TimeUnit::SEC); +void GenericFamily::Pttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { + TtlGeneric(args, TimeUnit::MSEC, tx, builder); } -void GenericFamily::Pttl(CmdArgList args, ConnectionContext* cntx) { - TtlGeneric(args, cntx, TimeUnit::MSEC); -} - -void GenericFamily::TtlGeneric(CmdArgList args, ConnectionContext* cntx, TimeUnit unit) { - string_view key = ArgS(args, 0); - - auto cb = [&](Transaction* t, EngineShard* shard) { return OpTtl(t, shard, key); }; - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); - - if (result) { - long ttl = (unit == TimeUnit::SEC) ? (result.value() + 500) / 1000 : result.value(); - cntx->SendLong(ttl); - return; - } - - switch (result.status()) { - case OpStatus::KEY_NOTFOUND: - cntx->SendLong(-2); - break; - default: - LOG_IF(ERROR, result.status() != OpStatus::SKIPPED) - << "Unexpected status " << result.status(); - cntx->SendLong(-1); - break; - } -} - -void GenericFamily::Select(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::Select(CmdArgList args, Transaction*, SinkReplyBuilder* builder, + ConnectionContext* cntx) { string_view key = ArgS(args, 0); int64_t index; if (!absl::SimpleAtoi(key, &index)) { - return cntx->SendError(kInvalidDbIndErr); + return builder->SendError(kInvalidDbIndErr); } if (cluster::IsClusterEnabled() && index != 0) { - return cntx->SendError("SELECT is not allowed in cluster mode"); + return builder->SendError("SELECT is not allowed in cluster mode"); } if (index < 0 || index >= absl::GetFlag(FLAGS_dbnum)) { - return cntx->SendError(kDbIndOutOfRangeErr); + return builder->SendError(kDbIndOutOfRangeErr); } cntx->conn_state.db_index = index; - auto cb = [cntx, index](EngineShard* shard) { - CHECK(cntx->ns != nullptr); - auto& db_slice = cntx->ns->GetDbSlice(shard->shard_id()); + auto cb = [ns = cntx->ns, index](EngineShard* shard) { + auto& db_slice = ns->GetDbSlice(shard->shard_id()); db_slice.ActivateDb(index); return OpStatus::OK; }; @@ -1605,28 +1619,26 @@ void GenericFamily::Select(CmdArgList args, ConnectionContext* cntx) { return cntx->SendOk(); } -void GenericFamily::Dump(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::Dump(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { std::string_view key = ArgS(args, 0); DVLOG(1) << "Dumping before ::ScheduleSingleHopT " << key; auto cb = [&](Transaction* t, EngineShard* shard) { return OpDump(t->GetOpArgs(shard), key); }; - Transaction* trans = cntx->transaction; - OpResult result = trans->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); if (result) { - DVLOG(1) << "Dump " << trans->DebugId() << ": " << key << ", dump size " - << result.value().size(); + DVLOG(1) << "Dump " << tx->DebugId() << ": " << key << ", dump size " << result.value().size(); rb->SendBulkString(*result); } else { rb->SendNull(); } } -void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::Type(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { std::string_view key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - auto& db_slice = cntx->ns->GetDbSlice(shard->shard_id()); + auto& db_slice = t->GetDbSlice(shard->shard_id()); auto it = db_slice.FindReadOnly(t->GetDbContext(), key).it; if (!it.is_done()) { return it->second.ObjType(); @@ -1634,67 +1646,48 @@ void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) { return OpStatus::KEY_NOTFOUND; } }; - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); if (!result) { - cntx->SendSimpleString("none"); + builder->SendSimpleString("none"); } else { - cntx->SendSimpleString(ObjTypeToString(result.value())); + builder->SendSimpleString(ObjTypeToString(result.value())); } } -void GenericFamily::Time(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::Time(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { uint64_t now_usec; - if (cntx->transaction) { - now_usec = cntx->transaction->GetDbContext().time_now_ms * 1000; + if (tx) { + now_usec = tx->GetDbContext().time_now_ms * 1000; } else { now_usec = absl::GetCurrentTimeNanos() / 1000; } - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->StartArray(2); rb->SendLong(now_usec / 1000000); rb->SendLong(now_usec % 1000000); } -ErrorReply GenericFamily::RenameGeneric(CmdArgList args, bool destination_should_not_exist, - ConnectionContext* cntx) { - string_view key[2] = {ArgS(args, 0), ArgS(args, 1)}; - - Transaction* transaction = cntx->transaction; - - if (transaction->GetUniqueShardCnt() == 1) { - transaction->ReviveAutoJournal(); // Safe to use RENAME with single shard - auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRen(t->GetOpArgs(shard), key[0], key[1], destination_should_not_exist); - }; - OpResult result = transaction->ScheduleSingleHopT(std::move(cb)); - - return result.status(); - } - - Renamer renamer{transaction, key[0], key[1], shard_set->size()}; - return renamer.Rename(destination_should_not_exist); -} - -void GenericFamily::Echo(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::Echo(CmdArgList args, Transaction*, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); return rb->SendBulkString(key); } // SCAN cursor [MATCH ] [TYPE ] [COUNT ] [BUCKET ] -void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::Scan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { string_view token = ArgS(args, 0); uint64_t cursor = 0; if (!absl::SimpleAtoi(token, &cursor)) { - return cntx->SendError("invalid cursor"); + return builder->SendError("invalid cursor"); } OpResult ops = ScanOpts::TryFrom(args.subspan(1)); if (!ops) { DVLOG(1) << "Scan invalid args - return " << ops << " to the user"; - return cntx->SendError(ops.status()); + return builder->SendError(ops.status()); } ScanOpts scan_op = ops.value(); @@ -1702,7 +1695,7 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) { StringVec keys; cursor = ScanGeneric(cursor, scan_op, &keys, cntx); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->StartArray(2); rb->SendBulkString(absl::StrCat(cursor)); rb->StartArray(keys.size()); @@ -1723,7 +1716,8 @@ OpResult GenericFamily::OpExists(const OpArgs& op_args, const ShardArg return res; } -void GenericFamily::RandomKey(CmdArgList args, ConnectionContext* cntx) { +void GenericFamily::RandomKey(CmdArgList args, Transaction*, SinkReplyBuilder* builder, + ConnectionContext* cntx) { const static size_t kMaxAttempts = 3; absl::BitGen bitgen; @@ -1761,7 +1755,7 @@ void GenericFamily::RandomKey(CmdArgList args, ConnectionContext* cntx) { auto candidates_count = candidates_counter.load(memory_order_relaxed); std::optional random_key = std::nullopt; auto random_idx = absl::Uniform(bitgen, 0, candidates_count); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); for (const auto& candidate : candidates_collection) { if (random_idx >= candidate.size()) { random_idx -= candidate.size(); diff --git a/src/server/generic_family.h b/src/server/generic_family.h index 3a6f8e7e7..c74e485bd 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -6,28 +6,22 @@ #include "base/flags.h" #include "facade/facade_types.h" -#include "server/common.h" #include "server/tx_base.h" ABSL_DECLARE_FLAG(uint32_t, dbnum); +namespace facade { +class SinkReplyBuilder; +}; + namespace dfly { -using facade::ErrorReply; +using facade::CmdArgList; using facade::OpResult; -using facade::OpStatus; class ConnectionContext; class CommandRegistry; -class EngineShard; - -enum ExpireFlags { - EXPIRE_ALWAYS = 0, - EXPIRE_NX = 1 << 0, // Set expiry only when key has no expiry - EXPIRE_XX = 1 << 2, // Set expiry only when the key has expiry - EXPIRE_GT = 1 << 3, // GT: Set expiry only when the new expiry is greater than current one - EXPIRE_LT = 1 << 4, // LT: Set expiry only when the new expiry is less than current one -}; +class Transaction; class GenericFamily { public: @@ -38,42 +32,43 @@ class GenericFamily { static OpResult OpDel(const OpArgs& op_args, const ShardArgs& keys); private: - static void Del(CmdArgList args, ConnectionContext* cntx); - static void Ping(CmdArgList args, ConnectionContext* cntx); - static void Exists(CmdArgList args, ConnectionContext* cntx); - static void Expire(CmdArgList args, ConnectionContext* cntx); - static void ExpireAt(CmdArgList args, ConnectionContext* cntx); - static void Persist(CmdArgList args, ConnectionContext* cntx); - static void Keys(CmdArgList args, ConnectionContext* cntx); - static void PexpireAt(CmdArgList args, ConnectionContext* cntx); - static void Pexpire(CmdArgList args, ConnectionContext* cntx); - static void Stick(CmdArgList args, ConnectionContext* cntx); - static void Sort(CmdArgList args, ConnectionContext* cntx); - static void Move(CmdArgList args, ConnectionContext* cntx); + using SinkReplyBuilder = facade::SinkReplyBuilder; - static void Rename(CmdArgList args, ConnectionContext* cntx); - static void RenameNx(CmdArgList args, ConnectionContext* cntx); - static void ExpireTime(CmdArgList args, ConnectionContext* cntx); - static void PExpireTime(CmdArgList args, ConnectionContext* cntx); - static void Ttl(CmdArgList args, ConnectionContext* cntx); - static void Pttl(CmdArgList args, ConnectionContext* cntx); + static void Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Ping(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx); + static void Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Expire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void ExpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Persist(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Keys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx); + static void PexpireAt(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Pexpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Stick(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Sort(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Move(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void Echo(CmdArgList args, ConnectionContext* cntx); - static void Select(CmdArgList args, ConnectionContext* cntx); - static void Scan(CmdArgList args, ConnectionContext* cntx); - static void Time(CmdArgList args, ConnectionContext* cntx); - static void Type(CmdArgList args, ConnectionContext* cntx); - static void Dump(CmdArgList args, ConnectionContext* cntx); - static void Restore(CmdArgList args, ConnectionContext* cntx); - static void RandomKey(CmdArgList args, ConnectionContext* cntx); - static void FieldTtl(CmdArgList args, ConnectionContext* cntx); - static void FieldExpire(CmdArgList args, ConnectionContext* cntx); + static void Rename(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void RenameNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void ExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void PExpireTime(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Ttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Pttl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static ErrorReply RenameGeneric(CmdArgList args, bool destination_should_not_exist, - ConnectionContext* cntx); - - static void ExpireTimeGeneric(CmdArgList args, ConnectionContext* cntx, TimeUnit unit); - static void TtlGeneric(CmdArgList args, ConnectionContext* cntx, TimeUnit unit); + static void Echo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Select(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx); + static void Scan(CmdArgList args, Transaction*, SinkReplyBuilder* builder, + ConnectionContext* cntx); + static void Time(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Type(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Dump(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void Restore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void RandomKey(CmdArgList args, Transaction*, SinkReplyBuilder* builder, + ConnectionContext* cntx); + static void FieldTtl(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void FieldExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); }; } // namespace dfly