Fix clamp leave and disable backfill

This commit is contained in:
Erik Johnston 2018-05-21 17:43:03 +01:00
parent 9eaf69a386
commit 4ebf003644
2 changed files with 82 additions and 18 deletions

View file

@ -211,29 +211,17 @@ class MessageHandler(BaseHandler):
)
if source_config.direction == 'b':
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
max_topo = room_token.topological
else:
max_topo = yield self.store.get_max_topological_token(
room_id, room_token.stream
)
if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
leave_token = yield self.store.get_topological_token_for_event(
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
)
leave_token = yield self.store.get_topological_token_for_event(
member_event_id,
)
source_config.from_key = yield self.store.clamp_token_before(
room_id, source_config.from_key, leave_token,
)
events, next_key, extremities = yield self.store.paginate_room_events(
room_id=room_id,

View file

@ -915,6 +915,82 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue((events, token, extremities))
def clamp_token_before(self, room_id, token_str, clamp_to_token_str):
"""For a given room returns the given token if its before
clamp_to, otherwise returns clamp_to.
Args:
room_id (str)
token_str (str)
clamp_to_token_str(str): Must be topological token
Returns:
Deferred[str]
"""
token = RoomStreamToken.parse(token_str)
clamp_to_token = RoomStreamToken.parse(clamp_to_token_str)
def clamp_token_before_txn(txn, token):
# If we're given a stream ordering, convert to topological token
if not token.chunk:
row = self._simple_select_one_txn(
txn,
table="events",
keyvalues={
"stream_ordering": token.stream,
},
retcols=("chunk_id", "topological_ordering", "stream_ordering",),
)
token = RoomStreamToken(*row)
# If both tokens have chunk_ids, we can use that.
if token.chunk and clamp_to_token.chunk:
if token.chunk == clamp_to_token.chunk:
if token.topological < clamp_to_token.topological:
return token_str
else:
return clamp_to_token_str
table = ChunkDBOrderedListStore(
txn, room_id, self.clock,
)
if table.is_before(token.chunk, clamp_to_token.chunk):
return token_str
else:
return clamp_to_token_str
# Ok, so we're dealing with events that haven't been chunked yet,
# lets just cheat and fallback to depth.
token_depth = self._simple_select_one_onecol_txn(
txn,
table="events",
keyvalues={
"stream_ordering": token.stream,
},
retcol="depth",
)
clamp_depth = self._simple_select_one_onecol_txn(
txn,
table="events",
keyvalues={
"stream_ordering": clamp_to_token.stream,
},
retcol="depth",
)
if token_depth < clamp_depth:
return token_str
else:
return clamp_to_token_str
return self.runInteraction(
"clamp_token_before", clamp_token_before_txn, token
)
class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):