1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

test: update logs and test for devug purpose

This commit is contained in:
Borys 2024-12-13 13:09:33 +02:00
parent 027d76c13d
commit a102cbc69a
No known key found for this signature in database
GPG key ID: 753496F020ABFD14
3 changed files with 16 additions and 11 deletions

View file

@ -81,7 +81,14 @@ class ClusterShardMigration {
VLOG(1) << "Finalized flow " << source_shard_id_; VLOG(1) << "Finalized flow " << source_shard_id_;
return; return;
} }
VLOG(2) << "Attempt failed to finalize flow " << source_shard_id_; if (!tx_data->command.cmd_args.empty()) {
VLOG(1) << "Flow finalization failed " << source_shard_id_ << " by "
<< tx_data->command.cmd_args[0];
} else {
VLOG(1) << "Flow finalization failed " << source_shard_id_ << " by opcode "
<< (int)tx_data->opcode;
}
bc_->Add(); // the flow isn't finished so we lock it again bc_->Add(); // the flow isn't finished so we lock it again
} }
if (tx_data->opcode == journal::Op::PING) { if (tx_data->opcode == journal::Op::PING) {

View file

@ -103,7 +103,7 @@ void JournalStreamer::AsyncWrite() {
const auto& cur_buf = pending_buf_.PrepareSendingBuf(); const auto& cur_buf = pending_buf_.PrepareSendingBuf();
in_flight_bytes_ = cur_buf.mem_size; in_flight_bytes_ = cur_buf.mem_size;
total_sent_ += cur_buf.mem_size; total_sent_ += in_flight_bytes_;
const auto v_size = cur_buf.buf.size(); const auto v_size = cur_buf.buf.size();
absl::InlinedVector<iovec, 8> v(v_size); absl::InlinedVector<iovec, 8> v(v_size);
@ -113,7 +113,7 @@ void JournalStreamer::AsyncWrite() {
v[i] = IoVec(io::Bytes(uptr, cur_buf.buf[i].size())); v[i] = IoVec(io::Bytes(uptr, cur_buf.buf[i].size()));
} }
dest_->AsyncWrite(v.data(), v.size(), [this, len = cur_buf.mem_size](std::error_code ec) { dest_->AsyncWrite(v.data(), v.size(), [this, len = in_flight_bytes_](std::error_code ec) {
OnCompletion(std::move(ec), len); OnCompletion(std::move(ec), len);
}); });
} }
@ -128,13 +128,11 @@ void JournalStreamer::Write(std::string str) {
} }
void JournalStreamer::OnCompletion(std::error_code ec, size_t len) { void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
DCHECK_GE(in_flight_bytes_, len); DCHECK_EQ(in_flight_bytes_, len);
DVLOG(3) << "Completing from " << in_flight_bytes_ << " to " << in_flight_bytes_ - len; DVLOG(3) << "Completing " << in_flight_bytes_;
in_flight_bytes_ -= len; in_flight_bytes_ = 0;
if (in_flight_bytes_ == 0) { pending_buf_.Pop();
pending_buf_.Pop();
}
if (ec && !IsStopped()) { if (ec && !IsStopped()) {
cntx_->ReportError(ec); cntx_->ReportError(ec);
} else if (!pending_buf_.Empty() && !IsStopped()) { } else if (!pending_buf_.Empty() && !IsStopped()) {

View file

@ -1256,7 +1256,7 @@ async def test_cluster_flushall_during_migration(
df_factory.create( df_factory.create(
port=next(next_port), port=next(next_port),
admin_port=next(next_port), admin_port=next(next_port),
vmodule="cluster_family=9,outgoing_slot_migration=9,incoming_slot_migration=9", vmodule="cluster_family=9,outgoing_slot_migration=9,incoming_slot_migration=9,streamer=9",
logtostdout=True, logtostdout=True,
) )
for i in range(2) for i in range(2)
@ -1507,7 +1507,7 @@ async def test_cluster_fuzzymigration(
df_factory.create( df_factory.create(
port=next(next_port), port=next(next_port),
admin_port=next(next_port), admin_port=next(next_port),
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9", vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9",
serialization_max_chunk_size=huge_values, serialization_max_chunk_size=huge_values,
replication_stream_output_limit=10, replication_stream_output_limit=10,
) )