1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix: properly seriailize meta buffer in SendStringArrInternal (#3455)

Fixes #3449 that was introduced by #3425

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-08-06 10:43:05 +03:00 committed by GitHub
parent e482eefcbb
commit 420046aac8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 39 additions and 20 deletions

View file

@ -593,27 +593,38 @@ void RedisReplyBuilder::SendStringArrInternal(
serialize_len(type_char[0], header_len); serialize_len(type_char[0], header_len);
unsigned vec_indx = 0; unsigned vec_indx = 0;
string_view src; string_view src;
#define FLUSH_IOVEC() \
do { \
Send(vec.data(), vec_indx); \
if (ec_) \
return; \
vec_indx = 0; \
next = meta.data(); \
} while (false)
for (unsigned i = 0; i < size; ++i) { for (unsigned i = 0; i < size; ++i) {
DCHECK_LT(vec_indx, vec_cap);
src = producer(i); src = producer(i);
serialize_len('$', src.size()); serialize_len('$', src.size());
// copy data either by referencing via an iovec or copying inline into meta buf. // copy data either by referencing via an iovec or copying inline into meta buf.
constexpr size_t kSSOLen = 32; constexpr size_t kSSOLen = 32;
if (src.size() > kSSOLen) { if (src.size() > kSSOLen) {
if (vec_indx + 1 >= vec_cap) {
Send(vec.data(), vec_indx);
if (ec_)
return;
vec_indx = 0;
next = meta.data();
}
// reference metadata blob before referencing another vector. // reference metadata blob before referencing another vector.
DCHECK_GT(next - start, 0); DCHECK_GT(next - start, 0);
vec[vec_indx++] = IoVec(string_view{start, size_t(next - start)}); vec[vec_indx++] = IoVec(string_view{start, size_t(next - start)});
start = next; if (vec_indx >= vec_cap) {
FLUSH_IOVEC();
}
DCHECK_LT(vec_indx, vec.size()); DCHECK_LT(vec_indx, vec.size());
vec[vec_indx++] = IoVec(src); vec[vec_indx++] = IoVec(src);
if (vec_indx >= vec_cap) {
FLUSH_IOVEC();
}
start = next;
} else if (src.size() > 0) { } else if (src.size() > 0) {
// NOTE!: this is not just optimization. producer may returns a string_piece that will // NOTE!: this is not just optimization. producer may returns a string_piece that will
// be overriden for the next call, so we must do this for correctness. // be overriden for the next call, so we must do this for correctness.
@ -621,19 +632,15 @@ void RedisReplyBuilder::SendStringArrInternal(
next += src.size(); next += src.size();
} }
constexpr ptrdiff_t kMargin = kSSOLen + 3 /*$\r\n*/ + 2 /*length*/ + 2 /* \r\n*/; // metadata // how much buffer we need to perform the next iteration.
constexpr ptrdiff_t kMargin = kSSOLen + 3 /* $\r\n */ + 2 /*length*/ + 2 /* \r\n */;
// Keep at least kMargin bytes for a small string as well as its length. // Keep at least kMargin bytes for a small string as well as its length.
if (vec_indx >= vec.size() || ((meta.end() - next) <= kMargin)) { if (kMargin >= meta.end() - next) {
// Flush the iovec array. // Flush the iovec array.
DVLOG(2) << "i=" << i << "meta size=" << next - meta.data(); vec[vec_indx++] = IoVec(string_view{start, size_t(next - start)});
Send(vec.data(), vec_indx); FLUSH_IOVEC();
if (ec_) start = next;
return;
vec_indx = 0;
start = meta.data();
next = start;
} }
*next++ = '\r'; *next++ = '\r';
*next++ = '\n'; *next++ = '\n';

View file

@ -85,7 +85,7 @@ class RedisReplyBuilderTest : public testing::Test {
} }
bool IsError() const { bool IsError() const {
return args.size() == 1 && args[0].type == RespExpr::ERROR; return result != RedisParser::OK || (args.size() == 1 && args[0].type == RespExpr::ERROR);
} }
bool IsOk() const { bool IsOk() const {
@ -951,6 +951,18 @@ TEST_F(RedisReplyBuilderTest, VerbatimString) {
ASSERT_EQ(TakePayload(), "$16\r\nA simple string!\r\n") << "Resp3 VerbatimString TXT failed."; ASSERT_EQ(TakePayload(), "$16\r\nA simple string!\r\n") << "Resp3 VerbatimString TXT failed.";
} }
TEST_F(RedisReplyBuilderTest, Issue3449) {
vector<string> records;
for (unsigned i = 0; i < 10'000; ++i) {
records.push_back(absl::StrCat(i));
}
builder_->SendStringArr(records);
ASSERT_TRUE(NoErrors());
ParsingResults parse_result = Parse();
ASSERT_FALSE(parse_result.IsError());
EXPECT_EQ(10000, parse_result.args.size());
}
static void BM_FormatDouble(benchmark::State& state) { static void BM_FormatDouble(benchmark::State& state) {
vector<double> values; vector<double> values;
char buf[64]; char buf[64];