meerschaum.utils.schedule

Schedule processes and threads.

 1#! /usr/bin/env python3
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Schedule processes and threads.
 7"""
 8
 9from __future__ import annotations
10from meerschaum.utils.typing import Callable, Any, Optional
11
def schedule_function(
        function: Callable[[Any], Any],
        frequency: str,
        *args,
        debug: bool = False,
        **kw
    ) -> None:
    """
    Block the process and execute the function intermittently according to the frequency.
    https://rocketry.readthedocs.io/en/stable/condition_syntax/index.html

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute.

    frequency: str
        The frequency at which `function` should be executed (e.g. `'daily'`).
        Handed to the Rocketry task as its `start_cond`.

    *args:
        Positional arguments forwarded to `function` on every execution.

    debug: bool, default False
        Forwarded to the package imports and to `app.run()`.
        Also added to `kw` under the key `'debug'` before filtering.

    **kw:
        Keyword arguments for `function`. They are passed through
        `filter_keywords(function, **kw)` before being forwarded
        (presumably dropping keywords `function` does not accept).

    Returns
    -------
    Whatever `app.run()` returns if the scheduler stops on its own.
    `None` if the run is interrupted (`KeyboardInterrupt` / `CancelledError`)
    or if the scheduler could not be set up (`AttributeError`, in which case
    a warning is emitted).
    """
    import contextlib
    import warnings
    # Use the public re-export rather than the private `_base` module (same class).
    from concurrent.futures import CancelledError
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.misc import filter_keywords

    kw['debug'] = debug
    kw = filter_keywords(function, **kw)

    def _wrapper():
        return function(*args, **kw)

    # pydantic is not referenced directly; it is imported eagerly before rocketry
    # (presumably because rocketry depends on it -- see the warning below).
    attempt_import('pydantic', debug=debug, lazy=False)
    rocketry = attempt_import('rocketry', debug=debug, lazy=False)

    # Bind `app` up front so the interrupt handler never sees an unbound name
    # if the interrupt arrives before the application object exists.
    app = None
    try:
        app = rocketry.Rocketry()
        FuncTask = rocketry.tasks.FuncTask
        with warnings.catch_warnings():
            # Creating the task before it is attached to a session triggers this warning.
            warnings.filterwarnings('ignore', 'Task\'s session not defined.')
            task = FuncTask(_wrapper, start_cond=frequency)
            app.session.add_task(task)
        return app.run(debug=debug)
    except (KeyboardInterrupt, CancelledError):
        if app is not None:
            # Best-effort shutdown: a cancellation raised while stopping is expected.
            with contextlib.suppress(CancelledError):
                app.session.shut_down(force=True)
        return None
    except AttributeError:
        warn(
            "Failed to import scheduler.\n\n   "
            + "Run `mrsm install package 'pydantic<2.0.0'` and try again.",
            stack = False,
        )
        return None
def schedule_function( function: Callable[[Any], Any], frequency: str, *args, debug: bool = False, **kw) -> None:
def schedule_function(
        function: Callable[[Any], Any],
        frequency: str,
        *args,
        debug: bool = False,
        **kw
    ) -> None:
    """
    Block the process and execute the function intermittently according to the frequency.
    https://rocketry.readthedocs.io/en/stable/condition_syntax/index.html

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute.

    frequency: str
        The frequency at which `function` should be executed (e.g. `'daily'`).
        Handed to the Rocketry task as its `start_cond`.

    *args:
        Positional arguments forwarded to `function` on every execution.

    debug: bool, default False
        Forwarded to the package imports and to `app.run()`.
        Also added to `kw` under the key `'debug'` before filtering.

    **kw:
        Keyword arguments for `function`. They are passed through
        `filter_keywords(function, **kw)` before being forwarded
        (presumably dropping keywords `function` does not accept).

    Returns
    -------
    Whatever `app.run()` returns if the scheduler stops on its own.
    `None` if the run is interrupted (`KeyboardInterrupt` / `CancelledError`)
    or if the scheduler could not be set up (`AttributeError`, in which case
    a warning is emitted).
    """
    import contextlib
    import warnings
    # Use the public re-export rather than the private `_base` module (same class).
    from concurrent.futures import CancelledError
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.misc import filter_keywords

    kw['debug'] = debug
    kw = filter_keywords(function, **kw)

    def _wrapper():
        return function(*args, **kw)

    # pydantic is not referenced directly; it is imported eagerly before rocketry
    # (presumably because rocketry depends on it -- see the warning below).
    attempt_import('pydantic', debug=debug, lazy=False)
    rocketry = attempt_import('rocketry', debug=debug, lazy=False)

    # Bind `app` up front so the interrupt handler never sees an unbound name
    # if the interrupt arrives before the application object exists.
    app = None
    try:
        app = rocketry.Rocketry()
        FuncTask = rocketry.tasks.FuncTask
        with warnings.catch_warnings():
            # Creating the task before it is attached to a session triggers this warning.
            warnings.filterwarnings('ignore', 'Task\'s session not defined.')
            task = FuncTask(_wrapper, start_cond=frequency)
            app.session.add_task(task)
        return app.run(debug=debug)
    except (KeyboardInterrupt, CancelledError):
        if app is not None:
            # Best-effort shutdown: a cancellation raised while stopping is expected.
            with contextlib.suppress(CancelledError):
                app.session.shut_down(force=True)
        return None
    except AttributeError:
        warn(
            "Failed to import scheduler.\n\n   "
            + "Run `mrsm install package 'pydantic<2.0.0'` and try again.",
            stack = False,
        )
        return None

Block the process and execute the function intermittently according to the frequency. See the Rocketry condition syntax for the accepted frequency strings: https://rocketry.readthedocs.io/en/stable/condition_syntax/index.html

Parameters
  • function (Callable[[Any], Any]): The function to execute.
  • frequency (str): The frequency at which function should be executed (e.g. 'daily').