1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: refactor a lambda function into a named one (#3753)

Also did some cosmetic improvements. No functionality should be changed.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-09-22 01:35:56 +03:00 committed by GitHub
parent 9dd79657ce
commit cce2eb35ed
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 56 additions and 44 deletions

View file

@ -842,7 +842,7 @@ vector<RecordVec> OpRead(const OpArgs& op_args, const ShardArgs& shard_args, con
vector<RecordVec> response(shard_args.Size());
unsigned index = 0;
for (string_view key : shard_args) {
auto sitem = opts.stream_ids.at(key);
const auto& sitem = opts.stream_ids.at(key);
auto& dest = response[index++];
if (!sitem.group && opts.read_group) {
continue;
@ -1361,7 +1361,7 @@ vector<GroupConsumerPair> OpGetGroupConsumerPairs(const ShardArgs& shard_args,
for (string_view key : shard_args) {
streamCG* group = nullptr;
streamConsumer* consumer = nullptr;
auto& dest = sid_items[index++];
GroupConsumerPair& dest = sid_items[index++];
auto group_res = FindGroup(op_args, key, opts.group);
if (!group_res) {
@ -2054,11 +2054,12 @@ void FetchGroupInfo(Transaction* tx, ReadOpts* opts) {
tx->Execute(std::move(cb), false);
for (size_t i = 0; i < shard_set->size(); i++) {
auto s_item = res_pairs[i];
auto s_args = tx->GetShardArgs(i);
const auto& s_item = res_pairs[i];
if (s_item.size() == 0) {
continue;
}
ShardArgs s_args = tx->GetShardArgs(i);
unsigned index = 0;
for (string_view key : s_args) {
StreamIDsItem& item = opts->stream_ids.at(key);
@ -2917,14 +2918,14 @@ void XReadBlock(ReadOpts* opts, ConnectionContext* cntx) {
auto tp = (opts->timeout) ? chrono::steady_clock::now() + chrono::milliseconds(opts->timeout)
: Transaction::time_point::max();
const auto key_checker = [&opts](EngineShard* owner, const DbContext& context, Transaction* tx,
std::string_view key) -> bool {
const auto key_checker = [opts](EngineShard* owner, const DbContext& context, Transaction* tx,
std::string_view key) -> bool {
auto& db_slice = context.GetDbSlice(owner->shard_id());
auto res_it = db_slice.FindReadOnly(context, key, OBJ_STREAM);
if (!res_it.ok())
return false;
auto sitem = opts->stream_ids.at(key);
const StreamIDsItem& sitem = opts->stream_ids.at(key);
if (sitem.id.val.ms != UINT64_MAX && sitem.id.val.seq != UINT64_MAX)
return true;
@ -2955,7 +2956,7 @@ void XReadBlock(ReadOpts* opts, ConnectionContext* cntx) {
.ms = UINT64_MAX,
.seq = UINT64_MAX,
}};
auto sitem = opts->stream_ids.at(*wake_key);
const StreamIDsItem& sitem = opts->stream_ids.at(*wake_key);
range_opts.start = sitem.id;
if (sitem.id.val.ms == UINT64_MAX || sitem.id.val.seq == UINT64_MAX) {
range_opts.start.val = sitem.group->last_id; // only for '>'
@ -2986,55 +2987,64 @@ void XReadBlock(ReadOpts* opts, ConnectionContext* cntx) {
return rb->SendNullArray();
}
// Shared state between XReadImplSingleShard and the OpReadSingleShard shard
// callback. The callback cannot reply or return rich data directly, so it
// records its outcome here for the coordinator to act on afterwards.
struct OpReadSingleShardContext {
// Parsed read options for this XREAD/XREADGROUP invocation (not owned).
ReadOpts* opts;
// Detailed error filled in when the callback returns OpStatus::INVALID_VALUE,
// so the coordinator can send a precise message instead of a generic one.
facade::ErrorReply error{OpStatus::OK};
// Set when no entries are currently available and the command should proceed
// to the blocking path while keeping the acquired keys.
bool requires_blocking = false;
// Entries read eagerly in the same hop when data was already available.
vector<RecordVec> prefetched_results;
};
// Shard callback for the single-shard XREAD/XREADGROUP fast path: determines
// whether any of the requested streams already have readable entries and, if
// so, prefetches them within the same hop. Outcomes are reported through
// `context` (see OpReadSingleShardContext):
//  - error status from OpLastIDs is propagated as-is;
//  - OpStatus::INVALID_VALUE with context->error set on a HasEntries failure;
//  - OK + AVOID_CONCLUDING with context->requires_blocking set when nothing is
//    available yet, keeping the keys acquired for the blocking path;
//  - OK with context->prefetched_results populated when entries were read.
Transaction::RunnableResult OpReadSingleShard(Transaction* tx, EngineShard* es,
                                              OpReadSingleShardContext* context) {
  auto last_ids = OpLastIDs(tx->GetOpArgs(es), tx->GetShardArgs(es->shard_id()));
  if (!last_ids)
    return last_ids.status();

  absl::flat_hash_map<string, streamID> id_lookup(last_ids->begin(), last_ids->end());

  auto availability = HasEntries(id_lookup, context->opts);
  if (!availability.has_value()) {
    context->error = availability.error();
    return OpStatus::INVALID_VALUE;
  }

  if (!*availability) {
    // Nothing to read yet — avoid concluding so the transaction can keep the
    // acquired keys and transition to waiting.
    context->requires_blocking = true;
    return {OpStatus::OK, Transaction::RunnableResult::AVOID_CONCLUDING};
  }

  context->prefetched_results =
      OpRead(tx->GetOpArgs(es), tx->GetShardArgs(es->shard_id()), *context->opts);
  DCHECK(!context->prefetched_results.empty());
  return OpStatus::OK;
}
// Determine if entries are available and read them in a single hop. Returns nullopt in case of an
// error and replies.
std::optional<vector<RecordVec>> XReadImplSingleShard(ConnectionContext* cntx, ReadOpts* opts) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* tx = cntx->transaction;
OpReadSingleShardContext op_cntx;
op_cntx.opts = opts;
vector<RecordVec> prefetched_results;
bool requires_blocking = false;
optional<facade::ErrorReply> detailed_err;
auto res = tx->ScheduleSingleHop([&](auto* tx, auto* es) -> Transaction::RunnableResult {
auto last_ids = OpLastIDs(tx->GetOpArgs(es), tx->GetShardArgs(es->shard_id()));
if (!last_ids)
return last_ids.status();
absl::flat_hash_map<string, streamID> last_ids_map(last_ids->begin(), last_ids->end());
auto has_entries = HasEntries(last_ids_map, opts);
if (!has_entries.has_value()) {
detailed_err = has_entries.error();
return OpStatus::INVALID_VALUE;
}
// If no entries are available, avoid concluding to proceed waiting with acquired keys
if (!*has_entries) {
requires_blocking = true;
return {OpStatus::OK, Transaction::RunnableResult::AVOID_CONCLUDING};
}
prefetched_results = OpRead(tx->GetOpArgs(es), tx->GetShardArgs(es->shard_id()), *opts);
DCHECK(!prefetched_results.empty());
return OpStatus::OK;
});
if (detailed_err.has_value()) {
cntx->SendError(*detailed_err);
return std::nullopt;
}
auto res = tx->ScheduleSingleHop(
[&](auto* tx, auto* es) { return OpReadSingleShard(tx, es, &op_cntx); });
if (res != OpStatus::OK) {
if (res == OpStatus::WRONG_TYPE)
if (res == OpStatus::INVALID_VALUE)
cntx->SendError(op_cntx.error);
else if (res == OpStatus::WRONG_TYPE)
cntx->SendError(kWrongTypeErr);
else
rb->SendNullArray();
return std::nullopt;
}
if (requires_blocking)
if (op_cntx.requires_blocking)
return vector<RecordVec>{};
return {std::move(prefetched_results)};
return {std::move(op_cntx.prefetched_results)};
}
// Read entries from given streams
@ -3151,8 +3161,10 @@ void XReadGeneric(CmdArgList args, bool read_group, ConnectionContext* cntx) {
return;
}
// TODO: we conduct lots of hops that seems to be could be collapsed into the shard
// callback. For example, FetchGroupInfo can probably be moved into OpRead.
if (opts->read_group) {
FetchGroupInfo(cntx->transaction, &*opts);
FetchGroupInfo(cntx->transaction, &opts.value());
}
return XReadImpl(args, &opts.value(), cntx);

View file

@ -106,7 +106,7 @@ class Transaction {
}
public:
// Result returned by callbacks. Most should use the implcit conversion from OpStatus.
// Result returned by callbacks. Most should use the implicit conversion from OpStatus.
struct RunnableResult {
enum Flag : uint16_t {
// Can be issued by a **single** shard callback to avoid concluding, i.e. perform one more hop