mirror of
https://github.com/element-hq/synapse.git
synced 2025-03-05 15:37:02 +00:00
Fix join being denied after being invited over federation (#18075)
This also happens for rejecting an invite. Basically, any out-of-band membership transition where we first get the membership as an `outlier` and then rely on federation filling us in to de-outlier it. This PR mainly addresses automated test flakiness, bots/scripts, and options within Synapse like [`auto_accept_invites`](https://element-hq.github.io/synapse/v1.122/usage/configuration/config_documentation.html#auto_accept_invites) that are able to react quickly (before federation is able to push us events), but also helps in generic scenarios where federation is lagging. I initially thought this might be a Synapse consistency issue (see issues labeled with [`Z-Read-After-Write`](https://github.com/matrix-org/synapse/labels/Z-Read-After-Write)) but it seems to be an event auth logic problem. Workers probably do increase the number of possible race condition scenarios that make this visible though (replication and cache invalidation lag). Fix https://github.com/element-hq/synapse/issues/15012 (probably fixes https://github.com/matrix-org/synapse/issues/15012 (https://github.com/element-hq/synapse/issues/15012)) Related to https://github.com/matrix-org/matrix-spec/issues/2062 Problems: 1. We don't consider [out-of-band membership](https://github.com/element-hq/synapse/blob/develop/docs/development/room-dag-concepts.md#out-of-band-membership-events) (outliers) in our `event_auth` logic even though we expose them in `/sync`. 1. (This PR doesn't address this point) Perhaps we should consider authing events in the persistence queue as events already in the queue could allow subsequent events to be allowed (events come through many channels: federation transaction, remote invite, remote join, local send). But this doesn't save us in the case where the event is more delayed over federation. ### What happened before? I wrote some Complement test that stresses this exact scenario and reproduces the problem: https://github.com/matrix-org/complement/pull/757 ``` COMPLEMENT_ALWAYS_PRINT_SERVER_LOGS=1 COMPLEMENT_DIR=../complement ./scripts-dev/complement.sh -run TestSynapseConsistency ``` We have `hs1` and `hs2` running in monolith mode (no workers): 1. `@charlie1:hs2` is invited and joins the room: 1. `hs1` invites `@charlie1:hs2` to a room which we receive on `hs2` as `PUT /_matrix/federation/v1/invite/{roomId}/{eventId}` (`on_invite_request(...)`) and the invite membership is persisted as an outlier. The `room_memberships` and `local_current_membership` database tables are also updated which means they are visible down `/sync` at this point. 1. `@charlie1:hs2` decides to join because it saw the invite down `/sync`. Because `hs2` is not yet in the room, this happens as a remote join `make_join`/`send_join` which comes back with all of the auth events needed to auth successfully and now `@charlie1:hs2` is successfully joined to the room. 1. `@charlie2:hs2` is invited and and tries to join the room: 1. `hs1` invites `@charlie2:hs2` to the room which we receive on `hs2` as `PUT /_matrix/federation/v1/invite/{roomId}/{eventId}` (`on_invite_request(...)`) and the invite membership is persisted as an outlier. The `room_memberships` and `local_current_membership` database tables are also updated which means they are visible down `/sync` at this point. 1. Because `hs2` is already participating in the room, we also see the invite come over federation in a transaction and we start processing it (not done yet, see below) 1. `@charlie2:hs2` decides to join because it saw the invite down `/sync`. Because `hs2`, is already in the room, this happens as a local join but we deny the event because our `event_auth` logic thinks that we have no membership in the room ❌ (expected to be able to join because we saw the invite down `/sync`) 1. We finally finish processing the `@charlie2:hs2` invite event from and de-outlier it. - If this finished before we tried to join we would have been fine but this is the race condition that makes this situation visible. Logs for `hs2`: ``` 🗳️ on_invite_request: handling event <FrozenEventV3 event_id=$PRPCvdXdcqyjdUKP_NxGF2CcukmwOaoK0ZR1WiVOZVk, type=m.room.member, state_key=@user-2-charlie1:hs2, membership=invite, outlier=False> 🔦 _store_room_members_txn update room_memberships: <FrozenEventV3 event_id=$PRPCvdXdcqyjdUKP_NxGF2CcukmwOaoK0ZR1WiVOZVk, type=m.room.member, state_key=@user-2-charlie1:hs2, membership=invite, outlier=True> 🔦 _store_room_members_txn update local_current_membership: <FrozenEventV3 event_id=$PRPCvdXdcqyjdUKP_NxGF2CcukmwOaoK0ZR1WiVOZVk, type=m.room.member, state_key=@user-2-charlie1:hs2, membership=invite, outlier=True> 📨 Notifying about new event <FrozenEventV3 event_id=$PRPCvdXdcqyjdUKP_NxGF2CcukmwOaoK0ZR1WiVOZVk, type=m.room.member, state_key=@user-2-charlie1:hs2, membership=invite, outlier=True> ✅ on_invite_request: handled event <FrozenEventV3 event_id=$PRPCvdXdcqyjdUKP_NxGF2CcukmwOaoK0ZR1WiVOZVk, type=m.room.member, state_key=@user-2-charlie1:hs2, membership=invite, outlier=True> 🧲 do_invite_join for @user-2-charlie1:hs2 in !sfZVBdLUezpPWetrol:hs1 🔦 _store_room_members_txn update room_memberships: <FrozenEventV3 event_id=$bwv8LxFnqfpsw_rhR7OrTjtz09gaJ23MqstKOcs7ygA, type=m.room.member, state_key=@user-1-alice:hs1, membership=join, outlier=True> 🔦 _store_room_members_txn update room_memberships: <FrozenEventV3 event_id=$oju1ts3G3pz5O62IesrxX5is4LxAwU3WPr4xvid5ijI, type=m.room.member, state_key=@user-2-charlie1:hs2, membership=join, outlier=False> 📨 Notifying about new event <FrozenEventV3 event_id=$oju1ts3G3pz5O62IesrxX5is4LxAwU3WPr4xvid5ijI, type=m.room.member, state_key=@user-2-charlie1:hs2, membership=join, outlier=False> ... 🗳️ on_invite_request: handling event <FrozenEventV3 event_id=$O_54j7O--6xMsegY5EVZ9SA-mI4_iHJOIoRwYyeWIPY, type=m.room.member, state_key=@user-3-charlie2:hs2, membership=invite, outlier=False> 🔦 _store_room_members_txn update room_memberships: <FrozenEventV3 event_id=$O_54j7O--6xMsegY5EVZ9SA-mI4_iHJOIoRwYyeWIPY, type=m.room.member, state_key=@user-3-charlie2:hs2, membership=invite, outlier=True> 🔦 _store_room_members_txn update local_current_membership: <FrozenEventV3 event_id=$O_54j7O--6xMsegY5EVZ9SA-mI4_iHJOIoRwYyeWIPY, type=m.room.member, state_key=@user-3-charlie2:hs2, membership=invite, outlier=True> 📨 Notifying about new event <FrozenEventV3 event_id=$O_54j7O--6xMsegY5EVZ9SA-mI4_iHJOIoRwYyeWIPY, type=m.room.member, state_key=@user-3-charlie2:hs2, membership=invite, outlier=True> ✅ on_invite_request: handled event <FrozenEventV3 event_id=$O_54j7O--6xMsegY5EVZ9SA-mI4_iHJOIoRwYyeWIPY, type=m.room.member, state_key=@user-3-charlie2:hs2, membership=invite, outlier=True> 📬 handling received PDU in room !sfZVBdLUezpPWetrol:hs1: <FrozenEventV3 event_id=$O_54j7O--6xMsegY5EVZ9SA-mI4_iHJOIoRwYyeWIPY, type=m.room.member, state_key=@user-3-charlie2:hs2, membership=invite, outlier=False> 📮 handle_new_client_event: handling <FrozenEventV3 event_id=$WNVDTQrxy5tCdPQHMyHyIn7tE4NWqKsZ8Bn8R4WbBSA, type=m.room.member, state_key=@user-3-charlie2:hs2, membership=join, outlier=False> ❌ Denying new event <FrozenEventV3 event_id=$WNVDTQrxy5tCdPQHMyHyIn7tE4NWqKsZ8Bn8R4WbBSA, type=m.room.member, state_key=@user-3-charlie2:hs2, membership=join, outlier=False> because 403: You are not invited to this room. synapse.http.server - 130 - INFO - POST-16 - <SynapseRequest at 0x7f460c91fbf0 method='POST' uri='/_matrix/client/v3/join/%21sfZVBdLUezpPWetrol:hs1?server_name=hs1' clientproto='HTTP/1.0' site='8080'> SynapseError: 403 - You are not invited to this room. 📨 Notifying about new event <FrozenEventV3 event_id=$O_54j7O--6xMsegY5EVZ9SA-mI4_iHJOIoRwYyeWIPY, type=m.room.member, state_key=@user-3-charlie2:hs2, membership=invite, outlier=False> ✅ handled received PDU in room !sfZVBdLUezpPWetrol:hs1: <FrozenEventV3 event_id=$O_54j7O--6xMsegY5EVZ9SA-mI4_iHJOIoRwYyeWIPY, type=m.room.member, state_key=@user-3-charlie2:hs2, membership=invite, outlier=False> ```
This commit is contained in:
parent
148e93576e
commit
6ec5e13ec9
16 changed files with 1341 additions and 442 deletions
1
changelog.d/18075.bugfix
Normal file
1
changelog.d/18075.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix join being denied after being invited over federation. Also fixes other out-of-band membership transitions.
|
|
@ -566,6 +566,7 @@ def _is_membership_change_allowed(
|
|||
logger.debug(
|
||||
"_is_membership_change_allowed: %s",
|
||||
{
|
||||
"caller_membership": caller.membership if caller else None,
|
||||
"caller_in_room": caller_in_room,
|
||||
"caller_invited": caller_invited,
|
||||
"caller_knocked": caller_knocked,
|
||||
|
@ -677,7 +678,8 @@ def _is_membership_change_allowed(
|
|||
and join_rule == JoinRules.KNOCK_RESTRICTED
|
||||
)
|
||||
):
|
||||
if not caller_in_room and not caller_invited:
|
||||
# You can only join the room if you are invited or are already in the room.
|
||||
if not (caller_in_room or caller_invited):
|
||||
raise AuthError(403, "You are not invited to this room.")
|
||||
else:
|
||||
# TODO (erikj): may_join list
|
||||
|
|
|
@ -42,7 +42,7 @@ import attr
|
|||
from typing_extensions import Literal
|
||||
from unpaddedbase64 import encode_base64
|
||||
|
||||
from synapse.api.constants import RelationTypes
|
||||
from synapse.api.constants import EventTypes, RelationTypes
|
||||
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
|
||||
from synapse.synapse_rust.events import EventInternalMetadata
|
||||
from synapse.types import JsonDict, StrCollection
|
||||
|
@ -325,12 +325,17 @@ class EventBase(metaclass=abc.ABCMeta):
|
|||
def __repr__(self) -> str:
|
||||
rejection = f"REJECTED={self.rejected_reason}, " if self.rejected_reason else ""
|
||||
|
||||
conditional_membership_string = ""
|
||||
if self.get("type") == EventTypes.Member:
|
||||
conditional_membership_string = f"membership={self.membership}, "
|
||||
|
||||
return (
|
||||
f"<{self.__class__.__name__} "
|
||||
f"{rejection}"
|
||||
f"event_id={self.event_id}, "
|
||||
f"type={self.get('type')}, "
|
||||
f"state_key={self.get('state_key')}, "
|
||||
f"{conditional_membership_string}"
|
||||
f"outlier={self.internal_metadata.is_outlier()}"
|
||||
">"
|
||||
)
|
||||
|
|
|
@ -24,7 +24,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
|
|||
import attr
|
||||
from signedjson.types import SigningKey
|
||||
|
||||
from synapse.api.constants import MAX_DEPTH
|
||||
from synapse.api.constants import MAX_DEPTH, EventTypes
|
||||
from synapse.api.room_versions import (
|
||||
KNOWN_EVENT_FORMAT_VERSIONS,
|
||||
EventFormatVersions,
|
||||
|
@ -109,6 +109,19 @@ class EventBuilder:
|
|||
def is_state(self) -> bool:
|
||||
return self._state_key is not None
|
||||
|
||||
def is_mine_id(self, user_id: str) -> bool:
|
||||
"""Determines whether a user ID or room alias originates from this homeserver.
|
||||
|
||||
Returns:
|
||||
`True` if the hostname part of the user ID or room alias matches this
|
||||
homeserver.
|
||||
`False` otherwise, or if the user ID or room alias is malformed.
|
||||
"""
|
||||
localpart_hostname = user_id.split(":", 1)
|
||||
if len(localpart_hostname) < 2:
|
||||
return False
|
||||
return localpart_hostname[1] == self._hostname
|
||||
|
||||
async def build(
|
||||
self,
|
||||
prev_event_ids: List[str],
|
||||
|
@ -142,6 +155,46 @@ class EventBuilder:
|
|||
self, state_ids
|
||||
)
|
||||
|
||||
# Check for out-of-band membership that may have been exposed on `/sync` but
|
||||
# the events have not been de-outliered yet so they won't be part of the
|
||||
# room state yet.
|
||||
#
|
||||
# This helps in situations where a remote homeserver invites a local user to
|
||||
# a room that we're already participating in; and we've persisted the invite
|
||||
# as an out-of-band membership (outlier), but it hasn't been pushed to us as
|
||||
# part of a `/send` transaction yet and de-outliered. This also helps for
|
||||
# any of the other out-of-band membership transitions.
|
||||
#
|
||||
# As an optimization, we could check if the room state already includes a
|
||||
# non-`leave` membership event, then we can assume the membership event has
|
||||
# been de-outliered and we don't need to check for an out-of-band
|
||||
# membership. But we don't have the necessary information from a
|
||||
# `StateMap[str]` and we'll just have to take the hit of this extra lookup
|
||||
# for any membership event for now.
|
||||
if self.type == EventTypes.Member and self.is_mine_id(self.state_key):
|
||||
(
|
||||
_membership,
|
||||
member_event_id,
|
||||
) = await self._store.get_local_current_membership_for_user_in_room(
|
||||
user_id=self.state_key,
|
||||
room_id=self.room_id,
|
||||
)
|
||||
# There is no need to check if the membership is actually an
|
||||
# out-of-band membership (`outlier`) as we would end up with the
|
||||
# same result either way (adding the member event to the
|
||||
# `auth_event_ids`).
|
||||
if (
|
||||
member_event_id is not None
|
||||
# We only need to be careful about duplicating the event in the
|
||||
# `auth_event_ids` list (duplicate `type`/`state_key` is part of the
|
||||
# authorization rules)
|
||||
and member_event_id not in auth_event_ids
|
||||
):
|
||||
auth_event_ids.append(member_event_id)
|
||||
# Also make sure to point to the previous membership event that will
|
||||
# allow this one to happen so the computed state works out.
|
||||
prev_event_ids.append(member_event_id)
|
||||
|
||||
format_version = self.room_version.event_format
|
||||
# The types of auth/prev events changes between event versions.
|
||||
prev_events: Union[StrCollection, List[Tuple[str, Dict[str, str]]]]
|
||||
|
|
|
@ -2272,8 +2272,9 @@ class FederationEventHandler:
|
|||
event_and_contexts, backfilled=backfilled
|
||||
)
|
||||
|
||||
# After persistence we always need to notify replication there may
|
||||
# be new data.
|
||||
# After persistence, we never notify clients (wake up `/sync` streams) about
|
||||
# backfilled events but it's important to let all the workers know about any
|
||||
# new event (backfilled or not) because TODO
|
||||
self._notifier.notify_replication()
|
||||
|
||||
if self._ephemeral_messages_enabled:
|
||||
|
|
|
@ -391,7 +391,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
|||
def is_mine(self, domain_specific_string: DomainSpecificString) -> bool:
|
||||
return domain_specific_string.domain == self.hostname
|
||||
|
||||
def is_mine_id(self, string: str) -> bool:
|
||||
def is_mine_id(self, user_id: str) -> bool:
|
||||
"""Determines whether a user ID or room alias originates from this homeserver.
|
||||
|
||||
Returns:
|
||||
|
@ -399,7 +399,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
|||
homeserver.
|
||||
`False` otherwise, or if the user ID or room alias is malformed.
|
||||
"""
|
||||
localpart_hostname = string.split(":", 1)
|
||||
localpart_hostname = user_id.split(":", 1)
|
||||
if len(localpart_hostname) < 2:
|
||||
return False
|
||||
return localpart_hostname[1] == self.hostname
|
||||
|
|
161
tests/federation/test_federation_devices.py
Normal file
161
tests/federation/test_federation_devices.py
Normal file
|
@ -0,0 +1,161 @@
|
|||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2024 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
# Originally licensed under the Apache License, Version 2.0:
|
||||
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
||||
#
|
||||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
|
||||
import logging
|
||||
from unittest.mock import AsyncMock, Mock
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
from synapse.handlers.device import DeviceListUpdater
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import Clock
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
||||
from tests import unittest
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DeviceListResyncTestCase(unittest.HomeserverTestCase):
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.store = self.hs.get_datastores().main
|
||||
|
||||
def test_retry_device_list_resync(self) -> None:
|
||||
"""Tests that device lists are marked as stale if they couldn't be synced, and
|
||||
that stale device lists are retried periodically.
|
||||
"""
|
||||
remote_user_id = "@john:test_remote"
|
||||
remote_origin = "test_remote"
|
||||
|
||||
# Track the number of attempts to resync the user's device list.
|
||||
self.resync_attempts = 0
|
||||
|
||||
# When this function is called, increment the number of resync attempts (only if
|
||||
# we're querying devices for the right user ID), then raise a
|
||||
# NotRetryingDestination error to fail the resync gracefully.
|
||||
def query_user_devices(
|
||||
destination: str, user_id: str, timeout: int = 30000
|
||||
) -> JsonDict:
|
||||
if user_id == remote_user_id:
|
||||
self.resync_attempts += 1
|
||||
|
||||
raise NotRetryingDestination(0, 0, destination)
|
||||
|
||||
# Register the mock on the federation client.
|
||||
federation_client = self.hs.get_federation_client()
|
||||
federation_client.query_user_devices = Mock(side_effect=query_user_devices) # type: ignore[method-assign]
|
||||
|
||||
# Register a mock on the store so that the incoming update doesn't fail because
|
||||
# we don't share a room with the user.
|
||||
self.store.get_rooms_for_user = AsyncMock(return_value=["!someroom:test"])
|
||||
|
||||
# Manually inject a fake device list update. We need this update to include at
|
||||
# least one prev_id so that the user's device list will need to be retried.
|
||||
device_list_updater = self.hs.get_device_handler().device_list_updater
|
||||
assert isinstance(device_list_updater, DeviceListUpdater)
|
||||
self.get_success(
|
||||
device_list_updater.incoming_device_list_update(
|
||||
origin=remote_origin,
|
||||
edu_content={
|
||||
"deleted": False,
|
||||
"device_display_name": "Mobile",
|
||||
"device_id": "QBUAZIFURK",
|
||||
"prev_id": [5],
|
||||
"stream_id": 6,
|
||||
"user_id": remote_user_id,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
# Check that there was one resync attempt.
|
||||
self.assertEqual(self.resync_attempts, 1)
|
||||
|
||||
# Check that the resync attempt failed and caused the user's device list to be
|
||||
# marked as stale.
|
||||
need_resync = self.get_success(
|
||||
self.store.get_user_ids_requiring_device_list_resync()
|
||||
)
|
||||
self.assertIn(remote_user_id, need_resync)
|
||||
|
||||
# Check that waiting for 30 seconds caused Synapse to retry resyncing the device
|
||||
# list.
|
||||
self.reactor.advance(30)
|
||||
self.assertEqual(self.resync_attempts, 2)
|
||||
|
||||
def test_cross_signing_keys_retry(self) -> None:
|
||||
"""Tests that resyncing a device list correctly processes cross-signing keys from
|
||||
the remote server.
|
||||
"""
|
||||
remote_user_id = "@john:test_remote"
|
||||
remote_master_key = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY"
|
||||
remote_self_signing_key = "QeIiFEjluPBtI7WQdG365QKZcFs9kqmHir6RBD0//nQ"
|
||||
|
||||
# Register mock device list retrieval on the federation client.
|
||||
federation_client = self.hs.get_federation_client()
|
||||
federation_client.query_user_devices = AsyncMock( # type: ignore[method-assign]
|
||||
return_value={
|
||||
"user_id": remote_user_id,
|
||||
"stream_id": 1,
|
||||
"devices": [],
|
||||
"master_key": {
|
||||
"user_id": remote_user_id,
|
||||
"usage": ["master"],
|
||||
"keys": {"ed25519:" + remote_master_key: remote_master_key},
|
||||
},
|
||||
"self_signing_key": {
|
||||
"user_id": remote_user_id,
|
||||
"usage": ["self_signing"],
|
||||
"keys": {
|
||||
"ed25519:" + remote_self_signing_key: remote_self_signing_key
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
# Resync the device list.
|
||||
device_handler = self.hs.get_device_handler()
|
||||
self.get_success(
|
||||
device_handler.device_list_updater.multi_user_device_resync(
|
||||
[remote_user_id]
|
||||
),
|
||||
)
|
||||
|
||||
# Retrieve the cross-signing keys for this user.
|
||||
keys = self.get_success(
|
||||
self.store.get_e2e_cross_signing_keys_bulk(user_ids=[remote_user_id]),
|
||||
)
|
||||
self.assertIn(remote_user_id, keys)
|
||||
key = keys[remote_user_id]
|
||||
assert key is not None
|
||||
|
||||
# Check that the master key is the one returned by the mock.
|
||||
master_key = key["master"]
|
||||
self.assertEqual(len(master_key["keys"]), 1)
|
||||
self.assertTrue("ed25519:" + remote_master_key in master_key["keys"].keys())
|
||||
self.assertTrue(remote_master_key in master_key["keys"].values())
|
||||
|
||||
# Check that the self-signing key is the one returned by the mock.
|
||||
self_signing_key = key["self_signing"]
|
||||
self.assertEqual(len(self_signing_key["keys"]), 1)
|
||||
self.assertTrue(
|
||||
"ed25519:" + remote_self_signing_key in self_signing_key["keys"].keys(),
|
||||
)
|
||||
self.assertTrue(remote_self_signing_key in self_signing_key["keys"].values())
|
671
tests/federation/test_federation_out_of_band_membership.py
Normal file
671
tests/federation/test_federation_out_of_band_membership.py
Normal file
|
@ -0,0 +1,671 @@
|
|||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
# Copyright (C) 2023 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
# Originally licensed under the Apache License, Version 2.0:
|
||||
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
||||
#
|
||||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
|
||||
import logging
|
||||
import time
|
||||
import urllib.parse
|
||||
from http import HTTPStatus
|
||||
from typing import Any, Callable, Optional, Set, Tuple, TypeVar, Union
|
||||
from unittest.mock import Mock
|
||||
|
||||
import attr
|
||||
from parameterized import parameterized
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
from synapse.api.constants import EventContentFields, EventTypes, Membership
|
||||
from synapse.api.room_versions import RoomVersion, RoomVersions
|
||||
from synapse.events import EventBase, make_event_from_dict
|
||||
from synapse.events.utils import strip_event
|
||||
from synapse.federation.federation_base import (
|
||||
event_from_pdu_json,
|
||||
)
|
||||
from synapse.federation.transport.client import SendJoinResponse
|
||||
from synapse.http.matrixfederationclient import (
|
||||
ByteParser,
|
||||
)
|
||||
from synapse.http.types import QueryParams
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client import login, room, sync
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict, MutableStateMap, StateMap
|
||||
from synapse.types.handlers.sliding_sync import (
|
||||
StateValues,
|
||||
)
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests import unittest
|
||||
from tests.utils import test_timeout
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def required_state_json_to_state_map(required_state: Any) -> StateMap[EventBase]:
|
||||
state_map: MutableStateMap[EventBase] = {}
|
||||
|
||||
# Scrutinize JSON values to ensure it's in the expected format
|
||||
if isinstance(required_state, list):
|
||||
for state_event_dict in required_state:
|
||||
# Yell because we're in a test and this is unexpected
|
||||
assert isinstance(
|
||||
state_event_dict, dict
|
||||
), "`required_state` should be a list of event dicts"
|
||||
|
||||
event_type = state_event_dict["type"]
|
||||
event_state_key = state_event_dict["state_key"]
|
||||
|
||||
# Yell because we're in a test and this is unexpected
|
||||
assert isinstance(
|
||||
event_type, str
|
||||
), "Each event in `required_state` should have a string `type`"
|
||||
assert isinstance(
|
||||
event_state_key, str
|
||||
), "Each event in `required_state` should have a string `state_key`"
|
||||
|
||||
state_map[(event_type, event_state_key)] = make_event_from_dict(
|
||||
state_event_dict
|
||||
)
|
||||
else:
|
||||
# Yell because we're in a test and this is unexpected
|
||||
raise AssertionError("`required_state` should be a list of event dicts")
|
||||
|
||||
return state_map
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class RemoteRoomJoinResult:
|
||||
remote_room_id: str
|
||||
room_version: RoomVersion
|
||||
remote_room_creator_user_id: str
|
||||
local_user1_id: str
|
||||
local_user1_tok: str
|
||||
state_map: StateMap[EventBase]
|
||||
|
||||
|
||||
class OutOfBandMembershipTests(unittest.FederatingHomeserverTestCase):
|
||||
"""
|
||||
Tests to make sure that interactions with out-of-band membership (outliers) works as
|
||||
expected.
|
||||
|
||||
- invites received over federation, before we join the room
|
||||
- *rejections* for said invites
|
||||
|
||||
See the "Out-of-band membership events" section in
|
||||
`docs/development/room-dag-concepts.md` for more information.
|
||||
"""
|
||||
|
||||
servlets = [
|
||||
admin.register_servlets,
|
||||
room.register_servlets,
|
||||
login.register_servlets,
|
||||
sync.register_servlets,
|
||||
]
|
||||
|
||||
sync_endpoint = "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
|
||||
|
||||
def default_config(self) -> JsonDict:
|
||||
conf = super().default_config()
|
||||
# Federation sending is disabled by default in the test environment
|
||||
# so we need to enable it like this.
|
||||
conf["federation_sender_instances"] = ["master"]
|
||||
|
||||
return conf
|
||||
|
||||
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
|
||||
self.federation_http_client = Mock(
|
||||
# The problem with using `spec=MatrixFederationHttpClient` here is that it
|
||||
# requires everything to be mocked which is a lot of work that I don't want
|
||||
# to do when the code only uses a few methods (`get_json` and `put_json`).
|
||||
)
|
||||
return self.setup_test_homeserver(
|
||||
federation_http_client=self.federation_http_client
|
||||
)
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
super().prepare(reactor, clock, hs)
|
||||
|
||||
self.store = self.hs.get_datastores().main
|
||||
self.storage_controllers = hs.get_storage_controllers()
|
||||
|
||||
def do_sync(
|
||||
self, sync_body: JsonDict, *, since: Optional[str] = None, tok: str
|
||||
) -> Tuple[JsonDict, str]:
|
||||
"""Do a sliding sync request with given body.
|
||||
|
||||
Asserts the request was successful.
|
||||
|
||||
Attributes:
|
||||
sync_body: The full request body to use
|
||||
since: Optional since token
|
||||
tok: Access token to use
|
||||
|
||||
Returns:
|
||||
A tuple of the response body and the `pos` field.
|
||||
"""
|
||||
|
||||
sync_path = self.sync_endpoint
|
||||
if since:
|
||||
sync_path += f"?pos={since}"
|
||||
|
||||
channel = self.make_request(
|
||||
method="POST",
|
||||
path=sync_path,
|
||||
content=sync_body,
|
||||
access_token=tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
return channel.json_body, channel.json_body["pos"]
|
||||
|
||||
def _invite_local_user_to_remote_room_and_join(self) -> RemoteRoomJoinResult:
|
||||
"""
|
||||
Helper to reproduce this scenario:
|
||||
|
||||
1. The remote user invites our local user to a room on their remote server (which
|
||||
creates an out-of-band invite membership for user1 on our local server).
|
||||
2. The local user notices the invite from `/sync`.
|
||||
3. The local user joins the room.
|
||||
4. The local user can see that they are now joined to the room from `/sync`.
|
||||
"""
|
||||
|
||||
# Create a local user
|
||||
local_user1_id = self.register_user("user1", "pass")
|
||||
local_user1_tok = self.login(local_user1_id, "pass")
|
||||
|
||||
# Create a remote room
|
||||
room_creator_user_id = f"@remote-user:{self.OTHER_SERVER_NAME}"
|
||||
remote_room_id = f"!remote-room:{self.OTHER_SERVER_NAME}"
|
||||
room_version = RoomVersions.V10
|
||||
|
||||
room_create_event = make_event_from_dict(
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"room_id": remote_room_id,
|
||||
"sender": room_creator_user_id,
|
||||
"depth": 1,
|
||||
"origin_server_ts": 1,
|
||||
"type": EventTypes.Create,
|
||||
"state_key": "",
|
||||
"content": {
|
||||
# The `ROOM_CREATOR` field could be removed if we used a room
|
||||
# version > 10 (in favor of relying on `sender`)
|
||||
EventContentFields.ROOM_CREATOR: room_creator_user_id,
|
||||
EventContentFields.ROOM_VERSION: room_version.identifier,
|
||||
},
|
||||
"auth_events": [],
|
||||
"prev_events": [],
|
||||
}
|
||||
),
|
||||
room_version=room_version,
|
||||
)
|
||||
|
||||
creator_membership_event = make_event_from_dict(
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"room_id": remote_room_id,
|
||||
"sender": room_creator_user_id,
|
||||
"depth": 2,
|
||||
"origin_server_ts": 2,
|
||||
"type": EventTypes.Member,
|
||||
"state_key": room_creator_user_id,
|
||||
"content": {"membership": Membership.JOIN},
|
||||
"auth_events": [room_create_event.event_id],
|
||||
"prev_events": [room_create_event.event_id],
|
||||
}
|
||||
),
|
||||
room_version=room_version,
|
||||
)
|
||||
|
||||
# From the remote homeserver, invite user1 on the local homserver
|
||||
user1_invite_membership_event = make_event_from_dict(
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"room_id": remote_room_id,
|
||||
"sender": room_creator_user_id,
|
||||
"depth": 3,
|
||||
"origin_server_ts": 3,
|
||||
"type": EventTypes.Member,
|
||||
"state_key": local_user1_id,
|
||||
"content": {"membership": Membership.INVITE},
|
||||
"auth_events": [
|
||||
room_create_event.event_id,
|
||||
creator_membership_event.event_id,
|
||||
],
|
||||
"prev_events": [creator_membership_event.event_id],
|
||||
}
|
||||
),
|
||||
room_version=room_version,
|
||||
)
|
||||
channel = self.make_signed_federation_request(
|
||||
"PUT",
|
||||
f"/_matrix/federation/v2/invite/{remote_room_id}/{user1_invite_membership_event.event_id}",
|
||||
content={
|
||||
"event": user1_invite_membership_event.get_dict(),
|
||||
"invite_room_state": [
|
||||
strip_event(room_create_event),
|
||||
],
|
||||
"room_version": room_version.identifier,
|
||||
},
|
||||
)
|
||||
self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
|
||||
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [(EventTypes.Member, StateValues.WILDCARD)],
|
||||
"timeline_limit": 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Sync until the local user1 can see the invite
|
||||
with test_timeout(
|
||||
3,
|
||||
"Unable to find user1's invite event in the room",
|
||||
):
|
||||
while True:
|
||||
response_body, _ = self.do_sync(sync_body, tok=local_user1_tok)
|
||||
if (
|
||||
remote_room_id in response_body["rooms"].keys()
|
||||
# If they have `invite_state` for the room, they are invited
|
||||
and len(
|
||||
response_body["rooms"][remote_room_id].get("invite_state", [])
|
||||
)
|
||||
> 0
|
||||
):
|
||||
break
|
||||
|
||||
# Prevent tight-looping to allow the `test_timeout` to work
|
||||
time.sleep(0.1)
|
||||
|
||||
user1_join_membership_event_template = make_event_from_dict(
|
||||
{
|
||||
"room_id": remote_room_id,
|
||||
"sender": local_user1_id,
|
||||
"depth": 4,
|
||||
"origin_server_ts": 4,
|
||||
"type": EventTypes.Member,
|
||||
"state_key": local_user1_id,
|
||||
"content": {"membership": Membership.JOIN},
|
||||
"auth_events": [
|
||||
room_create_event.event_id,
|
||||
user1_invite_membership_event.event_id,
|
||||
],
|
||||
"prev_events": [user1_invite_membership_event.event_id],
|
||||
},
|
||||
room_version=room_version,
|
||||
)
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
# Mock the remote homeserver responding to our HTTP requests
|
||||
#
|
||||
# We're going to mock the following endpoints so that user1 can join the remote room:
|
||||
# - GET /_matrix/federation/v1/make_join/{room_id}/{user_id}
|
||||
# - PUT /_matrix/federation/v2/send_join/{room_id}/{user_id}
|
||||
#
|
||||
async def get_json(
|
||||
destination: str,
|
||||
path: str,
|
||||
args: Optional[QueryParams] = None,
|
||||
retry_on_dns_fail: bool = True,
|
||||
timeout: Optional[int] = None,
|
||||
ignore_backoff: bool = False,
|
||||
try_trailing_slash_on_400: bool = False,
|
||||
parser: Optional[ByteParser[T]] = None,
|
||||
) -> Union[JsonDict, T]:
|
||||
if (
|
||||
path
|
||||
== f"/_matrix/federation/v1/make_join/{urllib.parse.quote_plus(remote_room_id)}/{urllib.parse.quote_plus(local_user1_id)}"
|
||||
):
|
||||
return {
|
||||
"event": user1_join_membership_event_template.get_pdu_json(),
|
||||
"room_version": room_version.identifier,
|
||||
}
|
||||
|
||||
raise NotImplementedError(
|
||||
"We have not mocked a response for `get_json(...)` for the following endpoint yet: "
|
||||
+ f"{destination}{path}"
|
||||
)
|
||||
|
||||
self.federation_http_client.get_json.side_effect = get_json
|
||||
|
||||
# PDU's that hs1 sent to hs2
|
||||
collected_pdus_from_hs1_federation_send: Set[str] = set()
|
||||
|
||||
async def put_json(
|
||||
destination: str,
|
||||
path: str,
|
||||
args: Optional[QueryParams] = None,
|
||||
data: Optional[JsonDict] = None,
|
||||
json_data_callback: Optional[Callable[[], JsonDict]] = None,
|
||||
long_retries: bool = False,
|
||||
timeout: Optional[int] = None,
|
||||
ignore_backoff: bool = False,
|
||||
backoff_on_404: bool = False,
|
||||
try_trailing_slash_on_400: bool = False,
|
||||
parser: Optional[ByteParser[T]] = None,
|
||||
backoff_on_all_error_codes: bool = False,
|
||||
) -> Union[JsonDict, T, SendJoinResponse]:
|
||||
if (
|
||||
path.startswith(
|
||||
f"/_matrix/federation/v2/send_join/{urllib.parse.quote_plus(remote_room_id)}/"
|
||||
)
|
||||
and data is not None
|
||||
and data.get("type") == EventTypes.Member
|
||||
and data.get("state_key") == local_user1_id
|
||||
# We're assuming this is a `ByteParser[SendJoinResponse]`
|
||||
and parser is not None
|
||||
):
|
||||
# As the remote server, we need to sign the event before sending it back
|
||||
user1_join_membership_event_signed = make_event_from_dict(
|
||||
self.add_hashes_and_signatures_from_other_server(data),
|
||||
room_version=room_version,
|
||||
)
|
||||
|
||||
# Since they passed in a `parser`, we need to return the type that
|
||||
# they're expecting instead of just a `JsonDict`
|
||||
return SendJoinResponse(
|
||||
auth_events=[
|
||||
room_create_event,
|
||||
user1_invite_membership_event,
|
||||
],
|
||||
state=[
|
||||
room_create_event,
|
||||
creator_membership_event,
|
||||
user1_invite_membership_event,
|
||||
],
|
||||
event_dict=user1_join_membership_event_signed.get_pdu_json(),
|
||||
event=user1_join_membership_event_signed,
|
||||
members_omitted=False,
|
||||
servers_in_room=[
|
||||
self.OTHER_SERVER_NAME,
|
||||
],
|
||||
)
|
||||
|
||||
if path.startswith("/_matrix/federation/v1/send/") and data is not None:
|
||||
for pdu in data.get("pdus", []):
|
||||
event = event_from_pdu_json(pdu, room_version)
|
||||
collected_pdus_from_hs1_federation_send.add(event.event_id)
|
||||
|
||||
# Just acknowledge everything hs1 is trying to send hs2
|
||||
return {
|
||||
event_from_pdu_json(pdu, room_version).event_id: {}
|
||||
for pdu in data.get("pdus", [])
|
||||
}
|
||||
|
||||
raise NotImplementedError(
|
||||
"We have not mocked a response for `put_json(...)` for the following endpoint yet: "
|
||||
+ f"{destination}{path} with the following body data: {data}"
|
||||
)
|
||||
|
||||
self.federation_http_client.put_json.side_effect = put_json
|
||||
|
||||
# User1 joins the room
|
||||
self.helper.join(remote_room_id, local_user1_id, tok=local_user1_tok)
|
||||
|
||||
# Reset the mocks now that user1 has joined the room
|
||||
self.federation_http_client.get_json.side_effect = None
|
||||
self.federation_http_client.put_json.side_effect = None
|
||||
|
||||
# Sync until the local user1 can see that they are now joined to the room
|
||||
with test_timeout(
|
||||
3,
|
||||
"Unable to find user1's join event in the room",
|
||||
):
|
||||
while True:
|
||||
response_body, _ = self.do_sync(sync_body, tok=local_user1_tok)
|
||||
if remote_room_id in response_body["rooms"].keys():
|
||||
required_state_map = required_state_json_to_state_map(
|
||||
response_body["rooms"][remote_room_id]["required_state"]
|
||||
)
|
||||
if (
|
||||
required_state_map.get((EventTypes.Member, local_user1_id))
|
||||
is not None
|
||||
):
|
||||
break
|
||||
|
||||
# Prevent tight-looping to allow the `test_timeout` to work
|
||||
time.sleep(0.1)
|
||||
|
||||
# Nothing needs to be sent from hs1 to hs2 since we already let the other
|
||||
# homeserver know by doing the `/make_join` and `/send_join` dance.
|
||||
self.assertIncludes(
|
||||
collected_pdus_from_hs1_federation_send,
|
||||
set(),
|
||||
exact=True,
|
||||
message="Didn't expect any events to be sent from hs1 over federation to hs2",
|
||||
)
|
||||
|
||||
return RemoteRoomJoinResult(
|
||||
remote_room_id=remote_room_id,
|
||||
room_version=room_version,
|
||||
remote_room_creator_user_id=room_creator_user_id,
|
||||
local_user1_id=local_user1_id,
|
||||
local_user1_tok=local_user1_tok,
|
||||
state_map=self.get_success(
|
||||
self.storage_controllers.state.get_current_state(remote_room_id)
|
||||
),
|
||||
)
|
||||
|
||||
def test_can_join_from_out_of_band_invite(self) -> None:
|
||||
"""
|
||||
Test to make sure that we can join a room that we were invited to over
|
||||
federation; even if our server has never participated in the room before.
|
||||
"""
|
||||
self._invite_local_user_to_remote_room_and_join()
|
||||
|
||||
@parameterized.expand(
|
||||
[("accept invite", Membership.JOIN), ("reject invite", Membership.LEAVE)]
|
||||
)
|
||||
def test_can_x_from_out_of_band_invite_after_we_are_already_participating_in_the_room(
|
||||
self, _test_description: str, membership_action: str
|
||||
) -> None:
|
||||
"""
|
||||
Test to make sure that we can do either a) join the room (accept the invite) or
|
||||
b) reject the invite after being invited to over federation; even if we are
|
||||
already participating in the room.
|
||||
|
||||
This is a regression test to make sure we stress the scenario where even though
|
||||
we are already participating in the room, local users can still react to invites
|
||||
regardless of whether the remote server has told us about the invite event (via
|
||||
a federation `/send` transaction) and we have de-outliered the invite event.
|
||||
Previously, we would mistakenly throw an error saying the user wasn't in the
|
||||
room when they tried to join or reject the invite.
|
||||
"""
|
||||
remote_room_join_result = self._invite_local_user_to_remote_room_and_join()
|
||||
remote_room_id = remote_room_join_result.remote_room_id
|
||||
room_version = remote_room_join_result.room_version
|
||||
|
||||
# Create another local user
|
||||
local_user2_id = self.register_user("user2", "pass")
|
||||
local_user2_tok = self.login(local_user2_id, "pass")
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
# PDU's that hs1 sent to hs2
|
||||
collected_pdus_from_hs1_federation_send: Set[str] = set()
|
||||
|
||||
async def put_json(
|
||||
destination: str,
|
||||
path: str,
|
||||
args: Optional[QueryParams] = None,
|
||||
data: Optional[JsonDict] = None,
|
||||
json_data_callback: Optional[Callable[[], JsonDict]] = None,
|
||||
long_retries: bool = False,
|
||||
timeout: Optional[int] = None,
|
||||
ignore_backoff: bool = False,
|
||||
backoff_on_404: bool = False,
|
||||
try_trailing_slash_on_400: bool = False,
|
||||
parser: Optional[ByteParser[T]] = None,
|
||||
backoff_on_all_error_codes: bool = False,
|
||||
) -> Union[JsonDict, T]:
|
||||
if path.startswith("/_matrix/federation/v1/send/") and data is not None:
|
||||
for pdu in data.get("pdus", []):
|
||||
event = event_from_pdu_json(pdu, room_version)
|
||||
collected_pdus_from_hs1_federation_send.add(event.event_id)
|
||||
|
||||
# Just acknowledge everything hs1 is trying to send hs2
|
||||
return {
|
||||
event_from_pdu_json(pdu, room_version).event_id: {}
|
||||
for pdu in data.get("pdus", [])
|
||||
}
|
||||
|
||||
raise NotImplementedError(
|
||||
"We have not mocked a response for `put_json(...)` for the following endpoint yet: "
|
||||
+ f"{destination}{path} with the following body data: {data}"
|
||||
)
|
||||
|
||||
self.federation_http_client.put_json.side_effect = put_json
|
||||
|
||||
# From the remote homeserver, invite user2 on the local homserver
|
||||
user2_invite_membership_event = make_event_from_dict(
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"room_id": remote_room_id,
|
||||
"sender": remote_room_join_result.remote_room_creator_user_id,
|
||||
"depth": 5,
|
||||
"origin_server_ts": 5,
|
||||
"type": EventTypes.Member,
|
||||
"state_key": local_user2_id,
|
||||
"content": {"membership": Membership.INVITE},
|
||||
"auth_events": [
|
||||
remote_room_join_result.state_map[
|
||||
(EventTypes.Create, "")
|
||||
].event_id,
|
||||
remote_room_join_result.state_map[
|
||||
(
|
||||
EventTypes.Member,
|
||||
remote_room_join_result.remote_room_creator_user_id,
|
||||
)
|
||||
].event_id,
|
||||
],
|
||||
"prev_events": [
|
||||
remote_room_join_result.state_map[
|
||||
(EventTypes.Member, remote_room_join_result.local_user1_id)
|
||||
].event_id
|
||||
],
|
||||
}
|
||||
),
|
||||
room_version=room_version,
|
||||
)
|
||||
channel = self.make_signed_federation_request(
|
||||
"PUT",
|
||||
f"/_matrix/federation/v2/invite/{remote_room_id}/{user2_invite_membership_event.event_id}",
|
||||
content={
|
||||
"event": user2_invite_membership_event.get_dict(),
|
||||
"invite_room_state": [
|
||||
strip_event(
|
||||
remote_room_join_result.state_map[(EventTypes.Create, "")]
|
||||
),
|
||||
],
|
||||
"room_version": room_version.identifier,
|
||||
},
|
||||
)
|
||||
self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
|
||||
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [(EventTypes.Member, StateValues.WILDCARD)],
|
||||
"timeline_limit": 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Sync until the local user2 can see the invite
|
||||
with test_timeout(
|
||||
3,
|
||||
"Unable to find user2's invite event in the room",
|
||||
):
|
||||
while True:
|
||||
response_body, _ = self.do_sync(sync_body, tok=local_user2_tok)
|
||||
if (
|
||||
remote_room_id in response_body["rooms"].keys()
|
||||
# If they have `invite_state` for the room, they are invited
|
||||
and len(
|
||||
response_body["rooms"][remote_room_id].get("invite_state", [])
|
||||
)
|
||||
> 0
|
||||
):
|
||||
break
|
||||
|
||||
# Prevent tight-looping to allow the `test_timeout` to work
|
||||
time.sleep(0.1)
|
||||
|
||||
if membership_action == Membership.JOIN:
|
||||
# User2 joins the room
|
||||
join_event = self.helper.join(
|
||||
remote_room_join_result.remote_room_id,
|
||||
local_user2_id,
|
||||
tok=local_user2_tok,
|
||||
)
|
||||
expected_pdu_event_id = join_event["event_id"]
|
||||
elif membership_action == Membership.LEAVE:
|
||||
# User2 rejects the invite
|
||||
leave_event = self.helper.leave(
|
||||
remote_room_join_result.remote_room_id,
|
||||
local_user2_id,
|
||||
tok=local_user2_tok,
|
||||
)
|
||||
expected_pdu_event_id = leave_event["event_id"]
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
"This test does not support this membership action yet"
|
||||
)
|
||||
|
||||
# Sync until the local user2 can see their new membership in the room
|
||||
with test_timeout(
|
||||
3,
|
||||
"Unable to find user2's new membership event in the room",
|
||||
):
|
||||
while True:
|
||||
response_body, _ = self.do_sync(sync_body, tok=local_user2_tok)
|
||||
if membership_action == Membership.JOIN:
|
||||
if remote_room_id in response_body["rooms"].keys():
|
||||
required_state_map = required_state_json_to_state_map(
|
||||
response_body["rooms"][remote_room_id]["required_state"]
|
||||
)
|
||||
if (
|
||||
required_state_map.get((EventTypes.Member, local_user2_id))
|
||||
is not None
|
||||
):
|
||||
break
|
||||
elif membership_action == Membership.LEAVE:
|
||||
if remote_room_id not in response_body["rooms"].keys():
|
||||
break
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
"This test does not support this membership action yet"
|
||||
)
|
||||
|
||||
# Prevent tight-looping to allow the `test_timeout` to work
|
||||
time.sleep(0.1)
|
||||
|
||||
# Make sure that we let hs2 know about the new membership event
|
||||
self.assertIncludes(
|
||||
collected_pdus_from_hs1_federation_send,
|
||||
{expected_pdu_event_id},
|
||||
exact=True,
|
||||
message="Expected to find the event ID of the user2 membership to be sent from hs1 over federation to hs2",
|
||||
)
|
|
@ -20,14 +20,21 @@
|
|||
#
|
||||
import logging
|
||||
from http import HTTPStatus
|
||||
from typing import Optional, Union
|
||||
from unittest.mock import Mock
|
||||
|
||||
from parameterized import parameterized
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import FederationError
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
|
||||
from synapse.config.server import DEFAULT_ROOM_VERSION
|
||||
from synapse.events import EventBase, make_event_from_dict
|
||||
from synapse.federation.federation_base import event_from_pdu_json
|
||||
from synapse.http.types import QueryParams
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client import login, room
|
||||
from synapse.server import HomeServer
|
||||
|
@ -85,6 +92,163 @@ class FederationServerTests(unittest.FederatingHomeserverTestCase):
|
|||
self.assertEqual(500, channel.code, channel.result)
|
||||
|
||||
|
||||
def _create_acl_event(content: JsonDict) -> EventBase:
|
||||
return make_event_from_dict(
|
||||
{
|
||||
"room_id": "!a:b",
|
||||
"event_id": "$a:b",
|
||||
"type": "m.room.server_acls",
|
||||
"sender": "@a:b",
|
||||
"content": content,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class MessageAcceptTests(unittest.FederatingHomeserverTestCase):
|
||||
"""
|
||||
Tests to make sure that we don't accept flawed events from federation (incoming).
|
||||
"""
|
||||
|
||||
servlets = [
|
||||
admin.register_servlets,
|
||||
login.register_servlets,
|
||||
room.register_servlets,
|
||||
]
|
||||
|
||||
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
|
||||
self.http_client = Mock()
|
||||
return self.setup_test_homeserver(federation_http_client=self.http_client)
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
super().prepare(reactor, clock, hs)
|
||||
|
||||
self.store = self.hs.get_datastores().main
|
||||
self.storage_controllers = hs.get_storage_controllers()
|
||||
self.federation_event_handler = self.hs.get_federation_event_handler()
|
||||
|
||||
# Create a local room
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
self.room_id = self.helper.create_room_as(
|
||||
user1_id, tok=user1_tok, is_public=True
|
||||
)
|
||||
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(self.room_id)
|
||||
)
|
||||
|
||||
# Figure out what the forward extremities in the room are (the most recent
|
||||
# events that aren't tied into the DAG)
|
||||
forward_extremity_event_ids = self.get_success(
|
||||
self.hs.get_datastores().main.get_latest_event_ids_in_room(self.room_id)
|
||||
)
|
||||
|
||||
# Join a remote user to the room that will attempt to send bad events
|
||||
self.remote_bad_user_id = f"@baduser:{self.OTHER_SERVER_NAME}"
|
||||
self.remote_bad_user_join_event = make_event_from_dict(
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"room_id": self.room_id,
|
||||
"sender": self.remote_bad_user_id,
|
||||
"state_key": self.remote_bad_user_id,
|
||||
"depth": 1000,
|
||||
"origin_server_ts": 1,
|
||||
"type": EventTypes.Member,
|
||||
"content": {"membership": Membership.JOIN},
|
||||
"auth_events": [
|
||||
state_map[(EventTypes.Create, "")].event_id,
|
||||
state_map[(EventTypes.JoinRules, "")].event_id,
|
||||
],
|
||||
"prev_events": list(forward_extremity_event_ids),
|
||||
}
|
||||
),
|
||||
room_version=RoomVersions.V10,
|
||||
)
|
||||
|
||||
# Send the join, it should return None (which is not an error)
|
||||
self.assertEqual(
|
||||
self.get_success(
|
||||
self.federation_event_handler.on_receive_pdu(
|
||||
self.OTHER_SERVER_NAME, self.remote_bad_user_join_event
|
||||
)
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
# Make sure we actually joined the room
|
||||
self.assertEqual(
|
||||
self.get_success(self.store.get_latest_event_ids_in_room(self.room_id)),
|
||||
{self.remote_bad_user_join_event.event_id},
|
||||
)
|
||||
|
||||
def test_cant_hide_direct_ancestors(self) -> None:
|
||||
"""
|
||||
If you send a message, you must be able to provide the direct
|
||||
prev_events that said event references.
|
||||
"""
|
||||
|
||||
async def post_json(
|
||||
destination: str,
|
||||
path: str,
|
||||
data: Optional[JsonDict] = None,
|
||||
long_retries: bool = False,
|
||||
timeout: Optional[int] = None,
|
||||
ignore_backoff: bool = False,
|
||||
args: Optional[QueryParams] = None,
|
||||
) -> Union[JsonDict, list]:
|
||||
# If it asks us for new missing events, give them NOTHING
|
||||
if path.startswith("/_matrix/federation/v1/get_missing_events/"):
|
||||
return {"events": []}
|
||||
return {}
|
||||
|
||||
self.http_client.post_json = post_json
|
||||
|
||||
# Figure out what the forward extremities in the room are (the most recent
|
||||
# events that aren't tied into the DAG)
|
||||
forward_extremity_event_ids = self.get_success(
|
||||
self.hs.get_datastores().main.get_latest_event_ids_in_room(self.room_id)
|
||||
)
|
||||
|
||||
# Now lie about an event's prev_events
|
||||
lying_event = make_event_from_dict(
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"room_id": self.room_id,
|
||||
"sender": self.remote_bad_user_id,
|
||||
"depth": 1000,
|
||||
"origin_server_ts": 1,
|
||||
"type": "m.room.message",
|
||||
"content": {"body": "hewwo?"},
|
||||
"auth_events": [],
|
||||
"prev_events": ["$missing_prev_event"]
|
||||
+ list(forward_extremity_event_ids),
|
||||
}
|
||||
),
|
||||
room_version=RoomVersions.V10,
|
||||
)
|
||||
|
||||
with LoggingContext("test-context"):
|
||||
failure = self.get_failure(
|
||||
self.federation_event_handler.on_receive_pdu(
|
||||
self.OTHER_SERVER_NAME, lying_event
|
||||
),
|
||||
FederationError,
|
||||
)
|
||||
|
||||
# on_receive_pdu should throw an error
|
||||
self.assertEqual(
|
||||
failure.value.args[0],
|
||||
(
|
||||
"ERROR 403: Your server isn't divulging details about prev_events "
|
||||
"referenced in this event."
|
||||
),
|
||||
)
|
||||
|
||||
# Make sure the invalid event isn't there
|
||||
extrem = self.get_success(self.store.get_latest_event_ids_in_room(self.room_id))
|
||||
self.assertEqual(extrem, {self.remote_bad_user_join_event.event_id})
|
||||
|
||||
|
||||
class ServerACLsTestCase(unittest.TestCase):
|
||||
def test_blocked_server(self) -> None:
|
||||
e = _create_acl_event({"allow": ["*"], "deny": ["evil.com"]})
|
||||
|
@ -355,13 +519,76 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
|
|||
# is probably sufficient to reassure that the bucket is updated.
|
||||
|
||||
|
||||
def _create_acl_event(content: JsonDict) -> EventBase:
|
||||
return make_event_from_dict(
|
||||
{
|
||||
"room_id": "!a:b",
|
||||
"event_id": "$a:b",
|
||||
"type": "m.room.server_acls",
|
||||
"sender": "@a:b",
|
||||
"content": content,
|
||||
class StripUnsignedFromEventsTestCase(unittest.TestCase):
|
||||
"""
|
||||
Test to make sure that we handle the raw JSON events from federation carefully and
|
||||
strip anything that shouldn't be there.
|
||||
"""
|
||||
|
||||
def test_strip_unauthorized_unsigned_values(self) -> None:
|
||||
event1 = {
|
||||
"sender": "@baduser:test.serv",
|
||||
"state_key": "@baduser:test.serv",
|
||||
"event_id": "$event1:test.serv",
|
||||
"depth": 1000,
|
||||
"origin_server_ts": 1,
|
||||
"type": "m.room.member",
|
||||
"origin": "test.servx",
|
||||
"content": {"membership": "join"},
|
||||
"auth_events": [],
|
||||
"unsigned": {"malicious garbage": "hackz", "more warez": "more hackz"},
|
||||
}
|
||||
)
|
||||
filtered_event = event_from_pdu_json(event1, RoomVersions.V1)
|
||||
# Make sure unauthorized fields are stripped from unsigned
|
||||
self.assertNotIn("more warez", filtered_event.unsigned)
|
||||
|
||||
def test_strip_event_maintains_allowed_fields(self) -> None:
|
||||
event2 = {
|
||||
"sender": "@baduser:test.serv",
|
||||
"state_key": "@baduser:test.serv",
|
||||
"event_id": "$event2:test.serv",
|
||||
"depth": 1000,
|
||||
"origin_server_ts": 1,
|
||||
"type": "m.room.member",
|
||||
"origin": "test.servx",
|
||||
"auth_events": [],
|
||||
"content": {"membership": "join"},
|
||||
"unsigned": {
|
||||
"malicious garbage": "hackz",
|
||||
"more warez": "more hackz",
|
||||
"age": 14,
|
||||
"invite_room_state": [],
|
||||
},
|
||||
}
|
||||
|
||||
filtered_event2 = event_from_pdu_json(event2, RoomVersions.V1)
|
||||
self.assertIn("age", filtered_event2.unsigned)
|
||||
self.assertEqual(14, filtered_event2.unsigned["age"])
|
||||
self.assertNotIn("more warez", filtered_event2.unsigned)
|
||||
# Invite_room_state is allowed in events of type m.room.member
|
||||
self.assertIn("invite_room_state", filtered_event2.unsigned)
|
||||
self.assertEqual([], filtered_event2.unsigned["invite_room_state"])
|
||||
|
||||
def test_strip_event_removes_fields_based_on_event_type(self) -> None:
|
||||
event3 = {
|
||||
"sender": "@baduser:test.serv",
|
||||
"state_key": "@baduser:test.serv",
|
||||
"event_id": "$event3:test.serv",
|
||||
"depth": 1000,
|
||||
"origin_server_ts": 1,
|
||||
"type": "m.room.power_levels",
|
||||
"origin": "test.servx",
|
||||
"content": {},
|
||||
"auth_events": [],
|
||||
"unsigned": {
|
||||
"malicious garbage": "hackz",
|
||||
"more warez": "more hackz",
|
||||
"age": 14,
|
||||
"invite_room_state": [],
|
||||
},
|
||||
}
|
||||
filtered_event3 = event_from_pdu_json(event3, RoomVersions.V1)
|
||||
self.assertIn("age", filtered_event3.unsigned)
|
||||
# Invite_room_state field is only permitted in event type m.room.member
|
||||
self.assertNotIn("invite_room_state", filtered_event3.unsigned)
|
||||
self.assertNotIn("more warez", filtered_event3.unsigned)
|
||||
|
|
|
@ -375,7 +375,7 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
|
|||
|
||||
In this test, we pretend we are processing a "pulled" event via
|
||||
backfill. The pulled event succesfully processes and the backward
|
||||
extremeties are updated along with clearing out any failed pull attempts
|
||||
extremities are updated along with clearing out any failed pull attempts
|
||||
for those old extremities.
|
||||
|
||||
We check that we correctly cleared failed pull attempts of the
|
||||
|
|
|
@ -23,14 +23,21 @@ from typing import Optional, cast
|
|||
from unittest.mock import Mock, call
|
||||
|
||||
from parameterized import parameterized
|
||||
from signedjson.key import generate_signing_key
|
||||
from signedjson.key import (
|
||||
encode_verify_key_base64,
|
||||
generate_signing_key,
|
||||
get_verify_key,
|
||||
)
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership, PresenceState
|
||||
from synapse.api.presence import UserDevicePresenceState, UserPresenceState
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.events.builder import EventBuilder
|
||||
from synapse.api.room_versions import (
|
||||
RoomVersion,
|
||||
)
|
||||
from synapse.crypto.event_signing import add_hashes_and_signatures
|
||||
from synapse.events import EventBase, make_event_from_dict
|
||||
from synapse.federation.sender import FederationSender
|
||||
from synapse.handlers.presence import (
|
||||
BUSY_ONLINE_TIMEOUT,
|
||||
|
@ -48,6 +55,7 @@ from synapse.rest import admin
|
|||
from synapse.rest.client import login, room, sync
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.database import LoggingDatabaseConnection
|
||||
from synapse.storage.keys import FetchKeyResult
|
||||
from synapse.types import JsonDict, UserID, get_domain_from_id
|
||||
from synapse.util import Clock
|
||||
|
||||
|
@ -1926,6 +1934,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
|||
# self.event_builder_for_2.hostname = "test2"
|
||||
|
||||
self.store = hs.get_datastores().main
|
||||
self.storage_controllers = hs.get_storage_controllers()
|
||||
self.state = hs.get_state_handler()
|
||||
self._event_auth_handler = hs.get_event_auth_handler()
|
||||
|
||||
|
@ -2041,29 +2050,35 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
|||
|
||||
hostname = get_domain_from_id(user_id)
|
||||
|
||||
room_version = self.get_success(self.store.get_room_version_id(room_id))
|
||||
room_version = self.get_success(self.store.get_room_version(room_id))
|
||||
|
||||
builder = EventBuilder(
|
||||
state=self.state,
|
||||
event_auth_handler=self._event_auth_handler,
|
||||
store=self.store,
|
||||
clock=self.clock,
|
||||
hostname=hostname,
|
||||
signing_key=self.random_signing_key,
|
||||
room_version=KNOWN_ROOM_VERSIONS[room_version],
|
||||
room_id=room_id,
|
||||
type=EventTypes.Member,
|
||||
sender=user_id,
|
||||
state_key=user_id,
|
||||
content={"membership": Membership.JOIN},
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id)
|
||||
)
|
||||
|
||||
prev_event_ids = self.get_success(
|
||||
self.store.get_latest_event_ids_in_room(room_id)
|
||||
# Figure out what the forward extremities in the room are (the most recent
|
||||
# events that aren't tied into the DAG)
|
||||
forward_extremity_event_ids = self.get_success(
|
||||
self.hs.get_datastores().main.get_latest_event_ids_in_room(room_id)
|
||||
)
|
||||
|
||||
event = self.get_success(
|
||||
builder.build(prev_event_ids=list(prev_event_ids), auth_event_ids=None)
|
||||
event = self.create_fake_event_from_remote_server(
|
||||
remote_server_name=hostname,
|
||||
event_dict={
|
||||
"room_id": room_id,
|
||||
"sender": user_id,
|
||||
"type": EventTypes.Member,
|
||||
"state_key": user_id,
|
||||
"depth": 1000,
|
||||
"origin_server_ts": 1,
|
||||
"content": {"membership": Membership.JOIN},
|
||||
"auth_events": [
|
||||
state_map[(EventTypes.Create, "")].event_id,
|
||||
state_map[(EventTypes.JoinRules, "")].event_id,
|
||||
],
|
||||
"prev_events": list(forward_extremity_event_ids),
|
||||
},
|
||||
room_version=room_version,
|
||||
)
|
||||
|
||||
self.get_success(self.federation_event_handler.on_receive_pdu(hostname, event))
|
||||
|
@ -2071,3 +2086,50 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
|||
# Check that it was successfully persisted.
|
||||
self.get_success(self.store.get_event(event.event_id))
|
||||
self.get_success(self.store.get_event(event.event_id))
|
||||
|
||||
def create_fake_event_from_remote_server(
|
||||
self, remote_server_name: str, event_dict: JsonDict, room_version: RoomVersion
|
||||
) -> EventBase:
|
||||
"""
|
||||
This is similar to what `FederatingHomeserverTestCase` is doing but we don't
|
||||
need all of the extra baggage and we want to be able to create an event from
|
||||
many remote servers.
|
||||
"""
|
||||
|
||||
# poke the other server's signing key into the key store, so that we don't
|
||||
# make requests for it
|
||||
other_server_signature_key = generate_signing_key("test")
|
||||
verify_key = get_verify_key(other_server_signature_key)
|
||||
verify_key_id = "%s:%s" % (verify_key.alg, verify_key.version)
|
||||
|
||||
self.get_success(
|
||||
self.hs.get_datastores().main.store_server_keys_response(
|
||||
remote_server_name,
|
||||
from_server=remote_server_name,
|
||||
ts_added_ms=self.clock.time_msec(),
|
||||
verify_keys={
|
||||
verify_key_id: FetchKeyResult(
|
||||
verify_key=verify_key,
|
||||
valid_until_ts=self.clock.time_msec() + 10000,
|
||||
),
|
||||
},
|
||||
response_json={
|
||||
"verify_keys": {
|
||||
verify_key_id: {"key": encode_verify_key_base64(verify_key)}
|
||||
}
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
add_hashes_and_signatures(
|
||||
room_version=room_version,
|
||||
event_dict=event_dict,
|
||||
signature_name=remote_server_name,
|
||||
signing_key=other_server_signature_key,
|
||||
)
|
||||
event = make_event_from_dict(
|
||||
event_dict,
|
||||
room_version=room_version,
|
||||
)
|
||||
|
||||
return event
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
from http import HTTPStatus
|
||||
from typing import Collection, ContextManager, List, Optional
|
||||
from unittest.mock import AsyncMock, Mock, patch
|
||||
|
||||
|
@ -347,7 +348,15 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
|||
# the prev_events used when creating the join event, such that the ban does not
|
||||
# precede the join.
|
||||
with self._patch_get_latest_events([last_room_creation_event_id]):
|
||||
self.helper.join(room_id, eve, tok=eve_token)
|
||||
self.helper.join(
|
||||
room_id,
|
||||
eve,
|
||||
tok=eve_token,
|
||||
# Previously, this join would succeed but now we expect it to fail at
|
||||
# this point. The rest of the test is for the case when this used to
|
||||
# succeed.
|
||||
expect_code=HTTPStatus.FORBIDDEN,
|
||||
)
|
||||
|
||||
# Eve makes a second, incremental sync.
|
||||
eve_incremental_sync_after_join: SyncResult = self.get_success(
|
||||
|
|
|
@ -22,14 +22,26 @@ import logging
|
|||
from unittest.mock import AsyncMock, Mock
|
||||
|
||||
from netaddr import IPSet
|
||||
from signedjson.key import (
|
||||
encode_verify_key_base64,
|
||||
generate_signing_key,
|
||||
get_verify_key,
|
||||
)
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.events.builder import EventBuilderFactory
|
||||
from synapse.api.room_versions import RoomVersion
|
||||
from synapse.crypto.event_signing import add_hashes_and_signatures
|
||||
from synapse.events import EventBase, make_event_from_dict
|
||||
from synapse.handlers.typing import TypingWriterHandler
|
||||
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
|
||||
from synapse.rest.admin import register_servlets_for_client_rest_resource
|
||||
from synapse.rest.client import login, room
|
||||
from synapse.types import UserID, create_requester
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.keys import FetchKeyResult
|
||||
from synapse.types import JsonDict, UserID, create_requester
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests.replication._base import BaseMultiWorkerStreamTestCase
|
||||
from tests.server import get_clock
|
||||
|
@ -63,6 +75,9 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
|
|||
ip_blocklist=IPSet(),
|
||||
)
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.storage_controllers = hs.get_storage_controllers()
|
||||
|
||||
def test_send_event_single_sender(self) -> None:
|
||||
"""Test that using a single federation sender worker correctly sends a
|
||||
new event.
|
||||
|
@ -243,35 +258,92 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
|
|||
self.assertTrue(sent_on_1)
|
||||
self.assertTrue(sent_on_2)
|
||||
|
||||
def create_fake_event_from_remote_server(
|
||||
self, remote_server_name: str, event_dict: JsonDict, room_version: RoomVersion
|
||||
) -> EventBase:
|
||||
"""
|
||||
This is similar to what `FederatingHomeserverTestCase` is doing but we don't
|
||||
need all of the extra baggage and we want to be able to create an event from
|
||||
many remote servers.
|
||||
"""
|
||||
|
||||
# poke the other server's signing key into the key store, so that we don't
|
||||
# make requests for it
|
||||
other_server_signature_key = generate_signing_key("test")
|
||||
verify_key = get_verify_key(other_server_signature_key)
|
||||
verify_key_id = "%s:%s" % (verify_key.alg, verify_key.version)
|
||||
|
||||
self.get_success(
|
||||
self.hs.get_datastores().main.store_server_keys_response(
|
||||
remote_server_name,
|
||||
from_server=remote_server_name,
|
||||
ts_added_ms=self.clock.time_msec(),
|
||||
verify_keys={
|
||||
verify_key_id: FetchKeyResult(
|
||||
verify_key=verify_key,
|
||||
valid_until_ts=self.clock.time_msec() + 10000,
|
||||
),
|
||||
},
|
||||
response_json={
|
||||
"verify_keys": {
|
||||
verify_key_id: {"key": encode_verify_key_base64(verify_key)}
|
||||
}
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
add_hashes_and_signatures(
|
||||
room_version=room_version,
|
||||
event_dict=event_dict,
|
||||
signature_name=remote_server_name,
|
||||
signing_key=other_server_signature_key,
|
||||
)
|
||||
event = make_event_from_dict(
|
||||
event_dict,
|
||||
room_version=room_version,
|
||||
)
|
||||
|
||||
return event
|
||||
|
||||
def create_room_with_remote_server(
|
||||
self, user: str, token: str, remote_server: str = "other_server"
|
||||
) -> str:
|
||||
room = self.helper.create_room_as(user, tok=token)
|
||||
room_id = self.helper.create_room_as(user, tok=token)
|
||||
store = self.hs.get_datastores().main
|
||||
federation = self.hs.get_federation_event_handler()
|
||||
|
||||
prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
|
||||
room_version = self.get_success(store.get_room_version(room))
|
||||
room_version = self.get_success(store.get_room_version(room_id))
|
||||
|
||||
factory = EventBuilderFactory(self.hs)
|
||||
factory.hostname = remote_server
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id)
|
||||
)
|
||||
|
||||
# Figure out what the forward extremities in the room are (the most recent
|
||||
# events that aren't tied into the DAG)
|
||||
prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room_id))
|
||||
|
||||
user_id = UserID("user", remote_server).to_string()
|
||||
|
||||
event_dict = {
|
||||
"type": EventTypes.Member,
|
||||
"state_key": user_id,
|
||||
"content": {"membership": Membership.JOIN},
|
||||
"sender": user_id,
|
||||
"room_id": room,
|
||||
}
|
||||
|
||||
builder = factory.for_room_version(room_version, event_dict)
|
||||
join_event = self.get_success(
|
||||
builder.build(prev_event_ids=list(prev_event_ids), auth_event_ids=None)
|
||||
join_event = self.create_fake_event_from_remote_server(
|
||||
remote_server_name=remote_server,
|
||||
event_dict={
|
||||
"room_id": room_id,
|
||||
"sender": user_id,
|
||||
"type": EventTypes.Member,
|
||||
"state_key": user_id,
|
||||
"depth": 1000,
|
||||
"origin_server_ts": 1,
|
||||
"content": {"membership": Membership.JOIN},
|
||||
"auth_events": [
|
||||
state_map[(EventTypes.Create, "")].event_id,
|
||||
state_map[(EventTypes.JoinRules, "")].event_id,
|
||||
],
|
||||
"prev_events": list(prev_event_ids),
|
||||
},
|
||||
room_version=room_version,
|
||||
)
|
||||
|
||||
self.get_success(federation.on_send_membership_event(remote_server, join_event))
|
||||
self.replicate()
|
||||
|
||||
return room
|
||||
return room_id
|
||||
|
|
|
@ -742,7 +742,7 @@ class RoomsCreateTestCase(RoomBase):
|
|||
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
|
||||
self.assertTrue("room_id" in channel.json_body)
|
||||
assert channel.resource_usage is not None
|
||||
self.assertEqual(33, channel.resource_usage.db_txn_count)
|
||||
self.assertEqual(34, channel.resource_usage.db_txn_count)
|
||||
|
||||
def test_post_room_initial_state(self) -> None:
|
||||
# POST with initial_state config key, expect new room id
|
||||
|
@ -755,7 +755,7 @@ class RoomsCreateTestCase(RoomBase):
|
|||
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
|
||||
self.assertTrue("room_id" in channel.json_body)
|
||||
assert channel.resource_usage is not None
|
||||
self.assertEqual(35, channel.resource_usage.db_txn_count)
|
||||
self.assertEqual(36, channel.resource_usage.db_txn_count)
|
||||
|
||||
def test_post_room_visibility_key(self) -> None:
|
||||
# POST with visibility config key, expect new room id
|
||||
|
|
|
@ -1,378 +0,0 @@
|
|||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
# Copyright (C) 2023 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
# Originally licensed under the Apache License, Version 2.0:
|
||||
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
||||
#
|
||||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
|
||||
from typing import Collection, List, Optional, Union
|
||||
from unittest.mock import AsyncMock, Mock
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
from synapse.api.errors import FederationError
|
||||
from synapse.api.room_versions import RoomVersion, RoomVersions
|
||||
from synapse.events import EventBase, make_event_from_dict
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.federation.federation_base import event_from_pdu_json
|
||||
from synapse.handlers.device import DeviceListUpdater
|
||||
from synapse.http.types import QueryParams
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict, UserID, create_requester
|
||||
from synapse.util import Clock
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class MessageAcceptTests(unittest.HomeserverTestCase):
|
||||
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
|
||||
self.http_client = Mock()
|
||||
return self.setup_test_homeserver(federation_http_client=self.http_client)
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
user_id = UserID("us", "test")
|
||||
our_user = create_requester(user_id)
|
||||
room_creator = self.hs.get_room_creation_handler()
|
||||
self.room_id = self.get_success(
|
||||
room_creator.create_room(
|
||||
our_user, room_creator._presets_dict["public_chat"], ratelimit=False
|
||||
)
|
||||
)[0]
|
||||
|
||||
self.store = self.hs.get_datastores().main
|
||||
|
||||
# Figure out what the most recent event is
|
||||
most_recent = next(
|
||||
iter(
|
||||
self.get_success(
|
||||
self.hs.get_datastores().main.get_latest_event_ids_in_room(
|
||||
self.room_id
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
join_event = make_event_from_dict(
|
||||
{
|
||||
"room_id": self.room_id,
|
||||
"sender": "@baduser:test.serv",
|
||||
"state_key": "@baduser:test.serv",
|
||||
"event_id": "$join:test.serv",
|
||||
"depth": 1000,
|
||||
"origin_server_ts": 1,
|
||||
"type": "m.room.member",
|
||||
"origin": "test.servx",
|
||||
"content": {"membership": "join"},
|
||||
"auth_events": [],
|
||||
"prev_state": [(most_recent, {})],
|
||||
"prev_events": [(most_recent, {})],
|
||||
}
|
||||
)
|
||||
|
||||
self.handler = self.hs.get_federation_handler()
|
||||
federation_event_handler = self.hs.get_federation_event_handler()
|
||||
|
||||
async def _check_event_auth(
|
||||
origin: Optional[str], event: EventBase, context: EventContext
|
||||
) -> None:
|
||||
pass
|
||||
|
||||
federation_event_handler._check_event_auth = _check_event_auth # type: ignore[method-assign]
|
||||
self.client = self.hs.get_federation_client()
|
||||
|
||||
async def _check_sigs_and_hash_for_pulled_events_and_fetch(
|
||||
dest: str, pdus: Collection[EventBase], room_version: RoomVersion
|
||||
) -> List[EventBase]:
|
||||
return list(pdus)
|
||||
|
||||
self.client._check_sigs_and_hash_for_pulled_events_and_fetch = ( # type: ignore[method-assign]
|
||||
_check_sigs_and_hash_for_pulled_events_and_fetch # type: ignore[assignment]
|
||||
)
|
||||
|
||||
# Send the join, it should return None (which is not an error)
|
||||
self.assertEqual(
|
||||
self.get_success(
|
||||
federation_event_handler.on_receive_pdu("test.serv", join_event)
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
# Make sure we actually joined the room
|
||||
self.assertEqual(
|
||||
self.get_success(self.store.get_latest_event_ids_in_room(self.room_id)),
|
||||
{"$join:test.serv"},
|
||||
)
|
||||
|
||||
def test_cant_hide_direct_ancestors(self) -> None:
|
||||
"""
|
||||
If you send a message, you must be able to provide the direct
|
||||
prev_events that said event references.
|
||||
"""
|
||||
|
||||
async def post_json(
|
||||
destination: str,
|
||||
path: str,
|
||||
data: Optional[JsonDict] = None,
|
||||
long_retries: bool = False,
|
||||
timeout: Optional[int] = None,
|
||||
ignore_backoff: bool = False,
|
||||
args: Optional[QueryParams] = None,
|
||||
) -> Union[JsonDict, list]:
|
||||
# If it asks us for new missing events, give them NOTHING
|
||||
if path.startswith("/_matrix/federation/v1/get_missing_events/"):
|
||||
return {"events": []}
|
||||
return {}
|
||||
|
||||
self.http_client.post_json = post_json
|
||||
|
||||
# Figure out what the most recent event is
|
||||
most_recent = next(
|
||||
iter(
|
||||
self.get_success(self.store.get_latest_event_ids_in_room(self.room_id))
|
||||
)
|
||||
)
|
||||
|
||||
# Now lie about an event
|
||||
lying_event = make_event_from_dict(
|
||||
{
|
||||
"room_id": self.room_id,
|
||||
"sender": "@baduser:test.serv",
|
||||
"event_id": "one:test.serv",
|
||||
"depth": 1000,
|
||||
"origin_server_ts": 1,
|
||||
"type": "m.room.message",
|
||||
"origin": "test.serv",
|
||||
"content": {"body": "hewwo?"},
|
||||
"auth_events": [],
|
||||
"prev_events": [("two:test.serv", {}), (most_recent, {})],
|
||||
}
|
||||
)
|
||||
|
||||
federation_event_handler = self.hs.get_federation_event_handler()
|
||||
with LoggingContext("test-context"):
|
||||
failure = self.get_failure(
|
||||
federation_event_handler.on_receive_pdu("test.serv", lying_event),
|
||||
FederationError,
|
||||
)
|
||||
|
||||
# on_receive_pdu should throw an error
|
||||
self.assertEqual(
|
||||
failure.value.args[0],
|
||||
(
|
||||
"ERROR 403: Your server isn't divulging details about prev_events "
|
||||
"referenced in this event."
|
||||
),
|
||||
)
|
||||
|
||||
# Make sure the invalid event isn't there
|
||||
extrem = self.get_success(self.store.get_latest_event_ids_in_room(self.room_id))
|
||||
self.assertEqual(extrem, {"$join:test.serv"})
|
||||
|
||||
def test_retry_device_list_resync(self) -> None:
|
||||
"""Tests that device lists are marked as stale if they couldn't be synced, and
|
||||
that stale device lists are retried periodically.
|
||||
"""
|
||||
remote_user_id = "@john:test_remote"
|
||||
remote_origin = "test_remote"
|
||||
|
||||
# Track the number of attempts to resync the user's device list.
|
||||
self.resync_attempts = 0
|
||||
|
||||
# When this function is called, increment the number of resync attempts (only if
|
||||
# we're querying devices for the right user ID), then raise a
|
||||
# NotRetryingDestination error to fail the resync gracefully.
|
||||
def query_user_devices(
|
||||
destination: str, user_id: str, timeout: int = 30000
|
||||
) -> JsonDict:
|
||||
if user_id == remote_user_id:
|
||||
self.resync_attempts += 1
|
||||
|
||||
raise NotRetryingDestination(0, 0, destination)
|
||||
|
||||
# Register the mock on the federation client.
|
||||
federation_client = self.hs.get_federation_client()
|
||||
federation_client.query_user_devices = Mock(side_effect=query_user_devices) # type: ignore[method-assign]
|
||||
|
||||
# Register a mock on the store so that the incoming update doesn't fail because
|
||||
# we don't share a room with the user.
|
||||
store = self.hs.get_datastores().main
|
||||
store.get_rooms_for_user = AsyncMock(return_value=["!someroom:test"])
|
||||
|
||||
# Manually inject a fake device list update. We need this update to include at
|
||||
# least one prev_id so that the user's device list will need to be retried.
|
||||
device_list_updater = self.hs.get_device_handler().device_list_updater
|
||||
assert isinstance(device_list_updater, DeviceListUpdater)
|
||||
self.get_success(
|
||||
device_list_updater.incoming_device_list_update(
|
||||
origin=remote_origin,
|
||||
edu_content={
|
||||
"deleted": False,
|
||||
"device_display_name": "Mobile",
|
||||
"device_id": "QBUAZIFURK",
|
||||
"prev_id": [5],
|
||||
"stream_id": 6,
|
||||
"user_id": remote_user_id,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
# Check that there was one resync attempt.
|
||||
self.assertEqual(self.resync_attempts, 1)
|
||||
|
||||
# Check that the resync attempt failed and caused the user's device list to be
|
||||
# marked as stale.
|
||||
need_resync = self.get_success(
|
||||
store.get_user_ids_requiring_device_list_resync()
|
||||
)
|
||||
self.assertIn(remote_user_id, need_resync)
|
||||
|
||||
# Check that waiting for 30 seconds caused Synapse to retry resyncing the device
|
||||
# list.
|
||||
self.reactor.advance(30)
|
||||
self.assertEqual(self.resync_attempts, 2)
|
||||
|
||||
def test_cross_signing_keys_retry(self) -> None:
|
||||
"""Tests that resyncing a device list correctly processes cross-signing keys from
|
||||
the remote server.
|
||||
"""
|
||||
remote_user_id = "@john:test_remote"
|
||||
remote_master_key = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY"
|
||||
remote_self_signing_key = "QeIiFEjluPBtI7WQdG365QKZcFs9kqmHir6RBD0//nQ"
|
||||
|
||||
# Register mock device list retrieval on the federation client.
|
||||
federation_client = self.hs.get_federation_client()
|
||||
federation_client.query_user_devices = AsyncMock( # type: ignore[method-assign]
|
||||
return_value={
|
||||
"user_id": remote_user_id,
|
||||
"stream_id": 1,
|
||||
"devices": [],
|
||||
"master_key": {
|
||||
"user_id": remote_user_id,
|
||||
"usage": ["master"],
|
||||
"keys": {"ed25519:" + remote_master_key: remote_master_key},
|
||||
},
|
||||
"self_signing_key": {
|
||||
"user_id": remote_user_id,
|
||||
"usage": ["self_signing"],
|
||||
"keys": {
|
||||
"ed25519:" + remote_self_signing_key: remote_self_signing_key
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
# Resync the device list.
|
||||
device_handler = self.hs.get_device_handler()
|
||||
self.get_success(
|
||||
device_handler.device_list_updater.multi_user_device_resync(
|
||||
[remote_user_id]
|
||||
),
|
||||
)
|
||||
|
||||
# Retrieve the cross-signing keys for this user.
|
||||
keys = self.get_success(
|
||||
self.store.get_e2e_cross_signing_keys_bulk(user_ids=[remote_user_id]),
|
||||
)
|
||||
self.assertIn(remote_user_id, keys)
|
||||
key = keys[remote_user_id]
|
||||
assert key is not None
|
||||
|
||||
# Check that the master key is the one returned by the mock.
|
||||
master_key = key["master"]
|
||||
self.assertEqual(len(master_key["keys"]), 1)
|
||||
self.assertTrue("ed25519:" + remote_master_key in master_key["keys"].keys())
|
||||
self.assertTrue(remote_master_key in master_key["keys"].values())
|
||||
|
||||
# Check that the self-signing key is the one returned by the mock.
|
||||
self_signing_key = key["self_signing"]
|
||||
self.assertEqual(len(self_signing_key["keys"]), 1)
|
||||
self.assertTrue(
|
||||
"ed25519:" + remote_self_signing_key in self_signing_key["keys"].keys(),
|
||||
)
|
||||
self.assertTrue(remote_self_signing_key in self_signing_key["keys"].values())
|
||||
|
||||
|
||||
class StripUnsignedFromEventsTestCase(unittest.TestCase):
|
||||
def test_strip_unauthorized_unsigned_values(self) -> None:
|
||||
event1 = {
|
||||
"sender": "@baduser:test.serv",
|
||||
"state_key": "@baduser:test.serv",
|
||||
"event_id": "$event1:test.serv",
|
||||
"depth": 1000,
|
||||
"origin_server_ts": 1,
|
||||
"type": "m.room.member",
|
||||
"origin": "test.servx",
|
||||
"content": {"membership": "join"},
|
||||
"auth_events": [],
|
||||
"unsigned": {"malicious garbage": "hackz", "more warez": "more hackz"},
|
||||
}
|
||||
filtered_event = event_from_pdu_json(event1, RoomVersions.V1)
|
||||
# Make sure unauthorized fields are stripped from unsigned
|
||||
self.assertNotIn("more warez", filtered_event.unsigned)
|
||||
|
||||
def test_strip_event_maintains_allowed_fields(self) -> None:
|
||||
event2 = {
|
||||
"sender": "@baduser:test.serv",
|
||||
"state_key": "@baduser:test.serv",
|
||||
"event_id": "$event2:test.serv",
|
||||
"depth": 1000,
|
||||
"origin_server_ts": 1,
|
||||
"type": "m.room.member",
|
||||
"origin": "test.servx",
|
||||
"auth_events": [],
|
||||
"content": {"membership": "join"},
|
||||
"unsigned": {
|
||||
"malicious garbage": "hackz",
|
||||
"more warez": "more hackz",
|
||||
"age": 14,
|
||||
"invite_room_state": [],
|
||||
},
|
||||
}
|
||||
|
||||
filtered_event2 = event_from_pdu_json(event2, RoomVersions.V1)
|
||||
self.assertIn("age", filtered_event2.unsigned)
|
||||
self.assertEqual(14, filtered_event2.unsigned["age"])
|
||||
self.assertNotIn("more warez", filtered_event2.unsigned)
|
||||
# Invite_room_state is allowed in events of type m.room.member
|
||||
self.assertIn("invite_room_state", filtered_event2.unsigned)
|
||||
self.assertEqual([], filtered_event2.unsigned["invite_room_state"])
|
||||
|
||||
def test_strip_event_removes_fields_based_on_event_type(self) -> None:
|
||||
event3 = {
|
||||
"sender": "@baduser:test.serv",
|
||||
"state_key": "@baduser:test.serv",
|
||||
"event_id": "$event3:test.serv",
|
||||
"depth": 1000,
|
||||
"origin_server_ts": 1,
|
||||
"type": "m.room.power_levels",
|
||||
"origin": "test.servx",
|
||||
"content": {},
|
||||
"auth_events": [],
|
||||
"unsigned": {
|
||||
"malicious garbage": "hackz",
|
||||
"more warez": "more hackz",
|
||||
"age": 14,
|
||||
"invite_room_state": [],
|
||||
},
|
||||
}
|
||||
filtered_event3 = event_from_pdu_json(event3, RoomVersions.V1)
|
||||
self.assertIn("age", filtered_event3.unsigned)
|
||||
# Invite_room_state field is only permitted in event type m.room.member
|
||||
self.assertNotIn("invite_room_state", filtered_event3.unsigned)
|
||||
self.assertNotIn("more warez", filtered_event3.unsigned)
|
|
@ -400,11 +400,24 @@ class TestTimeout(Exception):
|
|||
|
||||
|
||||
class test_timeout:
|
||||
"""
|
||||
FIXME: This implementation is not robust against other code tight-looping and
|
||||
preventing the signals propagating and timing out the test. You may need to add
|
||||
`time.sleep(0.1)` to your code in order to allow this timeout to work correctly.
|
||||
|
||||
```py
|
||||
with test_timeout(3):
|
||||
while True:
|
||||
my_checking_func()
|
||||
time.sleep(0.1)
|
||||
```
|
||||
"""
|
||||
|
||||
def __init__(self, seconds: int, error_message: Optional[str] = None) -> None:
|
||||
if error_message is None:
|
||||
error_message = "test timed out after {}s.".format(seconds)
|
||||
self.error_message = f"Test timed out after {seconds}s"
|
||||
if error_message is not None:
|
||||
self.error_message += f": {error_message}"
|
||||
self.seconds = seconds
|
||||
self.error_message = error_message
|
||||
|
||||
def handle_timeout(self, signum: int, frame: Optional[FrameType]) -> None:
|
||||
raise TestTimeout(self.error_message)
|
||||
|
|
Loading…
Add table
Reference in a new issue