diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index da50fd7f83..d666039120 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1536,8 +1536,8 @@ class DatabasePool:
 
             self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=False)
 
+    @staticmethod
     def simple_upsert_many_txn_native_upsert(
-        self,
         txn: LoggingTransaction,
         table: str,
         key_names: Collection[str],
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index c520faa9e7..85014719ae 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -20,7 +20,6 @@
 #
 
 import logging
-from collections import OrderedDict
 from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, cast
 
 import attr
@@ -322,7 +321,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
         self.db_pool.updates.register_background_index_update(
             _BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
             index_name="sliding_sync_joined_rooms_to_recalculate_room_id_idx",
-            table="sliding_sync_joined_rooms",
+            table="sliding_sync_joined_rooms_to_recalculate",
             columns=["room_id"],
             unique=True,
         )
@@ -1575,21 +1574,43 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
         return batch_size
 
     async def _sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update(
-        self, _progress: JsonDict, _batch_size: int
+        self, progress: JsonDict, _batch_size: int
     ) -> int:
         """
         Prefill `sliding_sync_joined_rooms_to_recalculate` table with all rooms we know
        about already.
         """
+        initial_insert = progress.get("initial_insert", False)
 
        def _txn(txn: LoggingTransaction) -> None:
             # We do this as one big bulk insert. This has been tested on a bigger
             # homeserver with ~10M rooms and took 11s. There is potential for this to
             # starve disk usage while this goes on.
-            txn.execute(
-                """
-                INSERT INTO sliding_sync_joined_rooms_to_recalculate (room_id) SELECT room_id FROM rooms;
-                """,
-            )
+            if initial_insert:
+                txn.execute(
+                    """
+                    INSERT INTO sliding_sync_joined_rooms_to_recalculate
+                        (room_id)
+                    SELECT room_id FROM rooms;
+                    """,
+                )
+            else:
+                # We can only upsert once the unique index has been added to the table
+                # (see
+                # `_BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE`)
+                #
+                # We upsert in case we have to run this multiple times.
+                #
+                # The `WHERE TRUE` clause is to avoid "Parsing Ambiguity"
+                txn.execute(
+                    """
+                    INSERT INTO sliding_sync_joined_rooms_to_recalculate
+                        (room_id)
+                    SELECT room_id FROM rooms WHERE ?
+                    ON CONFLICT (room_id)
+                    DO NOTHING;
+                    """,
+                    (True,),
+                )
 
         await self.db_pool.runInteraction(
             "_sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update",
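The `WHERE ?` bound to `TRUE` in the hunk above works around SQLite's documented upsert "Parsing Ambiguity": when an `INSERT ... SELECT` is followed by `ON CONFLICT` and the SELECT has no WHERE clause, the `ON` could be parsed as the start of a join constraint, so a trivial WHERE clause forces the intended reading. A minimal, self-contained sketch of the same pattern (standard-library sqlite3, hypothetical table names rather than Synapse's real schema):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rooms (room_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE to_recalculate (room_id TEXT NOT NULL)")
# `ON CONFLICT (room_id)` needs a unique index on the conflict target (SQLite 3.24+).
conn.execute("CREATE UNIQUE INDEX to_recalculate_room_id_idx ON to_recalculate (room_id)")
conn.executemany("INSERT INTO rooms (room_id) VALUES (?)", [("!a:test",), ("!b:test",)])

prefill_sql = """
    INSERT INTO to_recalculate (room_id)
    SELECT room_id FROM rooms WHERE ?
    ON CONFLICT (room_id) DO NOTHING
"""
# Safe to run repeatedly: rows that already exist are silently skipped.
conn.execute(prefill_sql, (True,))
conn.execute(prefill_sql, (True,))
assert conn.execute("SELECT COUNT(*) FROM to_recalculate").fetchone() == (2,)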
""" - last_event_stream_ordering = progress.get( - "last_event_stream_ordering", -(1 << 31) - ) + # We don't need to fetch any progress state because we just grab the next N + # events in `sliding_sync_joined_rooms_to_recalculate` - def _get_rooms_to_update_txn(txn: LoggingTransaction) -> List[Tuple[str, int]]: + def _get_rooms_to_update_txn(txn: LoggingTransaction) -> List[Tuple[str]]: """ Returns: A list of room ID's to update along with the progress value @@ -1625,30 +1645,16 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # server is still participating in the room because if we're # `no_longer_in_room`, this table would be cleared out for the given # `room_id`. - # - # Because we're using `event_stream_ordering` as the progress marker, we're - # going to be pulling out the same rooms over and over again but we can - # at-least re-use this background update for the catch-up background - # process as well (see `_resolve_stale_data_in_sliding_sync_tables()`). - # - # It's important to sort by `event_stream_ordering` *ascending* (oldest to - # newest) so that if we see that this background update in progress and want - # to start the catch-up process, we can safely assume that it will - # eventually get to the rooms we want to catch-up on anyway (see - # `_resolve_stale_data_in_sliding_sync_tables()`). txn.execute( """ - SELECT room_id, MAX(event_stream_ordering) - FROM current_state_events - WHERE event_stream_ordering > ? - GROUP BY room_id - ORDER BY MAX(event_stream_ordering) ASC + SELECT room_id + FROM sliding_sync_joined_rooms_to_recalculate LIMIT ? """, - (last_event_stream_ordering, batch_size), + (batch_size,), ) - rooms_to_update_rows = cast(List[Tuple[str, int]], txn.fetchall()) + rooms_to_update_rows = cast(List[Tuple[str]], txn.fetchall()) return rooms_to_update_rows @@ -1669,28 +1675,23 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS joined_room_stream_ordering_updates: Dict[ str, _JoinedRoomStreamOrderingUpdate ] = {} - # Map from room_id to the progress value (event_stream_ordering) - # - # This needs to be an `OrderedDict` because we need to process things in - # `event_stream_ordering` order *ascending* to save our progress position - # correctly if we need to exit early. - room_id_to_progress_marker_map: OrderedDict[str, int] = OrderedDict() # As long as we get this value before we fetch the current state, we can use it # to check if something has changed since that point. most_recent_current_state_delta_stream_id = ( await self.get_max_stream_id_in_current_state_deltas() ) - for room_id, progress_event_stream_ordering in rooms_to_update: - room_id_to_progress_marker_map[room_id] = progress_event_stream_ordering - + for (room_id,) in rooms_to_update: current_state_ids_map = await self.db_pool.runInteraction( "_sliding_sync_joined_rooms_bg_update._get_relevant_sliding_sync_current_state_event_ids_txn", PersistEventsStore._get_relevant_sliding_sync_current_state_event_ids_txn, room_id, ) - # We're iterating over rooms pulled from the current_state_events table - # so we should have some current state for each room - assert current_state_ids_map + + # If we're not joined to the room a) it doesn't belong in the + # `sliding_sync_joined_rooms` table so we should skip and b) we won't have + # any `current_state_events` for the room. 
@@ -1701,9 +1702,9 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                 if event_id in fetched_events
             }
 
-            # Can happen for unknown room versions (old room versions that aren't known
-            # anymore) since `get_events(...)` will filter out events for unknown room
-            # versions
+            # Even if we are joined to the room, this can happen for unknown room
+            # versions (old room versions that aren't known anymore) since
+            # `get_events(...)` will filter out events for unknown room versions
             if not current_state_map:
                 continue
 
@@ -1754,23 +1755,17 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
         def _fill_table_txn(txn: LoggingTransaction) -> None:
             # Handle updating the `sliding_sync_joined_rooms` table
             #
-            last_successful_room_id: Optional[str] = None
-            # Process the rooms in `event_stream_ordering` order *ascending* so we can
-            # save our position correctly if we need to exit early.
-            # `progress_event_stream_ordering` is an `OrderedDict` which remembers
-            # insertion order (and we inserted in the correct order) so this should be
-            # the correct thing to do.
             for (
                 room_id,
-                progress_event_stream_ordering,
-            ) in room_id_to_progress_marker_map.items():
-                update_map = joined_room_updates[room_id]
-
-                joined_room_update = joined_room_stream_ordering_updates[room_id]
-                event_stream_ordering = (
-                    joined_room_update.most_recent_event_stream_ordering
+                update_map,
+            ) in joined_room_updates.items():
+                joined_room_stream_ordering_update = (
+                    joined_room_stream_ordering_updates[room_id]
                 )
-                bump_stamp = joined_room_update.most_recent_bump_stamp
+                event_stream_ordering = (
+                    joined_room_stream_ordering_update.most_recent_event_stream_ordering
+                )
+                bump_stamp = joined_room_stream_ordering_update.most_recent_bump_stamp
 
                 # Check if the current state has been updated since we gathered it
                 state_deltas_since_we_gathered_current_state = (
@@ -1790,15 +1785,6 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                         state_delta.event_type,
                         state_delta.state_key,
                     ) in SLIDING_SYNC_RELEVANT_STATE_SET:
-                        # Save our progress before we exit early
-                        if last_successful_room_id is not None:
-                            self.db_pool.updates._background_update_progress_txn(
-                                txn,
-                                _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
-                                {
-                                    "last_event_stream_ordering": progress_event_stream_ordering
-                                },
-                            )
                         # Raising exception so we can just exit and try again. It would
                         # be hard to resolve this within the transaction because we need
                         # to get full events out that take redactions into account. We
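The check above is an optimistic-concurrency guard: the maximum state-delta stream position is snapshotted before the current state is gathered, and if a relevant delta lands after that snapshot the transaction simply raises so the batch rolls back and gets retried. A small, self-contained illustration of that shape (plain sqlite3, hypothetical schema, not Synapse's actual helpers):

import sqlite3


class StateChangedSinceGather(Exception):
    """Raised so the whole transaction rolls back and the batch is retried."""


def fill_room(conn: sqlite3.Connection, room_id: str, gathered_at: int) -> None:
    # Did any state change for this room after we took our snapshot?
    (changed,) = conn.execute(
        "SELECT COUNT(*) FROM state_deltas WHERE room_id = ? AND stream_id > ?",
        (room_id, gathered_at),
    ).fetchone()
    if changed:
        # Cheaper to bail out and redo the room than to patch things up mid-transaction.
        raise StateChangedSinceGather(room_id)
    conn.execute(
        "INSERT OR REPLACE INTO joined_rooms (room_id, filled_at_stream_id) VALUES (?, ?)",
        (room_id, gathered_at),
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE state_deltas (room_id TEXT, stream_id INTEGER)")
conn.execute("CREATE TABLE joined_rooms (room_id TEXT PRIMARY KEY, filled_at_stream_id INTEGER)")
snapshot = 10
conn.execute("INSERT INTO state_deltas VALUES ('!a:test', 11)")  # arrives after the snapshot
try:
    fill_room(conn, "!a:test", snapshot)
except StateChangedSinceGather:
    pass  # the background update would simply retry this batch later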
@@ -1829,20 +1815,17 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                     },
                 )
 
-                # Keep track of the last successful room_id
-                last_successful_room_id = room_id
+                # Now that we've processed the room, we can remove it from the queue
+                self.db_pool.simple_delete_txn(
+                    txn,
+                    table="sliding_sync_joined_rooms_to_recalculate",
+                    keyvalues={"room_id": room_id},
+                )
 
         await self.db_pool.runInteraction(
             "sliding_sync_joined_rooms_bg_update", _fill_table_txn
         )
 
-        # Update the progress
-        _ = room_id_to_progress_marker_map.values()
-        await self.db_pool.updates._background_update_progress(
-            _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
-            {"last_event_stream_ordering": rooms_to_update[-1][1]},
-        )
-
         return len(rooms_to_update)
 
     async def _sliding_sync_membership_snapshots_bg_update(
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 034e6f6ccd..9e9c27e3b1 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -642,6 +642,7 @@ def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
     # nothing to clean up
     row = cast(Optional[Tuple[int]], txn.fetchone())
     max_stream_ordering_sliding_sync_joined_rooms_table = None
+    depends_on = None
    if row is not None:
         (max_stream_ordering_sliding_sync_joined_rooms_table,) = row
 
@@ -668,6 +669,7 @@ def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
        for chunk in batch_iter(room_rows, 1000):
             # Handle updating the `sliding_sync_joined_rooms` table
             #
+            # Clear out the stale data
             DatabasePool.simple_delete_many_batch_txn(
                 txn,
                 table="sliding_sync_joined_rooms",
@@ -675,6 +677,44 @@ def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
                 values=chunk,
             )
 
+            # Update the `sliding_sync_joined_rooms_to_recalculate` table with the rooms
+            # that went stale and now need to be recalculated.
+            #
+            # FIXME: There is potentially a race where the unique index (added via
+            # `_BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE`)
+            # hasn't been added at this point so we won't be able to upsert
+            DatabasePool.simple_upsert_many_txn_native_upsert(
+                txn,
+                table="sliding_sync_joined_rooms_to_recalculate",
+                key_names=("room_id",),
+                key_values=chunk,
+                value_names=(),
+                # No value columns, therefore make a blank list so that the following
+                # zip() works correctly.
+                value_values=[() for x in range(len(chunk))],
+            )
+    else:
+        # Re-run the `sliding_sync_joined_rooms_to_recalculate` prefill if there is
+        # nothing in the `sliding_sync_joined_rooms` table
+        DatabasePool.simple_upsert_txn_native_upsert(
+            txn,
+            table="background_updates",
+            keyvalues={
+                "update_name": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE
+            },
+            values={},
+            # Only insert the row if it doesn't already exist. If it already exists,
+            # we're already working on it
+            insertion_values={
+                "progress_json": "{}",
+                # Since we're going to upsert, we need to make sure the unique index is in place
+                "depends_on": _BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
+            },
+        )
+        depends_on = (
+            _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE
+        )
+
     # Now kick-off the background update to catch-up with what we missed while Synapse
     # was downgraded.
     #
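The `depends_on` plumbing above chains the catch-up work behind the prefill (and the re-scheduled prefill behind the unique-index update) by inserting rows into `background_updates` only if they are not already pending. A rough sketch of that scheduling idea (plain sqlite3, hypothetical update names, `update_name` given a unique constraint so the upsert works; this is not the Synapse `background_updates` API):

import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE background_updates (
        update_name TEXT NOT NULL UNIQUE,
        progress_json TEXT NOT NULL,
        depends_on TEXT
    )
    """
)


def schedule(conn: sqlite3.Connection, name: str, depends_on: Optional[str] = None) -> None:
    # Only insert if the update isn't already queued; never clobber existing progress.
    conn.execute(
        """
        INSERT INTO background_updates (update_name, progress_json, depends_on)
        VALUES (?, '{}', ?)
        ON CONFLICT (update_name) DO NOTHING
        """,
        (name, depends_on),
    )


schedule(conn, "prefill_recalculate_queue", depends_on="add_recalculate_unique_index")
schedule(conn, "rebuild_joined_rooms", depends_on="prefill_recalculate_queue")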
@@ -682,13 +722,6 @@ def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
     # `sliding_sync_joined_rooms` table yet. This could happen if someone had zero rooms
     # on their server (so the normal background update completes), downgrade Synapse
     # versions, join and create some new rooms, and upgrade again.
-    #
-    progress_json: JsonDict = {}
-    if max_stream_ordering_sliding_sync_joined_rooms_table is not None:
-        progress_json["last_event_stream_ordering"] = (
-            max_stream_ordering_sliding_sync_joined_rooms_table
-        )
-
     DatabasePool.simple_upsert_txn_native_upsert(
         txn,
         table="background_updates",
@@ -699,7 +732,10 @@ def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
         # Only insert the row if it doesn't already exist. If it already exists, we will
         # eventually fill in the rows we're trying to populate.
         insertion_values={
-            "progress_json": json_encoder.encode(progress_json),
+            # Empty progress is expected since it's not used for this background update.
+            "progress_json": "{}",
+            # Wait for the prefill to finish
+            "depends_on": depends_on,
         },
     )
 
diff --git a/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql b/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql
index 6ab1897230..11fb2c4d64 100644
--- a/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql
+++ b/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql
@@ -150,7 +150,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_event_stream
 -- 3. Add a background update to populate the new `sliding_sync_joined_rooms` table
 --
 INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
-    (8701, 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update', '{}');
+    (8701, 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update', '{ "initial_insert": true }');
 INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
     (8701, 'sliding_sync_index_joined_rooms_to_recalculate_table_bg_update', '{}', 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update');
 INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py
index 506d981ce6..49dc973a36 100644
--- a/tests/storage/test__base.py
+++ b/tests/storage/test__base.py
@@ -112,6 +112,24 @@ class UpdateUpsertManyTests(unittest.HomeserverTestCase):
             {(1, "user1", "hello"), (2, "user2", "bleb")},
         )
 
+        self.get_success(
+            self.storage.db_pool.runInteraction(
+                "test",
+                self.storage.db_pool.simple_upsert_many_txn,
+                self.table_name,
+                key_names=key_names,
+                key_values=[[2, "user2"]],
+                value_names=[],
+                value_values=[],
+            )
+        )
+
+        # Check results are what we expect
+        self.assertEqual(
+            set(self._dump_table_to_tuple()),
+            {(1, "user1", "hello"), (2, "user2", "bleb")},
+        )
+
     def test_simple_update_many(self) -> None:
         """
         simple_update_many performs many updates at once.
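The test above exercises the corner case that the `value_values=[() for x in range(len(chunk))]` comment in `prepare_database.py` guards against: when a table is all key columns and no value columns, an upsert-many still zips key rows against value rows, so there must be one (empty) value tuple per key row. A tiny standalone illustration of that pairing (plain Python, hypothetical column names):

key_names = ("id", "username")
key_values = [(1, "user1"), (2, "user2")]
value_names: tuple = ()
# One empty tuple per key row. If this were just [], zip() would stop
# immediately and no rows would be upserted at all.
value_values = [() for _ in key_values]

for keys, values in zip(key_values, value_values):
    row = {**dict(zip(key_names, keys)), **dict(zip(value_names, values))}
    print(row)  # {'id': 1, 'username': 'user1'} then {'id': 2, 'username': 'user2'}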
diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py
index 2654decb0c..300ccd664e 100644
--- a/tests/storage/test_sliding_sync_tables.py
+++ b/tests/storage/test_sliding_sync_tables.py
@@ -2659,13 +2659,33 @@ class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
             exact=True,
         )
 
-        # Insert and run the background update.
+        # Insert and run the background updates.
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                "background_updates",
+                {
+                    "update_name": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
+                    "progress_json": "{}",
+                },
+            )
+        )
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                "background_updates",
+                {
+                    "update_name": _BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
+                    "progress_json": "{}",
+                    "depends_on": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
+                },
+            )
+        )
         self.get_success(
             self.store.db_pool.simple_insert(
                 "background_updates",
                 {
                     "update_name": _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
                     "progress_json": "{}",
+                    "depends_on": _BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
                 },
             )
         )
@@ -2807,12 +2827,32 @@ class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
         )
 
         # Insert and run the background update.
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                "background_updates",
+                {
+                    "update_name": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
+                    "progress_json": "{}",
+                },
+            )
+        )
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                "background_updates",
+                {
+                    "update_name": _BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
+                    "progress_json": "{}",
+                    "depends_on": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
+                },
+            )
+        )
         self.get_success(
             self.store.db_pool.simple_insert(
                 "background_updates",
                 {
                     "update_name": _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
                     "progress_json": "{}",
+                    "depends_on": _BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
                 },
            )
         )
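For reference, the three rows inserted in these tests mirror the order the updates are expected to run in: an update only becomes runnable once the update named in its `depends_on` column has completed. A toy model of that resolution order (plain Python, hypothetical update names, not the real background-updates controller):

pending = {
    "prefill_queue": None,
    "add_queue_unique_index": "prefill_queue",
    "rebuild_joined_rooms": "add_queue_unique_index",
}
done: set = set()
order = []

while pending:
    runnable = [name for name, dep in pending.items() if dep is None or dep in done]
    assert runnable, "dependency cycle or missing update"
    for name in runnable:
        # In reality each update runs batch-by-batch; here we pretend it completes.
        done.add(name)
        order.append(name)
        del pending[name]

print(order)  # ['prefill_queue', 'add_queue_unique_index', 'rebuild_joined_rooms']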