From d39dc3ef27dfae48a26700eecaf7521939ded874 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 28 Mar 2025 12:22:05 +0000 Subject: [PATCH 1/4] Add support for sending notification counts in simplified sliding sync --- synapse/handlers/sliding_sync/__init__.py | 51 ++++++++++++++++++--- synapse/handlers/sliding_sync/room_lists.py | 24 ++++------ synapse/rest/client/sync.py | 13 +++++- synapse/storage/databases/main/receipts.py | 17 +++++-- synapse/types/handlers/sliding_sync.py | 15 +++--- 5 files changed, 86 insertions(+), 34 deletions(-) diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 459d3c3e24..32ece82079 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -15,7 +15,17 @@ import itertools import logging from itertools import chain -from typing import TYPE_CHECKING, AbstractSet, Dict, List, Mapping, Optional, Set, Tuple +from typing import ( + TYPE_CHECKING, + AbstractSet, + Dict, + List, + Mapping, + Optional, + Sequence, + Set, + Tuple, +) from prometheus_client import Histogram from typing_extensions import assert_never @@ -38,6 +48,7 @@ from synapse.logging.opentracing import ( tag_args, trace, ) +from synapse.storage.databases.main.receipts import ReceiptInRoom from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary from synapse.storage.databases.main.state_deltas import StateDelta from synapse.storage.databases.main.stream import PaginateFunction @@ -245,11 +256,31 @@ class SlidingSyncHandler: to_token=to_token, ) + # fetch the user's receipts between the two points: these will be factor + # in deciding whether to send the room, since it may have changed their + # notification counts + receipts = await self.store.get_linearized_receipts_for_user_in_rooms( + user_id=user_id, + room_ids=interested_rooms.relevant_room_map.keys(), + from_key=from_token.stream_token.receipt_key if from_token else None, + to_key=to_token.receipt_key, + ) + + # Filtered subset of `relevant_room_map` for rooms that may have updates + # (in the event stream) + relevant_rooms_to_send_map = self.room_lists.filter_relevant_rooms_to_send( + sync_config.user, + previous_connection_state, + from_token.stream_token if from_token else None, + to_token, + interested_rooms.relevant_room_map, + receipts, + ) + lists = interested_rooms.lists relevant_room_map = interested_rooms.relevant_room_map all_rooms = interested_rooms.all_rooms room_membership_for_user_map = interested_rooms.room_membership_for_user_map - relevant_rooms_to_send_map = interested_rooms.relevant_rooms_to_send_map # Fetch room data rooms: Dict[str, SlidingSyncResult.RoomResult] = {} @@ -272,6 +303,7 @@ class SlidingSyncHandler: to_token=to_token, newly_joined=room_id in interested_rooms.newly_joined_rooms, is_dm=room_id in interested_rooms.dm_room_ids, + receipts=receipts, ) # Filter out empty room results during incremental sync @@ -543,6 +575,7 @@ class SlidingSyncHandler: to_token: StreamToken, newly_joined: bool, is_dm: bool, + receipts: Sequence[ReceiptInRoom], ) -> SlidingSyncResult.RoomResult: """ Fetch room data for the sync response. @@ -560,6 +593,8 @@ class SlidingSyncHandler: to_token: The point in the stream to sync up to. newly_joined: If the user has newly joined the room is_dm: Whether the room is a DM room + room_receipts: Any read receipts from the in question in that room between + from_token and to_token """ user = sync_config.user @@ -1312,6 +1347,11 @@ class SlidingSyncHandler: set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) + unread_notifs = await self.store.get_unread_event_push_actions_by_room_for_user( + room_id, + sync_config.user.to_string(), + ) + return SlidingSyncResult.RoomResult( name=room_name, avatar=room_avatar, @@ -1329,11 +1369,8 @@ class SlidingSyncHandler: bump_stamp=bump_stamp, joined_count=joined_count, invited_count=invited_count, - # TODO: These are just dummy values. We could potentially just remove these - # since notifications can only really be done correctly on the client anyway - # (encrypted rooms). - notification_count=0, - highlight_count=0, + notif_counts=unread_notifs, + room_receipts=receipts[room_id] if room_id in receipts else None, ) @trace diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index a1730b7e05..d83f84109f 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -24,6 +24,7 @@ from typing import ( Literal, Mapping, Optional, + Sequence, Set, Tuple, Union, @@ -44,6 +45,7 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import StrippedStateEvent from synapse.events.utils import parse_stripped_state_event from synapse.logging.opentracing import start_active_span, trace +from synapse.storage.databases.main.receipts import ReceiptInRoom from synapse.storage.databases.main.state import ( ROOM_UNKNOWN_SENTINEL, Sentinel as StateSentinel, @@ -115,7 +117,6 @@ class SlidingSyncInterestedRooms: lists: Mapping[str, SlidingSyncResult.SlidingWindowList] relevant_room_map: Mapping[str, RoomSyncConfig] - relevant_rooms_to_send_map: Mapping[str, RoomSyncConfig] all_rooms: Set[str] room_membership_for_user_map: Mapping[str, RoomsForUserType] @@ -547,16 +548,9 @@ class SlidingSyncRoomLists: relevant_room_map[room_id] = room_sync_config - # Filtered subset of `relevant_room_map` for rooms that may have updates - # (in the event stream) - relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send( - previous_connection_state, from_token, relevant_room_map - ) - return SlidingSyncInterestedRooms( lists=lists, relevant_room_map=relevant_room_map, - relevant_rooms_to_send_map=relevant_rooms_to_send_map, all_rooms=all_rooms, room_membership_for_user_map=room_membership_for_user_map, newly_joined_rooms=newly_joined_room_ids, @@ -735,16 +729,9 @@ class SlidingSyncRoomLists: relevant_room_map[room_id] = room_sync_config - # Filtered subset of `relevant_room_map` for rooms that may have updates - # (in the event stream) - relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send( - previous_connection_state, from_token, relevant_room_map - ) - return SlidingSyncInterestedRooms( lists=lists, relevant_room_map=relevant_room_map, - relevant_rooms_to_send_map=relevant_rooms_to_send_map, all_rooms=all_rooms, room_membership_for_user_map=room_membership_for_user_map, newly_joined_rooms=newly_joined_room_ids, @@ -752,11 +739,14 @@ class SlidingSyncRoomLists: dm_room_ids=dm_room_ids, ) - async def _filter_relevant_rooms_to_send( + def filter_relevant_rooms_to_send( self, + user_id: UserID, previous_connection_state: PerConnectionState, from_token: Optional[StreamToken], + to_token: StreamToken, relevant_room_map: Dict[str, RoomSyncConfig], + receipts: Mapping[str, Sequence[ReceiptInRoom]], ) -> Dict[str, RoomSyncConfig]: """Filters the `relevant_room_map` down to those rooms that may have updates we need to fetch and return.""" @@ -814,6 +804,8 @@ class SlidingSyncRoomLists: ) ) rooms_should_send.update(rooms_that_have_updates) + rooms_should_send.update(receipts.keys()) + relevant_rooms_to_send_map = { room_id: room_sync_config for room_id, room_sync_config in relevant_room_map.items() diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 4fb9c0c8e7..a5e6903e52 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -1066,10 +1066,19 @@ class SlidingSyncRestServlet(RestServlet): serialized_rooms: Dict[str, JsonDict] = {} for room_id, room_result in rooms.items(): serialized_rooms[room_id] = { - "notification_count": room_result.notification_count, - "highlight_count": room_result.highlight_count, + "notification_count": room_result.notif_counts.main_timeline.notify_count, + "highlight_count": room_result.notif_counts.main_timeline.highlight_count, } + if len(room_result.notif_counts.threads) > 0: + serialized_rooms[room_id]["unread_thread_notifications"] = { + thread_id: { + "notification_count": counts.notify_count, + "highlight_count": counts.highlight_count, + } + for thread_id, counts in room_result.notif_counts.threads.items() + } + if room_result.bump_stamp is not None: serialized_rooms[room_id]["bump_stamp"] = room_result.bump_stamp diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 9964331510..2a09617058 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -666,7 +666,11 @@ class ReceiptsWorkerStore(SQLBaseStore): return results async def get_linearized_receipts_for_user_in_rooms( - self, user_id: str, room_ids: StrCollection, to_key: MultiWriterStreamToken + self, + user_id: str, + room_ids: StrCollection, + from_key: Optional[MultiWriterStreamToken] = None, + to_key: Optional[MultiWriterStreamToken] = None, ) -> Mapping[str, Sequence[ReceiptInRoom]]: """Fetch all receipts for the user in the given room. @@ -685,11 +689,18 @@ class ReceiptsWorkerStore(SQLBaseStore): sql = f""" SELECT instance_name, stream_id, room_id, receipt_type, user_id, event_id, thread_id, data FROM receipts_linearized - WHERE {clause} AND user_id = ? AND stream_id <= ? + WHERE {clause} AND user_id = ? """ args.append(user_id) - args.append(to_key.get_max_stream_pos()) + + if from_key is not None: + sql += " AND stream_id >= ?" + args.append(from_key.get_max_stream_pos()) + + if to_key is not None: + sql += " AND stream_id <= ?" + args.append(to_key.get_max_stream_pos()) txn.execute(sql, args) diff --git a/synapse/types/handlers/sliding_sync.py b/synapse/types/handlers/sliding_sync.py index 3ebd334a6d..26ca1639f3 100644 --- a/synapse/types/handlers/sliding_sync.py +++ b/synapse/types/handlers/sliding_sync.py @@ -40,6 +40,8 @@ import attr from synapse._pydantic_compat import Extra from synapse.api.constants import EventTypes from synapse.events import EventBase +from synapse.storage.databases.main.event_push_actions import RoomNotifCounts +from synapse.storage.databases.main.receipts import ReceiptInRoom from synapse.types import ( DeviceListUpdates, JsonDict, @@ -163,10 +165,10 @@ class SlidingSyncResult: own user ID. (same as sync `v2 m.joined_member_count`) invited_count: The number of users with membership of invite. (same as sync v2 `m.invited_member_count`) - notification_count: The total number of unread notifications for this room. (same - as sync v2) - highlight_count: The number of unread notifications for this room with the highlight - flag set. (same as sync v2) + notif_counts: An object containing the number of unread notifications for both + the main thread and any other threads. + room_receipts: A sequence of any read receipts from the user in question in + the room, used to calculate whether the notif_counts could have changed """ @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -197,8 +199,8 @@ class SlidingSyncResult: bump_stamp: Optional[int] joined_count: Optional[int] invited_count: Optional[int] - notification_count: int - highlight_count: int + notif_counts: RoomNotifCounts + room_receipts: Sequence[ReceiptInRoom] def __bool__(self) -> bool: return ( @@ -215,6 +217,7 @@ class SlidingSyncResult: or bool(self.required_state) or bool(self.timeline_events) or bool(self.stripped_state) + or bool(self.room_receipts) ) @attr.s(slots=True, frozen=True, auto_attribs=True) From cb9d25ffedcbad93a3de863f1bc22b77cb824ecc Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 28 Mar 2025 12:36:05 +0000 Subject: [PATCH 2/4] Here is the news at six o'clock --- changelog.d/18290.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/18290.feature diff --git a/changelog.d/18290.feature b/changelog.d/18290.feature new file mode 100644 index 0000000000..426f6ab31a --- /dev/null +++ b/changelog.d/18290.feature @@ -0,0 +1 @@ +Add support for sending notification counts and thread notification counts in simplified sliding sync mode. From 4ea8507bbdc078f2a7c3fb5b0f4c73e764b95ac9 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 28 Mar 2025 15:13:35 +0000 Subject: [PATCH 3/4] Fix types --- synapse/handlers/sliding_sync/__init__.py | 6 +++--- synapse/handlers/sliding_sync/room_lists.py | 5 ----- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 32ece82079..295d3486c4 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -303,7 +303,7 @@ class SlidingSyncHandler: to_token=to_token, newly_joined=room_id in interested_rooms.newly_joined_rooms, is_dm=room_id in interested_rooms.dm_room_ids, - receipts=receipts, + room_receipts=receipts[room_id] if room_id in receipts else None, ) # Filter out empty room results during incremental sync @@ -575,7 +575,7 @@ class SlidingSyncHandler: to_token: StreamToken, newly_joined: bool, is_dm: bool, - receipts: Sequence[ReceiptInRoom], + room_receipts: Sequence[ReceiptInRoom], ) -> SlidingSyncResult.RoomResult: """ Fetch room data for the sync response. @@ -1370,7 +1370,7 @@ class SlidingSyncHandler: joined_count=joined_count, invited_count=invited_count, notif_counts=unread_notifs, - room_receipts=receipts[room_id] if room_id in receipts else None, + room_receipts=room_receipts, ) @trace diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index d83f84109f..b465e4ab44 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -104,10 +104,6 @@ class SlidingSyncInterestedRooms: lists: A mapping from list name to the list result for the response relevant_room_map: A map from rooms that match the sync request to their room sync config. - relevant_rooms_to_send_map: Subset of `relevant_room_map` that - includes the rooms that *may* have relevant updates. Rooms not - in this map will definitely not have room updates (though - extensions may have updates in these rooms). newly_joined_rooms: The set of rooms that were joined in the token range and the user is still joined to at the end of this range. newly_left_rooms: The set of rooms that we left in the token range @@ -129,7 +125,6 @@ class SlidingSyncInterestedRooms: return SlidingSyncInterestedRooms( lists={}, relevant_room_map={}, - relevant_rooms_to_send_map={}, all_rooms=set(), room_membership_for_user_map={}, newly_joined_rooms=set(), From 9ba2c7030b0d094fd46f37dcfe1c113235e09ee7 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 28 Mar 2025 15:50:47 +0000 Subject: [PATCH 4/4] More types --- synapse/handlers/sliding_sync/__init__.py | 2 +- synapse/handlers/sliding_sync/room_lists.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 295d3486c4..6b77ba90f2 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -575,7 +575,7 @@ class SlidingSyncHandler: to_token: StreamToken, newly_joined: bool, is_dm: bool, - room_receipts: Sequence[ReceiptInRoom], + room_receipts: Optional[Sequence[ReceiptInRoom]], ) -> SlidingSyncResult.RoomResult: """ Fetch room data for the sync response. diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index b465e4ab44..49c02ab56d 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -740,7 +740,7 @@ class SlidingSyncRoomLists: previous_connection_state: PerConnectionState, from_token: Optional[StreamToken], to_token: StreamToken, - relevant_room_map: Dict[str, RoomSyncConfig], + relevant_room_map: Mapping[str, RoomSyncConfig], receipts: Mapping[str, Sequence[ReceiptInRoom]], ) -> Dict[str, RoomSyncConfig]: """Filters the `relevant_room_map` down to those rooms that may have