Add admin API to get some information about federation status (#11407)

This commit is contained in:
Dirk Klimpel 2021-12-06 17:59:50 +01:00 committed by GitHub
parent 494ebd7347
commit 8b4b153c9e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 783 additions and 0 deletions

View file

@ -0,0 +1 @@
Add admin API to get some information about federation status with remote servers.

View file

@ -65,6 +65,7 @@
- [Statistics](admin_api/statistics.md)
- [Users](admin_api/user_admin_api.md)
- [Server Version](admin_api/version_api.md)
- [Federation](usage/administration/admin_api/federation.md)
- [Manhole](manhole.md)
- [Monitoring](metrics-howto.md)
- [Understanding Synapse Through Grafana Graphs](usage/administration/understanding_synapse_through_grafana_graphs.md)

View file

@ -0,0 +1,114 @@
# Federation API
This API allows a server administrator to manage Synapse's federation with other homeservers.
Note: This API is new, experimental and "subject to change".
## List of destinations
This API gets the current destination retry timing info for all remote servers.
The list contains all the servers with which the server federates,
regardless of whether an error occurred or not.
If an error occurs, it may take up to 20 minutes for the error to be displayed here,
as a complete retry must have failed.
The API is:
A standard request with no filtering:
```
GET /_synapse/admin/v1/federation/destinations
```
A response body like the following is returned:
```json
{
"destinations":[
{
"destination": "matrix.org",
"retry_last_ts": 1557332397936,
"retry_interval": 3000000,
"failure_ts": 1557329397936,
"last_successful_stream_ordering": null
}
],
"total": 1
}
```
To paginate, check for `next_token` and if present, call the endpoint again
with `from` set to the value of `next_token`. This will return a new page.
If the endpoint does not return a `next_token` then there are no more destinations
to paginate through.
**Parameters**
The following query parameters are available:
- `from` - Offset in the returned list. Defaults to `0`.
- `limit` - Maximum amount of destinations to return. Defaults to `100`.
- `order_by` - The method in which to sort the returned list of destinations.
Valid values are:
- `destination` - Destinations are ordered alphabetically by remote server name.
This is the default.
- `retry_last_ts` - Destinations are ordered by time of last retry attempt in ms.
- `retry_interval` - Destinations are ordered by how long until next retry in ms.
- `failure_ts` - Destinations are ordered by when the server started failing in ms.
- `last_successful_stream_ordering` - Destinations are ordered by the stream ordering
of the most recent successfully-sent PDU.
- `dir` - Direction of room order. Either `f` for forwards or `b` for backwards. Setting
this value to `b` will reverse the above sort order. Defaults to `f`.
*Caution:* The database only has an index on the column `destination`.
This means that if a different sort order is used,
this can cause a large load on the database, especially for large environments.
**Response**
The following fields are returned in the JSON response body:
- `destinations` - An array of objects, each containing information about a destination.
Destination objects contain the following fields:
- `destination` - string - Name of the remote server to federate.
- `retry_last_ts` - integer - The last time Synapse tried and failed to reach the
remote server, in ms. This is `0` if the last attempt to communicate with the
remote server was successful.
- `retry_interval` - integer - How long since the last time Synapse tried to reach
the remote server before trying again, in ms. This is `0` if no further retrying occuring.
- `failure_ts` - nullable integer - The first time Synapse tried and failed to reach the
remote server, in ms. This is `null` if communication with the remote server has never failed.
- `last_successful_stream_ordering` - nullable integer - The stream ordering of the most
recent successfully-sent [PDU](understanding_synapse_through_grafana_graphs.md#federation)
to this destination, or `null` if this information has not been tracked yet.
- `next_token`: string representing a positive integer - Indication for pagination. See above.
- `total` - integer - Total number of destinations.
# Destination Details API
This API gets the retry timing info for a specific remote server.
The API is:
```
GET /_synapse/admin/v1/federation/destinations/<destination>
```
A response body like the following is returned:
```json
{
"destination": "matrix.org",
"retry_last_ts": 1557332397936,
"retry_interval": 3000000,
"failure_ts": 1557329397936,
"last_successful_stream_ordering": null
}
```
**Response**
The response fields are the same like in the `destinations` array in
[List of destinations](#list-of-destinations) response.

View file

@ -40,6 +40,10 @@ from synapse.rest.admin.event_reports import (
EventReportDetailRestServlet,
EventReportsRestServlet,
)
from synapse.rest.admin.federation import (
DestinationsRestServlet,
ListDestinationsRestServlet,
)
from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
from synapse.rest.admin.registration_tokens import (
@ -261,6 +265,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ListRegistrationTokensRestServlet(hs).register(http_server)
NewRegistrationTokenRestServlet(hs).register(http_server)
RegistrationTokenRestServlet(hs).register(http_server)
DestinationsRestServlet(hs).register(http_server)
ListDestinationsRestServlet(hs).register(http_server)
# Some servlets only get registered for the main process.
if hs.config.worker.worker_app is None:

View file

@ -0,0 +1,135 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, Tuple
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.servlet import RestServlet, parse_integer, parse_string
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
from synapse.storage.databases.main.transactions import DestinationSortOrder
from synapse.types import JsonDict
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class ListDestinationsRestServlet(RestServlet):
"""Get request to list all destinations.
This needs user to have administrator access in Synapse.
GET /_synapse/admin/v1/federation/destinations?from=0&limit=10
returns:
200 OK with list of destinations if success otherwise an error.
The parameters `from` and `limit` are required only for pagination.
By default, a `limit` of 100 is used.
The parameter `destination` can be used to filter by destination.
The parameter `order_by` can be used to order the result.
"""
PATTERNS = admin_patterns("/federation/destinations$")
def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self._store = hs.get_datastore()
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self._auth, request)
start = parse_integer(request, "from", default=0)
limit = parse_integer(request, "limit", default=100)
if start < 0:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Query parameter from must be a string representing a positive integer.",
errcode=Codes.INVALID_PARAM,
)
if limit < 0:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Query parameter limit must be a string representing a positive integer.",
errcode=Codes.INVALID_PARAM,
)
destination = parse_string(request, "destination")
order_by = parse_string(
request,
"order_by",
default=DestinationSortOrder.DESTINATION.value,
allowed_values=[dest.value for dest in DestinationSortOrder],
)
direction = parse_string(request, "dir", default="f", allowed_values=("f", "b"))
destinations, total = await self._store.get_destinations_paginate(
start, limit, destination, order_by, direction
)
response = {"destinations": destinations, "total": total}
if (start + limit) < total:
response["next_token"] = str(start + len(destinations))
return HTTPStatus.OK, response
class DestinationsRestServlet(RestServlet):
"""Get details of a destination.
This needs user to have administrator access in Synapse.
GET /_synapse/admin/v1/federation/destinations/<destination>
returns:
200 OK with details of a destination if success otherwise an error.
"""
PATTERNS = admin_patterns("/federation/destinations/(?P<destination>[^/]+)$")
def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self._store = hs.get_datastore()
async def on_GET(
self, request: SynapseRequest, destination: str
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self._auth, request)
destination_retry_timings = await self._store.get_destination_retry_timings(
destination
)
if not destination_retry_timings:
raise NotFoundError("Unknown destination")
last_successful_stream_ordering = (
await self._store.get_destination_last_successful_stream_ordering(
destination
)
)
response = {
"destination": destination,
"failure_ts": destination_retry_timings.failure_ts,
"retry_last_ts": destination_retry_timings.retry_last_ts,
"retry_interval": destination_retry_timings.retry_interval,
"last_successful_stream_ordering": last_successful_stream_ordering,
}
return HTTPStatus.OK, response

View file

@ -14,6 +14,7 @@
import logging
from collections import namedtuple
from enum import Enum
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
import attr
@ -44,6 +45,16 @@ _UpdateTransactionRow = namedtuple(
)
class DestinationSortOrder(Enum):
"""Enum to define the sorting method used when returning destinations."""
DESTINATION = "destination"
RETRY_LAST_TS = "retry_last_ts"
RETTRY_INTERVAL = "retry_interval"
FAILURE_TS = "failure_ts"
LAST_SUCCESSFUL_STREAM_ORDERING = "last_successful_stream_ordering"
@attr.s(slots=True, frozen=True, auto_attribs=True)
class DestinationRetryTimings:
"""The current destination retry timing info for a remote server."""
@ -480,3 +491,62 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
destinations = [row[0] for row in txn]
return destinations
async def get_destinations_paginate(
self,
start: int,
limit: int,
destination: Optional[str] = None,
order_by: str = DestinationSortOrder.DESTINATION.value,
direction: str = "f",
) -> Tuple[List[JsonDict], int]:
"""Function to retrieve a paginated list of destinations.
This will return a json list of destinations and the
total number of destinations matching the filter criteria.
Args:
start: start number to begin the query from
limit: number of rows to retrieve
destination: search string in destination
order_by: the sort order of the returned list
direction: sort ascending or descending
Returns:
A tuple of a list of mappings from destination to information
and a count of total destinations.
"""
def get_destinations_paginate_txn(
txn: LoggingTransaction,
) -> Tuple[List[JsonDict], int]:
order_by_column = DestinationSortOrder(order_by).value
if direction == "b":
order = "DESC"
else:
order = "ASC"
args = []
where_statement = ""
if destination:
args.extend(["%" + destination.lower() + "%"])
where_statement = "WHERE LOWER(destination) LIKE ?"
sql_base = f"FROM destinations {where_statement} "
sql = f"SELECT COUNT(*) as total_destinations {sql_base}"
txn.execute(sql, args)
count = txn.fetchone()[0]
sql = f"""
SELECT destination, retry_last_ts, retry_interval, failure_ts,
last_successful_stream_ordering
{sql_base}
ORDER BY {order_by_column} {order}, destination ASC
LIMIT ? OFFSET ?
"""
txn.execute(sql, args + [limit, start])
destinations = self.db_pool.cursor_to_dict(txn)
return destinations, count
return await self.db_pool.runInteraction(
"get_destinations_paginate_txn", get_destinations_paginate_txn
)

View file

@ -0,0 +1,456 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from http import HTTPStatus
from typing import List, Optional
from parameterized import parameterized
import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client import login
from synapse.server import HomeServer
from synapse.types import JsonDict
from tests import unittest
class FederationTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
]
def prepare(self, reactor, clock, hs: HomeServer):
self.store = hs.get_datastore()
self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
self.url = "/_synapse/admin/v1/federation/destinations"
@parameterized.expand(
[
("/_synapse/admin/v1/federation/destinations",),
("/_synapse/admin/v1/federation/destinations/dummy",),
]
)
def test_requester_is_no_admin(self, url: str):
"""
If the user is not a server admin, an error 403 is returned.
"""
self.register_user("user", "pass", admin=False)
other_user_tok = self.login("user", "pass")
channel = self.make_request(
"GET",
url,
content={},
access_token=other_user_tok,
)
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_invalid_parameter(self):
"""
If parameters are invalid, an error is returned.
"""
# negative limit
channel = self.make_request(
"GET",
self.url + "?limit=-5",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# negative from
channel = self.make_request(
"GET",
self.url + "?from=-5",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# unkown order_by
channel = self.make_request(
"GET",
self.url + "?order_by=bar",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
# invalid search order
channel = self.make_request(
"GET",
self.url + "?dir=bar",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
# invalid destination
channel = self.make_request(
"GET",
self.url + "/dummy",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
def test_limit(self):
"""
Testing list of destinations with limit
"""
number_destinations = 20
self._create_destinations(number_destinations)
channel = self.make_request(
"GET",
self.url + "?limit=5",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_destinations)
self.assertEqual(len(channel.json_body["destinations"]), 5)
self.assertEqual(channel.json_body["next_token"], "5")
self._check_fields(channel.json_body["destinations"])
def test_from(self):
"""
Testing list of destinations with a defined starting point (from)
"""
number_destinations = 20
self._create_destinations(number_destinations)
channel = self.make_request(
"GET",
self.url + "?from=5",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_destinations)
self.assertEqual(len(channel.json_body["destinations"]), 15)
self.assertNotIn("next_token", channel.json_body)
self._check_fields(channel.json_body["destinations"])
def test_limit_and_from(self):
"""
Testing list of destinations with a defined starting point and limit
"""
number_destinations = 20
self._create_destinations(number_destinations)
channel = self.make_request(
"GET",
self.url + "?from=5&limit=10",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_destinations)
self.assertEqual(channel.json_body["next_token"], "15")
self.assertEqual(len(channel.json_body["destinations"]), 10)
self._check_fields(channel.json_body["destinations"])
def test_next_token(self):
"""
Testing that `next_token` appears at the right place
"""
number_destinations = 20
self._create_destinations(number_destinations)
# `next_token` does not appear
# Number of results is the number of entries
channel = self.make_request(
"GET",
self.url + "?limit=20",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_destinations)
self.assertEqual(len(channel.json_body["destinations"]), number_destinations)
self.assertNotIn("next_token", channel.json_body)
# `next_token` does not appear
# Number of max results is larger than the number of entries
channel = self.make_request(
"GET",
self.url + "?limit=21",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_destinations)
self.assertEqual(len(channel.json_body["destinations"]), number_destinations)
self.assertNotIn("next_token", channel.json_body)
# `next_token` does appear
# Number of max results is smaller than the number of entries
channel = self.make_request(
"GET",
self.url + "?limit=19",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_destinations)
self.assertEqual(len(channel.json_body["destinations"]), 19)
self.assertEqual(channel.json_body["next_token"], "19")
# Check
# Set `from` to value of `next_token` for request remaining entries
# `next_token` does not appear
channel = self.make_request(
"GET",
self.url + "?from=19",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_destinations)
self.assertEqual(len(channel.json_body["destinations"]), 1)
self.assertNotIn("next_token", channel.json_body)
def test_list_all_destinations(self):
"""
List all destinations.
"""
number_destinations = 5
self._create_destinations(number_destinations)
channel = self.make_request(
"GET",
self.url,
{},
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(number_destinations, len(channel.json_body["destinations"]))
self.assertEqual(number_destinations, channel.json_body["total"])
# Check that all fields are available
self._check_fields(channel.json_body["destinations"])
def test_order_by(self):
"""
Testing order list with parameter `order_by`
"""
def _order_test(
expected_destination_list: List[str],
order_by: Optional[str],
dir: Optional[str] = None,
):
"""Request the list of destinations in a certain order.
Assert that order is what we expect
Args:
expected_destination_list: The list of user_id in the order
we expect to get back from the server
order_by: The type of ordering to give the server
dir: The direction of ordering to give the server
"""
url = f"{self.url}?"
if order_by is not None:
url += f"order_by={order_by}&"
if dir is not None and dir in ("b", "f"):
url += f"dir={dir}"
channel = self.make_request(
"GET",
url,
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], len(expected_destination_list))
returned_order = [
row["destination"] for row in channel.json_body["destinations"]
]
self.assertEqual(expected_destination_list, returned_order)
self._check_fields(channel.json_body["destinations"])
# create destinations
dest = [
("sub-a.example.com", 100, 300, 200, 300),
("sub-b.example.com", 200, 200, 100, 100),
("sub-c.example.com", 300, 100, 300, 200),
]
for (
destination,
failure_ts,
retry_last_ts,
retry_interval,
last_successful_stream_ordering,
) in dest:
self.get_success(
self.store.set_destination_retry_timings(
destination, failure_ts, retry_last_ts, retry_interval
)
)
self.get_success(
self.store.set_destination_last_successful_stream_ordering(
destination, last_successful_stream_ordering
)
)
# order by default (destination)
_order_test([dest[0][0], dest[1][0], dest[2][0]], None)
_order_test([dest[0][0], dest[1][0], dest[2][0]], None, "f")
_order_test([dest[2][0], dest[1][0], dest[0][0]], None, "b")
# order by destination
_order_test([dest[0][0], dest[1][0], dest[2][0]], "destination")
_order_test([dest[0][0], dest[1][0], dest[2][0]], "destination", "f")
_order_test([dest[2][0], dest[1][0], dest[0][0]], "destination", "b")
# order by failure_ts
_order_test([dest[0][0], dest[1][0], dest[2][0]], "failure_ts")
_order_test([dest[0][0], dest[1][0], dest[2][0]], "failure_ts", "f")
_order_test([dest[2][0], dest[1][0], dest[0][0]], "failure_ts", "b")
# order by retry_last_ts
_order_test([dest[2][0], dest[1][0], dest[0][0]], "retry_last_ts")
_order_test([dest[2][0], dest[1][0], dest[0][0]], "retry_last_ts", "f")
_order_test([dest[0][0], dest[1][0], dest[2][0]], "retry_last_ts", "b")
# order by retry_interval
_order_test([dest[1][0], dest[0][0], dest[2][0]], "retry_interval")
_order_test([dest[1][0], dest[0][0], dest[2][0]], "retry_interval", "f")
_order_test([dest[2][0], dest[0][0], dest[1][0]], "retry_interval", "b")
# order by last_successful_stream_ordering
_order_test(
[dest[1][0], dest[2][0], dest[0][0]], "last_successful_stream_ordering"
)
_order_test(
[dest[1][0], dest[2][0], dest[0][0]], "last_successful_stream_ordering", "f"
)
_order_test(
[dest[0][0], dest[2][0], dest[1][0]], "last_successful_stream_ordering", "b"
)
def test_search_term(self):
"""Test that searching for a destination works correctly"""
def _search_test(
expected_destination: Optional[str],
search_term: str,
):
"""Search for a destination and check that the returned destinationis a match
Args:
expected_destination: The room_id expected to be returned by the API.
Set to None to expect zero results for the search
search_term: The term to search for room names with
"""
url = f"{self.url}?destination={search_term}"
channel = self.make_request(
"GET",
url.encode("ascii"),
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Check that destinations were returned
self.assertTrue("destinations" in channel.json_body)
self._check_fields(channel.json_body["destinations"])
destinations = channel.json_body["destinations"]
# Check that the expected number of destinations were returned
expected_destination_count = 1 if expected_destination else 0
self.assertEqual(len(destinations), expected_destination_count)
self.assertEqual(channel.json_body["total"], expected_destination_count)
if expected_destination:
# Check that the first returned destination is correct
self.assertEqual(expected_destination, destinations[0]["destination"])
number_destinations = 3
self._create_destinations(number_destinations)
# Test searching
_search_test("sub0.example.com", "0")
_search_test("sub0.example.com", "sub0")
_search_test("sub1.example.com", "1")
_search_test("sub1.example.com", "1.")
# Test case insensitive
_search_test("sub0.example.com", "SUB0")
_search_test(None, "foo")
_search_test(None, "bar")
def test_get_single_destination(self):
"""
Get one specific destinations.
"""
self._create_destinations(5)
channel = self.make_request(
"GET",
self.url + "/sub0.example.com",
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("sub0.example.com", channel.json_body["destination"])
# Check that all fields are available
# convert channel.json_body into a List
self._check_fields([channel.json_body])
def _create_destinations(self, number_destinations: int):
"""Create a number of destinations
Args:
number_destinations: Number of destinations to be created
"""
for i in range(0, number_destinations):
dest = f"sub{i}.example.com"
self.get_success(self.store.set_destination_retry_timings(dest, 50, 50, 50))
self.get_success(
self.store.set_destination_last_successful_stream_ordering(dest, 100)
)
def _check_fields(self, content: List[JsonDict]):
"""Checks that the expected destination attributes are present in content
Args:
content: List that is checked for content
"""
for c in content:
self.assertIn("destination", c)
self.assertIn("retry_last_ts", c)
self.assertIn("retry_interval", c)
self.assertIn("failure_ts", c)
self.assertIn("last_successful_stream_ordering", c)