diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 2cf2267e2..030d44b7d 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -611,13 +611,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { DCHECK(dispatch_q_.empty()); service_->OnClose(cc_.get()); - - stats_->read_buf_capacity -= io_buf_.Capacity(); - - // Update num_replicas if this was a replica connection. - if (cc_->replica_conn) { - --stats_->num_replicas; - } + DecreaseStatsOnClose(); // We wait for dispatch_fb to finish writing the previous replies before replying to the last // offending request. @@ -657,8 +651,6 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName() << ": " << ec << " " << ec.message(); } - - --stats_->num_conns; } void Connection::DispatchCommand(uint32_t consumed, mi_heap_t* heap) { @@ -826,10 +818,29 @@ void Connection::HandleMigrateRequest() { // handles. We can't check above, as the queue might have contained a subscribe request. if (cc_->subscriptions == 0) { migration_request_ = nullptr; + + DecreaseStatsOnClose(); + this->Migrate(dest); - // We're now running in `dest` thread - queue_backpressure_ = &tl_queue_backpressure_; + auto update_tl_vars = [this] [[gnu::noinline]] { + // The compiler barrier that does not allow reordering memory accesses + // to before this function starts. See https://stackoverflow.com/a/75622732 + asm volatile(""); + + queue_backpressure_ = &tl_queue_backpressure_; + + stats_ = service_->GetThreadLocalConnectionStats(); + ++stats_->num_conns; + stats_->read_buf_capacity += io_buf_.Capacity(); + if (cc_->replica_conn) { + ++stats_->num_replicas; + } + }; + + // We're now running in `dest` thread. We use non-inline lambda to force reading new thread's + // thread local vars. + update_tl_vars(); } DCHECK(dispatch_q_.empty()); @@ -1333,4 +1344,14 @@ Connection::MemoryUsage Connection::GetMemoryUsage() const { }; } +void Connection::DecreaseStatsOnClose() { + stats_->read_buf_capacity -= io_buf_.Capacity(); + + // Update num_replicas if this was a replica connection. + if (cc_->replica_conn) { + --stats_->num_replicas; + } + --stats_->num_conns; +} + } // namespace facade diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index c4ca81435..93a1c2c2e 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -305,6 +305,8 @@ class Connection : public util::Connection { std::unique_ptr cc_; // Null for http connections private: + void DecreaseStatsOnClose(); + std::deque dispatch_q_; // dispatch queue dfly::EventCount evc_; // dispatch queue waker util::fb2::Fiber dispatch_fb_; // dispatch fiber (if started)