meerschaum.utils.schedule

Schedule processes and threads.

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Schedule processes and threads.
  7"""
  8
  9from __future__ import annotations
 10import signal
 11import traceback
 12from datetime import datetime, timezone, timedelta
 13import meerschaum as mrsm
 14from meerschaum.utils.typing import Callable, Any, Optional, List, Dict
 15from meerschaum.utils.warnings import warn, error
 16
 17STARTING_KEYWORD: str = 'starting'
 18INTERVAL_UNITS: List[str] = ['months', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'years']
 19FREQUENCY_ALIASES: Dict[str, str] = {
 20    'daily': 'every 1 day',
 21    'hourly': 'every 1 hour',
 22    'minutely': 'every 1 minute',
 23    'weekly': 'every 1 week',
 24    'monthly': 'every 1 month',
 25    'secondly': 'every 1 second',
 26    'yearly': 'every 1 year',
 27}
 28LOGIC_ALIASES: Dict[str, str] = {
 29    'and': '&',
 30    'or': '|',
 31    ' through ': '-',
 32    ' thru ': '-',
 33    ' - ': '-',
 34    'beginning': STARTING_KEYWORD,
 35}
 36CRON_DAYS_OF_WEEK: List[str] = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
 37CRON_DAYS_OF_WEEK_ALIASES: Dict[str, str] = {
 38    'monday': 'mon',
 39    'tuesday': 'tue',
 40    'tues': 'tue',
 41    'wednesday': 'wed',
 42    'thursday': 'thu',
 43    'thurs': 'thu',
 44    'friday': 'fri',
 45    'saturday': 'sat',
 46    'sunday': 'sun',
 47}
 48CRON_MONTHS: List[str] = [
 49    'jan', 'feb', 'mar', 'apr', 'may', 'jun',
 50    'jul', 'aug', 'sep', 'oct', 'nov', 'dec',
 51]
 52CRON_MONTHS_ALIASES: Dict[str, str] = {
 53    'january': 'jan',
 54    'february': 'feb',
 55    'march': 'mar',
 56    'april': 'apr',
 57    'may': 'may',
 58    'june': 'jun',
 59    'july': 'jul',
 60    'august': 'aug',
 61    'september': 'sep',
 62    'october': 'oct',
 63    'november': 'nov',
 64    'december': 'dec',
 65}
 66SCHEDULE_ALIASES: Dict[str, str] = {
 67    **FREQUENCY_ALIASES,
 68    **LOGIC_ALIASES,
 69    **CRON_DAYS_OF_WEEK_ALIASES,
 70    **CRON_MONTHS_ALIASES,
 71}
 72
 73_scheduler = None
 74def schedule_function(
 75    function: Callable[[Any], Any],
 76    schedule: str,
 77    *args,
 78    debug: bool = False,
 79    **kw
 80) -> mrsm.SuccessTuple:
 81    """
 82    Block the process and execute the function intermittently according to the frequency.
 83    https://meerschaum.io/reference/background-jobs/#-schedules
 84
 85    Parameters
 86    ----------
 87    function: Callable[[Any], Any]
 88        The function to execute.
 89
 90    schedule: str
 91        The frequency schedule at which `function` should be executed (e.g. `'daily'`).
 92
 93    Returns
 94    -------
 95    A `SuccessTuple` upon exit.
 96    """
 97    import asyncio
 98    from meerschaum.utils.misc import filter_keywords, round_time
 99
100    global _scheduler
101    kw['debug'] = debug
102    kw = filter_keywords(function, **kw)
103
104    apscheduler = mrsm.attempt_import('apscheduler', lazy=False)
105    now = round_time(datetime.now(timezone.utc), timedelta(minutes=1))
106    trigger = parse_schedule(schedule, now=now)
107    _scheduler = apscheduler.AsyncScheduler(identity='mrsm-scheduler')
108    try:
109        loop = asyncio.get_running_loop()
110    except RuntimeError:
111        loop = asyncio.new_event_loop()
112
113
114    async def run_scheduler():
115        async with _scheduler:
116            job = await _scheduler.add_schedule(
117                function,
118                trigger,
119                args=args,
120                kwargs=kw,
121                max_running_jobs=1,
122                conflict_policy=apscheduler.ConflictPolicy.replace,
123            )
124            try:
125                await _scheduler.run_until_stopped()
126            except (KeyboardInterrupt, SystemExit) as e:
127                await _stop_scheduler()
128                raise e
129
130    try:
131        loop.run_until_complete(run_scheduler())
132    except (KeyboardInterrupt, SystemExit) as e:
133        loop.run_until_complete(_stop_scheduler())
134
135    return True, "Success"
136
137
def parse_schedule(schedule: str, now: Optional[datetime] = None):
    """
    Parse a schedule string (e.g. 'daily') into a Trigger object.

    Parameters
    ----------
    schedule: str
        The schedule to parse, e.g. `'daily'`, `'every 10 minutes'`, `'mon-fri'`,
        or a crontab expression, optionally followed by `'starting <time>'`.
        Parts may be joined with `'and'` or with `'or'`, but not both.

    now: Optional[datetime], default None
        Forwarded to `parse_start_time` as the default start time.

    Returns
    -------
    The single trigger if the schedule has one part,
    otherwise an `AndTrigger` or `OrTrigger` wrapping one trigger per part.

    Raises
    ------
    ValueError
        If the schedule mixes 'and' with 'or' logic,
        or if an interval is malformed or uses an unknown unit.
    """
    from meerschaum.utils.misc import items_str, is_int
    (
        apscheduler_triggers_cron,
        apscheduler_triggers_interval,
        apscheduler_triggers_calendarinterval,
        apscheduler_triggers_combining,
    ) = (
        mrsm.attempt_import(
            'apscheduler.triggers.cron',
            'apscheduler.triggers.interval',
            'apscheduler.triggers.calendarinterval',
            'apscheduler.triggers.combining',
            lazy = False,
        )
    )

    ### Normalize aliases of the starting keyword (e.g. 'beginning') up front.
    ### Otherwise the start time is never split off and leaks into the frequency.
    for alias_keyword, true_keyword in LOGIC_ALIASES.items():
        if true_keyword == STARTING_KEYWORD:
            schedule = schedule.replace(alias_keyword, true_keyword)

    starting_ts = parse_start_time(schedule, now=now)
    schedule = schedule.split(STARTING_KEYWORD)[0].strip()

    ### Aliases are substituted in insertion order (e.g. 'tuesday' before 'tues').
    for alias_keyword, true_keyword in SCHEDULE_ALIASES.items():
        schedule = schedule.replace(alias_keyword, true_keyword)

    ### TODO Allow for combining `and` + `or` logic.
    if '&' in schedule and '|' in schedule:
        raise ValueError("Cannot accept both 'and' + 'or' logic in the schedule frequency.")

    join_str = '|' if '|' in schedule else '&'
    join_trigger = (
        apscheduler_triggers_combining.OrTrigger
        if join_str == '|'
        else apscheduler_triggers_combining.AndTrigger
    )
    join_kwargs = {
        'max_iterations': 1_000_000,
        'threshold': 0,
    } if join_str == '&' else {}

    schedule_parts = [part.strip() for part in schedule.split(join_str)]
    triggers = []

    ### Cron-style parts match every minute / second only if the schedule mentions that unit.
    has_seconds = 'second' in schedule
    has_minutes = 'minute' in schedule

    for schedule_part in schedule_parts:

        ### Intervals must begin with 'every' (after alias substitution).
        if schedule_part.lower().startswith('every '):
            interval_tokens = schedule_part[len('every '):].split(' ', maxsplit=1)
            if len(interval_tokens) != 2:
                raise ValueError(
                    f"Invalid interval '{schedule_part}'.\n"
                    + "    Intervals must look like 'every <number> <unit>' (e.g. 'every 1 day')."
                )
            schedule_num_str, schedule_unit = interval_tokens

            ### Accept both singular and plural units ('day' and 'days').
            schedule_unit = schedule_unit.rstrip('s') + 's'
            if schedule_unit not in INTERVAL_UNITS:
                raise ValueError(
                    f"Invalid interval '{schedule_unit}'.\n"
                    + f"    Accepted values are {items_str(INTERVAL_UNITS)}."
                )

            schedule_num = (
                int(schedule_num_str)
                if is_int(schedule_num_str)
                else float(schedule_num_str)
            )

            ### Months and years are built with the calendar-based trigger.
            trigger = (
                apscheduler_triggers_interval.IntervalTrigger(
                    **{
                        schedule_unit: schedule_num,
                        'start_time': starting_ts,
                    }
                )
                if schedule_unit not in ('months', 'years') else (
                    apscheduler_triggers_calendarinterval.CalendarIntervalTrigger(
                        **{
                            schedule_unit: schedule_num,
                            'start_date': starting_ts,
                            'timezone': starting_ts.tzinfo,
                        }
                    )
                )
            )

        ### Determine whether this is a pure cron string or a cron subset (e.g. 'may-aug').
        else:
            first_three_prefix = schedule_part[:3].lower()
            first_four_prefix = schedule_part[:4].lower()
            cron_kw = {}
            if first_three_prefix in CRON_DAYS_OF_WEEK:
                cron_kw['day_of_week'] = schedule_part
            elif first_three_prefix in CRON_MONTHS:
                cron_kw['month'] = schedule_part
            elif is_int(first_four_prefix) and len(first_four_prefix) == 4:
                cron_kw['year'] = int(first_four_prefix)
            trigger = (
                apscheduler_triggers_cron.CronTrigger(
                    **{
                        **cron_kw,
                        'hour': '*',
                        'minute': '*' if has_minutes else starting_ts.minute,
                        'second': '*' if has_seconds else starting_ts.second,
                        'start_time': starting_ts,
                        'timezone': starting_ts.tzinfo,
                    }
                )
                if cron_kw
                else apscheduler_triggers_cron.CronTrigger.from_crontab(
                    schedule_part,
                    timezone = starting_ts.tzinfo,
                )
            )
            ### Explicitly set the `start_time` after building with `from_crontab`.
            if trigger.start_time != starting_ts:
                trigger.start_time = starting_ts

        triggers.append(trigger)

    return (
        join_trigger(triggers, **join_kwargs)
        if len(triggers) != 1
        else triggers[0]
    )
261
262
def parse_start_time(schedule: str, now: Optional[datetime] = None) -> datetime:
    """
    Return the datetime to use for the given schedule string.

    Parameters
    ----------
    schedule: str
        The schedule frequency to be parsed into a starting datetime.
        Aliases of the starting keyword (e.g. `'beginning'`) are honored.

    now: Optional[datetime], default None
        If provided, use this value as a default if no start time is explicitly stated.
        Otherwise the current UTC time (rounded with `round_time` to one minute) is used.

    Returns
    -------
    A `datetime` object, either `now` or the datetime embedded in the schedule string.
    Timestamps without a timezone are returned as UTC.

    Raises
    ------
    ValueError
        Raised through `meerschaum.utils.warnings.error` if the start time cannot be parsed.

    Examples
    --------
    The relative examples below depend on the date on which they are run.

    >>> parse_start_time('daily starting 2024-01-01')
    datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
    >>> parse_start_time('monthly starting 1st')
    datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc)
    >>> parse_start_time('hourly starting 00:30')
    datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc)
    """
    from meerschaum.utils.misc import round_time
    dateutil_parser = mrsm.attempt_import('dateutil.parser')

    ### Treat aliases such as 'beginning' exactly like the starting keyword.
    ### Otherwise an aliased start time would be silently ignored.
    for alias_keyword, true_keyword in LOGIC_ALIASES.items():
        if true_keyword == STARTING_KEYWORD:
            schedule = schedule.replace(alias_keyword, true_keyword)

    starting_parts = schedule.split(STARTING_KEYWORD)
    starting_str = ('now' if len(starting_parts) == 1 else starting_parts[-1]).strip()
    now = now or round_time(datetime.now(timezone.utc), timedelta(minutes=1))
    try:
        if starting_str == 'now':
            starting_ts = now
        elif 'tomorrow' in starting_str or 'today' in starting_str:
            today = round_time(now, timedelta(days=1))
            tomorrow = today + timedelta(days=1)
            is_tomorrow = 'tomorrow' in starting_str
            ### Whatever remains after removing the day word is parsed as a time of day.
            time_str = starting_str.replace('tomorrow', '').replace('today', '').strip()
            time_ts = dateutil_parser.parse(time_str) if time_str else today
            ### Only the hour and minute of the parsed time are applied.
            starting_ts = (
                (tomorrow if is_tomorrow else today)
                + timedelta(hours=time_ts.hour)
                + timedelta(minutes=time_ts.minute)
            )
        else:
            starting_ts = dateutil_parser.parse(starting_str)
        schedule_parse_error = None
    except Exception as e:
        warn(f"Unable to parse starting time from '{starting_str}'.", stack=False)
        schedule_parse_error = str(e)

    ### Report the failure outside of the `except` block.
    if schedule_parse_error:
        error(schedule_parse_error, ValueError, stack=False)
    if not starting_ts.tzinfo:
        starting_ts = starting_ts.replace(tzinfo=timezone.utc)
    return starting_ts
318
319
async def _stop_scheduler():
    """
    Stop the module-level scheduler and wait until it reports that it has stopped.
    Do nothing if no scheduler has been created yet.
    """
    if _scheduler is not None:
        await _scheduler.stop()
        await _scheduler.wait_until_stopped()
# Keyword which separates the frequency from the start time in a schedule string.
STARTING_KEYWORD: str = 'starting'
# Units accepted after 'every <N>' in an interval schedule.
INTERVAL_UNITS: List[str] = ['months', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'years']
# Shorthand frequencies and the explicit intervals they expand to.
FREQUENCY_ALIASES: Dict[str, str] = {'daily': 'every 1 day', 'hourly': 'every 1 hour', 'minutely': 'every 1 minute', 'weekly': 'every 1 week', 'monthly': 'every 1 month', 'secondly': 'every 1 second', 'yearly': 'every 1 year'}
# Natural-language connectors and the symbols used while parsing.
LOGIC_ALIASES: Dict[str, str] = {'and': '&', 'or': '|', ' through ': '-', ' thru ': '-', ' - ': '-', 'beginning': 'starting'}
# Three-letter day-of-week names recognized as cron-style parts.
CRON_DAYS_OF_WEEK: List[str] = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
# Longer day names mapped to their three-letter forms.
CRON_DAYS_OF_WEEK_ALIASES: Dict[str, str] = {'monday': 'mon', 'tuesday': 'tue', 'tues': 'tue', 'wednesday': 'wed', 'thursday': 'thu', 'thurs': 'thu', 'friday': 'fri', 'saturday': 'sat', 'sunday': 'sun'}
# Three-letter month names recognized as cron-style parts.
CRON_MONTHS: List[str] = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
# Full month names mapped to their three-letter forms.
CRON_MONTHS_ALIASES: Dict[str, str] = {'january': 'jan', 'february': 'feb', 'march': 'mar', 'april': 'apr', 'may': 'may', 'june': 'jun', 'july': 'jul', 'august': 'aug', 'september': 'sep', 'october': 'oct', 'november': 'nov', 'december': 'dec'}
# All of the alias mappings above merged into one; `parse_schedule` applies them in this order.
SCHEDULE_ALIASES: Dict[str, str] = {'daily': 'every 1 day', 'hourly': 'every 1 hour', 'minutely': 'every 1 minute', 'weekly': 'every 1 week', 'monthly': 'every 1 month', 'secondly': 'every 1 second', 'yearly': 'every 1 year', 'and': '&', 'or': '|', ' through ': '-', ' thru ': '-', ' - ': '-', 'beginning': 'starting', 'monday': 'mon', 'tuesday': 'tue', 'tues': 'tue', 'wednesday': 'wed', 'thursday': 'thu', 'thurs': 'thu', 'friday': 'fri', 'saturday': 'sat', 'sunday': 'sun', 'january': 'jan', 'february': 'feb', 'march': 'mar', 'april': 'apr', 'may': 'may', 'june': 'jun', 'july': 'jul', 'august': 'aug', 'september': 'sep', 'october': 'oct', 'november': 'nov', 'december': 'dec'}
def schedule_function( function: Callable[[Any], Any], schedule: str, *args, debug: bool = False, **kw) -> Tuple[bool, str]:
 75def schedule_function(
 76    function: Callable[[Any], Any],
 77    schedule: str,
 78    *args,
 79    debug: bool = False,
 80    **kw
 81) -> mrsm.SuccessTuple:
 82    """
 83    Block the process and execute the function intermittently according to the frequency.
 84    https://meerschaum.io/reference/background-jobs/#-schedules
 85
 86    Parameters
 87    ----------
 88    function: Callable[[Any], Any]
 89        The function to execute.
 90
 91    schedule: str
 92        The frequency schedule at which `function` should be executed (e.g. `'daily'`).
 93
 94    Returns
 95    -------
 96    A `SuccessTuple` upon exit.
 97    """
 98    import asyncio
 99    from meerschaum.utils.misc import filter_keywords, round_time
100
101    global _scheduler
102    kw['debug'] = debug
103    kw = filter_keywords(function, **kw)
104
105    apscheduler = mrsm.attempt_import('apscheduler', lazy=False)
106    now = round_time(datetime.now(timezone.utc), timedelta(minutes=1))
107    trigger = parse_schedule(schedule, now=now)
108    _scheduler = apscheduler.AsyncScheduler(identity='mrsm-scheduler')
109    try:
110        loop = asyncio.get_running_loop()
111    except RuntimeError:
112        loop = asyncio.new_event_loop()
113
114
115    async def run_scheduler():
116        async with _scheduler:
117            job = await _scheduler.add_schedule(
118                function,
119                trigger,
120                args=args,
121                kwargs=kw,
122                max_running_jobs=1,
123                conflict_policy=apscheduler.ConflictPolicy.replace,
124            )
125            try:
126                await _scheduler.run_until_stopped()
127            except (KeyboardInterrupt, SystemExit) as e:
128                await _stop_scheduler()
129                raise e
130
131    try:
132        loop.run_until_complete(run_scheduler())
133    except (KeyboardInterrupt, SystemExit) as e:
134        loop.run_until_complete(_stop_scheduler())
135
136    return True, "Success"

Block the process and execute the function intermittently according to the frequency. https://meerschaum.io/reference/background-jobs/#-schedules

Parameters
  • function (Callable[[Any], Any]): The function to execute.
  • schedule (str): The frequency schedule at which function should be executed (e.g. 'daily').
Returns
  • A SuccessTuple upon exit.
def parse_schedule(schedule: str, now: Optional[datetime.datetime] = None):
def parse_schedule(schedule: str, now: Optional[datetime] = None):
    """
    Parse a schedule string (e.g. 'daily') into a Trigger object.

    Parameters
    ----------
    schedule: str
        The schedule to parse, e.g. `'daily'`, `'every 10 minutes'`, `'mon-fri'`,
        or a crontab expression, optionally followed by `'starting <time>'`.
        Parts may be joined with `'and'` or with `'or'`, but not both.

    now: Optional[datetime], default None
        Forwarded to `parse_start_time` as the default start time.

    Returns
    -------
    The single trigger if the schedule has one part,
    otherwise an `AndTrigger` or `OrTrigger` wrapping one trigger per part.

    Raises
    ------
    ValueError
        If the schedule mixes 'and' with 'or' logic,
        or if an interval is malformed or uses an unknown unit.
    """
    from meerschaum.utils.misc import items_str, is_int
    (
        apscheduler_triggers_cron,
        apscheduler_triggers_interval,
        apscheduler_triggers_calendarinterval,
        apscheduler_triggers_combining,
    ) = (
        mrsm.attempt_import(
            'apscheduler.triggers.cron',
            'apscheduler.triggers.interval',
            'apscheduler.triggers.calendarinterval',
            'apscheduler.triggers.combining',
            lazy = False,
        )
    )

    ### Normalize aliases of the starting keyword (e.g. 'beginning') up front.
    ### Otherwise the start time is never split off and leaks into the frequency.
    for alias_keyword, true_keyword in LOGIC_ALIASES.items():
        if true_keyword == STARTING_KEYWORD:
            schedule = schedule.replace(alias_keyword, true_keyword)

    starting_ts = parse_start_time(schedule, now=now)
    schedule = schedule.split(STARTING_KEYWORD)[0].strip()

    ### Aliases are substituted in insertion order (e.g. 'tuesday' before 'tues').
    for alias_keyword, true_keyword in SCHEDULE_ALIASES.items():
        schedule = schedule.replace(alias_keyword, true_keyword)

    ### TODO Allow for combining `and` + `or` logic.
    if '&' in schedule and '|' in schedule:
        raise ValueError("Cannot accept both 'and' + 'or' logic in the schedule frequency.")

    join_str = '|' if '|' in schedule else '&'
    join_trigger = (
        apscheduler_triggers_combining.OrTrigger
        if join_str == '|'
        else apscheduler_triggers_combining.AndTrigger
    )
    join_kwargs = {
        'max_iterations': 1_000_000,
        'threshold': 0,
    } if join_str == '&' else {}

    schedule_parts = [part.strip() for part in schedule.split(join_str)]
    triggers = []

    ### Cron-style parts match every minute / second only if the schedule mentions that unit.
    has_seconds = 'second' in schedule
    has_minutes = 'minute' in schedule

    for schedule_part in schedule_parts:

        ### Intervals must begin with 'every' (after alias substitution).
        if schedule_part.lower().startswith('every '):
            interval_tokens = schedule_part[len('every '):].split(' ', maxsplit=1)
            if len(interval_tokens) != 2:
                raise ValueError(
                    f"Invalid interval '{schedule_part}'.\n"
                    + "    Intervals must look like 'every <number> <unit>' (e.g. 'every 1 day')."
                )
            schedule_num_str, schedule_unit = interval_tokens

            ### Accept both singular and plural units ('day' and 'days').
            schedule_unit = schedule_unit.rstrip('s') + 's'
            if schedule_unit not in INTERVAL_UNITS:
                raise ValueError(
                    f"Invalid interval '{schedule_unit}'.\n"
                    + f"    Accepted values are {items_str(INTERVAL_UNITS)}."
                )

            schedule_num = (
                int(schedule_num_str)
                if is_int(schedule_num_str)
                else float(schedule_num_str)
            )

            ### Months and years are built with the calendar-based trigger.
            trigger = (
                apscheduler_triggers_interval.IntervalTrigger(
                    **{
                        schedule_unit: schedule_num,
                        'start_time': starting_ts,
                    }
                )
                if schedule_unit not in ('months', 'years') else (
                    apscheduler_triggers_calendarinterval.CalendarIntervalTrigger(
                        **{
                            schedule_unit: schedule_num,
                            'start_date': starting_ts,
                            'timezone': starting_ts.tzinfo,
                        }
                    )
                )
            )

        ### Determine whether this is a pure cron string or a cron subset (e.g. 'may-aug').
        else:
            first_three_prefix = schedule_part[:3].lower()
            first_four_prefix = schedule_part[:4].lower()
            cron_kw = {}
            if first_three_prefix in CRON_DAYS_OF_WEEK:
                cron_kw['day_of_week'] = schedule_part
            elif first_three_prefix in CRON_MONTHS:
                cron_kw['month'] = schedule_part
            elif is_int(first_four_prefix) and len(first_four_prefix) == 4:
                cron_kw['year'] = int(first_four_prefix)
            trigger = (
                apscheduler_triggers_cron.CronTrigger(
                    **{
                        **cron_kw,
                        'hour': '*',
                        'minute': '*' if has_minutes else starting_ts.minute,
                        'second': '*' if has_seconds else starting_ts.second,
                        'start_time': starting_ts,
                        'timezone': starting_ts.tzinfo,
                    }
                )
                if cron_kw
                else apscheduler_triggers_cron.CronTrigger.from_crontab(
                    schedule_part,
                    timezone = starting_ts.tzinfo,
                )
            )
            ### Explicitly set the `start_time` after building with `from_crontab`.
            if trigger.start_time != starting_ts:
                trigger.start_time = starting_ts

        triggers.append(trigger)

    return (
        join_trigger(triggers, **join_kwargs)
        if len(triggers) != 1
        else triggers[0]
    )

Parse a schedule string (e.g. 'daily') into a Trigger object.

def parse_start_time( schedule: str, now: Optional[datetime.datetime] = None) -> datetime.datetime:
def parse_start_time(schedule: str, now: Optional[datetime] = None) -> datetime:
    """
    Return the datetime to use for the given schedule string.

    Parameters
    ----------
    schedule: str
        The schedule frequency to be parsed into a starting datetime.
        Aliases of the starting keyword (e.g. `'beginning'`) are honored.

    now: Optional[datetime], default None
        If provided, use this value as a default if no start time is explicitly stated.
        Otherwise the current UTC time (rounded with `round_time` to one minute) is used.

    Returns
    -------
    A `datetime` object, either `now` or the datetime embedded in the schedule string.
    Timestamps without a timezone are returned as UTC.

    Raises
    ------
    ValueError
        Raised through `meerschaum.utils.warnings.error` if the start time cannot be parsed.

    Examples
    --------
    The relative examples below depend on the date on which they are run.

    >>> parse_start_time('daily starting 2024-01-01')
    datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
    >>> parse_start_time('monthly starting 1st')
    datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc)
    >>> parse_start_time('hourly starting 00:30')
    datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc)
    """
    from meerschaum.utils.misc import round_time
    dateutil_parser = mrsm.attempt_import('dateutil.parser')

    ### Treat aliases such as 'beginning' exactly like the starting keyword.
    ### Otherwise an aliased start time would be silently ignored.
    for alias_keyword, true_keyword in LOGIC_ALIASES.items():
        if true_keyword == STARTING_KEYWORD:
            schedule = schedule.replace(alias_keyword, true_keyword)

    starting_parts = schedule.split(STARTING_KEYWORD)
    starting_str = ('now' if len(starting_parts) == 1 else starting_parts[-1]).strip()
    now = now or round_time(datetime.now(timezone.utc), timedelta(minutes=1))
    try:
        if starting_str == 'now':
            starting_ts = now
        elif 'tomorrow' in starting_str or 'today' in starting_str:
            today = round_time(now, timedelta(days=1))
            tomorrow = today + timedelta(days=1)
            is_tomorrow = 'tomorrow' in starting_str
            ### Whatever remains after removing the day word is parsed as a time of day.
            time_str = starting_str.replace('tomorrow', '').replace('today', '').strip()
            time_ts = dateutil_parser.parse(time_str) if time_str else today
            ### Only the hour and minute of the parsed time are applied.
            starting_ts = (
                (tomorrow if is_tomorrow else today)
                + timedelta(hours=time_ts.hour)
                + timedelta(minutes=time_ts.minute)
            )
        else:
            starting_ts = dateutil_parser.parse(starting_str)
        schedule_parse_error = None
    except Exception as e:
        warn(f"Unable to parse starting time from '{starting_str}'.", stack=False)
        schedule_parse_error = str(e)

    ### Report the failure outside of the `except` block.
    if schedule_parse_error:
        error(schedule_parse_error, ValueError, stack=False)
    if not starting_ts.tzinfo:
        starting_ts = starting_ts.replace(tzinfo=timezone.utc)
    return starting_ts

Return the datetime to use for the given schedule string.

Parameters
  • schedule (str): The schedule frequency to be parsed into a starting datetime.
  • now (Optional[datetime], default None): If provided, use this value as a default if no start time is explicitly stated.
Returns
  • A datetime object, either now or the datetime embedded in the schedule string.
Examples
>>> parse_start_time('daily starting 2024-01-01')
datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
>>> parse_start_time('monthly starting 1st')
datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc)
>>> parse_start_time('hourly starting 00:30')
datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc)