
Adapt to using sliding_sync_joined_rooms_to_recalculate table

Eric Eastwood 2024-08-28 00:42:14 -05:00
parent 53b7309f6c
commit 8468401a97
6 changed files with 165 additions and 88 deletions


@@ -1536,8 +1536,8 @@ class DatabasePool:
self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=False)
@staticmethod
def simple_upsert_many_txn_native_upsert(
self,
txn: LoggingTransaction,
table: str,
key_names: Collection[str],


@@ -20,7 +20,6 @@
#
import logging
from collections import OrderedDict
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, cast
import attr
@@ -322,7 +321,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
self.db_pool.updates.register_background_index_update(
_BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
index_name="sliding_sync_joined_rooms_to_recalculate_room_id_idx",
table="sliding_sync_joined_rooms",
table="sliding_sync_joined_rooms_to_recalculate",
columns=["room_id"],
unique=True,
)
@@ -1575,21 +1574,43 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
return batch_size
async def _sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update(
self, _progress: JsonDict, _batch_size: int
self, progress: JsonDict, _batch_size: int
) -> int:
"""
Prefill `sliding_sync_joined_rooms_to_recalculate` table with all rooms we know about already.
"""
initial_insert = progress.get("initial_insert", False)
def _txn(txn: LoggingTransaction) -> None:
# We do this as one big bulk insert. This has been tested on a bigger
# homeserver with ~10M rooms and took 11s. There is potential for this to
# starve disk I/O while it runs.
txn.execute(
"""
INSERT INTO sliding_sync_joined_rooms_to_recalculate (room_id) SELECT room_id FROM rooms;
""",
)
if initial_insert:
txn.execute(
"""
INSERT INTO sliding_sync_joined_rooms_to_recalculate
(room_id)
SELECT room_id FROM rooms;
""",
)
else:
# We can only upsert once the unique index has been added to the table
# (see
# `_BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE`)
#
# We upsert in case we have to run this multiple times.
#
# The `WHERE TRUE` clause is to avoid "Parsing Ambiguity"
txn.execute(
"""
INSERT INTO sliding_sync_joined_rooms_to_recalculate
(room_id)
SELECT room_id FROM rooms WHERE ?
ON CONFLICT (room_id)
DO NOTHING;
""",
(True,),
)
await self.db_pool.runInteraction(
"_sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update",
@@ -1608,11 +1629,10 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
"""
Background update to populate the `sliding_sync_joined_rooms` table.
"""
last_event_stream_ordering = progress.get(
"last_event_stream_ordering", -(1 << 31)
)
# We don't need to fetch any progress state because we just grab the next N
# rooms in `sliding_sync_joined_rooms_to_recalculate`
def _get_rooms_to_update_txn(txn: LoggingTransaction) -> List[Tuple[str, int]]:
def _get_rooms_to_update_txn(txn: LoggingTransaction) -> List[Tuple[str]]:
"""
Returns:
A list of room IDs to update along with the progress value
@@ -1625,30 +1645,16 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
# server is still participating in the room because if we're
# `no_longer_in_room`, this table would be cleared out for the given
# `room_id`.
#
# Because we're using `event_stream_ordering` as the progress marker, we're
# going to be pulling out the same rooms over and over again but we can
# at-least re-use this background update for the catch-up background
# process as well (see `_resolve_stale_data_in_sliding_sync_tables()`).
#
# It's important to sort by `event_stream_ordering` *ascending* (oldest to
# newest) so that if we see that this background update in progress and want
# to start the catch-up process, we can safely assume that it will
# eventually get to the rooms we want to catch-up on anyway (see
# `_resolve_stale_data_in_sliding_sync_tables()`).
txn.execute(
"""
SELECT room_id, MAX(event_stream_ordering)
FROM current_state_events
WHERE event_stream_ordering > ?
GROUP BY room_id
ORDER BY MAX(event_stream_ordering) ASC
SELECT room_id
FROM sliding_sync_joined_rooms_to_recalculate
LIMIT ?
""",
(last_event_stream_ordering, batch_size),
(batch_size,),
)
rooms_to_update_rows = cast(List[Tuple[str, int]], txn.fetchall())
rooms_to_update_rows = cast(List[Tuple[str]], txn.fetchall())
return rooms_to_update_rows
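
The hunk above drops the `last_event_stream_ordering` progress marker in favour of a plain work-queue pattern: each batch is read straight from `sliding_sync_joined_rooms_to_recalculate`, and (as a later hunk shows) a room is deleted from that table once processed, so the deletion itself records the progress. A rough, hypothetical sketch of that pattern using a bare DB-API connection (the `recalculate_room` helper is a stand-in, not a Synapse API):

from typing import List, Tuple


def recalculate_room(conn, room_id: str) -> None:
    # Stand-in for the real per-room work (recomputing the
    # sliding_sync_joined_rooms row for this room).
    pass


def process_one_batch(conn, batch_size: int) -> int:
    """Process up to `batch_size` queued rooms; return how many were handled."""
    rows: List[Tuple[str]] = conn.execute(
        "SELECT room_id FROM sliding_sync_joined_rooms_to_recalculate LIMIT ?",
        (batch_size,),
    ).fetchall()

    for (room_id,) in rows:
        recalculate_room(conn, room_id)
        # Deleting the queue row is what records progress: a crash before this
        # point simply means the room is picked up again by the next batch.
        conn.execute(
            "DELETE FROM sliding_sync_joined_rooms_to_recalculate WHERE room_id = ?",
            (room_id,),
        )

    conn.commit()
    # An empty batch signals that the background update has caught up.
    return len(rows)
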
@@ -1669,28 +1675,23 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
joined_room_stream_ordering_updates: Dict[
str, _JoinedRoomStreamOrderingUpdate
] = {}
# Map from room_id to the progress value (event_stream_ordering)
#
# This needs to be an `OrderedDict` because we need to process things in
# `event_stream_ordering` order *ascending* to save our progress position
# correctly if we need to exit early.
room_id_to_progress_marker_map: OrderedDict[str, int] = OrderedDict()
# As long as we get this value before we fetch the current state, we can use it
# to check if something has changed since that point.
most_recent_current_state_delta_stream_id = (
await self.get_max_stream_id_in_current_state_deltas()
)
for room_id, progress_event_stream_ordering in rooms_to_update:
room_id_to_progress_marker_map[room_id] = progress_event_stream_ordering
for (room_id,) in rooms_to_update:
current_state_ids_map = await self.db_pool.runInteraction(
"_sliding_sync_joined_rooms_bg_update._get_relevant_sliding_sync_current_state_event_ids_txn",
PersistEventsStore._get_relevant_sliding_sync_current_state_event_ids_txn,
room_id,
)
# We're iterating over rooms pulled from the current_state_events table
# so we should have some current state for each room
assert current_state_ids_map
# If we're not joined to the room a) it doesn't belong in the
# `sliding_sync_joined_rooms` table so we should skip and b) we won't have
# any `current_state_events` for the room.
if not current_state_ids_map:
continue
fetched_events = await self.get_events(current_state_ids_map.values())
@@ -1701,9 +1702,9 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
if event_id in fetched_events
}
# Can happen for unknown room versions (old room versions that aren't known
# anymore) since `get_events(...)` will filter out events for unknown room
# versions
# Even if we are joined to the room, this can happen for unknown room
# versions (old room versions that aren't known anymore) since
# `get_events(...)` will filter out events for unknown room versions
if not current_state_map:
continue
@@ -1754,23 +1755,17 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
def _fill_table_txn(txn: LoggingTransaction) -> None:
# Handle updating the `sliding_sync_joined_rooms` table
#
last_successful_room_id: Optional[str] = None
# Process the rooms in `event_stream_ordering` order *ascending* so we can
# save our position correctly if we need to exit early.
# `progress_event_stream_ordering` is an `OrderedDict` which remembers
# insertion order (and we inserted in the correct order) so this should be
# the correct thing to do.
for (
room_id,
progress_event_stream_ordering,
) in room_id_to_progress_marker_map.items():
update_map = joined_room_updates[room_id]
joined_room_update = joined_room_stream_ordering_updates[room_id]
event_stream_ordering = (
joined_room_update.most_recent_event_stream_ordering
update_map,
) in joined_room_updates.items():
joined_room_stream_ordering_update = (
joined_room_stream_ordering_updates[room_id]
)
bump_stamp = joined_room_update.most_recent_bump_stamp
event_stream_ordering = (
joined_room_stream_ordering_update.most_recent_event_stream_ordering
)
bump_stamp = joined_room_stream_ordering_update.most_recent_bump_stamp
# Check if the current state has been updated since we gathered it
state_deltas_since_we_gathered_current_state = (
@@ -1790,15 +1785,6 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
state_delta.event_type,
state_delta.state_key,
) in SLIDING_SYNC_RELEVANT_STATE_SET:
# Save our progress before we exit early
if last_successful_room_id is not None:
self.db_pool.updates._background_update_progress_txn(
txn,
_BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
{
"last_event_stream_ordering": progress_event_stream_ordering
},
)
# Raising exception so we can just exit and try again. It would
# be hard to resolve this within the transaction because we need
# to get full events out that take redactions into account. We
@@ -1829,20 +1815,17 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
},
)
# Keep track of the last successful room_id
last_successful_room_id = room_id
# Now that we've processed the room, we can remove it from the queue
self.db_pool.simple_delete_txn(
txn,
table="sliding_sync_joined_rooms_to_recalculate",
keyvalues={"room_id": room_id},
)
await self.db_pool.runInteraction(
"sliding_sync_joined_rooms_bg_update", _fill_table_txn
)
# Update the progress
_ = room_id_to_progress_marker_map.values()
await self.db_pool.updates._background_update_progress(
_BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
{"last_event_stream_ordering": rooms_to_update[-1][1]},
)
return len(rooms_to_update)
async def _sliding_sync_membership_snapshots_bg_update(


@@ -642,6 +642,7 @@ def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
# nothing to clean up
row = cast(Optional[Tuple[int]], txn.fetchone())
max_stream_ordering_sliding_sync_joined_rooms_table = None
depends_on = None
if row is not None:
(max_stream_ordering_sliding_sync_joined_rooms_table,) = row
@@ -668,6 +669,7 @@ def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
for chunk in batch_iter(room_rows, 1000):
# Handle updating the `sliding_sync_joined_rooms` table
#
# Clear out the stale data
DatabasePool.simple_delete_many_batch_txn(
txn,
table="sliding_sync_joined_rooms",
@@ -675,6 +677,44 @@ def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
values=chunk,
)
# Update the `sliding_sync_joined_rooms_to_recalculate` table with the rooms
# that went stale and now need to be recalculated.
#
# FIXME: There is potentially a race where the unique index (added via
# `_BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE`)
# hasn't been added at this point so we won't be able to upsert
DatabasePool.simple_upsert_many_txn_native_upsert(
txn,
table="sliding_sync_joined_rooms_to_recalculate",
key_names=("room_id",),
key_values=chunk,
value_names=(),
# No value columns, therefore make a blank list so that the following
# zip() works correctly.
value_values=[() for x in range(len(chunk))],
)
else:
# Re-run the `sliding_sync_joined_rooms_to_recalculate` prefill if there is
# nothing in the `sliding_sync_joined_rooms` table
DatabasePool.simple_upsert_txn_native_upsert(
txn,
table="background_updates",
keyvalues={
"update_name": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE
},
values={},
# Only insert the row if it doesn't already exist. If it already exists,
# we're already working on it
insertion_values={
"progress_json": "{}",
# Since we're going to upsert, we need to make sure the unique index is in place
"depends_on": _BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
},
)
depends_on = (
_BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE
)
# Now kick-off the background update to catch-up with what we missed while Synapse
# was downgraded.
#
@@ -682,13 +722,6 @@ def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
# `sliding_sync_joined_rooms` table yet. This could happen if someone had zero rooms
# on their server (so the normal background update completed), downgraded Synapse,
# joined and created some new rooms, and upgraded again.
#
progress_json: JsonDict = {}
if max_stream_ordering_sliding_sync_joined_rooms_table is not None:
progress_json["last_event_stream_ordering"] = (
max_stream_ordering_sliding_sync_joined_rooms_table
)
DatabasePool.simple_upsert_txn_native_upsert(
txn,
table="background_updates",
@@ -699,7 +732,10 @@ def _resolve_stale_data_in_sliding_sync_joined_rooms_table(
# Only insert the row if it doesn't already exist. If it already exists, we will
# eventually fill in the rows we're trying to populate.
insertion_values={
"progress_json": json_encoder.encode(progress_json),
# Empty progress is expected since it's not used for this background update.
"progress_json": "{}",
# Wait for the prefill to finish
"depends_on": depends_on,
},
)


@@ -150,7 +150,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_event_stream
-- 3. Add a background update to populate the new `sliding_sync_joined_rooms` table
--
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(8701, 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update', '{}');
(8701, 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update', '{ "initial_insert": true }');
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(8701, 'sliding_sync_index_joined_rooms_to_recalculate_table_bg_update', '{}', 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update');
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES


@@ -112,6 +112,24 @@ class UpdateUpsertManyTests(unittest.HomeserverTestCase):
{(1, "user1", "hello"), (2, "user2", "bleb")},
)
self.get_success(
self.storage.db_pool.runInteraction(
"test",
self.storage.db_pool.simple_upsert_many_txn,
self.table_name,
key_names=key_names,
key_values=[[2, "user2"]],
value_names=[],
value_values=[],
)
)
# Check results are what we expect
self.assertEqual(
set(self._dump_table_to_tuple()),
{(1, "user1", "hello"), (2, "user2", "bleb")},
)
def test_simple_update_many(self) -> None:
"""
simple_update_many performs many updates at once.


@@ -2659,13 +2659,33 @@ class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
exact=True,
)
# Insert and run the background update.
# Insert and run the background updates.
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
"progress_json": "{}",
},
)
)
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": _BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
"progress_json": "{}",
"depends_on": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
},
)
)
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
"progress_json": "{}",
"depends_on": _BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
},
)
)
@@ -2807,12 +2827,32 @@ class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
)
# Insert and run the background update.
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
"progress_json": "{}",
},
)
)
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": _BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
"progress_json": "{}",
"depends_on": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
},
)
)
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
"progress_json": "{}",
"depends_on": _BackgroundUpdates.SLIDING_SYNC_INDEX_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
},
)
)