mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-14 11:57:44 +00:00
Faster sliding sync sorting
This commit is contained in:
parent
4ca13ce0dd
commit
99cf1e7c1c
6 changed files with 235 additions and 23 deletions
|
@ -34,6 +34,7 @@ from synapse.api.constants import (
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.events.utils import strip_event
|
from synapse.events.utils import strip_event
|
||||||
from synapse.handlers.relations import BundledAggregations
|
from synapse.handlers.relations import BundledAggregations
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
|
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
|
||||||
from synapse.types import (
|
from synapse.types import (
|
||||||
JsonDict,
|
JsonDict,
|
||||||
|
@ -988,6 +989,7 @@ class SlidingSyncHandler:
|
||||||
# Assemble a new sync room map but only with the `filtered_room_id_set`
|
# Assemble a new sync room map but only with the `filtered_room_id_set`
|
||||||
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
|
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
|
||||||
|
|
||||||
|
@trace
|
||||||
async def sort_rooms(
|
async def sort_rooms(
|
||||||
self,
|
self,
|
||||||
sync_room_map: Dict[str, _RoomMembershipForUser],
|
sync_room_map: Dict[str, _RoomMembershipForUser],
|
||||||
|
@ -1009,24 +1011,18 @@ class SlidingSyncHandler:
|
||||||
# Assemble a map of room ID to the `stream_ordering` of the last activity that the
|
# Assemble a map of room ID to the `stream_ordering` of the last activity that the
|
||||||
# user should see in the room (<= `to_token`)
|
# user should see in the room (<= `to_token`)
|
||||||
last_activity_in_room_map: Dict[str, int] = {}
|
last_activity_in_room_map: Dict[str, int] = {}
|
||||||
|
to_fetch = []
|
||||||
for room_id, room_for_user in sync_room_map.items():
|
for room_id, room_for_user in sync_room_map.items():
|
||||||
# If they are fully-joined to the room, let's find the latest activity
|
# If they are fully-joined to the room, let's find the latest activity
|
||||||
# at/before the `to_token`.
|
# at/before the `to_token`.
|
||||||
if room_for_user.membership == Membership.JOIN:
|
if room_for_user.membership == Membership.JOIN:
|
||||||
last_event_result = (
|
stream = self.store._events_stream_cache._entity_to_key.get(room_id)
|
||||||
await self.store.get_last_event_pos_in_room_before_stream_ordering(
|
if stream is not None:
|
||||||
room_id, to_token.room_key
|
if stream <= to_token.room_key.stream:
|
||||||
)
|
last_activity_in_room_map[room_id] = stream
|
||||||
)
|
continue
|
||||||
|
|
||||||
# If the room has no events at/before the `to_token`, this is probably a
|
to_fetch.append(room_id)
|
||||||
# mistake in the code that generates the `sync_room_map` since that should
|
|
||||||
# only give us rooms that the user had membership in during the token range.
|
|
||||||
assert last_event_result is not None
|
|
||||||
|
|
||||||
_, event_pos = last_event_result
|
|
||||||
|
|
||||||
last_activity_in_room_map[room_id] = event_pos.stream
|
|
||||||
else:
|
else:
|
||||||
# Otherwise, if the user has left/been invited/knocked/been banned from
|
# Otherwise, if the user has left/been invited/knocked/been banned from
|
||||||
# a room, they shouldn't see anything past that point.
|
# a room, they shouldn't see anything past that point.
|
||||||
|
@ -1037,6 +1033,20 @@ class SlidingSyncHandler:
|
||||||
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
|
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
|
||||||
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
|
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
|
||||||
|
|
||||||
|
ordering_map = await self.store.get_max_stream_ordering_in_rooms(to_fetch)
|
||||||
|
for room_id, stream_pos in ordering_map.items():
|
||||||
|
if stream_pos is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if stream_pos.persisted_after(to_token.room_key):
|
||||||
|
continue
|
||||||
|
|
||||||
|
last_activity_in_room_map[room_id] = stream_pos.stream
|
||||||
|
|
||||||
|
for room_id in sync_room_map.keys() - last_activity_in_room_map.keys():
|
||||||
|
# TODO: Handle better
|
||||||
|
last_activity_in_room_map[room_id] = sync_room_map[room_id].event_pos.stream
|
||||||
|
|
||||||
return sorted(
|
return sorted(
|
||||||
sync_room_map.values(),
|
sync_room_map.values(),
|
||||||
# Sort by the last activity (stream_ordering) in the room
|
# Sort by the last activity (stream_ordering) in the room
|
||||||
|
@ -1409,18 +1419,29 @@ class SlidingSyncHandler:
|
||||||
break
|
break
|
||||||
|
|
||||||
# Figure out the last bump event in the room
|
# Figure out the last bump event in the room
|
||||||
|
last_bump_event_stream_ordering = None
|
||||||
|
if timeline_events:
|
||||||
|
for e in reversed(timeline_events):
|
||||||
|
if e.type in DEFAULT_BUMP_EVENT_TYPES:
|
||||||
|
last_bump_event_stream_ordering = (
|
||||||
|
e.internal_metadata.stream_ordering
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
|
if last_bump_event_stream_ordering is None:
|
||||||
last_bump_event_result = (
|
last_bump_event_result = (
|
||||||
await self.store.get_last_event_pos_in_room_before_stream_ordering(
|
await self.store.get_last_event_pos_in_room_before_stream_ordering(
|
||||||
room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
|
room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
if last_bump_event_result is not None:
|
||||||
|
last_bump_event_stream_ordering = last_bump_event_result[1].stream
|
||||||
|
|
||||||
# By default, just choose the membership event position
|
# By default, just choose the membership event position
|
||||||
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
|
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
|
||||||
# But if we found a bump event, use that instead
|
# But if we found a bump event, use that instead
|
||||||
if last_bump_event_result is not None:
|
if last_bump_event_stream_ordering is not None:
|
||||||
_, bump_event_pos = last_bump_event_result
|
bump_stamp = last_bump_event_stream_ordering
|
||||||
bump_stamp = bump_event_pos.stream
|
|
||||||
|
|
||||||
return SlidingSyncResult.RoomResult(
|
return SlidingSyncResult.RoomResult(
|
||||||
name=room_name,
|
name=room_name,
|
||||||
|
|
|
@ -309,6 +309,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
if not backfilled:
|
if not backfilled:
|
||||||
self._events_stream_cache.entity_has_changed(room_id, stream_ordering) # type: ignore[attr-defined]
|
self._events_stream_cache.entity_has_changed(room_id, stream_ordering) # type: ignore[attr-defined]
|
||||||
|
self._attempt_to_invalidate_cache(
|
||||||
|
"get_max_stream_ordering_in_room", (room_id,)
|
||||||
|
)
|
||||||
|
|
||||||
if redacts:
|
if redacts:
|
||||||
self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined]
|
self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined]
|
||||||
|
|
|
@ -551,7 +551,7 @@ class PersistEventsStore:
|
||||||
# From this point onwards the events are only events that we haven't
|
# From this point onwards the events are only events that we haven't
|
||||||
# seen before.
|
# seen before.
|
||||||
|
|
||||||
self._store_event_txn(txn, events_and_contexts=events_and_contexts)
|
self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts)
|
||||||
|
|
||||||
if new_forward_extremities:
|
if new_forward_extremities:
|
||||||
self._update_forward_extremities_txn(
|
self._update_forward_extremities_txn(
|
||||||
|
@ -1555,6 +1555,7 @@ class PersistEventsStore:
|
||||||
def _store_event_txn(
|
def _store_event_txn(
|
||||||
self,
|
self,
|
||||||
txn: LoggingTransaction,
|
txn: LoggingTransaction,
|
||||||
|
room_id: str,
|
||||||
events_and_contexts: Collection[Tuple[EventBase, EventContext]],
|
events_and_contexts: Collection[Tuple[EventBase, EventContext]],
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Insert new events into the event, event_json, redaction and
|
"""Insert new events into the event, event_json, redaction and
|
||||||
|
@ -1629,6 +1630,27 @@ class PersistEventsStore:
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Update the `sliding_sync_room_metadata` with the latest
|
||||||
|
# (non-backfilled, ie positive) stream ordering.
|
||||||
|
#
|
||||||
|
# We know this list is sorted and non-empty, so we just take the last
|
||||||
|
# one event.
|
||||||
|
max_stream_ordering: int
|
||||||
|
for e, _ in events_and_contexts:
|
||||||
|
assert e.internal_metadata.stream_ordering is not None
|
||||||
|
max_stream_ordering = e.internal_metadata.stream_ordering
|
||||||
|
|
||||||
|
if max_stream_ordering > 0:
|
||||||
|
self.db_pool.simple_upsert_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_room_metadata",
|
||||||
|
keyvalues={"room_id": room_id},
|
||||||
|
values={
|
||||||
|
"instance_name": self._instance_name,
|
||||||
|
"last_stream_ordering": max_stream_ordering,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
# If we're persisting an unredacted event we go and ensure
|
# If we're persisting an unredacted event we go and ensure
|
||||||
# that we mark any redactions that reference this event as
|
# that we mark any redactions that reference this event as
|
||||||
# requiring censoring.
|
# requiring censoring.
|
||||||
|
|
|
@ -50,6 +50,7 @@ from typing import (
|
||||||
Dict,
|
Dict,
|
||||||
Iterable,
|
Iterable,
|
||||||
List,
|
List,
|
||||||
|
Mapping,
|
||||||
Optional,
|
Optional,
|
||||||
Set,
|
Set,
|
||||||
Tuple,
|
Tuple,
|
||||||
|
@ -78,8 +79,13 @@ from synapse.storage.database import (
|
||||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
||||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
||||||
from synapse.types import PersistedEventPosition, RoomStreamToken
|
from synapse.types import (
|
||||||
from synapse.util.caches.descriptors import cached
|
JsonDict,
|
||||||
|
PersistedEventPosition,
|
||||||
|
RoomStreamToken,
|
||||||
|
StrCollection,
|
||||||
|
)
|
||||||
|
from synapse.util.caches.descriptors import cached, cachedList
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
from synapse.util.cancellation import cancellable
|
from synapse.util.cancellation import cancellable
|
||||||
|
|
||||||
|
@ -610,6 +616,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
self._stream_order_on_start = self.get_room_max_stream_ordering()
|
self._stream_order_on_start = self.get_room_max_stream_ordering()
|
||||||
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
|
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
|
||||||
|
|
||||||
|
database.updates.register_background_update_handler(
|
||||||
|
"sliding_sync_room_metadata", self._sliding_sync_room_metadata_bg_update
|
||||||
|
)
|
||||||
|
|
||||||
def get_room_max_stream_ordering(self) -> int:
|
def get_room_max_stream_ordering(self) -> int:
|
||||||
"""Get the stream_ordering of regular events that we have committed up to
|
"""Get the stream_ordering of regular events that we have committed up to
|
||||||
|
|
||||||
|
@ -1185,6 +1195,52 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@cachedList(
|
||||||
|
cached_method_name="get_max_stream_ordering_in_room",
|
||||||
|
list_name="room_ids",
|
||||||
|
)
|
||||||
|
async def get_max_stream_ordering_in_rooms(
|
||||||
|
self, room_ids: StrCollection
|
||||||
|
) -> Mapping[str, Optional[PersistedEventPosition]]:
|
||||||
|
"""Get the positions for the latest event in a room.
|
||||||
|
|
||||||
|
A batched version of `get_max_stream_ordering_in_room`.
|
||||||
|
"""
|
||||||
|
rows = await self.db_pool.simple_select_many_batch(
|
||||||
|
table="sliding_sync_room_metadata",
|
||||||
|
column="room_id",
|
||||||
|
iterable=room_ids,
|
||||||
|
retcols=("room_id", "instance_name", "last_stream_ordering"),
|
||||||
|
desc="get_max_stream_ordering_in_rooms",
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
room_id: PersistedEventPosition(instance_name, stream)
|
||||||
|
for room_id, instance_name, stream in rows
|
||||||
|
}
|
||||||
|
|
||||||
|
@cached(max_entries=10000)
|
||||||
|
async def get_max_stream_ordering_in_room(
|
||||||
|
self,
|
||||||
|
room_id: str,
|
||||||
|
) -> Optional[PersistedEventPosition]:
|
||||||
|
"""Get the position for the latest event in a room.
|
||||||
|
|
||||||
|
Note: this may be after the current token for the room stream on this
|
||||||
|
process (e.g. due to replication lag)
|
||||||
|
"""
|
||||||
|
row = await self.db_pool.simple_select_one(
|
||||||
|
table="sliding_sync_room_metadata",
|
||||||
|
retcols=("instance_name", "last_stream_ordering"),
|
||||||
|
keyvalues={"room_id": room_id},
|
||||||
|
allow_none=True,
|
||||||
|
desc="get_max_stream_ordering_in_room",
|
||||||
|
)
|
||||||
|
if not row:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return PersistedEventPosition(instance_name=row[0], stream=row[1])
|
||||||
|
|
||||||
async def get_last_event_pos_in_room_before_stream_ordering(
|
async def get_last_event_pos_in_room_before_stream_ordering(
|
||||||
self,
|
self,
|
||||||
room_id: str,
|
room_id: str,
|
||||||
|
@ -1983,3 +2039,88 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
return RoomStreamToken(stream=last_position.stream - 1)
|
return RoomStreamToken(stream=last_position.stream - 1)
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
async def _sliding_sync_room_metadata_bg_update(
|
||||||
|
self, progress: JsonDict, batch_size: int
|
||||||
|
) -> int:
|
||||||
|
"""Background update to fill out 'sliding_sync_room_metadata' table"""
|
||||||
|
previous_room = progress.get("previous_room", "")
|
||||||
|
|
||||||
|
def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int:
|
||||||
|
# Both these queries are just getting the most recent
|
||||||
|
# instance_name/stream ordering for the next N rooms.
|
||||||
|
if isinstance(self.database_engine, PostgresEngine):
|
||||||
|
sql = """
|
||||||
|
SELECT room_id, instance_name, stream_ordering FROM rooms AS r,
|
||||||
|
LATERAL (
|
||||||
|
SELECT instance_name, stream_ordering
|
||||||
|
FROM events WHERE events.room_id = r.room_id
|
||||||
|
ORDER BY stream_ordering DESC
|
||||||
|
LIMIT 1
|
||||||
|
) e
|
||||||
|
WHERE r.room_id > ?
|
||||||
|
ORDER BY r.room_id ASC
|
||||||
|
LIMIT ?
|
||||||
|
"""
|
||||||
|
else:
|
||||||
|
sql = """
|
||||||
|
SELECT
|
||||||
|
room_id,
|
||||||
|
(
|
||||||
|
SELECT instance_name
|
||||||
|
FROM events WHERE events.room_id = r.room_id
|
||||||
|
ORDER BY stream_ordering DESC
|
||||||
|
LIMIT 1
|
||||||
|
),
|
||||||
|
(
|
||||||
|
SELECT stream_ordering
|
||||||
|
FROM events WHERE events.room_id = r.room_id
|
||||||
|
ORDER BY stream_ordering DESC
|
||||||
|
LIMIT 1
|
||||||
|
)
|
||||||
|
FROM rooms AS r
|
||||||
|
WHERE r.room_id > ?
|
||||||
|
ORDER BY r.room_id ASC
|
||||||
|
LIMIT ?
|
||||||
|
"""
|
||||||
|
|
||||||
|
txn.execute(sql, (previous_room, batch_size))
|
||||||
|
rows = txn.fetchall()
|
||||||
|
if not rows:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
self.db_pool.simple_upsert_many_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_room_metadata",
|
||||||
|
key_names=("room_id",),
|
||||||
|
key_values=[(room_id,) for room_id, _, _ in rows],
|
||||||
|
value_names=(
|
||||||
|
"instance_name",
|
||||||
|
"last_stream_ordering",
|
||||||
|
),
|
||||||
|
value_values=[
|
||||||
|
(
|
||||||
|
instance_name or "master",
|
||||||
|
stream,
|
||||||
|
)
|
||||||
|
for _, instance_name, stream in rows
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
self.db_pool.updates._background_update_progress_txn(
|
||||||
|
txn, "sliding_sync_room_metadata", {"previous_room": rows[-1][0]}
|
||||||
|
)
|
||||||
|
|
||||||
|
return len(rows)
|
||||||
|
|
||||||
|
rows = await self.db_pool.runInteraction(
|
||||||
|
"_sliding_sync_room_metadata_bg_update",
|
||||||
|
_sliding_sync_room_metadata_bg_update_txn,
|
||||||
|
)
|
||||||
|
|
||||||
|
if rows == 0:
|
||||||
|
await self.db_pool.updates._end_background_update(
|
||||||
|
"sliding_sync_room_metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
return rows
|
||||||
|
|
24
synapse/storage/schema/main/delta/85/07_sliding_sync.sql
Normal file
24
synapse/storage/schema/main/delta/85/07_sliding_sync.sql
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
--
|
||||||
|
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||||
|
--
|
||||||
|
-- Copyright (C) 2024 New Vector, Ltd
|
||||||
|
--
|
||||||
|
-- This program is free software: you can redistribute it and/or modify
|
||||||
|
-- it under the terms of the GNU Affero General Public License as
|
||||||
|
-- published by the Free Software Foundation, either version 3 of the
|
||||||
|
-- License, or (at your option) any later version.
|
||||||
|
--
|
||||||
|
-- See the GNU Affero General Public License for more details:
|
||||||
|
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||||
|
|
||||||
|
-- A table that maps from room ID to metadata useful for sliding sync.
|
||||||
|
CREATE TABLE sliding_sync_room_metadata (
|
||||||
|
room_id TEXT NOT NULL PRIMARY KEY,
|
||||||
|
|
||||||
|
-- The instance_name / stream ordering of the last event in the room.
|
||||||
|
instance_name TEXT NOT NULL,
|
||||||
|
last_stream_ordering BIGINT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||||
|
(8507, 'sliding_sync_room_metadata', '{}');
|
|
@ -440,6 +440,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
|
||||||
assert persist_events_store is not None
|
assert persist_events_store is not None
|
||||||
persist_events_store._store_event_txn(
|
persist_events_store._store_event_txn(
|
||||||
txn,
|
txn,
|
||||||
|
events[0].room_id,
|
||||||
[
|
[
|
||||||
(e, EventContext(self.hs.get_storage_controllers(), {}))
|
(e, EventContext(self.hs.get_storage_controllers(), {}))
|
||||||
for e in events
|
for e in events
|
||||||
|
|
Loading…
Reference in a new issue