Merge branch 'develop' into madlittlemods/sliding-sync-faster-background-updates

Conflicts:
	synapse/storage/databases/main/events_bg_updates.py
	tests/storage/test_sliding_sync_tables.py
Eric Eastwood 2024-09-11 11:34:15 -05:00
commit e5f133d0bf
11 changed files with 294 additions and 247 deletions

changelog.d/17652.misc (new file)

@@ -0,0 +1 @@
+Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.

changelog.d/17683.misc (new file)

@@ -0,0 +1 @@
+Speed up sliding sync by reducing amount of data pulled out of the database for large rooms.
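Much of the saving comes from counting memberships directly in SQL instead of pulling every membership event out of the database to build a full room summary. As a minimal sketch, this is the grouped count that the new `get_member_counts` store method runs (same shape as the query added to `roommember.py` further down; `?` placeholders as used by Synapse's transaction helpers):

    SELECT count(*), membership FROM current_state_events
    WHERE type = 'm.room.member' AND room_id = ?
        AND membership IS NOT NULL
    GROUP BY membership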


@@ -784,32 +784,10 @@ class SlidingSyncHandler:
            ):
                avatar_changed = True

+        # We only need the room summary for calculating heroes, however if we do
+        # fetch it then we can use it to calculate `joined_count` and
+        # `invited_count`.
         room_membership_summary: Optional[Mapping[str, MemberSummary]] = None
-        empty_membership_summary = MemberSummary([], 0)
-        # We need the room summary for:
-        #  - Always for initial syncs (or the first time we send down the room)
-        #  - When the room has no name, we need `heroes`
-        #  - When the membership has changed so we need to give updated `heroes` and
-        #    `joined_count`/`invited_count`.
-        #
-        # Ideally, instead of just looking at `name_changed`, we'd check if the room
-        # name is not set but this is a good enough approximation that saves us from
-        # having to pull out the full event. This just means, we're generating the
-        # summary whenever the room name changes instead of only when it changes to
-        # `None`.
-        if initial or name_changed or membership_changed:
-            # We can't trace the function directly because it's cached and the `@cached`
-            # decorator doesn't mix with `@trace` yet.
-            with start_active_span("get_room_summary"):
-                if room_membership_for_user_at_to_token.membership in (
-                    Membership.LEAVE,
-                    Membership.BAN,
-                ):
-                    # TODO: Figure out how to get the membership summary for left/banned rooms
-                    room_membership_summary = {}
-                else:
-                    room_membership_summary = await self.store.get_room_summary(room_id)
-                    # TODO: Reverse/rewind back to the `to_token`

         # `heroes` are required if the room name is not set.
         #
@@ -828,11 +806,45 @@
         # get them on initial syncs (or the first time we send down the room) or if the
         # membership has changed which may change the heroes.
         if name_event_id is None and (initial or (not initial and membership_changed)):
-            assert room_membership_summary is not None
+            # We need the room summary to extract the heroes from
+            if room_membership_for_user_at_to_token.membership != Membership.JOIN:
+                # TODO: Figure out how to get the membership summary for left/banned rooms
+                # For invite/knock rooms we don't include the information.
+                room_membership_summary = {}
+            else:
+                room_membership_summary = await self.store.get_room_summary(room_id)
+                # TODO: Reverse/rewind back to the `to_token`

             hero_user_ids = extract_heroes_from_room_summary(
                 room_membership_summary, me=user.to_string()
             )

+        # Fetch the membership counts for rooms we're joined to.
+        #
+        # Similarly to other metadata, we only need to calculate the member
+        # counts if this is an initial sync or the memberships have changed.
+        joined_count: Optional[int] = None
+        invited_count: Optional[int] = None
+        if (
+            initial or membership_changed
+        ) and room_membership_for_user_at_to_token.membership == Membership.JOIN:
+            # If we have the room summary (because we calculated heroes above)
+            # then we can simply pull the counts from there.
+            if room_membership_summary is not None:
+                empty_membership_summary = MemberSummary([], 0)
+
+                joined_count = room_membership_summary.get(
+                    Membership.JOIN, empty_membership_summary
+                ).count
+                invited_count = room_membership_summary.get(
+                    Membership.INVITE, empty_membership_summary
+                ).count
+            else:
+                member_counts = await self.store.get_member_counts(room_id)
+                joined_count = member_counts.get(Membership.JOIN, 0)
+                invited_count = member_counts.get(Membership.INVITE, 0)
+
         # Fetch the `required_state` for the room
         #
         # No `required_state` for invite/knock rooms (just `stripped_state`)
@@ -1090,20 +1102,6 @@
         set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)

-        joined_count: Optional[int] = None
-        if initial or membership_changed:
-            assert room_membership_summary is not None
-            joined_count = room_membership_summary.get(
-                Membership.JOIN, empty_membership_summary
-            ).count
-
-        invited_count: Optional[int] = None
-        if initial or membership_changed:
-            assert room_membership_summary is not None
-            invited_count = room_membership_summary.get(
-                Membership.INVITE, empty_membership_summary
-            ).count
-
         return SlidingSyncResult.RoomResult(
             name=room_name,
             avatar=room_avatar,


@@ -112,6 +112,7 @@ class SQLBaseStore(metaclass=ABCMeta):
            self._attempt_to_invalidate_cache(
                "get_number_joined_users_in_room", (room_id,)
            )
+            self._attempt_to_invalidate_cache("get_member_counts", (room_id,))
            self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))

            # There's no easy way of invalidating this cache for just the users
@@ -153,6 +154,7 @@
        self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
        self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
        self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,))
+        self._attempt_to_invalidate_cache("get_member_counts", (room_id,))
        self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
        self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
        self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)


@@ -1980,7 +1980,12 @@ class PersistEventsStore:
            if state_key == (EventTypes.Create, ""):
                room_type = event.content.get(EventContentFields.ROOM_TYPE)
                # Scrutinize JSON values
-                if room_type is None or isinstance(room_type, str):
+                if room_type is None or (
+                    isinstance(room_type, str)
+                    # We ignore values with null bytes as Postgres doesn't allow them in
+                    # text columns.
+                    and "\0" not in room_type
+                ):
                    sliding_sync_insert_map["room_type"] = room_type
            elif state_key == (EventTypes.RoomEncryption, ""):
                encryption_algorithm = event.content.get(
@@ -1990,15 +1995,26 @@
                sliding_sync_insert_map["is_encrypted"] = is_encrypted
            elif state_key == (EventTypes.Name, ""):
                room_name = event.content.get(EventContentFields.ROOM_NAME)
-                # Scrutinize JSON values
-                if room_name is None or isinstance(room_name, str):
+                # Scrutinize JSON values. We ignore values with nulls as
+                # postgres doesn't allow null bytes in text columns.
+                if room_name is None or (
+                    isinstance(room_name, str)
+                    # We ignore values with null bytes as Postgres doesn't allow them in
+                    # text columns.
+                    and "\0" not in room_name
+                ):
                    sliding_sync_insert_map["room_name"] = room_name
            elif state_key == (EventTypes.Tombstone, ""):
                successor_room_id = event.content.get(
                    EventContentFields.TOMBSTONE_SUCCESSOR_ROOM
                )
                # Scrutinize JSON values
-                if successor_room_id is None or isinstance(successor_room_id, str):
+                if successor_room_id is None or (
+                    isinstance(successor_room_id, str)
+                    # We ignore values with null bytes as Postgres doesn't allow them in
+                    # text columns.
+                    and "\0" not in successor_room_id
+                ):
                    sliding_sync_insert_map["tombstone_successor_room_id"] = (
                        successor_room_id
                    )
@@ -2081,6 +2097,21 @@
                    else None
                )

+                # Check for null bytes in the room name and type. We have to
+                # ignore values with null bytes as Postgres doesn't allow them
+                # in text columns.
+                if (
+                    sliding_sync_insert_map["room_name"] is not None
+                    and "\0" in sliding_sync_insert_map["room_name"]
+                ):
+                    sliding_sync_insert_map.pop("room_name")
+
+                if (
+                    sliding_sync_insert_map["room_type"] is not None
+                    and "\0" in sliding_sync_insert_map["room_type"]
+                ):
+                    sliding_sync_insert_map.pop("room_type")
+
                # Find the tombstone_successor_room_id
                # Note: This isn't one of the stripped state events according to the spec
                # but seems like there is no reason not to support this kind of thing.
@@ -2095,6 +2126,12 @@
                    else None
                )

+                if (
+                    sliding_sync_insert_map["tombstone_successor_room_id"] is not None
+                    and "\0" in sliding_sync_insert_map["tombstone_successor_room_id"]
+                ):
+                    sliding_sync_insert_map.pop("tombstone_successor_room_id")
+
            else:
                # No stripped state provided
                sliding_sync_insert_map["has_known_state"] = False


@@ -1963,7 +1963,18 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
        def _find_memberships_to_update_txn(
            txn: LoggingTransaction,
        ) -> List[
-            Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool]
+            Tuple[
+                str,
+                Optional[str],
+                Optional[str],
+                str,
+                str,
+                str,
+                str,
+                int,
+                Optional[str],
+                bool,
+            ]
        ]:
            # Fetch the set of event IDs that we want to update
            #
@@ -1979,6 +1990,14 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
            # The fact the `events.stream_ordering` is a nullable column is a holdover
            # from a rename of the column.
            #
+            # We skip over rows which we've already handled, i.e. have a
+            # matching row in `sliding_sync_membership_snapshots` with the same
+            # room, user and event ID.
+            #
+            # We also ignore rooms that the user has left themselves (i.e. not
+            # kicked). This is to avoid having to port lots of old rooms that we
+            # will never send down sliding sync (as we exclude such rooms from
+            # initial syncs).
            if initial_phase:
                # There are some old out-of-band memberships (before
                # https://github.com/matrix-org/synapse/issues/6983) where we don't have
@@ -1990,6 +2009,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                    SELECT
                        c.room_id,
                        r.room_id,
+                        r.room_version,
                        c.user_id,
                        e.sender,
                        c.event_id,
@@ -1998,9 +2018,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                        e.instance_name,
                        e.outlier
                    FROM local_current_membership AS c
+                    LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id)
                    INNER JOIN events AS e USING (event_id)
                    LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
                    WHERE (c.room_id, c.user_id) > (?, ?)
+                        AND (m.user_id IS NULL OR c.event_id != m.membership_event_id)
                    ORDER BY c.room_id ASC, e.stream_ordering ASC, c.user_id ASC
                    LIMIT ?
                    """,
@@ -2020,7 +2042,8 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                    """
                    SELECT
                        c.room_id,
-                        c.room_id,
+                        r.room_id,
+                        r.room_version,
                        c.user_id,
                        e.sender,
                        c.event_id,
@@ -2029,8 +2052,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                        e.instance_name,
                        e.outlier
                    FROM local_current_membership AS c
+                    LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id)
                    INNER JOIN events AS e USING (event_id)
+                    LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
                    WHERE e.stream_ordering > ?
+                        AND (m.user_id IS NULL OR c.event_id != m.membership_event_id)
                    ORDER BY e.stream_ordering ASC
                    LIMIT ?
                    """,
@@ -2042,7 +2068,16 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
        memberships_to_update_rows = cast(
            List[
                Tuple[
-                    str, Optional[str], str, str, str, str, int, Optional[str], bool
+                    str,
+                    Optional[str],
+                    Optional[str],
+                    str,
+                    str,
+                    str,
+                    str,
+                    int,
+                    Optional[str],
+                    bool,
                ]
            ],
            txn.fetchall(),
@@ -2075,7 +2110,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
        def _find_previous_invite_or_knock_membership_txn(
            txn: LoggingTransaction, room_id: str, user_id: str, event_id: str
-        ) -> Tuple[str, str]:
+        ) -> Optional[Tuple[str, str]]:
            # Find the previous invite/knock event before the leave event
            #
            # Here are some notes on how we landed on this query:
@@ -2125,8 +2160,13 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
            )
            row = txn.fetchone()

-            # We should see a corresponding previous invite/knock event
-            assert row is not None
+            if row is None:
+                # Generally we should have an invite or knock event for leaves
+                # that are outliers, however this may not always be the case
+                # (e.g. a local user got kicked but the kick event got pulled in
+                # as an outlier).
+                return None

            event_id, membership = row
            return event_id, membership
@@ -2164,7 +2204,18 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
        async def _handle_memberships_to_update_rows_for_room(
            memberships_to_update_rows: List[
-                Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool]
+                Tuple[
+                    str,
+                    Optional[str],
+                    Optional[str],
+                    str,
+                    str,
+                    str,
+                    str,
+                    int,
+                    Optional[str],
+                    bool,
+                ]
            ],
        ) -> None:
            # We should have exited earlier if there were no rows to process
@@ -2175,6 +2226,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
            for (
                room_id,
                room_id_from_rooms_table,
+                room_version_id,
                user_id,
                sender,
                membership_event_id,
@@ -2198,6 +2250,14 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                    Membership.BAN,
                )

+                if (
+                    room_version_id is not None
+                    and room_version_id not in KNOWN_ROOM_VERSIONS
+                ):
+                    # Ignore rooms with unknown room versions (these were
+                    # experimental rooms, that we no longer support).
+                    continue
+
                # There are some old out-of-band memberships (before
                # https://github.com/matrix-org/synapse/issues/6983) where we don't have the
                # corresponding room stored in the `rooms` table`. We have a `FOREIGN KEY`
@@ -2437,14 +2497,17 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                    # in the events table though. We'll just say that we don't
                    # know the state for these rooms and continue on with our
                    # day.
-                    sliding_sync_membership_snapshots_insert_map[
-                        "has_known_state"
-                    ] = False
+                    sliding_sync_membership_snapshots_insert_map = {
+                        "has_known_state": False,
+                        "room_type": None,
+                        "room_name": None,
+                        "is_encrypted": False,
+                    }
                elif membership in (Membership.INVITE, Membership.KNOCK) or (
                    membership in (Membership.LEAVE, Membership.BAN) and is_outlier
                ):
-                    invite_or_knock_event_id = membership_event_id
-                    invite_or_knock_membership = membership
+                    invite_or_knock_event_id = None
+                    invite_or_knock_membership = None

                    # If the event is an `out_of_band_membership` (special case of
                    # `outlier`), we never had historical state so we have to pull from
@@ -2453,37 +2516,55 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                    # membership (i.e. the room shouldn't disappear if your using the
                    # `is_encrypted` filter and you leave).
                    if membership in (Membership.LEAVE, Membership.BAN) and is_outlier:
-                        (
-                            invite_or_knock_event_id,
-                            invite_or_knock_membership,
-                        ) = await self.db_pool.runInteraction(
+                        previous_membership = await self.db_pool.runInteraction(
                            "sliding_sync_membership_snapshots_bg_update._find_previous_invite_or_knock_membership_txn",
                            _find_previous_invite_or_knock_membership_txn,
                            room_id,
                            user_id,
                            membership_event_id,
                        )
+                        if previous_membership is not None:
+                            (
+                                invite_or_knock_event_id,
+                                invite_or_knock_membership,
+                            ) = previous_membership
+                    else:
+                        invite_or_knock_event_id = membership_event_id
+                        invite_or_knock_membership = membership

-                    # Pull from the stripped state on the invite/knock event
-                    invite_or_knock_event = await self.get_event(
-                        invite_or_knock_event_id
-                    )
-
-                    raw_stripped_state_events = None
-                    if invite_or_knock_membership == Membership.INVITE:
-                        invite_room_state = invite_or_knock_event.unsigned.get(
-                            "invite_room_state"
-                        )
-                        raw_stripped_state_events = invite_room_state
-                    elif invite_or_knock_membership == Membership.KNOCK:
-                        knock_room_state = invite_or_knock_event.unsigned.get(
-                            "knock_room_state"
-                        )
-                        raw_stripped_state_events = knock_room_state
-
-                    sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
-                        raw_stripped_state_events
-                    )
+                    if (
+                        invite_or_knock_event_id is not None
+                        and invite_or_knock_membership is not None
+                    ):
+                        # Pull from the stripped state on the invite/knock event
+                        invite_or_knock_event = await self.get_event(
+                            invite_or_knock_event_id
+                        )
+
+                        raw_stripped_state_events = None
+                        if invite_or_knock_membership == Membership.INVITE:
+                            invite_room_state = invite_or_knock_event.unsigned.get(
+                                "invite_room_state"
+                            )
+                            raw_stripped_state_events = invite_room_state
+                        elif invite_or_knock_membership == Membership.KNOCK:
+                            knock_room_state = invite_or_knock_event.unsigned.get(
+                                "knock_room_state"
+                            )
+                            raw_stripped_state_events = knock_room_state
+
+                        sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
+                            raw_stripped_state_events
+                        )
+                    else:
+                        # We couldn't find any state for the membership, so we just have to
+                        # leave it as empty.
+                        sliding_sync_membership_snapshots_insert_map = {
+                            "has_known_state": False,
+                            "room_type": None,
+                            "room_name": None,
+                            "is_encrypted": False,
+                        }

                    # We should have some insert values for each room, even if no
                    # stripped state is on the event because we still want to record
@@ -2577,7 +2658,18 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
        room_id_to_memberships_to_update_rows: Dict[
            str,
            List[
-                Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool]
+                Tuple[
+                    str,
+                    Optional[str],
+                    Optional[str],
+                    str,
+                    str,
+                    str,
+                    str,
+                    int,
+                    Optional[str],
+                    bool,
+                ]
            ],
        ] = {}
        for row in memberships_to_update_rows:
@@ -2623,7 +2715,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                str,
            ]
        ] = []
-        forgotten_update_query_args: List[Tuple[str, str, str]] = []
+        forgotten_update_query_args: List[Tuple[str, str, str, str]] = []
        for key, insert_map in to_insert_membership_snapshots.items():
            room_id, user_id = key
            membership_info = to_insert_membership_infos[(room_id, user_id)]
@@ -2659,6 +2751,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                    membership_event_id,
                    room_id,
                    user_id,
+                    membership_event_id,
                )
            )
@@ -2688,7 +2781,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
                UPDATE sliding_sync_membership_snapshots
                SET
                    forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
-                WHERE room_id = ? and user_id = ?
+                WHERE room_id = ? and user_id = ? AND membership_event_id = ?
                """,
                forgotten_update_query_args,
            )
@@ -2701,6 +2794,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
            (
                room_id,
                _room_id_from_rooms_table,
+                _room_version_id,
                user_id,
                _sender,
                _membership_event_id,


@@ -312,18 +312,10 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
            # We do this all in one transaction to keep the cache small.
            # FIXME: get rid of this when we have room_stats
-            # Note, rejected events will have a null membership field, so
-            # we we manually filter them out.
-            sql = """
-                SELECT count(*), membership FROM current_state_events
-                WHERE type = 'm.room.member' AND room_id = ?
-                    AND membership IS NOT NULL
-                GROUP BY membership
-            """
-
-            txn.execute(sql, (room_id,))
+            counts = self._get_member_counts_txn(txn, room_id)

            res: Dict[str, MemberSummary] = {}
-            for count, membership in txn:
+            for membership, count in counts.items():
                res.setdefault(membership, MemberSummary([], count))

            # Order by membership (joins -> invites -> leave (former insiders) ->
@@ -369,6 +361,31 @@
            "get_room_summary", _get_room_summary_txn
        )

+    @cached()
+    async def get_member_counts(self, room_id: str) -> Mapping[str, int]:
+        """Get a mapping of number of users by membership"""
+
+        return await self.db_pool.runInteraction(
+            "get_member_counts", self._get_member_counts_txn, room_id
+        )
+
+    def _get_member_counts_txn(
+        self, txn: LoggingTransaction, room_id: str
+    ) -> Dict[str, int]:
+        """Get a mapping of number of users by membership"""
+
+        # Note, rejected events will have a null membership field, so
+        # we we manually filter them out.
+        sql = """
+            SELECT count(*), membership FROM current_state_events
+            WHERE type = 'm.room.member' AND room_id = ?
+                AND membership IS NOT NULL
+            GROUP BY membership
+        """
+
+        txn.execute(sql, (room_id,))
+
+        return {membership: count for count, membership in txn}
+
    @cached()
    async def get_number_joined_users_in_room(self, room_id: str) -> int:
        return await self.db_pool.simple_select_one_onecol(


@@ -736,6 +736,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
    CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
    EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
    DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events"
+    MEMBERS_CURRENT_STATE_UPDATE_NAME = "current_state_events_members_room_index"

    def __init__(
        self,
@@ -764,6 +765,13 @@
            self.DELETE_CURRENT_STATE_UPDATE_NAME,
            self._background_remove_left_rooms,
        )
+        self.db_pool.updates.register_background_index_update(
+            self.MEMBERS_CURRENT_STATE_UPDATE_NAME,
+            index_name="current_state_events_members_room_index",
+            table="current_state_events",
+            columns=["room_id", "membership"],
+            where_clause="type='m.room.member'",
+        )

    async def _background_remove_left_rooms(
        self, progress: JsonDict, batch_size: int


@@ -0,0 +1,19 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- Add a background updates to add a new index:
+-- `current_state_events(room_id, membership) WHERE type = 'm.room.member'
+-- This makes counting membership in rooms (for syncs) much faster
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (8701, 'current_state_events_members_room_index', '{}');
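For reference, the background update registered in the `state.py` change above should end up building roughly the following partial index (illustrative only; the exact DDL, including whether the index is created concurrently, is left to Synapse's background index machinery):

    CREATE INDEX current_state_events_members_room_index
        ON current_state_events (room_id, membership)
        WHERE type = 'm.room.member';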


@@ -371,14 +371,17 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
            "mxc://UPDATED_DUMMY_MEDIA_ID",
            response_body["rooms"][room_id1],
        )
-        self.assertEqual(
-            response_body["rooms"][room_id1]["joined_count"],
-            1,
+
+        # We don't give extra room information to invitees
+        self.assertNotIn(
+            "joined_count",
+            response_body["rooms"][room_id1],
        )
-        self.assertEqual(
-            response_body["rooms"][room_id1]["invited_count"],
-            1,
+        self.assertNotIn(
+            "invited_count",
+            response_body["rooms"][room_id1],
        )
        self.assertIsNone(
            response_body["rooms"][room_id1].get("is_dm"),
        )
@@ -450,15 +453,16 @@
            "mxc://DUMMY_MEDIA_ID",
            response_body["rooms"][room_id1],
        )
-        self.assertEqual(
-            response_body["rooms"][room_id1]["joined_count"],
-            # FIXME: The actual number should be "1" (user2) but we currently don't
-            # support this for rooms where the user has left/been banned.
-            0,
+
+        # FIXME: We possibly want to return joined and invited counts for rooms
+        # you're banned form
+        self.assertNotIn(
+            "joined_count",
+            response_body["rooms"][room_id1],
        )
-        self.assertEqual(
-            response_body["rooms"][room_id1]["invited_count"],
-            0,
+        self.assertNotIn(
+            "invited_count",
+            response_body["rooms"][room_id1],
        )
        self.assertIsNone(
            response_body["rooms"][room_id1].get("is_dm"),
@@ -692,19 +696,15 @@
            [],
        )
-        self.assertEqual(
-            response_body["rooms"][room_id1]["joined_count"],
-            # FIXME: The actual number should be "1" (user2) but we currently don't
-            # support this for rooms where the user has left/been banned.
-            0,
+
+        # FIXME: We possibly want to return joined and invited counts for rooms
+        # you're banned form
+        self.assertNotIn(
+            "joined_count",
+            response_body["rooms"][room_id1],
        )
-        self.assertEqual(
-            response_body["rooms"][room_id1]["invited_count"],
-            # We shouldn't see user5 since they were invited after user1 was banned.
-            #
-            # FIXME: The actual number should be "1" (user3) but we currently don't
-            # support this for rooms where the user has left/been banned.
-            0,
+        self.assertNotIn(
+            "invited_count",
+            response_body["rooms"][room_id1],
        )

    def test_rooms_meta_heroes_incremental_sync_no_change(self) -> None:


@@ -4416,136 +4416,6 @@ class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
            ),
        )

-    def test_membership_snapshots_background_update_forgotten_partial(self) -> None:
-        """
-        Test an existing `sliding_sync_membership_snapshots` row is updated with the
-        latest `forgotten` status after the background update passes over it.
-        """
-        user1_id = self.register_user("user1", "pass")
-        user1_tok = self.login(user1_id, "pass")
-        user2_id = self.register_user("user2", "pass")
-        user2_tok = self.login(user2_id, "pass")
-
-        room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
-        # User1 joins the room
-        self.helper.join(room_id, user1_id, tok=user1_tok)
-        # User1 leaves the room (we have to leave in order to forget the room)
-        self.helper.leave(room_id, user1_id, tok=user1_tok)
-
-        state_map = self.get_success(
-            self.storage_controllers.state.get_current_state(room_id)
-        )
-
-        # Forget the room
-        channel = self.make_request(
-            "POST",
-            f"/_matrix/client/r0/rooms/{room_id}/forget",
-            content={},
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.result)
-
-        # Clean-up the `sliding_sync_joined_rooms` table as if the forgotten status
-        # never made it into the table.
-        self.get_success(
-            self.store.db_pool.simple_update(
-                table="sliding_sync_membership_snapshots",
-                keyvalues={"room_id": room_id},
-                updatevalues={"forgotten": 0},
-                desc="sliding_sync_membership_snapshots.test_membership_snapshots_background_update_forgotten_partial",
-            )
-        )
-
-        # We should see the partial row that we made in preparation for the test.
-        sliding_sync_membership_snapshots_results = (
-            self._get_sliding_sync_membership_snapshots()
-        )
-        self.assertIncludes(
-            set(sliding_sync_membership_snapshots_results.keys()),
-            {
-                (room_id, user1_id),
-                (room_id, user2_id),
-            },
-            exact=True,
-        )
-        user1_snapshot = _SlidingSyncMembershipSnapshotResult(
-            room_id=room_id,
-            user_id=user1_id,
-            sender=user1_id,
-            membership_event_id=state_map[(EventTypes.Member, user1_id)].event_id,
-            membership=Membership.LEAVE,
-            event_stream_ordering=state_map[
-                (EventTypes.Member, user1_id)
-            ].internal_metadata.stream_ordering,
-            has_known_state=True,
-            room_type=None,
-            room_name=None,
-            is_encrypted=False,
-            tombstone_successor_room_id=None,
-            # Room is *not* forgotten because of our test preparation
-            forgotten=False,
-        )
-        self.assertEqual(
-            sliding_sync_membership_snapshots_results.get((room_id, user1_id)),
-            user1_snapshot,
-        )
-        user2_snapshot = _SlidingSyncMembershipSnapshotResult(
-            room_id=room_id,
-            user_id=user2_id,
-            sender=user2_id,
-            membership_event_id=state_map[(EventTypes.Member, user2_id)].event_id,
-            membership=Membership.JOIN,
-            event_stream_ordering=state_map[
-                (EventTypes.Member, user2_id)
-            ].internal_metadata.stream_ordering,
-            has_known_state=True,
-            room_type=None,
-            room_name=None,
-            is_encrypted=False,
-            tombstone_successor_room_id=None,
-        )
-        self.assertEqual(
-            sliding_sync_membership_snapshots_results.get((room_id, user2_id)),
-            user2_snapshot,
-        )
-
-        # Insert and run the background update.
-        self.get_success(
-            self.store.db_pool.simple_insert(
-                "background_updates",
-                {
-                    "update_name": _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
-                    "progress_json": "{}",
-                },
-            )
-        )
-        self.store.db_pool.updates._all_done = False
-        self.wait_for_background_updates()
-
-        # Make sure the table is populated
-        sliding_sync_membership_snapshots_results = (
-            self._get_sliding_sync_membership_snapshots()
-        )
-        self.assertIncludes(
-            set(sliding_sync_membership_snapshots_results.keys()),
-            {
-                (room_id, user1_id),
-                (room_id, user2_id),
-            },
-            exact=True,
-        )
-        # Forgotten status is now updated
-        self.assertEqual(
-            sliding_sync_membership_snapshots_results.get((room_id, user1_id)),
-            attr.evolve(user1_snapshot, forgotten=True),
-        )
-        # Holds the info according to the current state when the user joined
-        self.assertEqual(
-            sliding_sync_membership_snapshots_results.get((room_id, user2_id)),
-            user2_snapshot,
-        )
-
    def test_membership_snapshots_background_update_multiple_memberships(self) -> None:
        """
        Test that the background update for `sliding_sync_membership_snapshots` works