1
0
Fork 0
mirror of https://github.com/element-hq/synapse.git synced 2025-04-09 14:43:58 +00:00
This commit is contained in:
Eric Eastwood 2025-03-28 15:56:43 +00:00 committed by GitHub
commit 684b0c38b1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 1189 additions and 412 deletions

1
changelog.d/17676.misc Normal file
View file

@ -0,0 +1 @@
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.

View file

@ -1473,6 +1473,11 @@ class DatabasePool:
key_values: Collection[Collection[Any]],
value_names: Collection[str],
value_values: Collection[Collection[Any]],
# Given these are the same type as the normal values, force keyword-only so that
# they can't be confused.
*,
insertion_value_names: Collection[str] = [],
insertion_value_values: Collection[Iterable[Any]] = [],
desc: str,
) -> None:
"""
@ -1498,6 +1503,8 @@ class DatabasePool:
key_values,
value_names,
value_values,
insertion_value_names=insertion_value_names,
insertion_value_values=insertion_value_values,
db_autocommit=autocommit,
)
@ -1509,6 +1516,11 @@ class DatabasePool:
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Collection[Iterable[Any]],
# Given these are the same type as the normal values, force keyword-only so that
# they can't be confused.
*,
insertion_value_names: Collection[str] = [],
insertion_value_values: Collection[Iterable[Any]] = [],
) -> None:
"""
Upsert, many times.
@ -1520,6 +1532,9 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
insertion_value_names: The value column names to use only when inserting
insertion_value_values: A list of each row's value column values to use only
when inserting. Ignored if `insertion_value_names` is empty.
"""
# If there's nothing to upsert, then skip executing the query.
if not key_values:
@ -1529,14 +1544,30 @@ class DatabasePool:
# zip() works correctly.
if not value_names:
value_values = [() for x in range(len(key_values))]
elif len(value_values) != len(key_values):
elif len(key_values) != len(value_values):
raise ValueError(
f"{len(key_values)} key rows and {len(value_values)} value rows: should be the same number."
)
# No insertion value columns, therefore make a blank list so that the
# following zip() works correctly.
if not insertion_value_names:
insertion_value_values = [() for x in range(len(key_values))]
elif len(key_values) != len(insertion_value_values):
raise ValueError(
f"{len(key_values)} key rows and {len(insertion_value_values)} insertion value rows: should be the same number."
)
if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
txn, table, key_names, key_values, value_names, value_values
txn,
table,
key_names,
key_values,
value_names,
value_values,
insertion_value_names=insertion_value_names,
insertion_value_values=insertion_value_values,
)
else:
return self.simple_upsert_many_txn_emulated(
@ -1546,6 +1577,8 @@ class DatabasePool:
key_values,
value_names,
value_values,
insertion_value_names=insertion_value_names,
insertion_value_values=insertion_value_values,
)
def simple_upsert_many_txn_emulated(
@ -1556,6 +1589,11 @@ class DatabasePool:
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
# Given these are the same type as the normal values, force keyword-only so that
# they can't be confused.
*,
insertion_value_names: Collection[str] = [],
insertion_value_values: Collection[Iterable[Any]] = [],
) -> None:
"""
Upsert, many times, but without native UPSERT support or batching.
@ -1567,6 +1605,9 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
insertion_value_names: The value column names to use only when inserting
insertion_value_values: A list of each row's value column values to use only
when inserting. Ignored if `insertion_value_names` is empty.
"""
# Lock the table just once, to prevent it being done once per row.
@ -1574,11 +1615,16 @@ class DatabasePool:
# the lock is held for the remainder of the current transaction.
self.engine.lock_table(txn, table)
for keyv, valv in zip(key_values, value_values):
for keyv, valv, insertionv in zip(
key_values, value_values, insertion_value_values
):
_keys = dict(zip(key_names, keyv))
_vals = dict(zip(value_names, valv))
_insertion_vals = dict(zip(insertion_value_names, insertionv))
self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=False)
self.simple_upsert_txn_emulated(
txn, table, _keys, _vals, insertion_values=_insertion_vals, lock=False
)
@staticmethod
def simple_upsert_many_txn_native_upsert(
@ -1588,6 +1634,11 @@ class DatabasePool:
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
# Given these are the same type as the normal values, force keyword-only so that
# they can't be confused.
*,
insertion_value_names: Collection[str] = [],
insertion_value_values: Collection[Iterable[Any]] = [],
) -> None:
"""
Upsert, many times, using batching where possible.
@ -1599,10 +1650,14 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
insertion_value_names: The value column names to use only when inserting
insertion_value_values: A list of each row's value column values to use only
when inserting. Ignored if `insertion_value_names` is empty.
"""
allnames: List[str] = []
allnames.extend(key_names)
allnames.extend(value_names)
allnames.extend(insertion_value_names)
if not value_names:
latter = "NOTHING"
@ -1613,8 +1668,8 @@ class DatabasePool:
args = []
for x, y in zip(key_values, value_values):
args.append(tuple(x) + tuple(y))
for x, y, z in zip(key_values, value_values, insertion_value_values):
args.append(tuple(x) + tuple(y) + tuple(z))
if isinstance(txn.database_engine, PostgresEngine):
# We use `execute_values` as it can be a lot faster than `execute_batch`,

View file

@ -137,7 +137,7 @@ class SlidingSyncStateInsertValues(TypedDict, total=False):
"""
room_type: Optional[str]
is_encrypted: Optional[bool]
is_encrypted: bool
room_name: Optional[str]
tombstone_successor_room_id: Optional[str]
@ -150,7 +150,7 @@ class SlidingSyncMembershipSnapshotSharedInsertValues(
multiple memberships
"""
has_known_state: Optional[bool]
has_known_state: bool
@attr.s(slots=True, auto_attribs=True)

File diff suppressed because it is too large Load diff

View file

@ -31,6 +31,11 @@ from tests import unittest
class UpdateUpsertManyTests(unittest.HomeserverTestCase):
"""
Integration tests for the "simple" SQL generating methods in SQLBaseStore that
actually run against a database.
"""
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.storage = hs.get_datastores().main
@ -130,6 +135,65 @@ class UpdateUpsertManyTests(unittest.HomeserverTestCase):
{(1, "user1", "hello"), (2, "user2", "bleb")},
)
def test_upsert_many_with_insertion_values(self) -> None:
    """
    The `insertion_value_names`/`insertion_value_values` passed to
    `simple_upsert_many_txn` are expected to be written only when a row is
    inserted, and to be left alone when the row already exists (update/conflict).
    """
    key_names = ["id", "username"]
    # There are no regular value columns in this test; only the insertion-only
    # column is supplied.
    value_names: List[str] = []
    value_values: List[List[str]] = []
    insertion_value_names = ["value"]

    # The table contents we expect after *both* upserts below.
    expected_rows = {(1, "user1", "hello"), (2, "user2", "there")}

    def upsert(
        key_values: List[List[object]], insertion_value_values: List[List[str]]
    ) -> None:
        # Run one `simple_upsert_many_txn` against the test table inside a
        # database interaction and wait for it to finish.
        self.get_success(
            self.storage.db_pool.runInteraction(
                "test",
                self.storage.db_pool.simple_upsert_many_txn,
                self.table_name,
                key_names,
                key_values,
                value_names,
                value_values,
                insertion_value_names=insertion_value_names,
                insertion_value_values=insertion_value_values,
            )
        )

    # Add some data to an empty table
    upsert([[1, "user1"], [2, "user2"]], [["hello"], ["there"]])

    # Check results are what we expect
    self.assertEqual(set(self._dump_table_to_tuple()), expected_rows)

    # Upsert only user2 again. Since this row already exists, the insertion
    # value ("again") should not be written.
    upsert([[2, "user2"]], [["again"]])

    # Check results are what we expect (nothing changed)
    self.assertEqual(set(self._dump_table_to_tuple()), expected_rows)
def test_simple_update_many(self) -> None:
"""
simple_update_many performs many updates at once.

View file

@ -35,7 +35,12 @@ from tests.utils import USE_POSTGRES_FOR_TESTS, default_config
class SQLBaseStoreTestCase(unittest.TestCase):
"""Test the "simple" SQL generating methods in SQLBaseStore."""
"""
Test the "simple" SQL generating methods in SQLBaseStore.
Tests that the SQL is generated correctly and that the correct arguments are passed
(does not actually run the queries or test the end-result in the database).
"""
def setUp(self) -> None:
# This is the Twisted connection pool.
@ -620,6 +625,74 @@ class SQLBaseStoreTestCase(unittest.TestCase):
[("oldvalue",)],
)
@defer.inlineCallbacks
def test_upsert_many_no_values_with_insertion_values(
    self,
) -> Generator["defer.Deferred[object]", object, None]:
    """
    With no regular value columns, `simple_upsert_many` should still include the
    insertion-only column in the `INSERT` and use `DO NOTHING` on conflict.
    """
    upsert = self.datastore.db_pool.simple_upsert_many(
        table="tablename",
        key_names=["keycol1"],
        key_values=[["keyval1.1"], ["keyval1.2"]],
        value_names=[],
        value_values=[],
        insertion_value_names=["insertioncol1"],
        insertion_value_values=[["insertionvalue1"], ["insertionvalue2"]],
        desc="",
    )
    yield defer.ensureDeferred(upsert)

    # One argument tuple per row: key columns followed by insertion columns.
    expected_rows = [
        ("keyval1.1", "insertionvalue1"),
        ("keyval1.2", "insertionvalue2"),
    ]

    if not USE_POSTGRES_FOR_TESTS:
        self.mock_txn.executemany.assert_called_once_with(
            "INSERT INTO tablename (keycol1, insertioncol1) VALUES (?, ?) ON CONFLICT (keycol1) DO NOTHING",
            expected_rows,
        )
    else:
        # On Postgres the batch goes through the `execute_values` helper instead.
        self.mock_execute_values.assert_called_once_with(
            self.mock_txn,
            "INSERT INTO tablename (keycol1, insertioncol1) VALUES ? ON CONFLICT (keycol1) DO NOTHING",
            expected_rows,
            template=None,
            fetch=False,
        )
@defer.inlineCallbacks
def test_upsert_many_values_with_insertion_values(
    self,
) -> Generator["defer.Deferred[object]", object, None]:
    """
    With both regular and insertion-only value columns, `simple_upsert_many`
    should insert all of the columns but only list the regular value column in
    the `DO UPDATE SET` clause.
    """
    upsert = self.datastore.db_pool.simple_upsert_many(
        table="tablename",
        key_names=["keycol1"],
        key_values=[["keyval1.1"], ["keyval1.2"]],
        value_names=["valuecol1"],
        value_values=[["value1"], ["value2"]],
        insertion_value_names=["insertioncol1"],
        insertion_value_values=[["insertionvalue1"], ["insertionvalue2"]],
        desc="",
    )
    yield defer.ensureDeferred(upsert)

    # One argument tuple per row: key, then value, then insertion columns.
    expected_rows = [
        ("keyval1.1", "value1", "insertionvalue1"),
        ("keyval1.2", "value2", "insertionvalue2"),
    ]

    if not USE_POSTGRES_FOR_TESTS:
        self.mock_txn.executemany.assert_called_once_with(
            "INSERT INTO tablename (keycol1, valuecol1, insertioncol1) VALUES (?, ?, ?) ON CONFLICT (keycol1) DO UPDATE SET valuecol1=EXCLUDED.valuecol1",
            expected_rows,
        )
    else:
        # On Postgres the batch goes through the `execute_values` helper instead.
        self.mock_execute_values.assert_called_once_with(
            self.mock_txn,
            "INSERT INTO tablename (keycol1, valuecol1, insertioncol1) VALUES ? ON CONFLICT (keycol1) DO UPDATE SET valuecol1=EXCLUDED.valuecol1",
            expected_rows,
            template=None,
            fetch=False,
        )
@defer.inlineCallbacks
def test_upsert_emulated_no_values_exists(
self,

View file

@ -4416,6 +4416,222 @@ class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
),
)
def test_membership_snapshots_background_update_multiple_memberships(self) -> None:
    """
    Test that the background update for `sliding_sync_membership_snapshots` works
    correctly when we populate multiple memberships in a room. Make sure the
    snapshot re-use isn't flawed.

    The room has a mix of joined and invited users, and the room name changes
    after all of the memberships exist. Joined users are expected to end up with
    the latest room name, whereas invited users are expected to keep the name
    that was current when they were invited.
    """
    user1_id = self.register_user("user1", "pass")
    _user1_tok = self.login(user1_id, "pass")
    user2_id = self.register_user("user2", "pass")
    user2_tok = self.login(user2_id, "pass")
    user3_id = self.register_user("user3", "pass")
    user3_tok = self.login(user3_id, "pass")
    user4_id = self.register_user("user4", "pass")
    user4_tok = self.login(user4_id, "pass")
    user5_id = self.register_user("user5", "pass")
    _user5_tok = self.login(user5_id, "pass")
    user6_id = self.register_user("user6", "pass")
    user6_tok = self.login(user6_id, "pass")

    # Create a room
    room_id = self.helper.create_room_as(user2_id, tok=user2_tok)

    # Add a room name
    self.helper.send_state(
        room_id,
        EventTypes.Name,
        {"name": "my super room"},
        tok=user2_tok,
    )

    # User1 is invited to the room
    self.helper.invite(room_id, src=user2_id, targ=user1_id, tok=user2_tok)

    # User3 joins the room
    self.helper.join(room_id, user3_id, tok=user3_tok)

    # User4 joins the room
    self.helper.join(room_id, user4_id, tok=user4_tok)

    # User5 is invited to the room
    self.helper.invite(room_id, src=user2_id, targ=user5_id, tok=user2_tok)

    # User6 joins the room
    self.helper.join(room_id, user6_id, tok=user6_tok)

    # Update the room name after all of the memberships have been added
    self.helper.send_state(
        room_id,
        EventTypes.Name,
        {"name": "my super duper room"},
        tok=user2_tok,
    )

    # Clean-up the `sliding_sync_membership_snapshots` table as if the inserts did not
    # happen during event creation.
    self.get_success(
        self.store.db_pool.simple_delete_many(
            table="sliding_sync_membership_snapshots",
            column="room_id",
            iterable=(room_id,),
            keyvalues={},
            # Label the operation with *this* test's name.
            desc="sliding_sync_membership_snapshots.test_membership_snapshots_background_update_multiple_memberships",
        )
    )

    # We shouldn't find anything in the table because we just deleted them in
    # preparation for the test.
    sliding_sync_membership_snapshots_results = (
        self._get_sliding_sync_membership_snapshots()
    )
    self.assertIncludes(
        set(sliding_sync_membership_snapshots_results.keys()),
        set(),
        exact=True,
    )

    # Insert and run the background update.
    self.get_success(
        self.store.db_pool.simple_insert(
            "background_updates",
            {
                "update_name": _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
                "progress_json": "{}",
            },
        )
    )
    self.store.db_pool.updates._all_done = False
    self.wait_for_background_updates()

    # Make sure the table is populated
    sliding_sync_membership_snapshots_results = (
        self._get_sliding_sync_membership_snapshots()
    )
    self.assertIncludes(
        set(sliding_sync_membership_snapshots_results.keys()),
        {
            (room_id, user1_id),
            (room_id, user2_id),
            (room_id, user3_id),
            (room_id, user4_id),
            (room_id, user5_id),
            (room_id, user6_id),
        },
        exact=True,
    )

    state_map = self.get_success(
        self.storage_controllers.state.get_current_state(room_id)
    )

    def expected_snapshot(
        user_id: str, *, sender: str, membership: str, room_name: str
    ) -> _SlidingSyncMembershipSnapshotResult:
        # Build the row we expect for `user_id`. Only the sender, membership and
        # room name differ between users in this test; everything else is either
        # constant or read from the user's membership event in current state.
        membership_event = state_map[(EventTypes.Member, user_id)]
        return _SlidingSyncMembershipSnapshotResult(
            room_id=room_id,
            user_id=user_id,
            sender=sender,
            membership_event_id=membership_event.event_id,
            membership=membership,
            event_stream_ordering=membership_event.internal_metadata.stream_ordering,
            has_known_state=True,
            room_type=None,
            room_name=room_name,
            is_encrypted=False,
            tombstone_successor_room_id=None,
        )

    self.assertEqual(
        sliding_sync_membership_snapshots_results.get((room_id, user1_id)),
        expected_snapshot(
            user1_id,
            sender=user2_id,
            membership=Membership.INVITE,
            # Has the value included in the stripped state at the time of the invite
            room_name="my super room",
        ),
    )
    self.assertEqual(
        sliding_sync_membership_snapshots_results.get((room_id, user2_id)),
        expected_snapshot(
            user2_id,
            sender=user2_id,
            membership=Membership.JOIN,
            # Even though the room name was changed after the room creator joined,
            # the background update uses current state for the room for joined
            # users.
            room_name="my super duper room",
        ),
    )
    self.assertEqual(
        sliding_sync_membership_snapshots_results.get((room_id, user3_id)),
        expected_snapshot(
            user3_id,
            sender=user3_id,
            membership=Membership.JOIN,
            room_name="my super duper room",
        ),
    )
    self.assertEqual(
        sliding_sync_membership_snapshots_results.get((room_id, user4_id)),
        expected_snapshot(
            user4_id,
            sender=user4_id,
            membership=Membership.JOIN,
            room_name="my super duper room",
        ),
    )
    self.assertEqual(
        sliding_sync_membership_snapshots_results.get((room_id, user5_id)),
        expected_snapshot(
            user5_id,
            sender=user2_id,
            membership=Membership.INVITE,
            # Has the value included in the stripped state at the time of the invite
            room_name="my super room",
        ),
    )
    self.assertEqual(
        sliding_sync_membership_snapshots_results.get((room_id, user6_id)),
        expected_snapshot(
            user6_id,
            sender=user6_id,
            membership=Membership.JOIN,
            room_name="my super duper room",
        ),
    )
class SlidingSyncTablesCatchUpBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
"""