1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: update command interface for cluster and search families (#4258)

This commit is contained in:
Roman Gershman 2024-12-05 09:12:50 +02:00 committed by GitHub
parent 81079df0e1
commit c2f8349c51
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 106 additions and 113 deletions

View file

@ -124,6 +124,17 @@ void ClusterFamily::Shutdown() {
});
}
// Resolves the shard layout to report to clients.
// Returns the single emulated shard when cluster emulation is enabled, the
// stats view of the loaded config when one exists, and nullopt otherwise
// (cluster not configured).
std::optional<ClusterShardInfos> ClusterFamily::GetShardInfos(ConnectionContext* cntx) const {
  if (IsClusterEmulated())
    return {GetEmulatedShardInfo(cntx)};
  if (tl_cluster_config == nullptr)
    return std::nullopt;
  return GetConfigForStats(cntx);
}
ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const {
ClusterShardInfo info{.slot_ranges = SlotRanges({{.start = 0, .end = kMaxSlotNum}}),
.master = {},
@ -227,13 +238,11 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde
} // namespace
// CLUSTER SHARDS: reply with the shard layout, or an error when the node is
// neither emulating a cluster nor configured as one.
void ClusterFamily::ClusterShards(SinkReplyBuilder* builder, ConnectionContext* cntx) {
  // GetShardInfos covers both the emulated and the configured-cluster cases.
  auto shard_infos = GetShardInfos(cntx);
  if (shard_infos) {
    return ClusterShardsImpl(*shard_infos, builder);
  }
  return builder->SendError(kClusterNotConfigured);
}
namespace {
@ -272,13 +281,11 @@ void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder
} // namespace
// CLUSTER SLOTS: reply with the slot-to-node mapping, or an error when the
// node is neither emulating a cluster nor configured as one.
void ClusterFamily::ClusterSlots(SinkReplyBuilder* builder, ConnectionContext* cntx) {
  // GetShardInfos covers both the emulated and the configured-cluster cases.
  auto shard_infos = GetShardInfos(cntx);
  if (shard_infos) {
    return ClusterSlotsImpl(*shard_infos, builder);
  }
  return builder->SendError(kClusterNotConfigured);
}
namespace {
@ -328,13 +335,11 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id,
} // namespace
// CLUSTER NODES: reply with the node table (includes this node's id), or an
// error when the node is neither emulating a cluster nor configured as one.
void ClusterFamily::ClusterNodes(SinkReplyBuilder* builder, ConnectionContext* cntx) {
  // GetShardInfos covers both the emulated and the configured-cluster cases.
  auto shard_infos = GetShardInfos(cntx);
  if (shard_infos) {
    return ClusterNodesImpl(*shard_infos, id_, builder);
  }
  return builder->SendError(kClusterNotConfigured);
}
namespace {
@ -392,13 +397,8 @@ void ClusterInfoImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder)
} // namespace
// CLUSTER INFO: unlike SHARDS/SLOTS/NODES, this replies even when no cluster
// config is loaded — it falls back to an empty shard list instead of erroring.
void ClusterFamily::ClusterInfo(SinkReplyBuilder* builder, ConnectionContext* cntx) {
  auto shard_infos = GetShardInfos(cntx);
  return ClusterInfoImpl(shard_infos.value_or(ClusterShardInfos{}), builder);
}
void ClusterFamily::KeySlot(CmdArgList args, SinkReplyBuilder* builder) {
@ -410,11 +410,12 @@ void ClusterFamily::KeySlot(CmdArgList args, SinkReplyBuilder* builder) {
return builder->SendLong(id);
}
void ClusterFamily::Cluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) {
void ClusterFamily::Cluster(CmdArgList args, const CommandContext& cmd_cntx) {
// In emulated cluster mode, all slots are mapped to the same host, and number of cluster
// instances is thus 1.
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
auto* builder = cmd_cntx.rb;
if (!IsClusterEnabledOrEmulated()) {
return builder->SendError(kClusterDisabled);
}
@ -432,34 +433,35 @@ void ClusterFamily::Cluster(CmdArgList args, SinkReplyBuilder* builder, Connecti
} else if (sub_cmd == "MYID") {
return ClusterMyId(builder);
} else if (sub_cmd == "SHARDS") {
return ClusterShards(builder, cntx);
return ClusterShards(builder, cmd_cntx.conn_cntx);
} else if (sub_cmd == "SLOTS") {
return ClusterSlots(builder, cntx);
return ClusterSlots(builder, cmd_cntx.conn_cntx);
} else if (sub_cmd == "NODES") {
return ClusterNodes(builder, cntx);
return ClusterNodes(builder, cmd_cntx.conn_cntx);
} else if (sub_cmd == "INFO") {
return ClusterInfo(builder, cntx);
return ClusterInfo(builder, cmd_cntx.conn_cntx);
} else {
return builder->SendError(facade::UnknownSubCmd(sub_cmd, "CLUSTER"), facade::kSyntaxErrType);
}
}
void ClusterFamily::ReadOnly(CmdArgList args, SinkReplyBuilder* builder) {
void ClusterFamily::ReadOnly(CmdArgList args, const CommandContext& cmd_cntx) {
if (!IsClusterEmulated()) {
return builder->SendError(kClusterDisabled);
return cmd_cntx.rb->SendError(kClusterDisabled);
}
builder->SendOk();
cmd_cntx.rb->SendOk();
}
void ClusterFamily::ReadWrite(CmdArgList args, SinkReplyBuilder* builder) {
void ClusterFamily::ReadWrite(CmdArgList args, const CommandContext& cmd_cntx) {
if (!IsClusterEmulated()) {
return builder->SendError(kClusterDisabled);
return cmd_cntx.rb->SendError(kClusterDisabled);
}
builder->SendOk();
cmd_cntx.rb->SendOk();
}
void ClusterFamily::DflyCluster(CmdArgList args, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ClusterFamily::DflyCluster(CmdArgList args, const CommandContext& cmd_cntx) {
auto* builder = cmd_cntx.rb;
auto* cntx = cmd_cntx.conn_cntx;
if (!(IsClusterEnabled() || (IsClusterEmulated() && cntx->journal_emulated))) {
return builder->SendError("Cluster is disabled. Use --cluster_mode=yes to enable.");
}
@ -773,15 +775,15 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, SinkReplyBuilder* b
}
}
void ClusterFamily::DflyMigrate(CmdArgList args, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ClusterFamily::DflyMigrate(CmdArgList args, const CommandContext& cmd_cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
args.remove_prefix(1);
auto* builder = cmd_cntx.rb;
if (sub_cmd == "INIT") {
InitMigration(args, builder);
} else if (sub_cmd == "FLOW") {
DflyMigrateFlow(args, builder, cntx);
DflyMigrateFlow(args, builder, cmd_cntx.conn_cntx);
} else if (sub_cmd == "ACK") {
DflyMigrateAck(args, builder);
} else {
@ -1034,21 +1036,10 @@ void ClusterFamily::PauseAllIncomingMigrations(bool pause) {
}
}
// Member-function-pointer types for the two command-handler signatures used by
// this family: the new CommandContext-based one and the legacy
// SinkReplyBuilder-based one.
using EngineFunc = void (ClusterFamily::*)(CmdArgList args, const CommandContext& cmd_cntx);
using EngineFunc2 = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder);

// Adapts a legacy builder-based member handler to the Handler2 callback shape.
inline CommandId::Handler2 HandlerFunc(ClusterFamily* se, EngineFunc2 f) {
  return [=](CmdArgList args, Transaction*, SinkReplyBuilder* builder) {
    return (se->*f)(args, builder);
  };
}

// Adapts a CommandContext-based member handler to the Handler3 callback shape.
inline CommandId::Handler3 HandlerFunc(ClusterFamily* se, EngineFunc f) {
  return [=](CmdArgList args, const CommandContext& cmd_cntx) { return (se->*f)(args, cmd_cntx); };
}

#define HFUNC(x) SetHandler(HandlerFunc(this, &ClusterFamily::x))
#define HFUNC(x) SetHandler(HandlerFunc(this, &ClusterFamily::x))

View file

@ -51,7 +51,7 @@ class ClusterFamily {
using SinkReplyBuilder = facade::SinkReplyBuilder;
// Cluster commands compatible with Redis
void Cluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
void Cluster(CmdArgList args, const CommandContext& cmd_cntx);
void ClusterHelp(SinkReplyBuilder* builder);
void ClusterShards(SinkReplyBuilder* builder, ConnectionContext* cntx);
void ClusterSlots(SinkReplyBuilder* builder, ConnectionContext* cntx);
@ -61,11 +61,11 @@ class ClusterFamily {
void KeySlot(CmdArgList args, SinkReplyBuilder* builder);
void ReadOnly(CmdArgList args, SinkReplyBuilder* builder);
void ReadWrite(CmdArgList args, SinkReplyBuilder* builder);
void ReadOnly(CmdArgList args, const CommandContext& cmd_cntx);
void ReadWrite(CmdArgList args, const CommandContext& cmd_cntx);
// Custom Dragonfly commands for cluster management
void DflyCluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
void DflyCluster(CmdArgList args, const CommandContext& cmd_cntx);
void DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
void DflyClusterGetSlotInfo(CmdArgList args, SinkReplyBuilder* builder)
@ -77,7 +77,7 @@ class ClusterFamily {
ABSL_LOCKS_EXCLUDED(migration_mu_);
// DFLYMIGRATE is internal command defines several steps in slots migrations process
void DflyMigrate(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
void DflyMigrate(CmdArgList args, const CommandContext& cmd_cntx);
// DFLYMIGRATE INIT is internal command to create incoming migration object
void InitMigration(CmdArgList args, SinkReplyBuilder* builder) ABSL_LOCKS_EXCLUDED(migration_mu_);
@ -122,6 +122,8 @@ class ClusterFamily {
ABSL_GUARDED_BY(migration_mu_);
private:
std::optional<ClusterShardInfos> GetShardInfos(ConnectionContext* cntx) const;
ClusterShardInfo GetEmulatedShardInfo(ConnectionContext* cntx) const;
// Guards set configuration, so that we won't handle 2 in parallel.

View file

@ -515,9 +515,9 @@ void ReplySorted(search::AggregationInfo agg, const SearchParams& params,
} // namespace
void SearchFamily::FtCreate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (cntx->conn_state.db_index != 0) {
void SearchFamily::FtCreate(CmdArgList args, const CommandContext& cmd_cntx) {
auto* builder = cmd_cntx.rb;
if (cmd_cntx.conn_cntx->conn_state.db_index != 0) {
return builder->SendError("Cannot create index on db != 0"sv);
}
@ -567,7 +567,7 @@ void SearchFamily::FtCreate(CmdArgList args, Transaction* tx, SinkReplyBuilder*
// Check if index already exists
atomic_uint exists_cnt = 0;
tx->Execute(
cmd_cntx.tx->Execute(
[idx_name, &exists_cnt](auto* tx, auto* es) {
if (es->search_indices()->GetIndex(idx_name) != nullptr)
exists_cnt.fetch_add(1, std::memory_order_relaxed);
@ -578,12 +578,12 @@ void SearchFamily::FtCreate(CmdArgList args, Transaction* tx, SinkReplyBuilder*
DCHECK(exists_cnt == 0u || exists_cnt == shard_set->size());
if (exists_cnt.load(memory_order_relaxed) > 0) {
tx->Conclude();
cmd_cntx.tx->Conclude();
return builder->SendError("Index already exists");
}
auto idx_ptr = make_shared<DocIndex>(std::move(index));
tx->Execute(
cmd_cntx.tx->Execute(
[idx_name, idx_ptr](auto* tx, auto* es) {
es->search_indices()->InitIndex(tx->GetOpArgs(es), idx_name, idx_ptr);
return OpStatus::OK;
@ -593,12 +593,12 @@ void SearchFamily::FtCreate(CmdArgList args, Transaction* tx, SinkReplyBuilder*
builder->SendOk();
}
void SearchFamily::FtAlter(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void SearchFamily::FtAlter(CmdArgList args, const CommandContext& cmd_cntx) {
CmdArgParser parser{args};
string_view idx_name = parser.Next();
parser.ExpectTag("SCHEMA");
parser.ExpectTag("ADD");
auto* builder = cmd_cntx.rb;
if (auto err = parser.Error(); err)
return builder->SendError(err->MakeReply());
@ -612,17 +612,17 @@ void SearchFamily::FtAlter(CmdArgList args, Transaction* tx, SinkReplyBuilder* b
index_info = make_shared<DocIndex>(idx->GetInfo().base_index);
return OpStatus::OK;
};
tx->Execute(idx_cb, false);
cmd_cntx.tx->Execute(idx_cb, false);
if (!index_info) {
tx->Conclude();
cmd_cntx.tx->Conclude();
return builder->SendError("Index not found");
}
// Parse additional schema
optional<search::Schema> new_fields = ParseSchemaOrReply(index_info->type, parser, builder);
if (!new_fields) {
tx->Conclude();
cmd_cntx.tx->Conclude();
return;
}
@ -641,17 +641,17 @@ void SearchFamily::FtAlter(CmdArgList args, Transaction* tx, SinkReplyBuilder* b
es->search_indices()->InitIndex(tx->GetOpArgs(es), idx_name, index_info);
return OpStatus::OK;
};
tx->Execute(upd_cb, true);
cmd_cntx.tx->Execute(upd_cb, true);
builder->SendOk();
}
void SearchFamily::FtDropIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void SearchFamily::FtDropIndex(CmdArgList args, const CommandContext& cmd_cntx) {
string_view idx_name = ArgS(args, 0);
// TODO: Handle optional DD param
atomic_uint num_deleted{0};
tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
cmd_cntx.tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
if (es->search_indices()->DropIndex(idx_name))
num_deleted.fetch_add(1);
return OpStatus::OK;
@ -659,17 +659,17 @@ void SearchFamily::FtDropIndex(CmdArgList args, Transaction* tx, SinkReplyBuilde
DCHECK(num_deleted == 0u || num_deleted == shard_set->size());
if (num_deleted == 0u)
return builder->SendError("-Unknown Index name");
return builder->SendOk();
return cmd_cntx.rb->SendError("-Unknown Index name");
return cmd_cntx.rb->SendOk();
}
void SearchFamily::FtInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void SearchFamily::FtInfo(CmdArgList args, const CommandContext& cmd_cntx) {
string_view idx_name = ArgS(args, 0);
atomic_uint num_notfound{0};
vector<DocIndexInfo> infos(shard_set->size());
tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
cmd_cntx.tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
auto* index = es->search_indices()->GetIndex(idx_name);
if (index == nullptr)
num_notfound.fetch_add(1);
@ -679,9 +679,10 @@ void SearchFamily::FtInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
});
DCHECK(num_notfound == 0u || num_notfound == shard_set->size());
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (num_notfound > 0u)
return builder->SendError("Unknown Index name");
return rb->SendError("Unknown Index name");
DCHECK(infos.front().base_index.schema.fields.size() ==
infos.back().base_index.schema.fields.size());
@ -693,7 +694,6 @@ void SearchFamily::FtInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
const auto& info = infos.front();
const auto& schema = info.base_index.schema;
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartCollection(4, RedisReplyBuilder::MAP);
rb->SendSimpleString("index_name");
@ -731,25 +731,25 @@ void SearchFamily::FtInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
rb->SendLong(total_num_docs);
}
void SearchFamily::FtList(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void SearchFamily::FtList(CmdArgList args, const CommandContext& cmd_cntx) {
atomic_int first{0};
vector<string> names;
tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
cmd_cntx.tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
// Using `first` to assign `names` only once without a race
if (first.fetch_add(1) == 0)
names = es->search_indices()->GetIndexNames();
return OpStatus::OK;
});
auto* rb = static_cast<RedisReplyBuilder*>(builder);
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
rb->SendBulkStrArr(names);
}
void SearchFamily::FtSearch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void SearchFamily::FtSearch(CmdArgList args, const CommandContext& cmd_cntx) {
CmdArgParser parser{args};
string_view index_name = parser.Next();
string_view query_str = parser.Next();
auto* builder = cmd_cntx.rb;
auto params = ParseSearchParamsOrReply(&parser, builder);
if (!params.has_value())
return;
@ -763,7 +763,7 @@ void SearchFamily::FtSearch(CmdArgList args, Transaction* tx, SinkReplyBuilder*
atomic<bool> index_not_found{false};
vector<SearchResult> docs(shard_set->size());
tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
cmd_cntx.tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
if (auto* index = es->search_indices()->GetIndex(index_name); index)
docs[es->shard_id()] = index->Search(t->GetOpArgs(es), *params, &search_algo);
else
@ -785,13 +785,14 @@ void SearchFamily::FtSearch(CmdArgList args, Transaction* tx, SinkReplyBuilder*
ReplyWithResults(*params, absl::MakeSpan(docs), builder);
}
void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void SearchFamily::FtProfile(CmdArgList args, const CommandContext& cmd_cntx) {
CmdArgParser parser{args};
string_view index_name = parser.Next();
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (!parser.Check("SEARCH") && !parser.Check("AGGREGATE")) {
return builder->SendError("no `SEARCH` or `AGGREGATE` provided");
return rb->SendError("no `SEARCH` or `AGGREGATE` provided");
}
parser.Check("LIMITED"); // TODO: Implement limited profiling
@ -799,14 +800,14 @@ void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder*
string_view query_str = parser.Next();
optional<SearchParams> params = ParseSearchParamsOrReply(&parser, builder);
optional<SearchParams> params = ParseSearchParamsOrReply(&parser, rb);
if (!params.has_value())
return;
search::SearchAlgorithm search_algo;
search::SortOption* sort_opt = params->sort_option.has_value() ? &*params->sort_option : nullptr;
if (!search_algo.Init(query_str, &params->query_params, sort_opt))
return builder->SendError("query syntax error");
return rb->SendError("query syntax error");
search_algo.EnableProfiling();
@ -818,7 +819,7 @@ void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder*
std::vector<SearchResult> search_results(shards_count);
std::vector<absl::Duration> profile_results(shards_count);
tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
cmd_cntx.tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
auto* index = es->search_indices()->GetIndex(index_name);
if (!index) {
index_not_found.store(true, memory_order_relaxed);
@ -835,7 +836,7 @@ void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder*
});
if (index_not_found.load())
return builder->SendError(std::string{index_name} + ": no such index");
return rb->SendError(std::string{index_name} + ": no such index");
auto took = absl::Now() - start;
@ -851,7 +852,6 @@ void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder*
}
}
auto* rb = static_cast<RedisReplyBuilder*>(builder);
// First element -> Result of the search command
// Second element -> Profile information
rb->StartArray(2);
@ -860,9 +860,9 @@ void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder*
if (!result_is_empty) {
auto agg = search_algo.HasAggregation();
if (agg) {
ReplySorted(*agg, *params, absl::MakeSpan(search_results), builder);
ReplySorted(*agg, *params, absl::MakeSpan(search_results), rb);
} else {
ReplyWithResults(*params, absl::MakeSpan(search_results), builder);
ReplyWithResults(*params, absl::MakeSpan(search_results), rb);
}
} else {
rb->StartArray(1);
@ -918,14 +918,14 @@ void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder*
}
}
void SearchFamily::FtTagVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void SearchFamily::FtTagVals(CmdArgList args, const CommandContext& cmd_cntx) {
string_view index_name = ArgS(args, 0);
string_view field_name = ArgS(args, 1);
VLOG(1) << "FtTagVals: " << index_name << " " << field_name;
vector<io::Result<StringVec, ErrorReply>> shard_results(shard_set->size(), StringVec{});
tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
cmd_cntx.tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
if (auto* index = es->search_indices()->GetIndex(index_name); index)
shard_results[es->shard_id()] = index->GetTagVals(field_name);
else
@ -935,6 +935,7 @@ void SearchFamily::FtTagVals(CmdArgList args, Transaction* tx, SinkReplyBuilder*
});
absl::flat_hash_set<string> result_set;
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
// Check first if either shard had errors. Also merge the results into a single set.
for (auto& res : shard_results) {
@ -942,18 +943,18 @@ void SearchFamily::FtTagVals(CmdArgList args, Transaction* tx, SinkReplyBuilder*
result_set.insert(make_move_iterator(res->begin()), make_move_iterator(res->end()));
} else {
res.error().kind = facade::kSearchErrType;
return builder->SendError(res.error());
return rb->SendError(res.error());
}
}
shard_results.clear();
vector<string> vec(result_set.begin(), result_set.end());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendBulkStrArr(vec, RedisReplyBuilder::SET);
}
void SearchFamily::FtAggregate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void SearchFamily::FtAggregate(CmdArgList args, const CommandContext& cmd_cntx) {
auto* builder = cmd_cntx.rb;
const auto params = ParseAggregatorParamsOrReply(args, builder);
if (!params)
return;
@ -966,7 +967,7 @@ void SearchFamily::FtAggregate(CmdArgList args, Transaction* tx, SinkReplyBuilde
declval<OpArgs>(), params.value(), &search_algo));
vector<ResultContainer> query_results(shard_set->size());
tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
cmd_cntx.tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
if (auto* index = es->search_indices()->GetIndex(params->index); index) {
query_results[es->shard_id()] =
index->SearchForAggregator(t->GetOpArgs(es), params.value(), &search_algo);
@ -985,7 +986,7 @@ void SearchFamily::FtAggregate(CmdArgList args, Transaction* tx, SinkReplyBuilde
return builder->SendError(agg_results.error());
size_t result_size = agg_results->size();
auto* rb = static_cast<RedisReplyBuilder*>(builder);
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
auto sortable_value_sender = SortableValueSender(rb);
rb->StartArray(result_size + 1);

View file

@ -15,21 +15,20 @@ class SinkReplyBuilder;
namespace dfly {
class CommandRegistry;
class ConnectionContext;
struct CommandContext;
class SearchFamily {
using SinkReplyBuilder = facade::SinkReplyBuilder;
static void FtCreate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
static void FtAlter(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void FtDropIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void FtInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void FtList(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void FtSearch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void FtAggregate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void FtTagVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void FtCreate(CmdArgList args, const CommandContext& cmd_cntx);
static void FtAlter(CmdArgList args, const CommandContext& cmd_cntx);
static void FtDropIndex(CmdArgList args, const CommandContext& cmd_cntx);
static void FtInfo(CmdArgList args, const CommandContext& cmd_cntx);
static void FtList(CmdArgList args, const CommandContext& cmd_cntx);
static void FtSearch(CmdArgList args, const CommandContext& cmd_cntx);
static void FtProfile(CmdArgList args, const CommandContext& cmd_cntx);
static void FtAggregate(CmdArgList args, const CommandContext& cmd_cntx);
static void FtTagVals(CmdArgList args, const CommandContext& cmd_cntx);
public:
static void Register(CommandRegistry* registry);