Repository: https://github.com/dragonflydb/dragonfly.git

chore: add integrity checks to consumer->pel (#3754)

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Author: Roman Gershman
Date: 2024-09-22 15:40:42 +03:00 (committed by GitHub)
Parent: e09ebe0c5c
Commit: 2e9b133ea0
2 changed files with 19 additions and 7 deletions

Changed file 1 of 2 (the vendored rax radix-tree implementation):
@@ -38,7 +38,6 @@
#include <math.h>
#include "rax.h"
#include "endianconv.h"
#ifndef RAX_MALLOC_INCLUDE
#define RAX_MALLOC_INCLUDE "rax_malloc.h"
@@ -156,7 +155,7 @@ static inline void raxStackFree(raxStack *ts) {
* 'nodesize'. The padding is needed to store the child pointers to aligned
* addresses. Note that we add 4 to the node size because the node has a four
* bytes header. */
- #define raxPadding(nodesize) ((sizeof(void*)-((nodesize+4) % sizeof(void*))) & (sizeof(void*)-1))
+ #define raxPadding(nodesize) ((sizeof(void*)-(((nodesize)+4) % sizeof(void*))) & (sizeof(void*)-1))
/* Return the pointer to the last child pointer in a node. For the compressed
* nodes this is the only child pointer. */
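The raxPadding change in this hunk is a macro-hygiene fix: the argument is now parenthesized before the 4-byte header is added, so an expression argument built from a lower-precedence operator cannot rebind against the + 4. Below is a minimal standalone sketch of the difference; the RAX_PADDING_OLD/RAX_PADDING_NEW names and the shifted argument are illustrative only, not taken from the diff:

    #include <cstdio>

    // Pre-fix form: the macro argument is spliced in without its own parentheses.
    #define RAX_PADDING_OLD(nodesize) \
      ((sizeof(void*) - ((nodesize + 4) % sizeof(void*))) & (sizeof(void*) - 1))
    // Post-fix form from the diff: the argument is parenthesized before the +4.
    #define RAX_PADDING_NEW(nodesize) \
      ((sizeof(void*) - (((nodesize) + 4) % sizeof(void*))) & (sizeof(void*) - 1))

    int main() {
      unsigned n = 3;
      // "+" binds tighter than "<<", so the old macro expands to n << (1 + 4):
      // the 4-byte header adjustment is swallowed by the shift. On a 64-bit
      // build this prints 0 for the old form and 6 for the fixed one.
      std::printf("old: %zu\n", RAX_PADDING_OLD(n << 1));
      std::printf("new: %zu\n", RAX_PADDING_NEW(n << 1));
      return 0;
    }

With the postfix-expression arguments rax.c actually passes, both forms expand identically, so this appears to be defensive hardening rather than a behavior change.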
@@ -354,7 +353,7 @@ raxNode *raxAddChild(raxNode *n, unsigned char c, raxNode **childptr, raxNode **
* we don't need to do anything if there was already some padding to use. In
* that case the final destination of the pointers will be the same, however
* in our example there was no pre-existing padding, so we added one byte
- * plus there bytes of padding. After the next memmove() things will look
+ * plus three bytes of padding. After the next memmove() things will look
* like that:
*
* [HDR*][abde][....][Aptr][Bptr][....][Dptr][Eptr]|AUXP|

Changed file 2 of 2 (the stream family command implementation):
@@ -93,7 +93,6 @@ struct GroupInfo {
size_t consumer_size;
size_t pending_size;
streamID last_id;
- size_t pel_count;
int64_t entries_read;
int64_t lag;
vector<NACKInfo> stream_nack_vec;
@@ -716,6 +715,7 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeO
* if we find that there is already an entry for this ID. */
streamNACK* nack = StreamCreateNACK(opts.consumer);
int group_inserted = raxTryInsert(opts.group->pel, buf, sizeof(buf), nack, nullptr);
+ int consumer_inserted = raxTryInsert(opts.consumer->pel, buf, sizeof(buf), nack, nullptr);
/* Now we can check if the entry was already busy, and
@@ -726,6 +726,8 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeO
nack = static_cast<streamNACK*>(raxFind(opts.group->pel, buf, sizeof(buf)));
DCHECK(nack != raxNotFound);
raxRemove(nack->consumer->pel, buf, sizeof(buf), NULL);
+ LOG_IF(DFATAL, nack->consumer->pel->numnodes == 0) << "Invalid rax state";
/* Update the consumer and NACK metadata. */
nack->consumer = opts.consumer;
nack->delivery_time = GetCurrentTimeMs();
@@ -733,6 +735,7 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeO
/* Add the entry in the new consumer local PEL. */
raxInsert(opts.consumer->pel, buf, sizeof(buf), nack, NULL);
} else if (group_inserted == 1 && consumer_inserted == 0) {
+ LOG(DFATAL) << "Internal error";
return OpStatus::SKIPPED; // ("NACK half-created. Should not be possible.");
}
}
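The checks added around these PEL updates guard a structural invariant: a pending entry's 16-byte ID key lives in the group PEL and in exactly one consumer PEL, both mapping to the same streamNACK, and a rax that still backs live entries can never report numnodes == 0 (even an empty rax keeps its root node, so zero means corruption or a freed structure). Below is a minimal sketch of a fuller integrity walk over the same structures, assuming the vendored Redis rax iterator API and stream structs; the include paths and the CheckConsumerPel helper are assumptions for illustration, not part of the commit:

    extern "C" {
    #include "rax.h"     // vendored Redis radix tree (include path is an assumption)
    #include "stream.h"  // streamCG / streamConsumer / streamNACK (path is an assumption)
    }

    // Hypothetical helper, not part of the commit: walk the group PEL and verify
    // that every NACK owned by `consumer` is also keyed in consumer->pel, and that
    // the consumer's rax is structurally sane whenever it owns at least one entry.
    static bool CheckConsumerPel(streamCG* cg, streamConsumer* consumer) {
      size_t owned = 0;
      raxIterator it;
      raxStart(&it, cg->pel);
      raxSeek(&it, "^", nullptr, 0);  // position on the smallest pending ID
      while (raxNext(&it)) {
        auto* nack = static_cast<streamNACK*>(it.data);
        if (nack->consumer != consumer) continue;
        ++owned;
        // The same 16-byte big-endian ID key must map to the same NACK here.
        if (raxFind(consumer->pel, it.key, it.key_len) != nack) {
          raxStop(&it);
          return false;
        }
      }
      raxStop(&it);
      // An empty rax still keeps its root node, so numnodes == 0 can only mean
      // corruption (or a freed structure), exactly what the new DFATAL checks trap.
      return owned == 0 || consumer->pel->numnodes > 0;
    }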
@@ -977,6 +980,8 @@ void GetConsumers(stream* s, streamCG* cg, long long count, GroupInfo* ginfo) {
ConsumerInfo consumer_info;
streamConsumer* consumer = static_cast<streamConsumer*>(ri_consumers.data);
+ LOG_IF(DFATAL, consumer->pel->numnodes == 0) << "Invalid rax state";
consumer_info.name = consumer->name;
consumer_info.seen_time = consumer->seen_time;
consumer_info.pel_count = raxSize(consumer->pel);
@@ -1047,7 +1052,6 @@ OpResult<StreamInfo> OpStreams(const DbContext& db_cntx, string_view key, Engine
ginfo.pending_size = raxSize(cg->pel);
ginfo.entries_read = cg->entries_read;
ginfo.lag = streamCGLag(s, cg);
- ginfo.pel_count = raxSize(cg->pel);
GetGroupPEL(s, cg, count, &ginfo);
GetConsumers(s, cg, count, &ginfo);
@@ -1263,6 +1267,7 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
/* Release the NACK */
raxRemove(cgr_res->cg->pel, buf.begin(), sizeof(buf), nullptr);
raxRemove(nack->consumer->pel, buf.begin(), sizeof(buf), nullptr);
+ LOG_IF(DFATAL, nack->consumer->pel->numnodes == 0) << "Invalid rax state";
streamFreeNACK(nack);
}
continue;
@@ -1303,6 +1308,7 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
* NACK above because of the FORCE option. */
if (nack->consumer) {
raxRemove(nack->consumer->pel, buf.begin(), sizeof(buf), nullptr);
+ LOG_IF(DFATAL, nack->consumer->pel->numnodes == 0) << "Invalid rax state";
}
}
// Set the delivery time for the entry.
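Both raxRemove calls above use the same fixed-size key, buf with sizeof(buf): the stream ID packed into 16 big-endian bytes, so rax's lexicographic key order matches numeric ID order across the group and consumer PELs. Redis provides streamEncodeID for this; the sketch below re-derives the encoding from scratch purely for illustration (StreamId and EncodeStreamIdKey are made-up names):

    #include <array>
    #include <cstdint>
    #include <cstdio>

    struct StreamId {
      uint64_t ms;   // milliseconds part of the stream ID
      uint64_t seq;  // sequence part of the stream ID
    };

    // Pack a stream ID into a 16-byte big-endian key. Big-endian byte order makes
    // rax's lexicographic key order coincide with numeric ID order, which is what
    // lets the PELs be range-scanned by ID.
    std::array<unsigned char, 16> EncodeStreamIdKey(const StreamId& id) {
      std::array<unsigned char, 16> key{};
      for (int i = 0; i < 8; ++i) {
        key[i] = static_cast<unsigned char>(id.ms >> (56 - 8 * i));
        key[8 + i] = static_cast<unsigned char>(id.seq >> (56 - 8 * i));
      }
      return key;
    }

    int main() {
      auto key = EncodeStreamIdKey({1526919030474ULL, 55ULL});
      for (unsigned char b : key) std::printf("%02x", b);
      std::printf("\n");
      return 0;
    }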
@@ -2517,7 +2523,7 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) {
}
rb->SendBulkString("pel-count");
- rb->SendLong(ginfo.pel_count);
+ rb->SendLong(ginfo.pending_size);
rb->SendBulkString("pending");
rb->StartArray(ginfo.stream_nack_vec.size());
@@ -2966,7 +2972,14 @@ void XReadBlock(ReadOpts* opts, ConnectionContext* cntx) {
range_opts.group = sitem.group;
range_opts.consumer = sitem.consumer;
range_opts.noack = opts->noack;
+ if (sitem.consumer) {
+   if (sitem.consumer->pel->numnodes == 0) {
+     LOG(DFATAL) << "Internal error when accessing consumer data, seen_time "
+         << sitem.consumer->seen_time;
+     result = OpStatus::CANCELLED;
+     return OpStatus::OK;
+   }
+ }
result = OpRange(t->GetOpArgs(shard), *wake_key, range_opts);
key = *wake_key;
}
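The new XReadBlock guard reports the broken state on two channels: LOG(DFATAL) for operators, and a CANCELLED result for the caller, while the shard callback itself still returns OpStatus::OK, presumably so the transaction completes instead of wedging. With glog, DFATAL aborts only in debug builds and degrades to an ERROR record under NDEBUG, which is why the graceful status path matters in production. Below is a minimal standalone sketch of that pattern outside Dragonfly; the Status enum and ProcessConsumer function are made up for illustration:

    #include <glog/logging.h>

    enum class Status { OK, CANCELLED };

    // Defensive-check pattern used by the diff: in debug builds LOG(DFATAL)
    // aborts immediately, surfacing the corruption in tests; in release builds
    // it only emits an ERROR record, so the caller still gets a safe status.
    Status ProcessConsumer(const void* consumer_pel_root) {
      if (consumer_pel_root == nullptr) {  // stands in for pel->numnodes == 0
        LOG(DFATAL) << "Internal error: consumer PEL rax is in an invalid state";
        return Status::CANCELLED;          // graceful failure in release mode
      }
      return Status::OK;
    }

    int main(int argc, char** argv) {
      google::InitGoogleLogging(argv[0]);
      // Passing a null root simulates the corrupted-rax condition being guarded.
      Status s = ProcessConsumer(nullptr);
      LOG(INFO) << "status = " << (s == Status::CANCELLED ? "CANCELLED" : "OK");
      return 0;
    }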