meerschaum.utils.schedule

Schedule processes and threads.

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Schedule processes and threads.
  7"""
  8
  9from __future__ import annotations
 10import signal
 11import traceback
 12from datetime import datetime, timezone, timedelta
 13import meerschaum as mrsm
 14from meerschaum.utils.typing import Callable, Any, Optional, List, Dict
 15from meerschaum.utils.warnings import warn, error
 16
 17
 18STARTING_KEYWORD: str = 'starting'
 19INTERVAL_UNITS: List[str] = ['months', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'years']
 20FREQUENCY_ALIASES: Dict[str, str] = {
 21    'daily': 'every 1 day',
 22    'hourly': 'every 1 hour',
 23    'minutely': 'every 1 minute',
 24    'weekly': 'every 1 week',
 25    'monthly': 'every 1 month',
 26    'secondly': 'every 1 second',
 27    'yearly': 'every 1 year',
 28}
 29LOGIC_ALIASES: Dict[str, str] = {
 30    'and': '&',
 31    'or': '|',
 32    ' through ': '-',
 33    ' thru ': '-',
 34    ' - ': '-',
 35    'beginning': STARTING_KEYWORD,
 36}
 37CRON_DAYS_OF_WEEK: List[str] = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
 38CRON_DAYS_OF_WEEK_ALIASES: Dict[str, str] = {
 39    'monday': 'mon',
 40    'tuesday': 'tue',
 41    'tues': 'tue',
 42    'wednesday': 'wed',
 43    'thursday': 'thu',
 44    'thurs': 'thu',
 45    'friday': 'fri',
 46    'saturday': 'sat',
 47    'sunday': 'sun',
 48}
 49CRON_MONTHS: List[str] = [
 50    'jan', 'feb', 'mar', 'apr', 'may', 'jun',
 51    'jul', 'aug', 'sep', 'oct', 'nov', 'dec',
 52]
 53CRON_MONTHS_ALIASES: Dict[str, str] = {
 54    'january': 'jan',
 55    'february': 'feb',
 56    'march': 'mar',
 57    'april': 'apr',
 58    'may': 'may',
 59    'june': 'jun',
 60    'july': 'jul',
 61    'august': 'aug',
 62    'september': 'sep',
 63    'october': 'oct',
 64    'november': 'nov',
 65    'december': 'dec',
 66}
 67SCHEDULE_ALIASES: Dict[str, str] = {
 68    **FREQUENCY_ALIASES,
 69    **LOGIC_ALIASES,
 70    **CRON_DAYS_OF_WEEK_ALIASES,
 71    **CRON_MONTHS_ALIASES,
 72}
 73
 74_scheduler = None
 75def schedule_function(
 76    function: Callable[[Any], Any],
 77    schedule: str,
 78    *args,
 79    debug: bool = False,
 80    **kw
 81) -> mrsm.SuccessTuple:
 82    """
 83    Block the process and execute the function intermittently according to the frequency.
 84    https://meerschaum.io/reference/background-jobs/#-schedules
 85
 86    Parameters
 87    ----------
 88    function: Callable[[Any], Any]
 89        The function to execute.
 90
 91    schedule: str
 92        The frequency schedule at which `function` should be executed (e.g. `'daily'`).
 93
 94    Returns
 95    -------
 96    A `SuccessTuple` upon exit.
 97    """
 98    import asyncio
 99    from meerschaum.utils.misc import filter_keywords
100
101    global _scheduler
102    kw['debug'] = debug
103    kw = filter_keywords(function, **kw)
104
105    _ = mrsm.attempt_import('attrs', lazy=False)
106    apscheduler = mrsm.attempt_import('apscheduler', lazy=False)
107    now = datetime.now(timezone.utc)
108    trigger = parse_schedule(schedule, now=now)
109    _scheduler = apscheduler.AsyncScheduler(identity='mrsm-scheduler')
110    try:
111        loop = asyncio.get_running_loop()
112    except RuntimeError:
113        loop = asyncio.new_event_loop()
114
115    async def run_scheduler():
116        async with _scheduler:
117            job = await _scheduler.add_schedule(
118                function,
119                trigger,
120                **filter_keywords(
121                    _scheduler.add_schedule,
122                    args=args,
123                    kwargs=kw,
124                    max_running_jobs=1,
125                    conflict_policy=apscheduler.ConflictPolicy.replace,
126                )
127            )
128            try:
129                await _scheduler.run_until_stopped()
130            except (KeyboardInterrupt, SystemExit) as e:
131                await _stop_scheduler()
132                raise e
133
134    try:
135        loop.run_until_complete(run_scheduler())
136    except (KeyboardInterrupt, SystemExit):
137        loop.run_until_complete(_stop_scheduler())
138
139    return True, "Success"
140
141
142def parse_schedule(schedule: str, now: Optional[datetime] = None):
143    """
144    Parse a schedule string (e.g. 'daily') into a Trigger object.
145    """
146    from meerschaum.utils.misc import items_str, is_int, filter_keywords
147    (
148        apscheduler_triggers_cron,
149        apscheduler_triggers_interval,
150        apscheduler_triggers_calendarinterval,
151        apscheduler_triggers_combining,
152    ) = (
153        mrsm.attempt_import(
154            'apscheduler.triggers.cron',
155            'apscheduler.triggers.interval',
156            'apscheduler.triggers.calendarinterval',
157            'apscheduler.triggers.combining',
158            lazy = False,
159        )
160    )
161
162    starting_ts = parse_start_time(schedule, now=now)
163    schedule = schedule.split(STARTING_KEYWORD, maxsplit=1)[0].strip()
164    for alias_keyword, true_keyword in SCHEDULE_ALIASES.items():
165        schedule = schedule.replace(alias_keyword, true_keyword)
166
167    ### TODO Allow for combining `and` + `or` logic.
168    if '&' in schedule and '|' in schedule:
169        raise ValueError("Cannot accept both 'and' + 'or' logic in the schedule frequency.")
170
171    join_str = '|' if '|' in schedule else '&'
172    join_trigger = (
173        apscheduler_triggers_combining.OrTrigger
174        if join_str == '|'
175        else apscheduler_triggers_combining.AndTrigger
176    )
177    join_kwargs = {
178        'max_iterations': 1_000_000,
179        'threshold': 0,
180    } if join_str == '&' else {}
181
182    schedule_parts = [part.strip() for part in schedule.split(join_str)]
183    triggers = []
184
185    has_seconds = 'second' in schedule
186    has_minutes = 'minute' in schedule
187
188    for schedule_part in schedule_parts:
189
190        ### Intervals must begin with 'every' (after alias substitution).
191        if schedule_part.lower().startswith('every '):
192            schedule_num_str, schedule_unit = (
193                schedule_part[len('every '):].split(' ', maxsplit=1)
194            )
195            schedule_unit = schedule_unit.rstrip('s') + 's'
196            if schedule_unit not in INTERVAL_UNITS:
197                raise ValueError(
198                    f"Invalid interval '{schedule_unit}'.\n"
199                    + f"    Accepted values are {items_str(INTERVAL_UNITS)}."
200                )
201
202            schedule_num = (
203                int(schedule_num_str)
204                if is_int(schedule_num_str)
205                else float(schedule_num_str)
206            )
207
208            trigger = (
209                apscheduler_triggers_interval.IntervalTrigger(
210                    **filter_keywords(
211                        apscheduler_triggers_interval.IntervalTrigger.__init__,
212                        **{
213                            schedule_unit: schedule_num,
214                            'start_time': starting_ts,
215                            'start_date': starting_ts,
216                        }
217                    )
218                )
219                if schedule_unit not in ('months', 'years') else (
220                    apscheduler_triggers_calendarinterval.CalendarIntervalTrigger(
221                        **{
222                            schedule_unit: schedule_num,
223                            'start_date': starting_ts,
224                            'timezone': starting_ts.tzinfo,
225                        }
226                    )
227                )
228            )
229
230        ### Determine whether this is a pure cron string or a cron subset (e.g. 'may-aug')_.
231        else:
232            first_three_prefix = schedule_part[:3].lower()
233            first_four_prefix = schedule_part[:4].lower()
234            cron_kw = {}
235            if first_three_prefix in CRON_DAYS_OF_WEEK:
236                cron_kw['day_of_week'] = schedule_part
237            elif first_three_prefix in CRON_MONTHS:
238                cron_kw['month'] = schedule_part
239            elif is_int(first_four_prefix) and len(first_four_prefix) == 4:
240                cron_kw['year'] = int(first_four_prefix)
241            trigger = (
242                apscheduler_triggers_cron.CronTrigger(
243                    **{
244                        **cron_kw,
245                        'hour': '*',
246                        'minute': '*' if has_minutes else starting_ts.minute,
247                        'second': '*' if has_seconds else starting_ts.second,
248                        'start_time': starting_ts,
249                        'timezone': starting_ts.tzinfo,
250                    }
251                )
252                if cron_kw
253                else apscheduler_triggers_cron.CronTrigger.from_crontab(
254                    schedule_part, 
255                    timezone = starting_ts.tzinfo,
256                )
257            )
258            ### Explicitly set the `start_time` after building with `from_crontab`.
259            if trigger.start_time != starting_ts:
260                trigger.start_time = starting_ts
261
262        triggers.append(trigger)
263
264    return (
265        join_trigger(triggers, **join_kwargs)
266        if len(triggers) != 1
267        else triggers[0]
268    )
269
270
271def parse_start_time(schedule: str, now: Optional[datetime] = None) -> datetime:
272    """
273    Return the datetime to use for the given schedule string.
274
275    Parameters
276    ----------
277    schedule: str
278        The schedule frequency to be parsed into a starting datetime.
279
280    now: Optional[datetime], default None
281        If provided, use this value as a default if no start time is explicitly stated.
282
283    Returns
284    -------
285    A `datetime` object, either `now` or the datetime embedded in the schedule string.
286
287    Examples
288    --------
289    >>> parse_start_time('daily starting 2024-01-01')
290    datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
291    >>> parse_start_time('monthly starting 1st')
292    datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc)
293    >>> parse_start_time('hourly starting 00:30')
294    datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc)
295    """
296    from meerschaum.utils.dtypes import round_time
297    dateutil_parser = mrsm.attempt_import('dateutil.parser')
298    starting_parts = schedule.split(STARTING_KEYWORD)
299    starting_str = ('now' if len(starting_parts) == 1 else starting_parts[-1]).strip()
300    now = now or datetime.now(timezone.utc)
301    try:
302        if starting_str == 'now':
303            starting_ts = now
304        elif starting_str.startswith('in '):
305            delta_vals = starting_str.replace('in ', '').split(' ', maxsplit=1)
306            delta_unit = delta_vals[-1].rstrip('s') + 's'
307            delta_num = float(delta_vals[0])
308            starting_ts = now + timedelta(**{delta_unit: delta_num})
309        elif 'tomorrow' in starting_str or 'today' in starting_str:
310            today = round_time(now, timedelta(days=1))
311            tomorrow = today + timedelta(days=1)
312            is_tomorrow = 'tomorrow' in starting_str
313            time_str = starting_str.replace('tomorrow', '').replace('today', '').strip()
314            time_ts = dateutil_parser.parse(time_str) if time_str else today
315            starting_ts = (
316                (tomorrow if is_tomorrow else today)
317                + timedelta(hours=time_ts.hour)
318                + timedelta(minutes=time_ts.minute)
319            )
320        else:
321            starting_ts = dateutil_parser.parse(starting_str)
322        schedule_parse_error = None
323    except Exception as e:
324        warn(f"Unable to parse starting time from '{starting_str}'.", stack=False)
325        schedule_parse_error = str(e)
326    if schedule_parse_error:
327        error(schedule_parse_error, ValueError, stack=False)
328    if not starting_ts.tzinfo:
329        starting_ts = starting_ts.replace(tzinfo=timezone.utc)
330    return starting_ts
331
332
333async def _stop_scheduler():
334    if _scheduler is None:
335        return
336    await _scheduler.stop()
337    await _scheduler.wait_until_stopped()
STARTING_KEYWORD: str = 'starting'
INTERVAL_UNITS: List[str] = ['months', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'years']
FREQUENCY_ALIASES: Dict[str, str] = {'daily': 'every 1 day', 'hourly': 'every 1 hour', 'minutely': 'every 1 minute', 'weekly': 'every 1 week', 'monthly': 'every 1 month', 'secondly': 'every 1 second', 'yearly': 'every 1 year'}
LOGIC_ALIASES: Dict[str, str] = {'and': '&', 'or': '|', ' through ': '-', ' thru ': '-', ' - ': '-', 'beginning': 'starting'}
CRON_DAYS_OF_WEEK: List[str] = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
CRON_DAYS_OF_WEEK_ALIASES: Dict[str, str] = {'monday': 'mon', 'tuesday': 'tue', 'tues': 'tue', 'wednesday': 'wed', 'thursday': 'thu', 'thurs': 'thu', 'friday': 'fri', 'saturday': 'sat', 'sunday': 'sun'}
CRON_MONTHS: List[str] = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
CRON_MONTHS_ALIASES: Dict[str, str] = {'january': 'jan', 'february': 'feb', 'march': 'mar', 'april': 'apr', 'may': 'may', 'june': 'jun', 'july': 'jul', 'august': 'aug', 'september': 'sep', 'october': 'oct', 'november': 'nov', 'december': 'dec'}
SCHEDULE_ALIASES: Dict[str, str] = {'daily': 'every 1 day', 'hourly': 'every 1 hour', 'minutely': 'every 1 minute', 'weekly': 'every 1 week', 'monthly': 'every 1 month', 'secondly': 'every 1 second', 'yearly': 'every 1 year', 'and': '&', 'or': '|', ' through ': '-', ' thru ': '-', ' - ': '-', 'beginning': 'starting', 'monday': 'mon', 'tuesday': 'tue', 'tues': 'tue', 'wednesday': 'wed', 'thursday': 'thu', 'thurs': 'thu', 'friday': 'fri', 'saturday': 'sat', 'sunday': 'sun', 'january': 'jan', 'february': 'feb', 'march': 'mar', 'april': 'apr', 'may': 'may', 'june': 'jun', 'july': 'jul', 'august': 'aug', 'september': 'sep', 'october': 'oct', 'november': 'nov', 'december': 'dec'}
def schedule_function( function: Callable[[Any], Any], schedule: str, *args, debug: bool = False, **kw) -> Tuple[bool, str]:
 76def schedule_function(
 77    function: Callable[[Any], Any],
 78    schedule: str,
 79    *args,
 80    debug: bool = False,
 81    **kw
 82) -> mrsm.SuccessTuple:
 83    """
 84    Block the process and execute the function intermittently according to the frequency.
 85    https://meerschaum.io/reference/background-jobs/#-schedules
 86
 87    Parameters
 88    ----------
 89    function: Callable[[Any], Any]
 90        The function to execute.
 91
 92    schedule: str
 93        The frequency schedule at which `function` should be executed (e.g. `'daily'`).
 94
 95    Returns
 96    -------
 97    A `SuccessTuple` upon exit.
 98    """
 99    import asyncio
100    from meerschaum.utils.misc import filter_keywords
101
102    global _scheduler
103    kw['debug'] = debug
104    kw = filter_keywords(function, **kw)
105
106    _ = mrsm.attempt_import('attrs', lazy=False)
107    apscheduler = mrsm.attempt_import('apscheduler', lazy=False)
108    now = datetime.now(timezone.utc)
109    trigger = parse_schedule(schedule, now=now)
110    _scheduler = apscheduler.AsyncScheduler(identity='mrsm-scheduler')
111    try:
112        loop = asyncio.get_running_loop()
113    except RuntimeError:
114        loop = asyncio.new_event_loop()
115
116    async def run_scheduler():
117        async with _scheduler:
118            job = await _scheduler.add_schedule(
119                function,
120                trigger,
121                **filter_keywords(
122                    _scheduler.add_schedule,
123                    args=args,
124                    kwargs=kw,
125                    max_running_jobs=1,
126                    conflict_policy=apscheduler.ConflictPolicy.replace,
127                )
128            )
129            try:
130                await _scheduler.run_until_stopped()
131            except (KeyboardInterrupt, SystemExit) as e:
132                await _stop_scheduler()
133                raise e
134
135    try:
136        loop.run_until_complete(run_scheduler())
137    except (KeyboardInterrupt, SystemExit):
138        loop.run_until_complete(_stop_scheduler())
139
140    return True, "Success"

Block the process and execute the function intermittently according to the frequency. https://meerschaum.io/reference/background-jobs/#-schedules

Parameters
  • function (Callable[[Any], Any]): The function to execute.
  • schedule (str): The frequency schedule at which function should be executed (e.g. 'daily').
Returns
  • A SuccessTuple upon exit.
def parse_schedule(schedule: str, now: Optional[datetime.datetime] = None):
143def parse_schedule(schedule: str, now: Optional[datetime] = None):
144    """
145    Parse a schedule string (e.g. 'daily') into a Trigger object.
146    """
147    from meerschaum.utils.misc import items_str, is_int, filter_keywords
148    (
149        apscheduler_triggers_cron,
150        apscheduler_triggers_interval,
151        apscheduler_triggers_calendarinterval,
152        apscheduler_triggers_combining,
153    ) = (
154        mrsm.attempt_import(
155            'apscheduler.triggers.cron',
156            'apscheduler.triggers.interval',
157            'apscheduler.triggers.calendarinterval',
158            'apscheduler.triggers.combining',
159            lazy = False,
160        )
161    )
162
163    starting_ts = parse_start_time(schedule, now=now)
164    schedule = schedule.split(STARTING_KEYWORD, maxsplit=1)[0].strip()
165    for alias_keyword, true_keyword in SCHEDULE_ALIASES.items():
166        schedule = schedule.replace(alias_keyword, true_keyword)
167
168    ### TODO Allow for combining `and` + `or` logic.
169    if '&' in schedule and '|' in schedule:
170        raise ValueError("Cannot accept both 'and' + 'or' logic in the schedule frequency.")
171
172    join_str = '|' if '|' in schedule else '&'
173    join_trigger = (
174        apscheduler_triggers_combining.OrTrigger
175        if join_str == '|'
176        else apscheduler_triggers_combining.AndTrigger
177    )
178    join_kwargs = {
179        'max_iterations': 1_000_000,
180        'threshold': 0,
181    } if join_str == '&' else {}
182
183    schedule_parts = [part.strip() for part in schedule.split(join_str)]
184    triggers = []
185
186    has_seconds = 'second' in schedule
187    has_minutes = 'minute' in schedule
188
189    for schedule_part in schedule_parts:
190
191        ### Intervals must begin with 'every' (after alias substitution).
192        if schedule_part.lower().startswith('every '):
193            schedule_num_str, schedule_unit = (
194                schedule_part[len('every '):].split(' ', maxsplit=1)
195            )
196            schedule_unit = schedule_unit.rstrip('s') + 's'
197            if schedule_unit not in INTERVAL_UNITS:
198                raise ValueError(
199                    f"Invalid interval '{schedule_unit}'.\n"
200                    + f"    Accepted values are {items_str(INTERVAL_UNITS)}."
201                )
202
203            schedule_num = (
204                int(schedule_num_str)
205                if is_int(schedule_num_str)
206                else float(schedule_num_str)
207            )
208
209            trigger = (
210                apscheduler_triggers_interval.IntervalTrigger(
211                    **filter_keywords(
212                        apscheduler_triggers_interval.IntervalTrigger.__init__,
213                        **{
214                            schedule_unit: schedule_num,
215                            'start_time': starting_ts,
216                            'start_date': starting_ts,
217                        }
218                    )
219                )
220                if schedule_unit not in ('months', 'years') else (
221                    apscheduler_triggers_calendarinterval.CalendarIntervalTrigger(
222                        **{
223                            schedule_unit: schedule_num,
224                            'start_date': starting_ts,
225                            'timezone': starting_ts.tzinfo,
226                        }
227                    )
228                )
229            )
230
231        ### Determine whether this is a pure cron string or a cron subset (e.g. 'may-aug')_.
232        else:
233            first_three_prefix = schedule_part[:3].lower()
234            first_four_prefix = schedule_part[:4].lower()
235            cron_kw = {}
236            if first_three_prefix in CRON_DAYS_OF_WEEK:
237                cron_kw['day_of_week'] = schedule_part
238            elif first_three_prefix in CRON_MONTHS:
239                cron_kw['month'] = schedule_part
240            elif is_int(first_four_prefix) and len(first_four_prefix) == 4:
241                cron_kw['year'] = int(first_four_prefix)
242            trigger = (
243                apscheduler_triggers_cron.CronTrigger(
244                    **{
245                        **cron_kw,
246                        'hour': '*',
247                        'minute': '*' if has_minutes else starting_ts.minute,
248                        'second': '*' if has_seconds else starting_ts.second,
249                        'start_time': starting_ts,
250                        'timezone': starting_ts.tzinfo,
251                    }
252                )
253                if cron_kw
254                else apscheduler_triggers_cron.CronTrigger.from_crontab(
255                    schedule_part, 
256                    timezone = starting_ts.tzinfo,
257                )
258            )
259            ### Explicitly set the `start_time` after building with `from_crontab`.
260            if trigger.start_time != starting_ts:
261                trigger.start_time = starting_ts
262
263        triggers.append(trigger)
264
265    return (
266        join_trigger(triggers, **join_kwargs)
267        if len(triggers) != 1
268        else triggers[0]
269    )

Parse a schedule string (e.g. 'daily') into a Trigger object.

def parse_start_time( schedule: str, now: Optional[datetime.datetime] = None) -> datetime.datetime:
272def parse_start_time(schedule: str, now: Optional[datetime] = None) -> datetime:
273    """
274    Return the datetime to use for the given schedule string.
275
276    Parameters
277    ----------
278    schedule: str
279        The schedule frequency to be parsed into a starting datetime.
280
281    now: Optional[datetime], default None
282        If provided, use this value as a default if no start time is explicitly stated.
283
284    Returns
285    -------
286    A `datetime` object, either `now` or the datetime embedded in the schedule string.
287
288    Examples
289    --------
290    >>> parse_start_time('daily starting 2024-01-01')
291    datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
292    >>> parse_start_time('monthly starting 1st')
293    datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc)
294    >>> parse_start_time('hourly starting 00:30')
295    datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc)
296    """
297    from meerschaum.utils.dtypes import round_time
298    dateutil_parser = mrsm.attempt_import('dateutil.parser')
299    starting_parts = schedule.split(STARTING_KEYWORD)
300    starting_str = ('now' if len(starting_parts) == 1 else starting_parts[-1]).strip()
301    now = now or datetime.now(timezone.utc)
302    try:
303        if starting_str == 'now':
304            starting_ts = now
305        elif starting_str.startswith('in '):
306            delta_vals = starting_str.replace('in ', '').split(' ', maxsplit=1)
307            delta_unit = delta_vals[-1].rstrip('s') + 's'
308            delta_num = float(delta_vals[0])
309            starting_ts = now + timedelta(**{delta_unit: delta_num})
310        elif 'tomorrow' in starting_str or 'today' in starting_str:
311            today = round_time(now, timedelta(days=1))
312            tomorrow = today + timedelta(days=1)
313            is_tomorrow = 'tomorrow' in starting_str
314            time_str = starting_str.replace('tomorrow', '').replace('today', '').strip()
315            time_ts = dateutil_parser.parse(time_str) if time_str else today
316            starting_ts = (
317                (tomorrow if is_tomorrow else today)
318                + timedelta(hours=time_ts.hour)
319                + timedelta(minutes=time_ts.minute)
320            )
321        else:
322            starting_ts = dateutil_parser.parse(starting_str)
323        schedule_parse_error = None
324    except Exception as e:
325        warn(f"Unable to parse starting time from '{starting_str}'.", stack=False)
326        schedule_parse_error = str(e)
327    if schedule_parse_error:
328        error(schedule_parse_error, ValueError, stack=False)
329    if not starting_ts.tzinfo:
330        starting_ts = starting_ts.replace(tzinfo=timezone.utc)
331    return starting_ts

Return the datetime to use for the given schedule string.

Parameters
  • schedule (str): The schedule frequency to be parsed into a starting datetime.
  • now (Optional[datetime], default None): If provided, use this value as a default if no start time is explicitly stated.
Returns
  • A datetime object, either now or the datetime embedded in the schedule string.
Examples
>>> parse_start_time('daily starting 2024-01-01')
datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
>>> parse_start_time('monthly starting 1st')
datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc)
>>> parse_start_time('hourly starting 00:30')
datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc)