mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-15 17:51:10 +00:00
Only lock when we're backfilling (#16159)
This commit is contained in:
parent
3b3fed7229
commit
dffe095642
3 changed files with 160 additions and 149 deletions
1
changelog.d/16159.misc
Normal file
1
changelog.d/16159.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Reduce scope of locks when paginating to alleviate DB contention.
|
|
@ -60,6 +60,7 @@ from synapse.events import EventBase
|
|||
from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.federation.federation_client import InvalidResponseError
|
||||
from synapse.handlers.pagination import PURGE_PAGINATION_LOCK_NAME
|
||||
from synapse.http.servlet import assert_params_in_dict
|
||||
from synapse.logging.context import nested_logging_context
|
||||
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
|
||||
|
@ -152,6 +153,7 @@ class FederationHandler:
|
|||
self._device_handler = hs.get_device_handler()
|
||||
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
|
||||
self._notifier = hs.get_notifier()
|
||||
self._worker_locks = hs.get_worker_locks_handler()
|
||||
|
||||
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
|
||||
hs
|
||||
|
@ -200,7 +202,7 @@ class FederationHandler:
|
|||
@trace
|
||||
@tag_args
|
||||
async def maybe_backfill(
|
||||
self, room_id: str, current_depth: int, limit: int
|
||||
self, room_id: str, current_depth: int, limit: int, record_time: bool = True
|
||||
) -> bool:
|
||||
"""Checks the database to see if we should backfill before paginating,
|
||||
and if so do.
|
||||
|
@ -213,21 +215,25 @@ class FederationHandler:
|
|||
limit: The number of events that the pagination request will
|
||||
return. This is used as part of the heuristic to decide if we
|
||||
should back paginate.
|
||||
record_time: Whether to record the time it takes to backfill.
|
||||
|
||||
Returns:
|
||||
True if we actually tried to backfill something, otherwise False.
|
||||
"""
|
||||
# Starting the processing time here so we can include the room backfill
|
||||
# linearizer lock queue in the timing
|
||||
processing_start_time = self.clock.time_msec()
|
||||
processing_start_time = self.clock.time_msec() if record_time else 0
|
||||
|
||||
async with self._room_backfill.queue(room_id):
|
||||
return await self._maybe_backfill_inner(
|
||||
room_id,
|
||||
current_depth,
|
||||
limit,
|
||||
processing_start_time=processing_start_time,
|
||||
)
|
||||
async with self._worker_locks.acquire_read_write_lock(
|
||||
PURGE_PAGINATION_LOCK_NAME, room_id, write=False
|
||||
):
|
||||
return await self._maybe_backfill_inner(
|
||||
room_id,
|
||||
current_depth,
|
||||
limit,
|
||||
processing_start_time=processing_start_time,
|
||||
)
|
||||
|
||||
@trace
|
||||
@tag_args
|
||||
|
@ -305,12 +311,21 @@ class FederationHandler:
|
|||
# of history that extends all the way back to where we are currently paginating
|
||||
# and it's within the 100 events that are returned from `/backfill`.
|
||||
if not sorted_backfill_points and current_depth != MAX_DEPTH:
|
||||
# Check that we actually have later backfill points, if not just return.
|
||||
have_later_backfill_points = await self.store.get_backfill_points_in_room(
|
||||
room_id=room_id,
|
||||
current_depth=MAX_DEPTH,
|
||||
limit=1,
|
||||
)
|
||||
if not have_later_backfill_points:
|
||||
return False
|
||||
|
||||
logger.debug(
|
||||
"_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points."
|
||||
)
|
||||
run_as_background_process(
|
||||
"_maybe_backfill_inner_anyway_with_max_depth",
|
||||
self._maybe_backfill_inner,
|
||||
self.maybe_backfill,
|
||||
room_id=room_id,
|
||||
# We use `MAX_DEPTH` so that we find all backfill points next
|
||||
# time (all events are below the `MAX_DEPTH`)
|
||||
|
@ -319,7 +334,7 @@ class FederationHandler:
|
|||
# We don't want to start another timing observation from this
|
||||
# nested recursive call. The top-most call can record the time
|
||||
# overall otherwise the smaller one will throw off the results.
|
||||
processing_start_time=None,
|
||||
record_time=False,
|
||||
)
|
||||
# We return `False` because we're backfilling in the background and there is
|
||||
# no new events immediately for the caller to know about yet.
|
||||
|
|
|
@ -487,155 +487,150 @@ class PaginationHandler:
|
|||
|
||||
room_token = from_token.room_key
|
||||
|
||||
async with self._worker_locks.acquire_read_write_lock(
|
||||
PURGE_PAGINATION_LOCK_NAME, room_id, write=False
|
||||
):
|
||||
(membership, member_event_id) = (None, None)
|
||||
if not use_admin_priviledge:
|
||||
(
|
||||
membership,
|
||||
member_event_id,
|
||||
) = await self.auth.check_user_in_room_or_world_readable(
|
||||
room_id, requester, allow_departed_users=True
|
||||
)
|
||||
|
||||
if pagin_config.direction == Direction.BACKWARDS:
|
||||
# if we're going backwards, we might need to backfill. This
|
||||
# requires that we have a topo token.
|
||||
if room_token.topological:
|
||||
curr_topo = room_token.topological
|
||||
else:
|
||||
curr_topo = await self.store.get_current_topological_token(
|
||||
room_id, room_token.stream
|
||||
)
|
||||
|
||||
# If they have left the room then clamp the token to be before
|
||||
# they left the room, to save the effort of loading from the
|
||||
# database.
|
||||
if (
|
||||
pagin_config.direction == Direction.BACKWARDS
|
||||
and not use_admin_priviledge
|
||||
and membership == Membership.LEAVE
|
||||
):
|
||||
# This is only None if the room is world_readable, in which case
|
||||
# "Membership.JOIN" would have been returned and we should never hit
|
||||
# this branch.
|
||||
assert member_event_id
|
||||
|
||||
leave_token = await self.store.get_topological_token_for_event(
|
||||
member_event_id
|
||||
)
|
||||
assert leave_token.topological is not None
|
||||
|
||||
if leave_token.topological < curr_topo:
|
||||
from_token = from_token.copy_and_replace(
|
||||
StreamKeyType.ROOM, leave_token
|
||||
)
|
||||
|
||||
to_room_key = None
|
||||
if pagin_config.to_token:
|
||||
to_room_key = pagin_config.to_token.room_key
|
||||
|
||||
# Initially fetch the events from the database. With any luck, we can return
|
||||
# these without blocking on backfill (handled below).
|
||||
events, next_key = await self.store.paginate_room_events(
|
||||
room_id=room_id,
|
||||
from_key=from_token.room_key,
|
||||
to_key=to_room_key,
|
||||
direction=pagin_config.direction,
|
||||
limit=pagin_config.limit,
|
||||
event_filter=event_filter,
|
||||
(membership, member_event_id) = (None, None)
|
||||
if not use_admin_priviledge:
|
||||
(
|
||||
membership,
|
||||
member_event_id,
|
||||
) = await self.auth.check_user_in_room_or_world_readable(
|
||||
room_id, requester, allow_departed_users=True
|
||||
)
|
||||
|
||||
if pagin_config.direction == Direction.BACKWARDS:
|
||||
# We use a `Set` because there can be multiple events at a given depth
|
||||
# and we only care about looking at the unique continum of depths to
|
||||
# find gaps.
|
||||
event_depths: Set[int] = {event.depth for event in events}
|
||||
sorted_event_depths = sorted(event_depths)
|
||||
|
||||
# Inspect the depths of the returned events to see if there are any gaps
|
||||
found_big_gap = False
|
||||
number_of_gaps = 0
|
||||
previous_event_depth = (
|
||||
sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
|
||||
if pagin_config.direction == Direction.BACKWARDS:
|
||||
# if we're going backwards, we might need to backfill. This
|
||||
# requires that we have a topo token.
|
||||
if room_token.topological:
|
||||
curr_topo = room_token.topological
|
||||
else:
|
||||
curr_topo = await self.store.get_current_topological_token(
|
||||
room_id, room_token.stream
|
||||
)
|
||||
for event_depth in sorted_event_depths:
|
||||
# We don't expect a negative depth but we'll just deal with it in
|
||||
# any case by taking the absolute value to get the true gap between
|
||||
# any two integers.
|
||||
depth_gap = abs(event_depth - previous_event_depth)
|
||||
# A `depth_gap` of 1 is a normal continuous chain to the next event
|
||||
# (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
|
||||
# also possible there is no event at a given depth but we can't ever
|
||||
# know that for sure)
|
||||
if depth_gap > 1:
|
||||
number_of_gaps += 1
|
||||
|
||||
# We only tolerate a small number single-event long gaps in the
|
||||
# returned events because those are most likely just events we've
|
||||
# failed to pull in the past. Anything longer than that is probably
|
||||
# a sign that we're missing a decent chunk of history and we should
|
||||
# try to backfill it.
|
||||
#
|
||||
# XXX: It's possible we could tolerate longer gaps if we checked
|
||||
# that a given events `prev_events` is one that has failed pull
|
||||
# attempts and we could just treat it like a dead branch of history
|
||||
# for now or at least something that we don't need the block the
|
||||
# client on to try pulling.
|
||||
#
|
||||
# XXX: If we had something like MSC3871 to indicate gaps in the
|
||||
# timeline to the client, we could also get away with any sized gap
|
||||
# and just have the client refetch the holes as they see fit.
|
||||
if depth_gap > 2:
|
||||
found_big_gap = True
|
||||
break
|
||||
previous_event_depth = event_depth
|
||||
# If they have left the room then clamp the token to be before
|
||||
# they left the room, to save the effort of loading from the
|
||||
# database.
|
||||
if (
|
||||
pagin_config.direction == Direction.BACKWARDS
|
||||
and not use_admin_priviledge
|
||||
and membership == Membership.LEAVE
|
||||
):
|
||||
# This is only None if the room is world_readable, in which case
|
||||
# "Membership.JOIN" would have been returned and we should never hit
|
||||
# this branch.
|
||||
assert member_event_id
|
||||
|
||||
# Backfill in the foreground if we found a big gap, have too many holes,
|
||||
# or we don't have enough events to fill the limit that the client asked
|
||||
# for.
|
||||
missing_too_many_events = (
|
||||
number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
|
||||
leave_token = await self.store.get_topological_token_for_event(
|
||||
member_event_id
|
||||
)
|
||||
assert leave_token.topological is not None
|
||||
|
||||
if leave_token.topological < curr_topo:
|
||||
from_token = from_token.copy_and_replace(
|
||||
StreamKeyType.ROOM, leave_token
|
||||
)
|
||||
not_enough_events_to_fill_response = len(events) < pagin_config.limit
|
||||
if (
|
||||
found_big_gap
|
||||
or missing_too_many_events
|
||||
or not_enough_events_to_fill_response
|
||||
):
|
||||
did_backfill = (
|
||||
await self.hs.get_federation_handler().maybe_backfill(
|
||||
room_id,
|
||||
curr_topo,
|
||||
limit=pagin_config.limit,
|
||||
)
|
||||
)
|
||||
|
||||
# If we did backfill something, refetch the events from the database to
|
||||
# catch anything new that might have been added since we last fetched.
|
||||
if did_backfill:
|
||||
events, next_key = await self.store.paginate_room_events(
|
||||
room_id=room_id,
|
||||
from_key=from_token.room_key,
|
||||
to_key=to_room_key,
|
||||
direction=pagin_config.direction,
|
||||
limit=pagin_config.limit,
|
||||
event_filter=event_filter,
|
||||
)
|
||||
else:
|
||||
# Otherwise, we can backfill in the background for eventual
|
||||
# consistency's sake but we don't need to block the client waiting
|
||||
# for a costly federation call and processing.
|
||||
run_as_background_process(
|
||||
"maybe_backfill_in_the_background",
|
||||
self.hs.get_federation_handler().maybe_backfill,
|
||||
room_id,
|
||||
curr_topo,
|
||||
to_room_key = None
|
||||
if pagin_config.to_token:
|
||||
to_room_key = pagin_config.to_token.room_key
|
||||
|
||||
# Initially fetch the events from the database. With any luck, we can return
|
||||
# these without blocking on backfill (handled below).
|
||||
events, next_key = await self.store.paginate_room_events(
|
||||
room_id=room_id,
|
||||
from_key=from_token.room_key,
|
||||
to_key=to_room_key,
|
||||
direction=pagin_config.direction,
|
||||
limit=pagin_config.limit,
|
||||
event_filter=event_filter,
|
||||
)
|
||||
|
||||
if pagin_config.direction == Direction.BACKWARDS:
|
||||
# We use a `Set` because there can be multiple events at a given depth
|
||||
# and we only care about looking at the unique continum of depths to
|
||||
# find gaps.
|
||||
event_depths: Set[int] = {event.depth for event in events}
|
||||
sorted_event_depths = sorted(event_depths)
|
||||
|
||||
# Inspect the depths of the returned events to see if there are any gaps
|
||||
found_big_gap = False
|
||||
number_of_gaps = 0
|
||||
previous_event_depth = (
|
||||
sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
|
||||
)
|
||||
for event_depth in sorted_event_depths:
|
||||
# We don't expect a negative depth but we'll just deal with it in
|
||||
# any case by taking the absolute value to get the true gap between
|
||||
# any two integers.
|
||||
depth_gap = abs(event_depth - previous_event_depth)
|
||||
# A `depth_gap` of 1 is a normal continuous chain to the next event
|
||||
# (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
|
||||
# also possible there is no event at a given depth but we can't ever
|
||||
# know that for sure)
|
||||
if depth_gap > 1:
|
||||
number_of_gaps += 1
|
||||
|
||||
# We only tolerate a small number single-event long gaps in the
|
||||
# returned events because those are most likely just events we've
|
||||
# failed to pull in the past. Anything longer than that is probably
|
||||
# a sign that we're missing a decent chunk of history and we should
|
||||
# try to backfill it.
|
||||
#
|
||||
# XXX: It's possible we could tolerate longer gaps if we checked
|
||||
# that a given events `prev_events` is one that has failed pull
|
||||
# attempts and we could just treat it like a dead branch of history
|
||||
# for now or at least something that we don't need the block the
|
||||
# client on to try pulling.
|
||||
#
|
||||
# XXX: If we had something like MSC3871 to indicate gaps in the
|
||||
# timeline to the client, we could also get away with any sized gap
|
||||
# and just have the client refetch the holes as they see fit.
|
||||
if depth_gap > 2:
|
||||
found_big_gap = True
|
||||
break
|
||||
previous_event_depth = event_depth
|
||||
|
||||
# Backfill in the foreground if we found a big gap, have too many holes,
|
||||
# or we don't have enough events to fill the limit that the client asked
|
||||
# for.
|
||||
missing_too_many_events = (
|
||||
number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
|
||||
)
|
||||
not_enough_events_to_fill_response = len(events) < pagin_config.limit
|
||||
if (
|
||||
found_big_gap
|
||||
or missing_too_many_events
|
||||
or not_enough_events_to_fill_response
|
||||
):
|
||||
did_backfill = await self.hs.get_federation_handler().maybe_backfill(
|
||||
room_id,
|
||||
curr_topo,
|
||||
limit=pagin_config.limit,
|
||||
)
|
||||
|
||||
# If we did backfill something, refetch the events from the database to
|
||||
# catch anything new that might have been added since we last fetched.
|
||||
if did_backfill:
|
||||
events, next_key = await self.store.paginate_room_events(
|
||||
room_id=room_id,
|
||||
from_key=from_token.room_key,
|
||||
to_key=to_room_key,
|
||||
direction=pagin_config.direction,
|
||||
limit=pagin_config.limit,
|
||||
event_filter=event_filter,
|
||||
)
|
||||
else:
|
||||
# Otherwise, we can backfill in the background for eventual
|
||||
# consistency's sake but we don't need to block the client waiting
|
||||
# for a costly federation call and processing.
|
||||
run_as_background_process(
|
||||
"maybe_backfill_in_the_background",
|
||||
self.hs.get_federation_handler().maybe_backfill,
|
||||
room_id,
|
||||
curr_topo,
|
||||
limit=pagin_config.limit,
|
||||
)
|
||||
|
||||
next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
|
||||
next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
|
||||
|
||||
# if no events are returned from pagination, that implies
|
||||
# we have reached the end of the available events.
|
||||
|
|
Loading…
Reference in a new issue