Wording improvements for the TaskScheduler

As I found the current docstrings a bit unclear while trying to wrap my head around
this class.
This commit is contained in:
Andrew Morgan 2024-12-03 17:36:52 +00:00
parent 59ad4b18fc
commit e9bade2530

View file

@ -46,33 +46,43 @@ logger = logging.getLogger(__name__)
class TaskScheduler: class TaskScheduler:
""" """
This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background` This is a simple task scheduler designed for resumable tasks. Normally,
to launch a background task, or Twisted `deferLater` if we want to do so later on. you'd use `run_in_background` to start a background task or Twisted's
`deferLater` if you want to run it later.
The issue is that these tasks stop completely and won't resume if Synapse is
shut down for any reason.
Here's how it works:
- Register an Action: First, you need to register a function to a named
action using `register_action`. This function will be called to resume tasks
after a Synapse shutdown. Make sure to register it when Synapse initializes,
not right before scheduling the task.
- Schedule a Task: You can launch a task linked to the named action
using `schedule_task`. You can pass a `params` dictionary, which will be
passed to the registered function when it's executed. Tasks can be scheduled
to run either immediately or later by specifying a `timestamp`.
- Update Task: The function handling the task can call `update_task` at
any point to update the task's `result`. This lets you resume the task from
a specific point or pass results back to the code that scheduled it. When
the function completes, you can also return a `result` or an `error`.
Things to keep in mind:
- The reconciliation loop runs every minute, so this is not a high-precision
scheduler.
The problem with that is that the tasks will just stop and never be resumed if synapse - Only 10 tasks can run at the same time. If the pool is full, tasks may be
is stopped for whatever reason. delayed. Make sure your scheduled tasks can actually finish.
How this works: - Currently, there's no way to stop a task if it gets stuck.
- A function mapped to a named action should first be registered with `register_action`.
This function will be called when trying to resuming tasks after a synapse shutdown,
so this registration should happen when synapse is initialised, NOT right before scheduling
a task.
- A task can then be launched using this named action with `schedule_task`. A `params` dict
can be passed, and it will be available to the registered function when launched. This task
can be launch either now-ish, or later on by giving a `timestamp` parameter.
The function may call `update_task` at any time to update the `result` of the task, - Tasks will run on the worker defined by the `run_background_tasks_on`
and this can be used to resume the task at a specific point and/or to convey a result to setting in your configuration. If no worker is specified, they'll run on
the code launching the task. the main one by default.
You can also specify the `result` (and/or an `error`) when returning from the function.
The reconciliation loop runs every minute, so this is not a precise scheduler.
There is a limit of 10 concurrent tasks, so tasks may be delayed if the pool is already
full. In this regard, please take great care that scheduled tasks can actually finished.
For now there is no mechanism to stop a running task if it is stuck.
Tasks will be run on the worker specified with `run_background_tasks_on` config,
or the main one by default.
""" """
# Precision of the scheduler, evaluation of tasks to run will only happen # Precision of the scheduler, evaluation of tasks to run will only happen
@ -157,7 +167,7 @@ class TaskScheduler:
params: Optional[JsonMapping] = None, params: Optional[JsonMapping] = None,
) -> str: ) -> str:
"""Schedule a new potentially resumable task. A function matching the specified """Schedule a new potentially resumable task. A function matching the specified
`action` should have be registered with `register_action` before the task is run. `action` should've been registered with `register_action` before the task is run.
Args: Args:
action: the name of a previously registered action action: the name of a previously registered action
@ -207,15 +217,15 @@ class TaskScheduler:
result: Optional[JsonMapping] = None, result: Optional[JsonMapping] = None,
error: Optional[str] = None, error: Optional[str] = None,
) -> bool: ) -> bool:
"""Update some task associated values. This is exposed publicly so it can """Update some task-associated values. This is exposed publicly so it can
be used inside task functions, mainly to update the result and be able to be used inside task functions, mainly to update the result or resume
resume a task at a specific step after a restart of synapse. a task at a specific step after a restart of synapse.
It can also be used to stage a task, by setting the `status` to `SCHEDULED` with It can also be used to stage a task, by setting the `status` to `SCHEDULED` with
a new timestamp. a new timestamp.
The `status` can only be set to `ACTIVE` or `SCHEDULED`, `COMPLETE` and `FAILED` The `status` can only be set to `ACTIVE` or `SCHEDULED`. `COMPLETE` and `FAILED`
are terminal status and can only be set by returning it in the function. are terminal statuses and can only be set by returning them from the function.
Args: Args:
id: the id of the task to update id: the id of the task to update
@ -223,6 +233,12 @@ class TaskScheduler:
status: the new `TaskStatus` of the task status: the new `TaskStatus` of the task
result: the new result of the task result: the new result of the task
error: the new error of the task error: the new error of the task
Returns:
True if the update was successful, False otherwise.
Raises:
Exception: If a status other than `ACTIVE` or `SCHEDULED` was passed.
""" """
if status == TaskStatus.COMPLETE or status == TaskStatus.FAILED: if status == TaskStatus.COMPLETE or status == TaskStatus.FAILED:
raise Exception( raise Exception(
@ -260,9 +276,9 @@ class TaskScheduler:
max_timestamp: Optional[int] = None, max_timestamp: Optional[int] = None,
limit: Optional[int] = None, limit: Optional[int] = None,
) -> List[ScheduledTask]: ) -> List[ScheduledTask]:
"""Get a list of tasks. Returns all the tasks if no args is provided. """Get a list of tasks. Returns all the tasks if no args are provided.
If an arg is `None` all tasks matching the other args will be selected. If an arg is `None`, all tasks matching the other args will be selected.
If an arg is an empty list, the corresponding value of the task needs If an arg is an empty list, the corresponding value of the task needs
to be `None` to be selected. to be `None` to be selected.
@ -274,8 +290,8 @@ class TaskScheduler:
a timestamp inferior to the specified one a timestamp inferior to the specified one
limit: Only return `limit` number of rows if set. limit: Only return `limit` number of rows if set.
Returns Returns:
A list of `ScheduledTask`, ordered by increasing timestamps A list of `ScheduledTask`, ordered by increasing timestamps.
""" """
return await self._store.get_scheduled_tasks( return await self._store.get_scheduled_tasks(
actions=actions, actions=actions,