mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-14 11:57:44 +00:00
Add some miscellaneous comments around sync (#13474)
Add some miscellaneous comments to document sync, especially around `compute_state_delta`. Signed-off-by: Sean Quah <seanq@matrix.org> Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
This commit is contained in:
parent
1b09b0832e
commit
51c01d450a
3 changed files with 81 additions and 40 deletions
1
changelog.d/13474.misc
Normal file
1
changelog.d/13474.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add some miscellaneous comments to document sync, especially around `compute_state_delta`.
|
|
@ -13,7 +13,17 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import itertools
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
|
from typing import (
|
||||||
|
TYPE_CHECKING,
|
||||||
|
Any,
|
||||||
|
Dict,
|
||||||
|
FrozenSet,
|
||||||
|
List,
|
||||||
|
Optional,
|
||||||
|
Sequence,
|
||||||
|
Set,
|
||||||
|
Tuple,
|
||||||
|
)
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
from prometheus_client import Counter
|
from prometheus_client import Counter
|
||||||
|
@ -89,7 +99,7 @@ class SyncConfig:
|
||||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||||
class TimelineBatch:
|
class TimelineBatch:
|
||||||
prev_batch: StreamToken
|
prev_batch: StreamToken
|
||||||
events: List[EventBase]
|
events: Sequence[EventBase]
|
||||||
limited: bool
|
limited: bool
|
||||||
# A mapping of event ID to the bundled aggregations for the above events.
|
# A mapping of event ID to the bundled aggregations for the above events.
|
||||||
# This is only calculated if limited is true.
|
# This is only calculated if limited is true.
|
||||||
|
@ -852,16 +862,26 @@ class SyncHandler:
|
||||||
now_token: StreamToken,
|
now_token: StreamToken,
|
||||||
full_state: bool,
|
full_state: bool,
|
||||||
) -> MutableStateMap[EventBase]:
|
) -> MutableStateMap[EventBase]:
|
||||||
"""Works out the difference in state between the start of the timeline
|
"""Works out the difference in state between the end of the previous sync and
|
||||||
and the previous sync.
|
the start of the timeline.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
room_id:
|
room_id:
|
||||||
batch: The timeline batch for the room that will be sent to the user.
|
batch: The timeline batch for the room that will be sent to the user.
|
||||||
sync_config:
|
sync_config:
|
||||||
since_token: Token of the end of the previous batch. May be None.
|
since_token: Token of the end of the previous batch. May be `None`.
|
||||||
now_token: Token of the end of the current batch.
|
now_token: Token of the end of the current batch.
|
||||||
full_state: Whether to force returning the full state.
|
full_state: Whether to force returning the full state.
|
||||||
|
`lazy_load_members` still applies when `full_state` is `True`.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The state to return in the sync response for the room.
|
||||||
|
|
||||||
|
Clients will overlay this onto the state at the end of the previous sync to
|
||||||
|
arrive at the state at the start of the timeline.
|
||||||
|
|
||||||
|
Clients will then overlay state events in the timeline to arrive at the
|
||||||
|
state at the end of the timeline, in preparation for the next sync.
|
||||||
"""
|
"""
|
||||||
# TODO(mjark) Check if the state events were received by the server
|
# TODO(mjark) Check if the state events were received by the server
|
||||||
# after the previous sync, since we need to include those state
|
# after the previous sync, since we need to include those state
|
||||||
|
@ -869,7 +889,8 @@ class SyncHandler:
|
||||||
# TODO(mjark) Check for new redactions in the state events.
|
# TODO(mjark) Check for new redactions in the state events.
|
||||||
|
|
||||||
with Measure(self.clock, "compute_state_delta"):
|
with Measure(self.clock, "compute_state_delta"):
|
||||||
|
# The memberships needed for events in the timeline.
|
||||||
|
# Only calculated when `lazy_load_members` is on.
|
||||||
members_to_fetch = None
|
members_to_fetch = None
|
||||||
|
|
||||||
lazy_load_members = sync_config.filter_collection.lazy_load_members()
|
lazy_load_members = sync_config.filter_collection.lazy_load_members()
|
||||||
|
@ -897,38 +918,46 @@ class SyncHandler:
|
||||||
else:
|
else:
|
||||||
state_filter = StateFilter.all()
|
state_filter = StateFilter.all()
|
||||||
|
|
||||||
|
# The contribution to the room state from state events in the timeline.
|
||||||
|
# Only contains the last event for any given state key.
|
||||||
timeline_state = {
|
timeline_state = {
|
||||||
(event.type, event.state_key): event.event_id
|
(event.type, event.state_key): event.event_id
|
||||||
for event in batch.events
|
for event in batch.events
|
||||||
if event.is_state()
|
if event.is_state()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Now calculate the state to return in the sync response for the room.
|
||||||
|
# This is more or less the change in state between the end of the previous
|
||||||
|
# sync's timeline and the start of the current sync's timeline.
|
||||||
|
# See the docstring above for details.
|
||||||
|
state_ids: StateMap[str]
|
||||||
|
|
||||||
if full_state:
|
if full_state:
|
||||||
if batch:
|
if batch:
|
||||||
current_state_ids = (
|
state_at_timeline_end = (
|
||||||
await self._state_storage_controller.get_state_ids_for_event(
|
await self._state_storage_controller.get_state_ids_for_event(
|
||||||
batch.events[-1].event_id, state_filter=state_filter
|
batch.events[-1].event_id, state_filter=state_filter
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
state_ids = (
|
state_at_timeline_start = (
|
||||||
await self._state_storage_controller.get_state_ids_for_event(
|
await self._state_storage_controller.get_state_ids_for_event(
|
||||||
batch.events[0].event_id, state_filter=state_filter
|
batch.events[0].event_id, state_filter=state_filter
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
current_state_ids = await self.get_state_at(
|
state_at_timeline_end = await self.get_state_at(
|
||||||
room_id, stream_position=now_token, state_filter=state_filter
|
room_id, stream_position=now_token, state_filter=state_filter
|
||||||
)
|
)
|
||||||
|
|
||||||
state_ids = current_state_ids
|
state_at_timeline_start = state_at_timeline_end
|
||||||
|
|
||||||
state_ids = _calculate_state(
|
state_ids = _calculate_state(
|
||||||
timeline_contains=timeline_state,
|
timeline_contains=timeline_state,
|
||||||
timeline_start=state_ids,
|
timeline_start=state_at_timeline_start,
|
||||||
previous={},
|
timeline_end=state_at_timeline_end,
|
||||||
current=current_state_ids,
|
previous_timeline_end={},
|
||||||
lazy_load_members=lazy_load_members,
|
lazy_load_members=lazy_load_members,
|
||||||
)
|
)
|
||||||
elif batch.limited:
|
elif batch.limited:
|
||||||
|
@ -968,24 +997,23 @@ class SyncHandler:
|
||||||
)
|
)
|
||||||
|
|
||||||
if batch:
|
if batch:
|
||||||
current_state_ids = (
|
state_at_timeline_end = (
|
||||||
await self._state_storage_controller.get_state_ids_for_event(
|
await self._state_storage_controller.get_state_ids_for_event(
|
||||||
batch.events[-1].event_id, state_filter=state_filter
|
batch.events[-1].event_id, state_filter=state_filter
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# Its not clear how we get here, but empirically we do
|
# We can get here if the user has ignored the senders of all
|
||||||
# (#5407). Logging has been added elsewhere to try and
|
# the recent events.
|
||||||
# figure out where this state comes from.
|
state_at_timeline_end = await self.get_state_at(
|
||||||
current_state_ids = await self.get_state_at(
|
|
||||||
room_id, stream_position=now_token, state_filter=state_filter
|
room_id, stream_position=now_token, state_filter=state_filter
|
||||||
)
|
)
|
||||||
|
|
||||||
state_ids = _calculate_state(
|
state_ids = _calculate_state(
|
||||||
timeline_contains=timeline_state,
|
timeline_contains=timeline_state,
|
||||||
timeline_start=state_at_timeline_start,
|
timeline_start=state_at_timeline_start,
|
||||||
previous=state_at_previous_sync,
|
timeline_end=state_at_timeline_end,
|
||||||
current=current_state_ids,
|
previous_timeline_end=state_at_previous_sync,
|
||||||
# we have to include LL members in case LL initial sync missed them
|
# we have to include LL members in case LL initial sync missed them
|
||||||
lazy_load_members=lazy_load_members,
|
lazy_load_members=lazy_load_members,
|
||||||
)
|
)
|
||||||
|
@ -1010,6 +1038,13 @@ class SyncHandler:
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# At this point, if `lazy_load_members` is enabled, `state_ids` includes
|
||||||
|
# the memberships of all event senders in the timeline. This is because we
|
||||||
|
# may not have sent the memberships in a previous sync.
|
||||||
|
|
||||||
|
# When `include_redundant_members` is on, we send all the lazy-loaded
|
||||||
|
# memberships of event senders. Otherwise we make an effort to limit the set
|
||||||
|
# of memberships we send to those that we have not already sent to this client.
|
||||||
if lazy_load_members and not include_redundant_members:
|
if lazy_load_members and not include_redundant_members:
|
||||||
cache_key = (sync_config.user.to_string(), sync_config.device_id)
|
cache_key = (sync_config.user.to_string(), sync_config.device_id)
|
||||||
cache = self.get_lazy_loaded_members_cache(cache_key)
|
cache = self.get_lazy_loaded_members_cache(cache_key)
|
||||||
|
@ -2216,8 +2251,8 @@ def _action_has_highlight(actions: List[JsonDict]) -> bool:
|
||||||
def _calculate_state(
|
def _calculate_state(
|
||||||
timeline_contains: StateMap[str],
|
timeline_contains: StateMap[str],
|
||||||
timeline_start: StateMap[str],
|
timeline_start: StateMap[str],
|
||||||
previous: StateMap[str],
|
timeline_end: StateMap[str],
|
||||||
current: StateMap[str],
|
previous_timeline_end: StateMap[str],
|
||||||
lazy_load_members: bool,
|
lazy_load_members: bool,
|
||||||
) -> StateMap[str]:
|
) -> StateMap[str]:
|
||||||
"""Works out what state to include in a sync response.
|
"""Works out what state to include in a sync response.
|
||||||
|
@ -2225,45 +2260,50 @@ def _calculate_state(
|
||||||
Args:
|
Args:
|
||||||
timeline_contains: state in the timeline
|
timeline_contains: state in the timeline
|
||||||
timeline_start: state at the start of the timeline
|
timeline_start: state at the start of the timeline
|
||||||
previous: state at the end of the previous sync (or empty dict
|
timeline_end: state at the end of the timeline
|
||||||
|
previous_timeline_end: state at the end of the previous sync (or empty dict
|
||||||
if this is an initial sync)
|
if this is an initial sync)
|
||||||
current: state at the end of the timeline
|
|
||||||
lazy_load_members: whether to return members from timeline_start
|
lazy_load_members: whether to return members from timeline_start
|
||||||
or not. assumes that timeline_start has already been filtered to
|
or not. assumes that timeline_start has already been filtered to
|
||||||
include only the members the client needs to know about.
|
include only the members the client needs to know about.
|
||||||
"""
|
"""
|
||||||
event_id_to_key = {
|
event_id_to_state_key = {
|
||||||
e: key
|
event_id: state_key
|
||||||
for key, e in itertools.chain(
|
for state_key, event_id in itertools.chain(
|
||||||
timeline_contains.items(),
|
timeline_contains.items(),
|
||||||
previous.items(),
|
|
||||||
timeline_start.items(),
|
timeline_start.items(),
|
||||||
current.items(),
|
timeline_end.items(),
|
||||||
|
previous_timeline_end.items(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
c_ids = set(current.values())
|
timeline_end_ids = set(timeline_end.values())
|
||||||
ts_ids = set(timeline_start.values())
|
timeline_start_ids = set(timeline_start.values())
|
||||||
p_ids = set(previous.values())
|
previous_timeline_end_ids = set(previous_timeline_end.values())
|
||||||
tc_ids = set(timeline_contains.values())
|
timeline_contains_ids = set(timeline_contains.values())
|
||||||
|
|
||||||
# If we are lazyloading room members, we explicitly add the membership events
|
# If we are lazyloading room members, we explicitly add the membership events
|
||||||
# for the senders in the timeline into the state block returned by /sync,
|
# for the senders in the timeline into the state block returned by /sync,
|
||||||
# as we may not have sent them to the client before. We find these membership
|
# as we may not have sent them to the client before. We find these membership
|
||||||
# events by filtering them out of timeline_start, which has already been filtered
|
# events by filtering them out of timeline_start, which has already been filtered
|
||||||
# to only include membership events for the senders in the timeline.
|
# to only include membership events for the senders in the timeline.
|
||||||
# In practice, we can do this by removing them from the p_ids list,
|
# In practice, we can do this by removing them from the previous_timeline_end_ids
|
||||||
# which is the list of relevant state we know we have already sent to the client.
|
# list, which is the list of relevant state we know we have already sent to the
|
||||||
|
# client.
|
||||||
# see https://github.com/matrix-org/synapse/pull/2970/files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
|
# see https://github.com/matrix-org/synapse/pull/2970/files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
|
||||||
|
|
||||||
if lazy_load_members:
|
if lazy_load_members:
|
||||||
p_ids.difference_update(
|
previous_timeline_end_ids.difference_update(
|
||||||
e for t, e in timeline_start.items() if t[0] == EventTypes.Member
|
e for t, e in timeline_start.items() if t[0] == EventTypes.Member
|
||||||
)
|
)
|
||||||
|
|
||||||
state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
|
state_ids = (
|
||||||
|
(timeline_end_ids | timeline_start_ids)
|
||||||
|
- previous_timeline_end_ids
|
||||||
|
- timeline_contains_ids
|
||||||
|
)
|
||||||
|
|
||||||
return {event_id_to_key[e]: e for e in state_ids}
|
return {event_id_to_state_key[e]: e for e in state_ids}
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True, auto_attribs=True)
|
@attr.s(slots=True, auto_attribs=True)
|
||||||
|
|
|
@ -73,8 +73,8 @@ async def filter_events_for_client(
|
||||||
* the user is not currently a member of the room, and:
|
* the user is not currently a member of the room, and:
|
||||||
* the user has not been a member of the room since the given
|
* the user has not been a member of the room since the given
|
||||||
events
|
events
|
||||||
always_include_ids: set of event ids to specifically
|
always_include_ids: set of event ids to specifically include, if present
|
||||||
include (unless sender is ignored)
|
in events (unless sender is ignored)
|
||||||
filter_send_to_client: Whether we're checking an event that's going to be
|
filter_send_to_client: Whether we're checking an event that's going to be
|
||||||
sent to a client. This might not always be the case since this function can
|
sent to a client. This might not always be the case since this function can
|
||||||
also be called to check whether a user can see the state at a given point.
|
also be called to check whether a user can see the state at a given point.
|
||||||
|
|
Loading…
Reference in a new issue