Mirror of https://github.com/element-hq/synapse.git

Merge with branch 'develop'

This commit is contained in:
Andrew Ferrazzutti 2025-03-28 09:16:36 -04:00
commit b471553194
31 changed files with 697 additions and 91 deletions


@ -1,3 +1,10 @@
# Synapse 1.127.1 (2025-03-26)
## Security
- Fix [CVE-2025-30355](https://www.cve.org/CVERecord?id=CVE-2025-30355) / [GHSA-v56r-hwv5-mxg6](https://github.com/element-hq/synapse/security/advisories/GHSA-v56r-hwv5-mxg6). **High severity vulnerability affecting federation. The vulnerability has been exploited in the wild.**
# Synapse 1.127.0 (2025-03-25)
No significant changes since 1.127.0rc1.

changelog.d/18225.doc Normal file

@ -0,0 +1 @@
Fix how to obtain an access token and change naming from Riot to Element.


@ -0,0 +1 @@
Hashes of media files are now tracked by Synapse. Media quarantines will now apply to all files with the same hash.
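Taken together with the storage changes below, the feature works by hashing media content on upload and matching new uploads against already-quarantined hashes. A minimal in-memory sketch of that idea (the `quarantined_hashes` set and helpers are illustrative stand-ins, not Synapse APIs; Synapse keeps the digests in its media tables):

```python
import hashlib

# Illustrative only: Synapse stores sha256 digests in its media tables and
# checks them with a database query; this models the same idea in memory.
quarantined_hashes: set[str] = set()

def sha256_hex(content: bytes) -> str:
    return hashlib.sha256(content).hexdigest()

def quarantine(content: bytes) -> None:
    quarantined_hashes.add(sha256_hex(content))

def is_quarantined(content: bytes) -> bool:
    # Byte-identical uploads share a hash, so quarantining one file
    # covers every copy of it.
    return sha256_hex(content) in quarantined_hashes

quarantine(b"bad bytes")
assert is_quarantined(b"bad bytes")
assert not is_quarantined(b"other bytes")
```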

changelog.d/18283.doc Normal file

@ -0,0 +1 @@
Add docs for how to clear out the Poetry wheel cache.

changelog.d/18284.misc Normal file

@ -0,0 +1 @@
Add DB delta to remove the old state group deletion job.

debian/changelog vendored

@ -4,6 +4,12 @@ matrix-synapse-py3 (1.128.0~rc1+nmu1) UNRELEASED; urgency=medium
-- Synapse Packaging team <packages@matrix.org> Wed, 19 Mar 2025 17:38:49 +0000
matrix-synapse-py3 (1.127.1) stable; urgency=medium
* New Synapse release 1.127.1.
-- Synapse Packaging team <packages@matrix.org> Wed, 26 Mar 2025 21:07:31 +0000
matrix-synapse-py3 (1.127.0) stable; urgency=medium
* New Synapse release 1.127.0.


@ -150,6 +150,28 @@ $ poetry shell
$ poetry install --extras all
```
If you want to go even further and remove the Poetry caches:
```shell
# Find your Poetry cache directory
# Docs: https://github.com/python-poetry/poetry/blob/main/docs/configuration.md#cache-directory
$ poetry config cache-dir
# Remove packages from all cached repositories
$ poetry cache clear --all .
# Go completely nuclear and clear out everything Poetry cache related,
# including the wheel artifacts, which are not covered by the above command
# (see https://github.com/python-poetry/poetry/issues/10304)
#
# This is necessary in order to rebuild or fetch new wheels. For example, if you update
# the `icu` library on your system, you will need to rebuild the PyICU Python package
# in order to incorporate the correct dynamically linked library locations, otherwise you
# will run into errors like: `ImportError: libicui18n.so.75: cannot open shared object file: No such file or directory`
$ rm -rf $(poetry config cache-dir)
```
## ...run a command in the `poetry` virtualenv?
Use `poetry run cmd args` when you need the Python virtualenv context.


@ -160,7 +160,7 @@ Using the following curl command:
```console
curl -H 'Authorization: Bearer <access-token>' -X DELETE https://matrix.org/_matrix/client/r0/directory/room/<room-alias>
```
`<access-token>` - can be obtained in riot by looking in the riot settings, down the bottom is:
`<access-token>` - can be obtained in Element by opening All settings, clicking Help & About, and scrolling to the bottom, where you will find:
Access Token:\<click to reveal\>
`<room-alias>` - the room alias, e.g. #my_room:matrix.org. This may also need to be URL encoded, for example %23my_room%3Amatrix.org
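If you need to URL encode the alias yourself, Python's standard library does it in one call (a quick sketch, not part of the Synapse docs themselves):

```python
from urllib.parse import quote

room_alias = "#my_room:matrix.org"
# safe="" forces every reserved character to be encoded ("/" is kept by default).
print(quote(room_alias, safe=""))  # -> %23my_room%3Amatrix.org
```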


@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.127.0"
version = "1.127.1"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"


@ -29,8 +29,13 @@ from typing import Final
# the max size of a (canonical-json-encoded) event
MAX_PDU_SIZE = 65536
# the "depth" field on events is limited to 2**63 - 1
MAX_DEPTH = 2**63 - 1
# Max/min size of ints in canonical JSON
CANONICALJSON_MAX_INT = (2**53) - 1
CANONICALJSON_MIN_INT = -CANONICALJSON_MAX_INT
# the "depth" field on events is limited to the same as what
# canonicaljson accepts
MAX_DEPTH = CANONICALJSON_MAX_INT
# the maximum length for a room alias is 255 characters
MAX_ALIAS_LENGTH = 255
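To see what the new bound buys, note that canonical JSON only guarantees integers that fit exactly in an IEEE 754 double, i.e. the range ±(2**53 - 1). A standalone sketch of the depth check this enables (illustrative, not the Synapse implementation):

```python
CANONICALJSON_MAX_INT = (2**53) - 1
CANONICALJSON_MIN_INT = -CANONICALJSON_MAX_INT
MAX_DEPTH = CANONICALJSON_MAX_INT

def is_valid_depth(depth: object) -> bool:
    # Reject anything canonical JSON cannot represent exactly.
    if not isinstance(depth, int):
        return False
    return CANONICALJSON_MIN_INT <= depth <= CANONICALJSON_MAX_INT

assert is_valid_depth(2**53 - 1)
assert not is_valid_depth(2**63 - 1)  # the old MAX_DEPTH exceeds the canonical JSON range
```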


@ -40,6 +40,8 @@ import attr
from canonicaljson import encode_canonical_json
from synapse.api.constants import (
CANONICALJSON_MAX_INT,
CANONICALJSON_MIN_INT,
MAX_PDU_SIZE,
EventContentFields,
EventTypes,
@ -61,9 +63,6 @@ SPLIT_FIELD_REGEX = re.compile(r"\\*\.")
# Find escaped characters, e.g. those with a \ in front of them.
ESCAPE_SEQUENCE_PATTERN = re.compile(r"\\(.)")
CANONICALJSON_MAX_INT = (2**53) - 1
CANONICALJSON_MIN_INT = -CANONICALJSON_MAX_INT
# Module API callback that allows adding fields to the unsigned section of
# events that are sent to clients.


@ -86,9 +86,7 @@ class EventValidator:
# Depending on the room version, ensure the data is spec compliant JSON.
if event.room_version.strict_canonicaljson:
# Note that only the client controlled portion of the event is
# checked, since we trust the portions of the event we created.
validate_canonicaljson(event.content)
validate_canonicaljson(event.get_pdu_json())
if event.type == EventTypes.Aliases:
if "aliases" in event.content:


@ -20,7 +20,7 @@
#
#
import logging
from typing import TYPE_CHECKING, Awaitable, Callable, Optional
from typing import TYPE_CHECKING, Awaitable, Callable, List, Optional, Sequence
from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
@ -29,6 +29,7 @@ from synapse.crypto.event_signing import check_event_content_hash
from synapse.crypto.keyring import Keyring
from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event, validate_canonicaljson
from synapse.federation.units import filter_pdus_for_valid_depth
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.opentracing import log_kv, trace
from synapse.types import JsonDict, get_domain_from_id
@ -267,6 +268,15 @@ def _is_invite_via_3pid(event: EventBase) -> bool:
)
def parse_events_from_pdu_json(
pdus_json: Sequence[JsonDict], room_version: RoomVersion
) -> List[EventBase]:
return [
event_from_pdu_json(pdu_json, room_version)
for pdu_json in filter_pdus_for_valid_depth(pdus_json)
]
def event_from_pdu_json(pdu_json: JsonDict, room_version: RoomVersion) -> EventBase:
"""Construct an EventBase from an event json received over federation


@ -68,6 +68,7 @@ from synapse.federation.federation_base import (
FederationBase,
InvalidEventSignatureError,
event_from_pdu_json,
parse_events_from_pdu_json,
)
from synapse.federation.transport.client import SendJoinResponse
from synapse.http.client import is_unknown_endpoint
@ -349,7 +350,7 @@ class FederationClient(FederationBase):
room_version = await self.store.get_room_version(room_id)
pdus = [event_from_pdu_json(p, room_version) for p in transaction_data_pdus]
pdus = parse_events_from_pdu_json(transaction_data_pdus, room_version)
# Check signatures and hash of pdus, removing any from the list that fail checks
pdus[:] = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
@ -393,9 +394,7 @@ class FederationClient(FederationBase):
transaction_data,
)
pdu_list: List[EventBase] = [
event_from_pdu_json(p, room_version) for p in transaction_data["pdus"]
]
pdu_list = parse_events_from_pdu_json(transaction_data["pdus"], room_version)
if pdu_list and pdu_list[0]:
pdu = pdu_list[0]
@ -809,7 +808,7 @@ class FederationClient(FederationBase):
room_version = await self.store.get_room_version(room_id)
auth_chain = [event_from_pdu_json(p, room_version) for p in res["auth_chain"]]
auth_chain = parse_events_from_pdu_json(res["auth_chain"], room_version)
signed_auth = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, auth_chain, room_version=room_version
@ -1529,9 +1528,7 @@ class FederationClient(FederationBase):
room_version = await self.store.get_room_version(room_id)
events = [
event_from_pdu_json(e, room_version) for e in content.get("events", [])
]
events = parse_events_from_pdu_json(content.get("events", []), room_version)
signed_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, events, room_version=room_version


@ -66,7 +66,7 @@ from synapse.federation.federation_base import (
event_from_pdu_json,
)
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.federation.units import Edu, Transaction, serialize_and_filter_pdus
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import (
@ -469,7 +469,12 @@ class FederationServer(FederationBase):
logger.info("Ignoring PDU: %s", e)
continue
event = event_from_pdu_json(p, room_version)
try:
event = event_from_pdu_json(p, room_version)
except SynapseError as e:
logger.info("Ignoring PDU for failing to deserialize: %s", e)
continue
pdus_by_room.setdefault(room_id, []).append(event)
if event.origin_server_ts > newest_pdu_ts:
@ -636,8 +641,8 @@ class FederationServer(FederationBase):
)
return {
"pdus": [pdu.get_pdu_json() for pdu in pdus],
"auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
"pdus": serialize_and_filter_pdus(pdus),
"auth_chain": serialize_and_filter_pdus(auth_chain),
}
async def on_pdu_request(
@ -761,8 +766,8 @@ class FederationServer(FederationBase):
event_json = event.get_pdu_json(time_now)
resp = {
"event": event_json,
"state": [p.get_pdu_json(time_now) for p in state_events],
"auth_chain": [p.get_pdu_json(time_now) for p in auth_chain_events],
"state": serialize_and_filter_pdus(state_events, time_now),
"auth_chain": serialize_and_filter_pdus(auth_chain_events, time_now),
"members_omitted": caller_supports_partial_state,
}
@ -1005,7 +1010,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
auth_pdus = await self.handler.on_event_auth(event_id)
res = {"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus]}
res = {"auth_chain": serialize_and_filter_pdus(auth_pdus, time_now)}
return 200, res
async def on_query_client_keys(
@ -1090,7 +1095,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
return {"events": [ev.get_pdu_json(time_now) for ev in missing_events]}
return {"events": serialize_and_filter_pdus(missing_events, time_now)}
async def on_openid_userinfo(self, token: str) -> Optional[str]:
ts_now_ms = self._clock.time_msec()


@ -24,10 +24,12 @@ server protocol.
"""
import logging
from typing import List, Optional
from typing import List, Optional, Sequence
import attr
from synapse.api.constants import CANONICALJSON_MAX_INT, CANONICALJSON_MIN_INT
from synapse.events import EventBase
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@ -104,8 +106,28 @@ class Transaction:
result = {
"origin": self.origin,
"origin_server_ts": self.origin_server_ts,
"pdus": self.pdus,
"pdus": filter_pdus_for_valid_depth(self.pdus),
}
if self.edus:
result["edus"] = self.edus
return result
def filter_pdus_for_valid_depth(pdus: Sequence[JsonDict]) -> List[JsonDict]:
filtered_pdus = []
for pdu in pdus:
# Drop PDUs that have a depth that is outside of the range allowed
# by canonical json.
if (
"depth" in pdu
and CANONICALJSON_MIN_INT <= pdu["depth"] <= CANONICALJSON_MAX_INT
):
filtered_pdus.append(pdu)
return filtered_pdus
def serialize_and_filter_pdus(
pdus: Sequence[EventBase], time_now: Optional[int] = None
) -> List[JsonDict]:
return filter_pdus_for_valid_depth([pdu.get_pdu_json(time_now) for pdu in pdus])
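A quick usage sketch of the filter on plain dicts, assuming the `filter_pdus_for_valid_depth` definition above is in scope (the PDU fixtures are made up):

```python
pdus = [
    {"event_id": "$a", "depth": 7},
    {"event_id": "$b", "depth": 2**63 - 1},  # outside the canonical JSON range: dropped
    {"event_id": "$c"},                      # no depth field at all: also dropped
]
kept = filter_pdus_for_valid_depth(pdus)
assert [p["event_id"] for p in kept] == ["$a"]
```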


@ -59,7 +59,11 @@ from synapse.media._base import (
respond_with_responder,
)
from synapse.media.filepath import MediaFilePaths
from synapse.media.media_storage import MediaStorage
from synapse.media.media_storage import (
MediaStorage,
SHA256TransparentIOReader,
SHA256TransparentIOWriter,
)
from synapse.media.storage_provider import StorageProviderWrapper
from synapse.media.thumbnailer import Thumbnailer, ThumbnailError
from synapse.media.url_previewer import UrlPreviewer
@ -301,15 +305,26 @@ class MediaRepository:
auth_user: The user_id of the uploader
"""
file_info = FileInfo(server_name=None, file_id=media_id)
fname = await self.media_storage.store_file(content, file_info)
sha256reader = SHA256TransparentIOReader(content)
# The wrapper implements all of IO by passing through to the wrapped stream
fname = await self.media_storage.store_file(sha256reader.wrap(), file_info)
sha256 = sha256reader.hexdigest()
should_quarantine = await self.store.get_is_hash_quarantined(sha256)
logger.info("Stored local media in file %r", fname)
if should_quarantine:
logger.warning(
"Media has been automatically quarantined as it matched existing quarantined media"
)
await self.store.update_local_media(
media_id=media_id,
media_type=media_type,
upload_name=upload_name,
media_length=content_length,
user_id=auth_user,
sha256=sha256,
quarantined_by="system" if should_quarantine else None,
)
try:
@ -342,11 +357,19 @@ class MediaRepository:
media_id = random_string(24)
file_info = FileInfo(server_name=None, file_id=media_id)
fname = await self.media_storage.store_file(content, file_info)
# The wrapper implements all of IO by passing through to the wrapped stream
sha256reader = SHA256TransparentIOReader(content)
fname = await self.media_storage.store_file(sha256reader.wrap(), file_info)
sha256 = sha256reader.hexdigest()
should_quarantine = await self.store.get_is_hash_quarantined(sha256)
logger.info("Stored local media in file %r", fname)
if should_quarantine:
logger.warning(
"Media has been automatically quarantined as it matched existing quarantined media"
)
await self.store.store_local_media(
media_id=media_id,
media_type=media_type,
@ -354,6 +377,9 @@ class MediaRepository:
upload_name=upload_name,
media_length=content_length,
user_id=auth_user,
sha256=sha256,
# TODO: Better name?
quarantined_by="system" if should_quarantine else None,
)
try:
@ -756,11 +782,13 @@ class MediaRepository:
file_info = FileInfo(server_name=server_name, file_id=file_id)
async with self.media_storage.store_into_file(file_info) as (f, fname):
sha256writer = SHA256TransparentIOWriter(f)
try:
length, headers = await self.client.download_media(
server_name,
media_id,
output_stream=f,
# The wrapper implements all of BinaryIO by passing through to the wrapped stream
output_stream=sha256writer.wrap(),
max_size=self.max_upload_size,
max_timeout_ms=max_timeout_ms,
download_ratelimiter=download_ratelimiter,
@ -825,6 +853,7 @@ class MediaRepository:
upload_name=upload_name,
media_length=length,
filesystem_id=file_id,
sha256=sha256writer.hexdigest(),
)
logger.info("Stored remote media in file %r", fname)
@ -845,6 +874,7 @@ class MediaRepository:
last_access_ts=time_now_ms,
quarantined_by=None,
authenticated=authenticated,
sha256=sha256writer.hexdigest(),
)
async def _federation_download_remote_file(
@ -879,11 +909,13 @@ class MediaRepository:
file_info = FileInfo(server_name=server_name, file_id=file_id)
async with self.media_storage.store_into_file(file_info) as (f, fname):
sha256writer = SHA256TransparentIOWriter(f)
try:
res = await self.client.federation_download_media(
server_name,
media_id,
output_stream=f,
# The wrapper implements all of BinaryIO by passing through to the wrapped stream
output_stream=sha256writer.wrap(),
max_size=self.max_upload_size,
max_timeout_ms=max_timeout_ms,
download_ratelimiter=download_ratelimiter,
@ -954,6 +986,7 @@ class MediaRepository:
upload_name=upload_name,
media_length=length,
filesystem_id=file_id,
sha256=sha256writer.hexdigest(),
)
logger.debug("Stored remote media in file %r", fname)
@ -974,6 +1007,7 @@ class MediaRepository:
last_access_ts=time_now_ms,
quarantined_by=None,
authenticated=authenticated,
sha256=sha256writer.hexdigest(),
)
def _get_thumbnail_requirements(


@ -19,6 +19,7 @@
#
#
import contextlib
import hashlib
import json
import logging
import os
@ -70,6 +71,88 @@ logger = logging.getLogger(__name__)
CRLF = b"\r\n"
class SHA256TransparentIOWriter:
"""Will generate a SHA256 hash from a source stream transparently.
Args:
source: Source stream.
"""
def __init__(self, source: BinaryIO):
self._hash = hashlib.sha256()
self._source = source
def write(self, buffer: Union[bytes, bytearray]) -> int:
"""Wrapper for source.write()
Args:
buffer
Returns:
the value of source.write()
"""
res = self._source.write(buffer)
self._hash.update(buffer)
return res
def hexdigest(self) -> str:
"""The digest of the written or read value.
Returns:
The digest in hex format.
"""
return self._hash.hexdigest()
def wrap(self) -> BinaryIO:
# This class implements a subset of the IO interface and passes through everything else via __getattr__
return cast(BinaryIO, self)
# Passthrough any other calls
def __getattr__(self, attr_name: str) -> Any:
return getattr(self._source, attr_name)
class SHA256TransparentIOReader:
"""Will generate a SHA256 hash from a source stream transparently.
Args:
source: Source IO stream.
"""
def __init__(self, source: IO):
self._hash = hashlib.sha256()
self._source = source
def read(self, n: int = -1) -> bytes:
"""Wrapper for source.read()
Args:
n
Returns:
the value of source.read()
"""
data = self._source.read(n)
self._hash.update(data)
return data
def hexdigest(self) -> str:
"""The digest of the written or read value.
Returns:
The digest in hex format.
"""
return self._hash.hexdigest()
def wrap(self) -> IO:
# This class implements a subset of the IO interface and passes through everything else via __getattr__
return cast(IO, self)
# Passthrough any other calls
def __getattr__(self, attr_name: str) -> Any:
return getattr(self._source, attr_name)
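The pattern in both classes is the same: intercept one method to update the hash, and delegate everything else to the wrapped stream via `__getattr__`. A self-contained demonstration with an in-memory stream (a simplified re-implementation for illustration, not the classes above):

```python
import hashlib
import io
from typing import IO, Any

class HashingReader:
    """Minimal version of the transparent reader pattern."""

    def __init__(self, source: IO) -> None:
        self._hash = hashlib.sha256()
        self._source = source

    def read(self, n: int = -1) -> bytes:
        data = self._source.read(n)
        self._hash.update(data)
        return data

    def hexdigest(self) -> str:
        return self._hash.hexdigest()

    def __getattr__(self, name: str) -> Any:
        # Everything except read() is delegated to the wrapped stream.
        return getattr(self._source, name)

reader = HashingReader(io.BytesIO(b"hello"))
assert reader.read() == b"hello"  # behaves like the wrapped stream
assert reader.seekable()          # delegated via __getattr__
assert reader.hexdigest() == (
    "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
)
```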
class MediaStorage:
"""Responsible for storing/fetching files from local sources.
@ -107,7 +190,6 @@ class MediaStorage:
Returns:
the file path written to in the primary media store
"""
async with self.store_into_file(file_info) as (f, fname):
# Write to the main media repository
await self.write_to_file(source, f)


@ -19,6 +19,7 @@
# [This file includes modifications made by New Vector Limited]
#
#
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
@ -51,6 +52,8 @@ BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2 = (
"media_repository_drop_index_wo_method_2"
)
logger = logging.getLogger(__name__)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class LocalMedia:
@ -65,6 +68,7 @@ class LocalMedia:
safe_from_quarantine: bool
user_id: Optional[str]
authenticated: Optional[bool]
sha256: Optional[str]
@attr.s(slots=True, frozen=True, auto_attribs=True)
@ -79,6 +83,7 @@ class RemoteMedia:
last_access_ts: int
quarantined_by: Optional[str]
authenticated: Optional[bool]
sha256: Optional[str]
@attr.s(slots=True, frozen=True, auto_attribs=True)
@ -154,6 +159,26 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore):
unique=True,
)
self.db_pool.updates.register_background_index_update(
update_name="local_media_repository_sha256_idx",
index_name="local_media_repository_sha256",
table="local_media_repository",
where_clause="sha256 IS NOT NULL",
columns=[
"sha256",
],
)
self.db_pool.updates.register_background_index_update(
update_name="remote_media_cache_sha256_idx",
index_name="remote_media_cache_sha256",
table="remote_media_cache",
where_clause="sha256 IS NOT NULL",
columns=[
"sha256",
],
)
self.db_pool.updates.register_background_update_handler(
BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2,
self._drop_media_index_without_method,
@ -221,6 +246,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"safe_from_quarantine",
"user_id",
"authenticated",
"sha256",
),
allow_none=True,
desc="get_local_media",
@ -239,6 +265,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
safe_from_quarantine=row[7],
user_id=row[8],
authenticated=row[9],
sha256=row[10],
)
async def get_local_media_by_user_paginate(
@ -295,7 +322,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
quarantined_by,
safe_from_quarantine,
user_id,
authenticated
authenticated,
sha256
FROM local_media_repository
WHERE user_id = ?
ORDER BY {order_by_column} {order}, media_id ASC
@ -320,6 +348,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
safe_from_quarantine=bool(row[8]),
user_id=row[9],
authenticated=row[10],
sha256=row[11],
)
for row in txn
]
@ -449,6 +478,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
media_length: int,
user_id: UserID,
url_cache: Optional[str] = None,
sha256: Optional[str] = None,
quarantined_by: Optional[str] = None,
) -> None:
if self.hs.config.media.enable_authenticated_media:
authenticated = True
@ -466,6 +497,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"user_id": user_id.to_string(),
"url_cache": url_cache,
"authenticated": authenticated,
"sha256": sha256,
"quarantined_by": quarantined_by,
},
desc="store_local_media",
)
@ -477,20 +510,28 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
upload_name: Optional[str],
media_length: int,
user_id: UserID,
sha256: str,
url_cache: Optional[str] = None,
quarantined_by: Optional[str] = None,
) -> None:
updatevalues = {
"media_type": media_type,
"upload_name": upload_name,
"media_length": media_length,
"url_cache": url_cache,
"sha256": sha256,
}
# This should never be un-set by this function.
if quarantined_by is not None:
updatevalues["quarantined_by"] = quarantined_by
await self.db_pool.simple_update_one(
"local_media_repository",
keyvalues={
"user_id": user_id.to_string(),
"media_id": media_id,
},
updatevalues={
"media_type": media_type,
"upload_name": upload_name,
"media_length": media_length,
"url_cache": url_cache,
},
updatevalues=updatevalues,
desc="update_local_media",
)
@ -657,6 +698,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"last_access_ts",
"quarantined_by",
"authenticated",
"sha256",
),
allow_none=True,
desc="get_cached_remote_media",
@ -674,6 +716,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
last_access_ts=row[5],
quarantined_by=row[6],
authenticated=row[7],
sha256=row[8],
)
async def store_cached_remote_media(
@ -685,6 +728,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
time_now_ms: int,
upload_name: Optional[str],
filesystem_id: str,
sha256: Optional[str],
) -> None:
if self.hs.config.media.enable_authenticated_media:
authenticated = True
@ -703,6 +747,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"filesystem_id": filesystem_id,
"last_access_ts": time_now_ms,
"authenticated": authenticated,
"sha256": sha256,
},
desc="store_cached_remote_media",
)
@ -946,3 +991,37 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
await self.db_pool.runInteraction(
"delete_url_cache_media", _delete_url_cache_media_txn
)
async def get_is_hash_quarantined(self, sha256: str) -> bool:
"""Get whether a specific sha256 hash digest matches any quarantined media.
Returns:
True if the hash matches any quarantined media (local or remote), False otherwise.
"""
def get_matching_media_txn(
txn: LoggingTransaction, table: str, sha256: str
) -> bool:
# Return on first match
sql = """
SELECT 1
FROM local_media_repository
WHERE sha256 = ? AND quarantined_by IS NOT NULL
UNION ALL
SELECT 1
FROM remote_media_cache
WHERE sha256 = ? AND quarantined_by IS NOT NULL
LIMIT 1
"""
txn.execute(sql, (sha256, sha256))
row = txn.fetchone()
return row is not None
return await self.db_pool.runInteraction(
"get_matching_media_txn",
get_matching_media_txn,
"local_media_repository",
sha256,
)
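The `UNION ALL ... LIMIT 1` shape is a cheap existence check across two tables: it stops at the first matching row in either. A standalone sqlite3 sketch of the same pattern (table layouts simplified to the two relevant columns):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE local_media_repository (sha256 TEXT, quarantined_by TEXT);
    CREATE TABLE remote_media_cache (sha256 TEXT, quarantined_by TEXT);
    INSERT INTO remote_media_cache VALUES ('abc', 'system');
    """
)

def is_hash_quarantined(sha256: str) -> bool:
    row = conn.execute(
        """
        SELECT 1 FROM local_media_repository
        WHERE sha256 = ? AND quarantined_by IS NOT NULL
        UNION ALL
        SELECT 1 FROM remote_media_cache
        WHERE sha256 = ? AND quarantined_by IS NOT NULL
        LIMIT 1
        """,
        (sha256, sha256),
    ).fetchone()
    return row is not None

assert is_hash_quarantined("abc")
assert not is_hash_quarantined("def")
```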


@ -51,11 +51,15 @@ from synapse.api.room_versions import RoomVersion, RoomVersions
from synapse.config.homeserver import HomeServerConfig
from synapse.events import EventBase
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage._base import (
db_to_json,
make_in_list_sql_clause,
)
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
make_tuple_in_list_sql_clause,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.types import Cursor
@ -1127,6 +1131,109 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
return local_media_ids
def _quarantine_local_media_txn(
self,
txn: LoggingTransaction,
hashes: Set[str],
media_ids: Set[str],
quarantined_by: Optional[str],
) -> int:
"""Quarantine and unquarantine local media items.
Args:
txn (cursor)
hashes: A set of sha256 hashes for any media that should be quarantined
media_ids: A set of media IDs for any media that should be quarantined
quarantined_by: The ID of the user who initiated the quarantine request.
If it is `None`, media will be removed from quarantine.
Returns:
The total number of media items quarantined
"""
total_media_quarantined = 0
# Effectively a legacy path, update any media that was explicitly named.
if media_ids:
sql_many_clause_sql, sql_many_clause_args = make_in_list_sql_clause(
txn.database_engine, "media_id", media_ids
)
sql = f"""
UPDATE local_media_repository
SET quarantined_by = ?
WHERE {sql_many_clause_sql}"""
if quarantined_by is not None:
sql += " AND safe_from_quarantine = FALSE"
txn.execute(sql, [quarantined_by] + sql_many_clause_args)
# Note that a rowcount of -1 can be used to indicate no rows were affected.
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
# Update any media that was identified via hash.
if hashes:
sql_many_clause_sql, sql_many_clause_args = make_in_list_sql_clause(
txn.database_engine, "sha256", hashes
)
sql = f"""
UPDATE local_media_repository
SET quarantined_by = ?
WHERE {sql_many_clause_sql}"""
if quarantined_by is not None:
sql += " AND safe_from_quarantine = FALSE"
txn.execute(sql, [quarantined_by] + sql_many_clause_args)
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
return total_media_quarantined
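The `txn.rowcount if txn.rowcount > 0 else 0` guard exists because DB-API cursors may report -1 when the affected-row count is unknown. A quick standalone illustration with sqlite3:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE media (media_id TEXT, quarantined_by TEXT)")
conn.execute("INSERT INTO media VALUES ('m1', NULL)")

cur = conn.execute("SELECT * FROM media")
print(cur.rowcount)  # -1: SELECT does not report affected rows under DB-API

cur = conn.execute("UPDATE media SET quarantined_by = 'system'")
print(cur.rowcount)  # 1: UPDATE reports the number of rows it changed
```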
def _quarantine_remote_media_txn(
self,
txn: LoggingTransaction,
hashes: Set[str],
media: Set[Tuple[str, str]],
quarantined_by: Optional[str],
) -> int:
"""Quarantine and unquarantine remote items
Args:
txn (cursor)
hashes: A set of sha256 hashes for any media that should be quarantined
media: A set of tuples (media_origin, media_id) for any media that should be quarantined
quarantined_by: The ID of the user who initiated the quarantine request.
If it is `None`, media will be removed from quarantine.
Returns:
The total number of media items quarantined
"""
total_media_quarantined = 0
if media:
sql_in_list_clause, sql_args = make_tuple_in_list_sql_clause(
txn.database_engine,
("media_origin", "media_id"),
media,
)
sql = f"""
UPDATE remote_media_cache
SET quarantined_by = ?
WHERE {sql_in_list_clause}"""
txn.execute(sql, [quarantined_by] + sql_args)
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
if hashes:
sql_many_clause_sql, sql_many_clause_args = make_in_list_sql_clause(
txn.database_engine, "sha256", hashes
)
sql = f"""
UPDATE remote_media_cache
SET quarantined_by = ?
WHERE {sql_many_clause_sql}"""
txn.execute(sql, [quarantined_by] + sql_many_clause_args)
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
return total_media_quarantined
def _quarantine_media_txn(
self,
txn: LoggingTransaction,
@ -1146,40 +1253,49 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
Returns:
The total number of media items quarantined
"""
hashes = set()
media_ids = set()
remote_media = set()
# Update all the tables to set the quarantined_by flag
sql = """
UPDATE local_media_repository
SET quarantined_by = ?
WHERE media_id = ?
"""
# set quarantine
if quarantined_by is not None:
sql += "AND safe_from_quarantine = FALSE"
txn.executemany(
sql, [(quarantined_by, media_id) for media_id in local_mxcs]
# First, determine the hashes of the media we want to quarantine or unquarantine.
# We also want the media_ids for any media that lacks a hash.
if local_mxcs:
hash_sql_many_clause_sql, hash_sql_many_clause_args = (
make_in_list_sql_clause(txn.database_engine, "media_id", local_mxcs)
)
# remove from quarantine
else:
txn.executemany(
sql, [(quarantined_by, media_id) for media_id in local_mxcs]
hash_sql = f"SELECT sha256, media_id FROM local_media_repository WHERE {hash_sql_many_clause_sql}"
if quarantined_by is not None:
hash_sql += " AND safe_from_quarantine = FALSE"
txn.execute(hash_sql, hash_sql_many_clause_args)
for sha256, media_id in txn:
if sha256:
hashes.add(sha256)
else:
media_ids.add(media_id)
# Do the same for remote media
if remote_mxcs:
hash_sql_in_list_clause, hash_sql_args = make_tuple_in_list_sql_clause(
txn.database_engine,
("media_origin", "media_id"),
remote_mxcs,
)
# Note that a rowcount of -1 can be used to indicate no rows were affected.
total_media_quarantined = txn.rowcount if txn.rowcount > 0 else 0
hash_sql = f"SELECT sha256, media_origin, media_id FROM remote_media_cache WHERE {hash_sql_in_list_clause}"
txn.execute(hash_sql, hash_sql_args)
for sha256, media_origin, media_id in txn:
if sha256:
hashes.add(sha256)
else:
remote_media.add((media_origin, media_id))
txn.executemany(
"""
UPDATE remote_media_cache
SET quarantined_by = ?
WHERE media_origin = ? AND media_id = ?
""",
[(quarantined_by, origin, media_id) for origin, media_id in remote_mxcs],
count = self._quarantine_local_media_txn(txn, hashes, media_ids, quarantined_by)
count += self._quarantine_remote_media_txn(
txn, hashes, remote_media, quarantined_by
)
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
return total_media_quarantined
return count
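The heart of this rewrite is a partition step: rows that have a stored hash are quarantined by hash (catching every byte-identical copy), while hashless historic rows fall back to the per-ID path. A pure-Python sketch of that partition (the row tuples are illustrative):

```python
from typing import Iterable, Optional, Set, Tuple

def partition_rows(
    rows: Iterable[Tuple[Optional[str], str]],
) -> Tuple[Set[str], Set[str]]:
    """Split (sha256, media_id) rows into hashes and hashless media IDs."""
    hashes: Set[str] = set()
    media_ids: Set[str] = set()
    for sha256, media_id in rows:
        if sha256:
            hashes.add(sha256)
        else:
            media_ids.add(media_id)  # historic media with no stored hash
    return hashes, media_ids

rows = [("abc123", "m1"), (None, "m2"), ("abc123", "m3")]
assert partition_rows(rows) == ({"abc123"}, {"m2"})
```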
async def block_room(self, room_id: str, user_id: str) -> None:
"""Marks the room as blocked.


@ -19,7 +19,7 @@
#
#
SCHEMA_VERSION = 90 # remember to update the list below when updating
SCHEMA_VERSION = 91 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the


@ -0,0 +1,21 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Store the SHA256 content hash of media files.
ALTER TABLE local_media_repository ADD COLUMN sha256 TEXT;
ALTER TABLE remote_media_cache ADD COLUMN sha256 TEXT;
-- Add background updates to handle creating the new indexes.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(9101, 'local_media_repository_sha256_idx', '{}'),
(9101, 'remote_media_cache_sha256_idx', '{}');


@ -0,0 +1,15 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Remove the old unreferenced state group deletion background update if it exists
DELETE FROM background_updates WHERE update_name = 'delete_unreferenced_state_groups_bg_update';


@ -369,6 +369,7 @@ class ProfileTestCase(unittest.HomeserverTestCase):
time_now_ms=self.clock.time_msec(),
upload_name=None,
filesystem_id="xyz",
sha256="abcdefg12345",
)
)


@ -31,6 +31,9 @@ from synapse.rest.client import login, register, room
from synapse.server import HomeServer
from synapse.types import UserID
from synapse.util import Clock
from synapse.util.stringutils import (
random_string,
)
from tests import unittest
from tests.unittest import override_config
@ -65,7 +68,6 @@ class MediaRetentionTestCase(unittest.HomeserverTestCase):
# quarantined media) into both the local store and the remote cache, plus
# one additional local media that is marked as protected from quarantine.
media_repository = hs.get_media_repository()
test_media_content = b"example string"
def _create_media_and_set_attributes(
last_accessed_ms: Optional[int],
@ -73,12 +75,14 @@ class MediaRetentionTestCase(unittest.HomeserverTestCase):
is_protected: Optional[bool] = False,
) -> MXCUri:
# "Upload" some media to the local media store
# Use random content so that each media item has a unique sha256 hash.
random_content = bytes(random_string(24), "utf-8")
mxc_uri: MXCUri = self.get_success(
media_repository.create_content(
media_type="text/plain",
upload_name=None,
content=io.BytesIO(test_media_content),
content_length=len(test_media_content),
content=io.BytesIO(random_content),
content_length=len(random_content),
auth_user=UserID.from_string(test_user_id),
)
)
@ -129,6 +133,7 @@ class MediaRetentionTestCase(unittest.HomeserverTestCase):
time_now_ms=clock.time_msec(),
upload_name="testfile.txt",
filesystem_id="abcdefg12345",
sha256=random_string(24),
)
)


@ -42,6 +42,7 @@ from twisted.web.resource import Resource
from synapse.api.errors import Codes, HttpResponseException
from synapse.api.ratelimiting import Ratelimiter
from synapse.events import EventBase
from synapse.http.client import ByteWriteable
from synapse.http.types import QueryParams
from synapse.logging.context import make_deferred_yieldable
from synapse.media._base import FileInfo, ThumbnailInfo
@ -59,7 +60,7 @@ from synapse.util import Clock
from tests import unittest
from tests.server import FakeChannel
from tests.test_utils import SMALL_CMYK_JPEG, SMALL_PNG
from tests.test_utils import SMALL_CMYK_JPEG, SMALL_PNG, SMALL_PNG_SHA256
from tests.unittest import override_config
from tests.utils import default_config
@ -1257,3 +1258,107 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
)
assert channel.code == 502
assert channel.json_body["errcode"] == "M_TOO_LARGE"
def read_body(
response: IResponse, stream: ByteWriteable, max_size: Optional[int]
) -> Deferred:
d: Deferred = defer.Deferred()
stream.write(SMALL_PNG)
d.callback(len(SMALL_PNG))
return d
class MediaHashesTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets,
login.register_servlets,
media.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.user = self.register_user("user", "pass")
self.tok = self.login("user", "pass")
self.store = hs.get_datastores().main
self.client = hs.get_federation_http_client()
def create_resource_dict(self) -> Dict[str, Resource]:
resources = super().create_resource_dict()
resources["/_matrix/media"] = self.hs.get_media_repository_resource()
return resources
def test_ensure_correct_sha256(self) -> None:
"""Check that the hash does not change"""
media = self.helper.upload_media(SMALL_PNG, tok=self.tok, expect_code=200)
mxc = media.get("content_uri")
assert mxc
store_media = self.get_success(self.store.get_local_media(mxc[11:]))
assert store_media
self.assertEqual(
store_media.sha256,
SMALL_PNG_SHA256,
)
def test_ensure_multiple_correct_sha256(self) -> None:
"""Check that two media items have the same hash."""
media_a = self.helper.upload_media(SMALL_PNG, tok=self.tok, expect_code=200)
mxc_a = media_a.get("content_uri")
assert mxc_a
store_media_a = self.get_success(self.store.get_local_media(mxc_a[11:]))
assert store_media_a
media_b = self.helper.upload_media(SMALL_PNG, tok=self.tok, expect_code=200)
mxc_b = media_b.get("content_uri")
assert mxc_b
store_media_b = self.get_success(self.store.get_local_media(mxc_b[11:]))
assert store_media_b
self.assertNotEqual(
store_media_a.media_id,
store_media_b.media_id,
)
self.assertEqual(
store_media_a.sha256,
store_media_b.sha256,
)
@override_config(
{
"enable_authenticated_media": False,
}
)
# mock actually reading file body
@patch(
"synapse.http.matrixfederationclient.read_body_with_max_size",
read_body,
)
def test_ensure_correct_sha256_federated(self) -> None:
"""Check that federated media have the same hash."""
# Mock getting a file over federation
async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
resp = MagicMock(spec=IResponse)
resp.code = 200
resp.length = 500
resp.headers = Headers({"Content-Type": ["application/octet-stream"]})
resp.phrase = b"OK"
return resp
self.client._send_request = _send_request # type: ignore
# first request should go through
channel = self.make_request(
"GET",
"/_matrix/media/v3/download/remote.org/abc",
shorthand=False,
access_token=self.tok,
)
assert channel.code == 200
store_media = self.get_success(
self.store.get_cached_remote_media("remote.org", "abc")
)
assert store_media
self.assertEqual(
store_media.sha256,
SMALL_PNG_SHA256,
)


@ -20,7 +20,7 @@
#
import urllib.parse
from typing import Dict
from typing import Dict, cast
from parameterized import parameterized
@ -32,6 +32,7 @@ from synapse.http.server import JsonResource
from synapse.rest.admin import VersionServlet
from synapse.rest.client import login, media, room
from synapse.server import HomeServer
from synapse.types import UserID
from synapse.util import Clock
from tests import unittest
@ -227,10 +228,25 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
# Upload some media
response_1 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok)
response_2 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok)
response_3 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok)
# Extract media IDs
server_and_media_id_1 = response_1["content_uri"][6:]
server_and_media_id_2 = response_2["content_uri"][6:]
server_and_media_id_3 = response_3["content_uri"][6:]
# Remove the hash from the media to simulate historic media.
self.get_success(
self.hs.get_datastores().main.update_local_media(
media_id=server_and_media_id_3.split("/")[1],
media_type="image/png",
upload_name=None,
media_length=123,
user_id=UserID.from_string(non_admin_user),
# Hack to force some media to have no hash.
sha256=cast(str, None),
)
)
# Quarantine all media by this user
url = "/_synapse/admin/v1/user/%s/media/quarantine" % urllib.parse.quote(
@ -244,12 +260,13 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
self.pump(1.0)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(
channel.json_body, {"num_quarantined": 2}, "Expected 2 quarantined items"
channel.json_body, {"num_quarantined": 3}, "Expected 3 quarantined items"
)
# Attempt to access each piece of media
self._ensure_quarantined(admin_user_tok, server_and_media_id_1)
self._ensure_quarantined(admin_user_tok, server_and_media_id_2)
self._ensure_quarantined(admin_user_tok, server_and_media_id_3)
def test_cannot_quarantine_safe_media(self) -> None:
self.register_user("user_admin", "pass", admin=True)


@ -35,7 +35,7 @@ from synapse.server import HomeServer
from synapse.util import Clock
from tests import unittest
from tests.test_utils import SMALL_PNG
from tests.test_utils import SMALL_CMYK_JPEG, SMALL_PNG
from tests.unittest import override_config
VALID_TIMESTAMP = 1609459200000 # 2021-01-01 in milliseconds
@ -598,23 +598,27 @@ class DeleteMediaByDateSizeTestCase(_AdminMediaTests):
class QuarantineMediaByIDTestCase(_AdminMediaTests):
def upload_media_and_return_media_id(self, data: bytes) -> str:
# Upload some media into the room
response = self.helper.upload_media(
data,
tok=self.admin_user_tok,
expect_code=200,
)
# Extract media ID from the response
server_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
return server_and_media_id.split("/")[1]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.server_name = hs.hostname
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
# Upload some media into the room
response = self.helper.upload_media(
SMALL_PNG,
tok=self.admin_user_tok,
expect_code=200,
)
# Extract media ID from the response
server_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
self.media_id = server_and_media_id.split("/")[1]
self.media_id = self.upload_media_and_return_media_id(SMALL_PNG)
self.media_id_2 = self.upload_media_and_return_media_id(SMALL_PNG)
self.media_id_3 = self.upload_media_and_return_media_id(SMALL_PNG)
self.media_id_other = self.upload_media_and_return_media_id(SMALL_CMYK_JPEG)
self.url = "/_synapse/admin/v1/media/%s/%s/%s"
@parameterized.expand(["quarantine", "unquarantine"])
@ -686,6 +690,52 @@ class QuarantineMediaByIDTestCase(_AdminMediaTests):
assert media_info is not None
self.assertFalse(media_info.quarantined_by)
def test_quarantine_media_match_hash(self) -> None:
"""
Tests that quarantining applies to all media with the same hash
"""
media_info = self.get_success(self.store.get_local_media(self.media_id))
assert media_info is not None
self.assertFalse(media_info.quarantined_by)
# quarantining
channel = self.make_request(
"POST",
self.url % ("quarantine", self.server_name, self.media_id),
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertFalse(channel.json_body)
# Test that ALL similar media was quarantined.
for media in [self.media_id, self.media_id_2, self.media_id_3]:
media_info = self.get_success(self.store.get_local_media(media))
assert media_info is not None
self.assertTrue(media_info.quarantined_by)
# Test that other media was not.
media_info = self.get_success(self.store.get_local_media(self.media_id_other))
assert media_info is not None
self.assertFalse(media_info.quarantined_by)
# remove from quarantine
channel = self.make_request(
"POST",
self.url % ("unquarantine", self.server_name, self.media_id),
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertFalse(channel.json_body)
# Test that ALL similar media is now reset.
for media in [self.media_id, self.media_id_2, self.media_id_3]:
media_info = self.get_success(self.store.get_local_media(media))
assert media_info is not None
self.assertFalse(media_info.quarantined_by)
def test_quarantine_protected_media(self) -> None:
"""
Tests that quarantining protected media fails


@ -137,6 +137,7 @@ class MediaDomainBlockingTests(unittest.HomeserverTestCase):
time_now_ms=clock.time_msec(),
upload_name="test.png",
filesystem_id=file_id,
sha256=file_id,
)
)
self.register_user("user", "password")
@ -2593,6 +2594,7 @@ class AuthenticatedMediaTestCase(unittest.HomeserverTestCase):
time_now_ms=self.clock.time_msec(),
upload_name="remote_test.png",
filesystem_id=file_id,
sha256=file_id,
)
)
@ -2725,6 +2727,7 @@ class AuthenticatedMediaTestCase(unittest.HomeserverTestCase):
time_now_ms=self.clock.time_msec(),
upload_name="remote_test.png",
filesystem_id=file_id,
sha256=file_id,
)
)


@ -61,6 +61,7 @@ class MediaDomainBlockingTests(unittest.HomeserverTestCase):
time_now_ms=clock.time_msec(),
upload_name="test.png",
filesystem_id=file_id,
sha256=file_id,
)
)


@ -139,6 +139,8 @@ SMALL_PNG = unhexlify(
b"0000001f15c4890000000a49444154789c63000100000500010d"
b"0a2db40000000049454e44ae426082"
)
# The SHA256 hexdigest for the above bytes.
SMALL_PNG_SHA256 = "ebf4f635a17d10d6eb46ba680b70142419aa3220f228001a036d311a22ee9d2a"
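The constant can be sanity-checked straight from the bytes (a one-liner, assuming `SMALL_PNG` and `SMALL_PNG_SHA256` as defined above):

```python
import hashlib

assert hashlib.sha256(SMALL_PNG).hexdigest() == SMALL_PNG_SHA256
```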
# A small CMYK-encoded JPEG image used in some tests.
#