mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-15 17:51:10 +00:00
when we're talking about backfilling data in federation, call it backfilling - not pagination.
This commit is contained in:
parent
f729f13735
commit
59dfbaba3b
9 changed files with 54 additions and 54 deletions
|
@ -24,7 +24,7 @@ Where the bottom (the transport layer) is what talks to the internet via HTTP, a
|
||||||
* duplicate pdu_id's - i.e., it makes sure we ignore them.
|
* duplicate pdu_id's - i.e., it makes sure we ignore them.
|
||||||
* responding to requests for a given pdu_id
|
* responding to requests for a given pdu_id
|
||||||
* responding to requests for all metadata for a given context (i.e. room)
|
* responding to requests for all metadata for a given context (i.e. room)
|
||||||
* handling incoming pagination requests
|
* handling incoming backfill requests
|
||||||
|
|
||||||
So it has to parse incoming messages to discover which are metadata and which aren't, and has to correctly clobber existing metadata where appropriate.
|
So it has to parse incoming messages to discover which are metadata and which aren't, and has to correctly clobber existing metadata where appropriate.
|
||||||
|
|
||||||
|
|
|
@ -155,9 +155,9 @@ To fetch all the state of a given context:
|
||||||
PDUs that encode the state.
|
PDUs that encode the state.
|
||||||
|
|
||||||
|
|
||||||
To paginate events on a given context:
|
To backfill events on a given context:
|
||||||
|
|
||||||
GET .../paginate/:context/
|
GET .../backfill/:context/
|
||||||
Query args: v, limit
|
Query args: v, limit
|
||||||
|
|
||||||
Response: JSON encoding of a single Transaction containing multiple PDUs
|
Response: JSON encoding of a single Transaction containing multiple PDUs
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
Versioning is, like, hard for paginating backwards because of the number of Home Servers involved.
|
Versioning is, like, hard for backfilling backwards because of the number of Home Servers involved.
|
||||||
|
|
||||||
The way we solve this is by doing versioning as an acyclic directed graph of PDUs. For pagination purposes, this is done on a per context basis.
|
The way we solve this is by doing versioning as an acyclic directed graph of PDUs. For backfilling purposes, this is done on a per context basis.
|
||||||
When we send a PDU we include all PDUs that have been received for that context that haven't been subsequently listed in a later PDU. The trivial case is a simple list of PDUs, e.g. A <- B <- C. However, if two servers send out a PDU at the same time, both B and C would point at A - a later PDU would then list both B and C.
|
When we send a PDU we include all PDUs that have been received for that context that haven't been subsequently listed in a later PDU. The trivial case is a simple list of PDUs, e.g. A <- B <- C. However, if two servers send out a PDU at the same time, both B and C would point at A - a later PDU would then list both B and C.
|
||||||
|
|
||||||
Problems with opaque version strings:
|
Problems with opaque version strings:
|
||||||
- How do you do clustering without mandating that a cluster can only have one transaction in flight to a given remote home server at a time.
|
- How do you do clustering without mandating that a cluster can only have one transaction in flight to a given remote home server at a time.
|
||||||
If you have multiple transactions sent at once, then you might drop one transaction, receive anotherwith a version that is later than the dropped transaction and which point ARGH WE LOST A TRANSACTION.
|
If you have multiple transactions sent at once, then you might drop one transaction, receive another with a version that is later than the dropped transaction and which point ARGH WE LOST A TRANSACTION.
|
||||||
- How do you do pagination? A version string defines a point in a stream w.r.t. a single home server, not a point in the context.
|
- How do you do backfilling? A version string defines a point in a stream w.r.t. a single home server, not a point in the context.
|
||||||
|
|
||||||
We only need to store the ends of the directed graph, we DO NOT need to do the whole one table of nodes and one of edges.
|
We only need to store the ends of the directed graph, we DO NOT need to do the whole one table of nodes and one of edges.
|
||||||
|
|
|
@ -104,12 +104,12 @@ class InputOutput(object):
|
||||||
#self.print_line("OK.")
|
#self.print_line("OK.")
|
||||||
return
|
return
|
||||||
|
|
||||||
m = re.match("^paginate (\S+)$", line)
|
m = re.match("^backfill (\S+)$", line)
|
||||||
if m:
|
if m:
|
||||||
# we want to paginate a room
|
# we want to backfill a room
|
||||||
room_name, = m.groups()
|
room_name, = m.groups()
|
||||||
self.print_line("paginate %s" % room_name)
|
self.print_line("backfill %s" % room_name)
|
||||||
self.server.paginate(room_name)
|
self.server.backfill(room_name)
|
||||||
return
|
return
|
||||||
|
|
||||||
self.print_line("Unrecognized command")
|
self.print_line("Unrecognized command")
|
||||||
|
@ -307,7 +307,7 @@ class HomeServer(ReplicationHandler):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(e)
|
logger.exception(e)
|
||||||
|
|
||||||
def paginate(self, room_name, limit=5):
|
def backfill(self, room_name, limit=5):
|
||||||
room = self.joined_rooms.get(room_name)
|
room = self.joined_rooms.get(room_name)
|
||||||
|
|
||||||
if not room:
|
if not room:
|
||||||
|
@ -315,7 +315,7 @@ class HomeServer(ReplicationHandler):
|
||||||
|
|
||||||
dest = room.oldest_server
|
dest = room.oldest_server
|
||||||
|
|
||||||
return self.replication_layer.paginate(dest, room_name, limit)
|
return self.replication_layer.backfill(dest, room_name, limit)
|
||||||
|
|
||||||
def _get_room_remote_servers(self, room_name):
|
def _get_room_remote_servers(self, room_name):
|
||||||
return [i for i in self.joined_rooms.setdefault(room_name,).servers]
|
return [i for i in self.joined_rooms.setdefault(room_name,).servers]
|
||||||
|
|
|
@ -75,8 +75,8 @@ class FederationEventHandler(object):
|
||||||
@log_function
|
@log_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def backfill(self, room_id, limit):
|
def backfill(self, room_id, limit):
|
||||||
# TODO: Work out which destinations to ask for pagination
|
# TODO: Work out which destinations to ask for backfill
|
||||||
# self.replication_layer.paginate(dest, room_id, limit)
|
# self.replication_layer.backfill(dest, room_id, limit)
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
|
|
|
@ -114,14 +114,14 @@ class PduActions(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def paginate(self, context, pdu_list, limit):
|
def backfill(self, context, pdu_list, limit):
|
||||||
""" For a given list of PDU id and origins return the preceding
|
""" For a given list of PDU id and origins return the preceding
|
||||||
`limit` `Pdu`s in the given `context`.
|
`limit` `Pdu`s in the given `context`.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred: Results in a list of `Pdu`s.
|
Deferred: Results in a list of `Pdu`s.
|
||||||
"""
|
"""
|
||||||
results = yield self.store.get_pagination(
|
results = yield self.store.get_backfill(
|
||||||
context, pdu_list, limit
|
context, pdu_list, limit
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -131,7 +131,7 @@ class PduActions(object):
|
||||||
def is_new(self, pdu):
|
def is_new(self, pdu):
|
||||||
""" When we receive a `Pdu` from a remote home server, we want to
|
""" When we receive a `Pdu` from a remote home server, we want to
|
||||||
figure out whether it is `new`, i.e. it is not some historic PDU that
|
figure out whether it is `new`, i.e. it is not some historic PDU that
|
||||||
we haven't seen simply because we haven't paginated back that far.
|
we haven't seen simply because we haven't backfilled back that far.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred: Results in a `bool`
|
Deferred: Results in a `bool`
|
||||||
|
|
|
@ -118,7 +118,7 @@ class ReplicationLayer(object):
|
||||||
*Note:* The home server should always call `send_pdu` even if it knows
|
*Note:* The home server should always call `send_pdu` even if it knows
|
||||||
that it does not need to be replicated to other home servers. This is
|
that it does not need to be replicated to other home servers. This is
|
||||||
in case e.g. someone else joins via a remote home server and then
|
in case e.g. someone else joins via a remote home server and then
|
||||||
paginates.
|
backfills.
|
||||||
|
|
||||||
TODO: Figure out when we should actually resolve the deferred.
|
TODO: Figure out when we should actually resolve the deferred.
|
||||||
|
|
||||||
|
@ -179,13 +179,13 @@ class ReplicationLayer(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def paginate(self, dest, context, limit):
|
def backfill(self, dest, context, limit):
|
||||||
"""Requests some more historic PDUs for the given context from the
|
"""Requests some more historic PDUs for the given context from the
|
||||||
given destination server.
|
given destination server.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
dest (str): The remote home server to ask.
|
dest (str): The remote home server to ask.
|
||||||
context (str): The context to paginate back on.
|
context (str): The context to backfill.
|
||||||
limit (int): The maximum number of PDUs to return.
|
limit (int): The maximum number of PDUs to return.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
|
@ -193,16 +193,16 @@ class ReplicationLayer(object):
|
||||||
"""
|
"""
|
||||||
extremities = yield self.store.get_oldest_pdus_in_context(context)
|
extremities = yield self.store.get_oldest_pdus_in_context(context)
|
||||||
|
|
||||||
logger.debug("paginate extrem=%s", extremities)
|
logger.debug("backfill extrem=%s", extremities)
|
||||||
|
|
||||||
# If there are no extremities then we've (probably) reached the start.
|
# If there are no extremities then we've (probably) reached the start.
|
||||||
if not extremities:
|
if not extremities:
|
||||||
return
|
return
|
||||||
|
|
||||||
transaction_data = yield self.transport_layer.paginate(
|
transaction_data = yield self.transport_layer.backfill(
|
||||||
dest, context, extremities, limit)
|
dest, context, extremities, limit)
|
||||||
|
|
||||||
logger.debug("paginate transaction_data=%s", repr(transaction_data))
|
logger.debug("backfill transaction_data=%s", repr(transaction_data))
|
||||||
|
|
||||||
transaction = Transaction(**transaction_data)
|
transaction = Transaction(**transaction_data)
|
||||||
|
|
||||||
|
@ -281,9 +281,9 @@ class ReplicationLayer(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def on_paginate_request(self, context, versions, limit):
|
def on_backfill_request(self, context, versions, limit):
|
||||||
|
|
||||||
pdus = yield self.pdu_actions.paginate(context, versions, limit)
|
pdus = yield self.pdu_actions.backfill(context, versions, limit)
|
||||||
|
|
||||||
defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict()))
|
defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict()))
|
||||||
|
|
||||||
|
@ -427,7 +427,7 @@ class ReplicationLayer(object):
|
||||||
# Get missing pdus if necessary.
|
# Get missing pdus if necessary.
|
||||||
is_new = yield self.pdu_actions.is_new(pdu)
|
is_new = yield self.pdu_actions.is_new(pdu)
|
||||||
if is_new and not pdu.outlier:
|
if is_new and not pdu.outlier:
|
||||||
# We only paginate backwards to the min depth.
|
# We only backfill backwards to the min depth.
|
||||||
min_depth = yield self.store.get_min_depth_for_context(pdu.context)
|
min_depth = yield self.store.get_min_depth_for_context(pdu.context)
|
||||||
|
|
||||||
if min_depth and pdu.depth > min_depth:
|
if min_depth and pdu.depth > min_depth:
|
||||||
|
|
|
@ -112,7 +112,7 @@ class TransportLayer(object):
|
||||||
return self._do_request_for_transaction(destination, subpath)
|
return self._do_request_for_transaction(destination, subpath)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def paginate(self, dest, context, pdu_tuples, limit):
|
def backfill(self, dest, context, pdu_tuples, limit):
|
||||||
""" Requests `limit` previous PDUs in a given context before list of
|
""" Requests `limit` previous PDUs in a given context before list of
|
||||||
PDUs.
|
PDUs.
|
||||||
|
|
||||||
|
@ -126,14 +126,14 @@ class TransportLayer(object):
|
||||||
Deferred: Results in a dict received from the remote homeserver.
|
Deferred: Results in a dict received from the remote homeserver.
|
||||||
"""
|
"""
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"paginate dest=%s, context=%s, pdu_tuples=%s, limit=%s",
|
"backfill dest=%s, context=%s, pdu_tuples=%s, limit=%s",
|
||||||
dest, context, repr(pdu_tuples), str(limit)
|
dest, context, repr(pdu_tuples), str(limit)
|
||||||
)
|
)
|
||||||
|
|
||||||
if not pdu_tuples:
|
if not pdu_tuples:
|
||||||
return
|
return
|
||||||
|
|
||||||
subpath = "/paginate/%s/" % context
|
subpath = "/backfill/%s/" % context
|
||||||
|
|
||||||
args = {"v": ["%s,%s" % (i, o) for i, o in pdu_tuples]}
|
args = {"v": ["%s,%s" % (i, o) for i, o in pdu_tuples]}
|
||||||
args["limit"] = limit
|
args["limit"] = limit
|
||||||
|
@ -251,8 +251,8 @@ class TransportLayer(object):
|
||||||
|
|
||||||
self.server.register_path(
|
self.server.register_path(
|
||||||
"GET",
|
"GET",
|
||||||
re.compile("^" + PREFIX + "/paginate/([^/]*)/$"),
|
re.compile("^" + PREFIX + "/backfill/([^/]*)/$"),
|
||||||
lambda request, context: self._on_paginate_request(
|
lambda request, context: self._on_backfill_request(
|
||||||
context, request.args["v"],
|
context, request.args["v"],
|
||||||
request.args["limit"]
|
request.args["limit"]
|
||||||
)
|
)
|
||||||
|
@ -352,7 +352,7 @@ class TransportLayer(object):
|
||||||
defer.returnValue(data)
|
defer.returnValue(data)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def _on_paginate_request(self, context, v_list, limits):
|
def _on_backfill_request(self, context, v_list, limits):
|
||||||
if not limits:
|
if not limits:
|
||||||
return defer.succeed(
|
return defer.succeed(
|
||||||
(400, {"error": "Did not include limit param"})
|
(400, {"error": "Did not include limit param"})
|
||||||
|
@ -362,7 +362,7 @@ class TransportLayer(object):
|
||||||
|
|
||||||
versions = [v.split(",", 1) for v in v_list]
|
versions = [v.split(",", 1) for v in v_list]
|
||||||
|
|
||||||
return self.request_handler.on_paginate_request(
|
return self.request_handler.on_backfill_request(
|
||||||
context, versions, limit)
|
context, versions, limit)
|
||||||
|
|
||||||
|
|
||||||
|
@ -371,14 +371,14 @@ class TransportReceivedHandler(object):
|
||||||
"""
|
"""
|
||||||
def on_incoming_transaction(self, transaction):
|
def on_incoming_transaction(self, transaction):
|
||||||
""" Called on PUT /send/<transaction_id>, or on response to a request
|
""" Called on PUT /send/<transaction_id>, or on response to a request
|
||||||
that we sent (e.g. a pagination request)
|
that we sent (e.g. a backfill request)
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
transaction (synapse.transaction.Transaction): The transaction that
|
transaction (synapse.transaction.Transaction): The transaction that
|
||||||
was sent to us.
|
was sent to us.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
twisted.internet.defer.Deferred: A deferred that get's fired when
|
twisted.internet.defer.Deferred: A deferred that gets fired when
|
||||||
the transaction has finished being processed.
|
the transaction has finished being processed.
|
||||||
|
|
||||||
The result should be a tuple in the form of
|
The result should be a tuple in the form of
|
||||||
|
@ -438,14 +438,14 @@ class TransportRequestHandler(object):
|
||||||
def on_context_state_request(self, context):
|
def on_context_state_request(self, context):
|
||||||
""" Called on GET /state/<context>/
|
""" Called on GET /state/<context>/
|
||||||
|
|
||||||
Get's hit when someone wants all the *current* state for a given
|
Gets hit when someone wants all the *current* state for a given
|
||||||
contexts.
|
contexts.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
context (str): The name of the context that we're interested in.
|
context (str): The name of the context that we're interested in.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
twisted.internet.defer.Deferred: A deferred that get's fired when
|
twisted.internet.defer.Deferred: A deferred that gets fired when
|
||||||
the transaction has finished being processed.
|
the transaction has finished being processed.
|
||||||
|
|
||||||
The result should be a tuple in the form of
|
The result should be a tuple in the form of
|
||||||
|
@ -457,20 +457,20 @@ class TransportRequestHandler(object):
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def on_paginate_request(self, context, versions, limit):
|
def on_backfill_request(self, context, versions, limit):
|
||||||
""" Called on GET /paginate/<context>/?v=...&limit=...
|
""" Called on GET /backfill/<context>/?v=...&limit=...
|
||||||
|
|
||||||
Get's hit when we want to paginate backwards on a given context from
|
Gets hit when we want to backfill backwards on a given context from
|
||||||
the given point.
|
the given point.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
context (str): The context to paginate on
|
context (str): The context to backfill
|
||||||
versions (list): A list of 2-tuple's representing where to paginate
|
versions (list): A list of 2-tuples representing where to backfill
|
||||||
from, in the form `(pdu_id, origin)`
|
from, in the form `(pdu_id, origin)`
|
||||||
limit (int): How many pdus to return.
|
limit (int): How many pdus to return.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred: Resultsin a tuple in the form of
|
Deferred: Results in a tuple in the form of
|
||||||
`(response_code, respond_body)`, where `response_body` is a python
|
`(response_code, respond_body)`, where `response_body` is a python
|
||||||
dict that will get serialized to JSON.
|
dict that will get serialized to JSON.
|
||||||
|
|
||||||
|
|
|
@ -168,7 +168,7 @@ class PduStore(SQLBaseStore):
|
||||||
|
|
||||||
return self._get_pdu_tuples(txn, txn.fetchall())
|
return self._get_pdu_tuples(txn, txn.fetchall())
|
||||||
|
|
||||||
def get_pagination(self, context, pdu_list, limit):
|
def get_backfill(self, context, pdu_list, limit):
|
||||||
"""Get a list of Pdus for a given topic that occurred before (and
|
"""Get a list of Pdus for a given topic that occurred before (and
|
||||||
including) the pdus in pdu_list. Return a list of max size `limit`.
|
including) the pdus in pdu_list. Return a list of max size `limit`.
|
||||||
|
|
||||||
|
@ -182,12 +182,12 @@ class PduStore(SQLBaseStore):
|
||||||
list: A list of PduTuples
|
list: A list of PduTuples
|
||||||
"""
|
"""
|
||||||
return self._db_pool.runInteraction(
|
return self._db_pool.runInteraction(
|
||||||
self._get_paginate, context, pdu_list, limit
|
self._get_backfill, context, pdu_list, limit
|
||||||
)
|
)
|
||||||
|
|
||||||
def _get_paginate(self, txn, context, pdu_list, limit):
|
def _get_backfill(self, txn, context, pdu_list, limit):
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"paginate: %s, %s, %s",
|
"backfill: %s, %s, %s",
|
||||||
context, repr(pdu_list), limit
|
context, repr(pdu_list), limit
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -213,7 +213,7 @@ class PduStore(SQLBaseStore):
|
||||||
new_front = []
|
new_front = []
|
||||||
for pdu_id, origin in front:
|
for pdu_id, origin in front:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"_paginate_interaction: i=%s, o=%s",
|
"_backfill_interaction: i=%s, o=%s",
|
||||||
pdu_id, origin
|
pdu_id, origin
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -224,7 +224,7 @@ class PduStore(SQLBaseStore):
|
||||||
|
|
||||||
for row in txn.fetchall():
|
for row in txn.fetchall():
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"_paginate_interaction: got i=%s, o=%s",
|
"_backfill_interaction: got i=%s, o=%s",
|
||||||
*row
|
*row
|
||||||
)
|
)
|
||||||
new_front.append(row)
|
new_front.append(row)
|
||||||
|
@ -262,7 +262,7 @@ class PduStore(SQLBaseStore):
|
||||||
|
|
||||||
def update_min_depth_for_context(self, context, depth):
|
def update_min_depth_for_context(self, context, depth):
|
||||||
"""Update the minimum `depth` of the given context, which is the line
|
"""Update the minimum `depth` of the given context, which is the line
|
||||||
where we stop paginating backwards on.
|
on which we stop backfilling backwards.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
context (str)
|
context (str)
|
||||||
|
@ -320,9 +320,9 @@ class PduStore(SQLBaseStore):
|
||||||
return [(row[0], row[1], row[2]) for row in results]
|
return [(row[0], row[1], row[2]) for row in results]
|
||||||
|
|
||||||
def get_oldest_pdus_in_context(self, context):
|
def get_oldest_pdus_in_context(self, context):
|
||||||
"""Get a list of Pdus that we paginated beyond yet (and haven't seen).
|
"""Get a list of Pdus that we haven't backfilled beyond yet (and haven't
|
||||||
This list is used when we want to paginate backwards and is the list we
|
seen). This list is used when we want to backfill backwards and is the
|
||||||
send to the remote server.
|
list we send to the remote server.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
txn
|
txn
|
||||||
|
|
Loading…
Reference in a new issue