
fix: correctly set batching mode during pubsub

Previously, we enabled batch mode whenever the dispatch queue was not empty,
but the queue may also hold async messages unrelated to pipelining, such as
pubsub or monitor messages. Now we enable batching only if more pipeline
commands are queued.

In addition, fix the unbounded growth of the batching buffer.

Fixes #935.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
commit 66b4fbd14e (parent 97e38aebd2)
Author: Roman Gershman <roman@dragonflydb.io>
Date:   2023-03-13 21:25:18 +02:00

5 changed files with 38 additions and 10 deletions
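The heart of the fix is a per-connection counter of queued pipeline messages. The dispatch queue also carries async pubsub and monitor messages, so `dispatch_q_.empty()` is the wrong signal for reply batching: a queue holding only pubsub pushes would keep batch mode on and leave the last pipelined reply sitting unflushed in the buffer. Below is a minimal, self-contained sketch of the counting scheme; the types and the `Dispatcher` class are hypothetical stand-ins for Dragonfly's `Connection` machinery, not its actual code.

```cpp
#include <cstdint>
#include <deque>
#include <variant>

// Hypothetical message types: the queue mixes pipelined commands with
// async pubsub/monitor messages, so queue emptiness says nothing about
// whether more *pipeline* replies are coming.
struct PipelineMsg { /* parsed command */ };
struct PubSubMsg { /* pushed message */ };
using Msg = std::variant<PipelineMsg, PubSubMsg>;

class Dispatcher {
 public:
  void Enqueue(Msg msg) {
    if (std::holds_alternative<PipelineMsg>(msg))
      ++pipeline_msg_cnt_;  // count only pipeline commands
    queue_.push_back(std::move(msg));
  }

  // Assumes !queue_.empty().
  void DispatchOne() {
    Msg msg = std::move(queue_.front());
    queue_.pop_front();
    if (std::holds_alternative<PipelineMsg>(msg)) {
      --pipeline_msg_cnt_;
      // Batch replies only if more pipeline commands are still queued.
      // A non-empty queue holding only pubsub messages must not batch,
      // or this command's reply would sit unflushed indefinitely.
      SetBatchMode(pipeline_msg_cnt_ > 0);
    }
    // ... dispatch msg ...
  }

 private:
  void SetBatchMode(bool) { /* toggle reply batching */ }

  std::deque<Msg> queue_;
  uint32_t pipeline_msg_cnt_ = 0;
};
```

Only `PipelineMsg` entries touch the counter, so batching switches off the moment the last pipelined command is dispatched, no matter how many async messages remain queued.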

--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc

@@ -300,8 +300,12 @@ void Connection::DispatchOperations::operator()(const PubMsgRecord& msg) {
 
 void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
   ++stats->pipelined_cmd_cnt;
-  bool empty = self->dispatch_q_.empty();
-  builder->SetBatchMode(!empty);
+
+  self->pipeline_msg_cnt_--;
+  bool do_batch = (self->pipeline_msg_cnt_ > 0);
+  DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front()) << " " << do_batch;
+
+  builder->SetBatchMode(do_batch);
   self->cc_->async_dispatch = true;
   self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
   self->last_interaction_ = time(nullptr);

@@ -650,13 +654,16 @@ auto Connection::ParseRedis() -> ParserStatus {
       }
     }
 
     RespToArgList(parse_args_, &cmd_vec_);
+
+    DVLOG(2) << "Sync dispatch " << ToSV(cmd_vec_.front());
     CmdArgList cmd_list{cmd_vec_.data(), cmd_vec_.size()};
     service_->DispatchCommand(cmd_list, cc_.get());
     last_interaction_ = time(nullptr);
   } else {
     // Dispatch via queue to speedup input reading.
     RequestPtr req = FromArgs(std::move(parse_args_), tlh);
+    ++pipeline_msg_cnt_;
     dispatch_q_.push_back(std::move(req));
     if (dispatch_q_.size() == 1) {
       evc_.notify();

--- a/src/facade/dragonfly_connection.h
+++ b/src/facade/dragonfly_connection.h

@@ -147,6 +147,8 @@ class Connection : public util::Connection {
 
   RequestPtr FromArgs(RespVec args, mi_heap_t* heap);
 
   std::deque<RequestPtr> dispatch_q_;  // coordinated via evc_.
+  uint32_t pipeline_msg_cnt_ = 0;
+
   static thread_local std::vector<RequestPtr> free_req_pool_;
   util::fibers_ext::EventCount evc_;

--- a/src/facade/reply_builder.cc
+++ b/src/facade/reply_builder.cc

@@ -47,13 +47,19 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
   DCHECK(sink_);
 
   if (should_batch_) {
     // TODO: to introduce flushing when too much data is batched.
+    size_t total_size = batch_.size();
     for (unsigned i = 0; i < len; ++i) {
-      std::string_view src((char*)v[i].iov_base, v[i].iov_len);
-      DVLOG(2) << "Appending to stream " << sink_ << " " << src;
-      batch_.append(src.data(), src.size());
+      total_size += v[i].iov_len;
     }
-    return;
+    if (total_size < 8192) {  // Allow batching with up to 8K of data.
+      for (unsigned i = 0; i < len; ++i) {
+        std::string_view src((char*)v[i].iov_base, v[i].iov_len);
+        DVLOG(2) << "Appending to stream " << src;
+        batch_.append(src.data(), src.size());
+      }
+      return;
+    }
   }
 
   error_code ec;
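Read on its own, the reworked batching branch of `Send` amounts to: measure first, absorb only if the total stays under the cap, otherwise fall through to the direct write path. Here is a standalone sketch of that rule, with assumed names (`TryBatch`, `kMaxBatchSize`) that do not appear in the Dragonfly sources:

```cpp
#include <cstdint>
#include <string>
#include <sys/uio.h>  // iovec

// Assumed cap, mirroring the 8K limit in the commit.
constexpr size_t kMaxBatchSize = 8192;

// Hypothetical helper: returns true if the reply pieces were absorbed into
// the batch buffer, false if the caller should bypass batching and write
// directly to the socket.
bool TryBatch(std::string& batch, const iovec* v, uint32_t len) {
  size_t total_size = batch.size();
  for (uint32_t i = 0; i < len; ++i)
    total_size += v[i].iov_len;

  if (total_size >= kMaxBatchSize)
    return false;  // too large: send directly instead of accumulating

  for (uint32_t i = 0; i < len; ++i)
    batch.append(static_cast<const char*>(v[i].iov_base), v[i].iov_len);
  return true;
}
```

A `false` return corresponds to falling past the `if (should_batch_)` block above: the oversized reply goes straight to the sink instead of accumulating, which is what fixes the unbounded buffer growth.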
@@ -327,6 +333,8 @@ void RedisReplyBuilder::StartArray(unsigned len) {
 }
 
 void RedisReplyBuilder::SendStringArr(StrPtr str_ptr, uint32_t len) {
+  DVLOG(2) << "Sending array of " << len << " strings.";
+
   // When vector length is too long, Send returns EMSGSIZE.
   size_t vec_len = std::min<size_t>(256u, len);
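The `std::min<size_t>(256u, len)` cap relates to the comment above it: writev-style interfaces can reject overly long iovec arrays with EMSGSIZE (POSIX only guarantees `IOV_MAX` entries per call), so a long string array must be flushed in bounded chunks. A sketch of that chunking pattern follows; `SendChunked` and its callback parameter are illustrative, not Dragonfly's API:

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>
#include <string_view>
#include <vector>
#include <sys/uio.h>  // iovec

// Send a long array of strings in chunks of at most kChunk iovec entries,
// so no single Send call exceeds what the socket layer accepts.
void SendChunked(const std::function<void(const iovec*, uint32_t)>& send,
                 const std::vector<std::string_view>& parts) {
  constexpr size_t kChunk = 256;  // mirrors the 256-entry cap in the diff

  std::vector<iovec> vec;
  vec.reserve(kChunk);
  for (size_t i = 0; i < parts.size(); i += kChunk) {
    size_t n = std::min(kChunk, parts.size() - i);
    vec.clear();
    for (size_t j = 0; j < n; ++j) {
      // iov_base is a non-const void*, hence the cast.
      vec.push_back({const_cast<char*>(parts[i + j].data()), parts[i + j].size()});
    }
    send(vec.data(), static_cast<uint32_t>(vec.size()));
  }
}
```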

--- a/tests/dragonfly/README.md
+++ b/tests/dragonfly/README.md

@@ -18,6 +18,11 @@ You can override the location of the binary using `DRAGONFLY_PATH` environment v
 
 - use `--df arg=val` to pass custom arguments to all dragonfly instances. Can be used multiple times.
+- use `--log-seeder file` to store all single-db commands from the latest test's seeder inside file.
+For example,
+```sh
+pytest dragonfly/connection_test.py -s --df logtostdout --df vmodule=dragonfly_connection=2 -k test_subscribe
+```
 
 ### Before you start
 Please make sure that you have python 3 installed on your local host.
 If you have both python 2 and python 3 installed on your host, you can run the tests with the following command:

--- a/tests/dragonfly/connection_test.py
+++ b/tests/dragonfly/connection_test.py

@@ -174,8 +174,8 @@ async def reader(channel: aioredis.client.PubSub, messages, max: int):
     return True, "success"
 
 
-async def run_pipeline_mode(async_client, messages):
-    pipe = async_client.pipeline()
+async def run_pipeline_mode(async_client: aioredis.Redis, messages):
+    pipe = async_client.pipeline(transaction=False)
     for key, val in messages.items():
         pipe.set(key, val)
     result = await pipe.execute()

@@ -327,3 +327,9 @@ async def test_big_command(df_server, size=8 * 1024):
 
     writer.close()
     await writer.wait_closed()
+
+@pytest.mark.asyncio
+async def test_subscribe_pipelined(async_client: aioredis.Redis):
+    pipe = async_client.pipeline(transaction=False)
+    pipe.execute_command('subscribe channel').execute_command('subscribe channel')
+    await pipe.echo('bye bye').execute()