Close ijson coroutines ourselves instead of letting the GC close them (#12875)

Hopefully this means that exceptions raised due to truncated JSON
get a sensible logging context and stack.

Signed-off-by: Sean Quah <seanq@matrix.org>
This commit is contained in:
Sean Quah 2022-05-27 11:03:05 +01:00 committed by GitHub
parent 7b88f5a107
commit bb7a637765
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 2 deletions

1
changelog.d/12875.bugfix Normal file
View file

@ -0,0 +1 @@
Explicitly close `ijson` coroutines once we are done with them, instead of leaving the garbage collector to close them.

View file

@ -1363,7 +1363,7 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
def __init__(self, room_version: RoomVersion, v1_api: bool): def __init__(self, room_version: RoomVersion, v1_api: bool):
self._response = SendJoinResponse([], [], event_dict={}) self._response = SendJoinResponse([], [], event_dict={})
self._room_version = room_version self._room_version = room_version
self._coros = [] self._coros: List[Generator[None, bytes, None]] = []
# The V1 API has the shape of `[200, {...}]`, which we handle by # The V1 API has the shape of `[200, {...}]`, which we handle by
# prefixing with `item.*`. # prefixing with `item.*`.
@ -1411,6 +1411,9 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
return len(data) return len(data)
def finish(self) -> SendJoinResponse: def finish(self) -> SendJoinResponse:
for c in self._coros:
c.close()
if self._response.event_dict: if self._response.event_dict:
self._response.event = make_event_from_dict( self._response.event = make_event_from_dict(
self._response.event_dict, self._room_version self._response.event_dict, self._room_version
@ -1430,7 +1433,7 @@ class _StateParser(ByteParser[StateRequestResponse]):
def __init__(self, room_version: RoomVersion): def __init__(self, room_version: RoomVersion):
self._response = StateRequestResponse([], []) self._response = StateRequestResponse([], [])
self._room_version = room_version self._room_version = room_version
self._coros = [ self._coros: List[Generator[None, bytes, None]] = [
ijson.items_coro( ijson.items_coro(
_event_list_parser(room_version, self._response.state), _event_list_parser(room_version, self._response.state),
"pdus.item", "pdus.item",
@ -1449,4 +1452,6 @@ class _StateParser(ByteParser[StateRequestResponse]):
return len(data) return len(data)
def finish(self) -> StateRequestResponse: def finish(self) -> StateRequestResponse:
for c in self._coros:
c.close()
return self._response return self._response

View file

@ -225,6 +225,7 @@ async def _handle_response(
if max_response_size is None: if max_response_size is None:
max_response_size = MAX_RESPONSE_SIZE max_response_size = MAX_RESPONSE_SIZE
finished = False
try: try:
check_content_type_is(response.headers, parser.CONTENT_TYPE) check_content_type_is(response.headers, parser.CONTENT_TYPE)
@ -233,6 +234,7 @@ async def _handle_response(
length = await make_deferred_yieldable(d) length = await make_deferred_yieldable(d)
finished = True
value = parser.finish() value = parser.finish()
except BodyExceededMaxSize as e: except BodyExceededMaxSize as e:
# The response was too big. # The response was too big.
@ -283,6 +285,15 @@ async def _handle_response(
e, e,
) )
raise raise
finally:
if not finished:
# There was an exception and we didn't `finish()` the parse.
# Let the parser know that it can free up any resources.
try:
parser.finish()
except Exception:
# Ignore any additional exceptions.
pass
time_taken_secs = reactor.seconds() - start_ms / 1000 time_taken_secs = reactor.seconds() - start_ms / 1000