meerschaum.utils.schedule
Schedule processes and threads.
1#! /usr/bin/env python3 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Schedule processes and threads. 7""" 8 9from __future__ import annotations 10import signal 11import traceback 12from datetime import datetime, timezone, timedelta 13import meerschaum as mrsm 14from meerschaum.utils.typing import Callable, Any, Optional, List, Dict 15from meerschaum.utils.warnings import warn, error 16 17STARTING_KEYWORD: str = 'starting' 18INTERVAL_UNITS: List[str] = ['months', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'years'] 19FREQUENCY_ALIASES: Dict[str, str] = { 20 'daily': 'every 1 day', 21 'hourly': 'every 1 hour', 22 'minutely': 'every 1 minute', 23 'weekly': 'every 1 week', 24 'monthly': 'every 1 month', 25 'secondly': 'every 1 second', 26 'yearly': 'every 1 year', 27} 28LOGIC_ALIASES: Dict[str, str] = { 29 'and': '&', 30 'or': '|', 31 ' through ': '-', 32 ' thru ': '-', 33 ' - ': '-', 34 'beginning': STARTING_KEYWORD, 35} 36CRON_DAYS_OF_WEEK: List[str] = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun'] 37CRON_DAYS_OF_WEEK_ALIASES: Dict[str, str] = { 38 'monday': 'mon', 39 'tuesday': 'tue', 40 'tues': 'tue', 41 'wednesday': 'wed', 42 'thursday': 'thu', 43 'thurs': 'thu', 44 'friday': 'fri', 45 'saturday': 'sat', 46 'sunday': 'sun', 47} 48CRON_MONTHS: List[str] = [ 49 'jan', 'feb', 'mar', 'apr', 'may', 'jun', 50 'jul', 'aug', 'sep', 'oct', 'nov', 'dec', 51] 52CRON_MONTHS_ALIASES: Dict[str, str] = { 53 'january': 'jan', 54 'february': 'feb', 55 'march': 'mar', 56 'april': 'apr', 57 'may': 'may', 58 'june': 'jun', 59 'july': 'jul', 60 'august': 'aug', 61 'september': 'sep', 62 'october': 'oct', 63 'november': 'nov', 64 'december': 'dec', 65} 66SCHEDULE_ALIASES: Dict[str, str] = { 67 **FREQUENCY_ALIASES, 68 **LOGIC_ALIASES, 69 **CRON_DAYS_OF_WEEK_ALIASES, 70 **CRON_MONTHS_ALIASES, 71} 72 73_scheduler = None 74def schedule_function( 75 function: Callable[[Any], Any], 76 schedule: str, 77 *args, 78 debug: bool = False, 79 **kw 80) -> mrsm.SuccessTuple: 81 """ 82 Block the process and execute the function intermittently according to the frequency. 83 https://meerschaum.io/reference/background-jobs/#-schedules 84 85 Parameters 86 ---------- 87 function: Callable[[Any], Any] 88 The function to execute. 89 90 schedule: str 91 The frequency schedule at which `function` should be executed (e.g. `'daily'`). 92 93 Returns 94 ------- 95 A `SuccessTuple` upon exit. 96 """ 97 import asyncio 98 from meerschaum.utils.misc import filter_keywords, round_time 99 100 global _scheduler 101 kw['debug'] = debug 102 kw = filter_keywords(function, **kw) 103 104 _ = mrsm.attempt_import('attrs', lazy=False) 105 apscheduler = mrsm.attempt_import('apscheduler', lazy=False) 106 now = round_time(datetime.now(timezone.utc), timedelta(minutes=1)) 107 trigger = parse_schedule(schedule, now=now) 108 _scheduler = apscheduler.AsyncScheduler(identity='mrsm-scheduler') 109 try: 110 loop = asyncio.get_running_loop() 111 except RuntimeError: 112 loop = asyncio.new_event_loop() 113 114 async def run_scheduler(): 115 async with _scheduler: 116 job = await _scheduler.add_schedule( 117 function, 118 trigger, 119 **filter_keywords( 120 _scheduler.add_schedule, 121 args=args, 122 kwargs=kw, 123 max_running_jobs=1, 124 conflict_policy=apscheduler.ConflictPolicy.replace, 125 ) 126 ) 127 try: 128 await _scheduler.run_until_stopped() 129 except (KeyboardInterrupt, SystemExit) as e: 130 await _stop_scheduler() 131 raise e 132 133 try: 134 loop.run_until_complete(run_scheduler()) 135 except (KeyboardInterrupt, SystemExit): 136 loop.run_until_complete(_stop_scheduler()) 137 138 return True, "Success" 139 140 141def parse_schedule(schedule: str, now: Optional[datetime] = None): 142 """ 143 Parse a schedule string (e.g. 'daily') into a Trigger object. 144 """ 145 from meerschaum.utils.misc import items_str, is_int, filter_keywords 146 ( 147 apscheduler_triggers_cron, 148 apscheduler_triggers_interval, 149 apscheduler_triggers_calendarinterval, 150 apscheduler_triggers_combining, 151 ) = ( 152 mrsm.attempt_import( 153 'apscheduler.triggers.cron', 154 'apscheduler.triggers.interval', 155 'apscheduler.triggers.calendarinterval', 156 'apscheduler.triggers.combining', 157 lazy = False, 158 ) 159 ) 160 161 starting_ts = parse_start_time(schedule, now=now) 162 schedule = schedule.split(STARTING_KEYWORD, maxsplit=1)[0].strip() 163 for alias_keyword, true_keyword in SCHEDULE_ALIASES.items(): 164 schedule = schedule.replace(alias_keyword, true_keyword) 165 166 ### TODO Allow for combining `and` + `or` logic. 167 if '&' in schedule and '|' in schedule: 168 raise ValueError("Cannot accept both 'and' + 'or' logic in the schedule frequency.") 169 170 join_str = '|' if '|' in schedule else '&' 171 join_trigger = ( 172 apscheduler_triggers_combining.OrTrigger 173 if join_str == '|' 174 else apscheduler_triggers_combining.AndTrigger 175 ) 176 join_kwargs = { 177 'max_iterations': 1_000_000, 178 'threshold': 0, 179 } if join_str == '&' else {} 180 181 schedule_parts = [part.strip() for part in schedule.split(join_str)] 182 triggers = [] 183 184 has_seconds = 'second' in schedule 185 has_minutes = 'minute' in schedule 186 187 for schedule_part in schedule_parts: 188 189 ### Intervals must begin with 'every' (after alias substitution). 190 if schedule_part.lower().startswith('every '): 191 schedule_num_str, schedule_unit = ( 192 schedule_part[len('every '):].split(' ', maxsplit=1) 193 ) 194 schedule_unit = schedule_unit.rstrip('s') + 's' 195 if schedule_unit not in INTERVAL_UNITS: 196 raise ValueError( 197 f"Invalid interval '{schedule_unit}'.\n" 198 + f" Accepted values are {items_str(INTERVAL_UNITS)}." 199 ) 200 201 schedule_num = ( 202 int(schedule_num_str) 203 if is_int(schedule_num_str) 204 else float(schedule_num_str) 205 ) 206 207 trigger = ( 208 apscheduler_triggers_interval.IntervalTrigger( 209 **filter_keywords( 210 apscheduler_triggers_interval.IntervalTrigger.__init__, 211 **{ 212 schedule_unit: schedule_num, 213 'start_time': starting_ts, 214 'start_date': starting_ts, 215 } 216 ) 217 ) 218 if schedule_unit not in ('months', 'years') else ( 219 apscheduler_triggers_calendarinterval.CalendarIntervalTrigger( 220 **{ 221 schedule_unit: schedule_num, 222 'start_date': starting_ts, 223 'timezone': starting_ts.tzinfo, 224 } 225 ) 226 ) 227 ) 228 229 ### Determine whether this is a pure cron string or a cron subset (e.g. 'may-aug')_. 230 else: 231 first_three_prefix = schedule_part[:3].lower() 232 first_four_prefix = schedule_part[:4].lower() 233 cron_kw = {} 234 if first_three_prefix in CRON_DAYS_OF_WEEK: 235 cron_kw['day_of_week'] = schedule_part 236 elif first_three_prefix in CRON_MONTHS: 237 cron_kw['month'] = schedule_part 238 elif is_int(first_four_prefix) and len(first_four_prefix) == 4: 239 cron_kw['year'] = int(first_four_prefix) 240 trigger = ( 241 apscheduler_triggers_cron.CronTrigger( 242 **{ 243 **cron_kw, 244 'hour': '*', 245 'minute': '*' if has_minutes else starting_ts.minute, 246 'second': '*' if has_seconds else starting_ts.second, 247 'start_time': starting_ts, 248 'timezone': starting_ts.tzinfo, 249 } 250 ) 251 if cron_kw 252 else apscheduler_triggers_cron.CronTrigger.from_crontab( 253 schedule_part, 254 timezone = starting_ts.tzinfo, 255 ) 256 ) 257 ### Explicitly set the `start_time` after building with `from_crontab`. 258 if trigger.start_time != starting_ts: 259 trigger.start_time = starting_ts 260 261 triggers.append(trigger) 262 263 return ( 264 join_trigger(triggers, **join_kwargs) 265 if len(triggers) != 1 266 else triggers[0] 267 ) 268 269 270def parse_start_time(schedule: str, now: Optional[datetime] = None) -> datetime: 271 """ 272 Return the datetime to use for the given schedule string. 273 274 Parameters 275 ---------- 276 schedule: str 277 The schedule frequency to be parsed into a starting datetime. 278 279 now: Optional[datetime], default None 280 If provided, use this value as a default if no start time is explicitly stated. 281 282 Returns 283 ------- 284 A `datetime` object, either `now` or the datetime embedded in the schedule string. 285 286 Examples 287 -------- 288 >>> parse_start_time('daily starting 2024-01-01') 289 datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc) 290 >>> parse_start_time('monthly starting 1st') 291 datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc) 292 >>> parse_start_time('hourly starting 00:30') 293 datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc) 294 """ 295 from meerschaum.utils.misc import round_time 296 dateutil_parser = mrsm.attempt_import('dateutil.parser') 297 starting_parts = schedule.split(STARTING_KEYWORD) 298 starting_str = ('now' if len(starting_parts) == 1 else starting_parts[-1]).strip() 299 now = now or round_time(datetime.now(timezone.utc), timedelta(minutes=1)) 300 try: 301 if starting_str == 'now': 302 starting_ts = now 303 elif starting_str.startswith('in '): 304 delta_vals = starting_str.replace('in ', '').split(' ', maxsplit=1) 305 delta_unit = delta_vals[-1].rstrip('s') + 's' 306 delta_num = float(delta_vals[0]) 307 starting_ts = now + timedelta(**{delta_unit: delta_num}) 308 elif 'tomorrow' in starting_str or 'today' in starting_str: 309 today = round_time(now, timedelta(days=1)) 310 tomorrow = today + timedelta(days=1) 311 is_tomorrow = 'tomorrow' in starting_str 312 time_str = starting_str.replace('tomorrow', '').replace('today', '').strip() 313 time_ts = dateutil_parser.parse(time_str) if time_str else today 314 starting_ts = ( 315 (tomorrow if is_tomorrow else today) 316 + timedelta(hours=time_ts.hour) 317 + timedelta(minutes=time_ts.minute) 318 ) 319 else: 320 starting_ts = dateutil_parser.parse(starting_str) 321 schedule_parse_error = None 322 except Exception as e: 323 warn(f"Unable to parse starting time from '{starting_str}'.", stack=False) 324 schedule_parse_error = str(e) 325 if schedule_parse_error: 326 error(schedule_parse_error, ValueError, stack=False) 327 if not starting_ts.tzinfo: 328 starting_ts = starting_ts.replace(tzinfo=timezone.utc) 329 return starting_ts 330 331 332async def _stop_scheduler(): 333 if _scheduler is None: 334 return 335 await _scheduler.stop() 336 await _scheduler.wait_until_stopped()
STARTING_KEYWORD: str =
'starting'
INTERVAL_UNITS: List[str] =
['months', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'years']
FREQUENCY_ALIASES: Dict[str, str] =
{'daily': 'every 1 day', 'hourly': 'every 1 hour', 'minutely': 'every 1 minute', 'weekly': 'every 1 week', 'monthly': 'every 1 month', 'secondly': 'every 1 second', 'yearly': 'every 1 year'}
LOGIC_ALIASES: Dict[str, str] =
{'and': '&', 'or': '|', ' through ': '-', ' thru ': '-', ' - ': '-', 'beginning': 'starting'}
CRON_DAYS_OF_WEEK: List[str] =
['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
CRON_DAYS_OF_WEEK_ALIASES: Dict[str, str] =
{'monday': 'mon', 'tuesday': 'tue', 'tues': 'tue', 'wednesday': 'wed', 'thursday': 'thu', 'thurs': 'thu', 'friday': 'fri', 'saturday': 'sat', 'sunday': 'sun'}
CRON_MONTHS: List[str] =
['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
CRON_MONTHS_ALIASES: Dict[str, str] =
{'january': 'jan', 'february': 'feb', 'march': 'mar', 'april': 'apr', 'may': 'may', 'june': 'jun', 'july': 'jul', 'august': 'aug', 'september': 'sep', 'october': 'oct', 'november': 'nov', 'december': 'dec'}
SCHEDULE_ALIASES: Dict[str, str] =
{'daily': 'every 1 day', 'hourly': 'every 1 hour', 'minutely': 'every 1 minute', 'weekly': 'every 1 week', 'monthly': 'every 1 month', 'secondly': 'every 1 second', 'yearly': 'every 1 year', 'and': '&', 'or': '|', ' through ': '-', ' thru ': '-', ' - ': '-', 'beginning': 'starting', 'monday': 'mon', 'tuesday': 'tue', 'tues': 'tue', 'wednesday': 'wed', 'thursday': 'thu', 'thurs': 'thu', 'friday': 'fri', 'saturday': 'sat', 'sunday': 'sun', 'january': 'jan', 'february': 'feb', 'march': 'mar', 'april': 'apr', 'may': 'may', 'june': 'jun', 'july': 'jul', 'august': 'aug', 'september': 'sep', 'october': 'oct', 'november': 'nov', 'december': 'dec'}
def
schedule_function( function: Callable[[Any], Any], schedule: str, *args, debug: bool = False, **kw) -> Tuple[bool, str]:
75def schedule_function( 76 function: Callable[[Any], Any], 77 schedule: str, 78 *args, 79 debug: bool = False, 80 **kw 81) -> mrsm.SuccessTuple: 82 """ 83 Block the process and execute the function intermittently according to the frequency. 84 https://meerschaum.io/reference/background-jobs/#-schedules 85 86 Parameters 87 ---------- 88 function: Callable[[Any], Any] 89 The function to execute. 90 91 schedule: str 92 The frequency schedule at which `function` should be executed (e.g. `'daily'`). 93 94 Returns 95 ------- 96 A `SuccessTuple` upon exit. 97 """ 98 import asyncio 99 from meerschaum.utils.misc import filter_keywords, round_time 100 101 global _scheduler 102 kw['debug'] = debug 103 kw = filter_keywords(function, **kw) 104 105 _ = mrsm.attempt_import('attrs', lazy=False) 106 apscheduler = mrsm.attempt_import('apscheduler', lazy=False) 107 now = round_time(datetime.now(timezone.utc), timedelta(minutes=1)) 108 trigger = parse_schedule(schedule, now=now) 109 _scheduler = apscheduler.AsyncScheduler(identity='mrsm-scheduler') 110 try: 111 loop = asyncio.get_running_loop() 112 except RuntimeError: 113 loop = asyncio.new_event_loop() 114 115 async def run_scheduler(): 116 async with _scheduler: 117 job = await _scheduler.add_schedule( 118 function, 119 trigger, 120 **filter_keywords( 121 _scheduler.add_schedule, 122 args=args, 123 kwargs=kw, 124 max_running_jobs=1, 125 conflict_policy=apscheduler.ConflictPolicy.replace, 126 ) 127 ) 128 try: 129 await _scheduler.run_until_stopped() 130 except (KeyboardInterrupt, SystemExit) as e: 131 await _stop_scheduler() 132 raise e 133 134 try: 135 loop.run_until_complete(run_scheduler()) 136 except (KeyboardInterrupt, SystemExit): 137 loop.run_until_complete(_stop_scheduler()) 138 139 return True, "Success"
Block the process and execute the function intermittently according to the frequency. https://meerschaum.io/reference/background-jobs/#-schedules
Parameters
- function (Callable[[Any], Any]): The function to execute.
- schedule (str):
The frequency schedule at which
function
should be executed (e.g.'daily'
).
Returns
- A
SuccessTuple
upon exit.
def
parse_schedule(schedule: str, now: Optional[datetime.datetime] = None):
142def parse_schedule(schedule: str, now: Optional[datetime] = None): 143 """ 144 Parse a schedule string (e.g. 'daily') into a Trigger object. 145 """ 146 from meerschaum.utils.misc import items_str, is_int, filter_keywords 147 ( 148 apscheduler_triggers_cron, 149 apscheduler_triggers_interval, 150 apscheduler_triggers_calendarinterval, 151 apscheduler_triggers_combining, 152 ) = ( 153 mrsm.attempt_import( 154 'apscheduler.triggers.cron', 155 'apscheduler.triggers.interval', 156 'apscheduler.triggers.calendarinterval', 157 'apscheduler.triggers.combining', 158 lazy = False, 159 ) 160 ) 161 162 starting_ts = parse_start_time(schedule, now=now) 163 schedule = schedule.split(STARTING_KEYWORD, maxsplit=1)[0].strip() 164 for alias_keyword, true_keyword in SCHEDULE_ALIASES.items(): 165 schedule = schedule.replace(alias_keyword, true_keyword) 166 167 ### TODO Allow for combining `and` + `or` logic. 168 if '&' in schedule and '|' in schedule: 169 raise ValueError("Cannot accept both 'and' + 'or' logic in the schedule frequency.") 170 171 join_str = '|' if '|' in schedule else '&' 172 join_trigger = ( 173 apscheduler_triggers_combining.OrTrigger 174 if join_str == '|' 175 else apscheduler_triggers_combining.AndTrigger 176 ) 177 join_kwargs = { 178 'max_iterations': 1_000_000, 179 'threshold': 0, 180 } if join_str == '&' else {} 181 182 schedule_parts = [part.strip() for part in schedule.split(join_str)] 183 triggers = [] 184 185 has_seconds = 'second' in schedule 186 has_minutes = 'minute' in schedule 187 188 for schedule_part in schedule_parts: 189 190 ### Intervals must begin with 'every' (after alias substitution). 191 if schedule_part.lower().startswith('every '): 192 schedule_num_str, schedule_unit = ( 193 schedule_part[len('every '):].split(' ', maxsplit=1) 194 ) 195 schedule_unit = schedule_unit.rstrip('s') + 's' 196 if schedule_unit not in INTERVAL_UNITS: 197 raise ValueError( 198 f"Invalid interval '{schedule_unit}'.\n" 199 + f" Accepted values are {items_str(INTERVAL_UNITS)}." 200 ) 201 202 schedule_num = ( 203 int(schedule_num_str) 204 if is_int(schedule_num_str) 205 else float(schedule_num_str) 206 ) 207 208 trigger = ( 209 apscheduler_triggers_interval.IntervalTrigger( 210 **filter_keywords( 211 apscheduler_triggers_interval.IntervalTrigger.__init__, 212 **{ 213 schedule_unit: schedule_num, 214 'start_time': starting_ts, 215 'start_date': starting_ts, 216 } 217 ) 218 ) 219 if schedule_unit not in ('months', 'years') else ( 220 apscheduler_triggers_calendarinterval.CalendarIntervalTrigger( 221 **{ 222 schedule_unit: schedule_num, 223 'start_date': starting_ts, 224 'timezone': starting_ts.tzinfo, 225 } 226 ) 227 ) 228 ) 229 230 ### Determine whether this is a pure cron string or a cron subset (e.g. 'may-aug')_. 231 else: 232 first_three_prefix = schedule_part[:3].lower() 233 first_four_prefix = schedule_part[:4].lower() 234 cron_kw = {} 235 if first_three_prefix in CRON_DAYS_OF_WEEK: 236 cron_kw['day_of_week'] = schedule_part 237 elif first_three_prefix in CRON_MONTHS: 238 cron_kw['month'] = schedule_part 239 elif is_int(first_four_prefix) and len(first_four_prefix) == 4: 240 cron_kw['year'] = int(first_four_prefix) 241 trigger = ( 242 apscheduler_triggers_cron.CronTrigger( 243 **{ 244 **cron_kw, 245 'hour': '*', 246 'minute': '*' if has_minutes else starting_ts.minute, 247 'second': '*' if has_seconds else starting_ts.second, 248 'start_time': starting_ts, 249 'timezone': starting_ts.tzinfo, 250 } 251 ) 252 if cron_kw 253 else apscheduler_triggers_cron.CronTrigger.from_crontab( 254 schedule_part, 255 timezone = starting_ts.tzinfo, 256 ) 257 ) 258 ### Explicitly set the `start_time` after building with `from_crontab`. 259 if trigger.start_time != starting_ts: 260 trigger.start_time = starting_ts 261 262 triggers.append(trigger) 263 264 return ( 265 join_trigger(triggers, **join_kwargs) 266 if len(triggers) != 1 267 else triggers[0] 268 )
Parse a schedule string (e.g. 'daily') into a Trigger object.
def
parse_start_time( schedule: str, now: Optional[datetime.datetime] = None) -> datetime.datetime:
271def parse_start_time(schedule: str, now: Optional[datetime] = None) -> datetime: 272 """ 273 Return the datetime to use for the given schedule string. 274 275 Parameters 276 ---------- 277 schedule: str 278 The schedule frequency to be parsed into a starting datetime. 279 280 now: Optional[datetime], default None 281 If provided, use this value as a default if no start time is explicitly stated. 282 283 Returns 284 ------- 285 A `datetime` object, either `now` or the datetime embedded in the schedule string. 286 287 Examples 288 -------- 289 >>> parse_start_time('daily starting 2024-01-01') 290 datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc) 291 >>> parse_start_time('monthly starting 1st') 292 datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc) 293 >>> parse_start_time('hourly starting 00:30') 294 datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc) 295 """ 296 from meerschaum.utils.misc import round_time 297 dateutil_parser = mrsm.attempt_import('dateutil.parser') 298 starting_parts = schedule.split(STARTING_KEYWORD) 299 starting_str = ('now' if len(starting_parts) == 1 else starting_parts[-1]).strip() 300 now = now or round_time(datetime.now(timezone.utc), timedelta(minutes=1)) 301 try: 302 if starting_str == 'now': 303 starting_ts = now 304 elif starting_str.startswith('in '): 305 delta_vals = starting_str.replace('in ', '').split(' ', maxsplit=1) 306 delta_unit = delta_vals[-1].rstrip('s') + 's' 307 delta_num = float(delta_vals[0]) 308 starting_ts = now + timedelta(**{delta_unit: delta_num}) 309 elif 'tomorrow' in starting_str or 'today' in starting_str: 310 today = round_time(now, timedelta(days=1)) 311 tomorrow = today + timedelta(days=1) 312 is_tomorrow = 'tomorrow' in starting_str 313 time_str = starting_str.replace('tomorrow', '').replace('today', '').strip() 314 time_ts = dateutil_parser.parse(time_str) if time_str else today 315 starting_ts = ( 316 (tomorrow if is_tomorrow else today) 317 + timedelta(hours=time_ts.hour) 318 + timedelta(minutes=time_ts.minute) 319 ) 320 else: 321 starting_ts = dateutil_parser.parse(starting_str) 322 schedule_parse_error = None 323 except Exception as e: 324 warn(f"Unable to parse starting time from '{starting_str}'.", stack=False) 325 schedule_parse_error = str(e) 326 if schedule_parse_error: 327 error(schedule_parse_error, ValueError, stack=False) 328 if not starting_ts.tzinfo: 329 starting_ts = starting_ts.replace(tzinfo=timezone.utc) 330 return starting_ts
Return the datetime to use for the given schedule string.
Parameters
- schedule (str): The schedule frequency to be parsed into a starting datetime.
- now (Optional[datetime], default None): If provided, use this value as a default if no start time is explicitly stated.
Returns
- A
datetime
object, eithernow
or the datetime embedded in the schedule string.
Examples
>>> parse_start_time('daily starting 2024-01-01')
datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
>>> parse_start_time('monthly starting 1st')
datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc)
>>> parse_start_time('hourly starting 00:30')
datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc)