diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 20675a5ce..152020a61 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -566,7 +566,7 @@ void Connection::OnPreMigrateThread() { void Connection::OnPostMigrateThread() { // Once we migrated, we should rearm OnBreakCb callback. - if (breaker_cb_) { + if (breaker_cb_ && socket()->IsOpen()) { socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); }); } @@ -1421,15 +1421,23 @@ void Connection::ShutdownSelf() { util::Connection::Shutdown(); } -void Connection::Migrate(util::fb2::ProactorBase* dest) { +bool Connection::Migrate(util::fb2::ProactorBase* dest) { // Migrate is used only by replication, so it doesn't have properties of full-fledged // connections CHECK(!cc_->async_dispatch); - CHECK_EQ(cc_->subscriptions, 0); // are bound to thread local caches - CHECK_EQ(self_.use_count(), 1u); // references cache our thread and backpressure - CHECK(!dispatch_fb_.IsJoinable()); // can't move once it started + CHECK_EQ(cc_->subscriptions, 0); // are bound to thread local caches + CHECK_EQ(self_.use_count(), 1u); // references cache our thread and backpressure + if (dispatch_fb_.IsJoinable()) { // can't move once it started + return false; + } listener()->Migrate(this, dest); + // After we migrate, it could be the case the connection was shut down. We should + // act accordingly. + if (!socket()->IsOpen()) { + return false; + } + return true; } Connection::WeakRef Connection::Borrow() { @@ -1503,8 +1511,9 @@ void Connection::SendAsync(MessageHandle msg) { return; // If we launch while closing, it won't be awaited. Control messages will be processed on cleanup. - if (!cc_->conn_closing) + if (!cc_->conn_closing) { LaunchDispatchFiberIfNeeded(); + } DCHECK_NE(phase_, PRECLOSE); // No more messages are processed after this point diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 92b76d0cf..b8b52e303 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -241,7 +241,9 @@ class Connection : public util::Connection { void ShutdownSelf(); // Migrate this connecton to a different thread. - void Migrate(util::fb2::ProactorBase* dest); + // Return true if Migrate succeeded + // Return false if dispatch_fb_ is active + bool Migrate(util::fb2::ProactorBase* dest); // Borrow weak reference to connection. Can be called from any thread. WeakRef Borrow(); @@ -377,7 +379,6 @@ class Connection : public util::Connection { bool ShouldEndDispatchFiber(const MessageHandle& msg); void LaunchDispatchFiberIfNeeded(); // Dispatch fiber is started lazily - // Squashes pipelined commands from the dispatch queue to spread load over all threads void SquashPipeline(facade::SinkReplyBuilder*); diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 81d42730f..5f43d13b5 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -166,7 +166,13 @@ void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) { if (num_thread < pool->size()) { if (int(num_thread) != ProactorBase::me()->GetPoolIndex()) { - cntx->conn()->Migrate(pool->at(num_thread)); + if (!cntx->conn()->Migrate(pool->at(num_thread))) { + // Listener::PreShutdown() triggered + if (cntx->conn()->socket()->IsOpen()) { + return rb->SendError(kInvalidState); + } + return; + } } return rb->SendOk(); @@ -222,8 +228,13 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) { flow.conn = cntx->conn(); flow.eof_token = eof_token; flow.version = replica_ptr->version; - - cntx->conn()->Migrate(shard_set->pool()->at(flow_id)); + if (!cntx->conn()->Migrate(shard_set->pool()->at(flow_id))) { + // Listener::PreShutdown() triggered + if (cntx->conn()->socket()->IsOpen()) { + return rb->SendError(kInvalidState); + } + return; + } sf_->journal()->StartInThread(); std::string_view sync_type = "FULL";