mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
refactor: use CmdArgParser for XGROUP command (#3739)
This commit is contained in:
parent
bbaa2669f9
commit
efa4efd2bf
1 changed files with 70 additions and 85 deletions
|
@ -1797,19 +1797,18 @@ OpResult<PendingResult> OpPending(const OpArgs& op_args, string_view key, const
|
|||
return result;
|
||||
}
|
||||
|
||||
void CreateGroup(CmdArgList args, string_view key, ConnectionContext* cntx) {
|
||||
if (args.size() < 2)
|
||||
return cntx->SendError(UnknownSubCmd("CREATE", "XGROUP"));
|
||||
void CreateGroup(facade::CmdArgParser* parser, ConnectionContext* cntx) {
|
||||
auto key = parser->Next();
|
||||
|
||||
CreateOpts opts;
|
||||
opts.gname = ArgS(args, 0);
|
||||
opts.id = ArgS(args, 1);
|
||||
if (args.size() >= 3) {
|
||||
ToUpper(&args[2]);
|
||||
if (ArgS(args, 2) == "MKSTREAM")
|
||||
opts.flags |= kCreateOptMkstream;
|
||||
std::tie(opts.gname, opts.id) = parser->Next<string_view, string_view>();
|
||||
if (parser->Check("MKSTREAM")) {
|
||||
opts.flags |= kCreateOptMkstream;
|
||||
}
|
||||
|
||||
if (auto err = parser->Error(); err)
|
||||
return cntx->SendError(err->MakeReply());
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpCreate(t->GetOpArgs(shard), key, opts);
|
||||
};
|
||||
|
@ -1823,7 +1822,15 @@ void CreateGroup(CmdArgList args, string_view key, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) {
|
||||
void DestroyGroup(facade::CmdArgParser* parser, ConnectionContext* cntx) {
|
||||
auto [key, gname] = parser->Next<string_view, string_view>();
|
||||
|
||||
if (auto err = parser->Error(); err)
|
||||
return cntx->SendError(err->MakeReply());
|
||||
|
||||
if (parser->HasNext())
|
||||
return cntx->SendError(UnknownSubCmd("DESTROY", "XGROUP"));
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpDestroyGroup(t->GetOpArgs(shard), key, gname);
|
||||
};
|
||||
|
@ -1841,8 +1848,15 @@ void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void CreateConsumer(string_view key, string_view gname, string_view consumer,
|
||||
ConnectionContext* cntx) {
|
||||
void CreateConsumer(facade::CmdArgParser* parser, ConnectionContext* cntx) {
|
||||
auto [key, gname, consumer] = parser->Next<string_view, string_view, string_view>();
|
||||
|
||||
if (auto err = parser->Error(); err)
|
||||
return cntx->SendError(err->MakeReply());
|
||||
|
||||
if (parser->HasNext())
|
||||
return cntx->SendError(UnknownSubCmd("CREATECONSUMER", "XGROUP"));
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpCreateConsumer(t->GetOpArgs(shard), key, gname, consumer);
|
||||
};
|
||||
|
@ -1862,8 +1876,15 @@ void CreateConsumer(string_view key, string_view gname, string_view consumer,
|
|||
}
|
||||
}
|
||||
|
||||
void DelConsumer(string_view key, string_view gname, string_view consumer,
|
||||
ConnectionContext* cntx) {
|
||||
void DelConsumer(facade::CmdArgParser* parser, ConnectionContext* cntx) {
|
||||
auto [key, gname, consumer] = parser->Next<string_view, string_view, string_view>();
|
||||
|
||||
if (auto err = parser->Error(); err)
|
||||
return cntx->SendError(err->MakeReply());
|
||||
|
||||
if (parser->HasNext())
|
||||
return cntx->SendError(UnknownSubCmd("DELCONSUMER", "XGROUP"));
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpDelConsumer(t->GetOpArgs(shard), key, gname, consumer);
|
||||
};
|
||||
|
@ -1882,18 +1903,21 @@ void DelConsumer(string_view key, string_view gname, string_view consumer,
|
|||
}
|
||||
}
|
||||
|
||||
void SetId(string_view key, string_view gname, CmdArgList args, ConnectionContext* cntx) {
|
||||
facade::CmdArgParser parser{args};
|
||||
void SetId(facade::CmdArgParser* parser, ConnectionContext* cntx) {
|
||||
auto [key, gname, id] = parser->Next<string_view, string_view, string_view>();
|
||||
|
||||
string_view id = parser.Next();
|
||||
while (parser.HasNext()) {
|
||||
if (parser.Check("ENTRIESREAD")) {
|
||||
while (parser->HasNext()) {
|
||||
if (parser->Check("ENTRIESREAD")) {
|
||||
// TODO: to support ENTRIESREAD.
|
||||
return cntx->SendError(kSyntaxErr);
|
||||
} else {
|
||||
return cntx->SendError(kSyntaxErr);
|
||||
}
|
||||
}
|
||||
|
||||
if (auto err = parser->Error(); err)
|
||||
return cntx->SendError(err->MakeReply());
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpSetId(t->GetOpArgs(shard), key, gname, id);
|
||||
};
|
||||
|
@ -1910,20 +1934,23 @@ void SetId(string_view key, string_view gname, CmdArgList args, ConnectionContex
|
|||
}
|
||||
|
||||
void XGroupHelp(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view help_arr[] = {
|
||||
"CREATE <key> <groupname> <id|$> [option]",
|
||||
" Create a new consumer group. Options are:",
|
||||
" * MKSTREAM",
|
||||
" Create the empty stream if it does not exist.",
|
||||
"CREATECONSUMER <key> <groupname> <consumer>",
|
||||
" Create a new consumer in the specified group.",
|
||||
"DELCONSUMER <key> <groupname> <consumer>",
|
||||
" Remove the specified consumer.",
|
||||
"DESTROY <key> <groupname>",
|
||||
" Remove the specified group.",
|
||||
"SETID <key> <groupname> <id|$>",
|
||||
" Set the current group ID.",
|
||||
};
|
||||
string_view help_arr[] = {"XGROUP <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
|
||||
"CREATE <key> <groupname> <id|$> [option]",
|
||||
" Create a new consumer group. Options are:",
|
||||
" * MKSTREAM",
|
||||
" Create the empty stream if it does not exist.",
|
||||
" * ENTRIESREAD entries_read",
|
||||
" Set the group's entries_read counter (internal use).",
|
||||
"CREATECONSUMER <key> <groupname> <consumer>",
|
||||
" Create a new consumer in the specified group.",
|
||||
"DELCONSUMER <key> <groupname> <consumer>",
|
||||
" Remove the specified consumer.",
|
||||
"DESTROY <key> <groupname>",
|
||||
" Remove the specified group.",
|
||||
"SETID <key> <groupname> <id|$> [ENTRIESREAD entries_read]",
|
||||
" Set the current group ID and entries_read counter.",
|
||||
"HELP",
|
||||
" Print this help."};
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
return rb->SendSimpleStrArr(help_arr);
|
||||
}
|
||||
|
@ -2316,63 +2343,21 @@ void StreamFamily::XDel(CmdArgList args, ConnectionContext* cntx) {
|
|||
cntx->SendError(result.status());
|
||||
}
|
||||
|
||||
void HelpSubCmd(facade::CmdArgParser* parser, ConnectionContext* cntx) {
|
||||
XGroupHelp(parser->Tail(), cntx);
|
||||
}
|
||||
|
||||
void StreamFamily::XGroup(CmdArgList args, ConnectionContext* cntx) {
|
||||
facade::CmdArgParser parser{args};
|
||||
|
||||
string_view sub_cmd = parser.ToUpper().Next();
|
||||
if (sub_cmd == "HELP") {
|
||||
string_view help[] = {"XGROUP <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
|
||||
"CREATE <key> <groupname> <id|$> [option]",
|
||||
" Create a new consumer group. Options are:",
|
||||
" * MKSTREAM",
|
||||
" Create the empty stream if it does not exist.",
|
||||
" * ENTRIESREAD entries_read",
|
||||
" Set the group's entries_read counter (internal use).",
|
||||
"CREATECONSUMER <key> <groupname> <consumer>",
|
||||
" Create a new consumer in the specified group.",
|
||||
"DELCONSUMER <key> <groupname> <consumer>",
|
||||
" Remove the specified consumer.",
|
||||
"DESTROY <key> <groupname>",
|
||||
" Remove the specified group.",
|
||||
"SETID <key> <groupname> <id|$> [ENTRIESREAD entries_read]",
|
||||
" Set the current group ID and entries_read counter.",
|
||||
"HELP",
|
||||
" Print this help."};
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
return rb->SendStringArr(help);
|
||||
}
|
||||
auto sub_cmd_func = parser.MapNext("HELP", &HelpSubCmd, "CREATE", &CreateGroup, "DESTROY",
|
||||
&DestroyGroup, "CREATECONSUMER", &CreateConsumer,
|
||||
"DELCONSUMER", &DelConsumer, "SETID", &SetId);
|
||||
|
||||
if (!parser.HasAtLeast(2))
|
||||
return cntx->SendError(kSyntaxErr, kScriptErrType);
|
||||
if (auto err = parser.Error(); err)
|
||||
return cntx->SendError(err->MakeReply());
|
||||
|
||||
string_view key = parser.Next();
|
||||
|
||||
if (sub_cmd == "CREATE") {
|
||||
args.remove_prefix(2);
|
||||
return CreateGroup(std::move(args), key, cntx);
|
||||
}
|
||||
|
||||
string_view gname = parser.Next();
|
||||
if (sub_cmd == "DESTROY" && args.size() == 3) {
|
||||
return DestroyGroup(key, gname, cntx);
|
||||
}
|
||||
|
||||
if (sub_cmd == "CREATECONSUMER" && args.size() == 4) {
|
||||
string_view cname = parser.Next();
|
||||
return CreateConsumer(key, gname, cname, cntx);
|
||||
}
|
||||
|
||||
if (sub_cmd == "DELCONSUMER" && args.size() == 4) {
|
||||
string_view cname = parser.Next();
|
||||
return DelConsumer(key, gname, cname, cntx);
|
||||
}
|
||||
|
||||
if (sub_cmd == "SETID" && args.size() >= 4) {
|
||||
args.remove_prefix(3);
|
||||
return SetId(key, gname, std::move(args), cntx);
|
||||
}
|
||||
|
||||
return cntx->SendError(UnknownSubCmd(sub_cmd, "XGROUP"));
|
||||
sub_cmd_func(&parser, cntx);
|
||||
}
|
||||
|
||||
void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
|
Loading…
Reference in a new issue