Module meerschaum.utils.schedule
Schedule processes and threads.
Expand source code
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""
Schedule processes and threads.
"""
from __future__ import annotations
from meerschaum.utils.typing import Callable, Any, Optional
def schedule_function(
function: Callable[[Any], Any],
frequency: str,
*args,
debug: bool = False,
**kw
) -> None:
"""
Block the process and execute the function intermittently according to the frequency.
https://red-engine.readthedocs.io/en/stable/condition_syntax/index.html
Parameters
----------
function: Callable[[Any], Any]
The function to execute.
frequency: str
The frequency at which `function` should be executed (e.g. `'daily'`).
"""
import warnings
from meerschaum.utils.packages import attempt_import
from meerschaum.utils.misc import filter_keywords
from concurrent.futures._base import CancelledError
kw['debug'] = debug
kw = filter_keywords(function, **kw)
def _wrapper():
return function(*args, **kw)
rocketry = attempt_import('rocketry', debug=debug, lazy=False)
try:
app = rocketry.Rocketry()
FuncTask = rocketry.tasks.FuncTask
with warnings.catch_warnings():
warnings.filterwarnings('ignore', 'Task\'s session not defined.')
task = FuncTask(_wrapper, start_cond=frequency)
app.session.add_task(task)
return app.run(debug=debug)
except (KeyboardInterrupt, CancelledError):
try:
app.session.shut_down(force=True)
except CancelledError:
pass
return None
Functions
def schedule_function(function: Callable[[Any], Any], frequency: str, *args, debug: bool = False, **kw) ‑> None
-
Block the process and execute the function intermittently according to the frequency. https://red-engine.readthedocs.io/en/stable/condition_syntax/index.html
Parameters
function
:Callable[[Any], Any]
- The function to execute.
frequency
:str
- The frequency at which
function
should be executed (e.g.'daily'
).
Expand source code
def schedule_function( function: Callable[[Any], Any], frequency: str, *args, debug: bool = False, **kw ) -> None: """ Block the process and execute the function intermittently according to the frequency. https://red-engine.readthedocs.io/en/stable/condition_syntax/index.html Parameters ---------- function: Callable[[Any], Any] The function to execute. frequency: str The frequency at which `function` should be executed (e.g. `'daily'`). """ import warnings from meerschaum.utils.packages import attempt_import from meerschaum.utils.misc import filter_keywords from concurrent.futures._base import CancelledError kw['debug'] = debug kw = filter_keywords(function, **kw) def _wrapper(): return function(*args, **kw) rocketry = attempt_import('rocketry', debug=debug, lazy=False) try: app = rocketry.Rocketry() FuncTask = rocketry.tasks.FuncTask with warnings.catch_warnings(): warnings.filterwarnings('ignore', 'Task\'s session not defined.') task = FuncTask(_wrapper, start_cond=frequency) app.session.add_task(task) return app.run(debug=debug) except (KeyboardInterrupt, CancelledError): try: app.session.shut_down(force=True) except CancelledError: pass return None