1
0
Fork 0
mirror of https://github.com/element-hq/synapse.git synced 2025-03-31 03:45:13 +00:00

Delete unreferenced state groups in background (#18254)

This PR fixes #18154 to avoid de-deltaing state groups which resulted in
DB size temporarily increasing until the DB was `VACUUM`'ed. As a
result, fewer state groups will get deleted now.
It also attempts to improve performance by not duplicating work when
processing state groups it has already processed in previous iterations.

### Pull Request Checklist

<!-- Please read
https://element-hq.github.io/synapse/latest/development/contributing_guide.html
before submitting your pull request -->

* [X] Pull request is based on the develop branch
* [X] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
The entry should:
- Be a short description of your change which makes sense to users.
"Fixed a bug that prevented receiving messages from other servers."
instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
- Feel free to credit yourself, by adding a sentence "Contributed by
@github_username." or "Contributed by [Your Name]." to the end of the
entry.
* [X] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct
(run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Erik Johnston <erikj@element.io>
This commit is contained in:
Devon Hudson 2025-03-21 17:09:49 +00:00 committed by GitHub
parent 33bcef9dc7
commit 1efb826b54
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 495 additions and 14 deletions

View file

@ -0,0 +1 @@
Add background job to clear unreferenced state groups.

View file

@ -162,7 +162,7 @@ by a unique name, the current status (stored in JSON), and some dependency infor
* Whether the update requires a previous update to be complete.
* A rough ordering for which to complete updates.
A new background updates needs to be added to the `background_updates` table:
A new background update needs to be added to the `background_updates` table:
```sql
INSERT INTO background_updates (ordering, update_name, depends_on, progress_json) VALUES

View file

@ -192,6 +192,11 @@ APPEND_ONLY_TABLES = [
IGNORED_TABLES = {
# Porting the auto generated sequence in this table is non-trivial.
# None of the entries in this list are mandatory for Synapse to keep working.
# If state group disk space is an issue after the port, the
# `mark_unreferenced_state_groups_for_deletion_bg_update` background task can be run again.
"state_groups_pending_deletion",
# We don't port these tables, as they're a faff and we can regenerate
# them anyway.
"user_directory",
@ -217,6 +222,15 @@ IGNORED_TABLES = {
}
# These background updates will not be applied upon creation of the postgres database.
IGNORED_BACKGROUND_UPDATES = {
# Reapplying this background update to the postgres database is unnecessary after
# already having waited for the SQLite database to complete all running background
# updates.
"mark_unreferenced_state_groups_for_deletion_bg_update",
}
# Error returned by the run function. Used at the top-level part of the script to
# handle errors and return codes.
end_error: Optional[str] = None
@ -688,6 +702,20 @@ class Porter:
# 0 means off. 1 means full. 2 means incremental.
return autovacuum_setting != 0
async def remove_ignored_background_updates_from_database(self) -> None:
    """Delete every entry in IGNORED_BACKGROUND_UPDATES from the ported
    PostgreSQL `background_updates` table.

    These updates were already run to completion on the SQLite side before
    porting, so re-running them on PostgreSQL is unnecessary.
    """

    def _delete_ignored_updates_txn(txn: LoggingTransaction) -> None:
        # `= ANY(?)` takes the whole set of names as one array parameter,
        # avoiding a dynamically-built `IN (...)` clause.
        ignored_names = list(IGNORED_BACKGROUND_UPDATES)
        txn.execute(
            "DELETE FROM background_updates WHERE update_name = ANY(?)",
            (ignored_names,),
        )

    await self.postgres_store.db_pool.runInteraction(
        "remove_delete_unreferenced_state_groups_bg_updates",
        _delete_ignored_updates_txn,
    )
async def run(self) -> None:
"""Ports the SQLite database to a PostgreSQL database.
@ -733,6 +761,8 @@ class Porter:
self.hs_config.database.get_single_database()
)
await self.remove_ignored_background_updates_from_database()
await self.run_background_updates_on_postgres()
self.progress.set_state("Creating port tables")

View file

@ -21,11 +21,19 @@
import itertools
import logging
from typing import TYPE_CHECKING, Collection, Mapping, Set
from typing import (
TYPE_CHECKING,
Collection,
Mapping,
Optional,
Set,
)
from synapse.logging.context import nested_logging_context
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases import Databases
from synapse.types.storage import _BackgroundUpdates
if TYPE_CHECKING:
from synapse.server import HomeServer
@ -44,6 +52,11 @@ class PurgeEventsStorageController:
self._delete_state_groups_loop, 60 * 1000
)
self.stores.state.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE,
self._background_delete_unrefereneced_state_groups,
)
async def purge_room(self, room_id: str) -> None:
"""Deletes all record of a room"""
@ -81,7 +94,8 @@ class PurgeEventsStorageController:
)
async def _find_unreferenced_groups(
self, state_groups: Collection[int]
self,
state_groups: Collection[int],
) -> Set[int]:
"""Used when purging history to figure out which state groups can be
deleted.
@ -203,3 +217,232 @@ class PurgeEventsStorageController:
room_id,
groups_to_sequences,
)
async def _background_delete_unrefereneced_state_groups(
    self, progress: dict, batch_size: int
) -> int:
    """Background update that slowly deletes any unreferenced state groups.

    Args:
        progress: Persisted progress dict; may hold "last_checked_state_group".
        batch_size: Upper bound on state groups to examine this iteration.

    Returns:
        The number of items "processed" this round (always ``batch_size``,
        as expected by the background-update machinery).
    """
    updates = self.stores.state.db_pool.updates

    last_checked_state_group = progress.get("last_checked_state_group")
    if last_checked_state_group is None:
        # First run: start just above the current maximum state group id so
        # the batch query (which scans `id < ?`) includes the newest group.
        max_group_id = await self.stores.state.db_pool.simple_select_one_onecol(
            table="state_groups",
            keyvalues={},
            retcol="MAX(id)",
            allow_none=True,
            desc="get_max_state_group",
        )
        if max_group_id is None:
            # No state groups exist at all, so there is nothing to do.
            await updates._end_background_update(
                _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE
            )
            return batch_size
        last_checked_state_group = max_group_id + 1

    (
        last_checked_state_group,
        final_batch,
    ) = await self._delete_unreferenced_state_groups_batch(
        last_checked_state_group,
        batch_size,
    )

    if final_batch:
        # We have walked past the oldest state group: the update is done.
        await updates._end_background_update(
            _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE
        )
    else:
        # Record how far we got so the next iteration resumes from here.
        await updates._background_update_progress(
            _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE,
            {"last_checked_state_group": last_checked_state_group},
        )

    return batch_size
async def _delete_unreferenced_state_groups_batch(
    self,
    last_checked_state_group: int,
    batch_size: int,
) -> tuple[int, bool]:
    """Scan one batch of state groups below the given id and mark any
    unreferenced ones for deletion.

    Args:
        last_checked_state_group: The last state group that was checked.
        batch_size: How many state groups to process in this iteration.

    Returns:
        (last_checked_state_group, final_batch)
    """
    (
        deletable_groups,
        last_checked_state_group,
        final_batch,
    ) = await self._find_unreferenced_groups_for_background_deletion(
        last_checked_state_group, batch_size
    )

    # Only touch the pending-deletion table when this batch actually
    # produced candidates.
    if deletable_groups:
        await self.stores.state_deletion.mark_state_groups_as_pending_deletion(
            deletable_groups
        )

    return last_checked_state_group, final_batch
async def _find_unreferenced_groups_for_background_deletion(
    self,
    last_checked_state_group: int,
    batch_size: int,
) -> tuple[Set[int], int, bool]:
    """Used when deleting unreferenced state groups in the background to figure out
    which state groups can be deleted.

    To avoid increased DB usage due to de-deltaing state groups, this returns only
    state groups which are free standing (ie. no shared edges with referenced groups) or
    state groups which do not share edges which result in a future referenced group.

    The following scenarios outline the possibilities based on state group data in
    the DB.

    ie. Free standing -> state groups 1-N would be returned:
    SG_1
    |
    ...
    |
    SG_N

    ie. Previous reference -> state groups 2-N would be returned:
    SG_1 <- referenced by event
    |
    SG_2
    |
    ...
    |
    SG_N

    ie. Future reference -> none of the following state groups would be returned:
    SG_1
    |
    SG_2
    |
    ...
    |
    SG_N <- referenced by event

    Args:
        last_checked_state_group: The last state group that was checked.
        batch_size: How many state groups to process in this iteration.

    Returns:
        (to_delete, last_checked_state_group, final_batch)
    """

    # If a state group's next edge is not pending deletion then we don't delete the state group.
    # If there is no next edge or the next edges are all marked for deletion, then delete
    # the state group.
    # This holds since we walk backwards from the latest state groups, ensuring that
    # we've already checked newer state groups for event references along the way.
    def get_next_state_groups_marked_for_deletion_txn(
        txn: LoggingTransaction,
    ) -> tuple[dict[int, bool], dict[int, int]]:
        # One row per (state group, outgoing edge) pair: the group's id, the
        # "next" group that deltas off it (if any), and the next group's
        # pending-deletion marker (if any).  The inner SELECT walks backwards
        # from last_checked_state_group, batch_size groups at a time.
        state_group_sql = """
            SELECT s.id, e.state_group, d.state_group
            FROM (
                SELECT id FROM state_groups
                WHERE id < ? ORDER BY id DESC LIMIT ?
            ) as s
            LEFT JOIN state_group_edges AS e ON (s.id = e.prev_state_group)
            LEFT JOIN state_groups_pending_deletion AS d ON (e.state_group = d.state_group)
        """
        txn.execute(state_group_sql, (last_checked_state_group, batch_size))

        # Mapping from state group to whether we should delete it.
        state_groups_to_deletion: dict[int, bool] = {}

        # Mapping from state group to prev state group.
        state_groups_to_prev: dict[int, int] = {}

        for row in txn:
            state_group = row[0]
            next_edge = row[1]
            pending_deletion = row[2]

            if next_edge is not None:
                # Record the edge so chains can be walked later in
                # filter_reference_chains.
                state_groups_to_prev[next_edge] = state_group

            if next_edge is not None and not pending_deletion:
                # We have found an edge not marked for deletion.
                # Check previous results to see if this group is part of a chain
                # within this batch that qualifies for deletion.
                # ie. batch contains:
                # SG_1 -> SG_2 -> SG_3
                # If SG_3 is a candidate for deletion, then SG_2 & SG_1 should also
                # be, even though they have edges which may not be marked for
                # deletion.
                # This relies on SQL results being sorted in DESC order to work.
                next_is_deletion_candidate = state_groups_to_deletion.get(next_edge)
                if (
                    next_is_deletion_candidate is None
                    or not next_is_deletion_candidate
                ):
                    # The next group is kept (or outside this batch), so this
                    # group must be kept too.
                    state_groups_to_deletion[state_group] = False
                else:
                    # setdefault: a different edge may already have marked this
                    # group as "keep"; never overwrite False with True.
                    state_groups_to_deletion.setdefault(state_group, True)
            else:
                # This state group may be a candidate for deletion
                state_groups_to_deletion.setdefault(state_group, True)

        return state_groups_to_deletion, state_groups_to_prev

    (
        state_groups_to_deletion,
        state_group_edges,
    ) = await self.stores.state.db_pool.runInteraction(
        "get_next_state_groups_marked_for_deletion",
        get_next_state_groups_marked_for_deletion_txn,
    )
    deletion_candidates = {
        state_group
        for state_group, deletion in state_groups_to_deletion.items()
        if deletion
    }

    final_batch = False
    state_groups = state_groups_to_deletion.keys()
    if len(state_groups) < batch_size:
        # Fewer groups than requested means we have reached the oldest ones.
        final_batch = True
    else:
        last_checked_state_group = min(state_groups)

    if len(state_groups) == 0:
        return set(), last_checked_state_group, final_batch

    # Determine if any of the remaining state groups are directly referenced.
    referenced = await self.stores.main.get_referenced_state_groups(
        deletion_candidates
    )

    # Remove state groups from deletion_candidates which are directly referenced or share a
    # future edge with a referenced state group within this batch.
    def filter_reference_chains(group: Optional[int]) -> None:
        # Walk backwards (newer -> older) along prev-group edges, un-marking
        # every group on the chain.
        while group is not None:
            deletion_candidates.discard(group)
            group = state_group_edges.get(group)

    for referenced_group in referenced:
        filter_reference_chains(referenced_group)

    return deletion_candidates, last_checked_state_group, final_batch

View file

@ -20,7 +20,15 @@
#
import logging
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union
from typing import (
TYPE_CHECKING,
Dict,
List,
Mapping,
Optional,
Tuple,
Union,
)
from synapse.logging.opentracing import tag_args, trace
from synapse.storage._base import SQLBaseStore

View file

@ -321,18 +321,42 @@ class StateDeletionDataStore:
async def mark_state_groups_as_pending_deletion(
    self, state_groups: Collection[int]
) -> None:
    """Mark the given state groups as pending deletion.

    If any of the state groups are already pending deletion, then those records are
    left as is.
    """

    await self.db_pool.runInteraction(
        "mark_state_groups_as_pending_deletion",
        self._mark_state_groups_as_pending_deletion_txn,
        state_groups,
    )

def _mark_state_groups_as_pending_deletion_txn(
    self,
    txn: LoggingTransaction,
    state_groups: Collection[int],
) -> None:
    """Insert pending-deletion rows for the given state groups.

    Uses ON CONFLICT DO NOTHING so rows already pending deletion keep
    their original `insertion_ts` (an upsert would reset the timestamp
    and push back their deletion).
    """
    sql = """
    INSERT INTO state_groups_pending_deletion (state_group, insertion_ts)
    VALUES %s
    ON CONFLICT (state_group)
    DO NOTHING
    """

    now = self._clock.time_msec()
    rows = [
        (
            state_group,
            now,
        )
        for state_group in state_groups
    ]
    if isinstance(txn.database_engine, PostgresEngine):
        # execute_values expands the single VALUES %s with all rows in
        # one statement.
        txn.execute_values(sql % ("?",), rows, fetch=False)
    else:
        # SQLite has no execute_values; run the single-row form per row.
        txn.execute_batch(sql % ("(?, ?)",), rows)
async def mark_state_groups_as_used(self, state_groups: Collection[int]) -> None:
"""Mark the given state groups as now being referenced"""

View file

@ -161,6 +161,7 @@ Changes in SCHEMA_VERSION = 89
Changes in SCHEMA_VERSION = 90
- Add a column `participant` to `room_memberships` table
- Add background update to delete unreferenced state groups.
"""

View file

@ -0,0 +1,16 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Add a background update to delete any unreferenced state groups
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(9002, 'mark_unreferenced_state_groups_for_deletion_bg_update', '{}');

View file

@ -48,3 +48,7 @@ class _BackgroundUpdates:
SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE = (
"sliding_sync_membership_snapshots_fix_forgotten_column_bg_update"
)
MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE = (
"mark_unreferenced_state_groups_for_deletion_bg_update"
)

View file

@ -24,6 +24,7 @@ from synapse.api.errors import NotFoundError, SynapseError
from synapse.rest.client import room
from synapse.server import HomeServer
from synapse.types.state import StateFilter
from synapse.types.storage import _BackgroundUpdates
from synapse.util import Clock
from tests.unittest import HomeserverTestCase
@ -303,3 +304,156 @@ class PurgeTests(HomeserverTestCase):
)
)
self.assertEqual(len(state_groups), 1)
def test_clear_unreferenced_state_groups(self) -> None:
    """Test that any unreferenced state groups are automatically cleaned up."""

    self.helper.send(self.room_id, body="test1")
    state1 = self.helper.send_state(
        self.room_id, "org.matrix.test", body={"number": 2}
    )
    # Create enough state events to require multiple batches of
    # mark_unreferenced_state_groups_for_deletion_bg_update to be run.
    for i in range(200):
        self.helper.send_state(self.room_id, "org.matrix.test", body={"number": i})
    self.helper.send(self.room_id, body="test4")
    last = self.helper.send(self.room_id, body="test5")

    # Create an unreferenced state group that has no prev group.
    unreferenced_free_state_group = self.get_success(
        self.state_store.store_state_group(
            event_id=last["event_id"],
            room_id=self.room_id,
            prev_group=None,
            delta_ids={("org.matrix.test", ""): state1["event_id"]},
            current_state_ids={("org.matrix.test", ""): ""},
        )
    )

    # Create some unreferenced state groups that have a prev group of one of the
    # existing state groups.
    prev_group = self.get_success(
        self.store._get_state_group_for_event(state1["event_id"])
    )
    unreferenced_end_state_group = self.get_success(
        self.state_store.store_state_group(
            event_id=last["event_id"],
            room_id=self.room_id,
            prev_group=prev_group,
            delta_ids={("org.matrix.test", ""): state1["event_id"]},
            current_state_ids=None,
        )
    )
    another_unreferenced_end_state_group = self.get_success(
        self.state_store.store_state_group(
            event_id=last["event_id"],
            room_id=self.room_id,
            prev_group=unreferenced_end_state_group,
            delta_ids={("org.matrix.test", ""): state1["event_id"]},
            current_state_ids=None,
        )
    )

    # Add some other unreferenced state groups which lead to a referenced state
    # group.
    # These state groups should not get deleted.
    chain_state_group = self.get_success(
        self.state_store.store_state_group(
            event_id=last["event_id"],
            room_id=self.room_id,
            prev_group=None,
            delta_ids={("org.matrix.test", ""): ""},
            current_state_ids={("org.matrix.test", ""): ""},
        )
    )
    chain_state_group_2 = self.get_success(
        self.state_store.store_state_group(
            event_id=last["event_id"],
            room_id=self.room_id,
            prev_group=chain_state_group,
            delta_ids={("org.matrix.test", ""): ""},
            current_state_ids=None,
        )
    )
    referenced_chain_state_group = self.get_success(
        self.state_store.store_state_group(
            event_id=last["event_id"],
            room_id=self.room_id,
            prev_group=chain_state_group_2,
            delta_ids={("org.matrix.test", ""): ""},
            current_state_ids=None,
        )
    )
    # Reference the end of the chain directly from an event, so the whole
    # chain must survive the background deletion.
    self.get_success(
        self.store.db_pool.simple_insert(
            "event_to_state_groups",
            {
                "event_id": "$new_event",
                "state_group": referenced_chain_state_group,
            },
        )
    )

    # Insert and run the background update.
    self.get_success(
        self.store.db_pool.simple_insert(
            "background_updates",
            {
                "update_name": _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE,
                "progress_json": "{}",
            },
        )
    )
    self.store.db_pool.updates._all_done = False
    self.wait_for_background_updates()

    # Advance so that the background job to delete the state groups runs
    self.reactor.advance(
        1 + self.state_deletion_store.DELAY_BEFORE_DELETION_MS / 1000
    )

    # We expect that the unreferenced free state group has been deleted.
    row = self.get_success(
        self.state_store.db_pool.simple_select_one_onecol(
            table="state_groups",
            keyvalues={"id": unreferenced_free_state_group},
            retcol="id",
            allow_none=True,
            desc="test_purge_unreferenced_state_group",
        )
    )
    self.assertIsNone(row)

    # We expect that both unreferenced end state groups have been deleted.
    row = self.get_success(
        self.state_store.db_pool.simple_select_one_onecol(
            table="state_groups",
            keyvalues={"id": unreferenced_end_state_group},
            retcol="id",
            allow_none=True,
            desc="test_purge_unreferenced_state_group",
        )
    )
    self.assertIsNone(row)
    row = self.get_success(
        self.state_store.db_pool.simple_select_one_onecol(
            table="state_groups",
            keyvalues={"id": another_unreferenced_end_state_group},
            retcol="id",
            allow_none=True,
            desc="test_purge_unreferenced_state_group",
        )
    )
    self.assertIsNone(row)

    # All other state groups for the room should remain: the assertion below
    # expects 210 survivors, presumably the groups created by the events plus
    # the referenced chain.
    # NOTE(review): the original comment here claimed "only one state group"
    # remains, which contradicts the 210 assertion — confirm the intended count.
    state_groups = self.get_success(
        self.state_store.db_pool.simple_select_onecol(
            table="state_groups",
            keyvalues={"room_id": self.room_id},
            retcol="id",
            desc="test_purge_unreferenced_state_group",
        )
    )
    self.assertEqual(len(state_groups), 210)