1
0
Fork 0
mirror of https://github.com/element-hq/synapse.git synced 2025-01-20 18:42:33 +00:00

Retry well known on fail.

If we have recently seen a valid well-known for a domain we want to
retry on (non-final) errors a few times, to handle temporary blips in
networking/etc.
This commit is contained in:
Erik Johnston 2019-08-13 18:04:46 +01:00
parent 09f6152a11
commit e6e136decc
3 changed files with 160 additions and 67 deletions

View file

@ -51,9 +51,9 @@ class MatrixFederationAgent(object):
SRVResolver impl to use for looking up SRV records. None to use a default SRVResolver impl to use for looking up SRV records. None to use a default
implementation. implementation.
_well_known_cache (TTLCache|None): _well_known_resolver (WellKnownResolver|None):
TTLCache impl for storing cached well-known lookups. None to use a default WellKnownResolver to use to perform well-known lookups. None to use a
implementation. default implementation.
""" """
def __init__( def __init__(
@ -61,7 +61,7 @@ class MatrixFederationAgent(object):
reactor, reactor,
tls_client_options_factory, tls_client_options_factory,
_srv_resolver=None, _srv_resolver=None,
_well_known_cache=None, _well_known_resolver=None,
): ):
self._reactor = reactor self._reactor = reactor
self._clock = Clock(reactor) self._clock = Clock(reactor)
@ -76,15 +76,17 @@ class MatrixFederationAgent(object):
self._pool.maxPersistentPerHost = 5 self._pool.maxPersistentPerHost = 5
self._pool.cachedConnectionTimeout = 2 * 60 self._pool.cachedConnectionTimeout = 2 * 60
self._well_known_resolver = WellKnownResolver( if _well_known_resolver is None:
self._reactor, _well_known_resolver = WellKnownResolver(
agent=Agent(
self._reactor, self._reactor,
pool=self._pool, agent=Agent(
contextFactory=tls_client_options_factory, self._reactor,
), pool=self._pool,
well_known_cache=_well_known_cache, contextFactory=tls_client_options_factory,
) ),
)
self._well_known_resolver = _well_known_resolver
@defer.inlineCallbacks @defer.inlineCallbacks
def request(self, method, uri, headers=None, bodyProducer=None): def request(self, method, uri, headers=None, bodyProducer=None):

View file

@ -38,6 +38,13 @@ WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
# period to cache failure to fetch .well-known for # period to cache failure to fetch .well-known for
WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600 WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
# period to cache failure to fetch .well-known if there has recently been a
# valid well-known for that domain.
WELL_KNOWN_DOWN_CACHE_PERIOD = 2 * 60
# period to remember there was a valid well-known after valid record expires
WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID = 2 * 3600
# cap for .well-known cache period # cap for .well-known cache period
WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600 WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
@ -49,11 +56,16 @@ WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60
# we'll start trying to refetch 1 minute before it expires. # we'll start trying to refetch 1 minute before it expires.
WELL_KNOWN_GRACE_PERIOD_FACTOR = 0.2 WELL_KNOWN_GRACE_PERIOD_FACTOR = 0.2
# Number of times we retry fetching a well-known for a domain we know recently
# had a valid entry.
WELL_KNOWN_RETRY_ATTEMPTS = 3
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_well_known_cache = TTLCache("well-known") _well_known_cache = TTLCache("well-known")
_had_valid_well_known_cache = TTLCache("had-valid-well-known")
@attr.s(slots=True, frozen=True) @attr.s(slots=True, frozen=True)
@ -65,14 +77,20 @@ class WellKnownResolver(object):
"""Handles well-known lookups for matrix servers. """Handles well-known lookups for matrix servers.
""" """
def __init__(self, reactor, agent, well_known_cache=None): def __init__(
self, reactor, agent, well_known_cache=None, had_well_known_cache=None
):
self._reactor = reactor self._reactor = reactor
self._clock = Clock(reactor) self._clock = Clock(reactor)
if well_known_cache is None: if well_known_cache is None:
well_known_cache = _well_known_cache well_known_cache = _well_known_cache
if had_well_known_cache is None:
had_well_known_cache = _had_valid_well_known_cache
self._well_known_cache = well_known_cache self._well_known_cache = well_known_cache
self._had_valid_well_known_cache = had_well_known_cache
self._well_known_agent = RedirectAgent(agent) self._well_known_agent = RedirectAgent(agent)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -100,7 +118,7 @@ class WellKnownResolver(object):
# requests for the same server in parallel? # requests for the same server in parallel?
try: try:
with Measure(self._clock, "get_well_known"): with Measure(self._clock, "get_well_known"):
result, cache_period = yield self._do_get_well_known(server_name) result, cache_period = yield self._fetch_well_known(server_name)
except _FetchWellKnownFailure as e: except _FetchWellKnownFailure as e:
if prev_result and e.temporary: if prev_result and e.temporary:
@ -111,10 +129,20 @@ class WellKnownResolver(object):
result = None result = None
# add some randomness to the TTL to avoid a stampeding herd every hour if self._had_valid_well_known_cache.get(server_name, False):
# after startup # We have recently seen a valid well-known record for this
cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD # server, so we cache the lack of well-known for a shorter time.
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER) cache_period = WELL_KNOWN_DOWN_CACHE_PERIOD
cache_period += random.uniform(
0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER
)
else:
# add some randomness to the TTL to avoid a stampeding herd every hour
# after startup
cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
cache_period += random.uniform(
0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER
)
if cache_period > 0: if cache_period > 0:
self._well_known_cache.set(server_name, result, cache_period) self._well_known_cache.set(server_name, result, cache_period)
@ -122,7 +150,7 @@ class WellKnownResolver(object):
return WellKnownLookupResult(delegated_server=result) return WellKnownLookupResult(delegated_server=result)
@defer.inlineCallbacks @defer.inlineCallbacks
def _do_get_well_known(self, server_name): def _fetch_well_known(self, server_name):
"""Actually fetch and parse a .well-known, without checking the cache """Actually fetch and parse a .well-known, without checking the cache
Args: Args:
@ -134,24 +162,17 @@ class WellKnownResolver(object):
Returns: Returns:
Deferred[Tuple[bytes,int]]: The lookup result and cache period. Deferred[Tuple[bytes,int]]: The lookup result and cache period.
""" """
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
uri_str = uri.decode("ascii") had_valid_well_known = bool(
logger.info("Fetching %s", uri_str) self._had_valid_well_known_cache.get(server_name, False)
)
# We do this in two steps to differentiate between possibly transient # We do this in two steps to differentiate between possibly transient
# errors (e.g. can't connect to host, 503 response) and more permenant # errors (e.g. can't connect to host, 503 response) and more permenant
# errors (such as getting a 404 response). # errors (such as getting a 404 response).
try: response, body = yield self._make_well_known_request(
response = yield make_deferred_yieldable( server_name, retry=had_valid_well_known
self._well_known_agent.request(b"GET", uri) )
)
body = yield make_deferred_yieldable(readBody(response))
if 500 <= response.code < 600:
raise Exception("Non-200 response %s" % (response.code,))
except Exception as e:
logger.info("Error fetching %s: %s", uri_str, e)
raise _FetchWellKnownFailure(temporary=True)
try: try:
if response.code != 200: if response.code != 200:
@ -161,8 +182,11 @@ class WellKnownResolver(object):
logger.info("Response from .well-known: %s", parsed_body) logger.info("Response from .well-known: %s", parsed_body)
result = parsed_body["m.server"].encode("ascii") result = parsed_body["m.server"].encode("ascii")
except defer.CancelledError:
# Bail if we've been cancelled
raise
except Exception as e: except Exception as e:
logger.info("Error fetching %s: %s", uri_str, e) logger.info("Error parsing well-known for %s: %s", server_name, e)
raise _FetchWellKnownFailure(temporary=False) raise _FetchWellKnownFailure(temporary=False)
cache_period = _cache_period_from_headers( cache_period = _cache_period_from_headers(
@ -177,8 +201,62 @@ class WellKnownResolver(object):
cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD) cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD) cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)
# We got a success, mark as such in the cache
self._had_valid_well_known_cache.set(
server_name,
bool(result),
cache_period + WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID,
)
return (result, cache_period) return (result, cache_period)
@defer.inlineCallbacks
def _make_well_known_request(self, server_name, retry):
"""Make the well known request.
This will retry the request if requested and it fails (with unable
to connect or receives a 5xx error).
Args:
server_name (bytes)
retry (bool): Whether to retry the request if it fails.
Returns:
Deferred[tuple[IResponse, bytes]] Returns the response object and
body. Response may be a non-200 response.
"""
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
uri_str = uri.decode("ascii")
i = 0
while True:
i += 1
logger.info("Fetching %s", uri_str)
try:
response = yield make_deferred_yieldable(
self._well_known_agent.request(b"GET", uri)
)
body = yield make_deferred_yieldable(readBody(response))
if 500 <= response.code < 600:
raise Exception("Non-200 response %s" % (response.code,))
return response, body
except defer.CancelledError:
# Bail if we've been cancelled
raise
except Exception as e:
logger.info("Retry: %s", retry)
if not retry or i >= WELL_KNOWN_RETRY_ATTEMPTS:
logger.info("Error fetching %s: %s", uri_str, e)
raise _FetchWellKnownFailure(temporary=True)
logger.info("Error fetching %s: %s. Retrying", uri_str, e)
# Sleep briefly in the hopes that they come back up
yield self._clock.sleep(0.5)
def _cache_period_from_headers(headers, time_now=time.time): def _cache_period_from_headers(headers, time_now=time.time):
cache_controls = _parse_cache_control(headers) cache_controls = _parse_cache_control(headers)

View file

@ -73,8 +73,6 @@ class MatrixFederationAgentTests(TestCase):
self.mock_resolver = Mock() self.mock_resolver = Mock()
self.well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds)
config_dict = default_config("test", parse=False) config_dict = default_config("test", parse=False)
config_dict["federation_custom_ca_list"] = [get_test_ca_cert_file()] config_dict["federation_custom_ca_list"] = [get_test_ca_cert_file()]
@ -82,11 +80,21 @@ class MatrixFederationAgentTests(TestCase):
config.parse_config_dict(config_dict, "", "") config.parse_config_dict(config_dict, "", "")
self.tls_factory = ClientTLSOptionsFactory(config) self.tls_factory = ClientTLSOptionsFactory(config)
self.well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds)
self.had_well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds)
self.well_known_resolver = WellKnownResolver(
self.reactor,
Agent(self.reactor, contextFactory=self.tls_factory),
well_known_cache=self.well_known_cache,
had_well_known_cache=self.had_well_known_cache,
)
self.agent = MatrixFederationAgent( self.agent = MatrixFederationAgent(
reactor=self.reactor, reactor=self.reactor,
tls_client_options_factory=self.tls_factory, tls_client_options_factory=self.tls_factory,
_srv_resolver=self.mock_resolver, _srv_resolver=self.mock_resolver,
_well_known_cache=self.well_known_cache, _well_known_resolver=self.well_known_resolver,
) )
def _make_connection(self, client_factory, expected_sni): def _make_connection(self, client_factory, expected_sni):
@ -701,11 +709,18 @@ class MatrixFederationAgentTests(TestCase):
config = default_config("test", parse=True) config = default_config("test", parse=True)
# Build a new agent and WellKnownResolver with a different tls factory
tls_factory = ClientTLSOptionsFactory(config)
agent = MatrixFederationAgent( agent = MatrixFederationAgent(
reactor=self.reactor, reactor=self.reactor,
tls_client_options_factory=ClientTLSOptionsFactory(config), tls_client_options_factory=tls_factory,
_srv_resolver=self.mock_resolver, _srv_resolver=self.mock_resolver,
_well_known_cache=self.well_known_cache, _well_known_resolver=WellKnownResolver(
self.reactor,
Agent(self.reactor, contextFactory=tls_factory),
well_known_cache=self.well_known_cache,
had_well_known_cache=self.had_well_known_cache,
),
) )
test_d = agent.request(b"GET", b"matrix://testserv/foo/bar") test_d = agent.request(b"GET", b"matrix://testserv/foo/bar")
@ -932,15 +947,9 @@ class MatrixFederationAgentTests(TestCase):
self.successResultOf(test_d) self.successResultOf(test_d)
def test_well_known_cache(self): def test_well_known_cache(self):
well_known_resolver = WellKnownResolver(
self.reactor,
Agent(self.reactor, contextFactory=self.tls_factory),
well_known_cache=self.well_known_cache,
)
self.reactor.lookups["testserv"] = "1.2.3.4" self.reactor.lookups["testserv"] = "1.2.3.4"
fetch_d = well_known_resolver.get_well_known(b"testserv") fetch_d = self.well_known_resolver.get_well_known(b"testserv")
# there should be an attempt to connect on port 443 for the .well-known # there should be an attempt to connect on port 443 for the .well-known
clients = self.reactor.tcpClients clients = self.reactor.tcpClients
@ -963,7 +972,7 @@ class MatrixFederationAgentTests(TestCase):
well_known_server.loseConnection() well_known_server.loseConnection()
# repeat the request: it should hit the cache # repeat the request: it should hit the cache
fetch_d = well_known_resolver.get_well_known(b"testserv") fetch_d = self.well_known_resolver.get_well_known(b"testserv")
r = self.successResultOf(fetch_d) r = self.successResultOf(fetch_d)
self.assertEqual(r.delegated_server, b"target-server") self.assertEqual(r.delegated_server, b"target-server")
@ -971,7 +980,7 @@ class MatrixFederationAgentTests(TestCase):
self.reactor.pump((1000.0,)) self.reactor.pump((1000.0,))
# now it should connect again # now it should connect again
fetch_d = well_known_resolver.get_well_known(b"testserv") fetch_d = self.well_known_resolver.get_well_known(b"testserv")
self.assertEqual(len(clients), 1) self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
@ -992,15 +1001,9 @@ class MatrixFederationAgentTests(TestCase):
it ignores transient errors. it ignores transient errors.
""" """
well_known_resolver = WellKnownResolver(
self.reactor,
Agent(self.reactor, contextFactory=self.tls_factory),
well_known_cache=self.well_known_cache,
)
self.reactor.lookups["testserv"] = "1.2.3.4" self.reactor.lookups["testserv"] = "1.2.3.4"
fetch_d = well_known_resolver.get_well_known(b"testserv") fetch_d = self.well_known_resolver.get_well_known(b"testserv")
# there should be an attempt to connect on port 443 for the .well-known # there should be an attempt to connect on port 443 for the .well-known
clients = self.reactor.tcpClients clients = self.reactor.tcpClients
@ -1026,27 +1029,37 @@ class MatrixFederationAgentTests(TestCase):
# another lookup. # another lookup.
self.reactor.pump((900.0,)) self.reactor.pump((900.0,))
fetch_d = well_known_resolver.get_well_known(b"testserv") fetch_d = self.well_known_resolver.get_well_known(b"testserv")
clients = self.reactor.tcpClients
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
# fonx the connection attempt, this will be treated as a temporary # The resolver may retry a few times, so fonx all requests that come along
# failure. attempts = 0
client_factory.clientConnectionFailed(None, Exception("nope")) while self.reactor.tcpClients:
clients = self.reactor.tcpClients
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
# attemptdelay on the hostnameendpoint is 0.3, so takes that long before the attempts += 1
# .well-known request fails.
self.reactor.pump((0.4,)) # fonx the connection attempt, this will be treated as a temporary
# failure.
client_factory.clientConnectionFailed(None, Exception("nope"))
# There's a few sleeps involved, so we have to pump the reactor a
# bit.
self.reactor.pump((1.0, 1.0))
# We expect to see more than one attempt as there was previously a valid
# well known.
self.assertGreater(attempts, 1)
# Resolver should return cached value, despite the lookup failing. # Resolver should return cached value, despite the lookup failing.
r = self.successResultOf(fetch_d) r = self.successResultOf(fetch_d)
self.assertEqual(r.delegated_server, b"target-server") self.assertEqual(r.delegated_server, b"target-server")
# Expire the cache and repeat the request # Expire both caches and repeat the request
self.reactor.pump((100.0,)) self.reactor.pump((10000.0,))
# Repated the request, this time it should fail if the lookup fails. # Repated the request, this time it should fail if the lookup fails.
fetch_d = well_known_resolver.get_well_known(b"testserv") fetch_d = self.well_known_resolver.get_well_known(b"testserv")
clients = self.reactor.tcpClients clients = self.reactor.tcpClients
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)