mirror of
https://github.com/element-hq/synapse.git
synced 2025-03-31 03:45:13 +00:00
Merge 9ba2c7030b
into 3c188231c7
This commit is contained in:
commit
59b65f3a11
6 changed files with 88 additions and 40 deletions
1
changelog.d/18290.feature
Normal file
1
changelog.d/18290.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add support for sending notification counts and thread notification counts in simplified sliding sync mode.
|
|
@ -15,7 +15,17 @@
|
|||
import itertools
|
||||
import logging
|
||||
from itertools import chain
|
||||
from typing import TYPE_CHECKING, AbstractSet, Dict, List, Mapping, Optional, Set, Tuple
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
AbstractSet,
|
||||
Dict,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
Tuple,
|
||||
)
|
||||
|
||||
from prometheus_client import Histogram
|
||||
from typing_extensions import assert_never
|
||||
|
@ -38,6 +48,7 @@ from synapse.logging.opentracing import (
|
|||
tag_args,
|
||||
trace,
|
||||
)
|
||||
from synapse.storage.databases.main.receipts import ReceiptInRoom
|
||||
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
|
||||
from synapse.storage.databases.main.state_deltas import StateDelta
|
||||
from synapse.storage.databases.main.stream import PaginateFunction
|
||||
|
@ -245,11 +256,31 @@ class SlidingSyncHandler:
|
|||
to_token=to_token,
|
||||
)
|
||||
|
||||
# fetch the user's receipts between the two points: these will be factor
|
||||
# in deciding whether to send the room, since it may have changed their
|
||||
# notification counts
|
||||
receipts = await self.store.get_linearized_receipts_for_user_in_rooms(
|
||||
user_id=user_id,
|
||||
room_ids=interested_rooms.relevant_room_map.keys(),
|
||||
from_key=from_token.stream_token.receipt_key if from_token else None,
|
||||
to_key=to_token.receipt_key,
|
||||
)
|
||||
|
||||
# Filtered subset of `relevant_room_map` for rooms that may have updates
|
||||
# (in the event stream)
|
||||
relevant_rooms_to_send_map = self.room_lists.filter_relevant_rooms_to_send(
|
||||
sync_config.user,
|
||||
previous_connection_state,
|
||||
from_token.stream_token if from_token else None,
|
||||
to_token,
|
||||
interested_rooms.relevant_room_map,
|
||||
receipts,
|
||||
)
|
||||
|
||||
lists = interested_rooms.lists
|
||||
relevant_room_map = interested_rooms.relevant_room_map
|
||||
all_rooms = interested_rooms.all_rooms
|
||||
room_membership_for_user_map = interested_rooms.room_membership_for_user_map
|
||||
relevant_rooms_to_send_map = interested_rooms.relevant_rooms_to_send_map
|
||||
|
||||
# Fetch room data
|
||||
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
|
||||
|
@ -272,6 +303,7 @@ class SlidingSyncHandler:
|
|||
to_token=to_token,
|
||||
newly_joined=room_id in interested_rooms.newly_joined_rooms,
|
||||
is_dm=room_id in interested_rooms.dm_room_ids,
|
||||
room_receipts=receipts[room_id] if room_id in receipts else None,
|
||||
)
|
||||
|
||||
# Filter out empty room results during incremental sync
|
||||
|
@ -543,6 +575,7 @@ class SlidingSyncHandler:
|
|||
to_token: StreamToken,
|
||||
newly_joined: bool,
|
||||
is_dm: bool,
|
||||
room_receipts: Optional[Sequence[ReceiptInRoom]],
|
||||
) -> SlidingSyncResult.RoomResult:
|
||||
"""
|
||||
Fetch room data for the sync response.
|
||||
|
@ -560,6 +593,8 @@ class SlidingSyncHandler:
|
|||
to_token: The point in the stream to sync up to.
|
||||
newly_joined: If the user has newly joined the room
|
||||
is_dm: Whether the room is a DM room
|
||||
room_receipts: Any read receipts from the in question in that room between
|
||||
from_token and to_token
|
||||
"""
|
||||
user = sync_config.user
|
||||
|
||||
|
@ -1312,6 +1347,11 @@ class SlidingSyncHandler:
|
|||
|
||||
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
|
||||
|
||||
unread_notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
|
||||
room_id,
|
||||
sync_config.user.to_string(),
|
||||
)
|
||||
|
||||
return SlidingSyncResult.RoomResult(
|
||||
name=room_name,
|
||||
avatar=room_avatar,
|
||||
|
@ -1329,11 +1369,8 @@ class SlidingSyncHandler:
|
|||
bump_stamp=bump_stamp,
|
||||
joined_count=joined_count,
|
||||
invited_count=invited_count,
|
||||
# TODO: These are just dummy values. We could potentially just remove these
|
||||
# since notifications can only really be done correctly on the client anyway
|
||||
# (encrypted rooms).
|
||||
notification_count=0,
|
||||
highlight_count=0,
|
||||
notif_counts=unread_notifs,
|
||||
room_receipts=room_receipts,
|
||||
)
|
||||
|
||||
@trace
|
||||
|
|
|
@ -24,6 +24,7 @@ from typing import (
|
|||
Literal,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
Tuple,
|
||||
Union,
|
||||
|
@ -44,6 +45,7 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
|||
from synapse.events import StrippedStateEvent
|
||||
from synapse.events.utils import parse_stripped_state_event
|
||||
from synapse.logging.opentracing import start_active_span, trace
|
||||
from synapse.storage.databases.main.receipts import ReceiptInRoom
|
||||
from synapse.storage.databases.main.state import (
|
||||
ROOM_UNKNOWN_SENTINEL,
|
||||
Sentinel as StateSentinel,
|
||||
|
@ -102,10 +104,6 @@ class SlidingSyncInterestedRooms:
|
|||
lists: A mapping from list name to the list result for the response
|
||||
relevant_room_map: A map from rooms that match the sync request to
|
||||
their room sync config.
|
||||
relevant_rooms_to_send_map: Subset of `relevant_room_map` that
|
||||
includes the rooms that *may* have relevant updates. Rooms not
|
||||
in this map will definitely not have room updates (though
|
||||
extensions may have updates in these rooms).
|
||||
newly_joined_rooms: The set of rooms that were joined in the token range
|
||||
and the user is still joined to at the end of this range.
|
||||
newly_left_rooms: The set of rooms that we left in the token range
|
||||
|
@ -115,7 +113,6 @@ class SlidingSyncInterestedRooms:
|
|||
|
||||
lists: Mapping[str, SlidingSyncResult.SlidingWindowList]
|
||||
relevant_room_map: Mapping[str, RoomSyncConfig]
|
||||
relevant_rooms_to_send_map: Mapping[str, RoomSyncConfig]
|
||||
all_rooms: Set[str]
|
||||
room_membership_for_user_map: Mapping[str, RoomsForUserType]
|
||||
|
||||
|
@ -128,7 +125,6 @@ class SlidingSyncInterestedRooms:
|
|||
return SlidingSyncInterestedRooms(
|
||||
lists={},
|
||||
relevant_room_map={},
|
||||
relevant_rooms_to_send_map={},
|
||||
all_rooms=set(),
|
||||
room_membership_for_user_map={},
|
||||
newly_joined_rooms=set(),
|
||||
|
@ -547,16 +543,9 @@ class SlidingSyncRoomLists:
|
|||
|
||||
relevant_room_map[room_id] = room_sync_config
|
||||
|
||||
# Filtered subset of `relevant_room_map` for rooms that may have updates
|
||||
# (in the event stream)
|
||||
relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send(
|
||||
previous_connection_state, from_token, relevant_room_map
|
||||
)
|
||||
|
||||
return SlidingSyncInterestedRooms(
|
||||
lists=lists,
|
||||
relevant_room_map=relevant_room_map,
|
||||
relevant_rooms_to_send_map=relevant_rooms_to_send_map,
|
||||
all_rooms=all_rooms,
|
||||
room_membership_for_user_map=room_membership_for_user_map,
|
||||
newly_joined_rooms=newly_joined_room_ids,
|
||||
|
@ -735,16 +724,9 @@ class SlidingSyncRoomLists:
|
|||
|
||||
relevant_room_map[room_id] = room_sync_config
|
||||
|
||||
# Filtered subset of `relevant_room_map` for rooms that may have updates
|
||||
# (in the event stream)
|
||||
relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send(
|
||||
previous_connection_state, from_token, relevant_room_map
|
||||
)
|
||||
|
||||
return SlidingSyncInterestedRooms(
|
||||
lists=lists,
|
||||
relevant_room_map=relevant_room_map,
|
||||
relevant_rooms_to_send_map=relevant_rooms_to_send_map,
|
||||
all_rooms=all_rooms,
|
||||
room_membership_for_user_map=room_membership_for_user_map,
|
||||
newly_joined_rooms=newly_joined_room_ids,
|
||||
|
@ -752,11 +734,14 @@ class SlidingSyncRoomLists:
|
|||
dm_room_ids=dm_room_ids,
|
||||
)
|
||||
|
||||
async def _filter_relevant_rooms_to_send(
|
||||
def filter_relevant_rooms_to_send(
|
||||
self,
|
||||
user_id: UserID,
|
||||
previous_connection_state: PerConnectionState,
|
||||
from_token: Optional[StreamToken],
|
||||
relevant_room_map: Dict[str, RoomSyncConfig],
|
||||
to_token: StreamToken,
|
||||
relevant_room_map: Mapping[str, RoomSyncConfig],
|
||||
receipts: Mapping[str, Sequence[ReceiptInRoom]],
|
||||
) -> Dict[str, RoomSyncConfig]:
|
||||
"""Filters the `relevant_room_map` down to those rooms that may have
|
||||
updates we need to fetch and return."""
|
||||
|
@ -814,6 +799,8 @@ class SlidingSyncRoomLists:
|
|||
)
|
||||
)
|
||||
rooms_should_send.update(rooms_that_have_updates)
|
||||
rooms_should_send.update(receipts.keys())
|
||||
|
||||
relevant_rooms_to_send_map = {
|
||||
room_id: room_sync_config
|
||||
for room_id, room_sync_config in relevant_room_map.items()
|
||||
|
|
|
@ -1066,10 +1066,19 @@ class SlidingSyncRestServlet(RestServlet):
|
|||
serialized_rooms: Dict[str, JsonDict] = {}
|
||||
for room_id, room_result in rooms.items():
|
||||
serialized_rooms[room_id] = {
|
||||
"notification_count": room_result.notification_count,
|
||||
"highlight_count": room_result.highlight_count,
|
||||
"notification_count": room_result.notif_counts.main_timeline.notify_count,
|
||||
"highlight_count": room_result.notif_counts.main_timeline.highlight_count,
|
||||
}
|
||||
|
||||
if len(room_result.notif_counts.threads) > 0:
|
||||
serialized_rooms[room_id]["unread_thread_notifications"] = {
|
||||
thread_id: {
|
||||
"notification_count": counts.notify_count,
|
||||
"highlight_count": counts.highlight_count,
|
||||
}
|
||||
for thread_id, counts in room_result.notif_counts.threads.items()
|
||||
}
|
||||
|
||||
if room_result.bump_stamp is not None:
|
||||
serialized_rooms[room_id]["bump_stamp"] = room_result.bump_stamp
|
||||
|
||||
|
|
|
@ -666,7 +666,11 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
return results
|
||||
|
||||
async def get_linearized_receipts_for_user_in_rooms(
|
||||
self, user_id: str, room_ids: StrCollection, to_key: MultiWriterStreamToken
|
||||
self,
|
||||
user_id: str,
|
||||
room_ids: StrCollection,
|
||||
from_key: Optional[MultiWriterStreamToken] = None,
|
||||
to_key: Optional[MultiWriterStreamToken] = None,
|
||||
) -> Mapping[str, Sequence[ReceiptInRoom]]:
|
||||
"""Fetch all receipts for the user in the given room.
|
||||
|
||||
|
@ -685,11 +689,18 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
sql = f"""
|
||||
SELECT instance_name, stream_id, room_id, receipt_type, user_id, event_id, thread_id, data
|
||||
FROM receipts_linearized
|
||||
WHERE {clause} AND user_id = ? AND stream_id <= ?
|
||||
WHERE {clause} AND user_id = ?
|
||||
"""
|
||||
|
||||
args.append(user_id)
|
||||
args.append(to_key.get_max_stream_pos())
|
||||
|
||||
if from_key is not None:
|
||||
sql += " AND stream_id >= ?"
|
||||
args.append(from_key.get_max_stream_pos())
|
||||
|
||||
if to_key is not None:
|
||||
sql += " AND stream_id <= ?"
|
||||
args.append(to_key.get_max_stream_pos())
|
||||
|
||||
txn.execute(sql, args)
|
||||
|
||||
|
|
|
@ -40,6 +40,8 @@ import attr
|
|||
from synapse._pydantic_compat import Extra
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.events import EventBase
|
||||
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
|
||||
from synapse.storage.databases.main.receipts import ReceiptInRoom
|
||||
from synapse.types import (
|
||||
DeviceListUpdates,
|
||||
JsonDict,
|
||||
|
@ -163,10 +165,10 @@ class SlidingSyncResult:
|
|||
own user ID. (same as sync `v2 m.joined_member_count`)
|
||||
invited_count: The number of users with membership of invite. (same as sync v2
|
||||
`m.invited_member_count`)
|
||||
notification_count: The total number of unread notifications for this room. (same
|
||||
as sync v2)
|
||||
highlight_count: The number of unread notifications for this room with the highlight
|
||||
flag set. (same as sync v2)
|
||||
notif_counts: An object containing the number of unread notifications for both
|
||||
the main thread and any other threads.
|
||||
room_receipts: A sequence of any read receipts from the user in question in
|
||||
the room, used to calculate whether the notif_counts could have changed
|
||||
"""
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
|
@ -197,8 +199,8 @@ class SlidingSyncResult:
|
|||
bump_stamp: Optional[int]
|
||||
joined_count: Optional[int]
|
||||
invited_count: Optional[int]
|
||||
notification_count: int
|
||||
highlight_count: int
|
||||
notif_counts: RoomNotifCounts
|
||||
room_receipts: Sequence[ReceiptInRoom]
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return (
|
||||
|
@ -215,6 +217,7 @@ class SlidingSyncResult:
|
|||
or bool(self.required_state)
|
||||
or bool(self.timeline_events)
|
||||
or bool(self.stripped_state)
|
||||
or bool(self.room_receipts)
|
||||
)
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
|
|
Loading…
Add table
Reference in a new issue