1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat(xgroup): add support for xgroup createconsumer (#1382)

This commit is contained in:
Abhradeep Chakraborty 2023-06-12 17:07:33 +05:30 committed by GitHub
parent 23f1a77d4c
commit e6a8ec8598
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 0 deletions

View file

@ -817,6 +817,26 @@ OpStatus OpDestroyGroup(const OpArgs& op_args, string_view key, string_view gnam
return OpStatus::SKIPPED;
}
// XGROUP CREATECONSUMER key groupname consumername
OpResult<uint32_t> OpCreateConsumer(const OpArgs& op_args, string_view key, string_view gname,
string_view consumer_name) {
OpResult<pair<stream*, streamCG*>> cgroup_res = FindGroup(op_args, key, gname);
if (!cgroup_res)
return cgroup_res.status();
streamCG* cg = cgroup_res->second;
if (cg == nullptr)
return OpStatus::SKIPPED;
auto* shard = op_args.shard;
shard->tmp_str1 = sdscpylen(shard->tmp_str1, consumer_name.data(), consumer_name.size());
streamConsumer* consumer =
streamCreateConsumer(cg, shard->tmp_str1, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
if (consumer)
return OpStatus::OK;
return OpStatus::KEY_EXISTS;
}
// XGROUP DELCONSUMER key groupname consumername
OpResult<uint32_t> OpDelConsumer(const OpArgs& op_args, string_view key, string_view gname,
string_view consumer_name) {
@ -992,6 +1012,27 @@ void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) {
}
}
void CreateConsumer(string_view key, string_view gname, string_view consumer,
ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpCreateConsumer(t->GetOpArgs(shard), key, gname, consumer);
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
switch (result.status()) {
case OpStatus::OK:
return (*cntx)->SendLong(1);
case OpStatus::KEY_EXISTS:
return (*cntx)->SendLong(0);
case OpStatus::SKIPPED:
return (*cntx)->SendError(NoGroupError(key, gname));
case OpStatus::KEY_NOTFOUND:
return (*cntx)->SendError(kXGroupKeyNotFound);
default:
(*cntx)->SendError(result.status());
}
}
void DelConsumer(string_view key, string_view gname, string_view consumer,
ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) {
@ -1219,6 +1260,12 @@ void StreamFamily::XGroup(CmdArgList args, ConnectionContext* cntx) {
return DestroyGroup(key, gname, cntx);
}
if (sub_cmd == "CREATECONSUMER" && args.size() == 4) {
string_view gname = ArgS(args, 2);
string_view cname = ArgS(args, 3);
return CreateConsumer(key, gname, cname, cntx);
}
if (sub_cmd == "DELCONSUMER" && args.size() == 4) {
string_view gname = ArgS(args, 2);
string_view cname = ArgS(args, 3);

View file

@ -236,6 +236,29 @@ TEST_F(StreamFamilyTest, Issue854) {
EXPECT_THAT(resp, ErrArg("is not allowed"));
}
TEST_F(StreamFamilyTest, XGroupConsumer) {
Run({"xgroup", "create", "foo", "group", "$", "MKSTREAM"});
auto resp = Run({"xgroup", "createconsumer", "foo", "group", "bob"});
EXPECT_THAT(resp, IntArg(1));
Run({"xgroup", "createconsumer", "foo", "group", "alice"});
resp = Run({"xinfo", "groups", "foo"});
EXPECT_THAT(resp.GetVec()[3], IntArg(2));
Run({"xgroup", "delconsumer", "foo", "group", "alice"});
resp = Run({"xinfo", "groups", "foo"});
EXPECT_THAT(resp.GetVec()[3], IntArg(1));
resp = Run({"xgroup", "createconsumer", "foo", "group", "alice"});
EXPECT_THAT(resp, IntArg(1));
// ensure createconsumer doesn't create consumer that already exists
resp = Run({"xgroup", "createconsumer", "foo", "group", "alice"});
EXPECT_THAT(resp, IntArg(0));
// nogrouperror
resp = Run({"xgroup", "createconsumer", "foo", "not-exists", "alice"});
EXPECT_THAT(resp, ErrArg("NOGROUP"));
}
TEST_F(StreamFamilyTest, XTrim) {
Run({"xadd", "foo", "1-*", "k", "v"});
Run({"xadd", "foo", "1-*", "k", "v"});