1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: pass SinkReplyBuilder and Transaction explicitly. Part7 (#3988)

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-10-25 13:32:51 +03:00 committed by GitHub
parent a2a5e3179a
commit ef09052482
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 169 additions and 176 deletions

View file

@ -91,7 +91,7 @@ OpResult<ExistsResult> OpExists(const OpArgs& op_args, string_view key, CmdArgLi
} // namespace
void BloomFamily::Reserve(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::Reserve(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser(args);
string_view key = parser.Next();
SbfParams params;
@ -99,23 +99,23 @@ void BloomFamily::Reserve(CmdArgList args, ConnectionContext* cntx) {
tie(params.error, params.init_capacity) = parser.Next<double, uint32_t>();
if (parser.Error())
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
if (!params.ok())
return cntx->SendError("error rate is out of range", kSyntaxErrType);
return builder->SendError("error rate is out of range", kSyntaxErrType);
const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpReserve(params, t->GetOpArgs(shard), key);
};
OpStatus res = cntx->transaction->ScheduleSingleHop(std::move(cb));
OpStatus res = tx->ScheduleSingleHop(std::move(cb));
if (res == OpStatus::KEY_EXISTS) {
return cntx->SendError("item exists");
return builder->SendError("item exists");
}
return cntx->SendError(res);
return builder->SendError(res);
}
void BloomFamily::Add(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::Add(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);
@ -123,30 +123,30 @@ void BloomFamily::Add(CmdArgList args, ConnectionContext* cntx) {
return OpAdd(t->GetOpArgs(shard), key, args);
};
OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpResult res = tx->ScheduleSingleHopT(std::move(cb));
OpStatus status = res.status();
if (res) {
if (res->front())
return cntx->SendLong(*res->front());
return builder->SendLong(*res->front());
else
status = res->front().status();
}
return cntx->SendError(status);
return builder->SendError(status);
}
void BloomFamily::Exists(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);
const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExists(t->GetOpArgs(shard), key, args);
};
OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
return cntx->SendLong(res ? res->front() : 0);
OpResult res = tx->ScheduleSingleHopT(std::move(cb));
return builder->SendLong(res ? res->front() : 0);
}
void BloomFamily::MAdd(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::MAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);
@ -154,23 +154,23 @@ void BloomFamily::MAdd(CmdArgList args, ConnectionContext* cntx) {
return OpAdd(t->GetOpArgs(shard), key, args);
};
OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpResult res = tx->ScheduleSingleHopT(std::move(cb));
if (!res) {
return cntx->SendError(res.status());
return builder->SendError(res.status());
}
const AddResult& add_res = *res;
RedisReplyBuilder* rb = (RedisReplyBuilder*)cntx->reply_builder();
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(add_res.size());
for (const OpResult<bool>& val : add_res) {
if (val) {
cntx->SendLong(*val);
builder->SendLong(*val);
} else {
cntx->SendError(val.status());
builder->SendError(val.status());
}
}
}
void BloomFamily::MExists(CmdArgList args, ConnectionContext* cntx) {
void BloomFamily::MExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);
@ -178,12 +178,12 @@ void BloomFamily::MExists(CmdArgList args, ConnectionContext* cntx) {
return OpExists(t->GetOpArgs(shard), key, args);
};
OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpResult res = tx->ScheduleSingleHopT(std::move(cb));
RedisReplyBuilder* rb = (RedisReplyBuilder*)cntx->reply_builder();
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(args.size());
for (size_t i = 0; i < args.size(); ++i) {
cntx->SendLong(res ? res->at(i) : 0);
rb->SendLong(res ? res->at(i) : 0);
}
}

View file

@ -6,6 +6,10 @@
#include "server/common.h"
namespace facade {
class SinkReplyBuilder;
} // namespace facade
namespace dfly {
class CommandRegistry;
@ -16,11 +20,13 @@ class BloomFamily {
static void Register(CommandRegistry* registry);
private:
static void Reserve(CmdArgList args, ConnectionContext* cntx);
static void Add(CmdArgList args, ConnectionContext* cntx);
static void MAdd(CmdArgList args, ConnectionContext* cntx);
static void Exists(CmdArgList args, ConnectionContext* cntx);
static void MExists(CmdArgList args, ConnectionContext* cntx);
using SinkReplyBuilder = facade::SinkReplyBuilder;
static void Reserve(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Add(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void MAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void MExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
};
} // namespace dfly

View file

@ -51,7 +51,7 @@ using facade::kWrongTypeErr;
({ \
auto expr_res = (expr); \
if (!expr_res) { \
cntx->SendError(expr_res.error()); \
builder->SendError(expr_res.error()); \
return; \
} \
std::move(expr_res).value(); \

View file

@ -28,6 +28,7 @@
#include "server/command_registry.h"
#include "server/common.h"
#include "server/detail/wrapped_json_path.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
#include "server/search/doc_index.h"
@ -45,6 +46,8 @@ using namespace std;
using namespace jsoncons;
using facade::CmdArgParser;
using facade::kSyntaxErrType;
using facade::RedisReplyBuilder;
using facade::SinkReplyBuilder;
using JsonExpression = jsonpath::jsonpath_expression<JsonType>;
using JsonReplaceVerify = std::function<void(JsonType&)>;
@ -252,7 +255,7 @@ struct JsonGetParams {
std::vector<std::pair<std::string_view, WrappedJsonPath>> paths;
};
std::optional<JsonGetParams> ParseJsonGetParams(CmdArgParser* parser, ConnectionContext* cntx) {
std::optional<JsonGetParams> ParseJsonGetParams(CmdArgParser* parser, SinkReplyBuilder* builder) {
JsonGetParams parsed_args;
while (parser->HasNext()) {
if (parser->Check("NOESCAPE")) {
@ -268,7 +271,7 @@ std::optional<JsonGetParams> ParseJsonGetParams(CmdArgParser* parser, Connection
auto json_path = ParseJsonPath(path_str);
if (!json_path) {
cntx->SendError(json_path.error());
builder->SendError(json_path.error());
return std::nullopt;
}
@ -791,13 +794,14 @@ auto OpToggle(const OpArgs& op_args, string_view key,
}
template <typename T>
auto ExecuteToggle(string_view key, const WrappedJsonPath& json_path, ConnectionContext* cntx) {
auto ExecuteToggle(string_view key, const WrappedJsonPath& json_path, Transaction* tx,
SinkReplyBuilder* builder) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpToggle<T>(t->GetOpArgs(shard), key, json_path);
};
auto result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
@ -1450,7 +1454,7 @@ OpStatus OpMerge(const OpArgs& op_args, string_view key, string_view path,
} // namespace
void JsonFamily::Set(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::Set(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
auto [key, path, json_str] = parser.Next<string_view, string_view, string_view>();
@ -1460,18 +1464,16 @@ void JsonFamily::Set(CmdArgList args, ConnectionContext* cntx) {
bool is_xx_condition = (res == 2), is_nx_condition = (res == 1);
if (parser.Error() || parser.HasNext()) // also clear the parser error dcheck
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpSet(t->GetOpArgs(shard), key, path, json_path, json_str, is_nx_condition,
is_xx_condition);
};
Transaction* trans = cntx->transaction;
OpResult<bool> result = tx->ScheduleSingleHopT(std::move(cb));
OpResult<bool> result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (result) {
if (*result) {
rb->SendOk();
@ -1479,15 +1481,15 @@ void JsonFamily::Set(CmdArgList args, ConnectionContext* cntx) {
rb->SendNull();
}
} else {
cntx->SendError(result.status());
builder->SendError(result.status());
}
}
// JSON.MSET key path value [key path value ...]
void JsonFamily::MSet(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::MSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
DCHECK_GE(args.size(), 3u);
if (args.size() % 3 != 0) {
return cntx->SendError(facade::WrongNumArgsError("json.mset"));
return builder->SendError(facade::WrongNumArgsError("json.mset"));
}
AggregateStatus status;
@ -1499,16 +1501,16 @@ void JsonFamily::MSet(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK;
};
cntx->transaction->ScheduleSingleHop(cb);
tx->ScheduleSingleHop(cb);
if (*status != OpStatus::OK)
return cntx->SendError(*status);
cntx->SendOk();
return builder->SendError(*status);
builder->SendOk();
}
// JSON.MERGE key path value
// Based on https://datatracker.ietf.org/doc/html/rfc7386 spec
void JsonFamily::Merge(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::Merge(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
string_view key = parser.Next();
string_view path = parser.Next();
@ -1520,13 +1522,13 @@ void JsonFamily::Merge(CmdArgList args, ConnectionContext* cntx) {
return OpMerge(t->GetOpArgs(shard), key, path, json_path, value);
};
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
OpStatus status = tx->ScheduleSingleHop(std::move(cb));
if (status == OpStatus::OK)
return cntx->SendOk();
cntx->SendError(status);
return builder->SendOk();
builder->SendError(status);
}
void JsonFamily::Resp(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::Resp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
string_view key = parser.Next();
string_view path = parser.NextOrDefault();
@ -1537,27 +1539,26 @@ void JsonFamily::Resp(CmdArgList args, ConnectionContext* cntx) {
return OpResp(t->GetOpArgs(shard), key, json_path);
};
Transaction* trans = cntx->transaction;
auto result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::Debug(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::Debug(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
string_view command = parser.Next();
// The 'MEMORY' sub-command is not supported yet, calling to operation function should be added
// here.
if (absl::EqualsIgnoreCase(command, "help")) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(2);
rb->SendBulkString(
"JSON.DEBUG FIELDS <key> <path> - report number of fields in the JSON element.");
rb->SendBulkString("JSON.DEBUG HELP - print help message.");
return;
} else if (!absl::EqualsIgnoreCase(command, "fields")) {
cntx->SendError(facade::UnknownSubCmd(command, "JSON.DEBUG"), facade::kSyntaxErrType);
builder->SendError(facade::UnknownSubCmd(command, "JSON.DEBUG"), facade::kSyntaxErrType);
return;
}
@ -1572,19 +1573,17 @@ void JsonFamily::Debug(CmdArgList args, ConnectionContext* cntx) {
return OpFields(t->GetOpArgs(shard), key, json_path);
};
Transaction* trans = cntx->transaction;
auto result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
DCHECK_GE(args.size(), 1U);
string_view path = ArgS(args, args.size() - 1);
WrappedJsonPath json_path = GET_OR_SEND_UNEXPECTED(ParseJsonPath(path));
Transaction* transaction = cntx->transaction;
unsigned shard_count = shard_set->size();
std::vector<std::vector<std::optional<std::string>>> mget_resp(shard_count);
@ -1594,16 +1593,16 @@ void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK;
};
OpStatus result = transaction->ScheduleSingleHop(std::move(cb));
OpStatus result = tx->ScheduleSingleHop(std::move(cb));
CHECK_EQ(OpStatus::OK, result);
std::vector<std::optional<std::string>> results(args.size() - 1);
for (ShardId sid = 0; sid < shard_count; ++sid) {
if (!transaction->IsActive(sid))
if (!tx->IsActive(sid))
continue;
std::vector<std::optional<std::string>>& res = mget_resp[sid];
ShardArgs shard_args = transaction->GetShardArgs(sid);
ShardArgs shard_args = tx->GetShardArgs(sid);
unsigned src_index = 0;
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) {
if (!res[src_index])
@ -1614,11 +1613,11 @@ void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
}
}
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(results.begin(), results.end(), rb);
}
void JsonFamily::ArrIndex(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::ArrIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
string_view key = parser.Next();
string_view path = parser.Next();
@ -1627,7 +1626,7 @@ void JsonFamily::ArrIndex(CmdArgList args, ConnectionContext* cntx) {
optional<JsonType> search_value = JsonFromString(parser.Next());
if (!search_value) {
cntx->SendError(kSyntaxErr);
builder->SendError(kSyntaxErr);
return;
}
@ -1635,7 +1634,7 @@ void JsonFamily::ArrIndex(CmdArgList args, ConnectionContext* cntx) {
if (parser.HasNext()) {
if (!absl::SimpleAtoi(parser.Next(), &start_index)) {
VLOG(1) << "Failed to convert the start index to numeric" << ArgS(args, 3);
cntx->SendError(kInvalidIntErr);
builder->SendError(kInvalidIntErr);
return;
}
}
@ -1644,7 +1643,7 @@ void JsonFamily::ArrIndex(CmdArgList args, ConnectionContext* cntx) {
if (parser.HasNext()) {
if (!absl::SimpleAtoi(parser.Next(), &end_index)) {
VLOG(1) << "Failed to convert the stop index to numeric" << ArgS(args, 4);
cntx->SendError(kInvalidIntErr);
builder->SendError(kInvalidIntErr);
return;
}
}
@ -1653,20 +1652,19 @@ void JsonFamily::ArrIndex(CmdArgList args, ConnectionContext* cntx) {
return OpArrIndex(t->GetOpArgs(shard), key, json_path, *search_value, start_index, end_index);
};
Transaction* trans = cntx->transaction;
auto result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::ArrInsert(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::ArrInsert(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
string_view path = ArgS(args, 1);
int index = -1;
if (!absl::SimpleAtoi(ArgS(args, 2), &index)) {
VLOG(1) << "Failed to convert the following value to numeric: " << ArgS(args, 2);
cntx->SendError(kInvalidIntErr);
builder->SendError(kInvalidIntErr);
return;
}
@ -1676,7 +1674,7 @@ void JsonFamily::ArrInsert(CmdArgList args, ConnectionContext* cntx) {
for (size_t i = 3; i < args.size(); i++) {
optional<JsonType> val = JsonFromString(ArgS(args, i));
if (!val) {
cntx->SendError(kSyntaxErr);
builder->SendError(kSyntaxErr);
return;
}
@ -1687,13 +1685,12 @@ void JsonFamily::ArrInsert(CmdArgList args, ConnectionContext* cntx) {
return OpArrInsert(t->GetOpArgs(shard), key, json_path, index, new_values);
};
Transaction* trans = cntx->transaction;
auto result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::ArrAppend(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::ArrAppend(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
string_view path = ArgS(args, 1);
@ -1706,7 +1703,7 @@ void JsonFamily::ArrAppend(CmdArgList args, ConnectionContext* cntx) {
for (size_t i = 2; i < args.size(); ++i) {
optional<JsonType> converted_val = JsonFromString(ArgS(args, i));
if (!converted_val) {
cntx->SendError(kSyntaxErr);
builder->SendError(kSyntaxErr);
return;
}
append_values.emplace_back(converted_val);
@ -1716,13 +1713,12 @@ void JsonFamily::ArrAppend(CmdArgList args, ConnectionContext* cntx) {
return OpArrAppend(t->GetOpArgs(shard), key, json_path, append_values);
};
Transaction* trans = cntx->transaction;
auto result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::ArrTrim(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::ArrTrim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
string_view path = ArgS(args, 1);
int start_index;
@ -1730,13 +1726,13 @@ void JsonFamily::ArrTrim(CmdArgList args, ConnectionContext* cntx) {
if (!absl::SimpleAtoi(ArgS(args, 2), &start_index)) {
VLOG(1) << "Failed to parse array start index";
cntx->SendError(kInvalidIntErr);
builder->SendError(kInvalidIntErr);
return;
}
if (!absl::SimpleAtoi(ArgS(args, 3), &stop_index)) {
VLOG(1) << "Failed to parse array stop index";
cntx->SendError(kInvalidIntErr);
builder->SendError(kInvalidIntErr);
return;
}
@ -1746,13 +1742,12 @@ void JsonFamily::ArrTrim(CmdArgList args, ConnectionContext* cntx) {
return OpArrTrim(t->GetOpArgs(shard), key, json_path, start_index, stop_index);
};
Transaction* trans = cntx->transaction;
auto result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::ArrPop(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::ArrPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
string_view key = parser.Next();
string_view path = parser.NextOrDefault();
@ -1764,13 +1759,12 @@ void JsonFamily::ArrPop(CmdArgList args, ConnectionContext* cntx) {
return OpArrPop(t->GetOpArgs(shard), key, json_path, index);
};
Transaction* trans = cntx->transaction;
auto result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::Clear(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::Clear(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
string_view key = parser.Next();
string_view path = parser.NextOrDefault();
@ -1781,13 +1775,12 @@ void JsonFamily::Clear(CmdArgList args, ConnectionContext* cntx) {
return OpClear(t->GetOpArgs(shard), key, json_path);
};
Transaction* trans = cntx->transaction;
OpResult<long> result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
OpResult<long> result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::StrAppend(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::StrAppend(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
string_view path = ArgS(args, 1);
string_view value = ArgS(args, 2);
@ -1797,7 +1790,7 @@ void JsonFamily::StrAppend(CmdArgList args, ConnectionContext* cntx) {
// We try parsing the value into json string object first.
optional<JsonType> parsed_json = JsonFromString(value);
if (!parsed_json || !parsed_json->is_string()) {
return cntx->SendError("expected string value", kSyntaxErrType);
return builder->SendError("expected string value", kSyntaxErrType);
};
string_view json_string = parsed_json->as_string_view();
@ -1805,13 +1798,12 @@ void JsonFamily::StrAppend(CmdArgList args, ConnectionContext* cntx) {
return OpStrAppend(t->GetOpArgs(shard), key, json_path, json_string);
};
Transaction* trans = cntx->transaction;
auto result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::ObjKeys(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::ObjKeys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
string_view key = parser.Next();
string_view path = parser.NextOrDefault();
@ -1822,13 +1814,12 @@ void JsonFamily::ObjKeys(CmdArgList args, ConnectionContext* cntx) {
return OpObjKeys(t->GetOpArgs(shard), key, json_path);
};
Transaction* trans = cntx->transaction;
auto result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::Del(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
string_view key = parser.Next();
string_view path = parser.NextOrDefault();
@ -1839,13 +1830,12 @@ void JsonFamily::Del(CmdArgList args, ConnectionContext* cntx) {
return OpDel(t->GetOpArgs(shard), key, path, json_path);
};
Transaction* trans = cntx->transaction;
OpResult<long> result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
OpResult<long> result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::NumIncrBy(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::NumIncrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
string_view path = ArgS(args, 1);
string_view num = ArgS(args, 2);
@ -1856,12 +1846,12 @@ void JsonFamily::NumIncrBy(CmdArgList args, ConnectionContext* cntx) {
return OpDoubleArithmetic(t->GetOpArgs(shard), key, json_path, num, OP_ADD);
};
OpResult<string> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
OpResult<string> result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::NumMultBy(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::NumMultBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
string_view path = ArgS(args, 1);
string_view num = ArgS(args, 2);
@ -1872,12 +1862,12 @@ void JsonFamily::NumMultBy(CmdArgList args, ConnectionContext* cntx) {
return OpDoubleArithmetic(t->GetOpArgs(shard), key, json_path, num, OP_MULTIPLY);
};
OpResult<string> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
OpResult<string> result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::Toggle(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::Toggle(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
string_view key = parser.Next();
string_view path = parser.NextOrDefault();
@ -1885,13 +1875,13 @@ void JsonFamily::Toggle(CmdArgList args, ConnectionContext* cntx) {
WrappedJsonPath json_path = GET_OR_SEND_UNEXPECTED(ParseJsonPath(path));
if (json_path.IsLegacyModePath()) {
ExecuteToggle<bool>(key, json_path, cntx);
ExecuteToggle<bool>(key, json_path, tx, builder);
} else {
ExecuteToggle<long>(key, json_path, cntx);
ExecuteToggle<long>(key, json_path, tx, builder);
}
}
void JsonFamily::Type(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::Type(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
string_view key = parser.Next();
string_view path = parser.NextOrDefault();
@ -1902,13 +1892,12 @@ void JsonFamily::Type(CmdArgList args, ConnectionContext* cntx) {
return OpType(t->GetOpArgs(shard), key, json_path);
};
Transaction* trans = cntx->transaction;
auto result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::ArrLen(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::ArrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
string_view key = parser.Next();
string_view path = parser.NextOrDefault();
@ -1919,13 +1908,12 @@ void JsonFamily::ArrLen(CmdArgList args, ConnectionContext* cntx) {
return OpArrLen(t->GetOpArgs(shard), key, json_path);
};
Transaction* trans = cntx->transaction;
auto result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::ObjLen(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::ObjLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
string_view key = parser.Next();
string_view path = parser.NextOrDefault();
@ -1936,13 +1924,12 @@ void JsonFamily::ObjLen(CmdArgList args, ConnectionContext* cntx) {
return OpObjLen(t->GetOpArgs(shard), key, json_path);
};
Transaction* trans = cntx->transaction;
auto result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::StrLen(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::StrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
string_view key = parser.Next();
string_view path = parser.NextOrDefault();
@ -1953,33 +1940,31 @@ void JsonFamily::StrLen(CmdArgList args, ConnectionContext* cntx) {
return OpStrLen(t->GetOpArgs(shard), key, json_path);
};
Transaction* trans = cntx->transaction;
auto result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
reply_generic::Send(result, rb);
}
void JsonFamily::Get(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::Get(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
DCHECK_GE(args.size(), 1U);
facade::CmdArgParser parser{args};
string_view key = parser.Next();
auto params = ParseJsonGetParams(&parser, cntx);
auto params = ParseJsonGetParams(&parser, builder);
if (!params) {
return; // ParseJsonGetParams should have already sent an error
}
if (auto err = parser.Error(); err)
return cntx->SendError(err->MakeReply());
return builder->SendError(err->MakeReply());
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpJsonGet(t->GetOpArgs(shard), key, params.value());
};
Transaction* trans = cntx->transaction;
OpResult<string> result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
OpResult<string> result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (result == OpStatus::KEY_NOTFOUND) {
rb->SendNull(); // Match Redis

View file

@ -5,44 +5,46 @@
#pragma once
#include "server/common.h"
#include "server/engine_shard_set.h"
namespace facade {
class SinkReplyBuilder;
} // namespace facade
namespace dfly {
class ConnectionContext;
class CommandRegistry;
using facade::OpResult;
using facade::OpStatus;
using facade::RedisReplyBuilder;
class JsonFamily {
public:
static void Register(CommandRegistry* registry);
private:
static void Get(CmdArgList args, ConnectionContext* cntx);
static void MGet(CmdArgList args, ConnectionContext* cntx);
static void Type(CmdArgList args, ConnectionContext* cntx);
static void StrLen(CmdArgList args, ConnectionContext* cntx);
static void ObjLen(CmdArgList args, ConnectionContext* cntx);
static void ArrLen(CmdArgList args, ConnectionContext* cntx);
static void Toggle(CmdArgList args, ConnectionContext* cntx);
static void NumIncrBy(CmdArgList args, ConnectionContext* cntx);
static void NumMultBy(CmdArgList args, ConnectionContext* cntx);
static void Del(CmdArgList args, ConnectionContext* cntx);
static void ObjKeys(CmdArgList args, ConnectionContext* cntx);
static void StrAppend(CmdArgList args, ConnectionContext* cntx);
static void Clear(CmdArgList args, ConnectionContext* cntx);
static void ArrPop(CmdArgList args, ConnectionContext* cntx);
static void ArrTrim(CmdArgList args, ConnectionContext* cntx);
static void ArrInsert(CmdArgList args, ConnectionContext* cntx);
static void ArrAppend(CmdArgList args, ConnectionContext* cntx);
static void ArrIndex(CmdArgList args, ConnectionContext* cntx);
static void Debug(CmdArgList args, ConnectionContext* cntx);
static void Resp(CmdArgList args, ConnectionContext* cntx);
static void Set(CmdArgList args, ConnectionContext* cntx);
static void MSet(CmdArgList args, ConnectionContext* cntx);
static void Merge(CmdArgList args, ConnectionContext* cntx);
using SinkReplyBuilder = facade::SinkReplyBuilder;
static void Get(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Type(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void StrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void ObjLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void ArrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Toggle(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void NumIncrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void NumMultBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void ObjKeys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void StrAppend(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Clear(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void ArrPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void ArrTrim(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void ArrInsert(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void ArrAppend(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void ArrIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Debug(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Resp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Set(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void MSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Merge(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
};
} // namespace dfly