mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-15 17:51:10 +00:00
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
This commit is contained in:
commit
65f5373035
9 changed files with 818 additions and 75 deletions
1
changelog.d/17695.bugfix
Normal file
1
changelog.d/17695.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix bug where room account data would not correctly be sent down sliding sync for old rooms.
|
1
changelog.d/17734.misc
Normal file
1
changelog.d/17734.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Minor speed up of initial sliding sync requests.
|
|
@ -19,7 +19,6 @@ from typing import (
|
||||||
AbstractSet,
|
AbstractSet,
|
||||||
ChainMap,
|
ChainMap,
|
||||||
Dict,
|
Dict,
|
||||||
List,
|
|
||||||
Mapping,
|
Mapping,
|
||||||
MutableMapping,
|
MutableMapping,
|
||||||
Optional,
|
Optional,
|
||||||
|
@ -119,6 +118,8 @@ class SlidingSyncExtensionHandler:
|
||||||
if sync_config.extensions.account_data is not None:
|
if sync_config.extensions.account_data is not None:
|
||||||
account_data_response = await self.get_account_data_extension_response(
|
account_data_response = await self.get_account_data_extension_response(
|
||||||
sync_config=sync_config,
|
sync_config=sync_config,
|
||||||
|
previous_connection_state=previous_connection_state,
|
||||||
|
new_connection_state=new_connection_state,
|
||||||
actual_lists=actual_lists,
|
actual_lists=actual_lists,
|
||||||
actual_room_ids=actual_room_ids,
|
actual_room_ids=actual_room_ids,
|
||||||
account_data_request=sync_config.extensions.account_data,
|
account_data_request=sync_config.extensions.account_data,
|
||||||
|
@ -361,6 +362,8 @@ class SlidingSyncExtensionHandler:
|
||||||
async def get_account_data_extension_response(
|
async def get_account_data_extension_response(
|
||||||
self,
|
self,
|
||||||
sync_config: SlidingSyncConfig,
|
sync_config: SlidingSyncConfig,
|
||||||
|
previous_connection_state: "PerConnectionState",
|
||||||
|
new_connection_state: "MutablePerConnectionState",
|
||||||
actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList],
|
actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList],
|
||||||
actual_room_ids: Set[str],
|
actual_room_ids: Set[str],
|
||||||
account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
|
account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
|
||||||
|
@ -425,15 +428,7 @@ class SlidingSyncExtensionHandler:
|
||||||
|
|
||||||
# Fetch room account data
|
# Fetch room account data
|
||||||
#
|
#
|
||||||
# List of -> Mapping from room_id to mapping of `type` to `content` of room
|
account_data_by_room_map: MutableMapping[str, Mapping[str, JsonMapping]] = {}
|
||||||
# account data events.
|
|
||||||
#
|
|
||||||
# This is is a list so we can avoid making copies of immutable data and instead
|
|
||||||
# just provide multiple maps that need to be combined. Normally, we could
|
|
||||||
# reach for `ChainMap` in this scenario, but this is a nested map and accessing
|
|
||||||
# the ChainMap by room_id won't combine the two maps for that room (we would
|
|
||||||
# need a new `NestedChainMap` type class).
|
|
||||||
account_data_by_room_maps: List[Mapping[str, Mapping[str, JsonMapping]]] = []
|
|
||||||
relevant_room_ids = self.find_relevant_room_ids_for_extension(
|
relevant_room_ids = self.find_relevant_room_ids_for_extension(
|
||||||
requested_lists=account_data_request.lists,
|
requested_lists=account_data_request.lists,
|
||||||
requested_room_ids=account_data_request.rooms,
|
requested_room_ids=account_data_request.rooms,
|
||||||
|
@ -441,9 +436,43 @@ class SlidingSyncExtensionHandler:
|
||||||
actual_room_ids=actual_room_ids,
|
actual_room_ids=actual_room_ids,
|
||||||
)
|
)
|
||||||
if len(relevant_room_ids) > 0:
|
if len(relevant_room_ids) > 0:
|
||||||
|
# We need to handle the different cases depending on if we have sent
|
||||||
|
# down account data previously or not, so we split the relevant
|
||||||
|
# rooms up into different collections based on status.
|
||||||
|
live_rooms = set()
|
||||||
|
previously_rooms: Dict[str, int] = {}
|
||||||
|
initial_rooms = set()
|
||||||
|
|
||||||
|
for room_id in relevant_room_ids:
|
||||||
|
if not from_token:
|
||||||
|
initial_rooms.add(room_id)
|
||||||
|
continue
|
||||||
|
|
||||||
|
room_status = previous_connection_state.account_data.have_sent_room(
|
||||||
|
room_id
|
||||||
|
)
|
||||||
|
if room_status.status == HaveSentRoomFlag.LIVE:
|
||||||
|
live_rooms.add(room_id)
|
||||||
|
elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
|
||||||
|
assert room_status.last_token is not None
|
||||||
|
previously_rooms[room_id] = room_status.last_token
|
||||||
|
elif room_status.status == HaveSentRoomFlag.NEVER:
|
||||||
|
initial_rooms.add(room_id)
|
||||||
|
else:
|
||||||
|
assert_never(room_status.status)
|
||||||
|
|
||||||
|
# We fetch all room account data since the from_token. This is so
|
||||||
|
# that we can record which rooms have updates that haven't been sent
|
||||||
|
# down.
|
||||||
|
#
|
||||||
|
# Mapping from room_id to mapping of `type` to `content` of room account
|
||||||
|
# data events.
|
||||||
|
all_updates_since_the_from_token: Mapping[
|
||||||
|
str, Mapping[str, JsonMapping]
|
||||||
|
] = {}
|
||||||
if from_token is not None:
|
if from_token is not None:
|
||||||
# TODO: This should take into account the `from_token` and `to_token`
|
# TODO: This should take into account the `from_token` and `to_token`
|
||||||
account_data_by_room_map = (
|
all_updates_since_the_from_token = (
|
||||||
await self.store.get_updated_room_account_data_for_user(
|
await self.store.get_updated_room_account_data_for_user(
|
||||||
user_id, from_token.stream_token.account_data_key
|
user_id, from_token.stream_token.account_data_key
|
||||||
)
|
)
|
||||||
|
@ -456,58 +485,108 @@ class SlidingSyncExtensionHandler:
|
||||||
user_id, from_token.stream_token.account_data_key
|
user_id, from_token.stream_token.account_data_key
|
||||||
)
|
)
|
||||||
for room_id, tags in tags_by_room.items():
|
for room_id, tags in tags_by_room.items():
|
||||||
account_data_by_room_map.setdefault(room_id, {})[
|
all_updates_since_the_from_token.setdefault(room_id, {})[
|
||||||
AccountDataTypes.TAG
|
AccountDataTypes.TAG
|
||||||
] = {"tags": tags}
|
] = {"tags": tags}
|
||||||
|
|
||||||
account_data_by_room_maps.append(account_data_by_room_map)
|
# For live rooms we just get the updates from `all_updates_since_the_from_token`
|
||||||
else:
|
if live_rooms:
|
||||||
# TODO: This should take into account the `to_token`
|
for room_id in all_updates_since_the_from_token.keys() & live_rooms:
|
||||||
immutable_account_data_by_room_map = (
|
account_data_by_room_map[room_id] = (
|
||||||
await self.store.get_room_account_data_for_user(user_id)
|
all_updates_since_the_from_token[room_id]
|
||||||
)
|
)
|
||||||
account_data_by_room_maps.append(immutable_account_data_by_room_map)
|
|
||||||
|
|
||||||
# Add room tags
|
# For previously and initial rooms we query each room individually.
|
||||||
#
|
if previously_rooms or initial_rooms:
|
||||||
# TODO: This should take into account the `to_token`
|
|
||||||
tags_by_room = await self.store.get_tags_for_user(user_id)
|
async def handle_previously(room_id: str) -> None:
|
||||||
account_data_by_room_maps.append(
|
# Either get updates or all account data in the room
|
||||||
{
|
# depending on if the room state is PREVIOUSLY or NEVER.
|
||||||
room_id: {AccountDataTypes.TAG: {"tags": tags}}
|
previous_token = previously_rooms.get(room_id)
|
||||||
for room_id, tags in tags_by_room.items()
|
if previous_token is not None:
|
||||||
}
|
room_account_data = await (
|
||||||
|
self.store.get_updated_room_account_data_for_user_for_room(
|
||||||
|
user_id=user_id,
|
||||||
|
room_id=room_id,
|
||||||
|
from_stream_id=previous_token,
|
||||||
|
to_stream_id=to_token.account_data_key,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add room tags
|
||||||
|
changed = await self.store.has_tags_changed_for_room(
|
||||||
|
user_id=user_id,
|
||||||
|
room_id=room_id,
|
||||||
|
from_stream_id=previous_token,
|
||||||
|
to_stream_id=to_token.account_data_key,
|
||||||
|
)
|
||||||
|
if changed:
|
||||||
|
# XXX: Ideally, this should take into account the `to_token`
|
||||||
|
# and return the set of tags at that time but we don't track
|
||||||
|
# changes to tags so we just have to return all tags for the
|
||||||
|
# room.
|
||||||
|
immutable_tag_map = await self.store.get_tags_for_room(
|
||||||
|
user_id, room_id
|
||||||
|
)
|
||||||
|
room_account_data[AccountDataTypes.TAG] = {
|
||||||
|
"tags": immutable_tag_map
|
||||||
|
}
|
||||||
|
|
||||||
|
# Only add an entry if there were any updates.
|
||||||
|
if room_account_data:
|
||||||
|
account_data_by_room_map[room_id] = room_account_data
|
||||||
|
else:
|
||||||
|
# TODO: This should take into account the `to_token`
|
||||||
|
immutable_room_account_data = (
|
||||||
|
await self.store.get_account_data_for_room(user_id, room_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add room tags
|
||||||
|
#
|
||||||
|
# XXX: Ideally, this should take into account the `to_token`
|
||||||
|
# and return the set of tags at that time but we don't track
|
||||||
|
# changes to tags so we just have to return all tags for the
|
||||||
|
# room.
|
||||||
|
immutable_tag_map = await self.store.get_tags_for_room(
|
||||||
|
user_id, room_id
|
||||||
|
)
|
||||||
|
|
||||||
|
account_data_by_room_map[room_id] = ChainMap(
|
||||||
|
{AccountDataTypes.TAG: {"tags": immutable_tag_map}}
|
||||||
|
if immutable_tag_map
|
||||||
|
else {},
|
||||||
|
# Cast is safe because `ChainMap` only mutates the top-most map,
|
||||||
|
# see https://github.com/python/typeshed/issues/8430
|
||||||
|
cast(
|
||||||
|
MutableMapping[str, JsonMapping],
|
||||||
|
immutable_room_account_data,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
# We handle these rooms concurrently to speed it up.
|
||||||
|
await concurrently_execute(
|
||||||
|
handle_previously,
|
||||||
|
previously_rooms.keys() | initial_rooms,
|
||||||
|
limit=20,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Filter down to the relevant rooms ... and combine the maps
|
# Now record which rooms are now up to data, and which rooms have
|
||||||
relevant_account_data_by_room_map: MutableMapping[
|
# pending updates to send.
|
||||||
str, Mapping[str, JsonMapping]
|
new_connection_state.account_data.record_sent_rooms(relevant_room_ids)
|
||||||
] = {}
|
missing_updates = (
|
||||||
for room_id in relevant_room_ids:
|
all_updates_since_the_from_token.keys() - relevant_room_ids
|
||||||
# We want to avoid adding empty maps for relevant rooms that have no room
|
)
|
||||||
# account data so do a quick check to see if it's in any of the maps.
|
if missing_updates:
|
||||||
is_room_in_maps = False
|
# If we have missing updates then we must have had a from_token.
|
||||||
for room_map in account_data_by_room_maps:
|
assert from_token is not None
|
||||||
if room_id in room_map:
|
|
||||||
is_room_in_maps = True
|
|
||||||
break
|
|
||||||
|
|
||||||
# If we found the room in any of the maps, combine the maps for that room
|
new_connection_state.account_data.record_unsent_rooms(
|
||||||
if is_room_in_maps:
|
missing_updates, from_token.stream_token.account_data_key
|
||||||
relevant_account_data_by_room_map[room_id] = ChainMap(
|
|
||||||
{},
|
|
||||||
*(
|
|
||||||
# Cast is safe because `ChainMap` only mutates the top-most map,
|
|
||||||
# see https://github.com/python/typeshed/issues/8430
|
|
||||||
cast(MutableMapping[str, JsonMapping], room_map[room_id])
|
|
||||||
for room_map in account_data_by_room_maps
|
|
||||||
if room_map.get(room_id)
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
return SlidingSyncResult.Extensions.AccountDataExtension(
|
return SlidingSyncResult.Extensions.AccountDataExtension(
|
||||||
global_account_data_map=global_account_data_map,
|
global_account_data_map=global_account_data_map,
|
||||||
account_data_by_room_map=relevant_account_data_by_room_map,
|
account_data_by_room_map=account_data_by_room_map,
|
||||||
)
|
)
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
|
|
|
@ -420,6 +420,10 @@ class SlidingSyncRoomLists:
|
||||||
Dict[str, RoomsForUserType], filtered_sync_room_map
|
Dict[str, RoomsForUserType], filtered_sync_room_map
|
||||||
),
|
),
|
||||||
to_token,
|
to_token,
|
||||||
|
# We only need to sort the rooms up to the end
|
||||||
|
# of the largest range. Both sides of range are
|
||||||
|
# inclusive so we `+ 1`.
|
||||||
|
limit=max(range[1] + 1 for range in list_config.ranges),
|
||||||
)
|
)
|
||||||
|
|
||||||
for range in list_config.ranges:
|
for range in list_config.ranges:
|
||||||
|
@ -462,7 +466,7 @@ class SlidingSyncRoomLists:
|
||||||
)
|
)
|
||||||
|
|
||||||
lists[list_key] = SlidingSyncResult.SlidingWindowList(
|
lists[list_key] = SlidingSyncResult.SlidingWindowList(
|
||||||
count=len(sorted_room_info),
|
count=len(filtered_sync_room_map),
|
||||||
ops=ops,
|
ops=ops,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1980,15 +1984,21 @@ class SlidingSyncRoomLists:
|
||||||
self,
|
self,
|
||||||
sync_room_map: Dict[str, RoomsForUserType],
|
sync_room_map: Dict[str, RoomsForUserType],
|
||||||
to_token: StreamToken,
|
to_token: StreamToken,
|
||||||
|
limit: Optional[int] = None,
|
||||||
) -> List[RoomsForUserType]:
|
) -> List[RoomsForUserType]:
|
||||||
"""
|
"""
|
||||||
Sort by `stream_ordering` of the last event that the user should see in the
|
Sort by `stream_ordering` of the last event that the user should see in the
|
||||||
room. `stream_ordering` is unique so we get a stable sort.
|
room. `stream_ordering` is unique so we get a stable sort.
|
||||||
|
|
||||||
|
If `limit` is specified then sort may return fewer entries, but will
|
||||||
|
always return at least the top N rooms. This is useful as we don't always
|
||||||
|
need to sort the full list, but are just interested in the top N.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
sync_room_map: Dictionary of room IDs to sort along with membership
|
sync_room_map: Dictionary of room IDs to sort along with membership
|
||||||
information in the room at the time of `to_token`.
|
information in the room at the time of `to_token`.
|
||||||
to_token: We sort based on the events in the room at this token (<= `to_token`)
|
to_token: We sort based on the events in the room at this token (<= `to_token`)
|
||||||
|
limit: The number of rooms that we need to return from the top of the list.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
A sorted list of room IDs by `stream_ordering` along with membership information.
|
A sorted list of room IDs by `stream_ordering` along with membership information.
|
||||||
|
@ -1998,8 +2008,23 @@ class SlidingSyncRoomLists:
|
||||||
# user should see in the room (<= `to_token`)
|
# user should see in the room (<= `to_token`)
|
||||||
last_activity_in_room_map: Dict[str, int] = {}
|
last_activity_in_room_map: Dict[str, int] = {}
|
||||||
|
|
||||||
|
# Same as above, except for positions that we know are in the event
|
||||||
|
# stream cache.
|
||||||
|
cached_positions: Dict[str, int] = {}
|
||||||
|
|
||||||
|
earliest_cache_position = (
|
||||||
|
self.store._events_stream_cache.get_earliest_known_position()
|
||||||
|
)
|
||||||
|
|
||||||
for room_id, room_for_user in sync_room_map.items():
|
for room_id, room_for_user in sync_room_map.items():
|
||||||
if room_for_user.membership != Membership.JOIN:
|
if room_for_user.membership == Membership.JOIN:
|
||||||
|
# For joined rooms check the stream change cache.
|
||||||
|
cached_position = (
|
||||||
|
self.store._events_stream_cache.get_max_pos_of_last_change(room_id)
|
||||||
|
)
|
||||||
|
if cached_position is not None:
|
||||||
|
cached_positions[room_id] = cached_position
|
||||||
|
else:
|
||||||
# If the user has left/been invited/knocked/been banned from a
|
# If the user has left/been invited/knocked/been banned from a
|
||||||
# room, they shouldn't see anything past that point.
|
# room, they shouldn't see anything past that point.
|
||||||
#
|
#
|
||||||
|
@ -2009,6 +2034,48 @@ class SlidingSyncRoomLists:
|
||||||
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
|
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
|
||||||
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
|
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
|
||||||
|
|
||||||
|
# If the stream position is in range of the stream change cache
|
||||||
|
# we can include it.
|
||||||
|
if room_for_user.event_pos.stream > earliest_cache_position:
|
||||||
|
cached_positions[room_id] = room_for_user.event_pos.stream
|
||||||
|
|
||||||
|
# If we are only asked for the top N rooms, and we have enough from
|
||||||
|
# looking in the stream change cache, then we can return early. This
|
||||||
|
# is because the cache must include all entries above
|
||||||
|
# `.get_earliest_known_position()`.
|
||||||
|
if limit is not None and len(cached_positions) >= limit:
|
||||||
|
# ... but first we need to handle the case where the cached max
|
||||||
|
# position is greater than the to_token, in which case we do
|
||||||
|
# actually query the DB. This should happen rarely, so can do it in
|
||||||
|
# a loop.
|
||||||
|
for room_id, position in list(cached_positions.items()):
|
||||||
|
if position > to_token.room_key.stream:
|
||||||
|
result = await self.store.get_last_event_pos_in_room_before_stream_ordering(
|
||||||
|
room_id, to_token.room_key
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
result is not None
|
||||||
|
and result[1].stream > earliest_cache_position
|
||||||
|
):
|
||||||
|
# We have a stream position in the cached range.
|
||||||
|
cached_positions[room_id] = result[1].stream
|
||||||
|
else:
|
||||||
|
# No position in the range, so we remove the entry.
|
||||||
|
cached_positions.pop(room_id)
|
||||||
|
|
||||||
|
if limit is not None and len(cached_positions) >= limit:
|
||||||
|
return sorted(
|
||||||
|
(
|
||||||
|
room
|
||||||
|
for room in sync_room_map.values()
|
||||||
|
if room.room_id in cached_positions
|
||||||
|
),
|
||||||
|
# Sort by the last activity (stream_ordering) in the room
|
||||||
|
key=lambda room_info: cached_positions[room_info.room_id],
|
||||||
|
# We want descending order
|
||||||
|
reverse=True,
|
||||||
|
)
|
||||||
|
|
||||||
# For fully-joined rooms, we find the latest activity at/before the
|
# For fully-joined rooms, we find the latest activity at/before the
|
||||||
# `to_token`.
|
# `to_token`.
|
||||||
joined_room_positions = (
|
joined_room_positions = (
|
||||||
|
|
|
@ -467,6 +467,56 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
|
||||||
get_updated_room_account_data_for_user_txn,
|
get_updated_room_account_data_for_user_txn,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def get_updated_room_account_data_for_user_for_room(
|
||||||
|
self,
|
||||||
|
# Since there are multiple arguments with the same type, force keyword arguments
|
||||||
|
# so people don't accidentally swap the order
|
||||||
|
*,
|
||||||
|
user_id: str,
|
||||||
|
room_id: str,
|
||||||
|
from_stream_id: int,
|
||||||
|
to_stream_id: int,
|
||||||
|
) -> Dict[str, JsonMapping]:
|
||||||
|
"""Get the room account_data that's changed for a user in a room.
|
||||||
|
|
||||||
|
(> `from_stream_id` and <= `to_stream_id`)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id: The user to get the account_data for.
|
||||||
|
room_id: The room to check
|
||||||
|
from_stream_id: The point in the stream to fetch from
|
||||||
|
to_stream_id: The point in the stream to fetch to
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A dict of the room account data.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def get_updated_room_account_data_for_user_for_room_txn(
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
) -> Dict[str, JsonMapping]:
|
||||||
|
sql = """
|
||||||
|
SELECT account_data_type, content FROM room_account_data
|
||||||
|
WHERE user_id = ? AND room_id = ? AND stream_id > ? AND stream_id <= ?
|
||||||
|
"""
|
||||||
|
txn.execute(sql, (user_id, room_id, from_stream_id, to_stream_id))
|
||||||
|
|
||||||
|
room_account_data: Dict[str, JsonMapping] = {}
|
||||||
|
for row in txn:
|
||||||
|
room_account_data[row[0]] = db_to_json(row[1])
|
||||||
|
|
||||||
|
return room_account_data
|
||||||
|
|
||||||
|
changed = self._account_data_stream_cache.has_entity_changed(
|
||||||
|
user_id, int(from_stream_id)
|
||||||
|
)
|
||||||
|
if not changed:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"get_updated_room_account_data_for_user_for_room",
|
||||||
|
get_updated_room_account_data_for_user_for_room_txn,
|
||||||
|
)
|
||||||
|
|
||||||
@cached(max_entries=5000, iterable=True)
|
@cached(max_entries=5000, iterable=True)
|
||||||
async def ignored_by(self, user_id: str) -> FrozenSet[str]:
|
async def ignored_by(self, user_id: str) -> FrozenSet[str]:
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -267,6 +267,15 @@ class SlidingSyncStore(SQLBaseStore):
|
||||||
(have_sent_room.status.value, have_sent_room.last_token)
|
(have_sent_room.status.value, have_sent_room.last_token)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
for (
|
||||||
|
room_id,
|
||||||
|
have_sent_room,
|
||||||
|
) in per_connection_state.account_data._statuses.items():
|
||||||
|
key_values.append((connection_position, "account_data", room_id))
|
||||||
|
value_values.append(
|
||||||
|
(have_sent_room.status.value, have_sent_room.last_token)
|
||||||
|
)
|
||||||
|
|
||||||
self.db_pool.simple_upsert_many_txn(
|
self.db_pool.simple_upsert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="sliding_sync_connection_streams",
|
table="sliding_sync_connection_streams",
|
||||||
|
@ -407,6 +416,7 @@ class SlidingSyncStore(SQLBaseStore):
|
||||||
# Now look up the per-room stream data.
|
# Now look up the per-room stream data.
|
||||||
rooms: Dict[str, HaveSentRoom[str]] = {}
|
rooms: Dict[str, HaveSentRoom[str]] = {}
|
||||||
receipts: Dict[str, HaveSentRoom[str]] = {}
|
receipts: Dict[str, HaveSentRoom[str]] = {}
|
||||||
|
account_data: Dict[str, HaveSentRoom[str]] = {}
|
||||||
|
|
||||||
receipt_rows = self.db_pool.simple_select_list_txn(
|
receipt_rows = self.db_pool.simple_select_list_txn(
|
||||||
txn,
|
txn,
|
||||||
|
@ -427,6 +437,8 @@ class SlidingSyncStore(SQLBaseStore):
|
||||||
rooms[room_id] = have_sent_room
|
rooms[room_id] = have_sent_room
|
||||||
elif stream == "receipts":
|
elif stream == "receipts":
|
||||||
receipts[room_id] = have_sent_room
|
receipts[room_id] = have_sent_room
|
||||||
|
elif stream == "account_data":
|
||||||
|
account_data[room_id] = have_sent_room
|
||||||
else:
|
else:
|
||||||
# For forwards compatibility we ignore unknown streams, as in
|
# For forwards compatibility we ignore unknown streams, as in
|
||||||
# future we want to be able to easily add more stream types.
|
# future we want to be able to easily add more stream types.
|
||||||
|
@ -435,6 +447,7 @@ class SlidingSyncStore(SQLBaseStore):
|
||||||
return PerConnectionStateDB(
|
return PerConnectionStateDB(
|
||||||
rooms=RoomStatusMap(rooms),
|
rooms=RoomStatusMap(rooms),
|
||||||
receipts=RoomStatusMap(receipts),
|
receipts=RoomStatusMap(receipts),
|
||||||
|
account_data=RoomStatusMap(account_data),
|
||||||
room_configs=room_configs,
|
room_configs=room_configs,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -452,6 +465,7 @@ class PerConnectionStateDB:
|
||||||
|
|
||||||
rooms: "RoomStatusMap[str]"
|
rooms: "RoomStatusMap[str]"
|
||||||
receipts: "RoomStatusMap[str]"
|
receipts: "RoomStatusMap[str]"
|
||||||
|
account_data: "RoomStatusMap[str]"
|
||||||
|
|
||||||
room_configs: Mapping[str, "RoomSyncConfig"]
|
room_configs: Mapping[str, "RoomSyncConfig"]
|
||||||
|
|
||||||
|
@ -484,10 +498,21 @@ class PerConnectionStateDB:
|
||||||
for room_id, status in per_connection_state.receipts.get_updates().items()
|
for room_id, status in per_connection_state.receipts.get_updates().items()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
account_data = {
|
||||||
|
room_id: HaveSentRoom(
|
||||||
|
status=status.status,
|
||||||
|
last_token=(
|
||||||
|
str(status.last_token) if status.last_token is not None else None
|
||||||
|
),
|
||||||
|
)
|
||||||
|
for room_id, status in per_connection_state.account_data.get_updates().items()
|
||||||
|
}
|
||||||
|
|
||||||
log_kv(
|
log_kv(
|
||||||
{
|
{
|
||||||
"rooms": rooms,
|
"rooms": rooms,
|
||||||
"receipts": receipts,
|
"receipts": receipts,
|
||||||
|
"account_data": account_data,
|
||||||
"room_configs": per_connection_state.room_configs.maps[0],
|
"room_configs": per_connection_state.room_configs.maps[0],
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -495,6 +520,7 @@ class PerConnectionStateDB:
|
||||||
return PerConnectionStateDB(
|
return PerConnectionStateDB(
|
||||||
rooms=RoomStatusMap(rooms),
|
rooms=RoomStatusMap(rooms),
|
||||||
receipts=RoomStatusMap(receipts),
|
receipts=RoomStatusMap(receipts),
|
||||||
|
account_data=RoomStatusMap(account_data),
|
||||||
room_configs=per_connection_state.room_configs.maps[0],
|
room_configs=per_connection_state.room_configs.maps[0],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -524,8 +550,19 @@ class PerConnectionStateDB:
|
||||||
for room_id, status in self.receipts._statuses.items()
|
for room_id, status in self.receipts._statuses.items()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
account_data = {
|
||||||
|
room_id: HaveSentRoom(
|
||||||
|
status=status.status,
|
||||||
|
last_token=(
|
||||||
|
int(status.last_token) if status.last_token is not None else None
|
||||||
|
),
|
||||||
|
)
|
||||||
|
for room_id, status in self.account_data._statuses.items()
|
||||||
|
}
|
||||||
|
|
||||||
return PerConnectionState(
|
return PerConnectionState(
|
||||||
rooms=RoomStatusMap(rooms),
|
rooms=RoomStatusMap(rooms),
|
||||||
receipts=RoomStatusMap(receipts),
|
receipts=RoomStatusMap(receipts),
|
||||||
|
account_data=RoomStatusMap(account_data),
|
||||||
room_configs=self.room_configs,
|
room_configs=self.room_configs,
|
||||||
)
|
)
|
||||||
|
|
|
@ -158,6 +158,52 @@ class TagsWorkerStore(AccountDataWorkerStore):
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
async def has_tags_changed_for_room(
|
||||||
|
self,
|
||||||
|
# Since there are multiple arguments with the same type, force keyword arguments
|
||||||
|
# so people don't accidentally swap the order
|
||||||
|
*,
|
||||||
|
user_id: str,
|
||||||
|
room_id: str,
|
||||||
|
from_stream_id: int,
|
||||||
|
to_stream_id: int,
|
||||||
|
) -> bool:
|
||||||
|
"""Check if the users tags for a room have been updated in the token range
|
||||||
|
|
||||||
|
(> `from_stream_id` and <= `to_stream_id`)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id: The user to get tags for
|
||||||
|
room_id: The room to get tags for
|
||||||
|
from_stream_id: The point in the stream to fetch from
|
||||||
|
to_stream_id: The point in the stream to fetch to
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A mapping of tags to tag content.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Shortcut if no room has changed for the user
|
||||||
|
changed = self._account_data_stream_cache.has_entity_changed(
|
||||||
|
user_id, int(from_stream_id)
|
||||||
|
)
|
||||||
|
if not changed:
|
||||||
|
return False
|
||||||
|
|
||||||
|
last_change_position_for_room = await self.db_pool.simple_select_one_onecol(
|
||||||
|
table="room_tags_revisions",
|
||||||
|
keyvalues={"user_id": user_id, "room_id": room_id},
|
||||||
|
retcol="stream_id",
|
||||||
|
allow_none=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
if last_change_position_for_room is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return (
|
||||||
|
last_change_position_for_room > from_stream_id
|
||||||
|
and last_change_position_for_room <= to_stream_id
|
||||||
|
)
|
||||||
|
|
||||||
@cached(num_args=2, tree=True)
|
@cached(num_args=2, tree=True)
|
||||||
async def get_tags_for_room(
|
async def get_tags_for_room(
|
||||||
self, user_id: str, room_id: str
|
self, user_id: str, room_id: str
|
||||||
|
|
|
@ -675,7 +675,7 @@ class HaveSentRoomFlag(Enum):
|
||||||
LIVE = "live"
|
LIVE = "live"
|
||||||
|
|
||||||
|
|
||||||
T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken)
|
T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken, int)
|
||||||
|
|
||||||
|
|
||||||
@attr.s(auto_attribs=True, slots=True, frozen=True)
|
@attr.s(auto_attribs=True, slots=True, frozen=True)
|
||||||
|
@ -823,6 +823,7 @@ class PerConnectionState:
|
||||||
|
|
||||||
rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap)
|
rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap)
|
||||||
receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap)
|
receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap)
|
||||||
|
account_data: RoomStatusMap[int] = attr.Factory(RoomStatusMap)
|
||||||
|
|
||||||
room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)
|
room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)
|
||||||
|
|
||||||
|
@ -833,6 +834,7 @@ class PerConnectionState:
|
||||||
return MutablePerConnectionState(
|
return MutablePerConnectionState(
|
||||||
rooms=self.rooms.get_mutable(),
|
rooms=self.rooms.get_mutable(),
|
||||||
receipts=self.receipts.get_mutable(),
|
receipts=self.receipts.get_mutable(),
|
||||||
|
account_data=self.account_data.get_mutable(),
|
||||||
room_configs=ChainMap({}, room_configs),
|
room_configs=ChainMap({}, room_configs),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -840,6 +842,7 @@ class PerConnectionState:
|
||||||
return PerConnectionState(
|
return PerConnectionState(
|
||||||
rooms=self.rooms.copy(),
|
rooms=self.rooms.copy(),
|
||||||
receipts=self.receipts.copy(),
|
receipts=self.receipts.copy(),
|
||||||
|
account_data=self.account_data.copy(),
|
||||||
room_configs=dict(self.room_configs),
|
room_configs=dict(self.room_configs),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -853,6 +856,7 @@ class MutablePerConnectionState(PerConnectionState):
|
||||||
|
|
||||||
rooms: MutableRoomStatusMap[RoomStreamToken]
|
rooms: MutableRoomStatusMap[RoomStreamToken]
|
||||||
receipts: MutableRoomStatusMap[MultiWriterStreamToken]
|
receipts: MutableRoomStatusMap[MultiWriterStreamToken]
|
||||||
|
account_data: MutableRoomStatusMap[int]
|
||||||
|
|
||||||
room_configs: typing.ChainMap[str, RoomSyncConfig]
|
room_configs: typing.ChainMap[str, RoomSyncConfig]
|
||||||
|
|
||||||
|
@ -860,6 +864,7 @@ class MutablePerConnectionState(PerConnectionState):
|
||||||
return (
|
return (
|
||||||
bool(self.rooms.get_updates())
|
bool(self.rooms.get_updates())
|
||||||
or bool(self.receipts.get_updates())
|
or bool(self.receipts.get_updates())
|
||||||
|
or bool(self.account_data.get_updates())
|
||||||
or bool(self.get_room_config_updates())
|
or bool(self.get_room_config_updates())
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -11,9 +11,11 @@
|
||||||
# See the GNU Affero General Public License for more details:
|
# See the GNU Affero General Public License for more details:
|
||||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||||
#
|
#
|
||||||
|
import enum
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from parameterized import parameterized_class
|
from parameterized import parameterized, parameterized_class
|
||||||
|
from typing_extensions import assert_never
|
||||||
|
|
||||||
from twisted.test.proto_helpers import MemoryReactor
|
from twisted.test.proto_helpers import MemoryReactor
|
||||||
|
|
||||||
|
@ -30,6 +32,11 @@ from tests.server import TimedOutException
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class TagAction(enum.Enum):
|
||||||
|
ADD = enum.auto()
|
||||||
|
REMOVE = enum.auto()
|
||||||
|
|
||||||
|
|
||||||
# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
|
# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
|
||||||
# foreground update for
|
# foreground update for
|
||||||
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
|
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
|
||||||
|
@ -350,10 +357,20 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
|
||||||
account_data_map[AccountDataTypes.TAG], {"tags": {"m.favourite": {}}}
|
account_data_map[AccountDataTypes.TAG], {"tags": {"m.favourite": {}}}
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_room_account_data_incremental_sync(self) -> None:
|
@parameterized.expand(
|
||||||
|
[
|
||||||
|
("add tags", TagAction.ADD),
|
||||||
|
("remove tags", TagAction.REMOVE),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
def test_room_account_data_incremental_sync(
|
||||||
|
self, test_description: str, tag_action: TagAction
|
||||||
|
) -> None:
|
||||||
"""
|
"""
|
||||||
On incremental sync, we return all account data for a given room but only for
|
On incremental sync, we return all account data for a given room but only for
|
||||||
rooms that we request and are being returned in the Sliding Sync response.
|
rooms that we request and are being returned in the Sliding Sync response.
|
||||||
|
|
||||||
|
(HaveSentRoomFlag.LIVE)
|
||||||
"""
|
"""
|
||||||
user1_id = self.register_user("user1", "pass")
|
user1_id = self.register_user("user1", "pass")
|
||||||
user1_tok = self.login(user1_id, "pass")
|
user1_tok = self.login(user1_id, "pass")
|
||||||
|
@ -432,23 +449,42 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
|
||||||
content={"roo": "rar"},
|
content={"roo": "rar"},
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# Add another room tag
|
if tag_action == TagAction.ADD:
|
||||||
self.get_success(
|
# Add another room tag
|
||||||
self.account_data_handler.add_tag_to_room(
|
self.get_success(
|
||||||
user_id=user1_id,
|
self.account_data_handler.add_tag_to_room(
|
||||||
room_id=room_id1,
|
user_id=user1_id,
|
||||||
tag="m.server_notice",
|
room_id=room_id1,
|
||||||
content={},
|
tag="m.server_notice",
|
||||||
|
content={},
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
self.get_success(
|
||||||
self.get_success(
|
self.account_data_handler.add_tag_to_room(
|
||||||
self.account_data_handler.add_tag_to_room(
|
user_id=user1_id,
|
||||||
user_id=user1_id,
|
room_id=room_id2,
|
||||||
room_id=room_id2,
|
tag="m.server_notice",
|
||||||
tag="m.server_notice",
|
content={},
|
||||||
content={},
|
)
|
||||||
)
|
)
|
||||||
)
|
elif tag_action == TagAction.REMOVE:
|
||||||
|
# Remove the room tag
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.remove_tag_from_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id1,
|
||||||
|
tag="m.favourite",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.remove_tag_from_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id2,
|
||||||
|
tag="m.favourite",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
assert_never(tag_action)
|
||||||
|
|
||||||
# Make an incremental Sliding Sync request with the account_data extension enabled
|
# Make an incremental Sliding Sync request with the account_data extension enabled
|
||||||
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||||
|
@ -475,10 +511,431 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
|
||||||
exact=True,
|
exact=True,
|
||||||
)
|
)
|
||||||
self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"})
|
self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"})
|
||||||
self.assertEqual(
|
if tag_action == TagAction.ADD:
|
||||||
account_data_map[AccountDataTypes.TAG],
|
self.assertEqual(
|
||||||
{"tags": {"m.favourite": {}, "m.server_notice": {}}},
|
account_data_map[AccountDataTypes.TAG],
|
||||||
|
{"tags": {"m.favourite": {}, "m.server_notice": {}}},
|
||||||
|
)
|
||||||
|
elif tag_action == TagAction.REMOVE:
|
||||||
|
# If we previously showed the client that the room has tags, when it no
|
||||||
|
# longer has tags, we need to show them an empty map.
|
||||||
|
self.assertEqual(
|
||||||
|
account_data_map[AccountDataTypes.TAG],
|
||||||
|
{"tags": {}},
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
assert_never(tag_action)
|
||||||
|
|
||||||
|
@parameterized.expand(
|
||||||
|
[
|
||||||
|
("add tags", TagAction.ADD),
|
||||||
|
("remove tags", TagAction.REMOVE),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
def test_room_account_data_incremental_sync_out_of_range_never(
|
||||||
|
self, test_description: str, tag_action: TagAction
|
||||||
|
) -> None:
|
||||||
|
"""Tests that we don't return account data for rooms that are out of
|
||||||
|
range, but then do send all account data once they're in range.
|
||||||
|
|
||||||
|
(initial/HaveSentRoomFlag.NEVER)
|
||||||
|
"""
|
||||||
|
user1_id = self.register_user("user1", "pass")
|
||||||
|
user1_tok = self.login(user1_id, "pass")
|
||||||
|
|
||||||
|
# Create a room and add some room account data
|
||||||
|
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_account_data_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id1,
|
||||||
|
account_data_type="org.matrix.roorarraz",
|
||||||
|
content={"roo": "rar"},
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
# Add a room tag to mark the room as a favourite
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_tag_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id1,
|
||||||
|
tag="m.favourite",
|
||||||
|
content={},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create another room with some room account data
|
||||||
|
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_account_data_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id2,
|
||||||
|
account_data_type="org.matrix.roorarraz",
|
||||||
|
content={"roo": "rar"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# Add a room tag to mark the room as a favourite
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_tag_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id2,
|
||||||
|
tag="m.favourite",
|
||||||
|
content={},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Now send a message into room1 so that it is at the top of the list
|
||||||
|
self.helper.send(room_id1, body="new event", tok=user1_tok)
|
||||||
|
|
||||||
|
# Make a SS request for only the top room.
|
||||||
|
sync_body = {
|
||||||
|
"lists": {
|
||||||
|
"main": {
|
||||||
|
"ranges": [[0, 0]],
|
||||||
|
"required_state": [],
|
||||||
|
"timeline_limit": 0,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"extensions": {
|
||||||
|
"account_data": {
|
||||||
|
"enabled": True,
|
||||||
|
"lists": ["main"],
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||||
|
|
||||||
|
# Only room1 should be in the response since it's the latest room with activity
|
||||||
|
# and our range only includes 1 room.
|
||||||
|
self.assertIncludes(
|
||||||
|
response_body["extensions"]["account_data"].get("rooms").keys(),
|
||||||
|
{room_id1},
|
||||||
|
exact=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add some other room account data
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_account_data_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id1,
|
||||||
|
account_data_type="org.matrix.roorarraz2",
|
||||||
|
content={"roo": "rar"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_account_data_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id2,
|
||||||
|
account_data_type="org.matrix.roorarraz2",
|
||||||
|
content={"roo": "rar"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if tag_action == TagAction.ADD:
|
||||||
|
# Add another room tag
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_tag_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id1,
|
||||||
|
tag="m.server_notice",
|
||||||
|
content={},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_tag_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id2,
|
||||||
|
tag="m.server_notice",
|
||||||
|
content={},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
elif tag_action == TagAction.REMOVE:
|
||||||
|
# Remove the room tag
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.remove_tag_from_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id1,
|
||||||
|
tag="m.favourite",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.remove_tag_from_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id2,
|
||||||
|
tag="m.favourite",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
assert_never(tag_action)
|
||||||
|
|
||||||
|
# Move room2 into range.
|
||||||
|
self.helper.send(room_id2, body="new event", tok=user1_tok)
|
||||||
|
|
||||||
|
# Make an incremental Sliding Sync request with the account_data extension enabled
|
||||||
|
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||||
|
|
||||||
|
self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
|
||||||
|
# We expect to see the account data of room2, as that has the most
|
||||||
|
# recent update.
|
||||||
|
self.assertIncludes(
|
||||||
|
response_body["extensions"]["account_data"].get("rooms").keys(),
|
||||||
|
{room_id2},
|
||||||
|
exact=True,
|
||||||
|
)
|
||||||
|
# Since this is the first time we're seeing room2 down sync, we should see all
|
||||||
|
# room account data for it.
|
||||||
|
account_data_map = {
|
||||||
|
event["type"]: event["content"]
|
||||||
|
for event in response_body["extensions"]["account_data"]
|
||||||
|
.get("rooms")
|
||||||
|
.get(room_id2)
|
||||||
|
}
|
||||||
|
expected_account_data_keys = {
|
||||||
|
"org.matrix.roorarraz",
|
||||||
|
"org.matrix.roorarraz2",
|
||||||
|
}
|
||||||
|
if tag_action == TagAction.ADD:
|
||||||
|
expected_account_data_keys.add(AccountDataTypes.TAG)
|
||||||
|
self.assertIncludes(
|
||||||
|
account_data_map.keys(),
|
||||||
|
expected_account_data_keys,
|
||||||
|
exact=True,
|
||||||
|
)
|
||||||
|
self.assertEqual(account_data_map["org.matrix.roorarraz"], {"roo": "rar"})
|
||||||
|
self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"})
|
||||||
|
if tag_action == TagAction.ADD:
|
||||||
|
self.assertEqual(
|
||||||
|
account_data_map[AccountDataTypes.TAG],
|
||||||
|
{"tags": {"m.favourite": {}, "m.server_notice": {}}},
|
||||||
|
)
|
||||||
|
elif tag_action == TagAction.REMOVE:
|
||||||
|
# Since we never told the client about the room tags, we don't need to say
|
||||||
|
# anything if there are no tags now (the client doesn't need an update).
|
||||||
|
self.assertIsNone(
|
||||||
|
account_data_map.get(AccountDataTypes.TAG),
|
||||||
|
account_data_map,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
assert_never(tag_action)
|
||||||
|
|
||||||
|
@parameterized.expand(
|
||||||
|
[
|
||||||
|
("add tags", TagAction.ADD),
|
||||||
|
("remove tags", TagAction.REMOVE),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
def test_room_account_data_incremental_sync_out_of_range_previously(
|
||||||
|
self, test_description: str, tag_action: TagAction
|
||||||
|
) -> None:
|
||||||
|
"""Tests that we don't return account data for rooms that fall out of
|
||||||
|
range, but then do send all account data that has changed they're back in range.
|
||||||
|
|
||||||
|
(HaveSentRoomFlag.PREVIOUSLY)
|
||||||
|
"""
|
||||||
|
user1_id = self.register_user("user1", "pass")
|
||||||
|
user1_tok = self.login(user1_id, "pass")
|
||||||
|
|
||||||
|
# Create a room and add some room account data
|
||||||
|
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_account_data_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id1,
|
||||||
|
account_data_type="org.matrix.roorarraz",
|
||||||
|
content={"roo": "rar"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# Add a room tag to mark the room as a favourite
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_tag_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id1,
|
||||||
|
tag="m.favourite",
|
||||||
|
content={},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create another room with some room account data
|
||||||
|
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_account_data_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id2,
|
||||||
|
account_data_type="org.matrix.roorarraz",
|
||||||
|
content={"roo": "rar"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# Add a room tag to mark the room as a favourite
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_tag_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id2,
|
||||||
|
tag="m.favourite",
|
||||||
|
content={},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Make an initial Sliding Sync request for only room1 and room2.
|
||||||
|
sync_body = {
|
||||||
|
"lists": {},
|
||||||
|
"room_subscriptions": {
|
||||||
|
room_id1: {
|
||||||
|
"required_state": [],
|
||||||
|
"timeline_limit": 0,
|
||||||
|
},
|
||||||
|
room_id2: {
|
||||||
|
"required_state": [],
|
||||||
|
"timeline_limit": 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"extensions": {
|
||||||
|
"account_data": {
|
||||||
|
"enabled": True,
|
||||||
|
"rooms": [room_id1, room_id2],
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||||
|
|
||||||
|
# Both rooms show up because we have a room subscription for each and they're
|
||||||
|
# requested in the `account_data` extension.
|
||||||
|
self.assertIncludes(
|
||||||
|
response_body["extensions"]["account_data"].get("rooms").keys(),
|
||||||
|
{room_id1, room_id2},
|
||||||
|
exact=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add some other room account data
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_account_data_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id1,
|
||||||
|
account_data_type="org.matrix.roorarraz2",
|
||||||
|
content={"roo": "rar"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_account_data_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id2,
|
||||||
|
account_data_type="org.matrix.roorarraz2",
|
||||||
|
content={"roo": "rar"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if tag_action == TagAction.ADD:
|
||||||
|
# Add another room tag
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_tag_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id1,
|
||||||
|
tag="m.server_notice",
|
||||||
|
content={},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.add_tag_to_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id2,
|
||||||
|
tag="m.server_notice",
|
||||||
|
content={},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
elif tag_action == TagAction.REMOVE:
|
||||||
|
# Remove the room tag
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.remove_tag_from_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id1,
|
||||||
|
tag="m.favourite",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.get_success(
|
||||||
|
self.account_data_handler.remove_tag_from_room(
|
||||||
|
user_id=user1_id,
|
||||||
|
room_id=room_id2,
|
||||||
|
tag="m.favourite",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
assert_never(tag_action)
|
||||||
|
|
||||||
|
# Make an incremental Sliding Sync request for just room1
|
||||||
|
response_body, from_token = self.do_sync(
|
||||||
|
{
|
||||||
|
**sync_body,
|
||||||
|
"room_subscriptions": {
|
||||||
|
room_id1: {
|
||||||
|
"required_state": [],
|
||||||
|
"timeline_limit": 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
since=from_token,
|
||||||
|
tok=user1_tok,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Only room1 shows up because we only have a room subscription for room1 now.
|
||||||
|
self.assertIncludes(
|
||||||
|
response_body["extensions"]["account_data"].get("rooms").keys(),
|
||||||
|
{room_id1},
|
||||||
|
exact=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Make an incremental Sliding Sync request for just room2 now
|
||||||
|
response_body, from_token = self.do_sync(
|
||||||
|
{
|
||||||
|
**sync_body,
|
||||||
|
"room_subscriptions": {
|
||||||
|
room_id2: {
|
||||||
|
"required_state": [],
|
||||||
|
"timeline_limit": 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
since=from_token,
|
||||||
|
tok=user1_tok,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Only room2 shows up because we only have a room subscription for room2 now.
|
||||||
|
self.assertIncludes(
|
||||||
|
response_body["extensions"]["account_data"].get("rooms").keys(),
|
||||||
|
{room_id2},
|
||||||
|
exact=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
|
||||||
|
# Check for room account data for room2
|
||||||
|
self.assertIncludes(
|
||||||
|
response_body["extensions"]["account_data"].get("rooms").keys(),
|
||||||
|
{room_id2},
|
||||||
|
exact=True,
|
||||||
|
)
|
||||||
|
# We should see any room account data updates for room2 since the last
|
||||||
|
# time we saw it down sync
|
||||||
|
account_data_map = {
|
||||||
|
event["type"]: event["content"]
|
||||||
|
for event in response_body["extensions"]["account_data"]
|
||||||
|
.get("rooms")
|
||||||
|
.get(room_id2)
|
||||||
|
}
|
||||||
|
self.assertIncludes(
|
||||||
|
account_data_map.keys(),
|
||||||
|
{"org.matrix.roorarraz2", AccountDataTypes.TAG},
|
||||||
|
exact=True,
|
||||||
|
)
|
||||||
|
self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"})
|
||||||
|
if tag_action == TagAction.ADD:
|
||||||
|
self.assertEqual(
|
||||||
|
account_data_map[AccountDataTypes.TAG],
|
||||||
|
{"tags": {"m.favourite": {}, "m.server_notice": {}}},
|
||||||
|
)
|
||||||
|
elif tag_action == TagAction.REMOVE:
|
||||||
|
# If we previously showed the client that the room has tags, when it no
|
||||||
|
# longer has tags, we need to show them an empty map.
|
||||||
|
self.assertEqual(
|
||||||
|
account_data_map[AccountDataTypes.TAG],
|
||||||
|
{"tags": {}},
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
assert_never(tag_action)
|
||||||
|
|
||||||
def test_wait_for_new_data(self) -> None:
|
def test_wait_for_new_data(self) -> None:
|
||||||
"""
|
"""
|
||||||
|
|
Loading…
Reference in a new issue