1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix: test_replicaof_reject_on_load crash on stop (#2818)

* Migrate might fail if dispatch_fiber is active. In that case do no crash but return false to indicate that the migration was not successful.
* After we migrate, we might find ourselves with the socket closed (because of the shutdown signal process/flow). We need to check that the socket is open. If it's not, it means that it was shutdown by the signal flow (because after we finish with DispatchTracker, we start iterating over all of the connections in all of the shards and shut them down.
This commit is contained in:
Kostas Kyrimis 2024-04-09 10:01:32 +03:00 committed by GitHub
parent 57d567639c
commit 41d463ae3d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 32 additions and 11 deletions

View file

@ -566,7 +566,7 @@ void Connection::OnPreMigrateThread() {
void Connection::OnPostMigrateThread() { void Connection::OnPostMigrateThread() {
// Once we migrated, we should rearm OnBreakCb callback. // Once we migrated, we should rearm OnBreakCb callback.
if (breaker_cb_) { if (breaker_cb_ && socket()->IsOpen()) {
socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); }); socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); });
} }
@ -1421,15 +1421,23 @@ void Connection::ShutdownSelf() {
util::Connection::Shutdown(); util::Connection::Shutdown();
} }
void Connection::Migrate(util::fb2::ProactorBase* dest) { bool Connection::Migrate(util::fb2::ProactorBase* dest) {
// Migrate is used only by replication, so it doesn't have properties of full-fledged // Migrate is used only by replication, so it doesn't have properties of full-fledged
// connections // connections
CHECK(!cc_->async_dispatch); CHECK(!cc_->async_dispatch);
CHECK_EQ(cc_->subscriptions, 0); // are bound to thread local caches CHECK_EQ(cc_->subscriptions, 0); // are bound to thread local caches
CHECK_EQ(self_.use_count(), 1u); // references cache our thread and backpressure CHECK_EQ(self_.use_count(), 1u); // references cache our thread and backpressure
CHECK(!dispatch_fb_.IsJoinable()); // can't move once it started if (dispatch_fb_.IsJoinable()) { // can't move once it started
return false;
}
listener()->Migrate(this, dest); listener()->Migrate(this, dest);
// After we migrate, it could be the case the connection was shut down. We should
// act accordingly.
if (!socket()->IsOpen()) {
return false;
}
return true;
} }
Connection::WeakRef Connection::Borrow() { Connection::WeakRef Connection::Borrow() {
@ -1503,8 +1511,9 @@ void Connection::SendAsync(MessageHandle msg) {
return; return;
// If we launch while closing, it won't be awaited. Control messages will be processed on cleanup. // If we launch while closing, it won't be awaited. Control messages will be processed on cleanup.
if (!cc_->conn_closing) if (!cc_->conn_closing) {
LaunchDispatchFiberIfNeeded(); LaunchDispatchFiberIfNeeded();
}
DCHECK_NE(phase_, PRECLOSE); // No more messages are processed after this point DCHECK_NE(phase_, PRECLOSE); // No more messages are processed after this point

View file

@ -241,7 +241,9 @@ class Connection : public util::Connection {
void ShutdownSelf(); void ShutdownSelf();
// Migrate this connecton to a different thread. // Migrate this connecton to a different thread.
void Migrate(util::fb2::ProactorBase* dest); // Return true if Migrate succeeded
// Return false if dispatch_fb_ is active
bool Migrate(util::fb2::ProactorBase* dest);
// Borrow weak reference to connection. Can be called from any thread. // Borrow weak reference to connection. Can be called from any thread.
WeakRef Borrow(); WeakRef Borrow();
@ -377,7 +379,6 @@ class Connection : public util::Connection {
bool ShouldEndDispatchFiber(const MessageHandle& msg); bool ShouldEndDispatchFiber(const MessageHandle& msg);
void LaunchDispatchFiberIfNeeded(); // Dispatch fiber is started lazily void LaunchDispatchFiberIfNeeded(); // Dispatch fiber is started lazily
// Squashes pipelined commands from the dispatch queue to spread load over all threads // Squashes pipelined commands from the dispatch queue to spread load over all threads
void SquashPipeline(facade::SinkReplyBuilder*); void SquashPipeline(facade::SinkReplyBuilder*);

View file

@ -166,7 +166,13 @@ void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) {
if (num_thread < pool->size()) { if (num_thread < pool->size()) {
if (int(num_thread) != ProactorBase::me()->GetPoolIndex()) { if (int(num_thread) != ProactorBase::me()->GetPoolIndex()) {
cntx->conn()->Migrate(pool->at(num_thread)); if (!cntx->conn()->Migrate(pool->at(num_thread))) {
// Listener::PreShutdown() triggered
if (cntx->conn()->socket()->IsOpen()) {
return rb->SendError(kInvalidState);
}
return;
}
} }
return rb->SendOk(); return rb->SendOk();
@ -222,8 +228,13 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
flow.conn = cntx->conn(); flow.conn = cntx->conn();
flow.eof_token = eof_token; flow.eof_token = eof_token;
flow.version = replica_ptr->version; flow.version = replica_ptr->version;
if (!cntx->conn()->Migrate(shard_set->pool()->at(flow_id))) {
cntx->conn()->Migrate(shard_set->pool()->at(flow_id)); // Listener::PreShutdown() triggered
if (cntx->conn()->socket()->IsOpen()) {
return rb->SendError(kInvalidState);
}
return;
}
sf_->journal()->StartInThread(); sf_->journal()->StartInThread();
std::string_view sync_type = "FULL"; std::string_view sync_type = "FULL";