1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix: assign threadlocal data structures during connection migration (#2237)

* fix: assign threadlocal data structures during connection migration

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

* fix: assign threadlocal data structures during connection migration

Co-authored-by: Shahar Mike <chakaz@users.noreply.github.com>
Signed-off-by: Roman Gershman <roman@dragonflydb.io>

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Co-authored-by: Shahar Mike <chakaz@users.noreply.github.com>
This commit is contained in:
Roman Gershman 2023-12-01 13:34:19 +02:00 committed by GitHub
parent b53492ece7
commit 2381316866
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 11 deletions

View file

@ -611,13 +611,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
DCHECK(dispatch_q_.empty()); DCHECK(dispatch_q_.empty());
service_->OnClose(cc_.get()); service_->OnClose(cc_.get());
DecreaseStatsOnClose();
stats_->read_buf_capacity -= io_buf_.Capacity();
// Update num_replicas if this was a replica connection.
if (cc_->replica_conn) {
--stats_->num_replicas;
}
// We wait for dispatch_fb to finish writing the previous replies before replying to the last // We wait for dispatch_fb to finish writing the previous replies before replying to the last
// offending request. // offending request.
@ -657,8 +651,6 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName() << ": " << ec LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName() << ": " << ec
<< " " << ec.message(); << " " << ec.message();
} }
--stats_->num_conns;
} }
void Connection::DispatchCommand(uint32_t consumed, mi_heap_t* heap) { void Connection::DispatchCommand(uint32_t consumed, mi_heap_t* heap) {
@ -826,10 +818,29 @@ void Connection::HandleMigrateRequest() {
// handles. We can't check above, as the queue might have contained a subscribe request. // handles. We can't check above, as the queue might have contained a subscribe request.
if (cc_->subscriptions == 0) { if (cc_->subscriptions == 0) {
migration_request_ = nullptr; migration_request_ = nullptr;
DecreaseStatsOnClose();
this->Migrate(dest); this->Migrate(dest);
// We're now running in `dest` thread auto update_tl_vars = [this] [[gnu::noinline]] {
// The compiler barrier that does not allow reordering memory accesses
// to before this function starts. See https://stackoverflow.com/a/75622732
asm volatile("");
queue_backpressure_ = &tl_queue_backpressure_; queue_backpressure_ = &tl_queue_backpressure_;
stats_ = service_->GetThreadLocalConnectionStats();
++stats_->num_conns;
stats_->read_buf_capacity += io_buf_.Capacity();
if (cc_->replica_conn) {
++stats_->num_replicas;
}
};
// We're now running in `dest` thread. We use non-inline lambda to force reading new thread's
// thread local vars.
update_tl_vars();
} }
DCHECK(dispatch_q_.empty()); DCHECK(dispatch_q_.empty());
@ -1333,4 +1344,14 @@ Connection::MemoryUsage Connection::GetMemoryUsage() const {
}; };
} }
void Connection::DecreaseStatsOnClose() {
stats_->read_buf_capacity -= io_buf_.Capacity();
// Update num_replicas if this was a replica connection.
if (cc_->replica_conn) {
--stats_->num_replicas;
}
--stats_->num_conns;
}
} // namespace facade } // namespace facade

View file

@ -305,6 +305,8 @@ class Connection : public util::Connection {
std::unique_ptr<ConnectionContext> cc_; // Null for http connections std::unique_ptr<ConnectionContext> cc_; // Null for http connections
private: private:
void DecreaseStatsOnClose();
std::deque<MessageHandle> dispatch_q_; // dispatch queue std::deque<MessageHandle> dispatch_q_; // dispatch queue
dfly::EventCount evc_; // dispatch queue waker dfly::EventCount evc_; // dispatch queue waker
util::fb2::Fiber dispatch_fb_; // dispatch fiber (if started) util::fb2::Fiber dispatch_fb_; // dispatch fiber (if started)