mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
chore: more parser improvements (#4177)
The long-term goal is to make the parser to consume the whole input when it returns INPUT_PENDING. It requires several baby step PRs. This PR: 1. Adds more invariant checks 2. Avoids calling RedisParser::Parse with an empty buffer. 3. In bulk string parsing - remove redundant "optimization" of rejecting partial strings of less than 32 bytes, in other words consume small parts as well. The unit test adjusted accordingly. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
ff2359af30
commit
872d5e2d7d
3 changed files with 43 additions and 27 deletions
|
@ -1122,7 +1122,7 @@ Connection::ParserStatus Connection::ParseRedis() {
|
|||
DispatchSingle(has_more, dispatch_sync, dispatch_async);
|
||||
}
|
||||
io_buf_.ConsumeInput(consumed);
|
||||
} while (RedisParser::OK == result && !reply_builder_->GetError());
|
||||
} while (RedisParser::OK == result && io_buf_.InputLen() > 0 && !reply_builder_->GetError());
|
||||
|
||||
parser_error_ = result;
|
||||
if (result == RedisParser::OK)
|
||||
|
|
|
@ -13,6 +13,7 @@ namespace facade {
|
|||
using namespace std;
|
||||
|
||||
auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> Result {
|
||||
DCHECK(!str.empty());
|
||||
*consumed = 0;
|
||||
res->clear();
|
||||
|
||||
|
@ -212,8 +213,10 @@ auto RedisParser::ParseInline(Buffer str) -> ResultConsumed {
|
|||
auto RedisParser::ParseLen(Buffer str, int64_t* res) -> ResultConsumed {
|
||||
DCHECK(!str.empty());
|
||||
|
||||
char* s = reinterpret_cast<char*>(str.data() + 1);
|
||||
char* pos = reinterpret_cast<char*>(memchr(s, '\n', str.size() - 1));
|
||||
DCHECK(str[0] == '$' || str[0] == '*' || str[0] == '%' || str[0] == '~');
|
||||
|
||||
const char* s = reinterpret_cast<const char*>(str.data());
|
||||
const char* pos = reinterpret_cast<const char*>(memchr(s, '\n', str.size()));
|
||||
if (!pos) {
|
||||
Result r = str.size() < 32 ? INPUT_PENDING : BAD_ARRAYLEN;
|
||||
return {r, 0};
|
||||
|
@ -223,8 +226,11 @@ auto RedisParser::ParseLen(Buffer str, int64_t* res) -> ResultConsumed {
|
|||
return {BAD_ARRAYLEN, 0};
|
||||
}
|
||||
|
||||
bool success = absl::SimpleAtoi(std::string_view{s, size_t(pos - s - 1)}, res);
|
||||
return ResultConsumed{success ? OK : BAD_ARRAYLEN, (pos - s) + 2};
|
||||
// Skip the first character and 2 last ones (\r\n).
|
||||
bool success = absl::SimpleAtoi(std::string_view{s + 1, size_t(pos - 1 - s)}, res);
|
||||
unsigned consumed = pos - s + 1;
|
||||
|
||||
return ResultConsumed{success ? OK : BAD_ARRAYLEN, consumed};
|
||||
}
|
||||
|
||||
auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed {
|
||||
|
@ -288,7 +294,13 @@ auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed {
|
|||
|
||||
auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
|
||||
DCHECK(!str.empty());
|
||||
|
||||
char c = str[0];
|
||||
unsigned min_len = 3 + int(c != '_');
|
||||
|
||||
if (str.size() < min_len) {
|
||||
return {INPUT_PENDING, 0};
|
||||
}
|
||||
|
||||
if (c == '$') {
|
||||
int64_t len;
|
||||
|
@ -321,12 +333,18 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
|
|||
}
|
||||
|
||||
if (c == '_') { // Resp3 NIL
|
||||
// TODO: Do we need to validate that str[1:2] == "\r\n"?
|
||||
// '_','\r','\n'
|
||||
DCHECK_GE(str.size(), 3u);
|
||||
|
||||
unsigned consumed = 3;
|
||||
if (str[1] != '\r' || str[2] != '\n') {
|
||||
return {BAD_STRING, 0};
|
||||
}
|
||||
|
||||
cached_expr_->emplace_back(RespExpr::NIL);
|
||||
cached_expr_->back().u = Buffer{};
|
||||
HandleFinishArg();
|
||||
return {OK, 3}; // // '_','\r','\n'
|
||||
return {OK, consumed};
|
||||
}
|
||||
|
||||
if (c == '*') {
|
||||
|
@ -415,26 +433,24 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {
|
|||
return {INPUT_PENDING, consumed};
|
||||
}
|
||||
|
||||
if (str.size() >= 32) {
|
||||
DCHECK(bulk_len_);
|
||||
size_t len = std::min<size_t>(str.size(), bulk_len_);
|
||||
DCHECK(bulk_len_);
|
||||
size_t len = std::min<size_t>(str.size(), bulk_len_);
|
||||
|
||||
if (is_broken_token_) {
|
||||
memcpy(bulk_str.end(), str.data(), len);
|
||||
bulk_str = Buffer{bulk_str.data(), bulk_str.size() + len};
|
||||
DVLOG(1) << "Extending bulk stash to size " << bulk_str.size();
|
||||
} else {
|
||||
DVLOG(1) << "New bulk stash size " << bulk_len_;
|
||||
vector<uint8_t> nb(bulk_len_);
|
||||
memcpy(nb.data(), str.data(), len);
|
||||
bulk_str = Buffer{nb.data(), len};
|
||||
buf_stash_.emplace_back(std::move(nb));
|
||||
is_broken_token_ = true;
|
||||
cached_expr_->back().has_support = true;
|
||||
}
|
||||
consumed = len;
|
||||
bulk_len_ -= len;
|
||||
if (is_broken_token_) {
|
||||
memcpy(bulk_str.end(), str.data(), len);
|
||||
bulk_str = Buffer{bulk_str.data(), bulk_str.size() + len};
|
||||
DVLOG(1) << "Extending bulk stash to size " << bulk_str.size();
|
||||
} else {
|
||||
DVLOG(1) << "New bulk stash size " << bulk_len_;
|
||||
vector<uint8_t> nb(bulk_len_);
|
||||
memcpy(nb.data(), str.data(), len);
|
||||
bulk_str = Buffer{nb.data(), len};
|
||||
buf_stash_.emplace_back(std::move(nb));
|
||||
is_broken_token_ = true;
|
||||
cached_expr_->back().has_support = true;
|
||||
}
|
||||
consumed = len;
|
||||
bulk_len_ -= len;
|
||||
|
||||
return {INPUT_PENDING, consumed};
|
||||
}
|
||||
|
|
|
@ -125,9 +125,9 @@ TEST_F(RedisParserTest, Multi2) {
|
|||
|
||||
TEST_F(RedisParserTest, Multi3) {
|
||||
const char kFirst[] = "*3\r\n$3\r\nSET\r\n$16\r\nkey:";
|
||||
const char kSecond[] = "key:000002273458\r\n$3\r\nVXK";
|
||||
const char kSecond[] = "000002273458\r\n$3\r\nVXK";
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(kFirst));
|
||||
ASSERT_EQ(strlen(kFirst) - 4, consumed_);
|
||||
ASSERT_EQ(strlen(kFirst), consumed_);
|
||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(kSecond));
|
||||
ASSERT_EQ(strlen(kSecond), consumed_);
|
||||
ASSERT_EQ(RedisParser::OK, Parse("\r\n*3\r\n$3\r\nSET"));
|
||||
|
|
Loading…
Reference in a new issue