diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index f0bc7f366..fc9b16245 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -81,7 +81,14 @@ class ClusterShardMigration { VLOG(1) << "Finalized flow " << source_shard_id_; return; } - VLOG(2) << "Attempt failed to finalize flow " << source_shard_id_; + if (!tx_data->command.cmd_args.empty()) { + VLOG(1) << "Flow finalization failed " << source_shard_id_ << " by " + << tx_data->command.cmd_args[0]; + } else { + VLOG(1) << "Flow finalization failed " << source_shard_id_ << " by opcode " + << (int)tx_data->opcode; + } + bc_->Add(); // the flow isn't finished so we lock it again } if (tx_data->opcode == journal::Op::PING) { diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 485a97125..90d94ca6e 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -103,7 +103,7 @@ void JournalStreamer::AsyncWrite() { const auto& cur_buf = pending_buf_.PrepareSendingBuf(); in_flight_bytes_ = cur_buf.mem_size; - total_sent_ += cur_buf.mem_size; + total_sent_ += in_flight_bytes_; const auto v_size = cur_buf.buf.size(); absl::InlinedVector v(v_size); @@ -113,7 +113,7 @@ void JournalStreamer::AsyncWrite() { v[i] = IoVec(io::Bytes(uptr, cur_buf.buf[i].size())); } - dest_->AsyncWrite(v.data(), v.size(), [this, len = cur_buf.mem_size](std::error_code ec) { + dest_->AsyncWrite(v.data(), v.size(), [this, len = in_flight_bytes_](std::error_code ec) { OnCompletion(std::move(ec), len); }); } @@ -128,13 +128,11 @@ void JournalStreamer::Write(std::string str) { } void JournalStreamer::OnCompletion(std::error_code ec, size_t len) { - DCHECK_GE(in_flight_bytes_, len); + DCHECK_EQ(in_flight_bytes_, len); - DVLOG(3) << "Completing from " << in_flight_bytes_ << " to " << in_flight_bytes_ - len; - in_flight_bytes_ -= len; - if (in_flight_bytes_ == 0) { - pending_buf_.Pop(); - } + DVLOG(3) << "Completing " << in_flight_bytes_; + in_flight_bytes_ = 0; + pending_buf_.Pop(); if (ec && !IsStopped()) { cntx_->ReportError(ec); } else if (!pending_buf_.Empty() && !IsStopped()) { diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 4a06ed492..f51e7393a 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1256,7 +1256,7 @@ async def test_cluster_flushall_during_migration( df_factory.create( port=next(next_port), admin_port=next(next_port), - vmodule="cluster_family=9,outgoing_slot_migration=9,incoming_slot_migration=9", + vmodule="cluster_family=9,outgoing_slot_migration=9,incoming_slot_migration=9,streamer=9", logtostdout=True, ) for i in range(2) @@ -1507,7 +1507,7 @@ async def test_cluster_fuzzymigration( df_factory.create( port=next(next_port), admin_port=next(next_port), - vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9", + vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9", serialization_max_chunk_size=huge_values, replication_stream_output_limit=10, )