From 6f9932d146fda288688da4b7a75352364fa9d26b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Aug 2024 14:13:52 +0100 Subject: [PATCH] Handle old rows with null event_stream_ordering column --- .../databases/main/events_bg_updates.py | 132 +++++++++++++----- 1 file changed, 99 insertions(+), 33 deletions(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 3603f46678..1f8905fefa 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1828,38 +1828,84 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS """ Background update to populate the `sliding_sync_membership_snapshots` table. """ - last_event_stream_ordering = progress.get( - "last_event_stream_ordering", -(1 << 31) - ) + # We do this in two phases: a) the initial phase where we go through all + # room memberships, and then b) a second phase where we look at new + # memberships (this is to handle the case where we downgrade and then + # upgrade again). + # + # We have to do this as two phases (rather than just the second phase + # where we iterate on event_stream_ordering), as the + # `event_stream_ordering` column may have null values for old rows. + # Therefore we first do the set of historic rooms and *then* look at any + # new rows (which will have a non-null `event_stream_ordering`). + initial_phase = progress.get("initial_phase") + if initial_phase is None: + # If this is the first run, store the current max stream position. + # We know we will go through all memberships less than the current + # max in the initial phase. + progress = { + "initial_phase": True, + "last_event_stream_ordering": self.get_room_max_stream_ordering(), + } + await self.db_pool.updates._background_update_progress( + _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, + progress, + ) + initial_phase = True + + last_room_id = progress.get("last_room_id", "") + last_event_stream_ordering = progress["last_event_stream_ordering"] def _find_memberships_to_update_txn( txn: LoggingTransaction, ) -> List[Tuple[str, str, str, str, str, int, bool]]: # Fetch the set of event IDs that we want to update - # - # It's important to sort by `event_stream_ordering` *ascending* (oldest to - # newest) so that if we see that this background update in progress and want - # to start the catch-up process, we can safely assume that it will - # eventually get to the rooms we want to catch-up on anyway (see - # `_resolve_stale_data_in_sliding_sync_tables()`). - txn.execute( - """ - SELECT - c.room_id, - c.user_id, - e.sender, - c.event_id, - c.membership, - c.event_stream_ordering, - e.outlier - FROM local_current_membership as c - INNER JOIN events AS e USING (event_id) - WHERE event_stream_ordering > ? - ORDER BY event_stream_ordering ASC - LIMIT ? - """, - (last_event_stream_ordering, batch_size), - ) + + if initial_phase: + txn.execute( + """ + SELECT + c.room_id, + c.user_id, + e.sender, + c.event_id, + c.membership, + e.stream_ordering, + e.outlier + FROM local_current_membership as c + INNER JOIN events AS e USING (event_id) + WHERE c.room_id > ? + ORDER BY c.room_id ASC + LIMIT ? + """, + (last_room_id, batch_size), + ) + elif last_event_stream_ordering is not None: + # It's important to sort by `event_stream_ordering` *ascending* (oldest to + # newest) so that if we see that this background update in progress and want + # to start the catch-up process, we can safely assume that it will + # eventually get to the rooms we want to catch-up on anyway (see + # `_resolve_stale_data_in_sliding_sync_tables()`). + txn.execute( + """ + SELECT + c.room_id, + c.user_id, + e.sender, + c.event_id, + c.membership, + c.event_stream_ordering, + e.outlier + FROM local_current_membership as c + INNER JOIN events AS e USING (event_id) + WHERE event_stream_ordering > ? + ORDER BY event_stream_ordering ASC + LIMIT ? + """, + (last_event_stream_ordering, batch_size), + ) + else: + raise Exception("last_event_stream_ordering should not be None") memberships_to_update_rows = cast( List[Tuple[str, str, str, str, str, int, bool]], txn.fetchall() @@ -1873,10 +1919,22 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ) if not memberships_to_update_rows: - await self.db_pool.updates._end_background_update( - _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE - ) - return 0 + if initial_phase: + # Move onto the next phase. + await self.db_pool.updates._background_update_progress( + _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, + { + "initial_phase": False, + "last_event_stream_ordering": last_event_stream_ordering, + }, + ) + return 0 + else: + # We've finished both phases, we're done. + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE + ) + return 0 def _find_previous_membership_txn( txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int @@ -2144,7 +2202,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # Update the progress ( - _room_id, + room_id, _user_id, _sender, _membership_event_id, @@ -2152,9 +2210,16 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS membership_event_stream_ordering, _is_outlier, ) = memberships_to_update_rows[-1] + + progress = { + "initial_phase": initial_phase, + "last_room_id": room_id, + "last_event_stream_ordering": membership_event_stream_ordering, + } + await self.db_pool.updates._background_update_progress( _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, - {"last_event_stream_ordering": membership_event_stream_ordering}, + progress, ) return len(memberships_to_update_rows) @@ -2383,6 +2448,7 @@ def _resolve_stale_data_in_sliding_sync_membership_snapshots_table( # progress_json: JsonDict = {} if max_stream_ordering_sliding_sync_membership_snapshots_table is not None: + progress_json["initial_phase"] = False progress_json["last_event_stream_ordering"] = ( max_stream_ordering_sliding_sync_membership_snapshots_table )