mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-14 11:57:44 +00:00
Fix handling of connection timeouts in outgoing http requests (#8400)
* Remove `on_timeout_cancel` from `timeout_deferred` The `on_timeout_cancel` param to `timeout_deferred` wasn't always called on a timeout (in particular if the canceller raised an exception), so it was unreliable. It was also only used in one place, and to be honest it's easier to do what it does a different way. * Fix handling of connection timeouts in outgoing http requests Turns out that if we get a timeout during connection, then a different exception is raised, which wasn't always handled correctly. To fix it, catch the exception in SimpleHttpClient and turn it into a RequestTimedOutError (which is already a documented exception). Also add a description to RequestTimedOutError so that we can see which stage it failed at. * Fix incorrect handling of timeouts reading federation responses This was trapping the wrong sort of TimeoutError, so was never being hit. The effect was relatively minor, but we should fix this so that it does the expected thing. * Fix inconsistent handling of `timeout` param between methods `get_json`, `put_json` and `delete_json` were applying a different timeout to the response body to `post_json`; bring them in line and test. Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com> Co-authored-by: Erik Johnston <erik@matrix.org>
This commit is contained in:
parent
bd380d942f
commit
1c262431f9
9 changed files with 311 additions and 98 deletions
1
changelog.d/8400.bugfix
Normal file
1
changelog.d/8400.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix incorrect handling of timeouts on outgoing HTTP requests.
|
|
@ -21,8 +21,6 @@ import logging
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
from typing import Awaitable, Callable, Dict, List, Optional, Tuple
|
from typing import Awaitable, Callable, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
from twisted.internet.error import TimeoutError
|
|
||||||
|
|
||||||
from synapse.api.errors import (
|
from synapse.api.errors import (
|
||||||
CodeMessageException,
|
CodeMessageException,
|
||||||
Codes,
|
Codes,
|
||||||
|
@ -30,6 +28,7 @@ from synapse.api.errors import (
|
||||||
SynapseError,
|
SynapseError,
|
||||||
)
|
)
|
||||||
from synapse.config.emailconfig import ThreepidBehaviour
|
from synapse.config.emailconfig import ThreepidBehaviour
|
||||||
|
from synapse.http import RequestTimedOutError
|
||||||
from synapse.http.client import SimpleHttpClient
|
from synapse.http.client import SimpleHttpClient
|
||||||
from synapse.types import JsonDict, Requester
|
from synapse.types import JsonDict, Requester
|
||||||
from synapse.util import json_decoder
|
from synapse.util import json_decoder
|
||||||
|
@ -93,7 +92,7 @@ class IdentityHandler(BaseHandler):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
data = await self.http_client.get_json(url, query_params)
|
data = await self.http_client.get_json(url, query_params)
|
||||||
except TimeoutError:
|
except RequestTimedOutError:
|
||||||
raise SynapseError(500, "Timed out contacting identity server")
|
raise SynapseError(500, "Timed out contacting identity server")
|
||||||
except HttpResponseException as e:
|
except HttpResponseException as e:
|
||||||
logger.info(
|
logger.info(
|
||||||
|
@ -173,7 +172,7 @@ class IdentityHandler(BaseHandler):
|
||||||
if e.code != 404 or not use_v2:
|
if e.code != 404 or not use_v2:
|
||||||
logger.error("3PID bind failed with Matrix error: %r", e)
|
logger.error("3PID bind failed with Matrix error: %r", e)
|
||||||
raise e.to_synapse_error()
|
raise e.to_synapse_error()
|
||||||
except TimeoutError:
|
except RequestTimedOutError:
|
||||||
raise SynapseError(500, "Timed out contacting identity server")
|
raise SynapseError(500, "Timed out contacting identity server")
|
||||||
except CodeMessageException as e:
|
except CodeMessageException as e:
|
||||||
data = json_decoder.decode(e.msg) # XXX WAT?
|
data = json_decoder.decode(e.msg) # XXX WAT?
|
||||||
|
@ -273,7 +272,7 @@ class IdentityHandler(BaseHandler):
|
||||||
else:
|
else:
|
||||||
logger.error("Failed to unbind threepid on identity server: %s", e)
|
logger.error("Failed to unbind threepid on identity server: %s", e)
|
||||||
raise SynapseError(500, "Failed to contact identity server")
|
raise SynapseError(500, "Failed to contact identity server")
|
||||||
except TimeoutError:
|
except RequestTimedOutError:
|
||||||
raise SynapseError(500, "Timed out contacting identity server")
|
raise SynapseError(500, "Timed out contacting identity server")
|
||||||
|
|
||||||
await self.store.remove_user_bound_threepid(
|
await self.store.remove_user_bound_threepid(
|
||||||
|
@ -419,7 +418,7 @@ class IdentityHandler(BaseHandler):
|
||||||
except HttpResponseException as e:
|
except HttpResponseException as e:
|
||||||
logger.info("Proxied requestToken failed: %r", e)
|
logger.info("Proxied requestToken failed: %r", e)
|
||||||
raise e.to_synapse_error()
|
raise e.to_synapse_error()
|
||||||
except TimeoutError:
|
except RequestTimedOutError:
|
||||||
raise SynapseError(500, "Timed out contacting identity server")
|
raise SynapseError(500, "Timed out contacting identity server")
|
||||||
|
|
||||||
async def requestMsisdnToken(
|
async def requestMsisdnToken(
|
||||||
|
@ -471,7 +470,7 @@ class IdentityHandler(BaseHandler):
|
||||||
except HttpResponseException as e:
|
except HttpResponseException as e:
|
||||||
logger.info("Proxied requestToken failed: %r", e)
|
logger.info("Proxied requestToken failed: %r", e)
|
||||||
raise e.to_synapse_error()
|
raise e.to_synapse_error()
|
||||||
except TimeoutError:
|
except RequestTimedOutError:
|
||||||
raise SynapseError(500, "Timed out contacting identity server")
|
raise SynapseError(500, "Timed out contacting identity server")
|
||||||
|
|
||||||
assert self.hs.config.public_baseurl
|
assert self.hs.config.public_baseurl
|
||||||
|
@ -553,7 +552,7 @@ class IdentityHandler(BaseHandler):
|
||||||
id_server + "/_matrix/identity/api/v1/validate/msisdn/submitToken",
|
id_server + "/_matrix/identity/api/v1/validate/msisdn/submitToken",
|
||||||
body,
|
body,
|
||||||
)
|
)
|
||||||
except TimeoutError:
|
except RequestTimedOutError:
|
||||||
raise SynapseError(500, "Timed out contacting identity server")
|
raise SynapseError(500, "Timed out contacting identity server")
|
||||||
except HttpResponseException as e:
|
except HttpResponseException as e:
|
||||||
logger.warning("Error contacting msisdn account_threepid_delegate: %s", e)
|
logger.warning("Error contacting msisdn account_threepid_delegate: %s", e)
|
||||||
|
@ -627,7 +626,7 @@ class IdentityHandler(BaseHandler):
|
||||||
# require or validate it. See the following for context:
|
# require or validate it. See the following for context:
|
||||||
# https://github.com/matrix-org/synapse/issues/5253#issuecomment-666246950
|
# https://github.com/matrix-org/synapse/issues/5253#issuecomment-666246950
|
||||||
return data["mxid"]
|
return data["mxid"]
|
||||||
except TimeoutError:
|
except RequestTimedOutError:
|
||||||
raise SynapseError(500, "Timed out contacting identity server")
|
raise SynapseError(500, "Timed out contacting identity server")
|
||||||
except IOError as e:
|
except IOError as e:
|
||||||
logger.warning("Error from v1 identity server lookup: %s" % (e,))
|
logger.warning("Error from v1 identity server lookup: %s" % (e,))
|
||||||
|
@ -655,7 +654,7 @@ class IdentityHandler(BaseHandler):
|
||||||
"%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server),
|
"%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server),
|
||||||
{"access_token": id_access_token},
|
{"access_token": id_access_token},
|
||||||
)
|
)
|
||||||
except TimeoutError:
|
except RequestTimedOutError:
|
||||||
raise SynapseError(500, "Timed out contacting identity server")
|
raise SynapseError(500, "Timed out contacting identity server")
|
||||||
|
|
||||||
if not isinstance(hash_details, dict):
|
if not isinstance(hash_details, dict):
|
||||||
|
@ -727,7 +726,7 @@ class IdentityHandler(BaseHandler):
|
||||||
},
|
},
|
||||||
headers=headers,
|
headers=headers,
|
||||||
)
|
)
|
||||||
except TimeoutError:
|
except RequestTimedOutError:
|
||||||
raise SynapseError(500, "Timed out contacting identity server")
|
raise SynapseError(500, "Timed out contacting identity server")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Error when performing a v2 3pid lookup: %s", e)
|
logger.warning("Error when performing a v2 3pid lookup: %s", e)
|
||||||
|
@ -823,7 +822,7 @@ class IdentityHandler(BaseHandler):
|
||||||
invite_config,
|
invite_config,
|
||||||
{"Authorization": create_id_access_token_header(id_access_token)},
|
{"Authorization": create_id_access_token_header(id_access_token)},
|
||||||
)
|
)
|
||||||
except TimeoutError:
|
except RequestTimedOutError:
|
||||||
raise SynapseError(500, "Timed out contacting identity server")
|
raise SynapseError(500, "Timed out contacting identity server")
|
||||||
except HttpResponseException as e:
|
except HttpResponseException as e:
|
||||||
if e.code != 404:
|
if e.code != 404:
|
||||||
|
@ -841,7 +840,7 @@ class IdentityHandler(BaseHandler):
|
||||||
data = await self.blacklisting_http_client.post_json_get_json(
|
data = await self.blacklisting_http_client.post_json_get_json(
|
||||||
url, invite_config
|
url, invite_config
|
||||||
)
|
)
|
||||||
except TimeoutError:
|
except RequestTimedOutError:
|
||||||
raise SynapseError(500, "Timed out contacting identity server")
|
raise SynapseError(500, "Timed out contacting identity server")
|
||||||
except HttpResponseException as e:
|
except HttpResponseException as e:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
|
|
|
@ -16,8 +16,6 @@
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from twisted.internet import task
|
from twisted.internet import task
|
||||||
from twisted.internet.defer import CancelledError
|
|
||||||
from twisted.python import failure
|
|
||||||
from twisted.web.client import FileBodyProducer
|
from twisted.web.client import FileBodyProducer
|
||||||
|
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
|
@ -26,19 +24,8 @@ from synapse.api.errors import SynapseError
|
||||||
class RequestTimedOutError(SynapseError):
|
class RequestTimedOutError(SynapseError):
|
||||||
"""Exception representing timeout of an outbound request"""
|
"""Exception representing timeout of an outbound request"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self, msg):
|
||||||
super().__init__(504, "Timed out")
|
super().__init__(504, msg)
|
||||||
|
|
||||||
|
|
||||||
def cancelled_to_request_timed_out_error(value, timeout):
|
|
||||||
"""Turns CancelledErrors into RequestTimedOutErrors.
|
|
||||||
|
|
||||||
For use with async.add_timeout_to_deferred
|
|
||||||
"""
|
|
||||||
if isinstance(value, failure.Failure):
|
|
||||||
value.trap(CancelledError)
|
|
||||||
raise RequestTimedOutError()
|
|
||||||
return value
|
|
||||||
|
|
||||||
|
|
||||||
ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(.*)$")
|
ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(.*)$")
|
||||||
|
|
|
@ -13,7 +13,6 @@
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import urllib
|
import urllib
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
@ -38,7 +37,7 @@ from zope.interface import implementer, provider
|
||||||
|
|
||||||
from OpenSSL import SSL
|
from OpenSSL import SSL
|
||||||
from OpenSSL.SSL import VERIFY_NONE
|
from OpenSSL.SSL import VERIFY_NONE
|
||||||
from twisted.internet import defer, protocol, ssl
|
from twisted.internet import defer, error as twisted_error, protocol, ssl
|
||||||
from twisted.internet.interfaces import (
|
from twisted.internet.interfaces import (
|
||||||
IReactorPluggableNameResolver,
|
IReactorPluggableNameResolver,
|
||||||
IResolutionReceiver,
|
IResolutionReceiver,
|
||||||
|
@ -46,17 +45,18 @@ from twisted.internet.interfaces import (
|
||||||
from twisted.internet.task import Cooperator
|
from twisted.internet.task import Cooperator
|
||||||
from twisted.python.failure import Failure
|
from twisted.python.failure import Failure
|
||||||
from twisted.web._newclient import ResponseDone
|
from twisted.web._newclient import ResponseDone
|
||||||
from twisted.web.client import Agent, HTTPConnectionPool, readBody
|
from twisted.web.client import (
|
||||||
|
Agent,
|
||||||
|
HTTPConnectionPool,
|
||||||
|
ResponseNeverReceived,
|
||||||
|
readBody,
|
||||||
|
)
|
||||||
from twisted.web.http import PotentialDataLoss
|
from twisted.web.http import PotentialDataLoss
|
||||||
from twisted.web.http_headers import Headers
|
from twisted.web.http_headers import Headers
|
||||||
from twisted.web.iweb import IResponse
|
from twisted.web.iweb import IResponse
|
||||||
|
|
||||||
from synapse.api.errors import Codes, HttpResponseException, SynapseError
|
from synapse.api.errors import Codes, HttpResponseException, SynapseError
|
||||||
from synapse.http import (
|
from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri
|
||||||
QuieterFileBodyProducer,
|
|
||||||
cancelled_to_request_timed_out_error,
|
|
||||||
redact_uri,
|
|
||||||
)
|
|
||||||
from synapse.http.proxyagent import ProxyAgent
|
from synapse.http.proxyagent import ProxyAgent
|
||||||
from synapse.logging.context import make_deferred_yieldable
|
from synapse.logging.context import make_deferred_yieldable
|
||||||
from synapse.logging.opentracing import set_tag, start_active_span, tags
|
from synapse.logging.opentracing import set_tag, start_active_span, tags
|
||||||
|
@ -332,8 +332,6 @@ class SimpleHttpClient:
|
||||||
RequestTimedOutError if the request times out before the headers are read
|
RequestTimedOutError if the request times out before the headers are read
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# A small wrapper around self.agent.request() so we can easily attach
|
|
||||||
# counters to it
|
|
||||||
outgoing_requests_counter.labels(method).inc()
|
outgoing_requests_counter.labels(method).inc()
|
||||||
|
|
||||||
# log request but strip `access_token` (AS requests for example include this)
|
# log request but strip `access_token` (AS requests for example include this)
|
||||||
|
@ -362,15 +360,17 @@ class SimpleHttpClient:
|
||||||
data=body_producer,
|
data=body_producer,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
**self._extra_treq_args
|
**self._extra_treq_args
|
||||||
)
|
) # type: defer.Deferred
|
||||||
|
|
||||||
# we use our own timeout mechanism rather than treq's as a workaround
|
# we use our own timeout mechanism rather than treq's as a workaround
|
||||||
# for https://twistedmatrix.com/trac/ticket/9534.
|
# for https://twistedmatrix.com/trac/ticket/9534.
|
||||||
request_deferred = timeout_deferred(
|
request_deferred = timeout_deferred(
|
||||||
request_deferred,
|
request_deferred, 60, self.hs.get_reactor(),
|
||||||
60,
|
|
||||||
self.hs.get_reactor(),
|
|
||||||
cancelled_to_request_timed_out_error,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# turn timeouts into RequestTimedOutErrors
|
||||||
|
request_deferred.addErrback(_timeout_to_request_timed_out_error)
|
||||||
|
|
||||||
response = await make_deferred_yieldable(request_deferred)
|
response = await make_deferred_yieldable(request_deferred)
|
||||||
|
|
||||||
incoming_responses_counter.labels(method, response.code).inc()
|
incoming_responses_counter.labels(method, response.code).inc()
|
||||||
|
@ -410,7 +410,7 @@ class SimpleHttpClient:
|
||||||
parsed json
|
parsed json
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
RequestTimedOutException: if there is a timeout before the response headers
|
RequestTimedOutError: if there is a timeout before the response headers
|
||||||
are received. Note there is currently no timeout on reading the response
|
are received. Note there is currently no timeout on reading the response
|
||||||
body.
|
body.
|
||||||
|
|
||||||
|
@ -461,7 +461,7 @@ class SimpleHttpClient:
|
||||||
parsed json
|
parsed json
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
RequestTimedOutException: if there is a timeout before the response headers
|
RequestTimedOutError: if there is a timeout before the response headers
|
||||||
are received. Note there is currently no timeout on reading the response
|
are received. Note there is currently no timeout on reading the response
|
||||||
body.
|
body.
|
||||||
|
|
||||||
|
@ -506,7 +506,7 @@ class SimpleHttpClient:
|
||||||
Returns:
|
Returns:
|
||||||
Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON.
|
Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON.
|
||||||
Raises:
|
Raises:
|
||||||
RequestTimedOutException: if there is a timeout before the response headers
|
RequestTimedOutError: if there is a timeout before the response headers
|
||||||
are received. Note there is currently no timeout on reading the response
|
are received. Note there is currently no timeout on reading the response
|
||||||
body.
|
body.
|
||||||
|
|
||||||
|
@ -538,7 +538,7 @@ class SimpleHttpClient:
|
||||||
Returns:
|
Returns:
|
||||||
Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON.
|
Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON.
|
||||||
Raises:
|
Raises:
|
||||||
RequestTimedOutException: if there is a timeout before the response headers
|
RequestTimedOutError: if there is a timeout before the response headers
|
||||||
are received. Note there is currently no timeout on reading the response
|
are received. Note there is currently no timeout on reading the response
|
||||||
body.
|
body.
|
||||||
|
|
||||||
|
@ -586,7 +586,7 @@ class SimpleHttpClient:
|
||||||
Succeeds when we get a 2xx HTTP response, with the
|
Succeeds when we get a 2xx HTTP response, with the
|
||||||
HTTP body as bytes.
|
HTTP body as bytes.
|
||||||
Raises:
|
Raises:
|
||||||
RequestTimedOutException: if there is a timeout before the response headers
|
RequestTimedOutError: if there is a timeout before the response headers
|
||||||
are received. Note there is currently no timeout on reading the response
|
are received. Note there is currently no timeout on reading the response
|
||||||
body.
|
body.
|
||||||
|
|
||||||
|
@ -631,7 +631,7 @@ class SimpleHttpClient:
|
||||||
headers, absolute URI of the response and HTTP response code.
|
headers, absolute URI of the response and HTTP response code.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
RequestTimedOutException: if there is a timeout before the response headers
|
RequestTimedOutError: if there is a timeout before the response headers
|
||||||
are received. Note there is currently no timeout on reading the response
|
are received. Note there is currently no timeout on reading the response
|
||||||
body.
|
body.
|
||||||
|
|
||||||
|
@ -684,6 +684,18 @@ class SimpleHttpClient:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _timeout_to_request_timed_out_error(f: Failure):
|
||||||
|
if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError):
|
||||||
|
# The TCP connection has its own timeout (set by the 'connectTimeout' param
|
||||||
|
# on the Agent), which raises twisted_error.TimeoutError exception.
|
||||||
|
raise RequestTimedOutError("Timeout connecting to remote server")
|
||||||
|
elif f.check(defer.TimeoutError, ResponseNeverReceived):
|
||||||
|
# this one means that we hit our overall timeout on the request
|
||||||
|
raise RequestTimedOutError("Timeout waiting for response from remote server")
|
||||||
|
|
||||||
|
return f
|
||||||
|
|
||||||
|
|
||||||
# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
|
# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
|
||||||
# The two should be factored out.
|
# The two should be factored out.
|
||||||
|
|
||||||
|
|
|
@ -171,7 +171,7 @@ async def _handle_json_response(
|
||||||
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
|
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
|
||||||
|
|
||||||
body = await make_deferred_yieldable(d)
|
body = await make_deferred_yieldable(d)
|
||||||
except TimeoutError as e:
|
except defer.TimeoutError as e:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"{%s} [%s] Timed out reading response - %s %s",
|
"{%s} [%s] Timed out reading response - %s %s",
|
||||||
request.txn_id,
|
request.txn_id,
|
||||||
|
@ -655,10 +655,14 @@ class MatrixFederationHttpClient:
|
||||||
long_retries (bool): whether to use the long retry algorithm. See
|
long_retries (bool): whether to use the long retry algorithm. See
|
||||||
docs on _send_request for details.
|
docs on _send_request for details.
|
||||||
|
|
||||||
timeout (int|None): number of milliseconds to wait for the response headers
|
timeout (int|None): number of milliseconds to wait for the response.
|
||||||
(including connecting to the server), *for each attempt*.
|
|
||||||
self._default_timeout (60s) by default.
|
self._default_timeout (60s) by default.
|
||||||
|
|
||||||
|
Note that we may make several attempts to send the request; this
|
||||||
|
timeout applies to the time spent waiting for response headers for
|
||||||
|
*each* attempt (including connection time) as well as the time spent
|
||||||
|
reading the response body after a 200 response.
|
||||||
|
|
||||||
ignore_backoff (bool): true to ignore the historical backoff data
|
ignore_backoff (bool): true to ignore the historical backoff data
|
||||||
and try the request anyway.
|
and try the request anyway.
|
||||||
backoff_on_404 (bool): True if we should count a 404 response as
|
backoff_on_404 (bool): True if we should count a 404 response as
|
||||||
|
@ -704,8 +708,13 @@ class MatrixFederationHttpClient:
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if timeout is not None:
|
||||||
|
_sec_timeout = timeout / 1000
|
||||||
|
else:
|
||||||
|
_sec_timeout = self.default_timeout
|
||||||
|
|
||||||
body = await _handle_json_response(
|
body = await _handle_json_response(
|
||||||
self.reactor, self.default_timeout, request, response, start_ms
|
self.reactor, _sec_timeout, request, response, start_ms
|
||||||
)
|
)
|
||||||
|
|
||||||
return body
|
return body
|
||||||
|
@ -734,10 +743,14 @@ class MatrixFederationHttpClient:
|
||||||
long_retries (bool): whether to use the long retry algorithm. See
|
long_retries (bool): whether to use the long retry algorithm. See
|
||||||
docs on _send_request for details.
|
docs on _send_request for details.
|
||||||
|
|
||||||
timeout (int|None): number of milliseconds to wait for the response headers
|
timeout (int|None): number of milliseconds to wait for the response.
|
||||||
(including connecting to the server), *for each attempt*.
|
|
||||||
self._default_timeout (60s) by default.
|
self._default_timeout (60s) by default.
|
||||||
|
|
||||||
|
Note that we may make several attempts to send the request; this
|
||||||
|
timeout applies to the time spent waiting for response headers for
|
||||||
|
*each* attempt (including connection time) as well as the time spent
|
||||||
|
reading the response body after a 200 response.
|
||||||
|
|
||||||
ignore_backoff (bool): true to ignore the historical backoff data and
|
ignore_backoff (bool): true to ignore the historical backoff data and
|
||||||
try the request anyway.
|
try the request anyway.
|
||||||
|
|
||||||
|
@ -801,10 +814,14 @@ class MatrixFederationHttpClient:
|
||||||
args (dict|None): A dictionary used to create query strings, defaults to
|
args (dict|None): A dictionary used to create query strings, defaults to
|
||||||
None.
|
None.
|
||||||
|
|
||||||
timeout (int|None): number of milliseconds to wait for the response headers
|
timeout (int|None): number of milliseconds to wait for the response.
|
||||||
(including connecting to the server), *for each attempt*.
|
|
||||||
self._default_timeout (60s) by default.
|
self._default_timeout (60s) by default.
|
||||||
|
|
||||||
|
Note that we may make several attempts to send the request; this
|
||||||
|
timeout applies to the time spent waiting for response headers for
|
||||||
|
*each* attempt (including connection time) as well as the time spent
|
||||||
|
reading the response body after a 200 response.
|
||||||
|
|
||||||
ignore_backoff (bool): true to ignore the historical backoff data
|
ignore_backoff (bool): true to ignore the historical backoff data
|
||||||
and try the request anyway.
|
and try the request anyway.
|
||||||
|
|
||||||
|
@ -840,8 +857,13 @@ class MatrixFederationHttpClient:
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if timeout is not None:
|
||||||
|
_sec_timeout = timeout / 1000
|
||||||
|
else:
|
||||||
|
_sec_timeout = self.default_timeout
|
||||||
|
|
||||||
body = await _handle_json_response(
|
body = await _handle_json_response(
|
||||||
self.reactor, self.default_timeout, request, response, start_ms
|
self.reactor, _sec_timeout, request, response, start_ms
|
||||||
)
|
)
|
||||||
|
|
||||||
return body
|
return body
|
||||||
|
@ -865,10 +887,14 @@ class MatrixFederationHttpClient:
|
||||||
long_retries (bool): whether to use the long retry algorithm. See
|
long_retries (bool): whether to use the long retry algorithm. See
|
||||||
docs on _send_request for details.
|
docs on _send_request for details.
|
||||||
|
|
||||||
timeout (int|None): number of milliseconds to wait for the response headers
|
timeout (int|None): number of milliseconds to wait for the response.
|
||||||
(including connecting to the server), *for each attempt*.
|
|
||||||
self._default_timeout (60s) by default.
|
self._default_timeout (60s) by default.
|
||||||
|
|
||||||
|
Note that we may make several attempts to send the request; this
|
||||||
|
timeout applies to the time spent waiting for response headers for
|
||||||
|
*each* attempt (including connection time) as well as the time spent
|
||||||
|
reading the response body after a 200 response.
|
||||||
|
|
||||||
ignore_backoff (bool): true to ignore the historical backoff data and
|
ignore_backoff (bool): true to ignore the historical backoff data and
|
||||||
try the request anyway.
|
try the request anyway.
|
||||||
|
|
||||||
|
@ -900,8 +926,13 @@ class MatrixFederationHttpClient:
|
||||||
ignore_backoff=ignore_backoff,
|
ignore_backoff=ignore_backoff,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if timeout is not None:
|
||||||
|
_sec_timeout = timeout / 1000
|
||||||
|
else:
|
||||||
|
_sec_timeout = self.default_timeout
|
||||||
|
|
||||||
body = await _handle_json_response(
|
body = await _handle_json_response(
|
||||||
self.reactor, self.default_timeout, request, response, start_ms
|
self.reactor, _sec_timeout, request, response, start_ms
|
||||||
)
|
)
|
||||||
return body
|
return body
|
||||||
|
|
||||||
|
|
|
@ -44,8 +44,11 @@ class ProxyAgent(_AgentBase):
|
||||||
`BrowserLikePolicyForHTTPS`, so unless you have special
|
`BrowserLikePolicyForHTTPS`, so unless you have special
|
||||||
requirements you can leave this as-is.
|
requirements you can leave this as-is.
|
||||||
|
|
||||||
connectTimeout (float): The amount of time that this Agent will wait
|
connectTimeout (Optional[float]): The amount of time that this Agent will wait
|
||||||
for the peer to accept a connection.
|
for the peer to accept a connection, in seconds. If 'None',
|
||||||
|
HostnameEndpoint's default (30s) will be used.
|
||||||
|
|
||||||
|
This is used for connections to both proxies and destination servers.
|
||||||
|
|
||||||
bindAddress (bytes): The local address for client sockets to bind to.
|
bindAddress (bytes): The local address for client sockets to bind to.
|
||||||
|
|
||||||
|
@ -108,6 +111,15 @@ class ProxyAgent(_AgentBase):
|
||||||
Returns:
|
Returns:
|
||||||
Deferred[IResponse]: completes when the header of the response has
|
Deferred[IResponse]: completes when the header of the response has
|
||||||
been received (regardless of the response status code).
|
been received (regardless of the response status code).
|
||||||
|
|
||||||
|
Can fail with:
|
||||||
|
SchemeNotSupported: if the uri is not http or https
|
||||||
|
|
||||||
|
twisted.internet.error.TimeoutError if the server we are connecting
|
||||||
|
to (proxy or destination) does not accept a connection before
|
||||||
|
connectTimeout.
|
||||||
|
|
||||||
|
... other things too.
|
||||||
"""
|
"""
|
||||||
uri = uri.strip()
|
uri = uri.strip()
|
||||||
if not _VALID_URI.match(uri):
|
if not _VALID_URI.match(uri):
|
||||||
|
|
|
@ -449,18 +449,8 @@ class ReadWriteLock:
|
||||||
R = TypeVar("R")
|
R = TypeVar("R")
|
||||||
|
|
||||||
|
|
||||||
def _cancelled_to_timed_out_error(value: R, timeout: float) -> R:
|
|
||||||
if isinstance(value, failure.Failure):
|
|
||||||
value.trap(CancelledError)
|
|
||||||
raise defer.TimeoutError(timeout, "Deferred")
|
|
||||||
return value
|
|
||||||
|
|
||||||
|
|
||||||
def timeout_deferred(
|
def timeout_deferred(
|
||||||
deferred: defer.Deferred,
|
deferred: defer.Deferred, timeout: float, reactor: IReactorTime,
|
||||||
timeout: float,
|
|
||||||
reactor: IReactorTime,
|
|
||||||
on_timeout_cancel: Optional[Callable[[Any, float], Any]] = None,
|
|
||||||
) -> defer.Deferred:
|
) -> defer.Deferred:
|
||||||
"""The in built twisted `Deferred.addTimeout` fails to time out deferreds
|
"""The in built twisted `Deferred.addTimeout` fails to time out deferreds
|
||||||
that have a canceller that throws exceptions. This method creates a new
|
that have a canceller that throws exceptions. This method creates a new
|
||||||
|
@ -469,27 +459,21 @@ def timeout_deferred(
|
||||||
|
|
||||||
(See https://twistedmatrix.com/trac/ticket/9534)
|
(See https://twistedmatrix.com/trac/ticket/9534)
|
||||||
|
|
||||||
NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
|
NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred.
|
||||||
|
|
||||||
|
NOTE: the TimeoutError raised by the resultant deferred is
|
||||||
|
twisted.internet.defer.TimeoutError, which is *different* to the built-in
|
||||||
|
TimeoutError, as well as various other TimeoutErrors you might have imported.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
deferred: The Deferred to potentially timeout.
|
deferred: The Deferred to potentially timeout.
|
||||||
timeout: Timeout in seconds
|
timeout: Timeout in seconds
|
||||||
reactor: The twisted reactor to use
|
reactor: The twisted reactor to use
|
||||||
on_timeout_cancel: A callable which is called immediately
|
|
||||||
after the deferred times out, and not if this deferred is
|
|
||||||
otherwise cancelled before the timeout.
|
|
||||||
|
|
||||||
It takes an arbitrary value, which is the value of the deferred at
|
|
||||||
that exact point in time (probably a CancelledError Failure), and
|
|
||||||
the timeout.
|
|
||||||
|
|
||||||
The default callable (if none is provided) will translate a
|
|
||||||
CancelledError Failure into a defer.TimeoutError.
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
A new Deferred.
|
A new Deferred, which will errback with defer.TimeoutError on timeout.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
new_d = defer.Deferred()
|
new_d = defer.Deferred()
|
||||||
|
|
||||||
timed_out = [False]
|
timed_out = [False]
|
||||||
|
@ -502,18 +486,23 @@ def timeout_deferred(
|
||||||
except: # noqa: E722, if we throw any exception it'll break time outs
|
except: # noqa: E722, if we throw any exception it'll break time outs
|
||||||
logger.exception("Canceller failed during timeout")
|
logger.exception("Canceller failed during timeout")
|
||||||
|
|
||||||
|
# the cancel() call should have set off a chain of errbacks which
|
||||||
|
# will have errbacked new_d, but in case it hasn't, errback it now.
|
||||||
|
|
||||||
if not new_d.called:
|
if not new_d.called:
|
||||||
new_d.errback(defer.TimeoutError(timeout, "Deferred"))
|
new_d.errback(defer.TimeoutError("Timed out after %gs" % (timeout,)))
|
||||||
|
|
||||||
delayed_call = reactor.callLater(timeout, time_it_out)
|
delayed_call = reactor.callLater(timeout, time_it_out)
|
||||||
|
|
||||||
def convert_cancelled(value):
|
def convert_cancelled(value: failure.Failure):
|
||||||
if timed_out[0]:
|
# if the orgininal deferred was cancelled, and our timeout has fired, then
|
||||||
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
|
# the reason it was cancelled was due to our timeout. Turn the CancelledError
|
||||||
return to_call(value, timeout)
|
# into a TimeoutError.
|
||||||
|
if timed_out[0] and value.check(CancelledError):
|
||||||
|
raise defer.TimeoutError("Timed out after %gs" % (timeout,))
|
||||||
return value
|
return value
|
||||||
|
|
||||||
deferred.addBoth(convert_cancelled)
|
deferred.addErrback(convert_cancelled)
|
||||||
|
|
||||||
def cancel_timeout(result):
|
def cancel_timeout(result):
|
||||||
# stop the pending call to cancel the deferred if it's been fired
|
# stop the pending call to cancel the deferred if it's been fired
|
||||||
|
|
|
@ -318,14 +318,14 @@ class FederationClientTests(HomeserverTestCase):
|
||||||
r = self.successResultOf(d)
|
r = self.successResultOf(d)
|
||||||
self.assertEqual(r.code, 200)
|
self.assertEqual(r.code, 200)
|
||||||
|
|
||||||
def test_client_headers_no_body(self):
|
@parameterized.expand(["get_json", "post_json", "delete_json", "put_json"])
|
||||||
|
def test_timeout_reading_body(self, method_name: str):
|
||||||
"""
|
"""
|
||||||
If the HTTP request is connected, but gets no response before being
|
If the HTTP request is connected, but gets no response before being
|
||||||
timed out, it'll give a ResponseNeverReceived.
|
timed out, it'll give a RequestSendFailed with can_retry.
|
||||||
"""
|
"""
|
||||||
d = defer.ensureDeferred(
|
method = getattr(self.cl, method_name)
|
||||||
self.cl.post_json("testserv:8008", "foo/bar", timeout=10000)
|
d = defer.ensureDeferred(method("testserv:8008", "foo/bar", timeout=10000))
|
||||||
)
|
|
||||||
|
|
||||||
self.pump()
|
self.pump()
|
||||||
|
|
||||||
|
@ -349,7 +349,9 @@ class FederationClientTests(HomeserverTestCase):
|
||||||
self.reactor.advance(10.5)
|
self.reactor.advance(10.5)
|
||||||
f = self.failureResultOf(d)
|
f = self.failureResultOf(d)
|
||||||
|
|
||||||
self.assertIsInstance(f.value, TimeoutError)
|
self.assertIsInstance(f.value, RequestSendFailed)
|
||||||
|
self.assertTrue(f.value.can_retry)
|
||||||
|
self.assertIsInstance(f.value.inner_exception, defer.TimeoutError)
|
||||||
|
|
||||||
def test_client_requires_trailing_slashes(self):
|
def test_client_requires_trailing_slashes(self):
|
||||||
"""
|
"""
|
||||||
|
|
180
tests/http/test_simple_client.py
Normal file
180
tests/http/test_simple_client.py
Normal file
|
@ -0,0 +1,180 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
from mock import Mock
|
||||||
|
|
||||||
|
from netaddr import IPSet
|
||||||
|
|
||||||
|
from twisted.internet import defer
|
||||||
|
from twisted.internet.error import DNSLookupError
|
||||||
|
|
||||||
|
from synapse.http import RequestTimedOutError
|
||||||
|
from synapse.http.client import SimpleHttpClient
|
||||||
|
from synapse.server import HomeServer
|
||||||
|
|
||||||
|
from tests.unittest import HomeserverTestCase
|
||||||
|
|
||||||
|
|
||||||
|
class SimpleHttpClientTests(HomeserverTestCase):
|
||||||
|
def prepare(self, reactor, clock, hs: "HomeServer"):
|
||||||
|
# Add a DNS entry for a test server
|
||||||
|
self.reactor.lookups["testserv"] = "1.2.3.4"
|
||||||
|
|
||||||
|
self.cl = hs.get_simple_http_client()
|
||||||
|
|
||||||
|
def test_dns_error(self):
|
||||||
|
"""
|
||||||
|
If the DNS lookup returns an error, it will bubble up.
|
||||||
|
"""
|
||||||
|
d = defer.ensureDeferred(self.cl.get_json("http://testserv2:8008/foo/bar"))
|
||||||
|
self.pump()
|
||||||
|
|
||||||
|
f = self.failureResultOf(d)
|
||||||
|
self.assertIsInstance(f.value, DNSLookupError)
|
||||||
|
|
||||||
|
def test_client_connection_refused(self):
|
||||||
|
d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar"))
|
||||||
|
|
||||||
|
self.pump()
|
||||||
|
|
||||||
|
# Nothing happened yet
|
||||||
|
self.assertNoResult(d)
|
||||||
|
|
||||||
|
clients = self.reactor.tcpClients
|
||||||
|
self.assertEqual(len(clients), 1)
|
||||||
|
(host, port, factory, _timeout, _bindAddress) = clients[0]
|
||||||
|
self.assertEqual(host, "1.2.3.4")
|
||||||
|
self.assertEqual(port, 8008)
|
||||||
|
e = Exception("go away")
|
||||||
|
factory.clientConnectionFailed(None, e)
|
||||||
|
self.pump(0.5)
|
||||||
|
|
||||||
|
f = self.failureResultOf(d)
|
||||||
|
|
||||||
|
self.assertIs(f.value, e)
|
||||||
|
|
||||||
|
def test_client_never_connect(self):
|
||||||
|
"""
|
||||||
|
If the HTTP request is not connected and is timed out, it'll give a
|
||||||
|
ConnectingCancelledError or TimeoutError.
|
||||||
|
"""
|
||||||
|
d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar"))
|
||||||
|
|
||||||
|
self.pump()
|
||||||
|
|
||||||
|
# Nothing happened yet
|
||||||
|
self.assertNoResult(d)
|
||||||
|
|
||||||
|
# Make sure treq is trying to connect
|
||||||
|
clients = self.reactor.tcpClients
|
||||||
|
self.assertEqual(len(clients), 1)
|
||||||
|
self.assertEqual(clients[0][0], "1.2.3.4")
|
||||||
|
self.assertEqual(clients[0][1], 8008)
|
||||||
|
|
||||||
|
# Deferred is still without a result
|
||||||
|
self.assertNoResult(d)
|
||||||
|
|
||||||
|
# Push by enough to time it out
|
||||||
|
self.reactor.advance(120)
|
||||||
|
f = self.failureResultOf(d)
|
||||||
|
|
||||||
|
self.assertIsInstance(f.value, RequestTimedOutError)
|
||||||
|
|
||||||
|
def test_client_connect_no_response(self):
|
||||||
|
"""
|
||||||
|
If the HTTP request is connected, but gets no response before being
|
||||||
|
timed out, it'll give a ResponseNeverReceived.
|
||||||
|
"""
|
||||||
|
d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar"))
|
||||||
|
|
||||||
|
self.pump()
|
||||||
|
|
||||||
|
# Nothing happened yet
|
||||||
|
self.assertNoResult(d)
|
||||||
|
|
||||||
|
# Make sure treq is trying to connect
|
||||||
|
clients = self.reactor.tcpClients
|
||||||
|
self.assertEqual(len(clients), 1)
|
||||||
|
self.assertEqual(clients[0][0], "1.2.3.4")
|
||||||
|
self.assertEqual(clients[0][1], 8008)
|
||||||
|
|
||||||
|
conn = Mock()
|
||||||
|
client = clients[0][2].buildProtocol(None)
|
||||||
|
client.makeConnection(conn)
|
||||||
|
|
||||||
|
# Deferred is still without a result
|
||||||
|
self.assertNoResult(d)
|
||||||
|
|
||||||
|
# Push by enough to time it out
|
||||||
|
self.reactor.advance(120)
|
||||||
|
f = self.failureResultOf(d)
|
||||||
|
|
||||||
|
self.assertIsInstance(f.value, RequestTimedOutError)
|
||||||
|
|
||||||
|
def test_client_ip_range_blacklist(self):
|
||||||
|
"""Ensure that Synapse does not try to connect to blacklisted IPs"""
|
||||||
|
|
||||||
|
# Add some DNS entries we'll blacklist
|
||||||
|
self.reactor.lookups["internal"] = "127.0.0.1"
|
||||||
|
self.reactor.lookups["internalv6"] = "fe80:0:0:0:0:8a2e:370:7337"
|
||||||
|
ip_blacklist = IPSet(["127.0.0.0/8", "fe80::/64"])
|
||||||
|
|
||||||
|
cl = SimpleHttpClient(self.hs, ip_blacklist=ip_blacklist)
|
||||||
|
|
||||||
|
# Try making a GET request to a blacklisted IPv4 address
|
||||||
|
# ------------------------------------------------------
|
||||||
|
# Make the request
|
||||||
|
d = defer.ensureDeferred(cl.get_json("http://internal:8008/foo/bar"))
|
||||||
|
self.pump(1)
|
||||||
|
|
||||||
|
# Check that it was unable to resolve the address
|
||||||
|
clients = self.reactor.tcpClients
|
||||||
|
self.assertEqual(len(clients), 0)
|
||||||
|
|
||||||
|
self.failureResultOf(d, DNSLookupError)
|
||||||
|
|
||||||
|
# Try making a POST request to a blacklisted IPv6 address
|
||||||
|
# -------------------------------------------------------
|
||||||
|
# Make the request
|
||||||
|
d = defer.ensureDeferred(
|
||||||
|
cl.post_json_get_json("http://internalv6:8008/foo/bar", {})
|
||||||
|
)
|
||||||
|
|
||||||
|
# Move the reactor forwards
|
||||||
|
self.pump(1)
|
||||||
|
|
||||||
|
# Check that it was unable to resolve the address
|
||||||
|
clients = self.reactor.tcpClients
|
||||||
|
self.assertEqual(len(clients), 0)
|
||||||
|
|
||||||
|
# Check that it was due to a blacklisted DNS lookup
|
||||||
|
self.failureResultOf(d, DNSLookupError)
|
||||||
|
|
||||||
|
# Try making a GET request to a non-blacklisted IPv4 address
|
||||||
|
# ----------------------------------------------------------
|
||||||
|
# Make the request
|
||||||
|
d = defer.ensureDeferred(cl.get_json("http://testserv:8008/foo/bar"))
|
||||||
|
|
||||||
|
# Nothing has happened yet
|
||||||
|
self.assertNoResult(d)
|
||||||
|
|
||||||
|
# Move the reactor forwards
|
||||||
|
self.pump(1)
|
||||||
|
|
||||||
|
# Check that it was able to resolve the address
|
||||||
|
clients = self.reactor.tcpClients
|
||||||
|
self.assertNotEqual(len(clients), 0)
|
||||||
|
|
||||||
|
# Connection will still fail as this IP address does not resolve to anything
|
||||||
|
self.failureResultOf(d, RequestTimedOutError)
|
Loading…
Reference in a new issue