diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index b3f82758e..d6654d3aa 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -88,7 +88,7 @@ size_t JournalStreamer::GetTotalBufferCapacities() const { void JournalStreamer::Write(std::string_view str) { DCHECK(!str.empty()); - DVLOG(2) << "Writing " << str.size() << " bytes"; + DVLOG(3) << "Writing " << str.size() << " bytes"; size_t total_pending = pending_buf_.size() + str.size(); @@ -132,7 +132,7 @@ void JournalStreamer::Write(std::string_view str) { void JournalStreamer::OnCompletion(std::error_code ec, size_t len) { DCHECK_GE(in_flight_bytes_, len); - DVLOG(2) << "Completing from " << in_flight_bytes_ << " to " << in_flight_bytes_ - len; + DVLOG(3) << "Completing from " << in_flight_bytes_ << " to " << in_flight_bytes_ - len; in_flight_bytes_ -= len; if (ec && !IsStopped()) { cntx_->ReportError(ec); diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index d3454d8ab..d75e1c74a 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -2287,7 +2287,6 @@ async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFact await check_for_no_state_status([node.admin_client for node in nodes]) -@pytest.mark.skip("Flaky") @pytest.mark.asyncio @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_seeder_factory): @@ -2297,7 +2296,7 @@ async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_see port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000, replication_timeout=3000, - vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9", + vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=2", ) for i in range(2) ] @@ -2311,9 +2310,8 @@ async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_see await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) logging.debug("source node DEBUG POPULATE") - await nodes[0].client.execute_command("debug", "populate", "100000", "foo", "5000") - # await StaticSeeder(key_target=200000, data_size=1000).run(nodes[0].client) + await StaticSeeder(key_target=300000, data_size=1000).run(nodes[0].client) start_capture = await StaticSeeder.capture(nodes[0].client) logging.debug("Start migration") @@ -2329,13 +2327,13 @@ async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_see await nodes[1].client.execute_command("debug migration pause") await wait_for_error( - nodes[0].admin_client, nodes[1].id, "JournalStreamer write operation timeout" + nodes[0].admin_client, nodes[1].id, "JournalStreamer write operation timeout", 30 ) logging.debug("debug migration resume") await nodes[1].client.execute_command("debug migration resume") - await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED") + await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", 300) await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED") nodes[0].migrations = []