1
0
Fork 0
mirror of https://github.com/element-hq/synapse.git synced 2025-04-08 11:43:59 +00:00

Handle old staged inbound events ()

We might have events in the staging area if the service was restarted while there were unhandled events in the staging area.

Fixes 
This commit is contained in:
Erik Johnston 2021-07-06 13:02:37 +01:00 committed by GitHub
parent d7a94a7dcc
commit c65067d673
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 67 additions and 10 deletions
changelog.d
synapse
federation
storage/databases/main

1
changelog.d/10303.bugfix Normal file
View file

@ -0,0 +1 @@
Ensure that inbound events from federation that were being processed when Synapse was restarted get promptly processed on start up.

View file

@ -148,6 +148,41 @@ class FederationServer(FederationBase):
self._room_prejoin_state_types = hs.config.api.room_prejoin_state
# Whether we have started handling old events in the staging area.
self._started_handling_of_staged_events = False
@wrap_as_background_process("_handle_old_staged_events")
async def _handle_old_staged_events(self) -> None:
"""Handle old staged events by fetching all rooms that have staged
events and start the processing of each of those rooms.
"""
# Get all the rooms IDs with staged events.
room_ids = await self.store.get_all_rooms_with_staged_incoming_events()
# We then shuffle them so that if there are multiple instances doing
# this work they're less likely to collide.
random.shuffle(room_ids)
for room_id in room_ids:
room_version = await self.store.get_room_version(room_id)
# Try and acquire the processing lock for the room, if we get it start a
# background process for handling the events in the room.
lock = await self.store.try_acquire_lock(
_INBOUND_EVENT_HANDLING_LOCK_NAME, room_id
)
if lock:
logger.info("Handling old staged inbound events in %s", room_id)
self._process_incoming_pdus_in_room_inner(
room_id,
room_version,
lock,
)
# We pause a bit so that we don't start handling all rooms at once.
await self._clock.sleep(random.uniform(0, 0.1))
async def on_backfill_request(
self, origin: str, room_id: str, versions: List[str], limit: int
) -> Tuple[int, Dict[str, Any]]:
@ -166,6 +201,12 @@ class FederationServer(FederationBase):
async def on_incoming_transaction(
self, origin: str, transaction_data: JsonDict
) -> Tuple[int, Dict[str, Any]]:
# If we receive a transaction we should make sure that kick off handling
# any old events in the staging area.
if not self._started_handling_of_staged_events:
self._started_handling_of_staged_events = True
self._handle_old_staged_events()
# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
request_time = self._clock.time_msec()
@ -882,25 +923,28 @@ class FederationServer(FederationBase):
room_id: str,
room_version: RoomVersion,
lock: Lock,
latest_origin: str,
latest_event: EventBase,
latest_origin: Optional[str] = None,
latest_event: Optional[EventBase] = None,
) -> None:
"""Process events in the staging area for the given room.
The latest_origin and latest_event args are the latest origin and event
received.
received (or None to simply pull the next event from the database).
"""
# The common path is for the event we just received be the only event in
# the room, so instead of pulling the event out of the DB and parsing
# the event we just pull out the next event ID and check if that matches.
next_origin, next_event_id = await self.store.get_next_staged_event_id_for_room(
room_id
)
if next_origin == latest_origin and next_event_id == latest_event.event_id:
origin = latest_origin
event = latest_event
else:
if latest_event is not None and latest_origin is not None:
(
next_origin,
next_event_id,
) = await self.store.get_next_staged_event_id_for_room(room_id)
if next_origin != latest_origin or next_event_id != latest_event.event_id:
latest_origin = None
latest_event = None
if latest_origin is None or latest_event is None:
next = await self.store.get_next_staged_event_for_room(
room_id, room_version
)
@ -908,6 +952,9 @@ class FederationServer(FederationBase):
return
origin, event = next
else:
origin = latest_origin
event = latest_event
# We loop round until there are no more events in the room in the
# staging area, or we fail to get the lock (which means another process

View file

@ -1207,6 +1207,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
return origin, event
async def get_all_rooms_with_staged_incoming_events(self) -> List[str]:
"""Get the room IDs of all events currently staged."""
return await self.db_pool.simple_select_onecol(
table="federation_inbound_events_staging",
keyvalues={},
retcol="DISTINCT room_id",
desc="get_all_rooms_with_staged_incoming_events",
)
@wrap_as_background_process("_get_stats_for_federation_staging")
async def _get_stats_for_federation_staging(self):
"""Update the prometheus metrics for the inbound federation staging area."""