mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
Support Pubsub (P)UNSUBSCRIBE commands without arguments (#117)
* Add tests demonstrating expectations of (p)unsubscribe - When there are subscriptions the return should contain the count of remaining channels or patterns subscribed to - When there are no subscriptions and no arguments are provided a single response containing null for channel / pattern and count = 0 should be returned - When there are subscriptions and no arguments are provided a response per unsubscribed channel or pattern should be returned * Update arity of (p)unsubscribe commands * Lint test * Handle (p)unsubscribe without arguments Side effects: - Extract response construction for (p)sububscribe, (p)unsubscribe into private method - Use UnsubscribeAll, PunsubscribeAll in cleanup * Fix linting errors * Replace ternary with explicit if/else * Simplify setup for (p)unsubscribe tests
This commit is contained in:
parent
b855b50fb7
commit
02bc4425ae
4 changed files with 93 additions and 24 deletions
|
@ -165,42 +165,63 @@ void ConnectionContext::ChangePSub(bool to_add, bool to_reply, CmdArgList args)
|
|||
|
||||
if (to_reply) {
|
||||
const char* action[2] = {"punsubscribe", "psubscribe"};
|
||||
if (result.size() == 0) {
|
||||
return SendSubscriptionChangedResponse(action[to_add], std::nullopt, 0);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < result.size(); ++i) {
|
||||
(*this)->StartArray(3);
|
||||
(*this)->SendBulkString(action[to_add]);
|
||||
(*this)->SendBulkString(ArgS(args, i));
|
||||
(*this)->SendLong(result[i]);
|
||||
SendSubscriptionChangedResponse(action[to_add], ArgS(args, i), result[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
void ConnectionContext::UnsubscribeAll(bool to_reply) {
|
||||
if (to_reply && (!conn_state.subscribe_info || conn_state.subscribe_info->channels.empty())) {
|
||||
return SendSubscriptionChangedResponse("unsubscribe", std::nullopt, 0);
|
||||
}
|
||||
StringVec channels(conn_state.subscribe_info->channels.begin(),
|
||||
conn_state.subscribe_info->channels.end());
|
||||
CmdArgVec arg_vec(channels.begin(), channels.end());
|
||||
|
||||
ChangeSubscription(false, to_reply, CmdArgList{arg_vec});
|
||||
}
|
||||
|
||||
void ConnectionContext::PUnsubscribeAll(bool to_reply) {
|
||||
if (to_reply && (!conn_state.subscribe_info || conn_state.subscribe_info->patterns.empty())) {
|
||||
return SendSubscriptionChangedResponse("punsubscribe", std::nullopt, 0);
|
||||
}
|
||||
|
||||
StringVec patterns(conn_state.subscribe_info->patterns.begin(),
|
||||
conn_state.subscribe_info->patterns.end());
|
||||
CmdArgVec arg_vec(patterns.begin(), patterns.end());
|
||||
ChangePSub(false, to_reply, CmdArgList{arg_vec});
|
||||
}
|
||||
|
||||
void ConnectionContext::SendSubscriptionChangedResponse(string_view action,
|
||||
std::optional<string_view> topic,
|
||||
unsigned count) {
|
||||
(*this)->StartArray(3);
|
||||
(*this)->SendBulkString(action);
|
||||
if (topic.has_value())
|
||||
(*this)->SendBulkString(topic.value());
|
||||
else
|
||||
(*this)->SendNull();
|
||||
(*this)->SendLong(count);
|
||||
}
|
||||
|
||||
void ConnectionContext::OnClose() {
|
||||
if (!conn_state.subscribe_info)
|
||||
return;
|
||||
if (!conn_state.subscribe_info) return;
|
||||
|
||||
if (!conn_state.subscribe_info->channels.empty()) {
|
||||
StringVec channels(conn_state.subscribe_info->channels.begin(),
|
||||
conn_state.subscribe_info->channels.end());
|
||||
CmdArgVec arg_vec(channels.begin(), channels.end());
|
||||
|
||||
auto token = conn_state.subscribe_info->borrow_token;
|
||||
ChangeSubscription(false, false, CmdArgList{arg_vec});
|
||||
|
||||
UnsubscribeAll(false);
|
||||
// Check that all borrowers finished processing
|
||||
token.Wait();
|
||||
}
|
||||
|
||||
if (conn_state.subscribe_info) {
|
||||
DCHECK(!conn_state.subscribe_info->patterns.empty());
|
||||
|
||||
StringVec patterns(conn_state.subscribe_info->patterns.begin(),
|
||||
conn_state.subscribe_info->patterns.end());
|
||||
CmdArgVec arg_vec(patterns.begin(), patterns.end());
|
||||
|
||||
auto token = conn_state.subscribe_info->borrow_token;
|
||||
ChangePSub(false, false, CmdArgList{arg_vec});
|
||||
|
||||
PUnsubscribeAll(false);
|
||||
// Check that all borrowers finished processing
|
||||
token.Wait();
|
||||
DCHECK(!conn_state.subscribe_info);
|
||||
|
|
|
@ -92,8 +92,15 @@ class ConnectionContext : public facade::ConnectionContext {
|
|||
|
||||
void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args);
|
||||
void ChangePSub(bool to_add, bool to_reply, CmdArgList args);
|
||||
void UnsubscribeAll(bool to_reply);
|
||||
void PUnsubscribeAll(bool to_reply);
|
||||
|
||||
bool is_replicating = false;
|
||||
|
||||
private:
|
||||
void SendSubscriptionChangedResponse(std::string_view action,
|
||||
std::optional<std::string_view> topic,
|
||||
unsigned count);
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -426,7 +426,9 @@ TEST_F(DflyEngineTest, OOM) {
|
|||
}
|
||||
|
||||
TEST_F(DflyEngineTest, PSubscribe) {
|
||||
auto resp = pp_->at(1)->Await([&] { return Run({"psubscribe", "a*", "b*"}); });
|
||||
auto resp = pp_->at(1)->Await([&] {
|
||||
return Run({"psubscribe", "a*", "b*"});
|
||||
});
|
||||
EXPECT_THAT(resp, ArrLen(3));
|
||||
resp = pp_->at(0)->Await([&] { return Run({"publish", "ab", "foo"}); });
|
||||
EXPECT_THAT(resp, IntArg(1));
|
||||
|
@ -438,6 +440,37 @@ TEST_F(DflyEngineTest, PSubscribe) {
|
|||
EXPECT_EQ("ab", msg.channel);
|
||||
EXPECT_EQ("a*", msg.pattern);
|
||||
}
|
||||
TEST_F(DflyEngineTest, Unsubscribe) {
|
||||
auto resp = Run({"unsubscribe", "a"});
|
||||
EXPECT_THAT(resp.GetVec(), ElementsAre("unsubscribe", "a", IntArg(0)));
|
||||
|
||||
resp = Run({"unsubscribe"});
|
||||
EXPECT_THAT(resp.GetVec(), ElementsAre("unsubscribe", ArgType(RespExpr::NIL), IntArg(0)));
|
||||
|
||||
Run({"subscribe", "a", "b"});
|
||||
|
||||
resp = Run({"unsubscribe", "a"});
|
||||
EXPECT_THAT(resp.GetVec(), ElementsAre("unsubscribe", "a", IntArg(1)));
|
||||
|
||||
resp = Run({"unsubscribe"});
|
||||
EXPECT_THAT(resp.GetVec(), ElementsAre("unsubscribe", "b", IntArg(0)));
|
||||
}
|
||||
|
||||
TEST_F(DflyEngineTest, PUnsubscribe) {
|
||||
auto resp = Run({"punsubscribe", "a*"});
|
||||
EXPECT_THAT(resp.GetVec(), ElementsAre("punsubscribe", "a*", IntArg(0)));
|
||||
|
||||
resp = Run({"punsubscribe"});
|
||||
EXPECT_THAT(resp.GetVec(), ElementsAre("punsubscribe", ArgType(RespExpr::NIL), IntArg(0)));
|
||||
|
||||
Run({"psubscribe", "a*", "b*"});
|
||||
|
||||
resp = Run({"punsubscribe", "a*"});
|
||||
EXPECT_THAT(resp.GetVec(), ElementsAre("punsubscribe", "a*", IntArg(1)));
|
||||
|
||||
resp = Run({"punsubscribe"});
|
||||
EXPECT_THAT(resp.GetVec(), ElementsAre("punsubscribe", "b*", IntArg(0)));
|
||||
}
|
||||
|
||||
// TODO: to test transactions with a single shard since then all transactions become local.
|
||||
// To consider having a parameter in dragonfly engine controlling number of shards
|
||||
|
|
|
@ -955,7 +955,11 @@ void Service::Subscribe(CmdArgList args, ConnectionContext* cntx) {
|
|||
void Service::Unsubscribe(CmdArgList args, ConnectionContext* cntx) {
|
||||
args.remove_prefix(1);
|
||||
|
||||
cntx->ChangeSubscription(false, true, std::move(args));
|
||||
if (args.size() == 0){
|
||||
cntx->UnsubscribeAll(true);
|
||||
}else{
|
||||
cntx->ChangeSubscription(false, true, std::move(args));
|
||||
}
|
||||
}
|
||||
|
||||
void Service::PSubscribe(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
@ -966,7 +970,11 @@ void Service::PSubscribe(CmdArgList args, ConnectionContext* cntx) {
|
|||
void Service::PUnsubscribe(CmdArgList args, ConnectionContext* cntx) {
|
||||
args.remove_prefix(1);
|
||||
|
||||
cntx->ChangePSub(false, true, args);
|
||||
if (args.size() == 0) {
|
||||
cntx->PUnsubscribeAll(true);
|
||||
} else {
|
||||
cntx->ChangePSub(false, true, args);
|
||||
}
|
||||
}
|
||||
|
||||
// Not a real implementation. Serves as a decorator to accept some function commands
|
||||
|
@ -1033,9 +1041,9 @@ void Service::RegisterCommands() {
|
|||
<< CI{"EXEC", kExecMask, 1, 0, 0, 0}.MFUNC(Exec)
|
||||
<< CI{"PUBLISH", CO::LOADING | CO::FAST, 3, 0, 0, 0}.MFUNC(Publish)
|
||||
<< CI{"SUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(Subscribe)
|
||||
<< CI{"UNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(Unsubscribe)
|
||||
<< CI{"UNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(Unsubscribe)
|
||||
<< CI{"PSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(PSubscribe)
|
||||
<< CI{"PUNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(PUnsubscribe)
|
||||
<< CI{"PUNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(PUnsubscribe)
|
||||
<< CI{"FUNCTION", CO::NOSCRIPT, 2, 0, 0, 0}.MFUNC(Function);
|
||||
|
||||
StringFamily::Register(®istry_);
|
||||
|
|
Loading…
Reference in a new issue