Sliding Sync: Handle timeline limit changes (take 2) (#17579)

This supersedes #17503, given the per-connection state is being heavily
rewritten it felt easier to recreate the PR on top of that work.

This correctly handles the case of timeline limits going up and down.

This does not handle changes in `required_state`, but that can be done
as a separate PR.

Based on #17575.

---------

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
This commit is contained in:
Erik Johnston 2024-08-20 10:31:25 +01:00 committed by GitHub
parent 950ba844f7
commit 6eb98a4f1c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 285 additions and 13 deletions

1
changelog.d/17579.misc Normal file
View file

@ -0,0 +1 @@
Handle changes in `timeline_limit` in experimental sliding sync.

View file

@ -787,7 +787,20 @@ class SlidingSyncHandler:
# subscription and have updates we need to send (i.e. either because
# we haven't sent the room down, or we have but there are missing
# updates).
for room_id in relevant_room_map:
for room_id, room_config in relevant_room_map.items():
prev_room_sync_config = previous_connection_state.room_configs.get(
room_id
)
if prev_room_sync_config is not None:
# Always include rooms whose timeline limit has increased.
# (see the "XXX: Odd behavior" described below)
if (
prev_room_sync_config.timeline_limit
< room_config.timeline_limit
):
rooms_should_send.add(room_id)
continue
status = previous_connection_state.rooms.have_sent_room(room_id)
if (
# The room was never sent down before so the client needs to know
@ -819,12 +832,15 @@ class SlidingSyncHandler:
if room_id in rooms_should_send
}
new_connection_state = previous_connection_state.get_mutable()
@trace
@tag_args
async def handle_room(room_id: str) -> None:
room_sync_result = await self.get_room_sync_data(
sync_config=sync_config,
previous_connection_state=previous_connection_state,
new_connection_state=new_connection_state,
room_id=room_id,
room_sync_config=relevant_rooms_to_send_map[room_id],
room_membership_for_user_at_to_token=room_membership_for_user_map[
@ -842,8 +858,6 @@ class SlidingSyncHandler:
with start_active_span("sliding_sync.generate_room_entries"):
await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10)
new_connection_state = previous_connection_state.get_mutable()
extensions = await self.get_extensions_response(
sync_config=sync_config,
actual_lists=lists,
@ -1955,6 +1969,7 @@ class SlidingSyncHandler:
self,
sync_config: SlidingSyncConfig,
previous_connection_state: "PerConnectionState",
new_connection_state: "MutablePerConnectionState",
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
@ -1998,9 +2013,27 @@ class SlidingSyncHandler:
# - For an incremental sync where we haven't sent it down this
# connection before
#
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
# Relevant spec issue:
# https://github.com/matrix-org/matrix-spec/issues/1917
#
# XXX: Odd behavior - We also check if the `timeline_limit` has increased, if so
# we ignore the from bound for the timeline to send down a larger chunk of
# history and set `unstable_expanded_timeline` to true. This is only being added
# to match the behavior of the Sliding Sync proxy as we expect the ElementX
# client to feel a certain way and be able to trickle in a full page of timeline
# messages to fill up the screen. This is a bit different to the behavior of the
# Sliding Sync proxy (which sets initial=true, but then doesn't send down the
# full state again), but existing apps, e.g. ElementX, just need `limited` set.
# We don't explicitly set `limited` but this will be the case for any room that
# has more history than we're trying to pull out. Using
# `unstable_expanded_timeline` allows us to avoid contaminating what `initial`
# or `limited` mean for clients that interpret them correctly. In future this
# behavior is almost certainly going to change.
#
# TODO: Also handle changes to `required_state`
from_bound = None
initial = True
ignore_timeline_bound = False
if from_token and not room_membership_for_user_at_to_token.newly_joined:
room_status = previous_connection_state.rooms.have_sent_room(room_id)
if room_status.status == HaveSentRoomFlag.LIVE:
@ -2018,7 +2051,26 @@ class SlidingSyncHandler:
log_kv({"sliding_sync.room_status": room_status})
log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
if prev_room_sync_config is not None:
# Check if the timeline limit has increased, if so ignore the
# timeline bound and record the change (see "XXX: Odd behavior"
# above).
if (
prev_room_sync_config.timeline_limit
< room_sync_config.timeline_limit
):
ignore_timeline_bound = True
# TODO: Check for changes in `required_state``
log_kv(
{
"sliding_sync.from_bound": from_bound,
"sliding_sync.initial": initial,
"sliding_sync.ignore_timeline_bound": ignore_timeline_bound,
}
)
# Assemble the list of timeline events
#
@ -2055,6 +2107,10 @@ class SlidingSyncHandler:
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)
timeline_from_bound = from_bound
if ignore_timeline_bound:
timeline_from_bound = None
# For initial `/sync` (and other historical scenarios mentioned above), we
# want to view a historical section of the timeline; to fetch events by
# `topological_ordering` (best representation of the room DAG as others were
@ -2080,7 +2136,7 @@ class SlidingSyncHandler:
pagination_method: PaginateFunction = (
# Use `topographical_ordering` for historical events
paginate_room_events_by_topological_ordering
if from_bound is None
if timeline_from_bound is None
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
@ -2090,7 +2146,7 @@ class SlidingSyncHandler:
# (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=to_bound,
to_key=from_bound,
to_key=timeline_from_bound,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
@ -2448,6 +2504,55 @@ class SlidingSyncHandler:
if new_bump_event_pos.stream > 0:
bump_stamp = new_bump_event_pos.stream
unstable_expanded_timeline = False
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
# Record the `room_sync_config` if we're `ignore_timeline_bound` (which means
# that the `timeline_limit` has increased)
if ignore_timeline_bound:
# FIXME: We signal the fact that we're sending down more events to
# the client by setting `unstable_expanded_timeline` to true (see
# "XXX: Odd behavior" above).
unstable_expanded_timeline = True
new_connection_state.room_configs[room_id] = RoomSyncConfig(
timeline_limit=room_sync_config.timeline_limit,
required_state_map=room_sync_config.required_state_map,
)
elif prev_room_sync_config is not None:
# If the result is `limited` then we need to record that the
# `timeline_limit` has been reduced, as when/if the client later requests
# more timeline then we have more data to send.
#
# Otherwise (when not `limited`) we don't need to record that the
# `timeline_limit` has been reduced, as the *effective* `timeline_limit`
# (i.e. the amount of timeline we have previously sent to the client) is at
# least the previous `timeline_limit`.
#
# This is to handle the case where the `timeline_limit` e.g. goes from 10 to
# 5 to 10 again (without any timeline gaps), where there's no point sending
# down the initial historical chunk events when the `timeline_limit` is
# increased as the client already has the 10 previous events. However, if
# client has a gap in the timeline (i.e. `limited` is True), then we *do*
# need to record the reduced timeline.
#
# TODO: Handle timeline gaps (`get_timeline_gaps()`) - This is separate from
# the gaps we might see on the client because a response was `limited` we're
# talking about above.
if (
limited
and prev_room_sync_config.timeline_limit
> room_sync_config.timeline_limit
):
new_connection_state.room_configs[room_id] = RoomSyncConfig(
timeline_limit=room_sync_config.timeline_limit,
required_state_map=room_sync_config.required_state_map,
)
# TODO: Record changes in required_state.
else:
new_connection_state.room_configs[room_id] = room_sync_config
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
return SlidingSyncResult.RoomResult(
@ -2462,6 +2567,7 @@ class SlidingSyncHandler:
stripped_state=stripped_state,
prev_batch=prev_batch_token,
limited=limited,
unstable_expanded_timeline=unstable_expanded_timeline,
num_live=num_live,
bump_stamp=bump_stamp,
joined_count=room_membership_summary.get(
@ -3264,16 +3370,30 @@ class PerConnectionState:
Attributes:
rooms: The status of each room for the events stream.
receipts: The status of each room for the receipts stream.
room_configs: Map from room_id to the `RoomSyncConfig` of all
rooms that we have previously sent down.
"""
rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap)
receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap)
room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)
def get_mutable(self) -> "MutablePerConnectionState":
"""Get a mutable copy of this state."""
room_configs = cast(MutableMapping[str, RoomSyncConfig], self.room_configs)
return MutablePerConnectionState(
rooms=self.rooms.get_mutable(),
receipts=self.receipts.get_mutable(),
room_configs=ChainMap({}, room_configs),
)
def copy(self) -> "PerConnectionState":
return PerConnectionState(
rooms=self.rooms.copy(),
receipts=self.receipts.copy(),
room_configs=dict(self.room_configs),
)
@ -3284,8 +3404,18 @@ class MutablePerConnectionState(PerConnectionState):
rooms: MutableRoomStatusMap[RoomStreamToken]
receipts: MutableRoomStatusMap[MultiWriterStreamToken]
room_configs: typing.ChainMap[str, RoomSyncConfig]
def has_updates(self) -> bool:
return bool(self.rooms.get_updates()) or bool(self.receipts.get_updates())
return (
bool(self.rooms.get_updates())
or bool(self.receipts.get_updates())
or bool(self.get_room_config_updates())
)
def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]:
"""Get updates to the room sync config"""
return self.room_configs.maps[0]
@attr.s(auto_attribs=True)
@ -3369,7 +3499,6 @@ class SlidingSyncConnectionStore:
) -> int:
"""Record updated per-connection state, returning the connection
position associated with the new state.
If there are no changes to the state this may return the same token as
the existing per-connection state.
"""
@ -3390,10 +3519,7 @@ class SlidingSyncConnectionStore:
# We copy the `MutablePerConnectionState` so that the inner `ChainMap`s
# don't grow forever.
sync_statuses[new_store_token] = PerConnectionState(
rooms=new_connection_state.rooms.copy(),
receipts=new_connection_state.receipts.copy(),
)
sync_statuses[new_store_token] = new_connection_state.copy()
return new_store_token

View file

@ -1044,6 +1044,11 @@ class SlidingSyncRestServlet(RestServlet):
if room_result.initial:
serialized_rooms[room_id]["initial"] = room_result.initial
if room_result.unstable_expanded_timeline:
serialized_rooms[room_id][
"unstable_expanded_timeline"
] = room_result.unstable_expanded_timeline
# This will be omitted for invite/knock rooms with `stripped_state`
if (
room_result.required_state is not None

View file

@ -171,6 +171,9 @@ class SlidingSyncResult:
their local state. When there is an update, servers MUST omit this flag
entirely and NOT send "initial":false as this is wasteful on bandwidth. The
absence of this flag means 'false'.
unstable_expanded_timeline: Flag which is set if we're returning more historic
events due to the timeline limit having increased. See "XXX: Odd behavior"
comment ing `synapse.handlers.sliding_sync`.
required_state: The current state of the room
timeline: Latest events in the room. The last event is the most recent.
bundled_aggregations: A mapping of event ID to the bundled aggregations for
@ -219,6 +222,7 @@ class SlidingSyncResult:
heroes: Optional[List[StrippedHero]]
is_dm: bool
initial: bool
unstable_expanded_timeline: bool
# Should be empty for invite/knock rooms with `stripped_state`
required_state: List[EventBase]
# Should be empty for invite/knock rooms with `stripped_state`

View file

@ -17,6 +17,7 @@ from typing import List, Optional
from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.constants import EventTypes
from synapse.rest.client import login, room, sync
from synapse.server import HomeServer
from synapse.types import StreamToken, StrSequence
@ -573,3 +574,138 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase):
# Nothing to see for this banned user in the room in the token range
self.assertIsNone(response_body["rooms"].get(room_id1))
def test_increasing_timeline_range_sends_more_messages(self) -> None:
"""
Test that increasing the timeline limit via room subscriptions sends the
room down with more messages in a limited sync.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [[EventTypes.Create, ""]],
"timeline_limit": 1,
}
}
}
message_events = []
for _ in range(10):
resp = self.helper.send(room_id1, "msg", tok=user1_tok)
message_events.append(resp["event_id"])
# Make the first Sliding Sync request
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
room_response = response_body["rooms"][room_id1]
self.assertEqual(room_response["initial"], True)
self.assertNotIn("unstable_expanded_timeline", room_response)
self.assertEqual(room_response["limited"], True)
# We only expect the last message at first
self._assertTimelineEqual(
room_id=room_id1,
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
expected_event_ids=message_events[-1:],
message=str(room_response["timeline"]),
)
# We also expect to get the create event state.
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
room_response["required_state"],
{state_map[(EventTypes.Create, "")]},
exact=True,
)
# Now do another request with a room subscription with an increased timeline limit
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 10,
}
}
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]
self.assertNotIn("initial", room_response)
self.assertEqual(room_response["unstable_expanded_timeline"], True)
self.assertEqual(room_response["limited"], True)
# Now we expect all the messages
self._assertTimelineEqual(
room_id=room_id1,
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
expected_event_ids=message_events,
message=str(room_response["timeline"]),
)
# We don't expect to get the room create down, as nothing has changed.
self.assertNotIn("required_state", room_response)
# Decreasing the timeline limit shouldn't resend any events
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 5,
}
}
event_response = self.helper.send(room_id1, "msg", tok=user1_tok)
latest_event_id = event_response["event_id"]
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]
self.assertNotIn("initial", room_response)
self.assertNotIn("unstable_expanded_timeline", room_response)
self.assertEqual(room_response["limited"], False)
self._assertTimelineEqual(
room_id=room_id1,
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
expected_event_ids=[latest_event_id],
message=str(room_response["timeline"]),
)
# Increasing the limit to what it was before also should not resend any
# events
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 10,
}
}
event_response = self.helper.send(room_id1, "msg", tok=user1_tok)
latest_event_id = event_response["event_id"]
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]
self.assertNotIn("initial", room_response)
self.assertNotIn("unstable_expanded_timeline", room_response)
self.assertEqual(room_response["limited"], False)
self._assertTimelineEqual(
room_id=room_id1,
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
expected_event_ids=[latest_event_id],
message=str(room_response["timeline"]),
)