Initial stab at refactoring the SQL tables, including rejigging some of the storage layer.

This commit is contained in:
Erik Johnston 2014-08-13 16:27:14 +01:00
parent 3dfa84bec8
commit 336987bb8d
6 changed files with 113 additions and 259 deletions

View file

@ -46,50 +46,83 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore,
self.event_factory = hs.get_event_factory() self.event_factory = hs.get_event_factory()
self.hs = hs self.hs = hs
@defer.inlineCallbacks
def persist_event(self, event): def persist_event(self, event):
if event.type == MessageEvent.TYPE: if event.type == RoomMemberEvent.TYPE:
return self.store_message( yield self._store_room_member(event)
user_id=event.user_id,
room_id=event.room_id,
msg_id=event.msg_id,
content=json.dumps(event.content)
)
elif event.type == RoomMemberEvent.TYPE:
return self.store_room_member(
user_id=event.target_user_id,
sender=event.user_id,
room_id=event.room_id,
content=event.content,
membership=event.content["membership"]
)
elif event.type == FeedbackEvent.TYPE: elif event.type == FeedbackEvent.TYPE:
return self.store_feedback( yield self._store_feedback(event)
room_id=event.room_id,
msg_id=event.msg_id,
msg_sender_id=event.msg_sender_id,
fb_sender_id=event.user_id,
fb_type=event.feedback_type,
content=json.dumps(event.content)
)
elif event.type == RoomTopicEvent.TYPE:
return self.store_room_data(
room_id=event.room_id,
etype=event.type,
state_key=event.state_key,
content=json.dumps(event.content)
)
elif event.type == RoomConfigEvent.TYPE: elif event.type == RoomConfigEvent.TYPE:
if "visibility" in event.content: yield self._store_room_config(event)
visibility = event.content["visibility"]
return self.store_room_config(
room_id=event.room_id,
visibility=visibility
)
self._store_event(event)
@defer.inlineCallbacks
def get_event(self, event_id):
events_dict = yield self._simple_select_one(
"events",
{"event_id": event_id},
[
"event_id",
"type",
"sender",
"room_id",
"content",
"unrecognized_keys"
],
)
event = self._parse_event_from_row(events_dict)
defer.returnValue(event)
@defer.inlineCallbacks
def _store_event(self, event):
vals = {
"event_id": event.event_id,
"event_type", event.type,
"sender": event.user_id,
"room_id": event.room_id,
"content": event.content,
}
unrec = {k: v for k, v in event.get_full_dict() if k not in vals.keys()}
val["unrecognized_keys"] = unrec
yield self._simple_insert("events", vals)
if hasattr(event, "state_key"):
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
"event_type": event.event_type,
"state_key": event.state_key,
}
if hasattr(event, "prev_state"):
vals["prev_state"] = event.prev_state
yield self._simple_insert("state_events", vals)
# TODO (erikj): We also need to update the current state table?
@defer.inlineCallbacks
def get_current_state(room_id, event_type=None, state_key="")
sql = (
"SELECT e.* FROM events as e"
"INNER JOIN current_state as c ON e.event_id = c.event_id "
"INNER JOIN state_events as s ON e.event_id = s.event_id "
"WHERE c.room_id = ? "
)
if event_type:
sql += " s.type = ? AND s.state_key = ? "
args = (room_id, event_type, state_key)
else: else:
raise NotImplementedError( args = (room_id, )
"Don't know how to persist type=%s" % event.type
) results = yield self._execute_query(sql, *args)
defer.returnValue(
def schema_path(schema): def schema_path(schema):

View file

@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
from twisted.internet import defer from twisted.internet import defer
@ -20,6 +19,8 @@ from twisted.internet import defer
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
import collections import collections
import json
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -28,6 +29,7 @@ class SQLBaseStore(object):
def __init__(self, hs): def __init__(self, hs):
self._db_pool = hs.get_db_pool() self._db_pool = hs.get_db_pool()
self.event_factory = hs.get_event_factory()
def cursor_to_dict(self, cursor): def cursor_to_dict(self, cursor):
"""Converts a SQL cursor into an list of dicts. """Converts a SQL cursor into an list of dicts.
@ -63,6 +65,9 @@ class SQLBaseStore(object):
return decoder(cursor) return decoder(cursor)
return self._db_pool.runInteraction(interaction) return self._db_pool.runInteraction(interaction)
def _execut_query(self, query, *args):
return self._execute(self.cursor_to_dict, *args)
# "Simple" SQL API methods that operate on a single table with no JOINs, # "Simple" SQL API methods that operate on a single table with no JOINs,
# no complex WHERE clauses, just a dict of values for columns. # no complex WHERE clauses, just a dict of values for columns.
@ -279,6 +284,16 @@ class SQLBaseStore(object):
return self._db_pool.runInteraction(func) return self._db_pool.runInteraction(func)
def _parse_event_from_row(self, row_dict):
d = copy.deepcopy({k: v for k, v in row.items() if v})
d.update(json.loads(row["unrecognized_keys"]))
del d["unrecognized_keys"]
return self.event_factory.create_event(
etype=d["type"],
**d
)
class Table(object): class Table(object):
""" A base class used to store information about a particular table. """ A base class used to store information about a particular table.

View file

@ -22,54 +22,27 @@ import json
class FeedbackStore(SQLBaseStore): class FeedbackStore(SQLBaseStore):
def store_feedback(self, room_id, msg_id, msg_sender_id, def _store_feedback(self, event):
fb_sender_id, fb_type, content): return self._simple_insert("feedback", {
return self._simple_insert(FeedbackTable.table_name, dict( "event_id": event.event_id,
room_id=room_id, "feedback_type": event.feedback_type,
msg_id=msg_id, "room_id": event.room_id,
msg_sender_id=msg_sender_id, "target_event_id": event.target_event,
fb_sender_id=fb_sender_id, })
fb_type=fb_type,
content=content,
))
def get_feedback(self, room_id=None, msg_id=None, msg_sender_id=None, @defer.inlineCallback
fb_sender_id=None, fb_type=None): def get_feedback_for_event(self, event_id):
query = FeedbackTable.select_statement( sql = (
"msg_sender_id = ? AND room_id = ? AND msg_id = ? " + "SELECT events.* FROM events INNER JOIN feedback "
"AND fb_sender_id = ? AND feedback_type = ? " + "ON events.event_id = feedback.event_id "
"ORDER BY id DESC LIMIT 1") "WHERE feedback.target_event_id = ? "
return self._execute(
FeedbackTable.decode_single_result,
query, msg_sender_id, room_id, msg_id, fb_sender_id, fb_type,
) )
def get_max_feedback_id(self): rows = yield self._execute_query(sql, event_id)
return self._simple_max_id(FeedbackTable.table_name)
defer.returnValue(
class FeedbackTable(Table): [
table_name = "feedback" self._parse_event_from_row(r)
for r in rows
fields = [ ]
"id", )
"content",
"feedback_type",
"fb_sender_id",
"msg_id",
"room_id",
"msg_sender_id"
]
class EntryType(collections.namedtuple("FeedbackEntry", fields)):
def as_event(self, event_factory):
return event_factory.create_event(
etype=FeedbackEvent.TYPE,
room_id=self.room_id,
msg_id=self.msg_id,
msg_sender_id=self.msg_sender_id,
user_id=self.fb_sender_id,
feedback_type=self.feedback_type,
content=json.loads(self.content),
)

View file

@ -1,81 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore, Table
from synapse.api.events.room import MessageEvent
import collections
import json
class MessageStore(SQLBaseStore):
def get_message(self, user_id, room_id, msg_id):
"""Get a message from the store.
Args:
user_id (str): The ID of the user who sent the message.
room_id (str): The room the message was sent in.
msg_id (str): The unique ID for this user/room combo.
"""
query = MessagesTable.select_statement(
"user_id = ? AND room_id = ? AND msg_id = ? " +
"ORDER BY id DESC LIMIT 1")
return self._execute(
MessagesTable.decode_single_result,
query, user_id, room_id, msg_id,
)
def store_message(self, user_id, room_id, msg_id, content):
"""Store a message in the store.
Args:
user_id (str): The ID of the user who sent the message.
room_id (str): The room the message was sent in.
msg_id (str): The unique ID for this user/room combo.
content (str): The content of the message (JSON)
"""
return self._simple_insert(MessagesTable.table_name, dict(
user_id=user_id,
room_id=room_id,
msg_id=msg_id,
content=content,
))
def get_max_message_id(self):
return self._simple_max_id(MessagesTable.table_name)
class MessagesTable(Table):
table_name = "messages"
fields = [
"id",
"user_id",
"room_id",
"msg_id",
"content"
]
class EntryType(collections.namedtuple("MessageEntry", fields)):
def as_event(self, event_factory):
return event_factory.create_event(
etype=MessageEvent.TYPE,
room_id=self.room_id,
user_id=self.user_id,
msg_id=self.msg_id,
content=json.loads(self.content),
)

View file

@ -1,85 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore, Table
import collections
import json
class RoomDataStore(SQLBaseStore):
"""Provides various CRUD operations for Room Events. """
def get_room_data(self, room_id, etype, state_key=""):
"""Retrieve the data stored under this type and state_key.
Args:
room_id (str)
etype (str)
state_key (str)
Returns:
namedtuple: Or None if nothing exists at this path.
"""
query = RoomDataTable.select_statement(
"room_id = ? AND type = ? AND state_key = ? "
"ORDER BY id DESC LIMIT 1"
)
return self._execute(
RoomDataTable.decode_single_result,
query, room_id, etype, state_key,
)
def store_room_data(self, room_id, etype, state_key="", content=None):
"""Stores room specific data.
Args:
room_id (str)
etype (str)
state_key (str)
data (str)- The data to store for this path in JSON.
Returns:
The store ID for this data.
"""
return self._simple_insert(RoomDataTable.table_name, dict(
etype=etype,
state_key=state_key,
room_id=room_id,
content=content,
))
def get_max_room_data_id(self):
return self._simple_max_id(RoomDataTable.table_name)
class RoomDataTable(Table):
table_name = "room_data"
fields = [
"id",
"room_id",
"type",
"state_key",
"content"
]
class EntryType(collections.namedtuple("RoomDataEntry", fields)):
def as_event(self, event_factory):
return event_factory.create_event(
etype=self.type,
room_id=self.room_id,
content=json.loads(self.content),
)

View file

@ -16,8 +16,8 @@
CREATE TABLE IF NOT EXISTS events( CREATE TABLE IF NOT EXISTS events(
ordering INTEGER PRIMARY KEY AUTOINCREMENT, ordering INTEGER PRIMARY KEY AUTOINCREMENT,
event_id TEXT NOT NULL, event_id TEXT NOT NULL,
event_type TEXT NOT NULL, type TEXT NOT NULL,
sender TEXT, -- sender TEXT,
room_id TEXT, room_id TEXT,
content TEXT, content TEXT,
unrecognized_keys TEXT unrecognized_keys TEXT
@ -26,7 +26,7 @@ CREATE TABLE IF NOT EXISTS events(
CREATE TABLE IF NOT EXISTS state_events( CREATE TABLE IF NOT EXISTS state_events(
event_id TEXT NOT NULL, event_id TEXT NOT NULL,
room_id TEXT NOT NULL, room_id TEXT NOT NULL,
event_type TEXT NOT NULL, type TEXT NOT NULL,
state_key TEXT NOT NULL, state_key TEXT NOT NULL,
prev_state TEXT prev_state TEXT
); );
@ -47,9 +47,8 @@ CREATE TABLE IF NOT EXISTS room_memberships(
CREATE TABLE IF NOT EXISTS feedback( CREATE TABLE IF NOT EXISTS feedback(
event_id TEXT NOT NULL, event_id TEXT NOT NULL,
feedback_type TEXT, feedback_type TEXT,
fb_sender_id TEXT, target_event_id TEXT,sudo
room_id TEXT, room_id TEXT
content TEXT
); );
CREATE TABLE IF NOT EXISTS rooms( CREATE TABLE IF NOT EXISTS rooms(