mirror of
https://github.com/element-hq/synapse.git
synced 2025-03-12 10:56:50 +00:00
Revert the background job to clear unreferenced state groups (that was introduced in v1.126.0rc1), due to a suspected issue that causes increased disk usage. (#18222)
Revert "Add background job to clear unreferenced state groups (#18154)"
This mechanism is suspected of inserting large numbers of rows into
`state_groups_state`,
thus unreasonably increasing disk usage.
See: https://github.com/element-hq/synapse/issues/18217
This reverts commit 5121f9210c
(#18154).
---------
Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org>
This commit is contained in:
parent
350e84a8a4
commit
8295de87a7
10 changed files with 76 additions and 375 deletions
1
changelog.d/18222.bugfix
Normal file
1
changelog.d/18222.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Revert the background job to clear unreferenced state groups (that was introduced in v1.126.0rc1), due to a suspected issue that causes increased disk usage.
|
|
@ -162,7 +162,7 @@ by a unique name, the current status (stored in JSON), and some dependency infor
|
|||
* Whether the update requires a previous update to be complete.
|
||||
* A rough ordering for which to complete updates.
|
||||
|
||||
A new background update needs to be added to the `background_updates` table:
|
||||
A new background updates needs to be added to the `background_updates` table:
|
||||
|
||||
```sql
|
||||
INSERT INTO background_updates (ordering, update_name, depends_on, progress_json) VALUES
|
||||
|
|
|
@ -191,11 +191,6 @@ APPEND_ONLY_TABLES = [
|
|||
|
||||
|
||||
IGNORED_TABLES = {
|
||||
# Porting the auto generated sequence in this table is non-trivial.
|
||||
# None of the entries in this list are mandatory for Synapse to keep working.
|
||||
# If state group disk space is an issue after the port, the
|
||||
# `delete_unreferenced_state_groups_bg_update` background task can be run again.
|
||||
"state_groups_pending_deletion",
|
||||
# We don't port these tables, as they're a faff and we can regenerate
|
||||
# them anyway.
|
||||
"user_directory",
|
||||
|
@ -221,15 +216,6 @@ IGNORED_TABLES = {
|
|||
}
|
||||
|
||||
|
||||
# These background updates will not be applied upon creation of the postgres database.
|
||||
IGNORED_BACKGROUND_UPDATES = {
|
||||
# Reapplying this background update to the postgres database is unnecessary after
|
||||
# already having waited for the SQLite database to complete all running background
|
||||
# updates.
|
||||
"delete_unreferenced_state_groups_bg_update",
|
||||
}
|
||||
|
||||
|
||||
# Error returned by the run function. Used at the top-level part of the script to
|
||||
# handle errors and return codes.
|
||||
end_error: Optional[str] = None
|
||||
|
@ -701,20 +687,6 @@ class Porter:
|
|||
# 0 means off. 1 means full. 2 means incremental.
|
||||
return autovacuum_setting != 0
|
||||
|
||||
async def remove_ignored_background_updates_from_database(self) -> None:
|
||||
def _remove_delete_unreferenced_state_groups_bg_updates(
|
||||
txn: LoggingTransaction,
|
||||
) -> None:
|
||||
txn.execute(
|
||||
"DELETE FROM background_updates WHERE update_name = ANY(?)",
|
||||
(list(IGNORED_BACKGROUND_UPDATES),),
|
||||
)
|
||||
|
||||
await self.postgres_store.db_pool.runInteraction(
|
||||
"remove_delete_unreferenced_state_groups_bg_updates",
|
||||
_remove_delete_unreferenced_state_groups_bg_updates,
|
||||
)
|
||||
|
||||
async def run(self) -> None:
|
||||
"""Ports the SQLite database to a PostgreSQL database.
|
||||
|
||||
|
@ -760,8 +732,6 @@ class Porter:
|
|||
self.hs_config.database.get_single_database()
|
||||
)
|
||||
|
||||
await self.remove_ignored_background_updates_from_database()
|
||||
|
||||
await self.run_background_updates_on_postgres()
|
||||
|
||||
self.progress.set_state("Creating port tables")
|
||||
|
|
|
@ -21,18 +21,11 @@
|
|||
|
||||
import itertools
|
||||
import logging
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Collection,
|
||||
Mapping,
|
||||
Set,
|
||||
)
|
||||
from typing import TYPE_CHECKING, Collection, Mapping, Set
|
||||
|
||||
from synapse.logging.context import nested_logging_context
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.storage.database import LoggingTransaction
|
||||
from synapse.storage.databases import Databases
|
||||
from synapse.types.storage import _BackgroundUpdates
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
@ -51,11 +44,6 @@ class PurgeEventsStorageController:
|
|||
self._delete_state_groups_loop, 60 * 1000
|
||||
)
|
||||
|
||||
self.stores.state.db_pool.updates.register_background_update_handler(
|
||||
_BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE,
|
||||
self._background_delete_unrefereneced_state_groups,
|
||||
)
|
||||
|
||||
async def purge_room(self, room_id: str) -> None:
|
||||
"""Deletes all record of a room"""
|
||||
|
||||
|
@ -92,6 +80,68 @@ class PurgeEventsStorageController:
|
|||
sg_to_delete
|
||||
)
|
||||
|
||||
async def _find_unreferenced_groups(
|
||||
self, state_groups: Collection[int]
|
||||
) -> Set[int]:
|
||||
"""Used when purging history to figure out which state groups can be
|
||||
deleted.
|
||||
|
||||
Args:
|
||||
state_groups: Set of state groups referenced by events
|
||||
that are going to be deleted.
|
||||
|
||||
Returns:
|
||||
The set of state groups that can be deleted.
|
||||
"""
|
||||
# Set of events that we have found to be referenced by events
|
||||
referenced_groups = set()
|
||||
|
||||
# Set of state groups we've already seen
|
||||
state_groups_seen = set(state_groups)
|
||||
|
||||
# Set of state groups to handle next.
|
||||
next_to_search = set(state_groups)
|
||||
while next_to_search:
|
||||
# We bound size of groups we're looking up at once, to stop the
|
||||
# SQL query getting too big
|
||||
if len(next_to_search) < 100:
|
||||
current_search = next_to_search
|
||||
next_to_search = set()
|
||||
else:
|
||||
current_search = set(itertools.islice(next_to_search, 100))
|
||||
next_to_search -= current_search
|
||||
|
||||
referenced = await self.stores.main.get_referenced_state_groups(
|
||||
current_search
|
||||
)
|
||||
referenced_groups |= referenced
|
||||
|
||||
# We don't continue iterating up the state group graphs for state
|
||||
# groups that are referenced.
|
||||
current_search -= referenced
|
||||
|
||||
edges = await self.stores.state.get_previous_state_groups(current_search)
|
||||
|
||||
prevs = set(edges.values())
|
||||
# We don't bother re-handling groups we've already seen
|
||||
prevs -= state_groups_seen
|
||||
next_to_search |= prevs
|
||||
state_groups_seen |= prevs
|
||||
|
||||
# We also check to see if anything referencing the state groups are
|
||||
# also unreferenced. This helps ensure that we delete unreferenced
|
||||
# state groups, if we don't then we will de-delta them when we
|
||||
# delete the other state groups leading to increased DB usage.
|
||||
next_edges = await self.stores.state.get_next_state_groups(current_search)
|
||||
nexts = set(next_edges.keys())
|
||||
nexts -= state_groups_seen
|
||||
next_to_search |= nexts
|
||||
state_groups_seen |= nexts
|
||||
|
||||
to_delete = state_groups_seen - referenced_groups
|
||||
|
||||
return to_delete
|
||||
|
||||
@wrap_as_background_process("_delete_state_groups_loop")
|
||||
async def _delete_state_groups_loop(self) -> None:
|
||||
"""Background task that deletes any state groups that may be pending
|
||||
|
@ -153,173 +203,3 @@ class PurgeEventsStorageController:
|
|||
room_id,
|
||||
groups_to_sequences,
|
||||
)
|
||||
|
||||
async def _background_delete_unrefereneced_state_groups(
|
||||
self, progress: dict, batch_size: int
|
||||
) -> int:
|
||||
"""This background update will slowly delete any unreferenced state groups"""
|
||||
|
||||
last_checked_state_group = progress.get("last_checked_state_group")
|
||||
max_state_group = progress.get("max_state_group")
|
||||
|
||||
if last_checked_state_group is None or max_state_group is None:
|
||||
# This is the first run.
|
||||
last_checked_state_group = 0
|
||||
|
||||
max_state_group = await self.stores.state.db_pool.simple_select_one_onecol(
|
||||
table="state_groups",
|
||||
keyvalues={},
|
||||
retcol="MAX(id)",
|
||||
allow_none=True,
|
||||
desc="get_max_state_group",
|
||||
)
|
||||
if max_state_group is None:
|
||||
# There are no state groups so the background process is finished.
|
||||
await self.stores.state.db_pool.updates._end_background_update(
|
||||
_BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE
|
||||
)
|
||||
return batch_size
|
||||
|
||||
(
|
||||
last_checked_state_group,
|
||||
final_batch,
|
||||
) = await self._delete_unreferenced_state_groups_batch(
|
||||
last_checked_state_group, batch_size, max_state_group
|
||||
)
|
||||
|
||||
if not final_batch:
|
||||
# There are more state groups to check.
|
||||
progress = {
|
||||
"last_checked_state_group": last_checked_state_group,
|
||||
"max_state_group": max_state_group,
|
||||
}
|
||||
await self.stores.state.db_pool.updates._background_update_progress(
|
||||
_BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE,
|
||||
progress,
|
||||
)
|
||||
else:
|
||||
# This background process is finished.
|
||||
await self.stores.state.db_pool.updates._end_background_update(
|
||||
_BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE
|
||||
)
|
||||
|
||||
return batch_size
|
||||
|
||||
async def _delete_unreferenced_state_groups_batch(
|
||||
self,
|
||||
last_checked_state_group: int,
|
||||
batch_size: int,
|
||||
max_state_group: int,
|
||||
) -> tuple[int, bool]:
|
||||
"""Looks for unreferenced state groups starting from the last state group
|
||||
checked, and any state groups which would become unreferenced if a state group
|
||||
was deleted, and marks them for deletion.
|
||||
|
||||
Args:
|
||||
last_checked_state_group: The last state group that was checked.
|
||||
batch_size: How many state groups to process in this iteration.
|
||||
|
||||
Returns:
|
||||
(last_checked_state_group, final_batch)
|
||||
"""
|
||||
|
||||
# Look for state groups that can be cleaned up.
|
||||
def get_next_state_groups_txn(txn: LoggingTransaction) -> Set[int]:
|
||||
state_group_sql = "SELECT id FROM state_groups WHERE ? < id AND id <= ? ORDER BY id LIMIT ?"
|
||||
txn.execute(
|
||||
state_group_sql, (last_checked_state_group, max_state_group, batch_size)
|
||||
)
|
||||
|
||||
next_set = {row[0] for row in txn}
|
||||
|
||||
return next_set
|
||||
|
||||
next_set = await self.stores.state.db_pool.runInteraction(
|
||||
"get_next_state_groups", get_next_state_groups_txn
|
||||
)
|
||||
|
||||
final_batch = False
|
||||
if len(next_set) < batch_size:
|
||||
final_batch = True
|
||||
else:
|
||||
last_checked_state_group = max(next_set)
|
||||
|
||||
if len(next_set) == 0:
|
||||
return last_checked_state_group, final_batch
|
||||
|
||||
# Find all state groups that can be deleted if the original set is deleted.
|
||||
# This set includes the original set, as well as any state groups that would
|
||||
# become unreferenced upon deleting the original set.
|
||||
to_delete = await self._find_unreferenced_groups(next_set)
|
||||
|
||||
if len(to_delete) == 0:
|
||||
return last_checked_state_group, final_batch
|
||||
|
||||
await self.stores.state_deletion.mark_state_groups_as_pending_deletion(
|
||||
to_delete
|
||||
)
|
||||
|
||||
return last_checked_state_group, final_batch
|
||||
|
||||
async def _find_unreferenced_groups(
|
||||
self,
|
||||
state_groups: Collection[int],
|
||||
) -> Set[int]:
|
||||
"""Used when purging history to figure out which state groups can be
|
||||
deleted.
|
||||
|
||||
Args:
|
||||
state_groups: Set of state groups referenced by events
|
||||
that are going to be deleted.
|
||||
|
||||
Returns:
|
||||
The set of state groups that can be deleted.
|
||||
"""
|
||||
# Set of events that we have found to be referenced by events
|
||||
referenced_groups = set()
|
||||
|
||||
# Set of state groups we've already seen
|
||||
state_groups_seen = set(state_groups)
|
||||
|
||||
# Set of state groups to handle next.
|
||||
next_to_search = set(state_groups)
|
||||
while next_to_search:
|
||||
# We bound size of groups we're looking up at once, to stop the
|
||||
# SQL query getting too big
|
||||
if len(next_to_search) < 100:
|
||||
current_search = next_to_search
|
||||
next_to_search = set()
|
||||
else:
|
||||
current_search = set(itertools.islice(next_to_search, 100))
|
||||
next_to_search -= current_search
|
||||
|
||||
referenced = await self.stores.main.get_referenced_state_groups(
|
||||
current_search
|
||||
)
|
||||
referenced_groups |= referenced
|
||||
|
||||
# We don't continue iterating up the state group graphs for state
|
||||
# groups that are referenced.
|
||||
current_search -= referenced
|
||||
|
||||
edges = await self.stores.state.get_previous_state_groups(current_search)
|
||||
|
||||
prevs = set(edges.values())
|
||||
# We don't bother re-handling groups we've already seen
|
||||
prevs -= state_groups_seen
|
||||
next_to_search |= prevs
|
||||
state_groups_seen |= prevs
|
||||
|
||||
# We also check to see if anything referencing the state groups are
|
||||
# also unreferenced. This helps ensure that we delete unreferenced
|
||||
# state groups, if we don't then we will de-delta them when we
|
||||
# delete the other state groups leading to increased DB usage.
|
||||
next_edges = await self.stores.state.get_next_state_groups(current_search)
|
||||
nexts = set(next_edges.keys())
|
||||
nexts -= state_groups_seen
|
||||
next_to_search |= nexts
|
||||
state_groups_seen |= nexts
|
||||
|
||||
to_delete = state_groups_seen - referenced_groups
|
||||
|
||||
return to_delete
|
||||
|
|
|
@ -20,15 +20,7 @@
|
|||
#
|
||||
|
||||
import logging
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Dict,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Tuple,
|
||||
Union,
|
||||
)
|
||||
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union
|
||||
|
||||
from synapse.logging.opentracing import tag_args, trace
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
|
|
|
@ -321,42 +321,18 @@ class StateDeletionDataStore:
|
|||
async def mark_state_groups_as_pending_deletion(
|
||||
self, state_groups: Collection[int]
|
||||
) -> None:
|
||||
"""Mark the given state groups as pending deletion.
|
||||
|
||||
If any of the state groups are already pending deletion, then those records are
|
||||
left as is.
|
||||
"""
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"mark_state_groups_as_pending_deletion",
|
||||
self._mark_state_groups_as_pending_deletion_txn,
|
||||
state_groups,
|
||||
)
|
||||
|
||||
def _mark_state_groups_as_pending_deletion_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
state_groups: Collection[int],
|
||||
) -> None:
|
||||
sql = """
|
||||
INSERT INTO state_groups_pending_deletion (state_group, insertion_ts)
|
||||
VALUES %s
|
||||
ON CONFLICT (state_group)
|
||||
DO NOTHING
|
||||
"""
|
||||
"""Mark the given state groups as pending deletion"""
|
||||
|
||||
now = self._clock.time_msec()
|
||||
rows = [
|
||||
(
|
||||
state_group,
|
||||
now,
|
||||
)
|
||||
for state_group in state_groups
|
||||
]
|
||||
if isinstance(txn.database_engine, PostgresEngine):
|
||||
txn.execute_values(sql % ("?",), rows, fetch=False)
|
||||
else:
|
||||
txn.execute_batch(sql % ("(?, ?)",), rows)
|
||||
|
||||
await self.db_pool.simple_upsert_many(
|
||||
table="state_groups_pending_deletion",
|
||||
key_names=("state_group",),
|
||||
key_values=[(state_group,) for state_group in state_groups],
|
||||
value_names=("insertion_ts",),
|
||||
value_values=[(now,) for _ in state_groups],
|
||||
desc="mark_state_groups_as_pending_deletion",
|
||||
)
|
||||
|
||||
async def mark_state_groups_as_used(self, state_groups: Collection[int]) -> None:
|
||||
"""Mark the given state groups as now being referenced"""
|
||||
|
|
|
@ -158,7 +158,6 @@ Changes in SCHEMA_VERSION = 88
|
|||
|
||||
Changes in SCHEMA_VERSION = 89
|
||||
- Add `state_groups_pending_deletion` and `state_groups_persisting` tables.
|
||||
- Add background update to delete unreferenced state groups.
|
||||
"""
|
||||
|
||||
|
||||
|
|
|
@ -1,16 +0,0 @@
|
|||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2025 New Vector, Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
-- Add a background update to delete any unreferenced state groups
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(8902, 'delete_unreferenced_state_groups_bg_update', '{}');
|
|
@ -48,7 +48,3 @@ class _BackgroundUpdates:
|
|||
SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE = (
|
||||
"sliding_sync_membership_snapshots_fix_forgotten_column_bg_update"
|
||||
)
|
||||
|
||||
DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE = (
|
||||
"delete_unreferenced_state_groups_bg_update"
|
||||
)
|
||||
|
|
|
@ -24,7 +24,6 @@ from synapse.api.errors import NotFoundError, SynapseError
|
|||
from synapse.rest.client import room
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.types.storage import _BackgroundUpdates
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
@ -304,99 +303,3 @@ class PurgeTests(HomeserverTestCase):
|
|||
)
|
||||
)
|
||||
self.assertEqual(len(state_groups), 1)
|
||||
|
||||
def test_clear_unreferenced_state_groups(self) -> None:
|
||||
"""Test that any unreferenced state groups are automatically cleaned up."""
|
||||
|
||||
self.helper.send(self.room_id, body="test1")
|
||||
state1 = self.helper.send_state(
|
||||
self.room_id, "org.matrix.test", body={"number": 2}
|
||||
)
|
||||
# Create enough state events to require multiple batches of
|
||||
# delete_unreferenced_state_groups_bg_update to be run.
|
||||
for i in range(200):
|
||||
self.helper.send_state(self.room_id, "org.matrix.test", body={"number": i})
|
||||
state2 = self.helper.send_state(
|
||||
self.room_id, "org.matrix.test", body={"number": 3}
|
||||
)
|
||||
self.helper.send(self.room_id, body="test4")
|
||||
last = self.helper.send(self.room_id, body="test5")
|
||||
|
||||
# Create an unreferenced state group that has a prev group of one of the
|
||||
# to-be-purged events.
|
||||
prev_group = self.get_success(
|
||||
self.store._get_state_group_for_event(state1["event_id"])
|
||||
)
|
||||
unreferenced_state_group = self.get_success(
|
||||
self.state_store.store_state_group(
|
||||
event_id=last["event_id"],
|
||||
room_id=self.room_id,
|
||||
prev_group=prev_group,
|
||||
delta_ids={("org.matrix.test", ""): state2["event_id"]},
|
||||
current_state_ids=None,
|
||||
)
|
||||
)
|
||||
|
||||
another_unreferenced_state_group = self.get_success(
|
||||
self.state_store.store_state_group(
|
||||
event_id=last["event_id"],
|
||||
room_id=self.room_id,
|
||||
prev_group=unreferenced_state_group,
|
||||
delta_ids={("org.matrix.test", ""): state2["event_id"]},
|
||||
current_state_ids=None,
|
||||
)
|
||||
)
|
||||
|
||||
# Insert and run the background update.
|
||||
self.get_success(
|
||||
self.store.db_pool.simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": _BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE,
|
||||
"progress_json": "{}",
|
||||
},
|
||||
)
|
||||
)
|
||||
self.store.db_pool.updates._all_done = False
|
||||
self.wait_for_background_updates()
|
||||
|
||||
# Advance so that the background job to delete the state groups runs
|
||||
self.reactor.advance(
|
||||
1 + self.state_deletion_store.DELAY_BEFORE_DELETION_MS / 1000
|
||||
)
|
||||
|
||||
# We expect that the unreferenced state group has been deleted.
|
||||
row = self.get_success(
|
||||
self.state_store.db_pool.simple_select_one_onecol(
|
||||
table="state_groups",
|
||||
keyvalues={"id": unreferenced_state_group},
|
||||
retcol="id",
|
||||
allow_none=True,
|
||||
desc="test_purge_unreferenced_state_group",
|
||||
)
|
||||
)
|
||||
self.assertIsNone(row)
|
||||
|
||||
# We expect that the other unreferenced state group has also been deleted.
|
||||
row = self.get_success(
|
||||
self.state_store.db_pool.simple_select_one_onecol(
|
||||
table="state_groups",
|
||||
keyvalues={"id": another_unreferenced_state_group},
|
||||
retcol="id",
|
||||
allow_none=True,
|
||||
desc="test_purge_unreferenced_state_group",
|
||||
)
|
||||
)
|
||||
self.assertIsNone(row)
|
||||
|
||||
# We expect there to now only be one state group for the room, which is
|
||||
# the state group of the last event (as the only outlier).
|
||||
state_groups = self.get_success(
|
||||
self.state_store.db_pool.simple_select_onecol(
|
||||
table="state_groups",
|
||||
keyvalues={"room_id": self.room_id},
|
||||
retcol="id",
|
||||
desc="test_purge_unreferenced_state_group",
|
||||
)
|
||||
)
|
||||
self.assertEqual(len(state_groups), 207)
|
||||
|
|
Loading…
Add table
Reference in a new issue