mirror of
https://github.com/element-hq/synapse.git
synced 2025-04-16 10:46:21 +00:00
Properly purge state groups tables when purging a room (#18024)
Currently purging a complex room can lead to a lot of orphaned rows left behind in the state groups tables. It seems it is because we are loosing track of state groups sometimes. This change uses the `room_id` indexed column of `state_groups` table to decide what to delete instead of doing an indirection through `event_to_state_groups`. Related to https://github.com/element-hq/synapse/issues/3364. ### Pull Request Checklist * [x] Pull request is based on the develop branch * [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). * [x] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) --------- Co-authored-by: Erik Johnston <erikj@jki.re>
This commit is contained in:
parent
6306de8e16
commit
b3ba501c52
5 changed files with 35 additions and 70 deletions
1
changelog.d/18024.bugfix
Normal file
1
changelog.d/18024.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Properly purge state groups tables when purging a room with the admin API.
|
|
@ -42,8 +42,8 @@ class PurgeEventsStorageController:
|
||||||
"""Deletes all record of a room"""
|
"""Deletes all record of a room"""
|
||||||
|
|
||||||
with nested_logging_context(room_id):
|
with nested_logging_context(room_id):
|
||||||
state_groups_to_delete = await self.stores.main.purge_room(room_id)
|
await self.stores.main.purge_room(room_id)
|
||||||
await self.stores.state.purge_room_state(room_id, state_groups_to_delete)
|
await self.stores.state.purge_room_state(room_id)
|
||||||
|
|
||||||
async def purge_history(
|
async def purge_history(
|
||||||
self, room_id: str, token: str, delete_local_events: bool
|
self, room_id: str, token: str, delete_local_events: bool
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
#
|
#
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from typing import Any, List, Set, Tuple, cast
|
from typing import Any, Set, Tuple, cast
|
||||||
|
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
from synapse.storage.database import LoggingTransaction
|
from synapse.storage.database import LoggingTransaction
|
||||||
|
@ -332,7 +332,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
|
|
||||||
return referenced_state_groups
|
return referenced_state_groups
|
||||||
|
|
||||||
async def purge_room(self, room_id: str) -> List[int]:
|
async def purge_room(self, room_id: str) -> None:
|
||||||
"""Deletes all record of a room
|
"""Deletes all record of a room
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -348,7 +348,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
# purge any of those rows which were added during the first.
|
# purge any of those rows which were added during the first.
|
||||||
|
|
||||||
logger.info("[purge] Starting initial main purge of [1/2]")
|
logger.info("[purge] Starting initial main purge of [1/2]")
|
||||||
state_groups_to_delete = await self.db_pool.runInteraction(
|
await self.db_pool.runInteraction(
|
||||||
"purge_room",
|
"purge_room",
|
||||||
self._purge_room_txn,
|
self._purge_room_txn,
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
|
@ -356,18 +356,15 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info("[purge] Starting secondary main purge of [2/2]")
|
logger.info("[purge] Starting secondary main purge of [2/2]")
|
||||||
state_groups_to_delete.extend(
|
await self.db_pool.runInteraction(
|
||||||
await self.db_pool.runInteraction(
|
"purge_room",
|
||||||
"purge_room",
|
self._purge_room_txn,
|
||||||
self._purge_room_txn,
|
room_id=room_id,
|
||||||
room_id=room_id,
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info("[purge] Done with main purge")
|
logger.info("[purge] Done with main purge")
|
||||||
|
|
||||||
return state_groups_to_delete
|
def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> None:
|
||||||
|
|
||||||
def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
|
|
||||||
# This collides with event persistence so we cannot write new events and metadata into
|
# This collides with event persistence so we cannot write new events and metadata into
|
||||||
# a room while deleting it or this transaction will fail.
|
# a room while deleting it or this transaction will fail.
|
||||||
if isinstance(self.database_engine, PostgresEngine):
|
if isinstance(self.database_engine, PostgresEngine):
|
||||||
|
@ -381,19 +378,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
# take a while!
|
# take a while!
|
||||||
txn.execute("SET LOCAL statement_timeout = 0")
|
txn.execute("SET LOCAL statement_timeout = 0")
|
||||||
|
|
||||||
# First, fetch all the state groups that should be deleted, before
|
|
||||||
# we delete that information.
|
|
||||||
txn.execute(
|
|
||||||
"""
|
|
||||||
SELECT DISTINCT state_group FROM events
|
|
||||||
INNER JOIN event_to_state_groups USING(event_id)
|
|
||||||
WHERE events.room_id = ?
|
|
||||||
""",
|
|
||||||
(room_id,),
|
|
||||||
)
|
|
||||||
|
|
||||||
state_groups = [row[0] for row in txn]
|
|
||||||
|
|
||||||
# Get all the auth chains that are referenced by events that are to be
|
# Get all the auth chains that are referenced by events that are to be
|
||||||
# deleted.
|
# deleted.
|
||||||
txn.execute(
|
txn.execute(
|
||||||
|
@ -513,5 +497,3 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
# periodically anyway (https://github.com/matrix-org/synapse/issues/5888)
|
# periodically anyway (https://github.com/matrix-org/synapse/issues/5888)
|
||||||
|
|
||||||
self._invalidate_caches_for_room_and_stream(txn, room_id)
|
self._invalidate_caches_for_room_and_stream(txn, room_id)
|
||||||
|
|
||||||
return state_groups
|
|
||||||
|
|
|
@ -840,60 +840,42 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||||
|
|
||||||
return dict(rows)
|
return dict(rows)
|
||||||
|
|
||||||
async def purge_room_state(
|
async def purge_room_state(self, room_id: str) -> None:
|
||||||
self, room_id: str, state_groups_to_delete: Collection[int]
|
return await self.db_pool.runInteraction(
|
||||||
) -> None:
|
|
||||||
"""Deletes all record of a room from state tables
|
|
||||||
|
|
||||||
Args:
|
|
||||||
room_id:
|
|
||||||
state_groups_to_delete: State groups to delete
|
|
||||||
"""
|
|
||||||
|
|
||||||
logger.info("[purge] Starting state purge")
|
|
||||||
await self.db_pool.runInteraction(
|
|
||||||
"purge_room_state",
|
"purge_room_state",
|
||||||
self._purge_room_state_txn,
|
self._purge_room_state_txn,
|
||||||
room_id,
|
room_id,
|
||||||
state_groups_to_delete,
|
|
||||||
)
|
)
|
||||||
logger.info("[purge] Done with state purge")
|
|
||||||
|
|
||||||
def _purge_room_state_txn(
|
def _purge_room_state_txn(
|
||||||
self,
|
self,
|
||||||
txn: LoggingTransaction,
|
txn: LoggingTransaction,
|
||||||
room_id: str,
|
room_id: str,
|
||||||
state_groups_to_delete: Collection[int],
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# first we have to delete the state groups states
|
# Delete all edges that reference a state group linked to room_id
|
||||||
logger.info("[purge] removing %s from state_groups_state", room_id)
|
|
||||||
|
|
||||||
self.db_pool.simple_delete_many_txn(
|
|
||||||
txn,
|
|
||||||
table="state_groups_state",
|
|
||||||
column="state_group",
|
|
||||||
values=state_groups_to_delete,
|
|
||||||
keyvalues={},
|
|
||||||
)
|
|
||||||
|
|
||||||
# ... and the state group edges
|
|
||||||
logger.info("[purge] removing %s from state_group_edges", room_id)
|
logger.info("[purge] removing %s from state_group_edges", room_id)
|
||||||
|
txn.execute(
|
||||||
self.db_pool.simple_delete_many_txn(
|
"""
|
||||||
txn,
|
DELETE FROM state_group_edges AS sge WHERE sge.state_group IN (
|
||||||
table="state_group_edges",
|
SELECT id FROM state_groups AS sg WHERE sg.room_id = ?
|
||||||
column="state_group",
|
)""",
|
||||||
values=state_groups_to_delete,
|
(room_id,),
|
||||||
keyvalues={},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# ... and the state groups
|
# state_groups_state table has a room_id column but no index on it, unlike state_groups,
|
||||||
logger.info("[purge] removing %s from state_groups", room_id)
|
# so we delete them by matching the room_id through the state_groups table.
|
||||||
|
logger.info("[purge] removing %s from state_groups_state", room_id)
|
||||||
|
txn.execute(
|
||||||
|
"""
|
||||||
|
DELETE FROM state_groups_state AS sgs WHERE sgs.state_group IN (
|
||||||
|
SELECT id FROM state_groups AS sg WHERE sg.room_id = ?
|
||||||
|
)""",
|
||||||
|
(room_id,),
|
||||||
|
)
|
||||||
|
|
||||||
self.db_pool.simple_delete_many_txn(
|
logger.info("[purge] removing %s from state_groups", room_id)
|
||||||
|
self.db_pool.simple_delete_txn(
|
||||||
txn,
|
txn,
|
||||||
table="state_groups",
|
table="state_groups",
|
||||||
column="id",
|
keyvalues={"room_id": room_id},
|
||||||
values=state_groups_to_delete,
|
|
||||||
keyvalues={},
|
|
||||||
)
|
)
|
||||||
|
|
|
@ -3050,7 +3050,7 @@ PURGE_TABLES = [
|
||||||
"pusher_throttle",
|
"pusher_throttle",
|
||||||
"room_account_data",
|
"room_account_data",
|
||||||
"room_tags",
|
"room_tags",
|
||||||
# "state_groups", # Current impl leaves orphaned state groups around.
|
"state_groups",
|
||||||
"state_groups_state",
|
"state_groups_state",
|
||||||
"federation_inbound_events_staging",
|
"federation_inbound_events_staging",
|
||||||
]
|
]
|
||||||
|
|
Loading…
Add table
Reference in a new issue