diff --git a/changelog.d/18068.misc b/changelog.d/18068.misc new file mode 100644 index 0000000000..af6f78f549 --- /dev/null +++ b/changelog.d/18068.misc @@ -0,0 +1 @@ +Add a column `participant` to `room_memberships` table. \ No newline at end of file diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 3f67a739a0..1bb9940180 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -128,6 +128,7 @@ BOOLEAN_COLUMNS = { "pushers": ["enabled"], "redactions": ["have_censored"], "remote_media_cache": ["authenticated"], + "room_memberships": ["participant"], "room_stats_state": ["is_federatable"], "rooms": ["is_public", "has_auth_chain_index"], "sliding_sync_joined_rooms": ["is_encrypted"], diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4642b8b578..52c61cfa54 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1462,6 +1462,12 @@ class EventCreationHandler: ) return prev_event + if not event.is_state() and event.type in [ + EventTypes.Message, + EventTypes.Encrypted, + ]: + await self.store.set_room_participation(event.user_id, event.room_id) + if event.internal_metadata.is_out_of_band_membership(): # the only sort of out-of-band-membership events we expect to see here are # invite rejections and rescinded knocks that we have generated ourselves. diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 50ed6a28bf..a0a6dcd04e 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -79,6 +79,7 @@ logger = logging.getLogger(__name__) _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update" _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership" +_POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000 @attr.s(frozen=True, slots=True, auto_attribs=True) @@ -1606,6 +1607,66 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): from_ts, ) + async def set_room_participation(self, user_id: str, room_id: str) -> None: + """ + Record the provided user as participating in the given room + + Args: + user_id: the user ID of the user + room_id: ID of the room to set the participant in + """ + + def _set_room_participation_txn( + txn: LoggingTransaction, user_id: str, room_id: str + ) -> None: + sql = """ + UPDATE room_memberships + SET participant = true + WHERE (user_id, room_id) IN ( + SELECT user_id, room_id + FROM room_memberships + WHERE user_id = ? + AND room_id = ? + ORDER BY event_stream_ordering DESC + LIMIT 1 + ) + """ + txn.execute(sql, (user_id, room_id)) + + await self.db_pool.runInteraction( + "_set_room_participation_txn", _set_room_participation_txn, user_id, room_id + ) + + async def get_room_participation(self, user_id: str, room_id: str) -> bool: + """ + Check whether a user is listed as a participant in a room + + Args: + user_id: user ID of the user + room_id: ID of the room to check in + """ + + def _get_room_participation_txn( + txn: LoggingTransaction, user_id: str, room_id: str + ) -> bool: + sql = """ + SELECT participant + FROM room_memberships + WHERE user_id = ? + AND room_id = ? + ORDER BY event_stream_ordering DESC + LIMIT 1 + """ + txn.execute(sql, (user_id, room_id)) + res = txn.fetchone() + if res: + return res[0] + return False + + return await self.db_pool.runInteraction( + "_get_room_participation_txn", _get_room_participation_txn, user_id, room_id + ) + class RoomMemberBackgroundUpdateStore(SQLBaseStore): def __init__( @@ -1636,6 +1697,93 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore): columns=["user_id", "room_id"], ) + self.db_pool.updates.register_background_update_handler( + "populate_participant_bg_update", self._populate_participant + ) + + async def _populate_participant(self, progress: JsonDict, batch_size: int) -> int: + """ + Background update to populate column `participant` on `room_memberships` table + + A 'participant' is someone who is currently joined to a room and has sent at least + one `m.room.message` or `m.room.encrypted` event. + + This background update will set the `participant` column across all rows in + `room_memberships` based on the user's *current* join status, and if + they've *ever* sent a message or encrypted event. Therefore one should + never assume the `participant` column's value is based solely on whether + the user participated in a previous "session" (where a "session" is defined + as a period between the user joining and leaving). See + https://github.com/element-hq/synapse/pull/18068#discussion_r1931070291 + for further detail. + """ + stream_token = progress.get("last_stream_token", None) + + def _get_max_stream_token_txn(txn: LoggingTransaction) -> int: + sql = """ + SELECT event_stream_ordering from room_memberships + ORDER BY event_stream_ordering DESC + LIMIT 1; + """ + txn.execute(sql) + res = txn.fetchone() + if not res or not res[0]: + return 0 + return res[0] + + def _background_populate_participant_txn( + txn: LoggingTransaction, stream_token: str + ) -> None: + sql = """ + UPDATE room_memberships + SET participant = True + FROM ( + SELECT DISTINCT c.state_key, e.room_id + FROM current_state_events AS c + INNER JOIN events AS e ON c.room_id = e.room_id + WHERE c.membership = 'join' + AND c.state_key = e.sender + AND ( + e.type = 'm.room.message' + OR e.type = 'm.room.encrypted' + ) + ) AS subquery + WHERE room_memberships.user_id = subquery.state_key + AND room_memberships.room_id = subquery.room_id + AND room_memberships.event_stream_ordering <= ? + AND room_memberships.event_stream_ordering > ?; + """ + batch = int(stream_token) - _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE + txn.execute(sql, (stream_token, batch)) + + if stream_token is None: + stream_token = await self.db_pool.runInteraction( + "_get_max_stream_token", _get_max_stream_token_txn + ) + + if stream_token < 0: + await self.db_pool.updates._end_background_update( + "populate_participant_bg_update" + ) + return _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE + + await self.db_pool.runInteraction( + "_background_populate_participant_txn", + _background_populate_participant_txn, + stream_token, + ) + + progress["last_stream_token"] = ( + stream_token - _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE + ) + await self.db_pool.runInteraction( + "populate_participant_bg_update", + self.db_pool.updates._background_update_progress_txn, + "populate_participant_bg_update", + progress, + ) + return _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE + async def _background_add_membership_profile( self, progress: JsonDict, batch_size: int ) -> int: diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 49e648a92f..f87b1a4a0a 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -19,7 +19,7 @@ # # -SCHEMA_VERSION = 89 # remember to update the list below when updating +SCHEMA_VERSION = 90 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -158,6 +158,9 @@ Changes in SCHEMA_VERSION = 88 Changes in SCHEMA_VERSION = 89 - Add `state_groups_pending_deletion` and `state_groups_persisting` tables. + +Changes in SCHEMA_VERSION = 90 + - Add a column `participant` to `room_memberships` table """ diff --git a/synapse/storage/schema/main/delta/90/01_add_column_participant_room_memberships_table.sql b/synapse/storage/schema/main/delta/90/01_add_column_participant_room_memberships_table.sql new file mode 100644 index 0000000000..672be1031e --- /dev/null +++ b/synapse/storage/schema/main/delta/90/01_add_column_participant_room_memberships_table.sql @@ -0,0 +1,20 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2025 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + +-- Add a column `participant` to `room_memberships` table to track whether a room member has sent +-- a `m.room.message` or `m.room.encrypted` event into a room they are a member of +ALTER TABLE room_memberships ADD COLUMN participant BOOLEAN DEFAULT FALSE; + +-- Add a background update to populate `participant` column +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (9001, 'populate_participant_bg_update', '{}'); \ No newline at end of file diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index dd8350ddd1..6c93ead3b8 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -4208,3 +4208,196 @@ class UserSuspensionTests(unittest.HomeserverTestCase): shorthand=False, ) self.assertEqual(channel.code, 200) + + +class RoomParticipantTestCase(unittest.HomeserverTestCase): + servlets = [ + login.register_servlets, + room.register_servlets, + profile.register_servlets, + admin.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.user1 = self.register_user("thomas", "hackme") + self.tok1 = self.login("thomas", "hackme") + + self.user2 = self.register_user("teresa", "hackme") + self.tok2 = self.login("teresa", "hackme") + + self.room1 = self.helper.create_room_as( + room_creator=self.user1, + tok=self.tok1, + # Allow user2 to send state events into the room. + extra_content={ + "power_level_content_override": { + "state_default": 0, + }, + }, + ) + self.store = hs.get_datastores().main + + @parameterized.expand( + [ + # Should record participation. + param( + is_state=False, + event_type="m.room.message", + event_content={ + "msgtype": "m.text", + "body": "I am engaging in this room", + }, + record_participation=True, + ), + param( + is_state=False, + event_type="m.room.encrypted", + event_content={ + "algorithm": "m.megolm.v1.aes-sha2", + "ciphertext": "AwgAEnACgAkLmt6qF84IK++J7UDH2Za1YVchHyprqTqsg...", + "device_id": "RJYKSTBOIE", + "sender_key": "IlRMeOPX2e0MurIyfWEucYBRVOEEUMrOHqn/8mLqMjA", + "session_id": "X3lUlvLELLYxeTx4yOVu6UDpasGEVO0Jbu+QFnm0cKQ", + }, + record_participation=True, + ), + # Should not record participation. + param( + is_state=False, + event_type="m.sticker", + event_content={ + "body": "My great sticker", + "info": {}, + "url": "mxc://unused/mxcurl", + }, + record_participation=False, + ), + # An invalid **state event** with type `m.room.message` + param( + is_state=True, + event_type="m.room.message", + event_content={ + "msgtype": "m.text", + "body": "I am engaging in this room", + }, + record_participation=False, + ), + # An invalid **state event** with type `m.room.encrypted` + # Note: this may become valid in the future with encrypted state, though we + # still may not want to consider it grounds for marking a user as participating. + param( + is_state=True, + event_type="m.room.encrypted", + event_content={ + "algorithm": "m.megolm.v1.aes-sha2", + "ciphertext": "AwgAEnACgAkLmt6qF84IK++J7UDH2Za1YVchHyprqTqsg...", + "device_id": "RJYKSTBOIE", + "sender_key": "IlRMeOPX2e0MurIyfWEucYBRVOEEUMrOHqn/8mLqMjA", + "session_id": "X3lUlvLELLYxeTx4yOVu6UDpasGEVO0Jbu+QFnm0cKQ", + }, + record_participation=False, + ), + ] + ) + def test_sending_message_records_participation( + self, + is_state: bool, + event_type: str, + event_content: JsonDict, + record_participation: bool, + ) -> None: + """ + Test that sending an various events into a room causes the user to + appropriately marked or not marked as a participant in that room. + """ + self.helper.join(self.room1, self.user2, tok=self.tok2) + + # user has not sent any messages, so should not be a participant + participant = self.get_success( + self.store.get_room_participation(self.user2, self.room1) + ) + self.assertFalse(participant) + + # send an event into the room + if is_state: + # send a state event + self.helper.send_state( + self.room1, + event_type, + body=event_content, + tok=self.tok2, + ) + else: + # send a non-state event + self.helper.send_event( + self.room1, + event_type, + content=event_content, + tok=self.tok2, + ) + + # check whether the user has been marked as a participant + participant = self.get_success( + self.store.get_room_participation(self.user2, self.room1) + ) + self.assertEqual(participant, record_participation) + + @parameterized.expand( + [ + param( + event_type="m.room.message", + event_content={ + "msgtype": "m.text", + "body": "I am engaging in this room", + }, + ), + param( + event_type="m.room.encrypted", + event_content={ + "algorithm": "m.megolm.v1.aes-sha2", + "ciphertext": "AwgAEnACgAkLmt6qF84IK++J7UDH2Za1YVchHyprqTqsg...", + "device_id": "RJYKSTBOIE", + "sender_key": "IlRMeOPX2e0MurIyfWEucYBRVOEEUMrOHqn/8mLqMjA", + "session_id": "X3lUlvLELLYxeTx4yOVu6UDpasGEVO0Jbu+QFnm0cKQ", + }, + ), + ] + ) + def test_sending_event_and_leaving_does_not_record_participation( + self, + event_type: str, + event_content: JsonDict, + ) -> None: + """ + Test that sending an event into a room that should mark a user as a + participant, but then leaving the room, results in the user no longer + be marked as a participant in that room. + """ + self.helper.join(self.room1, self.user2, tok=self.tok2) + + # user has not sent any messages, so should not be a participant + participant = self.get_success( + self.store.get_room_participation(self.user2, self.room1) + ) + self.assertFalse(participant) + + # sending a message should now mark user as participant + self.helper.send_event( + self.room1, + event_type, + content=event_content, + tok=self.tok2, + ) + participant = self.get_success( + self.store.get_room_participation(self.user2, self.room1) + ) + self.assertTrue(participant) + + # leave the room + self.helper.leave(self.room1, self.user2, tok=self.tok2) + + # user should no longer be considered a participant + participant = self.get_success( + self.store.get_room_participation(self.user2, self.room1) + ) + self.assertFalse(participant)