meerschaum.utils.schedule

Schedule processes and threads.

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Schedule processes and threads.
  7"""
  8
  9from __future__ import annotations
 10import signal
 11import traceback
 12from datetime import datetime, timezone, timedelta
 13import meerschaum as mrsm
 14from meerschaum.utils.typing import Callable, Any, Optional, List, Dict
 15from meerschaum.utils.warnings import warn, error
 16
# Keyword separating the frequency from the optional start time
# (e.g. 'daily starting 2024-01-01'). Both parsers below split on it.
STARTING_KEYWORD: str = 'starting'
# Plural unit names accepted after 'every <N>' in an interval schedule.
INTERVAL_UNITS: List[str] = ['months', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'years']
# Shorthand frequencies and the 'every <N> <unit>' form they are rewritten to.
FREQUENCY_ALIASES: Dict[str, str] = {
    'daily': 'every 1 day',
    'hourly': 'every 1 hour',
    'minutely': 'every 1 minute',
    'weekly': 'every 1 week',
    'monthly': 'every 1 month',
    'secondly': 'every 1 second',
    'yearly': 'every 1 year',
}
# Natural-language connectives and the symbols `parse_schedule()` looks for:
# '&' / '|' join multiple triggers, '-' builds ranges (e.g. 'mon-fri').
LOGIC_ALIASES: Dict[str, str] = {
    'and': '&',
    'or': '|',
    ' through ': '-',
    ' thru ': '-',
    ' - ': '-',
    'beginning': STARTING_KEYWORD,
}
# Three-letter weekday names; `parse_schedule()` compares the first three
# characters of a schedule part against this list.
CRON_DAYS_OF_WEEK: List[str] = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
# Longer weekday spellings mapped to their three-letter names.
CRON_DAYS_OF_WEEK_ALIASES: Dict[str, str] = {
    'monday': 'mon',
    'tuesday': 'tue',
    'tues': 'tue',
    'wednesday': 'wed',
    'thursday': 'thu',
    'thurs': 'thu',
    'friday': 'fri',
    'saturday': 'sat',
    'sunday': 'sun',
}
# Three-letter month names; `parse_schedule()` compares the first three
# characters of a schedule part against this list.
CRON_MONTHS: List[str] = [
    'jan', 'feb', 'mar', 'apr', 'may', 'jun',
    'jul', 'aug', 'sep', 'oct', 'nov', 'dec',
]
# Full month names mapped to their three-letter names.
CRON_MONTHS_ALIASES: Dict[str, str] = {
    'january': 'jan',
    'february': 'feb',
    'march': 'mar',
    'april': 'apr',
    'may': 'may',
    'june': 'jun',
    'july': 'jul',
    'august': 'aug',
    'september': 'sep',
    'october': 'oct',
    'november': 'nov',
    'december': 'dec',
}
# Every alias in one mapping. `parse_schedule()` applies these with plain
# `str.replace()` in insertion order, so longer spellings (e.g. 'tuesday')
# must stay ahead of their prefixes (e.g. 'tues').
SCHEDULE_ALIASES: Dict[str, str] = {
    **FREQUENCY_ALIASES,
    **LOGIC_ALIASES,
    **CRON_DAYS_OF_WEEK_ALIASES,
    **CRON_MONTHS_ALIASES,
}

# Scheduler instance assigned by `schedule_function()`;
# `_stop_scheduler()` does nothing while this is still `None`.
_scheduler = None
 74def schedule_function(
 75    function: Callable[[Any], Any],
 76    schedule: str,
 77    *args,
 78    debug: bool = False,
 79    **kw
 80) -> mrsm.SuccessTuple:
 81    """
 82    Block the process and execute the function intermittently according to the frequency.
 83    https://meerschaum.io/reference/background-jobs/#-schedules
 84
 85    Parameters
 86    ----------
 87    function: Callable[[Any], Any]
 88        The function to execute.
 89
 90    schedule: str
 91        The frequency schedule at which `function` should be executed (e.g. `'daily'`).
 92
 93    Returns
 94    -------
 95    A `SuccessTuple` upon exit.
 96    """
 97    import asyncio
 98    from meerschaum.utils.misc import filter_keywords, round_time
 99
100    global _scheduler
101    kw['debug'] = debug
102    kw = filter_keywords(function, **kw)
103
104    _ = mrsm.attempt_import('attrs', lazy=False)
105    apscheduler = mrsm.attempt_import('apscheduler', lazy=False)
106    now = round_time(datetime.now(timezone.utc), timedelta(minutes=1))
107    trigger = parse_schedule(schedule, now=now)
108    _scheduler = apscheduler.AsyncScheduler(identity='mrsm-scheduler')
109    try:
110        loop = asyncio.get_running_loop()
111    except RuntimeError:
112        loop = asyncio.new_event_loop()
113
114    async def run_scheduler():
115        async with _scheduler:
116            job = await _scheduler.add_schedule(
117                function,
118                trigger,
119                **filter_keywords(
120                    _scheduler.add_schedule,
121                    args=args,
122                    kwargs=kw,
123                    max_running_jobs=1,
124                    conflict_policy=apscheduler.ConflictPolicy.replace,
125                )
126            )
127            try:
128                await _scheduler.run_until_stopped()
129            except (KeyboardInterrupt, SystemExit) as e:
130                await _stop_scheduler()
131                raise e
132
133    try:
134        loop.run_until_complete(run_scheduler())
135    except (KeyboardInterrupt, SystemExit):
136        loop.run_until_complete(_stop_scheduler())
137
138    return True, "Success"
139
140
def parse_schedule(schedule: str, now: Optional[datetime] = None):
    """
    Parse a schedule string (e.g. 'daily') into a Trigger object.

    Parameters
    ----------
    schedule: str
        The schedule string: one or more parts joined by 'and' / 'or',
        optionally followed by `STARTING_KEYWORD` (or its alias 'beginning')
        and a start time.
        Each part is either an interval ('every <number> <unit>' or an alias
        such as 'daily'), a weekday / month / year expression, or a crontab string.

    now: Optional[datetime], default None
        Passed to `parse_start_time()` as the default start time.

    Returns
    -------
    A single trigger when the schedule has one part,
    otherwise an `OrTrigger` or `AndTrigger` wrapping every part's trigger.

    Raises
    ------
    ValueError
        If both 'and' and 'or' logic are present,
        or if an interval is malformed or uses an unknown unit.
    """
    from meerschaum.utils.misc import items_str, is_int, filter_keywords
    (
        apscheduler_triggers_cron,
        apscheduler_triggers_interval,
        apscheduler_triggers_calendarinterval,
        apscheduler_triggers_combining,
    ) = (
        mrsm.attempt_import(
            'apscheduler.triggers.cron',
            'apscheduler.triggers.interval',
            'apscheduler.triggers.calendarinterval',
            'apscheduler.triggers.combining',
            lazy = False,
        )
    )

    ### Normalize aliases of the starting keyword (e.g. 'beginning') first,
    ### otherwise the start time is never split off the frequency.
    for alias_keyword, true_keyword in LOGIC_ALIASES.items():
        if true_keyword == STARTING_KEYWORD:
            schedule = schedule.replace(alias_keyword, STARTING_KEYWORD)

    starting_ts = parse_start_time(schedule, now=now)
    schedule = schedule.split(STARTING_KEYWORD, maxsplit=1)[0].strip()
    for alias_keyword, true_keyword in SCHEDULE_ALIASES.items():
        schedule = schedule.replace(alias_keyword, true_keyword)

    ### TODO Allow for combining `and` + `or` logic.
    if '&' in schedule and '|' in schedule:
        raise ValueError("Cannot accept both 'and' + 'or' logic in the schedule frequency.")

    join_str = '|' if '|' in schedule else '&'
    join_trigger = (
        apscheduler_triggers_combining.OrTrigger
        if join_str == '|'
        else apscheduler_triggers_combining.AndTrigger
    )
    ### Only `AndTrigger` is given these extra keyword arguments.
    join_kwargs = {
        'max_iterations': 1_000_000,
        'threshold': 0,
    } if join_str == '&' else {}

    schedule_parts = [part.strip() for part in schedule.split(join_str)]
    triggers = []

    ### Checked on the whole schedule so cron parts can fire every minute / second
    ### when another part asks for that granularity.
    has_seconds = 'second' in schedule
    has_minutes = 'minute' in schedule

    for schedule_part in schedule_parts:

        ### Intervals must begin with 'every' (after alias substitution).
        if schedule_part.lower().startswith('every '):
            interval_tokens = schedule_part[len('every '):].split(maxsplit=1)
            if len(interval_tokens) != 2:
                raise ValueError(
                    f"Invalid interval '{schedule_part}'.\n"
                    + "    Intervals must be of the form 'every <number> <unit>'."
                )
            schedule_num_str, schedule_unit = interval_tokens

            ### Accept both singular and plural units ('day' and 'days').
            schedule_unit = schedule_unit.rstrip('s') + 's'
            if schedule_unit not in INTERVAL_UNITS:
                raise ValueError(
                    f"Invalid interval '{schedule_unit}'.\n"
                    + f"    Accepted values are {items_str(INTERVAL_UNITS)}."
                )

            schedule_num = (
                int(schedule_num_str)
                if is_int(schedule_num_str)
                else float(schedule_num_str)
            )

            ### Months and years use the calendar-based trigger;
            ### every other unit uses the fixed-length interval trigger.
            trigger = (
                apscheduler_triggers_interval.IntervalTrigger(
                    **filter_keywords(
                        apscheduler_triggers_interval.IntervalTrigger.__init__,
                        **{
                            schedule_unit: schedule_num,
                            'start_time': starting_ts,
                            'start_date': starting_ts,
                        }
                    )
                )
                if schedule_unit not in ('months', 'years') else (
                    apscheduler_triggers_calendarinterval.CalendarIntervalTrigger(
                        **{
                            schedule_unit: schedule_num,
                            'start_date': starting_ts,
                            'timezone': starting_ts.tzinfo,
                        }
                    )
                )
            )

        ### Determine whether this is a pure cron string or a cron subset (e.g. 'may-aug').
        else:
            first_three_prefix = schedule_part[:3].lower()
            first_four_prefix = schedule_part[:4].lower()
            cron_kw = {}
            if first_three_prefix in CRON_DAYS_OF_WEEK:
                cron_kw['day_of_week'] = schedule_part
            elif first_three_prefix in CRON_MONTHS:
                cron_kw['month'] = schedule_part
            elif is_int(first_four_prefix) and len(first_four_prefix) == 4:
                cron_kw['year'] = int(first_four_prefix)
            trigger = (
                apscheduler_triggers_cron.CronTrigger(
                    **{
                        **cron_kw,
                        'hour': '*',
                        'minute': '*' if has_minutes else starting_ts.minute,
                        'second': '*' if has_seconds else starting_ts.second,
                        'start_time': starting_ts,
                        'timezone': starting_ts.tzinfo,
                    }
                )
                if cron_kw
                else apscheduler_triggers_cron.CronTrigger.from_crontab(
                    schedule_part,
                    timezone = starting_ts.tzinfo,
                )
            )
            ### Explicitly set the `start_time` after building with `from_crontab`.
            if trigger.start_time != starting_ts:
                trigger.start_time = starting_ts

        triggers.append(trigger)

    return (
        join_trigger(triggers, **join_kwargs)
        if len(triggers) != 1
        else triggers[0]
    )
268
269
def parse_start_time(schedule: str, now: Optional[datetime] = None) -> datetime:
    """
    Return the datetime to use for the given schedule string.

    Parameters
    ----------
    schedule: str
        The schedule frequency to be parsed into a starting datetime.
        Everything after the last `STARTING_KEYWORD` is treated as the start time.
        Supported forms are 'now', 'in <number> <unit>', 'today' / 'tomorrow'
        (optionally with a time of day), and anything `dateutil` can parse.

    now: Optional[datetime], default None
        If provided, use this value as a default if no start time is explicitly stated.
        Defaults to the current UTC time rounded to the minute.

    Returns
    -------
    A `datetime` object, either `now` or the datetime embedded in the schedule string.
    Naive results are assigned the UTC timezone.

    Raises
    ------
    ValueError
        If the start time cannot be parsed.

    Examples
    --------
    >>> parse_start_time('daily starting 2024-01-01')
    datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
    >>> parse_start_time('monthly starting 1st')
    datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc)
    >>> parse_start_time('hourly starting 00:30')
    datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc)
    """
    from meerschaum.utils.misc import round_time
    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    starting_parts = schedule.split(STARTING_KEYWORD)
    starting_str = ('now' if len(starting_parts) == 1 else starting_parts[-1]).strip()
    now = now or round_time(datetime.now(timezone.utc), timedelta(minutes=1))
    starting_ts = None
    schedule_parse_error = None
    try:
        if starting_str == 'now':
            starting_ts = now
        elif starting_str.startswith('in '):
            ### Strip only the leading 'in ' and tolerate repeated whitespace.
            delta_vals = starting_str[len('in '):].split(maxsplit=1)
            delta_unit = delta_vals[-1].rstrip('s') + 's'
            delta_num = float(delta_vals[0])
            starting_ts = now + timedelta(**{delta_unit: delta_num})
        elif 'tomorrow' in starting_str or 'today' in starting_str:
            today = round_time(now, timedelta(days=1))
            tomorrow = today + timedelta(days=1)
            is_tomorrow = 'tomorrow' in starting_str
            time_str = starting_str.replace('tomorrow', '').replace('today', '').strip()
            time_ts = dateutil_parser.parse(time_str) if time_str else today
            ### Only the hour and minute of the parsed time are applied.
            starting_ts = (
                (tomorrow if is_tomorrow else today)
                + timedelta(hours=time_ts.hour)
                + timedelta(minutes=time_ts.minute)
            )
        else:
            starting_ts = dateutil_parser.parse(starting_str)
    except Exception as e:
        warn(f"Unable to parse starting time from '{starting_str}'.", stack=False)
        schedule_parse_error = str(e)

    ### Compare against `None`: an exception with an empty message must still be reported.
    if schedule_parse_error is not None:
        error(schedule_parse_error, ValueError, stack=False)
        ### Fallback in case `error()` returns instead of raising.
        raise ValueError(schedule_parse_error)

    if not starting_ts.tzinfo:
        starting_ts = starting_ts.replace(tzinfo=timezone.utc)
    return starting_ts
330
331
async def _stop_scheduler():
    """
    Stop the module-level scheduler and wait until it has stopped.
    Do nothing if no scheduler has been created yet.
    """
    if _scheduler is not None:
        await _scheduler.stop()
        await _scheduler.wait_until_stopped()
# Keyword separating the frequency from the optional start time.
STARTING_KEYWORD: str = 'starting'
# Plural unit names accepted after 'every <N>' in an interval schedule.
INTERVAL_UNITS: List[str] = ['months', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'years']
# Shorthand frequencies and the 'every <N> <unit>' form they are rewritten to.
FREQUENCY_ALIASES: Dict[str, str] = {'daily': 'every 1 day', 'hourly': 'every 1 hour', 'minutely': 'every 1 minute', 'weekly': 'every 1 week', 'monthly': 'every 1 month', 'secondly': 'every 1 second', 'yearly': 'every 1 year'}
# Natural-language connectives and the symbols they are rewritten to.
LOGIC_ALIASES: Dict[str, str] = {'and': '&', 'or': '|', ' through ': '-', ' thru ': '-', ' - ': '-', 'beginning': 'starting'}
# Three-letter weekday names.
CRON_DAYS_OF_WEEK: List[str] = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
# Longer weekday spellings mapped to their three-letter names.
CRON_DAYS_OF_WEEK_ALIASES: Dict[str, str] = {'monday': 'mon', 'tuesday': 'tue', 'tues': 'tue', 'wednesday': 'wed', 'thursday': 'thu', 'thurs': 'thu', 'friday': 'fri', 'saturday': 'sat', 'sunday': 'sun'}
# Three-letter month names.
CRON_MONTHS: List[str] = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
# Full month names mapped to their three-letter names.
CRON_MONTHS_ALIASES: Dict[str, str] = {'january': 'jan', 'february': 'feb', 'march': 'mar', 'april': 'apr', 'may': 'may', 'june': 'jun', 'july': 'jul', 'august': 'aug', 'september': 'sep', 'october': 'oct', 'november': 'nov', 'december': 'dec'}
# Every alias above in one mapping (frequency, logic, weekday, then month entries).
SCHEDULE_ALIASES: Dict[str, str] = {'daily': 'every 1 day', 'hourly': 'every 1 hour', 'minutely': 'every 1 minute', 'weekly': 'every 1 week', 'monthly': 'every 1 month', 'secondly': 'every 1 second', 'yearly': 'every 1 year', 'and': '&', 'or': '|', ' through ': '-', ' thru ': '-', ' - ': '-', 'beginning': 'starting', 'monday': 'mon', 'tuesday': 'tue', 'tues': 'tue', 'wednesday': 'wed', 'thursday': 'thu', 'thurs': 'thu', 'friday': 'fri', 'saturday': 'sat', 'sunday': 'sun', 'january': 'jan', 'february': 'feb', 'march': 'mar', 'april': 'apr', 'may': 'may', 'june': 'jun', 'july': 'jul', 'august': 'aug', 'september': 'sep', 'october': 'oct', 'november': 'nov', 'december': 'dec'}
def schedule_function( function: Callable[[Any], Any], schedule: str, *args, debug: bool = False, **kw) -> Tuple[bool, str]:
 75def schedule_function(
 76    function: Callable[[Any], Any],
 77    schedule: str,
 78    *args,
 79    debug: bool = False,
 80    **kw
 81) -> mrsm.SuccessTuple:
 82    """
 83    Block the process and execute the function intermittently according to the frequency.
 84    https://meerschaum.io/reference/background-jobs/#-schedules
 85
 86    Parameters
 87    ----------
 88    function: Callable[[Any], Any]
 89        The function to execute.
 90
 91    schedule: str
 92        The frequency schedule at which `function` should be executed (e.g. `'daily'`).
 93
 94    Returns
 95    -------
 96    A `SuccessTuple` upon exit.
 97    """
 98    import asyncio
 99    from meerschaum.utils.misc import filter_keywords, round_time
100
101    global _scheduler
102    kw['debug'] = debug
103    kw = filter_keywords(function, **kw)
104
105    _ = mrsm.attempt_import('attrs', lazy=False)
106    apscheduler = mrsm.attempt_import('apscheduler', lazy=False)
107    now = round_time(datetime.now(timezone.utc), timedelta(minutes=1))
108    trigger = parse_schedule(schedule, now=now)
109    _scheduler = apscheduler.AsyncScheduler(identity='mrsm-scheduler')
110    try:
111        loop = asyncio.get_running_loop()
112    except RuntimeError:
113        loop = asyncio.new_event_loop()
114
115    async def run_scheduler():
116        async with _scheduler:
117            job = await _scheduler.add_schedule(
118                function,
119                trigger,
120                **filter_keywords(
121                    _scheduler.add_schedule,
122                    args=args,
123                    kwargs=kw,
124                    max_running_jobs=1,
125                    conflict_policy=apscheduler.ConflictPolicy.replace,
126                )
127            )
128            try:
129                await _scheduler.run_until_stopped()
130            except (KeyboardInterrupt, SystemExit) as e:
131                await _stop_scheduler()
132                raise e
133
134    try:
135        loop.run_until_complete(run_scheduler())
136    except (KeyboardInterrupt, SystemExit):
137        loop.run_until_complete(_stop_scheduler())
138
139    return True, "Success"

Block the process and execute the function intermittently according to the frequency. https://meerschaum.io/reference/background-jobs/#-schedules

Parameters
  • function (Callable[[Any], Any]): The function to execute.
  • schedule (str): The frequency schedule at which function should be executed (e.g. 'daily').
Returns
  • A SuccessTuple upon exit.
def parse_schedule(schedule: str, now: Optional[datetime.datetime] = None):
def parse_schedule(schedule: str, now: Optional[datetime] = None):
    """
    Parse a schedule string (e.g. 'daily') into a Trigger object.

    Parameters
    ----------
    schedule: str
        The schedule string: one or more parts joined by 'and' / 'or',
        optionally followed by `STARTING_KEYWORD` (or its alias 'beginning')
        and a start time.
        Each part is either an interval ('every <number> <unit>' or an alias
        such as 'daily'), a weekday / month / year expression, or a crontab string.

    now: Optional[datetime], default None
        Passed to `parse_start_time()` as the default start time.

    Returns
    -------
    A single trigger when the schedule has one part,
    otherwise an `OrTrigger` or `AndTrigger` wrapping every part's trigger.

    Raises
    ------
    ValueError
        If both 'and' and 'or' logic are present,
        or if an interval is malformed or uses an unknown unit.
    """
    from meerschaum.utils.misc import items_str, is_int, filter_keywords
    (
        apscheduler_triggers_cron,
        apscheduler_triggers_interval,
        apscheduler_triggers_calendarinterval,
        apscheduler_triggers_combining,
    ) = (
        mrsm.attempt_import(
            'apscheduler.triggers.cron',
            'apscheduler.triggers.interval',
            'apscheduler.triggers.calendarinterval',
            'apscheduler.triggers.combining',
            lazy = False,
        )
    )

    ### Normalize aliases of the starting keyword (e.g. 'beginning') first,
    ### otherwise the start time is never split off the frequency.
    for alias_keyword, true_keyword in LOGIC_ALIASES.items():
        if true_keyword == STARTING_KEYWORD:
            schedule = schedule.replace(alias_keyword, STARTING_KEYWORD)

    starting_ts = parse_start_time(schedule, now=now)
    schedule = schedule.split(STARTING_KEYWORD, maxsplit=1)[0].strip()
    for alias_keyword, true_keyword in SCHEDULE_ALIASES.items():
        schedule = schedule.replace(alias_keyword, true_keyword)

    ### TODO Allow for combining `and` + `or` logic.
    if '&' in schedule and '|' in schedule:
        raise ValueError("Cannot accept both 'and' + 'or' logic in the schedule frequency.")

    join_str = '|' if '|' in schedule else '&'
    join_trigger = (
        apscheduler_triggers_combining.OrTrigger
        if join_str == '|'
        else apscheduler_triggers_combining.AndTrigger
    )
    ### Only `AndTrigger` is given these extra keyword arguments.
    join_kwargs = {
        'max_iterations': 1_000_000,
        'threshold': 0,
    } if join_str == '&' else {}

    schedule_parts = [part.strip() for part in schedule.split(join_str)]
    triggers = []

    ### Checked on the whole schedule so cron parts can fire every minute / second
    ### when another part asks for that granularity.
    has_seconds = 'second' in schedule
    has_minutes = 'minute' in schedule

    for schedule_part in schedule_parts:

        ### Intervals must begin with 'every' (after alias substitution).
        if schedule_part.lower().startswith('every '):
            interval_tokens = schedule_part[len('every '):].split(maxsplit=1)
            if len(interval_tokens) != 2:
                raise ValueError(
                    f"Invalid interval '{schedule_part}'.\n"
                    + "    Intervals must be of the form 'every <number> <unit>'."
                )
            schedule_num_str, schedule_unit = interval_tokens

            ### Accept both singular and plural units ('day' and 'days').
            schedule_unit = schedule_unit.rstrip('s') + 's'
            if schedule_unit not in INTERVAL_UNITS:
                raise ValueError(
                    f"Invalid interval '{schedule_unit}'.\n"
                    + f"    Accepted values are {items_str(INTERVAL_UNITS)}."
                )

            schedule_num = (
                int(schedule_num_str)
                if is_int(schedule_num_str)
                else float(schedule_num_str)
            )

            ### Months and years use the calendar-based trigger;
            ### every other unit uses the fixed-length interval trigger.
            trigger = (
                apscheduler_triggers_interval.IntervalTrigger(
                    **filter_keywords(
                        apscheduler_triggers_interval.IntervalTrigger.__init__,
                        **{
                            schedule_unit: schedule_num,
                            'start_time': starting_ts,
                            'start_date': starting_ts,
                        }
                    )
                )
                if schedule_unit not in ('months', 'years') else (
                    apscheduler_triggers_calendarinterval.CalendarIntervalTrigger(
                        **{
                            schedule_unit: schedule_num,
                            'start_date': starting_ts,
                            'timezone': starting_ts.tzinfo,
                        }
                    )
                )
            )

        ### Determine whether this is a pure cron string or a cron subset (e.g. 'may-aug').
        else:
            first_three_prefix = schedule_part[:3].lower()
            first_four_prefix = schedule_part[:4].lower()
            cron_kw = {}
            if first_three_prefix in CRON_DAYS_OF_WEEK:
                cron_kw['day_of_week'] = schedule_part
            elif first_three_prefix in CRON_MONTHS:
                cron_kw['month'] = schedule_part
            elif is_int(first_four_prefix) and len(first_four_prefix) == 4:
                cron_kw['year'] = int(first_four_prefix)
            trigger = (
                apscheduler_triggers_cron.CronTrigger(
                    **{
                        **cron_kw,
                        'hour': '*',
                        'minute': '*' if has_minutes else starting_ts.minute,
                        'second': '*' if has_seconds else starting_ts.second,
                        'start_time': starting_ts,
                        'timezone': starting_ts.tzinfo,
                    }
                )
                if cron_kw
                else apscheduler_triggers_cron.CronTrigger.from_crontab(
                    schedule_part,
                    timezone = starting_ts.tzinfo,
                )
            )
            ### Explicitly set the `start_time` after building with `from_crontab`.
            if trigger.start_time != starting_ts:
                trigger.start_time = starting_ts

        triggers.append(trigger)

    return (
        join_trigger(triggers, **join_kwargs)
        if len(triggers) != 1
        else triggers[0]
    )

Parse a schedule string (e.g. 'daily') into a Trigger object.

def parse_start_time( schedule: str, now: Optional[datetime.datetime] = None) -> datetime.datetime:
def parse_start_time(schedule: str, now: Optional[datetime] = None) -> datetime:
    """
    Return the datetime to use for the given schedule string.

    Parameters
    ----------
    schedule: str
        The schedule frequency to be parsed into a starting datetime.
        Everything after the last `STARTING_KEYWORD` is treated as the start time.
        Supported forms are 'now', 'in <number> <unit>', 'today' / 'tomorrow'
        (optionally with a time of day), and anything `dateutil` can parse.

    now: Optional[datetime], default None
        If provided, use this value as a default if no start time is explicitly stated.
        Defaults to the current UTC time rounded to the minute.

    Returns
    -------
    A `datetime` object, either `now` or the datetime embedded in the schedule string.
    Naive results are assigned the UTC timezone.

    Raises
    ------
    ValueError
        If the start time cannot be parsed.

    Examples
    --------
    >>> parse_start_time('daily starting 2024-01-01')
    datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
    >>> parse_start_time('monthly starting 1st')
    datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc)
    >>> parse_start_time('hourly starting 00:30')
    datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc)
    """
    from meerschaum.utils.misc import round_time
    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    starting_parts = schedule.split(STARTING_KEYWORD)
    starting_str = ('now' if len(starting_parts) == 1 else starting_parts[-1]).strip()
    now = now or round_time(datetime.now(timezone.utc), timedelta(minutes=1))
    starting_ts = None
    schedule_parse_error = None
    try:
        if starting_str == 'now':
            starting_ts = now
        elif starting_str.startswith('in '):
            ### Strip only the leading 'in ' and tolerate repeated whitespace.
            delta_vals = starting_str[len('in '):].split(maxsplit=1)
            delta_unit = delta_vals[-1].rstrip('s') + 's'
            delta_num = float(delta_vals[0])
            starting_ts = now + timedelta(**{delta_unit: delta_num})
        elif 'tomorrow' in starting_str or 'today' in starting_str:
            today = round_time(now, timedelta(days=1))
            tomorrow = today + timedelta(days=1)
            is_tomorrow = 'tomorrow' in starting_str
            time_str = starting_str.replace('tomorrow', '').replace('today', '').strip()
            time_ts = dateutil_parser.parse(time_str) if time_str else today
            ### Only the hour and minute of the parsed time are applied.
            starting_ts = (
                (tomorrow if is_tomorrow else today)
                + timedelta(hours=time_ts.hour)
                + timedelta(minutes=time_ts.minute)
            )
        else:
            starting_ts = dateutil_parser.parse(starting_str)
    except Exception as e:
        warn(f"Unable to parse starting time from '{starting_str}'.", stack=False)
        schedule_parse_error = str(e)

    ### Compare against `None`: an exception with an empty message must still be reported.
    if schedule_parse_error is not None:
        error(schedule_parse_error, ValueError, stack=False)
        ### Fallback in case `error()` returns instead of raising.
        raise ValueError(schedule_parse_error)

    if not starting_ts.tzinfo:
        starting_ts = starting_ts.replace(tzinfo=timezone.utc)
    return starting_ts

Return the datetime to use for the given schedule string.

Parameters
  • schedule (str): The schedule frequency to be parsed into a starting datetime.
  • now (Optional[datetime], default None): If provided, use this value as a default if no start time is explicitly stated.
Returns
  • A datetime object, either now or the datetime embedded in the schedule string.
Examples
>>> parse_start_time('daily starting 2024-01-01')
datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
>>> parse_start_time('monthly starting 1st')
datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc)
>>> parse_start_time('hourly starting 00:30')
datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc)