From 8dea89837a58639350d43e3b5da54973061ad4dd Mon Sep 17 00:00:00 2001 From: Hugh Nimmo-Smith Date: Tue, 16 Jul 2024 17:58:18 +0100 Subject: [PATCH] Prototype of using m.typing to clean up MSC3401 call memberships Only apply logic where io.element.type=org.matrix.msc3401.call.member on the typing PUT --- synapse/handlers/typing.py | 117 +++++++++++++++++++++++++++++++----- synapse/rest/client/room.py | 8 ++- 2 files changed, 110 insertions(+), 15 deletions(-) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 4c87718337..e031cdf3b3 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -40,7 +40,9 @@ from synapse.types import ( StrCollection, StreamKeyType, UserID, + create_requester, ) +from synapse.types.state import StateFilter from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure from synapse.util.retryutils import filter_destinations_by_retry_limiter @@ -58,6 +60,8 @@ logger = logging.getLogger(__name__) class RoomMember: room_id: str user_id: str + type: str + device_id: Optional[str] = None # How often we expect remote servers to resend us presence. @@ -108,6 +112,8 @@ class FollowerTypingHandler: self.clock.looping_call(self._handle_timeouts, 5000) self.clock.looping_call(self._prune_old_typing, FORGET_TIMEOUT) + self.state = hs.get_state_handler() + self.event_creation_handler = hs.get_event_creation_handler() def _reset(self) -> None: """Reset the typing handler's data caches.""" @@ -130,9 +136,9 @@ class FollowerTypingHandler: members = set(self.wheel_timer.fetch(now)) for member in members: - self._handle_timeout_for_member(now, member) + await self._handle_timeout_for_member(now, member) - def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None: + async def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None: if not self.is_typing(member): # Nothing to do if they're no longer typing return @@ -283,8 +289,8 @@ class TypingWriterHandler(FollowerTypingHandler): "TypingStreamChangeCache", self._latest_room_serial ) - def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None: - super()._handle_timeout_for_member(now, member) + async def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None: + await super()._handle_timeout_for_member(now, member) if not self.is_typing(member): # Nothing to do if they're no longer typing @@ -293,11 +299,16 @@ class TypingWriterHandler(FollowerTypingHandler): until = self._member_typing_until.get(member, None) if not until or until <= now: logger.info("Timing out typing for: %s", member.user_id) - self._stopped_typing(member) + await self._stopped_typing(member) return async def started_typing( - self, target_user: UserID, requester: Requester, room_id: str, timeout: int + self, + target_user: UserID, + requester: Requester, + room_id: str, + timeout: int, + type: str, ) -> None: target_user_id = target_user.to_string() @@ -316,7 +327,12 @@ class TypingWriterHandler(FollowerTypingHandler): logger.debug("%s has started typing in %s", target_user_id, room_id) - member = RoomMember(room_id=room_id, user_id=target_user_id) + member = RoomMember( + room_id=room_id, + user_id=target_user_id, + device_id=requester.device_id, + type=type, + ) was_present = member.user_id in self._room_typing.get(room_id, set()) @@ -332,7 +348,7 @@ class TypingWriterHandler(FollowerTypingHandler): self._push_update(member=member, typing=True) async def stopped_typing( - self, target_user: UserID, requester: Requester, room_id: str + self, target_user: UserID, requester: Requester, room_id: str, type: str ) -> None: target_user_id = target_user.to_string() @@ -351,17 +367,90 @@ class TypingWriterHandler(FollowerTypingHandler): logger.debug("%s has stopped typing in %s", target_user_id, room_id) - member = RoomMember(room_id=room_id, user_id=target_user_id) + member = RoomMember( + room_id=room_id, + user_id=target_user_id, + device_id=requester.device_id, + type=type, + ) - self._stopped_typing(member) + await self._stopped_typing(member) - def user_left_room(self, user: UserID, room_id: str) -> None: + async def user_left_room(self, user: UserID, room_id: str) -> None: user_id = user.to_string() if self.is_mine_id(user_id): - member = RoomMember(room_id=room_id, user_id=user_id) - self._stopped_typing(member) + # We use a device_id of "*" to indicate that all devices will have left the room + member = RoomMember(room_id=room_id, user_id=user_id, device_id="*") + await self._stopped_typing(member) + + async def _stopped_typing(self, member: RoomMember) -> None: + logger.debug( + "User %s has stopped typing (type=%s) in %s on device %s", + member.user_id, + member.type, + member.room_id, + member.device_id, + ) + + if ( + member.type == "org.matrix.msc3401.call.member" + and member.device_id is not None + ): + # Check room state to see if any MSC3401 member events needing removal + event_filter = [ + ("org.matrix.msc3401.call.member", member.user_id), + ] + + state_filter = StateFilter.from_types(event_filter) + + state_ids = await self._storage_controllers.state.get_current_state_ids( + member.room_id, + state_filter, + ) + + state_events = await self.store.get_events(state_ids.values()) + + for event in state_events.values(): + if "memberships" in event.content: + # n.b. Simplified type + memberships: List[Dict[str, str]] = event.content["memberships"] + new_memberships = [] + modified = False + if member.device_id == "*": + # user has left room so remove all memberships if the were any + modified = len(memberships) > 0 + else: + # find a membership for this device_id + for membership in memberships: + if membership["device_id"] == member.device_id: + modified = True + else: + new_memberships.append(membership) + + if modified: + # send state event with new memberships + logger.info( + "Found matching MSC3401 call member event to cleanup. New memberships is %s", + new_memberships, + ) + requester = create_requester( + member.user_id, authenticated_entity=member.user_id + ) + + await self.event_creation_handler.create_and_send_nonmember_event( + requester, + { + "type": "org.matrix.msc3401.call.member", + "state_key": member.user_id, + "room_id": member.room_id, + "sender": member.user_id, + "content": { + "memberships": new_memberships, + }, + }, + ratelimit=False, + ) - def _stopped_typing(self, member: RoomMember) -> None: if member.user_id not in self._room_typing.get(member.room_id, set()): # No point return diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 903c74f6d8..33d6f05f69 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1256,6 +1256,8 @@ class RoomTypingRestServlet(RestServlet): # Limit timeout to stop people from setting silly typing timeouts. timeout = min(content.get("timeout", 30000), 120000) + type = content.get("io.element.type", "m.typing") + # Defer getting the typing handler since it will raise on WORKER_PATTERNS. typing_handler = self.hs.get_typing_writer_handler() @@ -1266,10 +1268,14 @@ class RoomTypingRestServlet(RestServlet): requester=requester, room_id=room_id, timeout=timeout, + type=type, ) else: await typing_handler.stopped_typing( - target_user=target_user, requester=requester, room_id=room_id + target_user=target_user, + requester=requester, + room_id=room_id, + type=type, ) except ShadowBanError: # Pretend this worked without error.