mirror of
https://github.com/element-hq/synapse.git
synced 2025-01-20 18:42:33 +00:00
Fix tightloop over connecting to replication server
If the client failed to process incoming commands during the initial set up of the replication connection it would immediately disconnect and reconnect, resulting in a tightloop. This can happen, for example, when subscribing to a stream that has a row that is too long in the backlog. The fix here is to not consider the connection successfully set up until the client has succesfully subscribed and caught up with the streams. This ensures that the retry logic timers aren't reset until then, meaning that if an error does happen during start up the client will continue backing off before retrying again.
This commit is contained in:
parent
4dc945ba30
commit
313987187e
3 changed files with 42 additions and 5 deletions
|
@ -188,7 +188,9 @@ RDATA (S)
|
|||
A single update in a stream
|
||||
|
||||
POSITION (S)
|
||||
The position of the stream has been updated
|
||||
The position of the stream has been updated. Sent to the client after all
|
||||
missing updates for a stream have been sent to the client and they're now
|
||||
up to date.
|
||||
|
||||
ERROR (S, C)
|
||||
There was an error
|
||||
|
|
|
@ -54,7 +54,6 @@ class ReplicationClientFactory(ReconnectingClientFactory):
|
|||
|
||||
def buildProtocol(self, addr):
|
||||
logger.info("Connected to replication: %r", addr)
|
||||
self.resetDelay()
|
||||
return ClientReplicationStreamProtocol(
|
||||
self.client_name, self.server_name, self._clock, self.handler
|
||||
)
|
||||
|
@ -90,15 +89,23 @@ class ReplicationClientHandler(object):
|
|||
# Used for tests.
|
||||
self.awaiting_syncs = {}
|
||||
|
||||
# Set of stream names that have been subscribe to, but haven't yet
|
||||
# caught up with. This is used to track when the client has been fully
|
||||
# connected to the remote.
|
||||
self.streams_connecting = None
|
||||
|
||||
# The factory used to create connections.
|
||||
self.factory = None
|
||||
|
||||
def start_replication(self, hs):
|
||||
"""Helper method to start a replication connection to the remote server
|
||||
using TCP.
|
||||
"""
|
||||
client_name = hs.config.worker_name
|
||||
factory = ReplicationClientFactory(hs, client_name, self)
|
||||
self.factory = ReplicationClientFactory(hs, client_name, self)
|
||||
host = hs.config.worker_replication_host
|
||||
port = hs.config.worker_replication_port
|
||||
hs.get_reactor().connectTCP(host, port, factory)
|
||||
hs.get_reactor().connectTCP(host, port, self.factory)
|
||||
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
"""Called when we get new replication data. By default this just pokes
|
||||
|
@ -115,6 +122,12 @@ class ReplicationClientHandler(object):
|
|||
|
||||
Can be overriden in subclasses to handle more.
|
||||
"""
|
||||
# When we get a `POSITION` command it means we've finished getting
|
||||
# missing updates for the given stream, and are now up to date.
|
||||
self.streams_connecting.discard(stream_name)
|
||||
if not self.streams_connecting:
|
||||
self.finished_connecting()
|
||||
|
||||
return self.store.process_replication_rows(stream_name, token, [])
|
||||
|
||||
def on_sync(self, data):
|
||||
|
@ -140,6 +153,10 @@ class ReplicationClientHandler(object):
|
|||
args["account_data"] = user_account_data
|
||||
elif room_account_data:
|
||||
args["account_data"] = room_account_data
|
||||
|
||||
# Record which streams we're in the process of subscribing to
|
||||
self.streams_connecting = set(args.keys())
|
||||
|
||||
return args
|
||||
|
||||
def get_currently_syncing_users(self):
|
||||
|
@ -204,3 +221,18 @@ class ReplicationClientHandler(object):
|
|||
for cmd in self.pending_commands:
|
||||
connection.send_command(cmd)
|
||||
self.pending_commands = []
|
||||
|
||||
# This will happen if we don't actually subscribe to any streams
|
||||
if not self.streams_connecting:
|
||||
self.finished_connecting()
|
||||
|
||||
def finished_connecting(self):
|
||||
"""Called when we have successfully subscribed and caught up to all
|
||||
streams we're interested in.
|
||||
"""
|
||||
logger.info("Finished connecting to server")
|
||||
|
||||
# We don't reset the delay any earlier as otherwise if there is a
|
||||
# problem during start up we'll end up tight looping connecting to the
|
||||
# server.
|
||||
self.factory.resetDelay()
|
||||
|
|
|
@ -127,8 +127,11 @@ class RdataCommand(Command):
|
|||
|
||||
|
||||
class PositionCommand(Command):
|
||||
"""Sent by the client to tell the client the stream postition without
|
||||
"""Sent by the server to tell the client the stream postition without
|
||||
needing to send an RDATA.
|
||||
|
||||
Sent to the client after all missing updates for a stream have been sent
|
||||
to the client and they're now up to date.
|
||||
"""
|
||||
NAME = "POSITION"
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue