1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix: remove string copy in SendMGetResponse (#2246)

fix: eliminate the redundant string copy in SendMGetResponse

Also, allow selectively create DflyInstance in pytests that is attached to
an existing dragonfly port, created outside of tests.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-12-03 18:14:19 +02:00 committed by GitHub
parent d45ded3b76
commit 26512fdba4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 131 additions and 40 deletions

View file

@ -35,6 +35,10 @@ constexpr unsigned kConvFlags =
DoubleToStringConverter dfly_conv(kConvFlags, "inf", "nan", 'e', -6, 21, 6, 0);
const char* NullString(bool resp3) {
return resp3 ? "_\r\n" : "$-1\r\n";
}
} // namespace
SinkReplyBuilder::MGetResponse::~MGetResponse() {
@ -303,15 +307,8 @@ void RedisReplyBuilder::SendSetSkipped() {
SendNull();
}
const char* RedisReplyBuilder::NullString() {
if (is_resp3_) {
return "_\r\n";
}
return "$-1\r\n";
}
void RedisReplyBuilder::SendNull() {
iovec v[] = {IoVec(NullString())};
iovec v[] = {IoVec(NullString(is_resp3_))};
Send(v, ABSL_ARRAYSIZE(v));
}
@ -379,18 +376,82 @@ void RedisReplyBuilder::SendDouble(double val) {
}
void RedisReplyBuilder::SendMGetResponse(MGetResponse resp) {
string res = absl::StrCat("*", resp.resp_arr.size(), kCRLF);
for (size_t i = 0; i < resp.resp_arr.size(); ++i) {
const auto& item = resp.resp_arr[i];
if (item) {
StrAppend(&res, "$", item->value.size(), kCRLF);
res.append(item->value).append(kCRLF);
DCHECK(!resp.resp_arr.empty());
size_t size = resp.resp_arr.size();
size_t vec_len = std::min<size_t>(32, size);
constexpr size_t kBatchLen = 32 * 2 + 2; // (blob_size, blob) * 32 + 2 spares
iovec vec_batch[kBatchLen];
// for all the meta data to fill the vec batch. 10 digits for the blob size and 6 for
// $, \r, \n, \r, \n
absl::FixedArray<char, 64> meta((vec_len + 2) * 16); // 2 for header and next item meta data.
char* next = meta.data();
char* cur_meta = next;
*next++ = '*';
next = absl::numbers_internal::FastIntToBuffer(size, next);
*next++ = '\r';
*next++ = '\n';
unsigned vec_indx = 0;
const char* nullstr = NullString(is_resp3_);
size_t nulllen = strlen(nullstr);
auto get_pending_metabuf = [&] { return string_view{cur_meta, size_t(next - cur_meta)}; };
for (unsigned i = 0; i < size; ++i) {
DCHECK_GE(meta.end() - next, 16); // We have at least 16 bytes for the meta data.
if (resp.resp_arr[i]) {
string_view blob = resp.resp_arr[i]->value;
*next++ = '$';
next = absl::numbers_internal::FastIntToBuffer(blob.size(), next);
*next++ = '\r';
*next++ = '\n';
DCHECK_GT(next - cur_meta, 0);
vec_batch[vec_indx++] = IoVec(get_pending_metabuf());
vec_batch[vec_indx++] = IoVec(blob);
cur_meta = next; // we combine the CRLF with the next item meta data.
*next++ = '\r';
*next++ = '\n';
} else {
res.append(NullString());
memcpy(next, nullstr, nulllen);
next += nulllen;
}
if (vec_indx >= (kBatchLen - 2) || (meta.end() - next < 16)) {
// we have space for at least one iovec because in the worst case we reached (kBatchLen - 3)
// and then filled 2 vectors in the previous iteration.
DCHECK_LE(vec_indx, kBatchLen - 1);
// if we do not have enough space in the meta buffer, we add the meta data to the
// vector batch and reset it.
if (meta.end() - next < 16) {
vec_batch[vec_indx++] = IoVec(get_pending_metabuf());
next = meta.data();
cur_meta = next;
}
Send(vec_batch, vec_indx);
if (ec_)
return;
vec_indx = 0;
size_t meta_len = next - cur_meta;
memcpy(meta.data(), cur_meta, meta_len);
cur_meta = meta.data();
next = cur_meta + meta_len;
}
}
SendRaw(res);
if (next - cur_meta > 0) {
vec_batch[vec_indx++] = IoVec(get_pending_metabuf());
}
if (vec_indx > 0)
Send(vec_batch, vec_indx);
}
void RedisReplyBuilder::SendSimpleStrArr(StrSpan arr) {

View file

@ -256,8 +256,6 @@ class RedisReplyBuilder : public SinkReplyBuilder {
private:
void SendStringArrInternal(WrappedStrSpan arr, CollectionType type);
const char* NullString();
bool is_resp3_ = false;
};

View file

@ -101,9 +101,6 @@ class RedisReplyBuilderTest : public testing::Test {
}
};
static void SetUpTestSuite() {
}
void SetUp() {
sink_.Clear();
builder_.reset(new RedisReplyBuilder(&sink_));
@ -207,7 +204,7 @@ RedisReplyBuilderTest::ParsingResults RedisReplyBuilderTest::Parse() {
///////////////////////////////////////////////////////////////////////////////
TEST_F(RedisReplyBuilderTest, TestMessageSend) {
TEST_F(RedisReplyBuilderTest, MessageSend) {
// Test each message that is "sent" to the sink
builder_->SendOk();
ASSERT_EQ(TakePayload(), kOKMessage);
@ -328,7 +325,7 @@ TEST_F(RedisReplyBuilderTest, ErrorNoneBuiltInMessage) {
}
}
TEST_F(RedisReplyBuilderTest, TestStringMessage) {
TEST_F(RedisReplyBuilderTest, StringMessage) {
// This would test a message that contain a string in it
// For string this is simple, any string message should start with + and ends with \r\n
// there can never be more than single \r\n in it as well as no special chars
@ -351,7 +348,7 @@ TEST_F(RedisReplyBuilderTest, TestStringMessage) {
}
}
TEST_F(RedisReplyBuilderTest, TestEmptyArray) {
TEST_F(RedisReplyBuilderTest, EmptyArray) {
// This test would build an array and try sending it over the "wire"
// The array starts with the '*', then the number of elements in the array
// then "\r\n", then each element inside is encoded accordingly
@ -370,7 +367,7 @@ TEST_F(RedisReplyBuilderTest, TestEmptyArray) {
ASSERT_EQ(str(), empty_array);
}
TEST_F(RedisReplyBuilderTest, TestStrArray) {
TEST_F(RedisReplyBuilderTest, StrArray) {
std::vector<std::string_view> string_vector{"hello", "world", "111", "@3#$^&*~"};
builder_->StartArray(string_vector.size());
std::size_t expected_size = kCRLF.size() + 2;
@ -572,7 +569,7 @@ TEST_F(RedisReplyBuilderTest, BulkStringWithErrorString) {
ASSERT_THAT(parsing_output.args, ElementsAre(message));
}
TEST_F(RedisReplyBuilderTest, TestInt) {
TEST_F(RedisReplyBuilderTest, Int) {
// message in the form of ":0\r\n" and ":1000\r\n"
// this message just starts with ':' and ends with \r\n
// and the payload must be successfully parsed into int type
@ -588,7 +585,7 @@ TEST_F(RedisReplyBuilderTest, TestInt) {
ASSERT_THAT(parsing_output.args, ElementsAre(IntArg(kPayloadInt)));
}
TEST_F(RedisReplyBuilderTest, TestDouble) {
TEST_F(RedisReplyBuilderTest, Double) {
// There is no direct support for double types in RESP
// to send this, it is sent as bulk string
const std::string_view kPayloadStr = "23.456";
@ -607,7 +604,7 @@ TEST_F(RedisReplyBuilderTest, TestDouble) {
ASSERT_THAT(parsing_output.args, ElementsAre(kPayloadStr));
}
TEST_F(RedisReplyBuilderTest, TestMixedTypeArray) {
TEST_F(RedisReplyBuilderTest, MixedTypeArray) {
// For arrays, we can send an array that contains more than a single type (string/bulk
// string/simple string/null..) In this test we are verifying that this is actually working. note
// that this is not part of class RedisReplyBuilder API
@ -661,7 +658,7 @@ TEST_F(RedisReplyBuilderTest, TestMixedTypeArray) {
ArgType(RespExpr::STRING), ArgType(RespExpr::STRING), ArgType(RespExpr::STRING)));
}
TEST_F(RedisReplyBuilderTest, TestBatchMode) {
TEST_F(RedisReplyBuilderTest, BatchMode) {
// Test that when the batch mode is enabled, we are getting the same correct results
builder_->SetBatchMode(true);
// Some random values and sizes
@ -702,21 +699,21 @@ TEST_F(RedisReplyBuilderTest, TestBatchMode) {
absl::StrCat(kBulkStringStart, "0"), std::string_view{}));
}
TEST_F(RedisReplyBuilderTest, TestResp3Double) {
TEST_F(RedisReplyBuilderTest, Resp3Double) {
builder_->SetResp3(true);
builder_->SendDouble(5.5);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_EQ(str(), ",5.5\r\n");
}
TEST_F(RedisReplyBuilderTest, TestResp3NullString) {
TEST_F(RedisReplyBuilderTest, Resp3NullString) {
builder_->SetResp3(true);
builder_->SendNull();
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_EQ(TakePayload(), "_\r\n");
}
TEST_F(RedisReplyBuilderTest, TestSendStringArrayAsMap) {
TEST_F(RedisReplyBuilderTest, SendStringArrayAsMap) {
const std::vector<std::string> map_array{"k1", "v1", "k2", "v2"};
builder_->SetResp3(false);
@ -732,7 +729,7 @@ TEST_F(RedisReplyBuilderTest, TestSendStringArrayAsMap) {
<< "SendStringArrayAsMap Resp3 Failed.";
}
TEST_F(RedisReplyBuilderTest, TestSendStringArrayAsSet) {
TEST_F(RedisReplyBuilderTest, SendStringArrayAsSet) {
const std::vector<std::string> set_array{"e1", "e2", "e3"};
builder_->SetResp3(false);
@ -748,7 +745,7 @@ TEST_F(RedisReplyBuilderTest, TestSendStringArrayAsSet) {
<< "SendStringArrayAsSet Resp3 Failed.";
}
TEST_F(RedisReplyBuilderTest, TestSendScoredArray) {
TEST_F(RedisReplyBuilderTest, SendScoredArray) {
const std::vector<std::pair<std::string, double>> scored_array{
{"e1", 1.1}, {"e2", 2.2}, {"e3", 3.3}};
@ -779,7 +776,7 @@ TEST_F(RedisReplyBuilderTest, TestSendScoredArray) {
<< "Resp3 WITHSCORES failed.";
}
TEST_F(RedisReplyBuilderTest, TestSendMGetResponse) {
TEST_F(RedisReplyBuilderTest, SendMGetResponse) {
SinkReplyBuilder::MGetResponse resp = MakeMGetResponse({"v1", nullopt, "v3"});
builder_->SetResp3(false);
@ -796,7 +793,34 @@ TEST_F(RedisReplyBuilderTest, TestSendMGetResponse) {
<< "Resp3 SendMGetResponse failed.";
}
TEST_F(RedisReplyBuilderTest, TestBasicCapture) {
TEST_F(RedisReplyBuilderTest, MGetLarge) {
vector<optional<string>> strs;
for (int i = 0; i < 100; i++) {
strs.emplace_back(string(1000, 'a'));
}
SinkReplyBuilder::MGetResponse resp = MakeMGetResponse(strs);
builder_->SetResp3(false);
builder_->SendMGetResponse(std::move(resp));
string expected = "*100\r\n";
for (unsigned i = 0; i < 100; i++) {
absl::StrAppend(&expected, "$1000\r\n", string(1000, 'a'), "\r\n");
}
ASSERT_EQ(TakePayload(), expected);
strs.clear();
for (int i = 0; i < 200; i++) {
strs.emplace_back(nullopt);
}
resp = MakeMGetResponse(strs);
builder_->SendMGetResponse(std::move(resp));
expected = "*200\r\n";
for (unsigned i = 0; i < 200; i++) {
absl::StrAppend(&expected, "$-1\r\n");
}
ASSERT_EQ(TakePayload(), expected);
}
TEST_F(RedisReplyBuilderTest, BasicCapture) {
using namespace std;
string_view kTestSws[] = {"a1"sv, "a2"sv, "a3"sv, "a4"sv};

View file

@ -104,6 +104,7 @@ def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
factory.stop_all()
# Differs from df_factory in that its scope is function
@pytest.fixture(scope="function")
def df_local_factory(df_factory: DflyInstanceFactory):
factory = DflyInstanceFactory(df_factory.params, df_factory.args)

View file

@ -1,3 +1,4 @@
import dataclasses
import time
import subprocess
import aiohttp
@ -120,6 +121,8 @@ class DflyInstance:
self._wait_for_server()
def _wait_for_server(self):
if self.params.existing_port:
return
# Give Dragonfly time to start and detect possible failure causes
# Gdb starts slowly
delay = START_DELAY if not self.params.gdb else START_GDB_DELAY
@ -312,7 +315,7 @@ class DflyInstanceFactory:
self.params = params
self.instances = []
def create(self, **kwargs) -> DflyInstance:
def create(self, existing_port=None, **kwargs) -> DflyInstance:
args = {**self.args, **kwargs}
args.setdefault("dbfilename", "")
args.setdefault("use_zset_tree", None)
@ -322,11 +325,15 @@ class DflyInstanceFactory:
for k, v in args.items():
args[k] = v.format(**self.params.env) if isinstance(v, str) else v
instance = DflyInstance(self.params, args)
if existing_port is not None:
params = dataclasses.replace(self.params, existing_port=existing_port)
else:
params = self.params
instance = DflyInstance(params, args)
self.instances.append(instance)
return instance
def start_all(self, instances):
def start_all(self, instances: List[DflyInstance]):
"""Start multiple instances in parallel"""
for instance in instances:
instance._start()

View file

@ -700,7 +700,7 @@ Test automatic replication of expiry.
@dfly_args({"proactor_threads": 4})
@pytest.mark.asyncio
async def test_expiry(df_local_factory, n_keys=1000):
async def test_expiry(df_local_factory: DflyInstanceFactory, n_keys=1000):
master = df_local_factory.create()
replica = df_local_factory.create()