skip some dict munging in event persistence (#11560)

Create a new dict helper method `simple_insert_many_values_txn`, which takes
raw row values, rather than {key=>value} dicts. This saves us a bunch of dict
munging, and makes it easier to use generators rather than creating
intermediate lists and dicts.
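
To illustrate the shape change, here is a minimal before/after sketch of a call
site (the table, columns and `things` iterable are made up for illustration;
only the two helper names come from this commit):

    # Before: one dict per row; the helper has to re-derive the column names
    # and re-order the values for every batch.
    self.db_pool.simple_insert_many_txn(
        txn,
        table="example_table",
        values=[{"id": thing.id, "name": thing.name} for thing in things],
    )

    # After: column names are stated once and rows are bare tuples, so a
    # generator can be passed straight through without building a list of dicts.
    self.db_pool.simple_insert_many_values_txn(
        txn,
        table="example_table",
        keys=("id", "name"),
        values=((thing.id, thing.name) for thing in things),
    )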
Richard van der Hoff 2021-12-10 15:02:33 +00:00 committed by GitHub
parent 86e7a6d16e
commit f0562183e7
3 changed files with 114 additions and 60 deletions

changelog.d/11560.misc (new file)

@@ -0,0 +1 @@
+Minor efficiency improvements in event persistence.

synapse/storage/database.py

@@ -896,6 +896,9 @@ class DatabasePool:
     ) -> None:
         """Executes an INSERT query on the named table.
 
+        The input is given as a list of dicts, with one dict per row.
+        Generally simple_insert_many_values should be preferred for new code.
+
         Args:
             table: string giving the table name
             values: dict of new column names and values for them
@@ -909,6 +912,9 @@
     ) -> None:
         """Executes an INSERT query on the named table.
 
+        The input is given as a list of dicts, with one dict per row.
+        Generally simple_insert_many_values_txn should be preferred for new code.
+
         Args:
             txn: The transaction to use.
             table: string giving the table name
@@ -933,23 +939,66 @@
             if k != keys[0]:
                 raise RuntimeError("All items must have the same keys")
 
+        return DatabasePool.simple_insert_many_values_txn(txn, table, keys[0], vals)
+
+    async def simple_insert_many_values(
+        self,
+        table: str,
+        keys: Collection[str],
+        values: Iterable[Iterable[Any]],
+        desc: str,
+    ) -> None:
+        """Executes an INSERT query on the named table.
+
+        The input is given as a list of rows, where each row is a list of values.
+        (Actually any iterable is fine.)
+
+        Args:
+            table: string giving the table name
+            keys: list of column names
+            values: for each row, a list of values in the same order as `keys`
+            desc: description of the transaction, for logging and metrics
+        """
+        await self.runInteraction(
+            desc, self.simple_insert_many_values_txn, table, keys, values
+        )
+
+    @staticmethod
+    def simple_insert_many_values_txn(
+        txn: LoggingTransaction,
+        table: str,
+        keys: Collection[str],
+        values: Iterable[Iterable[Any]],
+    ) -> None:
+        """Executes an INSERT query on the named table.
+
+        The input is given as a list of rows, where each row is a list of values.
+        (Actually any iterable is fine.)
+
+        Args:
+            txn: The transaction to use.
+            table: string giving the table name
+            keys: list of column names
+            values: for each row, a list of values in the same order as `keys`
+        """
         if isinstance(txn.database_engine, PostgresEngine):
             # We use `execute_values` as it can be a lot faster than `execute_batch`,
             # but it's only available on postgres.
             sql = "INSERT INTO %s (%s) VALUES ?" % (
                 table,
-                ", ".join(k for k in keys[0]),
+                ", ".join(k for k in keys),
             )
-            txn.execute_values(sql, vals, fetch=False)
+            txn.execute_values(sql, values, fetch=False)
         else:
             sql = "INSERT INTO %s (%s) VALUES(%s)" % (
                 table,
-                ", ".join(k for k in keys[0]),
-                ", ".join("?" for _ in keys[0]),
+                ", ".join(k for k in keys),
+                ", ".join("?" for _ in keys),
             )
-            txn.execute_batch(sql, vals)
+            txn.execute_batch(sql, values)
 
     async def simple_upsert(
         self,
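
As a usage sketch (hypothetical table and column names; the helper itself is the
one added above), the new entry point can be handed a generator directly, since
nothing needs to inspect per-row dicts before executing:

    await self.db_pool.simple_insert_many_values(
        table="example_table",
        keys=("event_id", "room_id"),
        values=((e.event_id, e.room_id) for e in events),
        desc="insert_example_rows",
    )

On postgres this bottoms out in `execute_values`, which the comment above notes
can be a lot faster than `execute_batch`; on other engines it falls back to a
parameterised `execute_batch` with one "?" placeholder per column.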

synapse/storage/databases/main/events.py

@@ -19,6 +19,7 @@ from collections import OrderedDict
 from typing import (
     TYPE_CHECKING,
     Any,
+    Collection,
     Dict,
     Generator,
     Iterable,
@@ -1319,14 +1320,13 @@ class PersistEventsStore:
         return [ec for ec in events_and_contexts if ec[0] not in to_remove]
 
-    def _store_event_txn(self, txn, events_and_contexts):
+    def _store_event_txn(
+        self,
+        txn: LoggingTransaction,
+        events_and_contexts: Collection[Tuple[EventBase, EventContext]],
+    ) -> None:
         """Insert new events into the event, event_json, redaction and
         state_events tables.
-
-        Args:
-            txn (twisted.enterprise.adbapi.Connection): db connection
-            events_and_contexts (list[(EventBase, EventContext)]): events
-                we are persisting
         """
 
         if not events_and_contexts:
@@ -1339,46 +1339,58 @@
             d.pop("redacted_because", None)
             return d
 
-        self.db_pool.simple_insert_many_txn(
+        self.db_pool.simple_insert_many_values_txn(
             txn,
             table="event_json",
-            values=[
-                {
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "internal_metadata": json_encoder.encode(
-                        event.internal_metadata.get_dict()
-                    ),
-                    "json": json_encoder.encode(event_dict(event)),
-                    "format_version": event.format_version,
-                }
+            keys=("event_id", "room_id", "internal_metadata", "json", "format_version"),
+            values=(
+                (
+                    event.event_id,
+                    event.room_id,
+                    json_encoder.encode(event.internal_metadata.get_dict()),
+                    json_encoder.encode(event_dict(event)),
+                    event.format_version,
+                )
                 for event, _ in events_and_contexts
-            ],
+            ),
         )
 
-        self.db_pool.simple_insert_many_txn(
+        self.db_pool.simple_insert_many_values_txn(
             txn,
             table="events",
-            values=[
-                {
-                    "instance_name": self._instance_name,
-                    "stream_ordering": event.internal_metadata.stream_ordering,
-                    "topological_ordering": event.depth,
-                    "depth": event.depth,
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "type": event.type,
-                    "processed": True,
-                    "outlier": event.internal_metadata.is_outlier(),
-                    "origin_server_ts": int(event.origin_server_ts),
-                    "received_ts": self._clock.time_msec(),
-                    "sender": event.sender,
-                    "contains_url": (
-                        "url" in event.content and isinstance(event.content["url"], str)
-                    ),
-                }
+            keys=(
+                "instance_name",
+                "stream_ordering",
+                "topological_ordering",
+                "depth",
+                "event_id",
+                "room_id",
+                "type",
+                "processed",
+                "outlier",
+                "origin_server_ts",
+                "received_ts",
+                "sender",
+                "contains_url",
+            ),
+            values=(
+                (
+                    self._instance_name,
+                    event.internal_metadata.stream_ordering,
+                    event.depth,  # topological_ordering
+                    event.depth,  # depth
+                    event.event_id,
+                    event.room_id,
+                    event.type,
+                    True,  # processed
+                    event.internal_metadata.is_outlier(),
+                    int(event.origin_server_ts),
+                    self._clock.time_msec(),
+                    event.sender,
+                    "url" in event.content and isinstance(event.content["url"], str),
+                )
                 for event, _ in events_and_contexts
-            ],
+            ),
         )
 
         # If we're persisting an unredacted event we go and ensure
@@ -1397,23 +1409,15 @@
             )
             txn.execute(sql + clause, [False] + args)
 
-        state_events_and_contexts = [
-            ec for ec in events_and_contexts if ec[0].is_state()
-        ]
-
-        state_values = []
-        for event, _ in state_events_and_contexts:
-            vals = {
-                "event_id": event.event_id,
-                "room_id": event.room_id,
-                "type": event.type,
-                "state_key": event.state_key,
-            }
-            state_values.append(vals)
-
-        self.db_pool.simple_insert_many_txn(
-            txn, table="state_events", values=state_values
+        self.db_pool.simple_insert_many_values_txn(
+            txn,
+            table="state_events",
+            keys=("event_id", "room_id", "type", "state_key"),
+            values=(
+                (event.event_id, event.room_id, event.type, event.state_key)
+                for event, _ in events_and_contexts
+                if event.is_state()
+            ),
         )
 
     def _store_rejected_events_txn(self, txn, events_and_contexts):
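
One consequence of the positional style, worth flagging for reviewers: each row
tuple must stay in exactly the same order as `keys`, which is why the repeated
values above carry inline comments (`event.depth` feeds both
"topological_ordering" and "depth"). A hypothetical mismatch shows the hazard
those comments guard against:

    # BUG (illustrative only): values swapped relative to keys; both columns
    # hold TEXT-like data, so nothing fails at the SQL layer and the rows are
    # silently wrong.
    keys=("event_id", "room_id"),
    values=((event.room_id, event.event_id) for event, _ in events_and_contexts),

With the dict-based helper this class of swap could not happen, since every
value was bound to its column name at the call site; the trade-off here is
speed for a little positional discipline.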