1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

bug(redis replication): fix multi-transaction replication stable state (#934)

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2023-03-13 15:14:58 +02:00 committed by GitHub
parent 8e528f1fbb
commit 97e38aebd2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 11 additions and 14 deletions

View file

@ -559,6 +559,9 @@ error_code Replica::InitiateDflySync() {
error_code Replica::ConsumeRedisStream() {
base::IoBuf io_buf(16_KB);
io::NullSink null_sink; // we never reply back on the commands.
ConnectionContext conn_context{&null_sink, nullptr};
conn_context.is_replicating = true;
parser_.reset(new RedisParser);
ReqSerializer serializer{sock_.get()};
@ -598,7 +601,7 @@ error_code Replica::ConsumeRedisStream() {
RETURN_ON_ERR(SendCommand(ack_cmd, &serializer));
}
ec = ParseAndExecute(&io_buf);
ec = ParseAndExecute(&io_buf, &conn_context);
}
VLOG(1) << "ConsumeRedisStream finished";
@ -1103,7 +1106,7 @@ error_code Replica::ReadLine(base::IoBuf* io_buf, string_view* line) {
return std::make_error_code(std::errc::illegal_byte_sequence);
}
error_code Replica::ParseAndExecute(base::IoBuf* io_buf) {
error_code Replica::ParseAndExecute(base::IoBuf* io_buf, ConnectionContext* cntx) {
VLOG(1) << "ParseAndExecute: input len " << io_buf->InputLen();
if (parser_->stash_size() > 0) {
DVLOG(1) << "Stash " << *parser_->stash()[0];
@ -1112,21 +1115,16 @@ error_code Replica::ParseAndExecute(base::IoBuf* io_buf) {
uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK;
io::NullSink null_sink; // we never reply back on the commands.
ConnectionContext conn_context{&null_sink, nullptr};
conn_context.is_replicating = true;
do {
result = parser_->Parse(io_buf->InputBuffer(), &consumed, &resp_args_);
switch (result) {
case RedisParser::OK:
if (!resp_args_.empty()) {
VLOG(2) << "Got command " << ToSV(resp_args_[0].GetBuf()) << ToSV(resp_args_[1].GetBuf())
<< "\n consumed: " << consumed;
VLOG(2) << "Got command " << ToSV(resp_args_[0].GetBuf()) << "\n consumed: " << consumed;
facade::RespToArgList(resp_args_, &cmd_str_args_);
CmdArgList arg_list{cmd_str_args_.data(), cmd_str_args_.size()};
service_.DispatchCommand(arg_list, &conn_context);
service_.DispatchCommand(arg_list, cntx);
}
io_buf->ConsumeInput(consumed);
break;

View file

@ -171,7 +171,7 @@ class Replica {
std::error_code ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* header);
std::error_code ReadLine(base::IoBuf* io_buf, std::string_view* line);
std::error_code ParseAndExecute(base::IoBuf* io_buf);
std::error_code ParseAndExecute(base::IoBuf* io_buf, ConnectionContext* cntx);
// Check if reps_args contains a simple reply.
bool CheckRespIsSimpleReply(std::string_view reply) const;

View file

@ -115,7 +115,6 @@ stable_sync_replication_specs = [
@pytest.mark.asyncio
@pytest.mark.parametrize("t_replicas, seeder_config", stable_sync_replication_specs)
@pytest.mark.skip(reason="Skipping until we fix replication from redis")
async def test_replication_stable_sync(df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config):
master = redis_server
c_master = aioredis.Redis(port=master.port)
@ -152,7 +151,7 @@ replication_specs = [
@pytest.mark.asyncio
@pytest.mark.parametrize("t_replicas, seeder_config", replication_specs)
@pytest.mark.skip(reason="Skipping until we fix replication from redis")
# @pytest.mark.skip(reason="Skipping until we fix replication from redis")
async def test_redis_replication_all(df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config):
master = redis_server
c_master = aioredis.Redis(port=master.port)
@ -173,7 +172,7 @@ async def test_redis_replication_all(df_local_factory, df_seeder_factory, redis_
c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
# Start data stream
stream_task = asyncio.create_task(seeder.run(target_ops=6000))
stream_task = asyncio.create_task(seeder.run())
await asyncio.sleep(0.0)
# Start replication
@ -187,6 +186,7 @@ async def test_redis_replication_all(df_local_factory, df_seeder_factory, redis_
# Wait for streaming to finish
assert not stream_task.done(
), "Weak testcase. Increase number of streamed iterations to surpass full sync"
seeder.stop()
await stream_task
# Check data after full sync

View file

@ -495,7 +495,6 @@ class DflySeeder:
await pipe.execute()
except Exception as e:
raise SystemExit(e)
queue.task_done()
await client.connection_pool.disconnect()