mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-14 11:57:44 +00:00
Merge remote-tracking branch 'origin/release-v1.110' into develop
This commit is contained in:
commit
99c107920d
5 changed files with 57 additions and 10 deletions
1
changelog.d/17389.misc
Normal file
1
changelog.d/17389.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix building debian package for debian sid.
|
1
changelog.d/17391.bugfix
Normal file
1
changelog.d/17391.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix bug where `/sync` requests could get blocked indefinitely after an upgrade from Synapse versions before v1.109.0.
|
|
@ -73,6 +73,8 @@ RUN apt-get update -qq -o Acquire::Languages=none \
|
||||||
curl \
|
curl \
|
||||||
debhelper \
|
debhelper \
|
||||||
devscripts \
|
devscripts \
|
||||||
|
# Required for building cffi from source.
|
||||||
|
libffi-dev \
|
||||||
libsystemd-dev \
|
libsystemd-dev \
|
||||||
lsb-release \
|
lsb-release \
|
||||||
pkg-config \
|
pkg-config \
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
|
|
||||||
|
import logging
|
||||||
from typing import TYPE_CHECKING, Sequence, Tuple
|
from typing import TYPE_CHECKING, Sequence, Tuple
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
@ -41,6 +42,9 @@ if TYPE_CHECKING:
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@attr.s(frozen=True, slots=True, auto_attribs=True)
|
@attr.s(frozen=True, slots=True, auto_attribs=True)
|
||||||
class _EventSourcesInner:
|
class _EventSourcesInner:
|
||||||
room: RoomEventSource
|
room: RoomEventSource
|
||||||
|
@ -139,9 +143,16 @@ class EventSources:
|
||||||
key
|
key
|
||||||
].get_max_allocated_token()
|
].get_max_allocated_token()
|
||||||
|
|
||||||
token = token.copy_and_replace(
|
if max_token < token_value.get_max_stream_pos():
|
||||||
key, token.room_key.bound_stream_token(max_token)
|
logger.error(
|
||||||
)
|
"Bounding token from the future '%s': token: %s, bound: %s",
|
||||||
|
key,
|
||||||
|
token_value,
|
||||||
|
max_token,
|
||||||
|
)
|
||||||
|
token = token.copy_and_replace(
|
||||||
|
key, token_value.bound_stream_token(max_token)
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
assert isinstance(current_value, int)
|
assert isinstance(current_value, int)
|
||||||
if current_value < token_value:
|
if current_value < token_value:
|
||||||
|
@ -149,7 +160,14 @@ class EventSources:
|
||||||
key
|
key
|
||||||
].get_max_allocated_token()
|
].get_max_allocated_token()
|
||||||
|
|
||||||
token = token.copy_and_replace(key, min(token_value, max_token))
|
if max_token < token_value:
|
||||||
|
logger.error(
|
||||||
|
"Bounding token from the future '%s': token: %s, bound: %s",
|
||||||
|
key,
|
||||||
|
token_value,
|
||||||
|
max_token,
|
||||||
|
)
|
||||||
|
token = token.copy_and_replace(key, max_token)
|
||||||
|
|
||||||
return token
|
return token
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,14 @@ from synapse.handlers.sync import SyncConfig, SyncRequestKey, SyncResult, SyncVe
|
||||||
from synapse.rest import admin
|
from synapse.rest import admin
|
||||||
from synapse.rest.client import knock, login, room
|
from synapse.rest.client import knock, login, room
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.types import JsonDict, StreamKeyType, UserID, create_requester
|
from synapse.types import (
|
||||||
|
JsonDict,
|
||||||
|
MultiWriterStreamToken,
|
||||||
|
RoomStreamToken,
|
||||||
|
StreamKeyType,
|
||||||
|
UserID,
|
||||||
|
create_requester,
|
||||||
|
)
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
|
||||||
import tests.unittest
|
import tests.unittest
|
||||||
|
@ -999,7 +1006,13 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
||||||
|
|
||||||
self.get_success(sync_d, by=1.0)
|
self.get_success(sync_d, by=1.0)
|
||||||
|
|
||||||
def test_wait_for_invalid_future_sync_token(self) -> None:
|
@parameterized.expand(
|
||||||
|
[(key,) for key in StreamKeyType.__members__.values()],
|
||||||
|
name_func=lambda func, _, param: f"{func.__name__}_{param.args[0].name}",
|
||||||
|
)
|
||||||
|
def test_wait_for_invalid_future_sync_token(
|
||||||
|
self, stream_key: StreamKeyType
|
||||||
|
) -> None:
|
||||||
"""Like the previous test, except we give a token that has a stream
|
"""Like the previous test, except we give a token that has a stream
|
||||||
position ahead of what is in the DB, i.e. its invalid and we shouldn't
|
position ahead of what is in the DB, i.e. its invalid and we shouldn't
|
||||||
wait for the stream to advance (as it may never do so).
|
wait for the stream to advance (as it may never do so).
|
||||||
|
@ -1010,11 +1023,23 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
||||||
"""
|
"""
|
||||||
user = self.register_user("alice", "password")
|
user = self.register_user("alice", "password")
|
||||||
|
|
||||||
# Create a token and arbitrarily advance one of the streams.
|
# Create a token and advance one of the streams.
|
||||||
current_token = self.hs.get_event_sources().get_current_token()
|
current_token = self.hs.get_event_sources().get_current_token()
|
||||||
since_token = current_token.copy_and_advance(
|
token_value = current_token.get_field(stream_key)
|
||||||
StreamKeyType.PRESENCE, current_token.presence_key + 1
|
|
||||||
)
|
# How we advance the streams depends on the type.
|
||||||
|
if isinstance(token_value, int):
|
||||||
|
since_token = current_token.copy_and_advance(stream_key, token_value + 1)
|
||||||
|
elif isinstance(token_value, MultiWriterStreamToken):
|
||||||
|
since_token = current_token.copy_and_advance(
|
||||||
|
stream_key, MultiWriterStreamToken(stream=token_value.stream + 1)
|
||||||
|
)
|
||||||
|
elif isinstance(token_value, RoomStreamToken):
|
||||||
|
since_token = current_token.copy_and_advance(
|
||||||
|
stream_key, RoomStreamToken(stream=token_value.stream + 1)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise Exception("Unreachable")
|
||||||
|
|
||||||
sync_d = defer.ensureDeferred(
|
sync_d = defer.ensureDeferred(
|
||||||
self.sync_handler.wait_for_sync_for_user(
|
self.sync_handler.wait_for_sync_for_user(
|
||||||
|
|
Loading…
Reference in a new issue