mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
fix: bugs in stream code (#4239)
* fix: bugs in stream code 1. Memory leak in streamGetEdgeID 2. Addresses CVE-2022-31144 3. Fixes XAUTOCLAIM bugs and adds tests. 4. Limits the count argument in XAUTOCLAIM command to 2^18 (CVE-2022-35951) Also fixes #3830 --------- Signed-off-by: Roman Gershman <roman@dragonflydb.io> Signed-off-by: Roman Gershman <romange@gmail.com> Co-authored-by: Shahar Mike <chakaz@users.noreply.github.com>
This commit is contained in:
parent
91aff49fcd
commit
dcee9a9874
4 changed files with 101 additions and 4 deletions
|
@ -83,6 +83,10 @@ MATCHER_P(RespArray, value, "") {
|
|||
result_listener);
|
||||
}
|
||||
|
||||
template <typename... Args> auto RespElementsAre(const Args&... matchers) {
|
||||
return RespArray(::testing::ElementsAre(matchers...));
|
||||
}
|
||||
|
||||
inline bool operator==(const RespExpr& left, std::string_view s) {
|
||||
return left.type == RespExpr::STRING && ToSV(left.GetBuf()) == s;
|
||||
}
|
||||
|
|
|
@ -274,7 +274,7 @@ void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_i
|
|||
streamID min_id = {0, 0}, max_id = {UINT64_MAX, UINT64_MAX};
|
||||
*edge_id = first ? max_id : min_id;
|
||||
}
|
||||
|
||||
streamIteratorStop(&si);
|
||||
}
|
||||
|
||||
/* Trim the stream 's' according to args->trim_strategy, and return the
|
||||
|
@ -336,7 +336,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
|
|||
streamDecodeID(ri.key, &master_id);
|
||||
|
||||
/* Read last ID. */
|
||||
streamID last_id;
|
||||
streamID last_id = {0, 0};
|
||||
lpGetEdgeStreamID(lp, 0, &master_id, &last_id);
|
||||
|
||||
/* We can remove the entire node id its last ID < 'id' */
|
||||
|
|
|
@ -1568,11 +1568,15 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
|
|||
streamDecodeID(ri.key, &id);
|
||||
|
||||
if (!streamEntryExists(stream, &id)) {
|
||||
// TODO: to propagate this change to replica as XCLAIM command
|
||||
// - since we delete it from NACK. See streamPropagateXCLAIM call.
|
||||
raxRemove(group->pel, ri.key, ri.key_len, nullptr);
|
||||
raxRemove(nack->consumer->pel, ri.key, ri.key_len, nullptr);
|
||||
streamFreeNACK(nack);
|
||||
result.deleted_ids.push_back(id);
|
||||
raxSeek(&ri, ">=", ri.key, ri.key_len);
|
||||
|
||||
count--; /* Count is a limit of the command response size. */
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1603,6 +1607,7 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
|
|||
|
||||
AppendClaimResultItem(result, stream, id);
|
||||
count--;
|
||||
// TODO: propagate xclaim to replica
|
||||
}
|
||||
|
||||
raxNext(&ri);
|
||||
|
@ -3229,8 +3234,8 @@ void StreamFamily::XAutoClaim(CmdArgList args, const CommandContext& cmd_cntx) {
|
|||
if (!absl::SimpleAtoi(arg, &opts.count)) {
|
||||
return rb->SendError(kInvalidIntErr);
|
||||
}
|
||||
if (opts.count <= 0) {
|
||||
return rb->SendError("COUNT must be > 0");
|
||||
if (opts.count <= 0 || opts.count >= (1L << 18)) {
|
||||
return rb->SendError("COUNT must be > 0 and less than 2^18");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -1074,6 +1074,94 @@ TEST_F(StreamFamilyTest, XInfoStream) {
|
|||
"pel-count", IntArg(11), "pending", ArrLen(11)));
|
||||
}
|
||||
|
||||
TEST_F(StreamFamilyTest, AutoClaimPelItemsFromAnotherConsumer) {
|
||||
auto resp = Run({"xadd", "mystream", "*", "a", "1"});
|
||||
string id1 = resp.GetString();
|
||||
resp = Run({"xadd", "mystream", "*", "b", "2"});
|
||||
string id2 = resp.GetString();
|
||||
resp = Run({"xadd", "mystream", "*", "c", "3"});
|
||||
string id3 = resp.GetString();
|
||||
resp = Run({"xadd", "mystream", "*", "d", "4"});
|
||||
string id4 = resp.GetString();
|
||||
|
||||
Run({"XGROUP", "CREATE", "mystream", "mygroup", "0"});
|
||||
|
||||
// Consumer 1 reads item 1 from the stream without acknowledgements.
|
||||
// Consumer 2 then claims pending item 1 from the PEL of consumer 1
|
||||
resp = Run(
|
||||
{"XREADGROUP", "GROUP", "mygroup", "consumer1", "COUNT", "1", "STREAMS", "mystream", ">"});
|
||||
|
||||
auto match_a1 = RespElementsAre("a", "1");
|
||||
ASSERT_THAT(resp, RespElementsAre("mystream", RespElementsAre(RespElementsAre(id1, match_a1))));
|
||||
|
||||
AdvanceTime(200); // Advance time to greater time than the idle time in the autoclaim (10)
|
||||
resp = Run({"XAUTOCLAIM", "mystream", "mygroup", "consumer2", "10", "-", "COUNT", "1"});
|
||||
|
||||
EXPECT_THAT(resp, RespElementsAre("0-0", ArrLen(1), ArrLen(0)));
|
||||
EXPECT_THAT(resp.GetVec()[1], RespElementsAre(RespElementsAre(id1, match_a1)));
|
||||
|
||||
Run({"XREADGROUP", "GROUP", "mygroup", "consumer1", "COUNT", "3", "STREAMS", "mystream", ">"});
|
||||
AdvanceTime(200);
|
||||
|
||||
// Delete item 2 from the stream.Now consumer 1 has PEL that contains
|
||||
// only item 3. Try to use consumer 2 to claim the deleted item 2
|
||||
// from the PEL of consumer 1, this should return nil
|
||||
resp = Run({"XDEL", "mystream", id2});
|
||||
ASSERT_THAT(resp, IntArg(1));
|
||||
|
||||
// id1 and id3 are self - claimed here but not id2('count' was set to 3)
|
||||
// we make sure id2 is indeed skipped(the cursor points to id4)
|
||||
resp = Run({"XAUTOCLAIM", "mystream", "mygroup", "consumer2", "10", "-", "COUNT", "3"});
|
||||
auto match_id1_a1 = RespElementsAre(id1, match_a1);
|
||||
auto match_id3_c3 = RespElementsAre(id3, RespElementsAre("c", "3"));
|
||||
ASSERT_THAT(resp, RespElementsAre(id4, RespElementsAre(match_id1_a1, match_id3_c3),
|
||||
RespElementsAre(id2)));
|
||||
// Delete item 3 from the stream.Now consumer 1 has PEL that is empty.
|
||||
// Try to use consumer 2 to claim the deleted item 3 from the PEL
|
||||
// of consumer 1, this should return nil
|
||||
AdvanceTime(200);
|
||||
|
||||
ASSERT_THAT(Run({"XDEL", "mystream", id4}), IntArg(1));
|
||||
|
||||
// id1 and id3 are self - claimed here but not id2 and id4('count' is default 100)
|
||||
// we also test the JUSTID modifier here.note that, when using JUSTID,
|
||||
// deleted entries are returned in reply(consistent with XCLAIM).
|
||||
resp = Run({"XAUTOCLAIM", "mystream", "mygroup", "consumer2", "10", "-", "JUSTID"});
|
||||
ASSERT_THAT(resp, RespElementsAre("0-0", RespElementsAre(id1, id3), RespElementsAre(id4)));
|
||||
}
|
||||
|
||||
TEST_F(StreamFamilyTest, AutoClaimDelCount) {
|
||||
Run({"xadd", "x", "1-0", "f", "v"});
|
||||
Run({"xadd", "x", "2-0", "f", "v"});
|
||||
Run({"xadd", "x", "3-0", "f", "v"});
|
||||
Run({"XGROUP", "CREATE", "x", "grp", "0"});
|
||||
auto resp = Run({"XREADGROUP", "GROUP", "grp", "Alice", "STREAMS", "x", ">"});
|
||||
|
||||
auto m1 = RespElementsAre("1-0", _);
|
||||
auto m2 = RespElementsAre("2-0", _);
|
||||
auto m3 = RespElementsAre("3-0", _);
|
||||
EXPECT_THAT(resp, RespElementsAre("x", RespElementsAre(m1, m2, m3)));
|
||||
|
||||
EXPECT_THAT(Run({"XDEL", "x", "1-0"}), IntArg(1));
|
||||
EXPECT_THAT(Run({"XDEL", "x", "2-0"}), IntArg(1));
|
||||
|
||||
resp = Run({"XAUTOCLAIM", "x", "grp", "Bob", "0", "0-0", "COUNT", "1"});
|
||||
EXPECT_THAT(resp, RespElementsAre("2-0", ArrLen(0), RespElementsAre("1-0")));
|
||||
|
||||
resp = Run({"XAUTOCLAIM", "x", "grp", "Bob", "0", "2-0", "COUNT", "1"});
|
||||
EXPECT_THAT(resp, RespElementsAre("3-0", ArrLen(0), RespElementsAre("2-0")));
|
||||
|
||||
resp = Run({"XAUTOCLAIM", "x", "grp", "Bob", "0", "3-0", "COUNT", "1"});
|
||||
EXPECT_THAT(resp, RespElementsAre(
|
||||
"0-0", RespElementsAre(RespElementsAre("3-0", RespElementsAre("f", "v"))),
|
||||
ArrLen(0)));
|
||||
resp = Run({"xpending", "x", "grp", "-", "+", "10", "Alice"});
|
||||
EXPECT_THAT(resp, ArrLen(0));
|
||||
|
||||
resp = Run({"XAUTOCLAIM", "x", "grp", "Bob", "0", "3-0", "COUNT", "704505322"});
|
||||
EXPECT_THAT(resp, ErrArg("COUNT"));
|
||||
}
|
||||
|
||||
TEST_F(StreamFamilyTest, XAddMaxSeq) {
|
||||
Run({"XADD", "x", "1-18446744073709551615", "f1", "v1"});
|
||||
auto resp = Run({"XADD", "x", "1-*", "f2", "v2"});
|
||||
|
|
Loading…
Reference in a new issue