mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-14 11:57:44 +00:00
Additional functionality for declaring worker types in Complement (#14921)
Co-authored-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>
This commit is contained in:
parent
e7b559d2ca
commit
003a25ae5c
3 changed files with 412 additions and 114 deletions
1
changelog.d/14921.misc
Normal file
1
changelog.d/14921.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add additional functionality to declaring worker types when starting Complement in worker mode.
|
|
@ -51,8 +51,7 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
|
||||||
# -z True if the length of string is zero.
|
# -z True if the length of string is zero.
|
||||||
if [[ -z "$SYNAPSE_WORKER_TYPES" ]]; then
|
if [[ -z "$SYNAPSE_WORKER_TYPES" ]]; then
|
||||||
export SYNAPSE_WORKER_TYPES="\
|
export SYNAPSE_WORKER_TYPES="\
|
||||||
event_persister, \
|
event_persister:2, \
|
||||||
event_persister, \
|
|
||||||
background_worker, \
|
background_worker, \
|
||||||
frontend_proxy, \
|
frontend_proxy, \
|
||||||
event_creator, \
|
event_creator, \
|
||||||
|
@ -64,7 +63,8 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
|
||||||
synchrotron, \
|
synchrotron, \
|
||||||
client_reader, \
|
client_reader, \
|
||||||
appservice, \
|
appservice, \
|
||||||
pusher"
|
pusher, \
|
||||||
|
stream_writers=account_data+presence+receipts+to_device+typing"
|
||||||
|
|
||||||
fi
|
fi
|
||||||
log "Workers requested: $SYNAPSE_WORKER_TYPES"
|
log "Workers requested: $SYNAPSE_WORKER_TYPES"
|
||||||
|
|
|
@ -19,8 +19,15 @@
|
||||||
# The environment variables it reads are:
|
# The environment variables it reads are:
|
||||||
# * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
|
# * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
|
||||||
# * SYNAPSE_REPORT_STATS: Whether to report stats.
|
# * SYNAPSE_REPORT_STATS: Whether to report stats.
|
||||||
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG
|
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKERS_CONFIG
|
||||||
# below. Leave empty for no workers.
|
# below. Leave empty for no workers. Add a ':' and a number at the end to
|
||||||
|
# multiply that worker. Append multiple worker types with '+' to merge the
|
||||||
|
# worker types into a single worker. Add a name and a '=' to the front of a
|
||||||
|
# worker type to give this instance a name in logs and nginx.
|
||||||
|
# Examples:
|
||||||
|
# SYNAPSE_WORKER_TYPES='event_persister, federation_sender, client_reader'
|
||||||
|
# SYNAPSE_WORKER_TYPES='event_persister:2, federation_sender:2, client_reader'
|
||||||
|
# SYNAPSE_WORKER_TYPES='stream_writers=account_data+presence+typing'
|
||||||
# * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
|
# * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
|
||||||
# will be treated as Application Service registration files.
|
# will be treated as Application Service registration files.
|
||||||
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
|
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
|
||||||
|
@ -40,16 +47,33 @@
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
|
import re
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
|
from collections import defaultdict
|
||||||
|
from itertools import chain
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Optional, Set
|
from typing import (
|
||||||
|
Any,
|
||||||
|
Dict,
|
||||||
|
List,
|
||||||
|
Mapping,
|
||||||
|
MutableMapping,
|
||||||
|
NoReturn,
|
||||||
|
Optional,
|
||||||
|
Set,
|
||||||
|
SupportsIndex,
|
||||||
|
)
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
from jinja2 import Environment, FileSystemLoader
|
from jinja2 import Environment, FileSystemLoader
|
||||||
|
|
||||||
MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
|
MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
|
||||||
|
|
||||||
|
# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
|
||||||
|
# during processing with the name of the worker.
|
||||||
|
WORKER_PLACEHOLDER_NAME = "placeholder_name"
|
||||||
|
|
||||||
# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
|
# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
|
||||||
# Watching /_matrix/client needs a "client" listener
|
# Watching /_matrix/client needs a "client" listener
|
||||||
# Watching /_matrix/federation needs a "federation" listener
|
# Watching /_matrix/federation needs a "federation" listener
|
||||||
|
@ -70,11 +94,13 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
|
||||||
"endpoint_patterns": [
|
"endpoint_patterns": [
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
|
||||||
],
|
],
|
||||||
"shared_extra_conf": {"update_user_directory_from_worker": "user_dir1"},
|
"shared_extra_conf": {
|
||||||
|
"update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME
|
||||||
|
},
|
||||||
"worker_extra_conf": "",
|
"worker_extra_conf": "",
|
||||||
},
|
},
|
||||||
"media_repository": {
|
"media_repository": {
|
||||||
"app": "synapse.app.media_repository",
|
"app": "synapse.app.generic_worker",
|
||||||
"listener_resources": ["media"],
|
"listener_resources": ["media"],
|
||||||
"endpoint_patterns": [
|
"endpoint_patterns": [
|
||||||
"^/_matrix/media/",
|
"^/_matrix/media/",
|
||||||
|
@ -87,7 +113,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
|
||||||
# The first configured media worker will run the media background jobs
|
# The first configured media worker will run the media background jobs
|
||||||
"shared_extra_conf": {
|
"shared_extra_conf": {
|
||||||
"enable_media_repo": False,
|
"enable_media_repo": False,
|
||||||
"media_instance_running_background_jobs": "media_repository1",
|
"media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME,
|
||||||
},
|
},
|
||||||
"worker_extra_conf": "enable_media_repo: true",
|
"worker_extra_conf": "enable_media_repo: true",
|
||||||
},
|
},
|
||||||
|
@ -95,7 +121,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
|
||||||
"app": "synapse.app.generic_worker",
|
"app": "synapse.app.generic_worker",
|
||||||
"listener_resources": [],
|
"listener_resources": [],
|
||||||
"endpoint_patterns": [],
|
"endpoint_patterns": [],
|
||||||
"shared_extra_conf": {"notify_appservices_from_worker": "appservice1"},
|
"shared_extra_conf": {
|
||||||
|
"notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME
|
||||||
|
},
|
||||||
"worker_extra_conf": "",
|
"worker_extra_conf": "",
|
||||||
},
|
},
|
||||||
"federation_sender": {
|
"federation_sender": {
|
||||||
|
@ -192,9 +220,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
|
||||||
"app": "synapse.app.generic_worker",
|
"app": "synapse.app.generic_worker",
|
||||||
"listener_resources": [],
|
"listener_resources": [],
|
||||||
"endpoint_patterns": [],
|
"endpoint_patterns": [],
|
||||||
# This worker cannot be sharded. Therefore there should only ever be one background
|
# This worker cannot be sharded. Therefore, there should only ever be one
|
||||||
# worker, and it should be named background_worker1
|
# background worker. This is enforced for the safety of your database.
|
||||||
"shared_extra_conf": {"run_background_tasks_on": "background_worker1"},
|
"shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME},
|
||||||
"worker_extra_conf": "",
|
"worker_extra_conf": "",
|
||||||
},
|
},
|
||||||
"event_creator": {
|
"event_creator": {
|
||||||
|
@ -275,7 +303,7 @@ NGINX_LOCATION_CONFIG_BLOCK = """
|
||||||
"""
|
"""
|
||||||
|
|
||||||
NGINX_UPSTREAM_CONFIG_BLOCK = """
|
NGINX_UPSTREAM_CONFIG_BLOCK = """
|
||||||
upstream {upstream_worker_type} {{
|
upstream {upstream_worker_base_name} {{
|
||||||
{body}
|
{body}
|
||||||
}}
|
}}
|
||||||
"""
|
"""
|
||||||
|
@ -326,7 +354,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
|
||||||
|
|
||||||
def add_worker_roles_to_shared_config(
|
def add_worker_roles_to_shared_config(
|
||||||
shared_config: dict,
|
shared_config: dict,
|
||||||
worker_type: str,
|
worker_types_set: Set[str],
|
||||||
worker_name: str,
|
worker_name: str,
|
||||||
worker_port: int,
|
worker_port: int,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -334,22 +362,36 @@ def add_worker_roles_to_shared_config(
|
||||||
append appropriate worker information to it for the current worker_type instance.
|
append appropriate worker information to it for the current worker_type instance.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
shared_config: The config dict that all worker instances share (after being converted to YAML)
|
shared_config: The config dict that all worker instances share (after being
|
||||||
worker_type: The type of worker (one of those defined in WORKERS_CONFIG).
|
converted to YAML)
|
||||||
|
worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG).
|
||||||
|
This list can be a single worker type or multiple.
|
||||||
worker_name: The name of the worker instance.
|
worker_name: The name of the worker instance.
|
||||||
worker_port: The HTTP replication port that the worker instance is listening on.
|
worker_port: The HTTP replication port that the worker instance is listening on.
|
||||||
"""
|
"""
|
||||||
# The instance_map config field marks the workers that write to various replication streams
|
# The instance_map config field marks the workers that write to various replication
|
||||||
|
# streams
|
||||||
instance_map = shared_config.setdefault("instance_map", {})
|
instance_map = shared_config.setdefault("instance_map", {})
|
||||||
|
|
||||||
# Worker-type specific sharding config
|
# This is a list of the stream_writers that there can be only one of. Events can be
|
||||||
if worker_type == "pusher":
|
# sharded, and therefore doesn't belong here.
|
||||||
|
singular_stream_writers = [
|
||||||
|
"account_data",
|
||||||
|
"presence",
|
||||||
|
"receipts",
|
||||||
|
"to_device",
|
||||||
|
"typing",
|
||||||
|
]
|
||||||
|
|
||||||
|
# Worker-type specific sharding config. Now a single worker can fulfill multiple
|
||||||
|
# roles, check each.
|
||||||
|
if "pusher" in worker_types_set:
|
||||||
shared_config.setdefault("pusher_instances", []).append(worker_name)
|
shared_config.setdefault("pusher_instances", []).append(worker_name)
|
||||||
|
|
||||||
elif worker_type == "federation_sender":
|
if "federation_sender" in worker_types_set:
|
||||||
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
|
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
|
||||||
|
|
||||||
elif worker_type == "event_persister":
|
if "event_persister" in worker_types_set:
|
||||||
# Event persisters write to the events stream, so we need to update
|
# Event persisters write to the events stream, so we need to update
|
||||||
# the list of event stream writers
|
# the list of event stream writers
|
||||||
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
|
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
|
||||||
|
@ -362,19 +404,154 @@ def add_worker_roles_to_shared_config(
|
||||||
"port": worker_port,
|
"port": worker_port,
|
||||||
}
|
}
|
||||||
|
|
||||||
elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]:
|
# Update the list of stream writers. It's convenient that the name of the worker
|
||||||
# Update the list of stream writers
|
# type is the same as the stream to write. Iterate over the whole list in case there
|
||||||
# It's convenient that the name of the worker type is the same as the stream to write
|
# is more than one.
|
||||||
shared_config.setdefault("stream_writers", {}).setdefault(
|
for worker in worker_types_set:
|
||||||
worker_type, []
|
if worker in singular_stream_writers:
|
||||||
).append(worker_name)
|
shared_config.setdefault("stream_writers", {}).setdefault(
|
||||||
|
worker, []
|
||||||
|
).append(worker_name)
|
||||||
|
|
||||||
# Map of stream writer instance names to host/ports combos
|
# Map of stream writer instance names to host/ports combos
|
||||||
# For now, all stream writers need http replication ports
|
# For now, all stream writers need http replication ports
|
||||||
instance_map[worker_name] = {
|
instance_map[worker_name] = {
|
||||||
"host": "localhost",
|
"host": "localhost",
|
||||||
"port": worker_port,
|
"port": worker_port,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def merge_worker_template_configs(
|
||||||
|
existing_dict: Dict[str, Any] | None,
|
||||||
|
to_be_merged_dict: Dict[str, Any],
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""When given an existing dict of worker template configuration consisting with both
|
||||||
|
dicts and lists, merge new template data from WORKERS_CONFIG(or create) and
|
||||||
|
return new dict.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
existing_dict: Either an existing worker template or a fresh blank one.
|
||||||
|
to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into
|
||||||
|
existing_dict.
|
||||||
|
Returns: The newly merged together dict values.
|
||||||
|
"""
|
||||||
|
new_dict: Dict[str, Any] = {}
|
||||||
|
if not existing_dict:
|
||||||
|
# It doesn't exist yet, just use the new dict(but take a copy not a reference)
|
||||||
|
new_dict = to_be_merged_dict.copy()
|
||||||
|
else:
|
||||||
|
for i in to_be_merged_dict.keys():
|
||||||
|
if (i == "endpoint_patterns") or (i == "listener_resources"):
|
||||||
|
# merge the two lists, remove duplicates
|
||||||
|
new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i]))
|
||||||
|
elif i == "shared_extra_conf":
|
||||||
|
# merge dictionary's, the worker name will be replaced later
|
||||||
|
new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]}
|
||||||
|
elif i == "worker_extra_conf":
|
||||||
|
# There is only one worker type that has a 'worker_extra_conf' and it is
|
||||||
|
# the media_repo. Since duplicate worker types on the same worker don't
|
||||||
|
# work, this is fine.
|
||||||
|
new_dict[i] = existing_dict[i] + to_be_merged_dict[i]
|
||||||
|
else:
|
||||||
|
# Everything else should be identical, like "app", which only works
|
||||||
|
# because all apps are now generic_workers.
|
||||||
|
new_dict[i] = to_be_merged_dict[i]
|
||||||
|
return new_dict
|
||||||
|
|
||||||
|
|
||||||
|
def insert_worker_name_for_worker_config(
|
||||||
|
existing_dict: Dict[str, Any], worker_name: str
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""Insert a given worker name into the worker's configuration dict.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
existing_dict: The worker_config dict that is imported into shared_config.
|
||||||
|
worker_name: The name of the worker to insert.
|
||||||
|
Returns: Copy of the dict with newly inserted worker name
|
||||||
|
"""
|
||||||
|
dict_to_edit = existing_dict.copy()
|
||||||
|
for k, v in dict_to_edit["shared_extra_conf"].items():
|
||||||
|
# Only proceed if it's the placeholder name string
|
||||||
|
if v == WORKER_PLACEHOLDER_NAME:
|
||||||
|
dict_to_edit["shared_extra_conf"][k] = worker_name
|
||||||
|
return dict_to_edit
|
||||||
|
|
||||||
|
|
||||||
|
def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
|
||||||
|
"""
|
||||||
|
Apply multiplier(if found) by returning a new expanded list with some basic error
|
||||||
|
checking.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
worker_types: The unprocessed List of requested workers
|
||||||
|
Returns:
|
||||||
|
A new list with all requested workers expanded.
|
||||||
|
"""
|
||||||
|
# Checking performed:
|
||||||
|
# 1. if worker:2 or more is declared, it will create additional workers up to number
|
||||||
|
# 2. if worker:1, it will create a single copy of this worker as if no number was
|
||||||
|
# given
|
||||||
|
# 3. if worker:0 is declared, this worker will be ignored. This is to allow for
|
||||||
|
# scripting and automated expansion and is intended behaviour.
|
||||||
|
# 4. if worker:NaN or is a negative number, it will error and log it.
|
||||||
|
new_worker_types = []
|
||||||
|
for worker_type in worker_types:
|
||||||
|
if ":" in worker_type:
|
||||||
|
worker_type_components = split_and_strip_string(worker_type, ":", 1)
|
||||||
|
worker_count = 0
|
||||||
|
# Should only be 2 components, a type of worker(s) and an integer as a
|
||||||
|
# string. Cast the number as an int then it can be used as a counter.
|
||||||
|
try:
|
||||||
|
worker_count = int(worker_type_components[1])
|
||||||
|
except ValueError:
|
||||||
|
error(
|
||||||
|
f"Bad number in worker count for '{worker_type}': "
|
||||||
|
f"'{worker_type_components[1]}' is not an integer"
|
||||||
|
)
|
||||||
|
|
||||||
|
# As long as there are more than 0, we add one to the list to make below.
|
||||||
|
for _ in range(worker_count):
|
||||||
|
new_worker_types.append(worker_type_components[0])
|
||||||
|
|
||||||
|
else:
|
||||||
|
# If it's not a real worker_type, it will error out later.
|
||||||
|
new_worker_types.append(worker_type)
|
||||||
|
return new_worker_types
|
||||||
|
|
||||||
|
|
||||||
|
def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
|
||||||
|
"""Helper to check to make sure worker types that cannot have multiples do not.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
worker_type: The type of worker to check against.
|
||||||
|
Returns: True if allowed, False if not
|
||||||
|
"""
|
||||||
|
return worker_type not in [
|
||||||
|
"background_worker",
|
||||||
|
"account_data",
|
||||||
|
"presence",
|
||||||
|
"receipts",
|
||||||
|
"typing",
|
||||||
|
"to_device",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def split_and_strip_string(
|
||||||
|
given_string: str, split_char: str, max_split: SupportsIndex = -1
|
||||||
|
) -> List[str]:
|
||||||
|
"""
|
||||||
|
Helper to split a string on split_char and strip whitespace from each end of each
|
||||||
|
element.
|
||||||
|
Args:
|
||||||
|
given_string: The string to split
|
||||||
|
split_char: The character to split the string on
|
||||||
|
max_split: kwarg for split() to limit how many times the split() happens
|
||||||
|
Returns:
|
||||||
|
A List of strings
|
||||||
|
"""
|
||||||
|
# Removes whitespace from ends of result strings before adding to list. Allow for
|
||||||
|
# overriding 'maxsplit' kwarg, default being -1 to signify no maximum.
|
||||||
|
return [x.strip() for x in given_string.split(split_char, maxsplit=max_split)]
|
||||||
|
|
||||||
|
|
||||||
def generate_base_homeserver_config() -> None:
|
def generate_base_homeserver_config() -> None:
|
||||||
|
@ -389,29 +566,153 @@ def generate_base_homeserver_config() -> None:
|
||||||
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
|
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_worker_types(
|
||||||
|
requested_worker_types: List[str],
|
||||||
|
) -> Dict[str, Set[str]]:
|
||||||
|
"""Read the desired list of requested workers and prepare the data for use in
|
||||||
|
generating worker config files while also checking for potential gotchas.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
requested_worker_types: The list formed from the split environment variable
|
||||||
|
containing the unprocessed requests for workers.
|
||||||
|
|
||||||
|
Returns: A dict of worker names to set of worker types. Format:
|
||||||
|
{'worker_name':
|
||||||
|
{'worker_type', 'worker_type2'}
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
# A counter of worker_base_name -> int. Used for determining the name for a given
|
||||||
|
# worker when generating its config file, as each worker's name is just
|
||||||
|
# worker_base_name followed by instance number
|
||||||
|
worker_base_name_counter: Dict[str, int] = defaultdict(int)
|
||||||
|
|
||||||
|
# Similar to above, but more finely grained. This is used to determine we don't have
|
||||||
|
# more than a single worker for cases where multiples would be bad(e.g. presence).
|
||||||
|
worker_type_shard_counter: Dict[str, int] = defaultdict(int)
|
||||||
|
|
||||||
|
# The final result of all this processing
|
||||||
|
dict_to_return: Dict[str, Set[str]] = {}
|
||||||
|
|
||||||
|
# Handle any multipliers requested for given workers.
|
||||||
|
multiple_processed_worker_types = apply_requested_multiplier_for_worker(
|
||||||
|
requested_worker_types
|
||||||
|
)
|
||||||
|
|
||||||
|
# Process each worker_type_string
|
||||||
|
# Examples of expected formats:
|
||||||
|
# - requested_name=type1+type2+type3
|
||||||
|
# - synchrotron
|
||||||
|
# - event_creator+event_persister
|
||||||
|
for worker_type_string in multiple_processed_worker_types:
|
||||||
|
# First, if a name is requested, use that — otherwise generate one.
|
||||||
|
worker_base_name: str = ""
|
||||||
|
if "=" in worker_type_string:
|
||||||
|
# Split on "=", remove extra whitespace from ends then make list
|
||||||
|
worker_type_split = split_and_strip_string(worker_type_string, "=")
|
||||||
|
if len(worker_type_split) > 2:
|
||||||
|
error(
|
||||||
|
"There should only be one '=' in the worker type string. "
|
||||||
|
f"Please fix: {worker_type_string}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Assign the name
|
||||||
|
worker_base_name = worker_type_split[0]
|
||||||
|
|
||||||
|
if not re.match(r"^[a-zA-Z0-9_+-]*[a-zA-Z_+-]$", worker_base_name):
|
||||||
|
# Apply a fairly narrow regex to the worker names. Some characters
|
||||||
|
# aren't safe for use in file paths or nginx configurations.
|
||||||
|
# Don't allow to end with a number because we'll add a number
|
||||||
|
# ourselves in a moment.
|
||||||
|
error(
|
||||||
|
"Invalid worker name; please choose a name consisting of "
|
||||||
|
"alphanumeric letters, _ + -, but not ending with a digit: "
|
||||||
|
f"{worker_base_name!r}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Continue processing the remainder of the worker_type string
|
||||||
|
# with the name override removed.
|
||||||
|
worker_type_string = worker_type_split[1]
|
||||||
|
|
||||||
|
# Split the worker_type_string on "+", remove whitespace from ends then make
|
||||||
|
# the list a set so it's deduplicated.
|
||||||
|
worker_types_set: Set[str] = set(
|
||||||
|
split_and_strip_string(worker_type_string, "+")
|
||||||
|
)
|
||||||
|
|
||||||
|
if not worker_base_name:
|
||||||
|
# No base name specified: generate one deterministically from set of
|
||||||
|
# types
|
||||||
|
worker_base_name = "+".join(sorted(worker_types_set))
|
||||||
|
|
||||||
|
# At this point, we have:
|
||||||
|
# worker_base_name which is the name for the worker, without counter.
|
||||||
|
# worker_types_set which is the set of worker types for this worker.
|
||||||
|
|
||||||
|
# Validate worker_type and make sure we don't allow sharding for a worker type
|
||||||
|
# that doesn't support it. Will error and stop if it is a problem,
|
||||||
|
# e.g. 'background_worker'.
|
||||||
|
for worker_type in worker_types_set:
|
||||||
|
# Verify this is a real defined worker type. If it's not, stop everything so
|
||||||
|
# it can be fixed.
|
||||||
|
if worker_type not in WORKERS_CONFIG:
|
||||||
|
error(
|
||||||
|
f"{worker_type} is an unknown worker type! Was found in "
|
||||||
|
f"'{worker_type_string}'. Please fix!"
|
||||||
|
)
|
||||||
|
|
||||||
|
if worker_type in worker_type_shard_counter:
|
||||||
|
if not is_sharding_allowed_for_worker_type(worker_type):
|
||||||
|
error(
|
||||||
|
f"There can be only a single worker with {worker_type} "
|
||||||
|
"type. Please recount and remove."
|
||||||
|
)
|
||||||
|
# Not in shard counter, must not have seen it yet, add it.
|
||||||
|
worker_type_shard_counter[worker_type] += 1
|
||||||
|
|
||||||
|
# Generate the number for the worker using incrementing counter
|
||||||
|
worker_base_name_counter[worker_base_name] += 1
|
||||||
|
worker_number = worker_base_name_counter[worker_base_name]
|
||||||
|
worker_name = f"{worker_base_name}{worker_number}"
|
||||||
|
|
||||||
|
if worker_number > 1:
|
||||||
|
# If this isn't the first worker, check that we don't have a confusing
|
||||||
|
# mixture of worker types with the same base name.
|
||||||
|
first_worker_with_base_name = dict_to_return[f"{worker_base_name}1"]
|
||||||
|
if first_worker_with_base_name != worker_types_set:
|
||||||
|
error(
|
||||||
|
f"Can not use worker_name: '{worker_name}' for worker_type(s): "
|
||||||
|
f"{worker_types_set!r}. It is already in use by "
|
||||||
|
f"worker_type(s): {first_worker_with_base_name!r}"
|
||||||
|
)
|
||||||
|
|
||||||
|
dict_to_return[worker_name] = worker_types_set
|
||||||
|
|
||||||
|
return dict_to_return
|
||||||
|
|
||||||
|
|
||||||
def generate_worker_files(
|
def generate_worker_files(
|
||||||
environ: Mapping[str, str], config_path: str, data_dir: str
|
environ: Mapping[str, str],
|
||||||
|
config_path: str,
|
||||||
|
data_dir: str,
|
||||||
|
requested_worker_types: Dict[str, Set[str]],
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Read the desired list of workers from environment variables and generate
|
"""Read the desired workers(if any) that is passed in and generate shared
|
||||||
shared homeserver, nginx and supervisord configs.
|
homeserver, nginx and supervisord configs.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
environ: os.environ instance.
|
environ: os.environ instance.
|
||||||
config_path: The location of the generated Synapse main worker config file.
|
config_path: The location of the generated Synapse main worker config file.
|
||||||
data_dir: The location of the synapse data directory. Where log and
|
data_dir: The location of the synapse data directory. Where log and
|
||||||
user-facing config files live.
|
user-facing config files live.
|
||||||
|
requested_worker_types: A Dict containing requested workers in the format of
|
||||||
|
{'worker_name1': {'worker_type', ...}}
|
||||||
"""
|
"""
|
||||||
# Note that yaml cares about indentation, so care should be taken to insert lines
|
# Note that yaml cares about indentation, so care should be taken to insert lines
|
||||||
# into files at the correct indentation below.
|
# into files at the correct indentation below.
|
||||||
|
|
||||||
# shared_config is the contents of a Synapse config file that will be shared amongst
|
# First read the original config file and extract the listeners block. Then we'll
|
||||||
# the main Synapse process as well as all workers.
|
# add another listener for replication. Later we'll write out the result to the
|
||||||
# It is intended mainly for disabling functionality when certain workers are spun up,
|
# shared config file.
|
||||||
# and adding a replication listener.
|
|
||||||
|
|
||||||
# First read the original config file and extract the listeners block. Then we'll add
|
|
||||||
# another listener for replication. Later we'll write out the result to the shared
|
|
||||||
# config file.
|
|
||||||
listeners = [
|
listeners = [
|
||||||
{
|
{
|
||||||
"port": 9093,
|
"port": 9093,
|
||||||
|
@ -427,9 +728,9 @@ def generate_worker_files(
|
||||||
listeners += original_listeners
|
listeners += original_listeners
|
||||||
|
|
||||||
# The shared homeserver config. The contents of which will be inserted into the
|
# The shared homeserver config. The contents of which will be inserted into the
|
||||||
# base shared worker jinja2 template.
|
# base shared worker jinja2 template. This config file will be passed to all
|
||||||
#
|
# workers, included Synapse's main process. It is intended mainly for disabling
|
||||||
# This config file will be passed to all workers, included Synapse's main process.
|
# functionality when certain workers are spun up, and adding a replication listener.
|
||||||
shared_config: Dict[str, Any] = {"listeners": listeners}
|
shared_config: Dict[str, Any] = {"listeners": listeners}
|
||||||
|
|
||||||
# List of dicts that describe workers.
|
# List of dicts that describe workers.
|
||||||
|
@ -437,31 +738,20 @@ def generate_worker_files(
|
||||||
# program blocks.
|
# program blocks.
|
||||||
worker_descriptors: List[Dict[str, Any]] = []
|
worker_descriptors: List[Dict[str, Any]] = []
|
||||||
|
|
||||||
# Upstreams for load-balancing purposes. This dict takes the form of a worker type to the
|
# Upstreams for load-balancing purposes. This dict takes the form of the worker
|
||||||
# ports of each worker. For example:
|
# type to the ports of each worker. For example:
|
||||||
# {
|
# {
|
||||||
# worker_type: {1234, 1235, ...}}
|
# worker_type: {1234, 1235, ...}}
|
||||||
# }
|
# }
|
||||||
# and will be used to construct 'upstream' nginx directives.
|
# and will be used to construct 'upstream' nginx directives.
|
||||||
nginx_upstreams: Dict[str, Set[int]] = {}
|
nginx_upstreams: Dict[str, Set[int]] = {}
|
||||||
|
|
||||||
# A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be
|
# A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what
|
||||||
# placed after the proxy_pass directive. The main benefit to representing this data as a
|
# will be placed after the proxy_pass directive. The main benefit to representing
|
||||||
# dict over a str is that we can easily deduplicate endpoints across multiple instances
|
# this data as a dict over a str is that we can easily deduplicate endpoints
|
||||||
# of the same worker.
|
# across multiple instances of the same worker. The final rendering will be combined
|
||||||
#
|
# with nginx_upstreams and placed in /etc/nginx/conf.d.
|
||||||
# An nginx site config that will be amended to depending on the workers that are
|
nginx_locations: Dict[str, str] = {}
|
||||||
# spun up. To be placed in /etc/nginx/conf.d.
|
|
||||||
nginx_locations = {}
|
|
||||||
|
|
||||||
# Read the desired worker configuration from the environment
|
|
||||||
worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
|
|
||||||
if not worker_types_env:
|
|
||||||
# No workers, just the main process
|
|
||||||
worker_types = []
|
|
||||||
else:
|
|
||||||
# Split type names by comma, ignoring whitespace.
|
|
||||||
worker_types = [x.strip() for x in worker_types_env.split(",")]
|
|
||||||
|
|
||||||
# Create the worker configuration directory if it doesn't already exist
|
# Create the worker configuration directory if it doesn't already exist
|
||||||
os.makedirs("/conf/workers", exist_ok=True)
|
os.makedirs("/conf/workers", exist_ok=True)
|
||||||
|
@ -469,66 +759,57 @@ def generate_worker_files(
|
||||||
# Start worker ports from this arbitrary port
|
# Start worker ports from this arbitrary port
|
||||||
worker_port = 18009
|
worker_port = 18009
|
||||||
|
|
||||||
# A counter of worker_type -> int. Used for determining the name for a given
|
|
||||||
# worker type when generating its config file, as each worker's name is just
|
|
||||||
# worker_type + instance #
|
|
||||||
worker_type_counter: Dict[str, int] = {}
|
|
||||||
|
|
||||||
# A list of internal endpoints to healthcheck, starting with the main process
|
# A list of internal endpoints to healthcheck, starting with the main process
|
||||||
# which exists even if no workers do.
|
# which exists even if no workers do.
|
||||||
healthcheck_urls = ["http://localhost:8080/health"]
|
healthcheck_urls = ["http://localhost:8080/health"]
|
||||||
|
|
||||||
# For each worker type specified by the user, create config values
|
# Get the set of all worker types that we have configured
|
||||||
for worker_type in worker_types:
|
all_worker_types_in_use = set(chain(*requested_worker_types.values()))
|
||||||
worker_config = WORKERS_CONFIG.get(worker_type)
|
# Map locations to upstreams (corresponding to worker types) in Nginx
|
||||||
if worker_config:
|
# but only if we use the appropriate worker type
|
||||||
worker_config = worker_config.copy()
|
for worker_type in all_worker_types_in_use:
|
||||||
else:
|
for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]:
|
||||||
error(worker_type + " is an unknown worker type! Please fix!")
|
nginx_locations[endpoint_pattern] = f"http://{worker_type}"
|
||||||
|
|
||||||
new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
|
# For each worker type specified by the user, create config values and write it's
|
||||||
worker_type_counter[worker_type] = new_worker_count
|
# yaml config file
|
||||||
|
for worker_name, worker_types_set in requested_worker_types.items():
|
||||||
|
# The collected and processed data will live here.
|
||||||
|
worker_config: Dict[str, Any] = {}
|
||||||
|
|
||||||
|
# Merge all worker config templates for this worker into a single config
|
||||||
|
for worker_type in worker_types_set:
|
||||||
|
copy_of_template_config = WORKERS_CONFIG[worker_type].copy()
|
||||||
|
|
||||||
|
# Merge worker type template configuration data. It's a combination of lists
|
||||||
|
# and dicts, so use this helper.
|
||||||
|
worker_config = merge_worker_template_configs(
|
||||||
|
worker_config, copy_of_template_config
|
||||||
|
)
|
||||||
|
|
||||||
|
# Replace placeholder names in the config template with the actual worker name.
|
||||||
|
worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
|
||||||
|
|
||||||
# Name workers by their type concatenated with an incrementing number
|
|
||||||
# e.g. federation_reader1
|
|
||||||
worker_name = worker_type + str(new_worker_count)
|
|
||||||
worker_config.update(
|
worker_config.update(
|
||||||
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
|
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
|
||||||
)
|
)
|
||||||
|
|
||||||
# Update the shared config with any worker-type specific options
|
# Update the shared config with any worker_type specific options. The first of a
|
||||||
shared_config.update(worker_config["shared_extra_conf"])
|
# given worker_type needs to stay assigned and not be replaced.
|
||||||
|
worker_config["shared_extra_conf"].update(shared_config)
|
||||||
|
shared_config = worker_config["shared_extra_conf"]
|
||||||
|
|
||||||
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
|
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
|
||||||
|
|
||||||
# Check if more than one instance of this worker type has been specified
|
|
||||||
worker_type_total_count = worker_types.count(worker_type)
|
|
||||||
|
|
||||||
# Update the shared config with sharding-related options if necessary
|
# Update the shared config with sharding-related options if necessary
|
||||||
add_worker_roles_to_shared_config(
|
add_worker_roles_to_shared_config(
|
||||||
shared_config, worker_type, worker_name, worker_port
|
shared_config, worker_types_set, worker_name, worker_port
|
||||||
)
|
)
|
||||||
|
|
||||||
# Enable the worker in supervisord
|
# Enable the worker in supervisord
|
||||||
worker_descriptors.append(worker_config)
|
worker_descriptors.append(worker_config)
|
||||||
|
|
||||||
# Add nginx location blocks for this worker's endpoints (if any are defined)
|
|
||||||
for pattern in worker_config["endpoint_patterns"]:
|
|
||||||
# Determine whether we need to load-balance this worker
|
|
||||||
if worker_type_total_count > 1:
|
|
||||||
# Create or add to a load-balanced upstream for this worker
|
|
||||||
nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
|
|
||||||
|
|
||||||
# Upstreams are named after the worker_type
|
|
||||||
upstream = "http://" + worker_type
|
|
||||||
else:
|
|
||||||
upstream = "http://localhost:%d" % (worker_port,)
|
|
||||||
|
|
||||||
# Note that this endpoint should proxy to this upstream
|
|
||||||
nginx_locations[pattern] = upstream
|
|
||||||
|
|
||||||
# Write out the worker's logging config file
|
# Write out the worker's logging config file
|
||||||
|
|
||||||
log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
|
log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
|
||||||
|
|
||||||
# Then a worker config file
|
# Then a worker config file
|
||||||
|
@ -539,6 +820,10 @@ def generate_worker_files(
|
||||||
worker_log_config_filepath=log_config_filepath,
|
worker_log_config_filepath=log_config_filepath,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Save this worker's port number to the correct nginx upstreams
|
||||||
|
for worker_type in worker_types_set:
|
||||||
|
nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
|
||||||
|
|
||||||
worker_port += 1
|
worker_port += 1
|
||||||
|
|
||||||
# Build the nginx location config blocks
|
# Build the nginx location config blocks
|
||||||
|
@ -551,15 +836,14 @@ def generate_worker_files(
|
||||||
|
|
||||||
# Determine the load-balancing upstreams to configure
|
# Determine the load-balancing upstreams to configure
|
||||||
nginx_upstream_config = ""
|
nginx_upstream_config = ""
|
||||||
|
for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items():
|
||||||
for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items():
|
|
||||||
body = ""
|
body = ""
|
||||||
for port in upstream_worker_ports:
|
for port in upstream_worker_ports:
|
||||||
body += " server localhost:%d;\n" % (port,)
|
body += f" server localhost:{port};\n"
|
||||||
|
|
||||||
# Add to the list of configured upstreams
|
# Add to the list of configured upstreams
|
||||||
nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
|
nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
|
||||||
upstream_worker_type=upstream_worker_type,
|
upstream_worker_base_name=upstream_worker_base_name,
|
||||||
body=body,
|
body=body,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -580,7 +864,7 @@ def generate_worker_files(
|
||||||
if reg_path.suffix.lower() in (".yaml", ".yml")
|
if reg_path.suffix.lower() in (".yaml", ".yml")
|
||||||
]
|
]
|
||||||
|
|
||||||
workers_in_use = len(worker_types) > 0
|
workers_in_use = len(requested_worker_types) > 0
|
||||||
|
|
||||||
# Shared homeserver config
|
# Shared homeserver config
|
||||||
convert(
|
convert(
|
||||||
|
@ -678,13 +962,26 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
|
||||||
generate_base_homeserver_config()
|
generate_base_homeserver_config()
|
||||||
else:
|
else:
|
||||||
log("Base homeserver config exists—not regenerating")
|
log("Base homeserver config exists—not regenerating")
|
||||||
# This script may be run multiple times (mostly by Complement, see note at top of file).
|
# This script may be run multiple times (mostly by Complement, see note at top of
|
||||||
# Don't re-configure workers in this instance.
|
# file). Don't re-configure workers in this instance.
|
||||||
mark_filepath = "/conf/workers_have_been_configured"
|
mark_filepath = "/conf/workers_have_been_configured"
|
||||||
if not os.path.exists(mark_filepath):
|
if not os.path.exists(mark_filepath):
|
||||||
|
# Collect and validate worker_type requests
|
||||||
|
# Read the desired worker configuration from the environment
|
||||||
|
worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
|
||||||
|
# Only process worker_types if they exist
|
||||||
|
if not worker_types_env:
|
||||||
|
# No workers, just the main process
|
||||||
|
worker_types = []
|
||||||
|
requested_worker_types: Dict[str, Any] = {}
|
||||||
|
else:
|
||||||
|
# Split type names by comma, ignoring whitespace.
|
||||||
|
worker_types = split_and_strip_string(worker_types_env, ",")
|
||||||
|
requested_worker_types = parse_worker_types(worker_types)
|
||||||
|
|
||||||
# Always regenerate all other config files
|
# Always regenerate all other config files
|
||||||
log("Generating worker config files")
|
log("Generating worker config files")
|
||||||
generate_worker_files(environ, config_path, data_dir)
|
generate_worker_files(environ, config_path, data_dir, requested_worker_types)
|
||||||
|
|
||||||
# Mark workers as being configured
|
# Mark workers as being configured
|
||||||
with open(mark_filepath, "w") as f:
|
with open(mark_filepath, "w") as f:
|
||||||
|
|
Loading…
Reference in a new issue