diff --git a/changelog.d/18254.feature b/changelog.d/18254.feature new file mode 100644 index 0000000000..62e1b79a15 --- /dev/null +++ b/changelog.d/18254.feature @@ -0,0 +1 @@ +Add background job to clear unreferenced state groups. diff --git a/docs/development/database_schema.md b/docs/development/database_schema.md index 37a06acc12..620d1c16b0 100644 --- a/docs/development/database_schema.md +++ b/docs/development/database_schema.md @@ -162,7 +162,7 @@ by a unique name, the current status (stored in JSON), and some dependency infor * Whether the update requires a previous update to be complete. * A rough ordering for which to complete updates. -A new background updates needs to be added to the `background_updates` table: +A new background update needs to be added to the `background_updates` table: ```sql INSERT INTO background_updates (ordering, update_name, depends_on, progress_json) VALUES diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 1bb9940180..438b2ff8a0 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -192,6 +192,11 @@ APPEND_ONLY_TABLES = [ IGNORED_TABLES = { + # Porting the auto generated sequence in this table is non-trivial. + # None of the entries in this list are mandatory for Synapse to keep working. + # If state group disk space is an issue after the port, the + # `mark_unreferenced_state_groups_for_deletion_bg_update` background task can be run again. + "state_groups_pending_deletion", # We don't port these tables, as they're a faff and we can regenerate # them anyway. "user_directory", @@ -217,6 +222,15 @@ IGNORED_TABLES = { } +# These background updates will not be applied upon creation of the postgres database. +IGNORED_BACKGROUND_UPDATES = { + # Reapplying this background update to the postgres database is unnecessary after + # already having waited for the SQLite database to complete all running background + # updates. + "mark_unreferenced_state_groups_for_deletion_bg_update", +} + + # Error returned by the run function. Used at the top-level part of the script to # handle errors and return codes. end_error: Optional[str] = None @@ -688,6 +702,20 @@ class Porter: # 0 means off. 1 means full. 2 means incremental. return autovacuum_setting != 0 + async def remove_ignored_background_updates_from_database(self) -> None: + def _remove_delete_unreferenced_state_groups_bg_updates( + txn: LoggingTransaction, + ) -> None: + txn.execute( + "DELETE FROM background_updates WHERE update_name = ANY(?)", + (list(IGNORED_BACKGROUND_UPDATES),), + ) + + await self.postgres_store.db_pool.runInteraction( + "remove_delete_unreferenced_state_groups_bg_updates", + _remove_delete_unreferenced_state_groups_bg_updates, + ) + async def run(self) -> None: """Ports the SQLite database to a PostgreSQL database. @@ -733,6 +761,8 @@ class Porter: self.hs_config.database.get_single_database() ) + await self.remove_ignored_background_updates_from_database() + await self.run_background_updates_on_postgres() self.progress.set_state("Creating port tables") diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 47cec8c469..c2d4bf8290 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -21,11 +21,19 @@ import itertools import logging -from typing import TYPE_CHECKING, Collection, Mapping, Set +from typing import ( + TYPE_CHECKING, + Collection, + Mapping, + Optional, + Set, +) from synapse.logging.context import nested_logging_context from synapse.metrics.background_process_metrics import wrap_as_background_process +from synapse.storage.database import LoggingTransaction from synapse.storage.databases import Databases +from synapse.types.storage import _BackgroundUpdates if TYPE_CHECKING: from synapse.server import HomeServer @@ -44,6 +52,11 @@ class PurgeEventsStorageController: self._delete_state_groups_loop, 60 * 1000 ) + self.stores.state.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE, + self._background_delete_unrefereneced_state_groups, + ) + async def purge_room(self, room_id: str) -> None: """Deletes all record of a room""" @@ -81,7 +94,8 @@ class PurgeEventsStorageController: ) async def _find_unreferenced_groups( - self, state_groups: Collection[int] + self, + state_groups: Collection[int], ) -> Set[int]: """Used when purging history to figure out which state groups can be deleted. @@ -203,3 +217,232 @@ class PurgeEventsStorageController: room_id, groups_to_sequences, ) + + async def _background_delete_unrefereneced_state_groups( + self, progress: dict, batch_size: int + ) -> int: + """This background update will slowly delete any unreferenced state groups""" + + last_checked_state_group = progress.get("last_checked_state_group") + + if last_checked_state_group is None: + # This is the first run. + last_checked_state_group = ( + await self.stores.state.db_pool.simple_select_one_onecol( + table="state_groups", + keyvalues={}, + retcol="MAX(id)", + allow_none=True, + desc="get_max_state_group", + ) + ) + if last_checked_state_group is None: + # There are no state groups so the background process is finished. + await self.stores.state.db_pool.updates._end_background_update( + _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE + ) + return batch_size + last_checked_state_group += 1 + + ( + last_checked_state_group, + final_batch, + ) = await self._delete_unreferenced_state_groups_batch( + last_checked_state_group, + batch_size, + ) + + if not final_batch: + # There are more state groups to check. + progress = { + "last_checked_state_group": last_checked_state_group, + } + await self.stores.state.db_pool.updates._background_update_progress( + _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE, + progress, + ) + else: + # This background process is finished. + await self.stores.state.db_pool.updates._end_background_update( + _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE + ) + + return batch_size + + async def _delete_unreferenced_state_groups_batch( + self, + last_checked_state_group: int, + batch_size: int, + ) -> tuple[int, bool]: + """Looks for unreferenced state groups starting from the last state group + checked and marks them for deletion. + + Args: + last_checked_state_group: The last state group that was checked. + batch_size: How many state groups to process in this iteration. + + Returns: + (last_checked_state_group, final_batch) + """ + + # Find all state groups that can be deleted if any of the original set are deleted. + ( + to_delete, + last_checked_state_group, + final_batch, + ) = await self._find_unreferenced_groups_for_background_deletion( + last_checked_state_group, batch_size + ) + + if len(to_delete) == 0: + return last_checked_state_group, final_batch + + await self.stores.state_deletion.mark_state_groups_as_pending_deletion( + to_delete + ) + + return last_checked_state_group, final_batch + + async def _find_unreferenced_groups_for_background_deletion( + self, + last_checked_state_group: int, + batch_size: int, + ) -> tuple[Set[int], int, bool]: + """Used when deleting unreferenced state groups in the background to figure out + which state groups can be deleted. + To avoid increased DB usage due to de-deltaing state groups, this returns only + state groups which are free standing (ie. no shared edges with referenced groups) or + state groups which do not share edges which result in a future referenced group. + + The following scenarios outline the possibilities based on state group data in + the DB. + + ie. Free standing -> state groups 1-N would be returned: + SG_1 + | + ... + | + SG_N + + ie. Previous reference -> state groups 2-N would be returned: + SG_1 <- referenced by event + | + SG_2 + | + ... + | + SG_N + + ie. Future reference -> none of the following state groups would be returned: + SG_1 + | + SG_2 + | + ... + | + SG_N <- referenced by event + + Args: + last_checked_state_group: The last state group that was checked. + batch_size: How many state groups to process in this iteration. + + Returns: + (to_delete, last_checked_state_group, final_batch) + """ + + # If a state group's next edge is not pending deletion then we don't delete the state group. + # If there is no next edge or the next edges are all marked for deletion, then delete + # the state group. + # This holds since we walk backwards from the latest state groups, ensuring that + # we've already checked newer state groups for event references along the way. + def get_next_state_groups_marked_for_deletion_txn( + txn: LoggingTransaction, + ) -> tuple[dict[int, bool], dict[int, int]]: + state_group_sql = """ + SELECT s.id, e.state_group, d.state_group + FROM ( + SELECT id FROM state_groups + WHERE id < ? ORDER BY id DESC LIMIT ? + ) as s + LEFT JOIN state_group_edges AS e ON (s.id = e.prev_state_group) + LEFT JOIN state_groups_pending_deletion AS d ON (e.state_group = d.state_group) + """ + txn.execute(state_group_sql, (last_checked_state_group, batch_size)) + + # Mapping from state group to whether we should delete it. + state_groups_to_deletion: dict[int, bool] = {} + + # Mapping from state group to prev state group. + state_groups_to_prev: dict[int, int] = {} + + for row in txn: + state_group = row[0] + next_edge = row[1] + pending_deletion = row[2] + + if next_edge is not None: + state_groups_to_prev[next_edge] = state_group + + if next_edge is not None and not pending_deletion: + # We have found an edge not marked for deletion. + # Check previous results to see if this group is part of a chain + # within this batch that qualifies for deletion. + # ie. batch contains: + # SG_1 -> SG_2 -> SG_3 + # If SG_3 is a candidate for deletion, then SG_2 & SG_1 should also + # be, even though they have edges which may not be marked for + # deletion. + # This relies on SQL results being sorted in DESC order to work. + next_is_deletion_candidate = state_groups_to_deletion.get(next_edge) + if ( + next_is_deletion_candidate is None + or not next_is_deletion_candidate + ): + state_groups_to_deletion[state_group] = False + else: + state_groups_to_deletion.setdefault(state_group, True) + else: + # This state group may be a candidate for deletion + state_groups_to_deletion.setdefault(state_group, True) + + return state_groups_to_deletion, state_groups_to_prev + + ( + state_groups_to_deletion, + state_group_edges, + ) = await self.stores.state.db_pool.runInteraction( + "get_next_state_groups_marked_for_deletion", + get_next_state_groups_marked_for_deletion_txn, + ) + deletion_candidates = { + state_group + for state_group, deletion in state_groups_to_deletion.items() + if deletion + } + + final_batch = False + state_groups = state_groups_to_deletion.keys() + if len(state_groups) < batch_size: + final_batch = True + else: + last_checked_state_group = min(state_groups) + + if len(state_groups) == 0: + return set(), last_checked_state_group, final_batch + + # Determine if any of the remaining state groups are directly referenced. + referenced = await self.stores.main.get_referenced_state_groups( + deletion_candidates + ) + + # Remove state groups from deletion_candidates which are directly referenced or share a + # future edge with a referenced state group within this batch. + def filter_reference_chains(group: Optional[int]) -> None: + while group is not None: + deletion_candidates.discard(group) + group = state_group_edges.get(group) + + for referenced_group in referenced: + filter_reference_chains(referenced_group) + + return deletion_candidates, last_checked_state_group, final_batch diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index f7824cba0f..95fd0ae73a 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -20,7 +20,15 @@ # import logging -from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union +from typing import ( + TYPE_CHECKING, + Dict, + List, + Mapping, + Optional, + Tuple, + Union, +) from synapse.logging.opentracing import tag_args, trace from synapse.storage._base import SQLBaseStore diff --git a/synapse/storage/databases/state/deletion.py b/synapse/storage/databases/state/deletion.py index d4b1c20a45..f77c46f6ae 100644 --- a/synapse/storage/databases/state/deletion.py +++ b/synapse/storage/databases/state/deletion.py @@ -321,18 +321,42 @@ class StateDeletionDataStore: async def mark_state_groups_as_pending_deletion( self, state_groups: Collection[int] ) -> None: - """Mark the given state groups as pending deletion""" + """Mark the given state groups as pending deletion. + + If any of the state groups are already pending deletion, then those records are + left as is. + """ + + await self.db_pool.runInteraction( + "mark_state_groups_as_pending_deletion", + self._mark_state_groups_as_pending_deletion_txn, + state_groups, + ) + + def _mark_state_groups_as_pending_deletion_txn( + self, + txn: LoggingTransaction, + state_groups: Collection[int], + ) -> None: + sql = """ + INSERT INTO state_groups_pending_deletion (state_group, insertion_ts) + VALUES %s + ON CONFLICT (state_group) + DO NOTHING + """ now = self._clock.time_msec() - - await self.db_pool.simple_upsert_many( - table="state_groups_pending_deletion", - key_names=("state_group",), - key_values=[(state_group,) for state_group in state_groups], - value_names=("insertion_ts",), - value_values=[(now,) for _ in state_groups], - desc="mark_state_groups_as_pending_deletion", - ) + rows = [ + ( + state_group, + now, + ) + for state_group in state_groups + ] + if isinstance(txn.database_engine, PostgresEngine): + txn.execute_values(sql % ("?",), rows, fetch=False) + else: + txn.execute_batch(sql % ("(?, ?)",), rows) async def mark_state_groups_as_used(self, state_groups: Collection[int]) -> None: """Mark the given state groups as now being referenced""" diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index f87b1a4a0a..2160edb014 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -161,6 +161,7 @@ Changes in SCHEMA_VERSION = 89 Changes in SCHEMA_VERSION = 90 - Add a column `participant` to `room_memberships` table + - Add background update to delete unreferenced state groups. """ diff --git a/synapse/storage/schema/state/delta/90/02_delete_unreferenced_state_groups.sql b/synapse/storage/schema/state/delta/90/02_delete_unreferenced_state_groups.sql new file mode 100644 index 0000000000..55a038e2b8 --- /dev/null +++ b/synapse/storage/schema/state/delta/90/02_delete_unreferenced_state_groups.sql @@ -0,0 +1,16 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2025 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + +-- Add a background update to delete any unreferenced state groups +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (9002, 'mark_unreferenced_state_groups_for_deletion_bg_update', '{}'); diff --git a/synapse/types/storage/__init__.py b/synapse/types/storage/__init__.py index b5fa20a41a..e03ff7ffc8 100644 --- a/synapse/types/storage/__init__.py +++ b/synapse/types/storage/__init__.py @@ -48,3 +48,7 @@ class _BackgroundUpdates: SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE = ( "sliding_sync_membership_snapshots_fix_forgotten_column_bg_update" ) + + MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE = ( + "mark_unreferenced_state_groups_for_deletion_bg_update" + ) diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index 916e42e731..0aa14fd1f4 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -24,6 +24,7 @@ from synapse.api.errors import NotFoundError, SynapseError from synapse.rest.client import room from synapse.server import HomeServer from synapse.types.state import StateFilter +from synapse.types.storage import _BackgroundUpdates from synapse.util import Clock from tests.unittest import HomeserverTestCase @@ -303,3 +304,156 @@ class PurgeTests(HomeserverTestCase): ) ) self.assertEqual(len(state_groups), 1) + + def test_clear_unreferenced_state_groups(self) -> None: + """Test that any unreferenced state groups are automatically cleaned up.""" + + self.helper.send(self.room_id, body="test1") + state1 = self.helper.send_state( + self.room_id, "org.matrix.test", body={"number": 2} + ) + # Create enough state events to require multiple batches of + # mark_unreferenced_state_groups_for_deletion_bg_update to be run. + for i in range(200): + self.helper.send_state(self.room_id, "org.matrix.test", body={"number": i}) + self.helper.send(self.room_id, body="test4") + last = self.helper.send(self.room_id, body="test5") + + # Create an unreferenced state group that has no prev group. + unreferenced_free_state_group = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=None, + delta_ids={("org.matrix.test", ""): state1["event_id"]}, + current_state_ids={("org.matrix.test", ""): ""}, + ) + ) + + # Create some unreferenced state groups that have a prev group of one of the + # existing state groups. + prev_group = self.get_success( + self.store._get_state_group_for_event(state1["event_id"]) + ) + unreferenced_end_state_group = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=prev_group, + delta_ids={("org.matrix.test", ""): state1["event_id"]}, + current_state_ids=None, + ) + ) + another_unreferenced_end_state_group = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=unreferenced_end_state_group, + delta_ids={("org.matrix.test", ""): state1["event_id"]}, + current_state_ids=None, + ) + ) + + # Add some other unreferenced state groups which lead to a referenced state + # group. + # These state groups should not get deleted. + chain_state_group = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=None, + delta_ids={("org.matrix.test", ""): ""}, + current_state_ids={("org.matrix.test", ""): ""}, + ) + ) + chain_state_group_2 = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=chain_state_group, + delta_ids={("org.matrix.test", ""): ""}, + current_state_ids=None, + ) + ) + referenced_chain_state_group = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=chain_state_group_2, + delta_ids={("org.matrix.test", ""): ""}, + current_state_ids=None, + ) + ) + self.get_success( + self.store.db_pool.simple_insert( + "event_to_state_groups", + { + "event_id": "$new_event", + "state_group": referenced_chain_state_group, + }, + ) + ) + + # Insert and run the background update. + self.get_success( + self.store.db_pool.simple_insert( + "background_updates", + { + "update_name": _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE, + "progress_json": "{}", + }, + ) + ) + self.store.db_pool.updates._all_done = False + self.wait_for_background_updates() + + # Advance so that the background job to delete the state groups runs + self.reactor.advance( + 1 + self.state_deletion_store.DELAY_BEFORE_DELETION_MS / 1000 + ) + + # We expect that the unreferenced free state group has been deleted. + row = self.get_success( + self.state_store.db_pool.simple_select_one_onecol( + table="state_groups", + keyvalues={"id": unreferenced_free_state_group}, + retcol="id", + allow_none=True, + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertIsNone(row) + + # We expect that both unreferenced end state groups have been deleted. + row = self.get_success( + self.state_store.db_pool.simple_select_one_onecol( + table="state_groups", + keyvalues={"id": unreferenced_end_state_group}, + retcol="id", + allow_none=True, + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertIsNone(row) + row = self.get_success( + self.state_store.db_pool.simple_select_one_onecol( + table="state_groups", + keyvalues={"id": another_unreferenced_end_state_group}, + retcol="id", + allow_none=True, + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertIsNone(row) + + # We expect there to now only be one state group for the room, which is + # the state group of the last event (as the only outlier). + state_groups = self.get_success( + self.state_store.db_pool.simple_select_onecol( + table="state_groups", + keyvalues={"room_id": self.room_id}, + retcol="id", + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertEqual(len(state_groups), 210)