From 41d463ae3d9c383ac63bed9235fa936b4a394558 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Tue, 9 Apr 2024 10:01:32 +0300 Subject: [PATCH] fix: test_replicaof_reject_on_load crash on stop (#2818) * Migrate might fail if dispatch_fiber is active. In that case do no crash but return false to indicate that the migration was not successful. * After we migrate, we might find ourselves with the socket closed (because of the shutdown signal process/flow). We need to check that the socket is open. If it's not, it means that it was shutdown by the signal flow (because after we finish with DispatchTracker, we start iterating over all of the connections in all of the shards and shut them down. --- src/facade/dragonfly_connection.cc | 21 +++++++++++++++------ src/facade/dragonfly_connection.h | 5 +++-- src/server/dflycmd.cc | 17 ++++++++++++++--- 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 20675a5ce..152020a61 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -566,7 +566,7 @@ void Connection::OnPreMigrateThread() { void Connection::OnPostMigrateThread() { // Once we migrated, we should rearm OnBreakCb callback. - if (breaker_cb_) { + if (breaker_cb_ && socket()->IsOpen()) { socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); }); } @@ -1421,15 +1421,23 @@ void Connection::ShutdownSelf() { util::Connection::Shutdown(); } -void Connection::Migrate(util::fb2::ProactorBase* dest) { +bool Connection::Migrate(util::fb2::ProactorBase* dest) { // Migrate is used only by replication, so it doesn't have properties of full-fledged // connections CHECK(!cc_->async_dispatch); - CHECK_EQ(cc_->subscriptions, 0); // are bound to thread local caches - CHECK_EQ(self_.use_count(), 1u); // references cache our thread and backpressure - CHECK(!dispatch_fb_.IsJoinable()); // can't move once it started + CHECK_EQ(cc_->subscriptions, 0); // are bound to thread local caches + CHECK_EQ(self_.use_count(), 1u); // references cache our thread and backpressure + if (dispatch_fb_.IsJoinable()) { // can't move once it started + return false; + } listener()->Migrate(this, dest); + // After we migrate, it could be the case the connection was shut down. We should + // act accordingly. + if (!socket()->IsOpen()) { + return false; + } + return true; } Connection::WeakRef Connection::Borrow() { @@ -1503,8 +1511,9 @@ void Connection::SendAsync(MessageHandle msg) { return; // If we launch while closing, it won't be awaited. Control messages will be processed on cleanup. - if (!cc_->conn_closing) + if (!cc_->conn_closing) { LaunchDispatchFiberIfNeeded(); + } DCHECK_NE(phase_, PRECLOSE); // No more messages are processed after this point diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 92b76d0cf..b8b52e303 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -241,7 +241,9 @@ class Connection : public util::Connection { void ShutdownSelf(); // Migrate this connecton to a different thread. - void Migrate(util::fb2::ProactorBase* dest); + // Return true if Migrate succeeded + // Return false if dispatch_fb_ is active + bool Migrate(util::fb2::ProactorBase* dest); // Borrow weak reference to connection. Can be called from any thread. WeakRef Borrow(); @@ -377,7 +379,6 @@ class Connection : public util::Connection { bool ShouldEndDispatchFiber(const MessageHandle& msg); void LaunchDispatchFiberIfNeeded(); // Dispatch fiber is started lazily - // Squashes pipelined commands from the dispatch queue to spread load over all threads void SquashPipeline(facade::SinkReplyBuilder*); diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 81d42730f..5f43d13b5 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -166,7 +166,13 @@ void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) { if (num_thread < pool->size()) { if (int(num_thread) != ProactorBase::me()->GetPoolIndex()) { - cntx->conn()->Migrate(pool->at(num_thread)); + if (!cntx->conn()->Migrate(pool->at(num_thread))) { + // Listener::PreShutdown() triggered + if (cntx->conn()->socket()->IsOpen()) { + return rb->SendError(kInvalidState); + } + return; + } } return rb->SendOk(); @@ -222,8 +228,13 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) { flow.conn = cntx->conn(); flow.eof_token = eof_token; flow.version = replica_ptr->version; - - cntx->conn()->Migrate(shard_set->pool()->at(flow_id)); + if (!cntx->conn()->Migrate(shard_set->pool()->at(flow_id))) { + // Listener::PreShutdown() triggered + if (cntx->conn()->socket()->IsOpen()) { + return rb->SendError(kInvalidState); + } + return; + } sf_->journal()->StartInThread(); std::string_view sync_type = "FULL";