meerschaum.utils.schedule
Schedule processes and threads.
1#! /usr/bin/env python3 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Schedule processes and threads. 7""" 8 9from __future__ import annotations 10import signal 11import traceback 12from datetime import datetime, timezone, timedelta 13import meerschaum as mrsm 14from meerschaum.utils.typing import Callable, Any, Optional, List, Dict 15from meerschaum.utils.warnings import warn, error 16 17STARTING_KEYWORD: str = 'starting' 18INTERVAL_UNITS: List[str] = ['months', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'years'] 19FREQUENCY_ALIASES: Dict[str, str] = { 20 'daily': 'every 1 day', 21 'hourly': 'every 1 hour', 22 'minutely': 'every 1 minute', 23 'weekly': 'every 1 week', 24 'monthly': 'every 1 month', 25 'secondly': 'every 1 second', 26 'yearly': 'every 1 year', 27} 28LOGIC_ALIASES: Dict[str, str] = { 29 'and': '&', 30 'or': '|', 31 ' through ': '-', 32 ' thru ': '-', 33 ' - ': '-', 34 'beginning': STARTING_KEYWORD, 35} 36CRON_DAYS_OF_WEEK: List[str] = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun'] 37CRON_DAYS_OF_WEEK_ALIASES: Dict[str, str] = { 38 'monday': 'mon', 39 'tuesday': 'tue', 40 'tues': 'tue', 41 'wednesday': 'wed', 42 'thursday': 'thu', 43 'thurs': 'thu', 44 'friday': 'fri', 45 'saturday': 'sat', 46 'sunday': 'sun', 47} 48CRON_MONTHS: List[str] = [ 49 'jan', 'feb', 'mar', 'apr', 'may', 'jun', 50 'jul', 'aug', 'sep', 'oct', 'nov', 'dec', 51] 52CRON_MONTHS_ALIASES: Dict[str, str] = { 53 'january': 'jan', 54 'february': 'feb', 55 'march': 'mar', 56 'april': 'apr', 57 'may': 'may', 58 'june': 'jun', 59 'july': 'jul', 60 'august': 'aug', 61 'september': 'sep', 62 'october': 'oct', 63 'november': 'nov', 64 'december': 'dec', 65} 66SCHEDULE_ALIASES: Dict[str, str] = { 67 **FREQUENCY_ALIASES, 68 **LOGIC_ALIASES, 69 **CRON_DAYS_OF_WEEK_ALIASES, 70 **CRON_MONTHS_ALIASES, 71} 72 73_scheduler = None 74def schedule_function( 75 function: Callable[[Any], Any], 76 schedule: str, 77 *args, 78 debug: bool = False, 79 **kw 80) -> mrsm.SuccessTuple: 81 """ 82 
Block the process and execute the function intermittently according to the frequency. 83 https://meerschaum.io/reference/background-jobs/#-schedules 84 85 Parameters 86 ---------- 87 function: Callable[[Any], Any] 88 The function to execute. 89 90 schedule: str 91 The frequency schedule at which `function` should be executed (e.g. `'daily'`). 92 93 Returns 94 ------- 95 A `SuccessTuple` upon exit. 96 """ 97 import asyncio 98 from meerschaum.utils.misc import filter_keywords, round_time 99 100 global _scheduler 101 kw['debug'] = debug 102 kw = filter_keywords(function, **kw) 103 104 apscheduler = mrsm.attempt_import('apscheduler', lazy=False) 105 now = round_time(datetime.now(timezone.utc), timedelta(minutes=1)) 106 trigger = parse_schedule(schedule, now=now) 107 _scheduler = apscheduler.AsyncScheduler(identity='mrsm-scheduler') 108 try: 109 loop = asyncio.get_running_loop() 110 except RuntimeError: 111 loop = asyncio.new_event_loop() 112 113 114 async def run_scheduler(): 115 async with _scheduler: 116 job = await _scheduler.add_schedule( 117 function, 118 trigger, 119 args=args, 120 kwargs=kw, 121 max_running_jobs=1, 122 conflict_policy=apscheduler.ConflictPolicy.replace, 123 ) 124 try: 125 await _scheduler.run_until_stopped() 126 except (KeyboardInterrupt, SystemExit) as e: 127 await _stop_scheduler() 128 raise e 129 130 try: 131 loop.run_until_complete(run_scheduler()) 132 except (KeyboardInterrupt, SystemExit) as e: 133 loop.run_until_complete(_stop_scheduler()) 134 135 return True, "Success" 136 137 138def parse_schedule(schedule: str, now: Optional[datetime] = None): 139 """ 140 Parse a schedule string (e.g. 'daily') into a Trigger object. 
141 """ 142 from meerschaum.utils.misc import items_str, is_int 143 ( 144 apscheduler_triggers_cron, 145 apscheduler_triggers_interval, 146 apscheduler_triggers_calendarinterval, 147 apscheduler_triggers_combining, 148 ) = ( 149 mrsm.attempt_import( 150 'apscheduler.triggers.cron', 151 'apscheduler.triggers.interval', 152 'apscheduler.triggers.calendarinterval', 153 'apscheduler.triggers.combining', 154 lazy = False, 155 ) 156 ) 157 158 starting_ts = parse_start_time(schedule, now=now) 159 schedule = schedule.split(STARTING_KEYWORD)[0].strip() 160 for alias_keyword, true_keyword in SCHEDULE_ALIASES.items(): 161 schedule = schedule.replace(alias_keyword, true_keyword) 162 163 ### TODO Allow for combining `and` + `or` logic. 164 if '&' in schedule and '|' in schedule: 165 raise ValueError(f"Cannot accept both 'and' + 'or' logic in the schedule frequency.") 166 167 join_str = '|' if '|' in schedule else '&' 168 join_trigger = ( 169 apscheduler_triggers_combining.OrTrigger 170 if join_str == '|' 171 else apscheduler_triggers_combining.AndTrigger 172 ) 173 join_kwargs = { 174 'max_iterations': 1_000_000, 175 'threshold': 0, 176 } if join_str == '&' else {} 177 178 schedule_parts = [part.strip() for part in schedule.split(join_str)] 179 triggers = [] 180 181 has_seconds = 'second' in schedule 182 has_minutes = 'minute' in schedule 183 184 for schedule_part in schedule_parts: 185 186 ### Intervals must begin with 'every' (after alias substitution). 187 if schedule_part.lower().startswith('every '): 188 schedule_num_str, schedule_unit = ( 189 schedule_part[len('every '):].split(' ', maxsplit=1) 190 ) 191 schedule_unit = schedule_unit.rstrip('s') + 's' 192 if schedule_unit not in INTERVAL_UNITS: 193 raise ValueError( 194 f"Invalid interval '{schedule_unit}'.\n" 195 + f" Accepted values are {items_str(INTERVAL_UNITS)}." 
196 ) 197 198 schedule_num = ( 199 int(schedule_num_str) 200 if is_int(schedule_num_str) 201 else float(schedule_num_str) 202 ) 203 204 trigger = ( 205 apscheduler_triggers_interval.IntervalTrigger( 206 **{ 207 schedule_unit: schedule_num, 208 'start_time': starting_ts, 209 } 210 ) 211 if schedule_unit not in ('months', 'years') else ( 212 apscheduler_triggers_calendarinterval.CalendarIntervalTrigger( 213 **{ 214 schedule_unit: schedule_num, 215 'start_date': starting_ts, 216 'timezone': starting_ts.tzinfo, 217 } 218 ) 219 ) 220 ) 221 222 ### Determine whether this is a pure cron string or a cron subset (e.g. 'may-aug')_. 223 else: 224 first_three_prefix = schedule_part[:3].lower() 225 first_four_prefix = schedule_part[:4].lower() 226 cron_kw = {} 227 if first_three_prefix in CRON_DAYS_OF_WEEK: 228 cron_kw['day_of_week'] = schedule_part 229 elif first_three_prefix in CRON_MONTHS: 230 cron_kw['month'] = schedule_part 231 elif is_int(first_four_prefix) and len(first_four_prefix) == 4: 232 cron_kw['year'] = int(first_four_prefix) 233 trigger = ( 234 apscheduler_triggers_cron.CronTrigger( 235 **{ 236 **cron_kw, 237 'hour': '*', 238 'minute': '*' if has_minutes else starting_ts.minute, 239 'second': '*' if has_seconds else starting_ts.second, 240 'start_time': starting_ts, 241 'timezone': starting_ts.tzinfo, 242 } 243 ) 244 if cron_kw 245 else apscheduler_triggers_cron.CronTrigger.from_crontab( 246 schedule_part, 247 timezone = starting_ts.tzinfo, 248 ) 249 ) 250 ### Explicitly set the `start_time` after building with `from_crontab`. 251 if trigger.start_time != starting_ts: 252 trigger.start_time = starting_ts 253 254 triggers.append(trigger) 255 256 return ( 257 join_trigger(triggers, **join_kwargs) 258 if len(triggers) != 1 259 else triggers[0] 260 ) 261 262 263def parse_start_time(schedule: str, now: Optional[datetime] = None) -> datetime: 264 """ 265 Return the datetime to use for the given schedule string. 
266 267 Parameters 268 ---------- 269 schedule: str 270 The schedule frequency to be parsed into a starting datetime. 271 272 now: Optional[datetime], default None 273 If provided, use this value as a default if no start time is explicitly stated. 274 275 Returns 276 ------- 277 A `datetime` object, either `now` or the datetime embedded in the schedule string. 278 279 Examples 280 -------- 281 >>> parse_start_time('daily starting 2024-01-01') 282 datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc) 283 >>> parse_start_time('monthly starting 1st') 284 datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc) 285 >>> parse_start_time('hourly starting 00:30') 286 datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc) 287 """ 288 from meerschaum.utils.misc import round_time 289 dateutil_parser = mrsm.attempt_import('dateutil.parser') 290 starting_parts = schedule.split(STARTING_KEYWORD) 291 starting_str = ('now' if len(starting_parts) == 1 else starting_parts[-1]).strip() 292 now = now or round_time(datetime.now(timezone.utc), timedelta(minutes=1)) 293 try: 294 if starting_str == 'now': 295 starting_ts = now 296 elif 'tomorrow' in starting_str or 'today' in starting_str: 297 today = round_time(now, timedelta(days=1)) 298 tomorrow = today + timedelta(days=1) 299 is_tomorrow = 'tomorrow' in starting_str 300 time_str = starting_str.replace('tomorrow', '').replace('today', '').strip() 301 time_ts = dateutil_parser.parse(time_str) if time_str else today 302 starting_ts = ( 303 (tomorrow if is_tomorrow else today) 304 + timedelta(hours=time_ts.hour) 305 + timedelta(minutes=time_ts.minute) 306 ) 307 else: 308 starting_ts = dateutil_parser.parse(starting_str) 309 schedule_parse_error = None 310 except Exception as e: 311 warn(f"Unable to parse starting time from '{starting_str}'.", stack=False) 312 schedule_parse_error = str(e) 313 if schedule_parse_error: 314 error(schedule_parse_error, ValueError, stack=False) 315 if not starting_ts.tzinfo: 
316 starting_ts = starting_ts.replace(tzinfo=timezone.utc) 317 return starting_ts 318 319 320async def _stop_scheduler(): 321 if _scheduler is None: 322 return 323 await _scheduler.stop() 324 await _scheduler.wait_until_stopped()
STARTING_KEYWORD: str =
'starting'
INTERVAL_UNITS: List[str] =
['months', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'years']
FREQUENCY_ALIASES: Dict[str, str] =
{'daily': 'every 1 day', 'hourly': 'every 1 hour', 'minutely': 'every 1 minute', 'weekly': 'every 1 week', 'monthly': 'every 1 month', 'secondly': 'every 1 second', 'yearly': 'every 1 year'}
LOGIC_ALIASES: Dict[str, str] =
{'and': '&', 'or': '|', ' through ': '-', ' thru ': '-', ' - ': '-', 'beginning': 'starting'}
CRON_DAYS_OF_WEEK: List[str] =
['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
CRON_DAYS_OF_WEEK_ALIASES: Dict[str, str] =
{'monday': 'mon', 'tuesday': 'tue', 'tues': 'tue', 'wednesday': 'wed', 'thursday': 'thu', 'thurs': 'thu', 'friday': 'fri', 'saturday': 'sat', 'sunday': 'sun'}
CRON_MONTHS: List[str] =
['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
CRON_MONTHS_ALIASES: Dict[str, str] =
{'january': 'jan', 'february': 'feb', 'march': 'mar', 'april': 'apr', 'may': 'may', 'june': 'jun', 'july': 'jul', 'august': 'aug', 'september': 'sep', 'october': 'oct', 'november': 'nov', 'december': 'dec'}
SCHEDULE_ALIASES: Dict[str, str] =
{'daily': 'every 1 day', 'hourly': 'every 1 hour', 'minutely': 'every 1 minute', 'weekly': 'every 1 week', 'monthly': 'every 1 month', 'secondly': 'every 1 second', 'yearly': 'every 1 year', 'and': '&', 'or': '|', ' through ': '-', ' thru ': '-', ' - ': '-', 'beginning': 'starting', 'monday': 'mon', 'tuesday': 'tue', 'tues': 'tue', 'wednesday': 'wed', 'thursday': 'thu', 'thurs': 'thu', 'friday': 'fri', 'saturday': 'sat', 'sunday': 'sun', 'january': 'jan', 'february': 'feb', 'march': 'mar', 'april': 'apr', 'may': 'may', 'june': 'jun', 'july': 'jul', 'august': 'aug', 'september': 'sep', 'october': 'oct', 'november': 'nov', 'december': 'dec'}
def
schedule_function( function: Callable[[Any], Any], schedule: str, *args, debug: bool = False, **kw) -> Tuple[bool, str]:
def schedule_function(
    function: Callable[[Any], Any],
    schedule: str,
    *args,
    debug: bool = False,
    **kw
) -> mrsm.SuccessTuple:
    """
    Block the process and execute the function intermittently according to the frequency.
    https://meerschaum.io/reference/background-jobs/#-schedules

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute.

    schedule: str
        The frequency schedule at which `function` should be executed (e.g. `'daily'`).

    *args:
        Positional arguments passed to `function` on every run.

    debug: bool, default False
        Added to `kw` under the key `'debug'` before the keywords are filtered.

    **kw:
        Keyword arguments for `function`; they are passed through `filter_keywords` first.

    Returns
    -------
    A `SuccessTuple` upon exit.
    """
    import asyncio
    from meerschaum.utils.misc import filter_keywords, round_time

    global _scheduler
    kw['debug'] = debug
    kw = filter_keywords(function, **kw)

    apscheduler = mrsm.attempt_import('apscheduler', lazy=False)
    now = round_time(datetime.now(timezone.utc), timedelta(minutes=1))
    trigger = parse_schedule(schedule, now=now)
    _scheduler = apscheduler.AsyncScheduler(identity='mrsm-scheduler')

    ### Remember whether the loop was created here so that only an owned loop is closed.
    try:
        loop = asyncio.get_running_loop()
        owns_loop = False
    except RuntimeError:
        loop = asyncio.new_event_loop()
        owns_loop = True

    async def run_scheduler():
        async with _scheduler:
            await _scheduler.add_schedule(
                function,
                trigger,
                args=args,
                kwargs=kw,
                max_running_jobs=1,
                conflict_policy=apscheduler.ConflictPolicy.replace,
            )
            try:
                await _scheduler.run_until_stopped()
            except (KeyboardInterrupt, SystemExit):
                await _stop_scheduler()
                raise

    try:
        loop.run_until_complete(run_scheduler())
    except (KeyboardInterrupt, SystemExit):
        ### Interrupts are treated as a normal exit: stop the scheduler and fall through.
        loop.run_until_complete(_stop_scheduler())
    finally:
        if owns_loop:
            loop.close()

    return True, "Success"
Block the process and execute the function intermittently according to the frequency. https://meerschaum.io/reference/background-jobs/#-schedules
Parameters
- function (Callable[[Any], Any]): The function to execute.
- schedule (str):
The frequency schedule at which
function
should be executed (e.g. 'daily'
).
Returns
- A
SuccessTuple
upon exit.
def
parse_schedule(schedule: str, now: Optional[datetime.datetime] = None):
def parse_schedule(schedule: str, now: Optional[datetime] = None):
    """
    Parse a schedule string (e.g. 'daily') into a Trigger object.

    Parameters
    ----------
    schedule: str
        One or more schedule parts joined by 'and' (`&`) or 'or' (`|`),
        optionally followed by the starting keyword and a start time.
        Each part is either an interval (`'every <number> <unit>'` or an alias like `'daily'`),
        a day-of-week, month, or year expression (e.g. `'mon-fri'`, `'may-aug'`),
        or a crontab expression.

    now: Optional[datetime], default None
        Passed to `parse_start_time()` as the default start time.

    Returns
    -------
    The single trigger if the schedule has one part,
    otherwise an `OrTrigger` or `AndTrigger` which combines the triggers of every part.

    Raises
    ------
    `ValueError` if 'and' and 'or' are mixed or if an interval is malformed.
    """
    from meerschaum.utils.misc import items_str, is_int
    (
        apscheduler_triggers_cron,
        apscheduler_triggers_interval,
        apscheduler_triggers_calendarinterval,
        apscheduler_triggers_combining,
    ) = (
        mrsm.attempt_import(
            'apscheduler.triggers.cron',
            'apscheduler.triggers.interval',
            'apscheduler.triggers.calendarinterval',
            'apscheduler.triggers.combining',
            lazy = False,
        )
    )

    ### Normalize aliases of the starting keyword (e.g. 'beginning') before splitting.
    ### The general alias pass below runs after the split, which is too late for them.
    for alias_keyword, true_keyword in SCHEDULE_ALIASES.items():
        if true_keyword == STARTING_KEYWORD:
            schedule = schedule.replace(alias_keyword, true_keyword)

    starting_ts = parse_start_time(schedule, now=now)
    schedule = schedule.split(STARTING_KEYWORD)[0].strip()
    for alias_keyword, true_keyword in SCHEDULE_ALIASES.items():
        schedule = schedule.replace(alias_keyword, true_keyword)

    ### TODO Allow for combining `and` + `or` logic.
    if '&' in schedule and '|' in schedule:
        raise ValueError("Cannot accept both 'and' + 'or' logic in the schedule frequency.")

    join_str = '|' if '|' in schedule else '&'
    join_trigger = (
        apscheduler_triggers_combining.OrTrigger
        if join_str == '|'
        else apscheduler_triggers_combining.AndTrigger
    )
    ### These keyword arguments are only passed to `AndTrigger`.
    join_kwargs = {
        'max_iterations': 1_000_000,
        'threshold': 0,
    } if join_str == '&' else {}

    schedule_parts = [part.strip() for part in schedule.split(join_str)]
    triggers = []

    ### If any part mentions seconds / minutes, cron parts fire on every second / minute.
    has_seconds = 'second' in schedule
    has_minutes = 'minute' in schedule

    for schedule_part in schedule_parts:

        ### Intervals must begin with 'every' (after alias substitution).
        if schedule_part.lower().startswith('every '):
            interval_parts = schedule_part[len('every '):].split(' ', maxsplit=1)
            if len(interval_parts) != 2:
                raise ValueError(
                    f"Invalid interval '{schedule_part}'.\n"
                    + "    Intervals must take the form 'every <number> <unit>'."
                )
            schedule_num_str, schedule_unit = interval_parts

            ### Normalize the unit to its lowercase plural form (e.g. 'Day' -> 'days').
            schedule_unit = schedule_unit.strip().lower().rstrip('s') + 's'
            if schedule_unit not in INTERVAL_UNITS:
                raise ValueError(
                    f"Invalid interval '{schedule_unit}'.\n"
                    + f"    Accepted values are {items_str(INTERVAL_UNITS)}."
                )

            schedule_num = (
                int(schedule_num_str)
                if is_int(schedule_num_str)
                else float(schedule_num_str)
            )

            ### Months and years are built with `CalendarIntervalTrigger`.
            trigger = (
                apscheduler_triggers_interval.IntervalTrigger(
                    **{
                        schedule_unit: schedule_num,
                        'start_time': starting_ts,
                    }
                )
                if schedule_unit not in ('months', 'years') else (
                    apscheduler_triggers_calendarinterval.CalendarIntervalTrigger(
                        **{
                            schedule_unit: schedule_num,
                            'start_date': starting_ts,
                            'timezone': starting_ts.tzinfo,
                        }
                    )
                )
            )

        ### Determine whether this is a pure cron string or a cron subset (e.g. 'may-aug').
        else:
            first_three_prefix = schedule_part[:3].lower()
            first_four_prefix = schedule_part[:4].lower()
            cron_kw = {}
            if first_three_prefix in CRON_DAYS_OF_WEEK:
                cron_kw['day_of_week'] = schedule_part
            elif first_three_prefix in CRON_MONTHS:
                cron_kw['month'] = schedule_part
            elif is_int(first_four_prefix) and len(first_four_prefix) == 4:
                cron_kw['year'] = int(first_four_prefix)
            trigger = (
                apscheduler_triggers_cron.CronTrigger(
                    **{
                        **cron_kw,
                        'hour': '*',
                        'minute': '*' if has_minutes else starting_ts.minute,
                        'second': '*' if has_seconds else starting_ts.second,
                        'start_time': starting_ts,
                        'timezone': starting_ts.tzinfo,
                    }
                )
                if cron_kw
                else apscheduler_triggers_cron.CronTrigger.from_crontab(
                    schedule_part,
                    timezone = starting_ts.tzinfo,
                )
            )
            ### Explicitly set the `start_time` after building with `from_crontab`.
            if trigger.start_time != starting_ts:
                trigger.start_time = starting_ts

        triggers.append(trigger)

    return (
        join_trigger(triggers, **join_kwargs)
        if len(triggers) != 1
        else triggers[0]
    )
Parse a schedule string (e.g. 'daily') into a Trigger object.
def
parse_start_time( schedule: str, now: Optional[datetime.datetime] = None) -> datetime.datetime:
def parse_start_time(schedule: str, now: Optional[datetime] = None) -> datetime:
    """
    Return the datetime to use for the given schedule string.

    Parameters
    ----------
    schedule: str
        The schedule frequency to be parsed into a starting datetime.
        Aliases of the starting keyword (e.g. 'beginning') are accepted.

    now: Optional[datetime], default None
        If provided, use this value as a default if no start time is explicitly stated.
        Otherwise the current UTC time (passed through `round_time` with a one-minute delta)
        is used.

    Returns
    -------
    A `datetime` object, either `now` or the datetime embedded in the schedule string.
    Timestamps without a timezone are returned as UTC.

    Examples
    --------
    Results for partial or relative start times depend on the current date.

    >>> parse_start_time('daily starting 2024-01-01')
    datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
    >>> parse_start_time('monthly starting 1st')
    datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc)
    >>> parse_start_time('hourly starting 00:30')
    datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc)
    """
    from meerschaum.utils.misc import round_time
    dateutil_parser = mrsm.attempt_import('dateutil.parser')

    ### Accept aliases of the starting keyword (e.g. 'beginning').
    for alias_keyword, true_keyword in SCHEDULE_ALIASES.items():
        if true_keyword == STARTING_KEYWORD:
            schedule = schedule.replace(alias_keyword, true_keyword)

    starting_parts = schedule.split(STARTING_KEYWORD)
    starting_str = ('now' if len(starting_parts) == 1 else starting_parts[-1]).strip()
    now = now or round_time(datetime.now(timezone.utc), timedelta(minutes=1))

    schedule_parse_error = None
    try:
        if starting_str == 'now':
            starting_ts = now
        elif 'tomorrow' in starting_str or 'today' in starting_str:
            today = round_time(now, timedelta(days=1))
            tomorrow = today + timedelta(days=1)
            is_tomorrow = 'tomorrow' in starting_str
            ### Whatever remains after removing the day keyword is parsed as a time of day.
            time_str = starting_str.replace('tomorrow', '').replace('today', '').strip()
            time_ts = dateutil_parser.parse(time_str) if time_str else today
            starting_ts = (
                (tomorrow if is_tomorrow else today)
                + timedelta(hours=time_ts.hour)
                + timedelta(minutes=time_ts.minute)
            )
        else:
            starting_ts = dateutil_parser.parse(starting_str)
    except Exception as e:
        warn(f"Unable to parse starting time from '{starting_str}'.", stack=False)
        schedule_parse_error = str(e)

    ### Compare against `None` so that an exception with an empty message is still reported.
    ### NOTE(review): `error()` is expected to raise the given `ValueError` -- confirm.
    if schedule_parse_error is not None:
        error(schedule_parse_error, ValueError, stack=False)

    if starting_ts.tzinfo is None:
        starting_ts = starting_ts.replace(tzinfo=timezone.utc)
    return starting_ts
Return the datetime to use for the given schedule string.
Parameters
- schedule (str): The schedule frequency to be parsed into a starting datetime.
- now (Optional[datetime], default None): If provided, use this value as a default if no start time is explicitly stated.
Returns
- A
datetime
object, either now
or the datetime embedded in the schedule string.
Examples
>>> parse_start_time('daily starting 2024-01-01')
datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
>>> parse_start_time('monthly starting 1st')
datetime.datetime(2024, 5, 1, 0, 0, tzinfo=datetime.timezone.utc)
>>> parse_start_time('hourly starting 00:30')
datetime.datetime(2024, 5, 13, 0, 30, tzinfo=datetime.timezone.utc)