1
0
Fork 0
mirror of https://github.com/element-hq/synapse.git synced 2025-03-20 23:02:33 +00:00

Add a column participant to room_memberships table (#18068)

This commit is contained in:
Shay 2025-03-18 10:59:57 -07:00 committed by GitHub
parent bfafd0f2c7
commit 4b8dbe22c0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 373 additions and 1 deletions

1
changelog.d/18068.misc Normal file
View file

@ -0,0 +1 @@
Add a column `participant` to `room_memberships` table.

View file

@ -128,6 +128,7 @@ BOOLEAN_COLUMNS = {
"pushers": ["enabled"],
"redactions": ["have_censored"],
"remote_media_cache": ["authenticated"],
"room_memberships": ["participant"],
"room_stats_state": ["is_federatable"],
"rooms": ["is_public", "has_auth_chain_index"],
"sliding_sync_joined_rooms": ["is_encrypted"],

View file

@ -1462,6 +1462,12 @@ class EventCreationHandler:
)
return prev_event
if not event.is_state() and event.type in [
EventTypes.Message,
EventTypes.Encrypted,
]:
await self.store.set_room_participation(event.user_id, event.room_id)
if event.internal_metadata.is_out_of_band_membership():
# the only sort of out-of-band-membership events we expect to see here are
# invite rejections and rescinded knocks that we have generated ourselves.

View file

@ -79,6 +79,7 @@ logger = logging.getLogger(__name__)
_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
_POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000
@attr.s(frozen=True, slots=True, auto_attribs=True)
@ -1606,6 +1607,66 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
from_ts,
)
async def set_room_participation(self, user_id: str, room_id: str) -> None:
"""
Record the provided user as participating in the given room
Args:
user_id: the user ID of the user
room_id: ID of the room to set the participant in
"""
def _set_room_participation_txn(
txn: LoggingTransaction, user_id: str, room_id: str
) -> None:
sql = """
UPDATE room_memberships
SET participant = true
WHERE (user_id, room_id) IN (
SELECT user_id, room_id
FROM room_memberships
WHERE user_id = ?
AND room_id = ?
ORDER BY event_stream_ordering DESC
LIMIT 1
)
"""
txn.execute(sql, (user_id, room_id))
await self.db_pool.runInteraction(
"_set_room_participation_txn", _set_room_participation_txn, user_id, room_id
)
async def get_room_participation(self, user_id: str, room_id: str) -> bool:
"""
Check whether a user is listed as a participant in a room
Args:
user_id: user ID of the user
room_id: ID of the room to check in
"""
def _get_room_participation_txn(
txn: LoggingTransaction, user_id: str, room_id: str
) -> bool:
sql = """
SELECT participant
FROM room_memberships
WHERE user_id = ?
AND room_id = ?
ORDER BY event_stream_ordering DESC
LIMIT 1
"""
txn.execute(sql, (user_id, room_id))
res = txn.fetchone()
if res:
return res[0]
return False
return await self.db_pool.runInteraction(
"_get_room_participation_txn", _get_room_participation_txn, user_id, room_id
)
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(
@ -1636,6 +1697,93 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
columns=["user_id", "room_id"],
)
self.db_pool.updates.register_background_update_handler(
"populate_participant_bg_update", self._populate_participant
)
async def _populate_participant(self, progress: JsonDict, batch_size: int) -> int:
"""
Background update to populate column `participant` on `room_memberships` table
A 'participant' is someone who is currently joined to a room and has sent at least
one `m.room.message` or `m.room.encrypted` event.
This background update will set the `participant` column across all rows in
`room_memberships` based on the user's *current* join status, and if
they've *ever* sent a message or encrypted event. Therefore one should
never assume the `participant` column's value is based solely on whether
the user participated in a previous "session" (where a "session" is defined
as a period between the user joining and leaving). See
https://github.com/element-hq/synapse/pull/18068#discussion_r1931070291
for further detail.
"""
stream_token = progress.get("last_stream_token", None)
def _get_max_stream_token_txn(txn: LoggingTransaction) -> int:
sql = """
SELECT event_stream_ordering from room_memberships
ORDER BY event_stream_ordering DESC
LIMIT 1;
"""
txn.execute(sql)
res = txn.fetchone()
if not res or not res[0]:
return 0
return res[0]
def _background_populate_participant_txn(
txn: LoggingTransaction, stream_token: str
) -> None:
sql = """
UPDATE room_memberships
SET participant = True
FROM (
SELECT DISTINCT c.state_key, e.room_id
FROM current_state_events AS c
INNER JOIN events AS e ON c.room_id = e.room_id
WHERE c.membership = 'join'
AND c.state_key = e.sender
AND (
e.type = 'm.room.message'
OR e.type = 'm.room.encrypted'
)
) AS subquery
WHERE room_memberships.user_id = subquery.state_key
AND room_memberships.room_id = subquery.room_id
AND room_memberships.event_stream_ordering <= ?
AND room_memberships.event_stream_ordering > ?;
"""
batch = int(stream_token) - _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE
txn.execute(sql, (stream_token, batch))
if stream_token is None:
stream_token = await self.db_pool.runInteraction(
"_get_max_stream_token", _get_max_stream_token_txn
)
if stream_token < 0:
await self.db_pool.updates._end_background_update(
"populate_participant_bg_update"
)
return _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE
await self.db_pool.runInteraction(
"_background_populate_participant_txn",
_background_populate_participant_txn,
stream_token,
)
progress["last_stream_token"] = (
stream_token - _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE
)
await self.db_pool.runInteraction(
"populate_participant_bg_update",
self.db_pool.updates._background_update_progress_txn,
"populate_participant_bg_update",
progress,
)
return _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE
async def _background_add_membership_profile(
self, progress: JsonDict, batch_size: int
) -> int:

View file

@ -19,7 +19,7 @@
#
#
SCHEMA_VERSION = 89 # remember to update the list below when updating
SCHEMA_VERSION = 90 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
@ -158,6 +158,9 @@ Changes in SCHEMA_VERSION = 88
Changes in SCHEMA_VERSION = 89
- Add `state_groups_pending_deletion` and `state_groups_persisting` tables.
Changes in SCHEMA_VERSION = 90
- Add a column `participant` to `room_memberships` table
"""

View file

@ -0,0 +1,20 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Add a column `participant` to `room_memberships` table to track whether a room member has sent
-- a `m.room.message` or `m.room.encrypted` event into a room they are a member of
ALTER TABLE room_memberships ADD COLUMN participant BOOLEAN DEFAULT FALSE;
-- Add a background update to populate `participant` column
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(9001, 'populate_participant_bg_update', '{}');

View file

@ -4208,3 +4208,196 @@ class UserSuspensionTests(unittest.HomeserverTestCase):
shorthand=False,
)
self.assertEqual(channel.code, 200)
class RoomParticipantTestCase(unittest.HomeserverTestCase):
servlets = [
login.register_servlets,
room.register_servlets,
profile.register_servlets,
admin.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.user1 = self.register_user("thomas", "hackme")
self.tok1 = self.login("thomas", "hackme")
self.user2 = self.register_user("teresa", "hackme")
self.tok2 = self.login("teresa", "hackme")
self.room1 = self.helper.create_room_as(
room_creator=self.user1,
tok=self.tok1,
# Allow user2 to send state events into the room.
extra_content={
"power_level_content_override": {
"state_default": 0,
},
},
)
self.store = hs.get_datastores().main
@parameterized.expand(
[
# Should record participation.
param(
is_state=False,
event_type="m.room.message",
event_content={
"msgtype": "m.text",
"body": "I am engaging in this room",
},
record_participation=True,
),
param(
is_state=False,
event_type="m.room.encrypted",
event_content={
"algorithm": "m.megolm.v1.aes-sha2",
"ciphertext": "AwgAEnACgAkLmt6qF84IK++J7UDH2Za1YVchHyprqTqsg...",
"device_id": "RJYKSTBOIE",
"sender_key": "IlRMeOPX2e0MurIyfWEucYBRVOEEUMrOHqn/8mLqMjA",
"session_id": "X3lUlvLELLYxeTx4yOVu6UDpasGEVO0Jbu+QFnm0cKQ",
},
record_participation=True,
),
# Should not record participation.
param(
is_state=False,
event_type="m.sticker",
event_content={
"body": "My great sticker",
"info": {},
"url": "mxc://unused/mxcurl",
},
record_participation=False,
),
# An invalid **state event** with type `m.room.message`
param(
is_state=True,
event_type="m.room.message",
event_content={
"msgtype": "m.text",
"body": "I am engaging in this room",
},
record_participation=False,
),
# An invalid **state event** with type `m.room.encrypted`
# Note: this may become valid in the future with encrypted state, though we
# still may not want to consider it grounds for marking a user as participating.
param(
is_state=True,
event_type="m.room.encrypted",
event_content={
"algorithm": "m.megolm.v1.aes-sha2",
"ciphertext": "AwgAEnACgAkLmt6qF84IK++J7UDH2Za1YVchHyprqTqsg...",
"device_id": "RJYKSTBOIE",
"sender_key": "IlRMeOPX2e0MurIyfWEucYBRVOEEUMrOHqn/8mLqMjA",
"session_id": "X3lUlvLELLYxeTx4yOVu6UDpasGEVO0Jbu+QFnm0cKQ",
},
record_participation=False,
),
]
)
def test_sending_message_records_participation(
self,
is_state: bool,
event_type: str,
event_content: JsonDict,
record_participation: bool,
) -> None:
"""
Test that sending an various events into a room causes the user to
appropriately marked or not marked as a participant in that room.
"""
self.helper.join(self.room1, self.user2, tok=self.tok2)
# user has not sent any messages, so should not be a participant
participant = self.get_success(
self.store.get_room_participation(self.user2, self.room1)
)
self.assertFalse(participant)
# send an event into the room
if is_state:
# send a state event
self.helper.send_state(
self.room1,
event_type,
body=event_content,
tok=self.tok2,
)
else:
# send a non-state event
self.helper.send_event(
self.room1,
event_type,
content=event_content,
tok=self.tok2,
)
# check whether the user has been marked as a participant
participant = self.get_success(
self.store.get_room_participation(self.user2, self.room1)
)
self.assertEqual(participant, record_participation)
@parameterized.expand(
[
param(
event_type="m.room.message",
event_content={
"msgtype": "m.text",
"body": "I am engaging in this room",
},
),
param(
event_type="m.room.encrypted",
event_content={
"algorithm": "m.megolm.v1.aes-sha2",
"ciphertext": "AwgAEnACgAkLmt6qF84IK++J7UDH2Za1YVchHyprqTqsg...",
"device_id": "RJYKSTBOIE",
"sender_key": "IlRMeOPX2e0MurIyfWEucYBRVOEEUMrOHqn/8mLqMjA",
"session_id": "X3lUlvLELLYxeTx4yOVu6UDpasGEVO0Jbu+QFnm0cKQ",
},
),
]
)
def test_sending_event_and_leaving_does_not_record_participation(
self,
event_type: str,
event_content: JsonDict,
) -> None:
"""
Test that sending an event into a room that should mark a user as a
participant, but then leaving the room, results in the user no longer
be marked as a participant in that room.
"""
self.helper.join(self.room1, self.user2, tok=self.tok2)
# user has not sent any messages, so should not be a participant
participant = self.get_success(
self.store.get_room_participation(self.user2, self.room1)
)
self.assertFalse(participant)
# sending a message should now mark user as participant
self.helper.send_event(
self.room1,
event_type,
content=event_content,
tok=self.tok2,
)
participant = self.get_success(
self.store.get_room_participation(self.user2, self.room1)
)
self.assertTrue(participant)
# leave the room
self.helper.leave(self.room1, self.user2, tok=self.tok2)
# user should no longer be considered a participant
participant = self.get_success(
self.store.get_room_participation(self.user2, self.room1)
)
self.assertFalse(participant)