mirror of
https://github.com/element-hq/synapse.git
synced 2025-01-20 18:42:33 +00:00
Validate the alt_aliases property of canonical alias events (#6971)
This commit is contained in:
parent
b29474e0aa
commit
7dcbc33a1b
8 changed files with 256 additions and 54 deletions
1
changelog.d/6971.feature
Normal file
1
changelog.d/6971.feature
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Validate the alt_aliases property of canonical alias events.
|
|
@ -66,6 +66,7 @@ class Codes(object):
|
||||||
EXPIRED_ACCOUNT = "ORG_MATRIX_EXPIRED_ACCOUNT"
|
EXPIRED_ACCOUNT = "ORG_MATRIX_EXPIRED_ACCOUNT"
|
||||||
INVALID_SIGNATURE = "M_INVALID_SIGNATURE"
|
INVALID_SIGNATURE = "M_INVALID_SIGNATURE"
|
||||||
USER_DEACTIVATED = "M_USER_DEACTIVATED"
|
USER_DEACTIVATED = "M_USER_DEACTIVATED"
|
||||||
|
BAD_ALIAS = "M_BAD_ALIAS"
|
||||||
|
|
||||||
|
|
||||||
class CodeMessageException(RuntimeError):
|
class CodeMessageException(RuntimeError):
|
||||||
|
|
|
@ -13,8 +13,6 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
import collections
|
|
||||||
import logging
|
import logging
|
||||||
import string
|
import string
|
||||||
from typing import List
|
from typing import List
|
||||||
|
@ -307,15 +305,17 @@ class DirectoryHandler(BaseHandler):
|
||||||
send_update = True
|
send_update = True
|
||||||
content.pop("alias", "")
|
content.pop("alias", "")
|
||||||
|
|
||||||
# Filter alt_aliases for the removed alias.
|
# Filter the alt_aliases property for the removed alias. Note that the
|
||||||
alt_aliases = content.pop("alt_aliases", None)
|
# value is not modified if alt_aliases is of an unexpected form.
|
||||||
# If the aliases are not a list (or not found) do not attempt to modify
|
alt_aliases = content.get("alt_aliases")
|
||||||
# the list.
|
if isinstance(alt_aliases, (list, tuple)) and alias_str in alt_aliases:
|
||||||
if isinstance(alt_aliases, collections.Sequence):
|
|
||||||
send_update = True
|
send_update = True
|
||||||
alt_aliases = [alias for alias in alt_aliases if alias != alias_str]
|
alt_aliases = [alias for alias in alt_aliases if alias != alias_str]
|
||||||
|
|
||||||
if alt_aliases:
|
if alt_aliases:
|
||||||
content["alt_aliases"] = alt_aliases
|
content["alt_aliases"] = alt_aliases
|
||||||
|
else:
|
||||||
|
del content["alt_aliases"]
|
||||||
|
|
||||||
if send_update:
|
if send_update:
|
||||||
yield self.event_creation_handler.create_and_send_nonmember_event(
|
yield self.event_creation_handler.create_and_send_nonmember_event(
|
||||||
|
|
|
@ -888,19 +888,60 @@ class EventCreationHandler(object):
|
||||||
yield self.base_handler.maybe_kick_guest_users(event, context)
|
yield self.base_handler.maybe_kick_guest_users(event, context)
|
||||||
|
|
||||||
if event.type == EventTypes.CanonicalAlias:
|
if event.type == EventTypes.CanonicalAlias:
|
||||||
# Check the alias is acually valid (at this time at least)
|
# Validate a newly added alias or newly added alt_aliases.
|
||||||
|
|
||||||
|
original_alias = None
|
||||||
|
original_alt_aliases = set()
|
||||||
|
|
||||||
|
original_event_id = event.unsigned.get("replaces_state")
|
||||||
|
if original_event_id:
|
||||||
|
original_event = yield self.store.get_event(original_event_id)
|
||||||
|
|
||||||
|
if original_event:
|
||||||
|
original_alias = original_event.content.get("alias", None)
|
||||||
|
original_alt_aliases = original_event.content.get("alt_aliases", [])
|
||||||
|
|
||||||
|
# Check the alias is currently valid (if it has changed).
|
||||||
room_alias_str = event.content.get("alias", None)
|
room_alias_str = event.content.get("alias", None)
|
||||||
if room_alias_str:
|
directory_handler = self.hs.get_handlers().directory_handler
|
||||||
|
if room_alias_str and room_alias_str != original_alias:
|
||||||
room_alias = RoomAlias.from_string(room_alias_str)
|
room_alias = RoomAlias.from_string(room_alias_str)
|
||||||
directory_handler = self.hs.get_handlers().directory_handler
|
|
||||||
mapping = yield directory_handler.get_association(room_alias)
|
mapping = yield directory_handler.get_association(room_alias)
|
||||||
|
|
||||||
if mapping["room_id"] != event.room_id:
|
if mapping["room_id"] != event.room_id:
|
||||||
raise SynapseError(
|
raise SynapseError(
|
||||||
400,
|
400,
|
||||||
"Room alias %s does not point to the room" % (room_alias_str,),
|
"Room alias %s does not point to the room" % (room_alias_str,),
|
||||||
|
Codes.BAD_ALIAS,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Check that alt_aliases is the proper form.
|
||||||
|
alt_aliases = event.content.get("alt_aliases", [])
|
||||||
|
if not isinstance(alt_aliases, (list, tuple)):
|
||||||
|
raise SynapseError(
|
||||||
|
400, "The alt_aliases property must be a list.", Codes.INVALID_PARAM
|
||||||
|
)
|
||||||
|
|
||||||
|
# If the old version of alt_aliases is of an unknown form,
|
||||||
|
# completely replace it.
|
||||||
|
if not isinstance(original_alt_aliases, (list, tuple)):
|
||||||
|
original_alt_aliases = []
|
||||||
|
|
||||||
|
# Check that each alias is currently valid.
|
||||||
|
new_alt_aliases = set(alt_aliases) - set(original_alt_aliases)
|
||||||
|
if new_alt_aliases:
|
||||||
|
for alias_str in new_alt_aliases:
|
||||||
|
room_alias = RoomAlias.from_string(alias_str)
|
||||||
|
mapping = yield directory_handler.get_association(room_alias)
|
||||||
|
|
||||||
|
if mapping["room_id"] != event.room_id:
|
||||||
|
raise SynapseError(
|
||||||
|
400,
|
||||||
|
"Room alias %s does not point to the room"
|
||||||
|
% (room_alias_str,),
|
||||||
|
Codes.BAD_ALIAS,
|
||||||
|
)
|
||||||
|
|
||||||
federation_handler = self.hs.get_handlers().federation_handler
|
federation_handler = self.hs.get_handlers().federation_handler
|
||||||
|
|
||||||
if event.type == EventTypes.Member:
|
if event.type == EventTypes.Member:
|
||||||
|
|
|
@ -23,7 +23,7 @@ import attr
|
||||||
from signedjson.key import decode_verify_key_bytes
|
from signedjson.key import decode_verify_key_bytes
|
||||||
from unpaddedbase64 import decode_base64
|
from unpaddedbase64 import decode_base64
|
||||||
|
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import Codes, SynapseError
|
||||||
|
|
||||||
# define a version of typing.Collection that works on python 3.5
|
# define a version of typing.Collection that works on python 3.5
|
||||||
if sys.version_info[:3] >= (3, 6, 0):
|
if sys.version_info[:3] >= (3, 6, 0):
|
||||||
|
@ -166,11 +166,13 @@ class DomainSpecificString(namedtuple("DomainSpecificString", ("localpart", "dom
|
||||||
return self
|
return self
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_string(cls, s):
|
def from_string(cls, s: str):
|
||||||
"""Parse the string given by 's' into a structure object."""
|
"""Parse the string given by 's' into a structure object."""
|
||||||
if len(s) < 1 or s[0:1] != cls.SIGIL:
|
if len(s) < 1 or s[0:1] != cls.SIGIL:
|
||||||
raise SynapseError(
|
raise SynapseError(
|
||||||
400, "Expected %s string to start with '%s'" % (cls.__name__, cls.SIGIL)
|
400,
|
||||||
|
"Expected %s string to start with '%s'" % (cls.__name__, cls.SIGIL),
|
||||||
|
Codes.INVALID_PARAM,
|
||||||
)
|
)
|
||||||
|
|
||||||
parts = s[1:].split(":", 1)
|
parts = s[1:].split(":", 1)
|
||||||
|
@ -179,6 +181,7 @@ class DomainSpecificString(namedtuple("DomainSpecificString", ("localpart", "dom
|
||||||
400,
|
400,
|
||||||
"Expected %s of the form '%slocalname:domain'"
|
"Expected %s of the form '%slocalname:domain'"
|
||||||
% (cls.__name__, cls.SIGIL),
|
% (cls.__name__, cls.SIGIL),
|
||||||
|
Codes.INVALID_PARAM,
|
||||||
)
|
)
|
||||||
|
|
||||||
domain = parts[1]
|
domain = parts[1]
|
||||||
|
@ -235,11 +238,13 @@ class GroupID(DomainSpecificString):
|
||||||
def from_string(cls, s):
|
def from_string(cls, s):
|
||||||
group_id = super(GroupID, cls).from_string(s)
|
group_id = super(GroupID, cls).from_string(s)
|
||||||
if not group_id.localpart:
|
if not group_id.localpart:
|
||||||
raise SynapseError(400, "Group ID cannot be empty")
|
raise SynapseError(400, "Group ID cannot be empty", Codes.INVALID_PARAM)
|
||||||
|
|
||||||
if contains_invalid_mxid_characters(group_id.localpart):
|
if contains_invalid_mxid_characters(group_id.localpart):
|
||||||
raise SynapseError(
|
raise SynapseError(
|
||||||
400, "Group ID can only contain characters a-z, 0-9, or '=_-./'"
|
400,
|
||||||
|
"Group ID can only contain characters a-z, 0-9, or '=_-./'",
|
||||||
|
Codes.INVALID_PARAM,
|
||||||
)
|
)
|
||||||
|
|
||||||
return group_id
|
return group_id
|
||||||
|
|
|
@ -88,6 +88,7 @@ class DirectoryTestCase(unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_delete_alias_not_allowed(self):
|
def test_delete_alias_not_allowed(self):
|
||||||
|
"""Removing an alias should be denied if a user does not have the proper permissions."""
|
||||||
room_id = "!8765qwer:test"
|
room_id = "!8765qwer:test"
|
||||||
self.get_success(
|
self.get_success(
|
||||||
self.store.create_room_alias_association(self.my_room, room_id, ["test"])
|
self.store.create_room_alias_association(self.my_room, room_id, ["test"])
|
||||||
|
@ -101,6 +102,7 @@ class DirectoryTestCase(unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_delete_alias(self):
|
def test_delete_alias(self):
|
||||||
|
"""Removing an alias should work when a user does has the proper permissions."""
|
||||||
room_id = "!8765qwer:test"
|
room_id = "!8765qwer:test"
|
||||||
user_id = "@user:test"
|
user_id = "@user:test"
|
||||||
self.get_success(
|
self.get_success(
|
||||||
|
@ -159,30 +161,42 @@ class CanonicalAliasTestCase(unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
self.test_alias = "#test:test"
|
self.test_alias = "#test:test"
|
||||||
self.room_alias = RoomAlias.from_string(self.test_alias)
|
self.room_alias = self._add_alias(self.test_alias)
|
||||||
|
|
||||||
|
def _add_alias(self, alias: str) -> RoomAlias:
|
||||||
|
"""Add an alias to the test room."""
|
||||||
|
room_alias = RoomAlias.from_string(alias)
|
||||||
|
|
||||||
# Create a new alias to this room.
|
# Create a new alias to this room.
|
||||||
self.get_success(
|
self.get_success(
|
||||||
self.store.create_room_alias_association(
|
self.store.create_room_alias_association(
|
||||||
self.room_alias, self.room_id, ["test"], self.admin_user
|
room_alias, self.room_id, ["test"], self.admin_user
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return room_alias
|
||||||
|
|
||||||
|
def _set_canonical_alias(self, content):
|
||||||
|
"""Configure the canonical alias state on the room."""
|
||||||
|
self.helper.send_state(
|
||||||
|
self.room_id, "m.room.canonical_alias", content, tok=self.admin_user_tok,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _get_canonical_alias(self):
|
||||||
|
"""Get the canonical alias state of the room."""
|
||||||
|
return self.get_success(
|
||||||
|
self.state_handler.get_current_state(
|
||||||
|
self.room_id, EventTypes.CanonicalAlias, ""
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_remove_alias(self):
|
def test_remove_alias(self):
|
||||||
"""Removing an alias that is the canonical alias should remove it there too."""
|
"""Removing an alias that is the canonical alias should remove it there too."""
|
||||||
# Set this new alias as the canonical alias for this room
|
# Set this new alias as the canonical alias for this room
|
||||||
self.helper.send_state(
|
self._set_canonical_alias(
|
||||||
self.room_id,
|
{"alias": self.test_alias, "alt_aliases": [self.test_alias]}
|
||||||
"m.room.canonical_alias",
|
|
||||||
{"alias": self.test_alias, "alt_aliases": [self.test_alias]},
|
|
||||||
tok=self.admin_user_tok,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
data = self.get_success(
|
data = self._get_canonical_alias()
|
||||||
self.state_handler.get_current_state(
|
|
||||||
self.room_id, EventTypes.CanonicalAlias, ""
|
|
||||||
)
|
|
||||||
)
|
|
||||||
self.assertEqual(data["content"]["alias"], self.test_alias)
|
self.assertEqual(data["content"]["alias"], self.test_alias)
|
||||||
self.assertEqual(data["content"]["alt_aliases"], [self.test_alias])
|
self.assertEqual(data["content"]["alt_aliases"], [self.test_alias])
|
||||||
|
|
||||||
|
@ -193,11 +207,7 @@ class CanonicalAliasTestCase(unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
data = self.get_success(
|
data = self._get_canonical_alias()
|
||||||
self.state_handler.get_current_state(
|
|
||||||
self.room_id, EventTypes.CanonicalAlias, ""
|
|
||||||
)
|
|
||||||
)
|
|
||||||
self.assertNotIn("alias", data["content"])
|
self.assertNotIn("alias", data["content"])
|
||||||
self.assertNotIn("alt_aliases", data["content"])
|
self.assertNotIn("alt_aliases", data["content"])
|
||||||
|
|
||||||
|
@ -205,29 +215,17 @@ class CanonicalAliasTestCase(unittest.HomeserverTestCase):
|
||||||
"""Removing an alias listed as in alt_aliases should remove it there too."""
|
"""Removing an alias listed as in alt_aliases should remove it there too."""
|
||||||
# Create a second alias.
|
# Create a second alias.
|
||||||
other_test_alias = "#test2:test"
|
other_test_alias = "#test2:test"
|
||||||
other_room_alias = RoomAlias.from_string(other_test_alias)
|
other_room_alias = self._add_alias(other_test_alias)
|
||||||
self.get_success(
|
|
||||||
self.store.create_room_alias_association(
|
|
||||||
other_room_alias, self.room_id, ["test"], self.admin_user
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Set the alias as the canonical alias for this room.
|
# Set the alias as the canonical alias for this room.
|
||||||
self.helper.send_state(
|
self._set_canonical_alias(
|
||||||
self.room_id,
|
|
||||||
"m.room.canonical_alias",
|
|
||||||
{
|
{
|
||||||
"alias": self.test_alias,
|
"alias": self.test_alias,
|
||||||
"alt_aliases": [self.test_alias, other_test_alias],
|
"alt_aliases": [self.test_alias, other_test_alias],
|
||||||
},
|
}
|
||||||
tok=self.admin_user_tok,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
data = self.get_success(
|
data = self._get_canonical_alias()
|
||||||
self.state_handler.get_current_state(
|
|
||||||
self.room_id, EventTypes.CanonicalAlias, ""
|
|
||||||
)
|
|
||||||
)
|
|
||||||
self.assertEqual(data["content"]["alias"], self.test_alias)
|
self.assertEqual(data["content"]["alias"], self.test_alias)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
data["content"]["alt_aliases"], [self.test_alias, other_test_alias]
|
data["content"]["alt_aliases"], [self.test_alias, other_test_alias]
|
||||||
|
@ -240,11 +238,7 @@ class CanonicalAliasTestCase(unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
data = self.get_success(
|
data = self._get_canonical_alias()
|
||||||
self.state_handler.get_current_state(
|
|
||||||
self.room_id, EventTypes.CanonicalAlias, ""
|
|
||||||
)
|
|
||||||
)
|
|
||||||
self.assertEqual(data["content"]["alias"], self.test_alias)
|
self.assertEqual(data["content"]["alias"], self.test_alias)
|
||||||
self.assertEqual(data["content"]["alt_aliases"], [self.test_alias])
|
self.assertEqual(data["content"]["alt_aliases"], [self.test_alias])
|
||||||
|
|
||||||
|
|
|
@ -1821,3 +1821,163 @@ class RoomAliasListTestCase(unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
self.render(request)
|
self.render(request)
|
||||||
self.assertEqual(channel.code, expected_code, channel.result)
|
self.assertEqual(channel.code, expected_code, channel.result)
|
||||||
|
|
||||||
|
|
||||||
|
class RoomCanonicalAliasTestCase(unittest.HomeserverTestCase):
|
||||||
|
servlets = [
|
||||||
|
synapse.rest.admin.register_servlets_for_client_rest_resource,
|
||||||
|
directory.register_servlets,
|
||||||
|
login.register_servlets,
|
||||||
|
room.register_servlets,
|
||||||
|
]
|
||||||
|
|
||||||
|
def prepare(self, reactor, clock, homeserver):
|
||||||
|
self.room_owner = self.register_user("room_owner", "test")
|
||||||
|
self.room_owner_tok = self.login("room_owner", "test")
|
||||||
|
|
||||||
|
self.room_id = self.helper.create_room_as(
|
||||||
|
self.room_owner, tok=self.room_owner_tok
|
||||||
|
)
|
||||||
|
|
||||||
|
self.alias = "#alias:test"
|
||||||
|
self._set_alias_via_directory(self.alias)
|
||||||
|
|
||||||
|
def _set_alias_via_directory(self, alias: str, expected_code: int = 200):
|
||||||
|
url = "/_matrix/client/r0/directory/room/" + alias
|
||||||
|
data = {"room_id": self.room_id}
|
||||||
|
request_data = json.dumps(data)
|
||||||
|
|
||||||
|
request, channel = self.make_request(
|
||||||
|
"PUT", url, request_data, access_token=self.room_owner_tok
|
||||||
|
)
|
||||||
|
self.render(request)
|
||||||
|
self.assertEqual(channel.code, expected_code, channel.result)
|
||||||
|
|
||||||
|
def _get_canonical_alias(self, expected_code: int = 200) -> JsonDict:
|
||||||
|
"""Calls the endpoint under test. returns the json response object."""
|
||||||
|
request, channel = self.make_request(
|
||||||
|
"GET",
|
||||||
|
"rooms/%s/state/m.room.canonical_alias" % (self.room_id,),
|
||||||
|
access_token=self.room_owner_tok,
|
||||||
|
)
|
||||||
|
self.render(request)
|
||||||
|
self.assertEqual(channel.code, expected_code, channel.result)
|
||||||
|
res = channel.json_body
|
||||||
|
self.assertIsInstance(res, dict)
|
||||||
|
return res
|
||||||
|
|
||||||
|
def _set_canonical_alias(self, content: str, expected_code: int = 200) -> JsonDict:
|
||||||
|
"""Calls the endpoint under test. returns the json response object."""
|
||||||
|
request, channel = self.make_request(
|
||||||
|
"PUT",
|
||||||
|
"rooms/%s/state/m.room.canonical_alias" % (self.room_id,),
|
||||||
|
json.dumps(content),
|
||||||
|
access_token=self.room_owner_tok,
|
||||||
|
)
|
||||||
|
self.render(request)
|
||||||
|
self.assertEqual(channel.code, expected_code, channel.result)
|
||||||
|
res = channel.json_body
|
||||||
|
self.assertIsInstance(res, dict)
|
||||||
|
return res
|
||||||
|
|
||||||
|
def test_canonical_alias(self):
|
||||||
|
"""Test a basic alias message."""
|
||||||
|
# There is no canonical alias to start with.
|
||||||
|
self._get_canonical_alias(expected_code=404)
|
||||||
|
|
||||||
|
# Create an alias.
|
||||||
|
self._set_canonical_alias({"alias": self.alias})
|
||||||
|
|
||||||
|
# Canonical alias now exists!
|
||||||
|
res = self._get_canonical_alias()
|
||||||
|
self.assertEqual(res, {"alias": self.alias})
|
||||||
|
|
||||||
|
# Now remove the alias.
|
||||||
|
self._set_canonical_alias({})
|
||||||
|
|
||||||
|
# There is an alias event, but it is empty.
|
||||||
|
res = self._get_canonical_alias()
|
||||||
|
self.assertEqual(res, {})
|
||||||
|
|
||||||
|
def test_alt_aliases(self):
|
||||||
|
"""Test a canonical alias message with alt_aliases."""
|
||||||
|
# Create an alias.
|
||||||
|
self._set_canonical_alias({"alt_aliases": [self.alias]})
|
||||||
|
|
||||||
|
# Canonical alias now exists!
|
||||||
|
res = self._get_canonical_alias()
|
||||||
|
self.assertEqual(res, {"alt_aliases": [self.alias]})
|
||||||
|
|
||||||
|
# Now remove the alt_aliases.
|
||||||
|
self._set_canonical_alias({})
|
||||||
|
|
||||||
|
# There is an alias event, but it is empty.
|
||||||
|
res = self._get_canonical_alias()
|
||||||
|
self.assertEqual(res, {})
|
||||||
|
|
||||||
|
def test_alias_alt_aliases(self):
|
||||||
|
"""Test a canonical alias message with an alias and alt_aliases."""
|
||||||
|
# Create an alias.
|
||||||
|
self._set_canonical_alias({"alias": self.alias, "alt_aliases": [self.alias]})
|
||||||
|
|
||||||
|
# Canonical alias now exists!
|
||||||
|
res = self._get_canonical_alias()
|
||||||
|
self.assertEqual(res, {"alias": self.alias, "alt_aliases": [self.alias]})
|
||||||
|
|
||||||
|
# Now remove the alias and alt_aliases.
|
||||||
|
self._set_canonical_alias({})
|
||||||
|
|
||||||
|
# There is an alias event, but it is empty.
|
||||||
|
res = self._get_canonical_alias()
|
||||||
|
self.assertEqual(res, {})
|
||||||
|
|
||||||
|
def test_partial_modify(self):
|
||||||
|
"""Test removing only the alt_aliases."""
|
||||||
|
# Create an alias.
|
||||||
|
self._set_canonical_alias({"alias": self.alias, "alt_aliases": [self.alias]})
|
||||||
|
|
||||||
|
# Canonical alias now exists!
|
||||||
|
res = self._get_canonical_alias()
|
||||||
|
self.assertEqual(res, {"alias": self.alias, "alt_aliases": [self.alias]})
|
||||||
|
|
||||||
|
# Now remove the alt_aliases.
|
||||||
|
self._set_canonical_alias({"alias": self.alias})
|
||||||
|
|
||||||
|
# There is an alias event, but it is empty.
|
||||||
|
res = self._get_canonical_alias()
|
||||||
|
self.assertEqual(res, {"alias": self.alias})
|
||||||
|
|
||||||
|
def test_add_alias(self):
|
||||||
|
"""Test removing only the alt_aliases."""
|
||||||
|
# Create an additional alias.
|
||||||
|
second_alias = "#second:test"
|
||||||
|
self._set_alias_via_directory(second_alias)
|
||||||
|
|
||||||
|
# Add the canonical alias.
|
||||||
|
self._set_canonical_alias({"alias": self.alias, "alt_aliases": [self.alias]})
|
||||||
|
|
||||||
|
# Then add the second alias.
|
||||||
|
self._set_canonical_alias(
|
||||||
|
{"alias": self.alias, "alt_aliases": [self.alias, second_alias]}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Canonical alias now exists!
|
||||||
|
res = self._get_canonical_alias()
|
||||||
|
self.assertEqual(
|
||||||
|
res, {"alias": self.alias, "alt_aliases": [self.alias, second_alias]}
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_bad_data(self):
|
||||||
|
"""Invalid data for alt_aliases should cause errors."""
|
||||||
|
self._set_canonical_alias({"alt_aliases": "@bad:test"}, expected_code=400)
|
||||||
|
self._set_canonical_alias({"alt_aliases": None}, expected_code=400)
|
||||||
|
self._set_canonical_alias({"alt_aliases": 0}, expected_code=400)
|
||||||
|
self._set_canonical_alias({"alt_aliases": 1}, expected_code=400)
|
||||||
|
self._set_canonical_alias({"alt_aliases": False}, expected_code=400)
|
||||||
|
self._set_canonical_alias({"alt_aliases": True}, expected_code=400)
|
||||||
|
self._set_canonical_alias({"alt_aliases": {}}, expected_code=400)
|
||||||
|
|
||||||
|
def test_bad_alias(self):
|
||||||
|
"""An alias which does not point to the room raises a SynapseError."""
|
||||||
|
self._set_canonical_alias({"alias": "@unknown:test"}, expected_code=400)
|
||||||
|
self._set_canonical_alias({"alt_aliases": ["@unknown:test"]}, expected_code=400)
|
||||||
|
|
|
@ -75,7 +75,7 @@ class GroupIDTestCase(unittest.TestCase):
|
||||||
self.fail("Parsing '%s' should raise exception" % id_string)
|
self.fail("Parsing '%s' should raise exception" % id_string)
|
||||||
except SynapseError as exc:
|
except SynapseError as exc:
|
||||||
self.assertEqual(400, exc.code)
|
self.assertEqual(400, exc.code)
|
||||||
self.assertEqual("M_UNKNOWN", exc.errcode)
|
self.assertEqual("M_INVALID_PARAM", exc.errcode)
|
||||||
|
|
||||||
|
|
||||||
class MapUsernameTestCase(unittest.TestCase):
|
class MapUsernameTestCase(unittest.TestCase):
|
||||||
|
|
Loading…
Add table
Reference in a new issue