mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
chore: Introduce small buffer in redis parser
This is needed to eliminate cases where the parser returns INPUT_PENDING without consuming the whole input string, leaving a few trailing bytes unconsumed. It simplifies buffer management for the caller: if the string passed in does not yield a complete parsed request, the whole string is still consumed and can be discarded. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
b37287bf14
commit
46c2e1ff66
3 changed files with 125 additions and 39 deletions
|
@ -3,6 +3,7 @@
|
|||
//
|
||||
#include "facade/redis_parser.h"
|
||||
|
||||
#include <absl/strings/escaping.h>
|
||||
#include <absl/strings/numbers.h>
|
||||
|
||||
#include "base/logging.h"
|
||||
|
@ -18,12 +19,20 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
|
|||
*consumed = 0;
|
||||
res->clear();
|
||||
|
||||
if (str.size() < 2) {
|
||||
DVLOG(2) << "Parsing: "
|
||||
<< absl::CHexEscape(string_view{reinterpret_cast<const char*>(str.data()), str.size()});
|
||||
|
||||
if (str.size() == 1 && small_len_ == 0) {
|
||||
*consumed = 1;
|
||||
if (str[0] != '\n') {
|
||||
small_buf_[0] = str[0];
|
||||
small_len_ = 1;
|
||||
}
|
||||
return INPUT_PENDING;
|
||||
}
|
||||
|
||||
if (state_ == CMD_COMPLETE_S) {
|
||||
InitStart(str[0], res);
|
||||
InitStart(small_len_ > 0 ? small_buf_[0] : str[0], res);
|
||||
} else {
|
||||
// We continue parsing in the middle.
|
||||
if (!cached_expr_)
|
||||
|
@ -40,11 +49,8 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
|
|||
resultc = ConsumeArrayLen(str);
|
||||
break;
|
||||
case PARSE_ARG_S:
|
||||
if (str.size() == 0 || (str.size() < 4 && str[0] != '_')) {
|
||||
resultc.first = INPUT_PENDING;
|
||||
} else {
|
||||
resultc = ParseArg(str);
|
||||
}
|
||||
DCHECK(!str.empty());
|
||||
resultc = ParseArg(str);
|
||||
break;
|
||||
case INLINE_S:
|
||||
DCHECK(parse_stack_.empty());
|
||||
|
@ -67,6 +73,7 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
|
|||
}
|
||||
|
||||
if (resultc.first == INPUT_PENDING) {
|
||||
DCHECK(str.empty());
|
||||
StashState(res);
|
||||
}
|
||||
return resultc.first;
|
||||
|
@ -74,6 +81,8 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
|
|||
|
||||
if (resultc.first == OK) {
|
||||
DCHECK(cached_expr_);
|
||||
DCHECK_EQ(0, small_len_);
|
||||
|
||||
if (res != cached_expr_) {
|
||||
DCHECK(!stash_.empty());
|
||||
|
||||
|
@ -182,9 +191,16 @@ auto RedisParser::ParseInline(Buffer str) -> ResultConsumed {
|
|||
++ptr;
|
||||
}
|
||||
// We do not test for \r in order to accept 'nc' input.
|
||||
if (is_finish())
|
||||
if (ptr == end)
|
||||
break;
|
||||
|
||||
if (*ptr == '\n') {
|
||||
if (cached_expr_->empty()) {
|
||||
++ptr;
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
DCHECK(!is_broken_token_);
|
||||
|
||||
token_start = ptr;
|
||||
|
@ -195,12 +211,8 @@ auto RedisParser::ParseInline(Buffer str) -> ResultConsumed {
|
|||
}
|
||||
|
||||
uint32_t last_consumed = ptr - str.data();
|
||||
if (ptr == end) { // we have not finished parsing.
|
||||
if (ptr[-1] > 32) {
|
||||
// we stopped in the middle of the token.
|
||||
is_broken_token_ = true;
|
||||
}
|
||||
|
||||
if (ptr == end) { // we have not finished parsing.
|
||||
is_broken_token_ = ptr[-1] > 32; // we stopped in the middle of the token.
|
||||
return {INPUT_PENDING, last_consumed};
|
||||
}
|
||||
|
||||
|
@ -214,17 +226,27 @@ auto RedisParser::ParseInline(Buffer str) -> ResultConsumed {
|
|||
auto RedisParser::ParseLen(Buffer str, int64_t* res) -> ResultConsumed {
|
||||
DCHECK(!str.empty());
|
||||
|
||||
DCHECK(str[0] == '$' || str[0] == '*' || str[0] == '%' || str[0] == '~');
|
||||
DCHECK(small_len_ > 0 || str[0] == '$' || str[0] == '*' || str[0] == '%' || str[0] == '~');
|
||||
|
||||
const char* s = reinterpret_cast<const char*>(str.data());
|
||||
const char* pos = reinterpret_cast<const char*>(memchr(s, '\n', str.size()));
|
||||
if (!pos) {
|
||||
Result r = INPUT_PENDING;
|
||||
if (str.size() >= 32) {
|
||||
LOG(WARNING) << "Unexpected format " << string_view{s, str.size()};
|
||||
r = BAD_ARRAYLEN;
|
||||
if (str.size() + small_len_ < sizeof(small_buf_)) {
|
||||
memcpy(small_buf_ + small_len_, str.data(), str.size());
|
||||
small_len_ += str.size();
|
||||
return {INPUT_PENDING, str.size()};
|
||||
}
|
||||
return {r, 0};
|
||||
LOG(WARNING) << "Unexpected format " << string_view{s, str.size()};
|
||||
return ResultConsumed{BAD_ARRAYLEN, 0};
|
||||
}
|
||||
|
||||
unsigned consumed = pos - s + 1;
|
||||
if (small_len_ > 0) {
|
||||
memcpy(small_buf_ + small_len_, str.data(), consumed);
|
||||
small_len_ += consumed;
|
||||
s = small_buf_;
|
||||
pos = small_buf_ + small_len_ - 1;
|
||||
small_len_ = 0;
|
||||
}
|
||||
|
||||
if (pos[-1] != '\r') {
|
||||
|
@ -232,10 +254,9 @@ auto RedisParser::ParseLen(Buffer str, int64_t* res) -> ResultConsumed {
|
|||
}
|
||||
|
||||
// Skip the first character and 2 last ones (\r\n).
|
||||
string_view len_token{s + 1, size_t(pos - 1 - s)};
|
||||
string_view len_token{s + 1, size_t(pos - 2 - s)};
|
||||
bool success = absl::SimpleAtoi(len_token, res);
|
||||
|
||||
unsigned consumed = pos - s + 1;
|
||||
if (success && *res >= -1) {
|
||||
return ResultConsumed{OK, consumed};
|
||||
}
|
||||
|
@ -306,11 +327,13 @@ auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed {
|
|||
auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
|
||||
DCHECK(!str.empty());
|
||||
|
||||
char c = str[0];
|
||||
char c = small_len_ > 0 ? small_buf_[0] : str[0];
|
||||
unsigned min_len = 3 + int(c != '_');
|
||||
|
||||
if (str.size() < min_len) {
|
||||
return {INPUT_PENDING, 0};
|
||||
if (small_len_ + str.size() < min_len) {
|
||||
memcpy(small_buf_ + small_len_, str.data(), str.size());
|
||||
small_len_ += str.size();
|
||||
return {INPUT_PENDING, str.size()};
|
||||
}
|
||||
|
||||
if (c == '$') {
|
||||
|
@ -344,10 +367,14 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
|
|||
|
||||
if (c == '_') { // Resp3 NIL
|
||||
// '_','\r','\n'
|
||||
DCHECK_GE(str.size(), 3u);
|
||||
DCHECK_GE(small_len_ + str.size(), 3u);
|
||||
DCHECK_LT(small_len_, 3);
|
||||
|
||||
unsigned consumed = 3;
|
||||
if (str[1] != '\r' || str[2] != '\n') {
|
||||
unsigned consumed = 3 - small_len_;
|
||||
for (unsigned i = 0; i < consumed; ++i) {
|
||||
small_buf_[small_len_ + i] = str[i];
|
||||
}
|
||||
if (small_buf_[1] != '\r' || small_buf_[2] != '\n') {
|
||||
return {BAD_STRING, 0};
|
||||
}
|
||||
|
||||
|
@ -418,6 +445,26 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {
|
|||
|
||||
uint32_t consumed = 0;
|
||||
|
||||
if (small_len_ > 0) {
|
||||
DCHECK(!is_broken_token_);
|
||||
DCHECK_EQ(bulk_len_, 0u);
|
||||
|
||||
if (bulk_len_ == 0) {
|
||||
DCHECK_EQ(small_len_, 1);
|
||||
DCHECK_GE(str.size(), 1u);
|
||||
if (small_buf_[0] != '\r' || str[0] != '\n') {
|
||||
return {BAD_STRING, 0};
|
||||
}
|
||||
consumed = bulk_len_ + 2;
|
||||
small_len_ = 0;
|
||||
HandleFinishArg();
|
||||
|
||||
return {OK, 1};
|
||||
}
|
||||
}
|
||||
|
||||
DCHECK_EQ(small_len_, 0);
|
||||
|
||||
if (str.size() >= bulk_len_) {
|
||||
consumed = bulk_len_;
|
||||
if (bulk_len_) {
|
||||
|
@ -439,6 +486,10 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {
|
|||
}
|
||||
HandleFinishArg();
|
||||
return {OK, consumed + 2};
|
||||
} else if (str.size() == 1) {
|
||||
small_buf_[0] = str[0];
|
||||
consumed++;
|
||||
small_len_ = 1;
|
||||
}
|
||||
return {INPUT_PENDING, consumed};
|
||||
}
|
||||
|
@ -483,6 +534,7 @@ void RedisParser::HandleFinishArg() {
|
|||
}
|
||||
cached_expr_ = parse_stack_.back().second;
|
||||
}
|
||||
small_len_ = 0;
|
||||
}
|
||||
|
||||
void RedisParser::ExtendLastString(Buffer str) {
|
||||
|
|
|
@ -45,8 +45,6 @@ class RedisParser {
|
|||
* part of str because parser caches the intermediate state internally according to 'consumed'
|
||||
* result.
|
||||
*
|
||||
* Note: A parser does not always guarantee progress, i.e. if a small buffer was passed it may
|
||||
* returns INPUT_PENDING with consumed == 0.
|
||||
*
|
||||
*/
|
||||
|
||||
|
@ -97,6 +95,7 @@ class RedisParser {
|
|||
State state_ = CMD_COMPLETE_S;
|
||||
bool is_broken_token_ = false; // true, if a token (inline or bulk) is broken during the parsing.
|
||||
bool server_mode_ = true;
|
||||
uint8_t small_len_ = 0;
|
||||
|
||||
uint32_t bulk_len_ = 0;
|
||||
uint32_t last_stashed_level_ = 0, last_stashed_index_ = 0;
|
||||
|
@ -112,6 +111,7 @@ class RedisParser {
|
|||
|
||||
using Blob = std::vector<uint8_t>;
|
||||
std::vector<Blob> buf_stash_;
|
||||
char small_buf_[32];
|
||||
};
|
||||
|
||||
} // namespace facade
|
||||
|
|
|
@ -107,10 +107,10 @@ TEST_F(RedisParserTest, Multi1) {
|
|||
|
||||
TEST_F(RedisParserTest, Multi2) {
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r\n$"));
|
||||
EXPECT_EQ(4, consumed_);
|
||||
EXPECT_EQ(5, consumed_);
|
||||
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("$4\r\nMSET"));
|
||||
EXPECT_EQ(8, consumed_);
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("4\r\nMSET"));
|
||||
EXPECT_EQ(7, consumed_);
|
||||
|
||||
ASSERT_EQ(RedisParser::OK, Parse("\r\n*2\r\n"));
|
||||
EXPECT_EQ(2, consumed_);
|
||||
|
@ -146,6 +146,15 @@ TEST_F(RedisParserTest, ClientMode) {
|
|||
|
||||
ASSERT_EQ(RedisParser::OK, Parse("-ERR foo bar\r\n"));
|
||||
EXPECT_THAT(args_, ElementsAre(ErrArg("ERR foo")));
|
||||
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("_"));
|
||||
EXPECT_EQ(1, consumed_);
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\r"));
|
||||
EXPECT_EQ(1, consumed_);
|
||||
ASSERT_EQ(RedisParser::OK, Parse("\n"));
|
||||
EXPECT_EQ(1, consumed_);
|
||||
EXPECT_THAT(args_, ElementsAre(ArgType(RespExpr::NIL)));
|
||||
ASSERT_EQ(RedisParser::OK, Parse("*2\r\n_\r\n_\r\n"));
|
||||
}
|
||||
|
||||
TEST_F(RedisParserTest, Hierarchy) {
|
||||
|
@ -171,25 +180,25 @@ TEST_F(RedisParserTest, Empty) {
|
|||
|
||||
TEST_F(RedisParserTest, LargeBulk) {
|
||||
string_view prefix("*1\r\n$1024\r\n");
|
||||
string half(512, 'a');
|
||||
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(prefix));
|
||||
ASSERT_EQ(prefix.size(), consumed_);
|
||||
ASSERT_GE(parser_.parselen_hint(), 1024);
|
||||
|
||||
string half(512, 'a');
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(half));
|
||||
ASSERT_EQ(512, consumed_);
|
||||
ASSERT_GE(parser_.parselen_hint(), 512);
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(half));
|
||||
ASSERT_EQ(512, consumed_);
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\r"));
|
||||
ASSERT_EQ(0, consumed_);
|
||||
ASSERT_EQ(RedisParser::OK, Parse("\r\n"));
|
||||
ASSERT_EQ(2, consumed_);
|
||||
ASSERT_EQ(1, consumed_);
|
||||
ASSERT_EQ(RedisParser::OK, Parse("\n"));
|
||||
EXPECT_EQ(1, consumed_);
|
||||
|
||||
string part1 = absl::StrCat(prefix, half);
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(part1));
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(half));
|
||||
EXPECT_EQ(RedisParser::INPUT_PENDING, Parse(part1));
|
||||
EXPECT_EQ(RedisParser::INPUT_PENDING, Parse(half));
|
||||
ASSERT_EQ(RedisParser::OK, Parse("\r\n"));
|
||||
|
||||
prefix = "*1\r\n$270000000\r\n";
|
||||
|
@ -243,4 +252,29 @@ TEST_F(RedisParserTest, UsedMemory) {
|
|||
EXPECT_GT(dfly::HeapSize(stash), 30000);
|
||||
}
|
||||
|
||||
// Feeds an array-length line whose "\r\n" terminator is split across two Parse(...) calls and
// expects every byte of each chunk to be reported as consumed while the result is INPUT_PENDING.
TEST_F(RedisParserTest, Eol) {
|
||||
// First chunk stops right after '\r'; all 3 bytes are expected to be consumed.
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r"));
|
||||
EXPECT_EQ(3, consumed_);
|
||||
// Second chunk supplies the missing '\n' followed by a complete "$5\r\n" bulk-length line.
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\n$5\r\n"));
|
||||
EXPECT_EQ(5, consumed_);
|
||||
}
|
||||
|
||||
// Splits the "\r\n" that terminates a bulk string between two Parse(...) calls: the first chunk
// ends with '\r' and must stay pending, the lone '\n' that follows must complete the request.
TEST_F(RedisParserTest, BulkSplit) {
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r\n$4\r\nSADD\r"));
|
||||
ASSERT_EQ(RedisParser::OK, Parse("\n"));
|
||||
}
|
||||
|
||||
// Parses an inline command ("PING\n") that arrives split across two Parse(...) calls.
TEST_F(RedisParserTest, InlineSplit) {
|
||||
// The leading/trailing bare-newline cases below are compiled out by the #if 0 guard.
#if 0
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\n"));
|
||||
EXPECT_EQ(1, consumed_);
|
||||
ASSERT_EQ(RedisParser::OK, Parse("\nPING\n\n"));
|
||||
EXPECT_EQ(6, consumed_);
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\n"));
|
||||
EXPECT_EQ(1, consumed_);
|
||||
#endif
|
||||
// Active case: a single-byte first chunk is pending; the remainder finishes the command.
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("P"));
|
||||
ASSERT_EQ(RedisParser::OK, Parse("ING\n"));
|
||||
}
|
||||
|
||||
} // namespace facade
|
||||
|
|
Loading…
Reference in a new issue