meerschaum.connectors.api
Interact with the Meerschaum API (send commands, pull data, etc.)
class APIConnector(InstanceConnector):
    """
    Connect to a Meerschaum API instance.

    Most of the behavior lives in sibling modules (`_request`, `_actions`,
    `_pipes`, ...). Their functions are imported inside the class body,
    which binds them as attributes of the class so they are callable as methods.
    """

    IS_THREAD_SAFE: bool = False
    # Attribute names listed as optional for this connector type.
    OPTIONAL_ATTRIBUTES: List[str] = ['port', 'client_secret', 'client_id', 'api_key']

    # HTTP request helpers.
    from ._request import (
        make_request,
        get,
        post,
        put,
        patch,
        delete,
        wget,
    )
    # Remote action execution.
    from ._actions import (
        get_actions,
        do_action,
        do_action_async,
        do_action_legacy,
    )
    from ._misc import get_mrsm_version, get_chaining_status
    # Pipe registration, syncing, and metadata.
    from ._pipes import (
        get_pipe_instance_keys,
        register_pipe,
        fetch_pipes_keys,
        edit_pipe,
        sync_pipe,
        delete_pipe,
        get_pipe_data,
        get_pipe_id,
        get_pipe_attributes,
        get_sync_time,
        pipe_exists,
        create_metadata,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_columns_types,
        get_pipe_columns_indices,
    )
    from ._fetch import fetch
    from ._plugins import (
        register_plugin,
        install_plugin,
        delete_plugin,
        get_plugins,
        get_plugin_attributes,
    )
    from ._login import login, test_connection
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._tokens import (
        register_token,
        get_token_model,
        get_tokens,
        edit_token,
        invalidate_token,
        get_token_scopes,
        token_exists,
        delete_token,
    )
    from ._uri import from_uri
    from ._jobs import (
        get_jobs,
        get_job,
        get_job_metadata,
        get_job_properties,
        get_job_exists,
        delete_job,
        start_job,
        create_job,
        stop_job,
        pause_job,
        get_logs,
        get_job_stop_time,
        monitor_logs,
        monitor_logs_async,
        get_job_is_blocking_on_stdin,
        get_job_began,
        get_job_ended,
        get_job_paused,
        get_job_status,
    )

    def __init__(
        self,
        label: Optional[str] = None,
        wait: bool = False,
        debug: bool = False,
        **kw
    ):
        """
        Build the connector from keyword attributes or from a `uri`.

        Parameters
        ----------
        label: Optional[str], default None
            The connector's label. If omitted and `uri` is given,
            the label parsed from the URI (if any) is used.

        wait: bool, default False
            Accepted but not used in this constructor body.

        debug: bool, default False
            Accepted but not used in this constructor body.

        **kw
            Connector attributes forwarded to the parent constructor.
            When `uri` is present, the values parsed by `from_uri()` are merged in.
        """
        if 'uri' in kw:
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            # Drop the label so it is not passed twice to the parent constructor.
            _ = from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        super().__init__('api', label=label, **kw)
        if 'protocol' not in self.__dict__:
            # Infer the protocol from the URI prefix, defaulting to plain HTTP.
            self.protocol = (
                'https' if self.__dict__.get('uri', '').startswith('https')
                else 'http'
            )

        if 'uri' not in self.__dict__:
            # NOTE(review): `required_attributes` is not defined in this class;
            # presumably a module-level name — confirm.
            self.verify_attributes(required_attributes)
        else:
            from meerschaum.connectors.sql import SQLConnector
            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
            if 'host' not in conn_attrs:
                raise Exception(f"Invalid URI for '{self}'.")
            self.__dict__.update(conn_attrs)

        # Base URL (protocol, host, optional port) used to build request URLs.
        self.url = (
            self.protocol + '://' +
            self.host
            + (
                (':' + str(self.port))
                if self.__dict__.get('port', None)
                else ''
            )
        )
        # Lazily populated state: see the `token` and `session` properties.
        self._token = None
        self._expires = None
        self._session = None
        self._instance_keys = self.__dict__.get('instance_keys', None)


    @property
    def URI(self) -> str:
        """
        Return the fully qualified URI.
        """
        import urllib.parse
        username = self.__dict__.get('username', None)
        password = self.__dict__.get('password', None)
        # NOTE: `client_id`, `client_secret`, and `api_key` are read but never used below.
        client_id = self.__dict__.get('client_id', None)
        client_secret = self.__dict__.get('client_secret', None)
        api_key = self.__dict__.get('api_key', None)
        # Credentials are only embedded when both username and password are set.
        creds = (username + ':' + password + '@') if username and password else ''
        # `params` is never populated, so `params_str` is always empty.
        params = {}
        params_str = ('?' + urllib.parse.urlencode(params)) if params else ''
        return (
            self.protocol
            + '://'
            + creds
            + self.host
            + (
                (':' + str(self.port))
                if self.__dict__.get('port', None)
                else ''
            )
            + params_str
        )

    @property
    def session(self):
        """
        Return the cached `requests.Session`, creating it on first access.
        """
        if self._session is None:
            _ = attempt_import('certifi', lazy=False)
            requests = attempt_import('requests', lazy=False)
            if requests:
                self._session = requests.Session()
        if self._session is None:
            error("Failed to import requests. Is requests installed?")
        return self._session

    @property
    def token(self):
        """
        Return the cached token, calling `login()` first if it is missing or expiring.
        """
        # Treat the token as expired when no expiry is known
        # or when it lapses within the next minute (compared as naive UTC).
        expired = (
            True if self._expires is None else (
                (
                    self._expires
                    <
                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
                )
            )
        )

        if self._token is None or expired:
            success, msg = self.login()
            # Only warn about a failed login once per connector.
            if not success and not self.__dict__.get('_emitted_warning'):
                warn(msg, stack=False)
                self._emitted_warning = True
        return self._token

    @property
    def instance_keys(self) -> Union[str, None]:
        """
        Return the instance keys to be sent alongside pipe requests.
        """
        return self._instance_keys

    @property
    def login_scheme(self) -> str:
        """
        Return the login scheme to use based on the configured credentials.
        """
        # Precedence: username, then client_id, then api_key.
        if 'username' in self.__dict__:
            return 'password'
        if 'client_id' in self.__dict__:
            return 'client_credentials'
        elif 'api_key' in self.__dict__:
            return 'api_key'

        return 'password'
Connect to a Meerschaum API instance.
def __init__(
    self,
    label: Optional[str] = None,
    wait: bool = False,
    debug: bool = False,
    **kw
):
    """
    Build the connector from keyword attributes or from a `uri`.

    Parameters
    ----------
    label: Optional[str], default None
        The connector's label. Falls back to the label parsed from `uri`, if any.

    wait: bool, default False
        Accepted but not used in this constructor body.

    debug: bool, default False
        Accepted but not used in this constructor body.

    **kw
        Connector attributes forwarded to the parent constructor.
    """
    if 'uri' in kw:
        uri_params = self.from_uri(kw['uri'], as_dict=True)
        # Remove the label from the parsed values so it is only passed once.
        uri_label = uri_params.pop('label', None)
        label = label or uri_label
        kw.update(uri_params)

    super().__init__('api', label=label, **kw)

    attrs = self.__dict__
    if 'protocol' not in attrs:
        uses_https = attrs.get('uri', '').startswith('https')
        self.protocol = 'https' if uses_https else 'http'

    if 'uri' in attrs:
        from meerschaum.connectors.sql import SQLConnector
        parsed_attrs = SQLConnector.parse_uri(attrs['uri'])
        if 'host' not in parsed_attrs:
            raise Exception(f"Invalid URI for '{self}'.")
        attrs.update(parsed_attrs)
    else:
        self.verify_attributes(required_attributes)

    # The port is only appended when one is configured.
    port_suffix = (':' + str(self.port)) if attrs.get('port', None) else ''
    self.url = self.protocol + '://' + self.host + port_suffix

    # Lazily populated by the `token` and `session` properties.
    self._token = None
    self._expires = None
    self._session = None
    self._instance_keys = attrs.get('instance_keys', None)
@property
def URI(self) -> str:
    """
    Return the fully qualified URI.

    The result has the form `protocol://[username:password@]host[:port]`.
    Credentials are included only when both `username` and `password` are set,
    and the port only when one is configured.
    """
    attrs = self.__dict__
    username = attrs.get('username', None)
    password = attrs.get('password', None)
    creds = (username + ':' + password + '@') if username and password else ''
    port_suffix = (':' + str(self.port)) if attrs.get('port', None) else ''
    return self.protocol + '://' + creds + self.host + port_suffix
Return the fully qualified URI.
@property
def session(self):
    """
    Return the cached `requests.Session`, creating it on first access.
    """
    if self._session is not None:
        return self._session

    _ = attempt_import('certifi', lazy=False)
    requests = attempt_import('requests', lazy=False)
    if requests:
        self._session = requests.Session()

    if self._session is None:
        error("Failed to import requests. Is requests installed?")
    return self._session
@property
def token(self):
    """
    Return the cached token, calling `login()` first if it is missing or expiring.

    A token counts as expired when no expiry is recorded
    or when the expiry falls within the next minute (compared as naive UTC).
    A failed login emits a warning only once per connector.
    """
    if self._expires is None:
        expired = True
    else:
        cutoff = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
        expired = self._expires < cutoff

    needs_login = self._token is None or expired
    if needs_login:
        success, msg = self.login()
        already_warned = self.__dict__.get('_emitted_warning')
        if not success and not already_warned:
            warn(msg, stack=False)
            self._emitted_warning = True

    return self._token
@property
def instance_keys(self) -> Union[str, None]:
    """
    Return the instance keys sent alongside pipe requests (`None` if unset).
    """
    configured_keys = self._instance_keys
    return configured_keys
Return the instance keys to be sent alongside pipe requests.
@property
def login_scheme(self) -> str:
    """
    Return the login scheme implied by the configured credentials.

    The first credential attribute present wins, checked in this order:
    `username`, `client_id`, `api_key`. With none set, `'password'` is returned.
    """
    schemes_by_attribute = (
        ('username', 'password'),
        ('client_id', 'client_credentials'),
        ('api_key', 'api_key'),
    )
    attrs = self.__dict__
    for attribute, scheme in schemes_by_attribute:
        if attribute in attrs:
            return scheme
    return 'password'
Return the login scheme to use based on the configured credentials.
def make_request(
    self,
    method: str,
    r_url: str,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kwargs: Any
) -> 'requests.Response':
    """
    Make a request to this APIConnector's endpoint using the in-memory session.

    Parameters
    ----------
    method: str
        The kind of request to make (case-insensitive).
        Must be one of the values in `METHODS`.

    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    headers: Optional[Dict[str, Any]], default None
        The headers to use for the request.
        A deep copy is made, so the caller's dictionary is never modified.

    use_token: bool, default True
        If `True`, add the `Authorization: Bearer` header to the copied headers.

    debug: bool, default False
        Verbosity toggle.

    kwargs: Any
        All other keyword arguments are passed to the session's `request()`.
        `verify` defaults to the connector's boolean `verify` attribute (if set)
        and `timeout` to the configured default timeout.

    Returns
    -------
    A `requests.Response` object.

    Raises
    ------
    ValueError
        If `method` is not a supported HTTP method.
    """
    verb = method.upper()
    if verb not in METHODS:
        raise ValueError(f"Method '{method}' is not supported.")

    # Only a boolean `verify` attribute is honored, and never over an explicit argument.
    connector_verify = self.__dict__.get('verify', None)
    if isinstance(connector_verify, bool) and 'verify' not in kwargs:
        kwargs['verify'] = connector_verify

    request_headers = copy.deepcopy(headers) if isinstance(headers, dict) else {}
    if use_token:
        request_headers['Authorization'] = f'Bearer {self.token}'

    if 'timeout' not in kwargs:
        kwargs['timeout'] = STATIC_CONFIG['api']['default_timeout']

    request_url = urllib.parse.urljoin(self.url, r_url)
    if debug:
        dprint(f"[{self}] Sending a '{method.upper()}' request to {request_url}")

    return self.session.request(verb, request_url, headers=request_headers, **kwargs)
Make a request to this APIConnector's endpoint using the in-memory session.
Parameters
- method (str):
The kind of request to make.
Accepted values:
'GET'
'OPTIONS'
'HEAD'
'POST'
'PUT'
'PATCH'
'DELETE'
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
is True
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A
requests.Response
object.
def get(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `GET` request through `make_request()`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        All keyword arguments are forwarded unchanged to `make_request()`.

    Returns
    -------
    Whatever `make_request()` returns for the request.
    """
    http_method = 'GET'
    return self.make_request(http_method, r_url, **kwargs)
Wrapper for requests.get
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
is True
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A
requests.Response
object.
def post(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `POST` request through `make_request()`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        All keyword arguments are forwarded unchanged to `make_request()`.

    Returns
    -------
    Whatever `make_request()` returns for the request.
    """
    http_method = 'POST'
    return self.make_request(http_method, r_url, **kwargs)
Wrapper for requests.post
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
is True
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A
requests.Response
object.
def put(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `PUT` request through `make_request()`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        All keyword arguments are forwarded unchanged to `make_request()`.

    Returns
    -------
    Whatever `make_request()` returns for the request.
    """
    http_method = 'PUT'
    return self.make_request(http_method, r_url, **kwargs)
Wrapper for requests.put
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
is True
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A
requests.Response
object.
def patch(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `PATCH` request through `make_request()`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        All keyword arguments are forwarded unchanged to `make_request()`.

    Returns
    -------
    Whatever `make_request()` returns for the request.
    """
    http_method = 'PATCH'
    return self.make_request(http_method, r_url, **kwargs)
Wrapper for requests.patch
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
is True
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A
requests.Response
object.
def delete(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `DELETE` request through `make_request()`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        All keyword arguments are forwarded unchanged to `make_request()`.

    Returns
    -------
    Whatever `make_request()` returns for the request.
    """
    http_method = 'DELETE'
    return self.make_request(http_method, r_url, **kwargs)
Wrapper for requests.delete
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
is True
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A
requests.Response
object.
def wget(
    self,
    r_url: str,
    dest: Optional[Union[str, pathlib.Path]] = None,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kw: Any
) -> pathlib.Path:
    """
    Mimic wget with requests.

    Parameters
    ----------
    r_url: str
        The relative URL to download (joined onto this connector's `url`).

    dest: Optional[Union[str, pathlib.Path]], default None
        The destination passed through to `meerschaum.utils.misc.wget`.

    headers: Optional[Dict[str, Any]], default None
        Extra headers for the download.
        A copy is made, so the caller's dictionary is never modified.

    use_token: bool, default True
        If `True`, add the `Authorization: Bearer` header to the copied headers.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        All other keyword arguments are passed to `meerschaum.utils.misc.wget`.

    Returns
    -------
    The value returned by `meerschaum.utils.misc.wget`.
    """
    from meerschaum.utils.misc import wget

    # Copy so that adding the token does not leak into the caller's dictionary
    # (matches the copy made in `make_request()`).
    request_headers = dict(headers) if headers is not None else {}
    if use_token:
        request_headers['Authorization'] = f'Bearer {self.token}'

    request_url = urllib.parse.urljoin(self.url, r_url)
    if debug:
        dprint(
            f"[{self}] Downloading {request_url}"
            + (f' to {dest}' if dest is not None else '')
            + "..."
        )
    return wget(request_url, dest=dest, headers=request_headers, **kw)
Mimic wget with requests.
def get_actions(self):
    """
    Request the available actions from the API instance.

    Returns the response of a `GET` request to `ACTIONS_ENDPOINT`.
    """
    actions_response = self.get(ACTIONS_ENDPOINT)
    return actions_response
Get available actions from the API instance.
def do_action(self, sysargs: List[str]) -> SuccessTuple:
    """
    Execute a Meerschaum action remotely.

    Synchronous wrapper which runs `do_action_async()` to completion
    with `asyncio.run()` and returns its result.
    """
    pending_action = self.do_action_async(sysargs)
    return asyncio.run(pending_action)
Execute a Meerschaum action remotely.
async def do_action_async(
    self,
    sysargs: List[str],
    callback_function: Callable[[str], None] = partial(print, end=''),
) -> SuccessTuple:
    """
    Execute an action as a temporary remote job.

    Parameters
    ----------
    sysargs: List[str]
        The action's arguments. API executor keys are removed before the job is created.

    callback_function: Callable[[str], None], default partial(print, end='')
        Passed to the job's `monitor_logs_async()` as its `callback_function`.

    Returns
    -------
    The job's `result` tuple, or the failed start tuple if the job could not be started.
    """
    from meerschaum._internal.arguments import remove_api_executor_keys
    from meerschaum.utils.misc import generate_password

    cleaned_sysargs = remove_api_executor_keys(sysargs)

    # The job gets a randomly suffixed temporary name and is deleted after it finishes.
    temp_job_name = TEMP_PREFIX + generate_password(12)
    temp_job = mrsm.Job(temp_job_name, cleaned_sysargs, executor_keys=str(self))

    started, start_message = temp_job.start()
    if not started:
        return started, start_message

    await temp_job.monitor_logs_async(
        callback_function=callback_function,
        stop_on_exit=True,
        strip_timestamps=True,
    )

    success, msg = temp_job.result
    temp_job.delete()
    return success, msg
Execute an action as a temporary remote job.
def do_action_legacy(
    self,
    action: Optional[List[str]] = None,
    sysargs: Optional[List[str]] = None,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    NOTE: This method is deprecated.
    Please use `do_action()` or `do_action_async()`.

    Execute a Meerschaum action remotely.

    If `sysargs` are provided (and `action` is `['']`), parse those instead.
    Otherwise infer everything from keyword arguments.

    Parameters
    ----------
    action: Optional[List[str]], default None
        The action to run; the first element is the root action
        and the remainder is sent as the `action` value of the request body.
        The caller's list is not modified.

    sysargs: Optional[List[str]], default None
        Command-line style arguments to parse into the request body.

    debug: bool, default False
        Verbosity toggle.

    **kw
        Additional keyword arguments sent in the request body.
        `noask` and `yes` default to `True` when not provided.

    Returns
    -------
    A `SuccessTuple` of the action's success and message.
    `(False, ...)` is returned if no action is given or the response cannot be parsed.

    Examples
    --------
    >>> conn = mrsm.get_connector('api:main')
    >>> conn.do_action_legacy(['show', 'pipes'])
    (True, "Success")
    >>> conn.do_action_legacy(['show', 'arguments'], name='test')
    (True, "Success")
    """
    import sys
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum._internal.static import STATIC_CONFIG
    from meerschaum.utils.misc import json_serialize_datetime
    if action is None:
        action = []

    if sysargs is not None and action and action[0] == '':
        from meerschaum._internal.arguments import parse_arguments
        if debug:
            dprint(f"Parsing sysargs:\n{sysargs}")
        json_dict = parse_arguments(sysargs)
    else:
        json_dict = kw
        json_dict['action'] = action
        if 'noask' not in kw:
            json_dict['noask'] = True
        if 'yes' not in kw:
            json_dict['yes'] = True
        if debug:
            json_dict['debug'] = debug

    # Work on a copy so the caller's `action` list is left intact,
    # and fail gracefully instead of raising when there is no root action.
    actions = list(json_dict.get('action') or [])
    if not actions:
        return False, "No action was provided."
    root_action = actions[0]
    json_dict['action'] = actions[1:]
    r_url = f"{STATIC_CONFIG['api']['endpoints']['actions']}/{root_action}"

    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending data to '{self.url + r_url}':")
        pprint(json_dict, stream=sys.stderr)

    response = self.post(
        r_url,
        data=json.dumps(json_dict, default=json_serialize_datetime),
        debug=debug,
    )
    try:
        response_list = json.loads(response.text)
        if isinstance(response_list, dict) and 'detail' in response_list:
            return False, response_list['detail']
    except Exception as e:
        print(f"Invalid response: {response}")
        print(e)
        return False, response.text
    if debug:
        dprint(response)
    try:
        return response_list[0], response_list[1]
    except Exception:
        return False, f"Failed to parse result from action '{root_action}'"
NOTE: This method is deprecated.
Please use do_action()
or do_action_async()
.
Execute a Meerschaum action remotely.
If sysargs
are provided, parse those instead.
Otherwise infer everything from keyword arguments.
Examples
>>> conn = mrsm.get_connector('api:main')
>>> conn.do_action(['show', 'pipes'])
(True, "Success")
>>> conn.do_action(['show', 'arguments'], name='test')
(True, "Success")
def get_mrsm_version(self, **kw) -> Optional[str]:
    """
    Return the Meerschaum version of the API instance.

    The version endpoint is queried without a token.
    `None` is returned if the request or JSON parsing fails,
    or if the response is a dictionary containing `detail`.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    try:
        response = self.get(
            STATIC_CONFIG['api']['endpoints']['version'] + '/mrsm',
            use_token=False,
            **kw
        )
        version = response.json()
    except Exception:
        return None

    is_error_payload = isinstance(version, dict) and 'detail' in version
    return None if is_error_payload else version
Return the Meerschaum version of the API instance.
def get_chaining_status(self, **kw) -> Optional[bool]:
    """
    Fetch the chaining status of the API instance.

    Returns the parsed JSON body of the chaining endpoint,
    or `None` if the request raises or the response is falsy.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    chaining_response = None
    try:
        chaining_response = self.get(
            STATIC_CONFIG['api']['endpoints']['chaining'],
            use_token=True,
            **kw
        )
        if not chaining_response:
            return None
    except Exception:
        return None

    return chaining_response.json()
Fetch the chaining status of the API instance.
def get_pipe_instance_keys(self, pipe: mrsm.Pipe) -> Union[str, None]:
    """
    Return the instance keys to use for a pipe.

    A pipe's own `instance_keys` parameter takes precedence;
    otherwise this connector's default `instance_keys` is returned.
    """
    connector_default = self.instance_keys
    return pipe.parameters.get('instance_keys', connector_default)
Return the configured instance keys for a pipe if set,
else fall back to the default instance_keys
for this APIConnector
.
def register_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """
    Submit a POST to the API to register a new Pipe object.

    The pipe's `parameters` attribute is sent as the JSON body.

    Returns
    -------
    A `SuccessTuple` of success and message.
    A list response is unpacked as `(success, message)`;
    otherwise the response's truthiness is paired with its `detail` or raw text.
    """
    from meerschaum.utils.debug import dprint

    register_url = pipe_r_url(pipe) + '/register'
    response = self.post(
        register_url,
        json=pipe._attributes.get('parameters', {}),
        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
        debug=debug,
    )
    if debug:
        dprint(response.text)

    if not response:
        return False, response.text

    payload = response.json()
    if isinstance(payload, list):
        return payload[0], payload[1]

    succeeded = bool(response)
    if 'detail' in payload:
        return succeeded, payload['detail']
    return succeeded, response.text
Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> List[
        Union[
            Tuple[str, str, Union[str, None]],
            Tuple[str, str, Union[str, None], List[str]],
            Tuple[str, str, Union[str, None], Dict[str, Any]]
        ]
    ]:
    """
    Fetch registered Pipes' keys from the API.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        The connector keys for the query.

    metric_keys: Optional[List[str]], default None
        The metric keys for the query.

    location_keys: Optional[List[str]], default None
        The location keys for the query.

    tags: Optional[List[str]], default None
        A list of tags for the query.

    params: Optional[Dict[str, Any]], default None
        A parameters dictionary for filtering against the `pipes` table
        (e.g. `{'connector_keys': 'plugin:foo'}`).
        Not recommended to be used.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of tuples containing pipes' keys.
    """
    from meerschaum._internal.static import STATIC_CONFIG

    # Missing key filters are sent as empty lists.
    connector_keys = [] if connector_keys is None else connector_keys
    metric_keys = [] if metric_keys is None else metric_keys
    location_keys = [] if location_keys is None else location_keys
    tags = [] if tags is None else tags

    keys_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
    try:
        # List and dict filters are JSON-encoded into the query string.
        query = {
            'connector_keys': json.dumps(connector_keys),
            'metric_keys': json.dumps(metric_keys),
            'location_keys': json.dumps(location_keys),
            'tags': json.dumps(tags),
            'params': json.dumps(params),
            'instance_keys': self.instance_keys,
        }
        j = self.get(keys_url, params=query, debug=debug).json()
    except Exception as e:
        import traceback
        traceback.print_exc()
        error(str(e))

    if 'detail' in j:
        error(j['detail'], stack=False)
    return list(map(tuple, j))
Fetch registered Pipes' keys from the API.
Parameters
- connector_keys (Optional[List[str]], default None): The connector keys for the query.
- metric_keys (Optional[List[str]], default None): The metric keys for the query.
- location_keys (Optional[List[str]], default None): The location keys for the query.
- tags (Optional[List[str]], default None): A list of tags for the query.
- params (Optional[Dict[str, Any]], default None):
A parameters dictionary for filtering against the
pipes
table (e.g. {'connector_keys': 'plugin:foo'}
). Not recommended to be used. - debug (bool, default False): Verbosity toggle.
Returns
- A list of tuples containing pipes' keys.
def edit_pipe(
    self,
    pipe: mrsm.Pipe,
    patch: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Submit a PATCH to the API to edit an existing Pipe object.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose parameters (without symlinks applied) are sent as the JSON body.

    patch: bool, default False
        Sent as the `patch` query parameter.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and message.
    A list response is unpacked as `(success, message)`;
    otherwise the response's truthiness is paired with its `detail` or raw text
    (including when the body is not valid JSON).
    """
    from meerschaum.utils.debug import dprint
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.
    r_url = pipe_r_url(pipe)
    response = self.patch(
        r_url + '/edit',
        params={'patch': patch, 'instance_keys': self.get_pipe_instance_keys(pipe)},
        json=pipe.get_parameters(apply_symlinks=False),
        debug=debug,
    )
    if debug:
        dprint(response.text)

    succeeded = bool(response)
    try:
        response_data = response.json()
    except ValueError:
        # Non-JSON bodies (e.g. a proxy error page) are reported as raw text.
        return succeeded, response.text

    if isinstance(response_data, list) and len(response_data) >= 2:
        return response_data[0], response_data[1]
    if isinstance(response_data, dict) and 'detail' in response_data:
        return succeeded, response_data['detail']
    return succeeded, response.text
Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]] = None,
    chunksize: Optional[int] = -1,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Sync a DataFrame into a Pipe.

    The data are split into chunks and each chunk is posted to the pipe's `/data` endpoint.
    Syncing stops at the first chunk which fails.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to sync into.

    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]], default None
        The data to sync: a DataFrame, a dictionary of columns to lists of values,
        a list of rows, or a JSON string of one of the latter two.

    chunksize: Optional[int], default -1
        The number of rows per chunk.
        `-1` uses the configured SQL chunksize and `None` uses a chunksize of 1.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        Additional keyword arguments are sent as query parameters.

    Returns
    -------
    A `SuccessTuple` of success and a message summarizing the rows and chunks synced.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.dtypes import json_serialize_value
    from meerschaum.utils.misc import interval_str
    from meerschaum.config import get_config
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dataframe import to_json
    begin = time.perf_counter()
    more_itertools = attempt_import('more_itertools')
    if df is None:
        msg = f"DataFrame is `None`. Cannot sync {pipe}."
        return False, msg

    def get_json_str(c):
        ### allow syncing dict or JSON without needing to import pandas (for IOT devices)
        if isinstance(c, str):
            return c
        if isinstance(c, (dict, list, tuple)):
            return json.dumps(c, default=json_serialize_value)
        return to_json(c, orient='columns')

    def count_rows(c) -> int:
        ### A dict chunk maps columns to lists, so its row count is the column length.
        if isinstance(c, dict):
            return max((len(values) for values in c.values()), default=0)
        return len(c)

    df = json.loads(df) if isinstance(df, str) else df

    _chunksize: Optional[int] = (1 if chunksize is None else (
        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
        else chunksize
    ))
    chunks = []
    ### Check for `columns` rather than `index` because lists also have an `index` attribute.
    if hasattr(df, 'columns'):
        df = df.reset_index(drop=True)
        is_dask = 'dask' in df.__module__
        chunks = (
            (df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize))
            if not is_dask
            else [partition.compute() for partition in df.partitions]
        )

    elif isinstance(df, dict):
        ### Chunk each column and merge the pieces by position,
        ### e.g. [{'a': [1, 2], 'b': [5, 6]}, {'a': [3, 4], 'b': [7, 8]}].
        for k in df:
            for i, values in enumerate(more_itertools.chunked(df[k], _chunksize)):
                if i >= len(chunks):
                    chunks.append({})
                chunks[i][k] = values

    elif isinstance(df, list):
        ### Each chunk is itself a list of rows.
        chunks = more_itertools.chunked(df, _chunksize)

    ### Send columns in case the user has defined them locally.
    request_params = kw.copy()
    if pipe.columns:
        request_params['columns'] = json.dumps(pipe.columns)
    request_params['instance_keys'] = self.get_pipe_instance_keys(pipe)
    r_url = pipe_r_url(pipe) + '/data'

    rowcount = 0
    num_success_chunks = 0
    for i, c in enumerate(chunks):
        if debug:
            dprint(f"[{self}] Posting chunk {i} to {r_url}...")
        chunk_rowcount = count_rows(c)
        if chunk_rowcount == 0:
            if debug:
                dprint(f"[{self}] Skipping empty chunk...")
            continue
        json_str = get_json_str(c)

        try:
            response = self.post(
                r_url,
                params=request_params,
                data=json_str,
                debug=debug,
            )
        except Exception as e:
            msg = f"Failed to post a chunk to {pipe}:\n{e}"
            warn(msg)
            return False, msg

        if not response:
            return False, f"Failed to sync a chunk:\n{response.text}"

        try:
            j = json.loads(response.text)
        except Exception as e:
            return False, f"Failed to parse response from syncing {pipe}:\n{e}"

        if isinstance(j, dict) and 'detail' in j:
            return False, j['detail']

        try:
            j = tuple(j)
        except Exception:
            return False, response.text

        if debug:
            dprint("Received response: " + str(j))
        if not j[0]:
            return j

        rowcount += chunk_rowcount
        num_success_chunks += 1

    success_tuple = True, (
        f"It took {interval_str(timedelta(seconds=(time.perf_counter() - begin)))} "
        + f"to sync {rowcount:,} row"
        + ('s' if rowcount != 1 else '')
        + f" across {num_success_chunks:,} chunk" + ('s' if num_success_chunks != 1 else '') +
        f" to {pipe}."
    )
    return success_tuple
Sync a DataFrame into a Pipe.
def delete_pipe(
    self,
    pipe: Optional[mrsm.Pipe] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Delete a pipe and drop its table.

    Parameters
    ----------
    pipe: Optional[mrsm.Pipe], default None
        The pipe to delete. `error()` is called if this is `None`.

    debug: bool, default False
        If `True`, print the raw response text.

    Returns
    -------
    A `SuccessTuple`: the first two items of a list response,
    otherwise the response's truthiness paired with its `detail` or raw text.
    """
    if pipe is None:
        error("Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/delete',
        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
        debug=debug,
    )
    if debug:
        dprint(response.text)

    ### Parse the body once; report a non-JSON body as a failure instead of raising.
    try:
        response_data = response.json()
    except Exception:
        return False, f"Failed to delete {pipe}."

    if isinstance(response_data, list):
        return response_data[0], response_data[1]
    ### Only look up `detail` on dictionaries (a bare string would do a substring test).
    if isinstance(response_data, dict) and 'detail' in response_data:
        return bool(response), response_data['detail']
    return bool(response), response.text
Delete a Pipe and drop its table.
def get_pipe_data(
    self,
    pipe: mrsm.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    as_chunks: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[pandas.DataFrame, None]:
    """
    Fetch a pipe's data from the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose data to request.

    select_columns: Optional[List[str]], default None
        Sent JSON-encoded as the `select_columns` query parameter.

    omit_columns: Optional[List[str]], default None
        Sent JSON-encoded as the `omit_columns` query parameter.

    begin, end: Union[str, datetime, int, None], default None
        Sent as-is as the `begin` and `end` query parameters.

    params: Optional[Dict[str, Any]], default None
        Sent JSON-encoded (non-serializable values stringified) as `params`.

    as_chunks: bool, default False
        Sent as the `as_chunks` query parameter.

    Returns
    -------
    The parsed DataFrame, or `None` if the request or parsing failed
    (including when the API answers with a `detail` error payload).
    """
    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/data",
            params={
                'select_columns': json.dumps(select_columns),
                'omit_columns': json.dumps(omit_columns),
                'begin': begin,
                'end': end,
                'params': json.dumps(params, default=str),
                'instance': self.get_pipe_instance_keys(pipe),
                'as_chunks': as_chunks,
            },
            debug=debug
        )
        if not response.ok:
            return None
        j = response.json()
    except Exception as e:
        warn(f"Failed to get data for {pipe}:\n{e}")
        return None

    ### An error payload is not data: honor the declared return type rather than
    ### leaking a `(False, message)` tuple to callers expecting a DataFrame.
    if isinstance(j, dict) and 'detail' in j:
        warn(f"Failed to get data for {pipe}:\n{j['detail']}")
        return None

    from meerschaum.utils.dataframe import parse_df_datetimes, add_missing_cols_to_df
    from meerschaum.utils.dtypes import are_dtypes_equal
    try:
        df = parse_df_datetimes(
            j,
            ### Only columns typed as datetimes are candidates for datetime parsing.
            ignore_cols=[
                col
                for col, dtype in pipe.dtypes.items()
                if not are_dtypes_equal(str(dtype), 'datetime')
            ],
            strip_timezone=(pipe.tzinfo is None),
            debug=debug,
        )
    except Exception as e:
        warn(f"Failed to parse response for {pipe}:\n{e}")
        return None

    ### An empty frame still gets the pipe's known columns.
    if len(df.columns) == 0:
        return add_missing_cols_to_df(df, pipe.dtypes)

    return df
Fetch data from the API.
def get_pipe_id(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union[int, str, None]:
    """
    Ask the API for a pipe's ID.

    Returns
    -------
    An `int` when the response text is an integer, the raw text when it is
    non-empty and does not start with `{`, otherwise `None`.
    """
    from meerschaum.utils.misc import is_int

    id_url = pipe_r_url(pipe) + '/id'
    query = {
        'instance': self.get_pipe_instance_keys(pipe),
    }
    response = self.get(id_url, params=query, debug=debug)
    if debug:
        dprint(f"Got pipe ID: {response.text}")

    pipe_id = None
    try:
        if is_int(response.text):
            pipe_id = int(response.text)
        ### A leading brace means a JSON object came back rather than a plain ID.
        elif response.text and response.text[0] != '{':
            pipe_id = response.text
    except Exception as e:
        warn(f"Failed to get the ID for {pipe}:\n{e}")
    return pipe_id
Get a Pipe's ID from the API.
def get_pipe_attributes(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Dict[str, Any]:
    """
    Request a pipe's attributes from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose attributes we are fetching.

    Returns
    -------
    The decoded JSON body of the response.
    If the body cannot be decoded, warn and return an empty dictionary.
    """
    attributes_url = pipe_r_url(pipe) + '/attributes'
    query = {
        'instance': self.get_pipe_instance_keys(pipe),
    }
    response = self.get(attributes_url, params=query, debug=debug)

    try:
        attributes = json.loads(response.text)
    except Exception as e:
        warn(f"Failed to get the attributes for {pipe}:\n{e}")
        attributes = {}
    return attributes
Get a Pipe's attributes from the API
Parameters
- pipe (meerschaum.Pipe): The pipe whose attributes we are fetching.
Returns
- A dictionary of a pipe's attributes.
- If the pipe does not exist, return an empty dictionary.
def get_sync_time(
    self,
    pipe: mrsm.Pipe,
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    debug: bool = False,
) -> Union[datetime, int, None]:
    """
    Request a pipe's sync time from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to select from.

    params: Optional[Dict[str, Any]], default None
        Optional filter dictionary, sent as the JSON body of the request.

    newest: bool, default True
        Forwarded as the `newest` query parameter.

    Returns
    -------
    An `int` if the returned value is an integer, a `datetime` parsed from
    an ISO-format string otherwise, or `None` if the request or parsing failed.
    """
    from meerschaum.utils.misc import is_int
    from meerschaum.utils.warnings import warn

    sync_time_url = pipe_r_url(pipe) + '/sync_time'
    query = {
        'instance': self.get_pipe_instance_keys(pipe),
        'newest': newest,
        'debug': debug,
    }
    response = self.get(sync_time_url, json=params, params=query, debug=debug)
    if not response:
        warn(f"Failed to get the sync time for {pipe}:\n" + response.text)
        return None

    j = response.json()
    if j is None:
        return None

    try:
        if is_int(j):
            return int(j)
        return datetime.fromisoformat(j)
    except Exception as e:
        warn(f"Failed to parse the sync time '{j}' for {pipe}:\n{e}")
    return None
Get a Pipe's most recent datetime value from the API.
Parameters
- pipe (meerschaum.Pipe): The pipe to select from.
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause.
- newest (bool, default True): If `True`, get the most recent datetime (honoring `params`). If `False`, get the oldest datetime (ASC instead of DESC).
Returns
- The most recent (or oldest if `newest` is `False`) datetime of a pipe, rounded down to the closest minute.
def pipe_exists(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> bool:
    """
    Check the API to see if a pipe exists.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe which we are querying.

    Returns
    -------
    The truthiness of the API's JSON answer.
    `False` is returned if the request fails or the API answers with a
    `detail` error payload.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe)
    response = self.get(
        r_url + '/exists',
        params={
            'instance': self.get_pipe_instance_keys(pipe),
        },
        debug=debug,
    )
    if not response:
        warn(f"Failed to check if {pipe} exists:\n{response.text}")
        return False
    if debug:
        dprint("Received response: " + str(response.text))
    j = response.json()
    ### An error payload is a (truthy) dictionary, so it must not be returned as the answer.
    if isinstance(j, dict) and 'detail' in j:
        warn(j['detail'])
        return False
    return bool(j)
Check the API to see if a Pipe exists.
Parameters
- pipe ('meerschaum.Pipe'): The pipe which we are querying.
Returns
- A bool indicating whether a pipe's underlying table exists.
def create_metadata(
    self,
    debug: bool = False
) -> bool:
    """
    Ask the API to create its metadata tables.

    Returns
    -------
    `True` if the response is successful and its body is valid JSON,
    otherwise `False`.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum._internal.static import STATIC_CONFIG
    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
    response = self.post(r_url, debug=debug)
    if debug:
        ### The `f` prefix was missing, so the literal placeholder was printed.
        dprint(f"Create metadata response: {response.text}")
    try:
        _ = json.loads(response.text)
    except Exception as e:
        warn(f"Failed to create metadata on {self}:\n{e}")
        return False
    ### Report the request's outcome; previously success was never signalled.
    return bool(response)
Create metadata tables.
Returns
- A bool indicating success.
def get_pipe_rowcount(
    self,
    pipe: mrsm.Pipe,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    remote: bool = False,
    debug: bool = False,
) -> int:
    """
    Request a pipe's row count from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose rows we are counting.

    begin: Union[str, datetime, int, None], default None
        Forwarded as the `begin` query parameter.

    end: Union[str, datetime, int, None], default None
        Forwarded as the `end` query parameter.

    params: Optional[Dict[str, Any]], default None
        Sent as the JSON body of the request.

    remote: bool, default False
        Forwarded as the `remote` query parameter.

    Returns
    -------
    The integer returned by the API.
    If the request fails or the body is not an integer, warn and return 0.
    """
    rowcount_url = pipe_r_url(pipe) + "/rowcount"
    query = {
        'begin': begin,
        'end': end,
        'remote': remote,
        'instance': self.get_pipe_instance_keys(pipe),
    }
    response = self.get(rowcount_url, json=params, params=query, debug=debug)
    if not response:
        warn(f"Failed to get the rowcount for {pipe}:\n{response.text}")
        return 0

    rowcount = 0
    try:
        rowcount = int(json.loads(response.text))
    except Exception as e:
        warn(f"Failed to get the rowcount for {pipe}:\n{e}")
    return rowcount
Get a pipe's row count from the API.
Parameters
- pipe ('meerschaum.Pipe'): The pipe whose rows we are counting.
- begin (Union[str, datetime, int, None], default None): If provided, bound the count by this datetime.
- end (Union[str, datetime, int, None], default None): If provided, bound the count by this datetime.
- params (Optional[Dict[str, Any]], default None): If provided, bound the count by these parameters.
- remote (bool, default False): If `True`, return the rowcount for the fetch definition.
Returns
- The number of rows in the pipe's table, bound by the given parameters.
- If the table does not exist, return 0.
def drop_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """
    Send a drop request for a pipe to the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to be dropped. `error()` is called if this is `None`.

    Returns
    -------
    A success tuple (bool, str): the first two items of a list response,
    otherwise the response's truthiness paired with its `detail` or raw text.
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint

    if pipe is None:
        error("Pipe cannot be None.")

    drop_url = pipe_r_url(pipe) + '/drop'
    query = {
        'instance': self.get_pipe_instance_keys(pipe),
    }
    response = self.delete(drop_url, params=query, debug=debug)
    if debug:
        dprint(response.text)

    try:
        data = response.json()
    except Exception:
        return False, f"Failed to drop {pipe}."

    if isinstance(data, list):
        return data[0], data[1]

    succeeded = bool(response)
    if 'detail' in data:
        return succeeded, data['detail']
    return succeeded, response.text
Drop a pipe's table but maintain its registration.
Parameters
- pipe (meerschaum.Pipe): The pipe to be dropped.
Returns
- A success tuple (bool, str).
def clear_pipe(
    self,
    pipe: mrsm.Pipe,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Send a clear request for a pipe to the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe with rows to be deleted.

    begin, end: Union[str, datetime, int, None], default None
        Forwarded as the `begin` and `end` query parameters.

    params: Optional[Dict[str, Any]], default None
        Sent JSON-encoded as the `params` query parameter.

    Returns
    -------
    A success tuple: the first two items of a list response,
    otherwise the response's truthiness paired with its `detail` or raw text.
    """
    clear_url = pipe_r_url(pipe) + '/clear'
    query = {
        'begin': begin,
        'end': end,
        'params': json.dumps(params),
        'instance': self.get_pipe_instance_keys(pipe),
    }
    response = self.delete(clear_url, params=query, debug=debug)
    if debug:
        dprint(response.text)

    try:
        data = response.json()
    except Exception:
        return False, f"Failed to clear {pipe} with constraints {begin=}, {end=}, {params=}."

    if isinstance(data, list):
        return data[0], data[1]

    succeeded = bool(response)
    if 'detail' in data:
        return succeeded, data['detail']
    return succeeded, response.text
Delete rows in a pipe's table.
Parameters
- pipe (meerschaum.Pipe): The pipe with rows to be deleted.
Returns
- A success tuple.
def get_pipe_columns_types(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union[Dict[str, str], None]:
    """
    Request the columns and types of a pipe's table from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose columns are to be queried.

    Returns
    -------
    The dictionary returned by the API, or `None` (with a warning) if the
    payload is not a dictionary or consists only of a `detail` key.

    Examples
    --------
    >>> {
    ...     'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    ...     'id': 'BIGINT',
    ...     'val': 'DOUBLE PRECISION',
    ... }
    >>>
    """
    types_url = pipe_r_url(pipe) + '/columns/types'
    query = {
        'instance': self.get_pipe_instance_keys(pipe),
    }
    response = self.get(types_url, params=query, debug=debug)
    payload = response.json()

    if not isinstance(payload, dict):
        warn(response.text)
        return None

    ### A dictionary holding nothing but `detail` is an error message, not column data.
    if len(payload.keys()) == 1 and 'detail' in payload:
        warn(payload['detail'])
        return None

    return payload
Fetch the columns and types of the pipe's table.
Parameters
- pipe (meerschaum.Pipe): The pipe whose columns are to be queried.
Returns
- A dictionary mapping column names to their database types.
Examples
>>> {
... 'dt': 'TIMESTAMP WITHOUT TIMEZONE',
... 'id': 'BIGINT',
... 'val': 'DOUBLE PRECISION',
... }
>>>
def get_pipe_columns_indices(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union[Dict[str, str], None]:
    """
    Request the index information of a pipe's columns from the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose columns are to be queried.

    Returns
    -------
    The dictionary returned by the API, or `None` (with a warning) if the
    payload is not a dictionary or consists only of a `detail` key.
    """
    indices_url = pipe_r_url(pipe) + '/columns/indices'
    query = {
        'instance': self.get_pipe_instance_keys(pipe),
    }
    response = self.get(indices_url, params=query, debug=debug)
    payload = response.json()

    if not isinstance(payload, dict):
        warn(response.text)
        return None

    ### A dictionary holding nothing but `detail` is an error message, not index data.
    if len(payload.keys()) == 1 and 'detail' in payload:
        warn(payload['detail'])
        return None

    return payload
Fetch the index information for a pipe.
Parameters
- pipe (mrsm.Pipe): The pipe whose columns are to be queried.
Returns
- A dictionary mapping column names to a list of associated index information.
def fetch(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, str, int] = '',
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw: Any
) -> Iterator['pd.DataFrame']:
    """
    Get the pipe's data from the remote source pipe named in its `fetch` parameters.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose `parameters['fetch']` describes the source pipe.

    begin: Union[datetime, str, int], default ''
        Passed through to the source pipe's `get_data()`.

    end: Union[datetime, int, None], default None
        Passed through to the source pipe's `get_data()`.

    params: Optional[Dict[str, Any]], default None
        Base parameters; `fetch:params` is patched on top of a deep copy of them.

    Returns
    -------
    The result of the source pipe's `get_data(..., as_iterator=True)`,
    or `None` (with a warning) if the fetch parameters are missing.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.config._patch import apply_patch_to_config

    fetch_params = pipe.parameters.get('fetch', {})
    if not fetch_params:
        warn(f"Missing 'fetch' parameters for {pipe}.", stack=False)
        return None

    ### Work on a copy so the connector object and legacy keys added below
    ### are never written back into the pipe's own parameters.
    pipe_meta = dict(fetch_params.get('pipe') or {})
    ### Legacy: check for `connector_keys`, etc. at the root.
    if not pipe_meta:
        ck, mk, lk = (
            fetch_params.get('connector_keys', None),
            fetch_params.get('metric_key', None),
            fetch_params.get('location_key', None),
        )
        if not ck or not mk:
            warn(f"Missing `fetch:pipe` keys for {pipe}.", stack=False)
            return None

        pipe_meta.update({
            'connector': ck,
            'metric': mk,
            'location': lk,
        })

    pipe_meta['instance'] = self
    source_pipe = mrsm.Pipe(**pipe_meta)

    _params = copy.deepcopy(params) if params is not None else {}
    _params = apply_patch_to_config(_params, fetch_params.get('params', {}))
    select_columns = fetch_params.get('select_columns', [])
    omit_columns = fetch_params.get('omit_columns', [])

    return source_pipe.get_data(
        select_columns=select_columns,
        omit_columns=omit_columns,
        begin=begin,
        end=end,
        params=_params,
        debug=debug,
        as_iterator=True,
    )
Get the Pipe data from the remote Pipe.
def register_plugin(
    self,
    plugin: mrsm.core.Plugin,
    make_archive: bool = True,
    debug: bool = False,
) -> SuccessTuple:
    """
    Register a plugin and upload its archive.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to register.

    make_archive: bool, default True
        If `True`, build the archive with `plugin.make_tar()`;
        otherwise upload the file at `plugin.archive_path`.

    Returns
    -------
    The `SuccessTuple` decoded from the response,
    or `(False, message)` if the request or decoding fails.
    """
    import json
    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
    metadata = {
        'version': plugin.version,
        'attributes': json.dumps(plugin.attributes),
    }
    r_url = plugin_r_url(plugin)

    ### Open the archive only for the duration of the upload so the handle
    ### is always released, whatever fails around the request.
    with open(archive_path, 'rb') as file_pointer:
        try:
            response = self.post(
                r_url,
                files={'archive': file_pointer},
                params=metadata,
                debug=debug,
            )
        except Exception:
            return False, f"Failed to register plugin '{plugin}'."

    try:
        success, msg = json.loads(response.text)
    except Exception:
        return False, response.text

    return success, msg
Register a plugin and upload its archive.
def install_plugin(
    self,
    name: str,
    skip_deps: bool = False,
    force: bool = False,
    debug: bool = False
) -> SuccessTuple:
    """
    Download and attempt to install a plugin from the API.

    Parameters
    ----------
    name: str
        The name of the plugin to download.

    skip_deps: bool, default False
        Passed through to `Plugin.install()`.

    force: bool, default False
        Passed through to `Plugin.install()`.

    Returns
    -------
    The result of `Plugin.install()` if a binary archive was downloaded.
    Otherwise the `SuccessTuple` found in the downloaded JSON,
    or `(False, message)` if none could be read.
    """
    import os
    import pathlib
    import json
    from meerschaum.core import Plugin
    from meerschaum.config._paths import PLUGINS_TEMP_RESOURCES_PATH
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
    r_url = plugin_r_url(name)
    dest = pathlib.Path(os.path.join(PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
    if debug:
        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
    archive_path = self.wget(r_url, dest, debug=debug)
    is_binary = binaryornot_check.is_binary(str(archive_path))
    if not is_binary:
        ### A text file means the API answered with JSON instead of an archive.
        fail_msg = f"Failed to download binary for plugin '{name}'."
        ### Default to failure so an unexpected JSON shape cannot leave these unbound.
        success, msg = False, fail_msg
        try:
            with open(archive_path, 'r') as f:
                j = json.load(f)
            if isinstance(j, list):
                success, msg = tuple(j)
        except Exception:
            success, msg = False, fail_msg
        return success, msg
    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
    return plugin.install(skip_deps=skip_deps, force=force, debug=debug)
Download and attempt to install a plugin from the API.
def delete_plugin(
    self,
    plugin: mrsm.core.Plugin,
    debug: bool = False
) -> SuccessTuple:
    """
    Delete a plugin from an API repository.

    Returns
    -------
    The `SuccessTuple` decoded from the response,
    or `(False, message)` if the request or decoding fails.
    """
    import json

    plugin_url = plugin_r_url(plugin)
    try:
        response = self.delete(plugin_url, debug=debug)
    except Exception:
        return False, f"Failed to delete plugin '{plugin}'."

    try:
        deleted, message = json.loads(response.text)
    except Exception:
        return False, response.text
    return deleted, message
Delete a plugin from an API repository.
def get_plugins(
    self,
    user_id: Optional[int] = None,
    search_term: Optional[str] = None,
    debug: bool = False
) -> List[str]:
    """
    Request the registered plugin names from the API.

    Parameters
    ----------
    user_id: Optional[int], default None
        Forwarded as the `user_id` query parameter.

    search_term: Optional[str], default None
        Forwarded as the `search_term` query parameter.

    Returns
    -------
    The decoded response body, or an empty list if the request fails.
    `error()` is called if the body is not a list.
    """
    import json
    from meerschaum.utils.warnings import error
    from meerschaum._internal.static import STATIC_CONFIG

    plugins_endpoint = STATIC_CONFIG['api']['endpoints']['plugins']
    query = {'user_id': user_id, 'search_term': search_term}
    response = self.get(
        plugins_endpoint,
        params=query,
        use_token=True,
        debug=debug,
    )
    if not response:
        return []

    plugin_names = json.loads(response.text)
    if not isinstance(plugin_names, list):
        error(response.text)
    return plugin_names
Return a list of registered plugin names.
Parameters
- user_id (Optional[int], default None): If specified, return all plugins from a certain user.
- search_term (Optional[str], default None): If specified, return plugins beginning with this string.
Returns
- A list of plugin names.
def get_plugin_attributes(
    self,
    plugin: mrsm.core.Plugin,
    debug: bool = False
) -> Dict[str, Any]:
    """
    Request a plugin's attributes from the API.

    Returns
    -------
    The attributes dictionary. An empty dictionary is returned (with a warning)
    when the request failed and the payload carries a `detail` message.
    `error()` is called if the payload is not a dictionary.
    """
    import json
    from meerschaum.utils.warnings import warn, error

    attributes_url = plugin_r_url(plugin) + '/attributes'
    response = self.get(attributes_url, use_token=True, debug=debug)
    payload = response.json()

    ### The payload may itself be a JSON-encoded string; decode it a second time.
    if isinstance(payload, str) and payload.startswith('{'):
        try:
            payload = json.loads(payload)
        except Exception:
            pass

    if not isinstance(payload, dict):
        error(response.text)
        return payload

    if not response and 'detail' in payload:
        warn(payload['detail'])
        return {}
    return payload
Return a plugin's attributes.
def login(
    self,
    debug: bool = False,
    warn: bool = True,
    **kw: Any
) -> SuccessTuple:
    """
    Log in and set the session token.

    Parameters
    ----------
    debug: bool, default False
        Passed through to the requests.

    warn: bool, default True
        If `True`, emit a warning the first time a login fails on this connector.

    Returns
    -------
    A `SuccessTuple`. On a successful (non-API-key) login,
    `self._token` and `self._expires` are set from the response.
    """
    if self.login_scheme == 'api_key':
        ### API keys are validated directly; no session token is requested.
        validate_response = self.post(
            STATIC_CONFIG['api']['endpoints']['tokens'] + '/validate',
            headers={'Authorization': f'Bearer {self.api_key}'},
            use_token=False,
            debug=debug,
        )
        if not validate_response:
            return False, "API key is not valid."
        return True, "API key is valid."

    ### Start empty so an unrecognized scheme is reported below rather than raising.
    login_data = {}
    try:
        if self.login_scheme == 'password':
            login_data = {
                'username': self.username,
                'password': self.password,
            }
        elif self.login_scheme == 'client_credentials':
            login_data = {
                'client_id': self.client_id,
                'client_secret': self.client_secret,
            }
    except AttributeError:
        login_data = {}

    if not login_data:
        return False, f"Please login with the command `login {self}`."

    ### Only the password scheme has a username to mention.
    login_scheme_msg = (
        f" as user '{login_data['username']}'"
        if self.login_scheme == 'password'
        else ''
    )

    response = self.post(
        STATIC_CONFIG['api']['endpoints']['login'],
        data=login_data,
        use_token=False,
        debug=debug,
    )
    if response:
        msg = f"Successfully logged into '{self}'{login_scheme_msg}."
        payload = json.loads(response.text)
        self._token = payload['access_token']
        self._expires = datetime.datetime.strptime(
            payload['expires'],
            '%Y-%m-%dT%H:%M:%S.%f'
        )
    else:
        msg = (
            f"Failed to log into '{self}'{login_scheme_msg}.\n" +
            f"    Please verify login details for connector '{self}'."
        )
        ### Warn at most once per connector.
        if warn and not self.__dict__.get('_emitted_warning', False):
            _warn(msg, stack=False)
            self._emitted_warning = True

    return bool(response), msg
Log in and set the session token.
def test_connection(
    self,
    **kw: Any
) -> Union[bool, None]:
    """
    Test if a successful connection to the API may be made.

    Keyword arguments override the defaults passed to `retry_connect()`.
    Any exception raised by `retry_connect()` is reported as `False`.
    """
    from meerschaum.connectors.poll import retry_connect

    connect_kw = {
        'max_retries': 1,
        'retry_wait': 0,
        'warn': False,
        'connector': self,
        'enforce_chaining': False,
        'enforce_login': False,
        **kw,
    }
    try:
        result = retry_connect(**connect_kw)
    except Exception:
        result = False
    return result
Test if a successful connection to the API may be made.
def register_user(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Register a new user through the API.

    Returns
    -------
    The tuple decoded from the response, `(False, detail)` for an error payload,
    or `(False, message)` if the body cannot be decoded.
    """
    from meerschaum._internal.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    register_url = f"{users_endpoint}/register"
    form = {
        'username': user.username,
        'password': user.password,
        'attributes': json.dumps(user.attributes),
    }
    ### Optional fields are only sent when set.
    if user.type:
        form['type'] = user.type
    if user.email:
        form['email'] = user.email

    response = self.post(register_url, data=form, debug=debug)
    try:
        payload = json.loads(response.text)
        if isinstance(payload, dict) and 'detail' in payload:
            return False, payload['detail']
        result = tuple(payload)
    except Exception:
        if response:
            return False, response.text
        return False, f"Failed to register user '{user}'."

    return tuple(result)
Register a new user.
def get_user_id(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> Union[int, str, UUID, None]:
    """
    Request a user's ID from the API.

    Returns
    -------
    An `int` or `UUID` when the returned value parses as one,
    the value as a string otherwise, or `None` if decoding fails.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    from meerschaum.utils.misc import is_int, is_uuid

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    id_url = f"{users_endpoint}/{user.username}/id"
    response = self.get(id_url, debug=debug, **kw)

    try:
        id_text = str(json.loads(response.text))
        if is_int(id_text):
            return int(id_text)
        if is_uuid(id_text):
            return UUID(id_text)
        return id_text
    except Exception:
        return None
Get a user's ID.
def get_users(
    self,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    Request the registered usernames from the API.

    Returns
    -------
    The decoded JSON body, or an empty list if the request
    fails or the body cannot be decoded.
    """
    from meerschaum._internal.static import STATIC_CONFIG

    users_endpoint = f"{STATIC_CONFIG['api']['endpoints']['users']}"
    response = self.get(users_endpoint, debug=debug, use_token=True)
    if not response:
        return []

    try:
        usernames = response.json()
    except Exception:
        usernames = []
    return usernames
Return a list of registered usernames.
def edit_user(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit an existing user through the API.

    Returns
    -------
    The tuple decoded from the response, `(False, detail)` for an error payload,
    or `(False, message)` if the body cannot be decoded.
    """
    from meerschaum._internal.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    edit_url = f"{users_endpoint}/edit"
    form = {
        'username': user.username,
        'password': user.password,
        'type': user.type,
        'email': user.email,
        'attributes': json.dumps(user.attributes),
    }

    response = self.post(edit_url, data=form, debug=debug)
    try:
        payload = json.loads(response.text)
        if isinstance(payload, dict) and 'detail' in payload:
            return False, payload['detail']
        result = tuple(payload)
    except Exception:
        if response:
            return False, response.text
        return False, f"Failed to edit user '{user}'."

    return tuple(result)
Edit an existing user.
def delete_user(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Delete a user through the API.

    Returns
    -------
    The tuple decoded from the response, `(False, detail)` for an error payload,
    or `(False, message)` if the body cannot be decoded.
    """
    from meerschaum._internal.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    user_url = f"{users_endpoint}/{user.username}"
    response = self.delete(user_url, debug=debug)

    try:
        payload = json.loads(response.text)
        if isinstance(payload, dict) and 'detail' in payload:
            return False, payload['detail']
        return tuple(payload)
    except Exception:
        return False, f"Failed to delete user '{user.username}'."
Delete a user.
def get_user_password_hash(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """
    Request a user's password hash from the API.

    Returns
    -------
    The decoded JSON body, or `None` if the request fails.
    """
    from meerschaum._internal.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    hash_url = users_endpoint + '/' + user.username + '/password_hash'
    response = self.get(hash_url, debug=debug, **kw)
    return response.json() if response else None
If configured, get a user's password hash.
def get_user_type(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """
    Request a user's type from the API.

    Returns
    -------
    The decoded JSON body, or `None` if the request fails.
    """
    from meerschaum._internal.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    type_url = users_endpoint + '/' + user.username + '/type'
    response = self.get(type_url, debug=debug, **kw)
    return response.json() if response else None
If configured, get a user's type.
def get_user_attributes(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw
) -> 'Optional[Dict[str, Any]]':
    """
    Request a user's attributes from the API.

    Returns
    -------
    The decoded JSON body of the response,
    or `None` if the body cannot be decoded.
    """
    ### The return annotation previously claimed `int`; the value is the decoded
    ### JSON payload (presumably a dictionary; verify against the API).
    from meerschaum._internal.static import STATIC_CONFIG
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/attributes"
    response = self.get(r_url, debug=debug, **kw)
    try:
        attributes = json.loads(response.text)
    except Exception:
        attributes = None
    return attributes
Get a user's attributes.
def register_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
    """
    Register the provided token to the API.

    On success, the token's `label`, `secret`, `id`, and (when present)
    `expiration` are overwritten with the values returned by the API.
    """
    from meerschaum.utils.dtypes import json_serialize_value

    register_url = tokens_endpoint + '/register'
    body = json.dumps(
        {
            'label': token.label,
            'scopes': token.scopes,
            'expiration': token.expiration,
        },
        default=json_serialize_value,
    )
    response = self.post(register_url, data=body, debug=debug)
    if not response:
        return False, f"Failed to register token:\n{response.text}"

    registered = response.json()
    token.label = registered['label']
    token.secret = registered['secret']
    token.id = uuid.UUID(registered['id'])
    ### Leave the local expiration untouched when the API returns none.
    if registered.get('expiration', None):
        token.expiration = datetime.fromisoformat(registered['expiration'])

    return True, f"Registered token '{token.label}'."
Register the provided token to the API.
def get_token_model(self, token_id: uuid.UUID, debug: bool = False) -> 'Union[TokenModel, None]':
    """
    Return a token's model from the API instance.

    Returns `None` if the request fails; otherwise a `TokenModel`
    built from the keys of the JSON response.
    """
    from meerschaum.models import TokenModel

    token_url = tokens_endpoint + f'/{token_id}'
    response = self.get(token_url, debug=debug)
    if not response:
        return None

    fields = response.json()
    return TokenModel(**fields)
Return a token's model from the API instance.
def get_tokens(self, labels: Optional[List[str]] = None, debug: bool = False) -> List[Token]:
    """
    Return the tokens registered to the current user.

    Parameters
    ----------
    labels: Optional[List[str]], default None
        Forwarded as the `labels` query parameter.

    Returns
    -------
    A list of `Token` objects built from the response payloads,
    or an empty list (with a warning) if the request fails.
    """
    from meerschaum.utils.warnings import warn
    r_url = tokens_endpoint
    ### NOTE(review): a comma-joined `labels` dictionary used to be built here but was
    ### never sent. The request is unchanged; confirm which form the endpoint expects.
    response = self.get(r_url, params={'labels': labels}, debug=debug)
    if not response:
        warn(f"Could not get tokens from '{self}':\n{response.text}")
        return []

    tokens = [
        Token(instance=self, **payload)
        for payload in response.json()
    ]
    return tokens
Return the tokens registered to the current user.
def edit_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
    """
    Persist the token's in-memory state to the API.

    Returns
    -------
    The `SuccessTuple` decoded from the response,
    or `(False, message)` if the request fails.
    """
    r_url = tokens_endpoint + f"/{token.id}/edit"
    response = self.post(
        r_url,
        json={
            'creation': token.creation.isoformat() if token.creation else None,
            'expiration': token.expiration.isoformat() if token.expiration else None,
            'label': token.label,
            'is_valid': token.is_valid,
            'scopes': token.scopes,
        },
        ### Forward `debug` like the other token requests do.
        debug=debug,
    )
    if not response:
        return False, f"Failed to edit token:\n{response.text}"

    success, msg = response.json()
    return success, msg
Persist the token's in-memory state to the API.
def invalidate_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
    """
    Invalidate the token, disabling it for future requests.

    Returns
    -------
    The `SuccessTuple` decoded from the response,
    or `(False, message)` if the request fails.
    """
    r_url = tokens_endpoint + f"/{token.id}/invalidate"
    ### Forward `debug` like the other token requests do.
    response = self.post(r_url, debug=debug)
    if not response:
        return False, f"Failed to invalidate token:\n{response.text}"

    success, msg = response.json()
    return success, msg
Invalidate the token, disabling it for future requests.
def get_token_scopes(self, token_id: Union[uuid.UUID, Token], debug: bool = False) -> List[str]:
    """
    Return the scopes for a token.

    Parameters
    ----------
    token_id: Union[uuid.UUID, Token]
        The token's ID, or a `Token` whose `id` is used.

    Returns
    -------
    The `scopes` of the token's model,
    or an empty list if the model could not be fetched or has no scopes.
    """
    _token_id = (token_id.id if isinstance(token_id, Token) else token_id)
    ### Keep the model itself: reading `.scopes` here and again below always
    ### yielded an empty list, and raised when the model was `None`.
    model = self.get_token_model(_token_id, debug=debug)
    return getattr(model, 'scopes', None) or []
Return the scopes for a token.
def token_exists(self, token_id: Union[uuid.UUID, Token], debug: bool = False) -> bool:
    """
    Return `True` if a token exists.

    A token exists when its model can be fetched and that model's `creation` is set.
    """
    lookup_id = token_id.id if isinstance(token_id, Token) else token_id
    model = self.get_token_model(lookup_id, debug=debug)
    return model is not None and model.creation is not None
Return `True` if a token exists.
def delete_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
    """
    Delete the token from the API.

    Returns the success tuple unpacked from the response body,
    or `(False, <message>)` when the DELETE request fails.
    """
    response = self.delete(tokens_endpoint + f"/{token.id}", debug=debug)
    if response:
        success, msg = response.json()
        return success, msg

    return False, f"Failed to delete token:\n{response.text}"
Delete the token from the API.
@classmethod
def from_uri(
    cls,
    uri: str,
    label: Optional[str] = None,
    as_dict: bool = False,
) -> Union[
    'meerschaum.connectors.APIConnector',
    Dict[str, Union[str, int]],
]:
    """
    Build an `APIConnector` (or its keyword arguments) from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string. It is parsed with `SQLConnector.parse_uri()`,
        and the parsed `flavor` is used as the connector's `protocol`.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise the label is the lowercased `username@host`
        (or just the host when the URI contains no username).

    as_dict: bool, default False
        If `True`, return the dictionary of keyword arguments
        instead of constructing a new `APIConnector`.

    Returns
    -------
    A new `APIConnector` object or a dictionary of attributes (if `as_dict` is `True`).
    """
    from meerschaum.connectors.sql import SQLConnector

    attrs = SQLConnector.parse_uri(uri)
    if 'host' not in attrs:
        error("No host was found in the provided URI.")

    attrs['protocol'] = attrs.pop('flavor')

    if label:
        attrs['label'] = label
    else:
        user_prefix = (attrs['username'] + '@') if 'username' in attrs else ''
        attrs['label'] = (user_prefix + attrs['host']).lower()

    if as_dict:
        return attrs
    return cls(**attrs)
Create a new APIConnector from a URI string.
Parameters
- uri (str): The URI connection string.
- label (Optional[str], default None): If provided, use this as the connector label. Otherwise the label defaults to the lowercased `username@host` (or just the host when the URI has no username).
- as_dict (bool, default False): If `True`, return a dictionary of the keyword arguments necessary to create a new `APIConnector`, otherwise create a new object.
Returns
- A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
    """
    Return a dictionary mapping job names to remote `Job` objects.

    An empty dictionary is returned (after a warning) when the request fails.
    """
    response = self.get(JOBS_ENDPOINT, debug=debug)
    if not response:
        warn(f"Failed to get remote jobs from {self}.")
        return {}

    jobs = {}
    for job_name, meta in response.json().items():
        # Each job is bound to this connector through `executor_keys`.
        jobs[job_name] = Job(
            job_name,
            meta['sysargs'],
            executor_keys=str(self),
            _properties=meta['daemon']['properties'],
        )
    return jobs
Return a dictionary of remote jobs.
def get_job(self, name: str, debug: bool = False) -> Job:
    """
    Return a single `Job` object built from the job's remote metadata.

    Raises
    ------
    ValueError
        If no metadata could be retrieved for the job.
    """
    job_meta = self.get_job_metadata(name, debug=debug)
    if job_meta:
        return Job(
            name,
            job_meta['sysargs'],
            executor_keys=str(self),
            _properties=job_meta['daemon']['properties'],
        )

    raise ValueError(f"Job '{name}' does not exist.")
Return a single Job object.
def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return the metadata for a single job.

    Results are cached per job name on the connector (`_job_metadata_cache`)
    and reused while younger than `JOB_METADATA_CACHE_SECONDS`.
    An empty dictionary is returned when the request fails.
    """
    now = time.perf_counter()
    cache = self.__dict__.get('_job_metadata_cache', None)

    cached_at = None
    if cache is not None:
        cached_at = cache.get(name, {}).get('timestamp', None)

    is_fresh = (
        cached_at is not None
        and (now - cached_at) < JOB_METADATA_CACHE_SECONDS
    )
    if is_fresh:
        if debug:
            dprint(f"Returning cached metadata for job '{name}'.")
        return cache[name]['metadata']

    response = self.get(JOBS_ENDPOINT + f"/{name}", debug=debug)
    if not response:
        # Failures are only reported when debugging.
        if debug:
            if 'detail' in response.text:
                msg = response.json()['detail']
            else:
                msg = response.text
            warn(f"Failed to get metadata for job '{name}':\n{msg}")
        return {}

    metadata = response.json()

    # Create the cache on first use, then record this result
    # with the timestamp taken before the request was made.
    if cache is None:
        self._job_metadata_cache = {}
    self._job_metadata_cache[name] = {
        'timestamp': now,
        'metadata': metadata,
    }
    return metadata
Return the metadata for a single job.
def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return the daemon properties for a single job.

    These are read from `metadata['daemon']['properties']`;
    an empty dictionary is returned if either key is missing.
    """
    daemon_meta = self.get_job_metadata(name, debug=debug).get('daemon', {})
    return daemon_meta.get('properties', {})
Return the daemon properties for a single job.
def get_job_exists(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a job exists.

    The decoded response body is returned on success;
    on a failed request, a warning is emitted and `False` is returned.
    """
    response = self.get(JOBS_ENDPOINT + f'/{name}/exists', debug=debug)
    if response:
        return response.json()

    warn(f"Failed to determine whether job '{name}' exists.")
    return False
Return whether a job exists.
def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Delete a job.

    Returns the response body as a tuple on success.
    On failure, returns `False` with the response's `detail` (if present) or its raw text.
    """
    response = self.delete(JOBS_ENDPOINT + f"/{name}", debug=debug)
    if response:
        return tuple(response.json())

    failure_msg = (
        response.json()['detail']
        if 'detail' in response.text
        else response.text
    )
    return False, failure_msg
Delete a job.
def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Start a job.

    Returns the response body as a tuple on success.
    On failure, returns `False` with the response's `detail` (if present) or its raw text.
    """
    response = self.post(JOBS_ENDPOINT + f"/{name}/start", debug=debug)
    if response:
        return tuple(response.json())

    failure_msg = (
        response.json()['detail']
        if 'detail' in response.text
        else response.text
    )
    return False, failure_msg
Start a job.
def create_job(
    self,
    name: str,
    sysargs: List[str],
    properties: Optional[Dict[str, str]] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Create a job.

    The job's `sysargs` and `properties` are posted as JSON to the job's endpoint.
    Returns the response body as a tuple on success.
    On failure, returns `False` with the response's `detail` (if present) or its raw text.
    """
    payload = {
        'sysargs': sysargs,
        'properties': properties,
    }
    response = self.post(JOBS_ENDPOINT + f"/{name}", json=payload, debug=debug)
    if response:
        return tuple(response.json())

    failure_msg = (
        response.json()['detail']
        if 'detail' in response.text
        else response.text
    )
    return False, failure_msg
Create a job.
def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Stop a job.

    Returns the response body as a tuple on success.
    On failure, returns `False` with the response's `detail` (if present) or its raw text.
    """
    response = self.post(JOBS_ENDPOINT + f"/{name}/stop", debug=debug)
    if response:
        return tuple(response.json())

    failure_msg = (
        response.json()['detail']
        if 'detail' in response.text
        else response.text
    )
    return False, failure_msg
Stop a job.
def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Pause a job.

    Returns the response body as a tuple on success.
    On failure, returns `False` with the response's `detail` (if present) or its raw text.
    """
    response = self.post(JOBS_ENDPOINT + f"/{name}/pause", debug=debug)
    if response:
        return tuple(response.json())

    failure_msg = (
        response.json()['detail']
        if 'detail' in response.text
        else response.text
    )
    return False, failure_msg
Pause a job.
def get_logs(self, name: str, debug: bool = False) -> str:
    """
    Return the logs for a job.

    Parameters
    ----------
    name: str
        The name of the job whose logs to fetch.

    debug: bool, default False
        Verbosity toggle, forwarded to the underlying GET request.

    Returns
    -------
    The decoded response body.

    Raises
    ------
    ValueError
        If the request fails.
    """
    # Forward `debug` like the other request helpers in this connector do.
    response = self.get(LOGS_ENDPOINT + f"/{name}", debug=debug)
    if not response:
        raise ValueError(f"Cannot fetch logs for job '{name}':\n{response.text}")

    return response.json()
Return the logs for a job.
def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
    """
    Return the job's manual stop time.

    Parameters
    ----------
    name: str
        The name of the job.

    debug: bool, default False
        Verbosity toggle, forwarded to the underlying GET request.

    Returns
    -------
    The stop time parsed from the response's ISO-format string,
    or `None` if the request failed or the response body is null.
    """
    # Forward `debug` like the other request helpers in this connector do.
    response = self.get(JOBS_ENDPOINT + f"/{name}/stop_time", debug=debug)
    if not response:
        warn(f"Failed to get stop time for job '{name}':\n{response.text}")
        return None

    data = response.json()
    if data is None:
        return None

    return datetime.fromisoformat(data)
Return the job's manual stop time.
def monitor_logs(
    self,
    name: str,
    callback_function: Callable[[Any], Any],
    input_callback_function: Callable[[None], str],
    stop_callback_function: Callable[[None], str],
    stop_on_exit: bool = False,
    strip_timestamps: bool = False,
    accept_input: bool = True,
    debug: bool = False,
):
    """
    Monitor a job's log files and execute a callback with the changes.

    This is the blocking counterpart of `monitor_logs_async()`:
    all arguments are forwarded to it and the coroutine is driven with `asyncio.run()`.
    """
    log_monitor = self.monitor_logs_async(
        name,
        callback_function,
        input_callback_function=input_callback_function,
        stop_callback_function=stop_callback_function,
        stop_on_exit=stop_on_exit,
        strip_timestamps=strip_timestamps,
        accept_input=accept_input,
        debug=debug,
    )
    return asyncio.run(log_monitor)
Monitor a job's log files and execute a callback with the changes.
async def monitor_logs_async(
    self,
    name: str,
    callback_function: Callable[[Any], Any],
    input_callback_function: Callable[[], str],
    stop_callback_function: Callable[[SuccessTuple], str],
    stop_on_exit: bool = False,
    strip_timestamps: bool = False,
    accept_input: bool = True,
    debug: bool = False,
):
    """
    Monitor a job's log files and await a callback with the changes.

    Parameters
    ----------
    name: str
        The name of the job whose logs websocket to connect to.

    callback_function: Callable[[Any], Any]
        Called (or awaited, if it is a coroutine function) with each received
        message that is not one of the special stdin / stop messages.

    input_callback_function: Callable[[], str]
        Called (or awaited) when the stdin message is received;
        its return value is sent back over the websocket.
        May be `None`, in which case the stdin message is ignored.

    stop_callback_function: Callable[[SuccessTuple], str]
        Called (or awaited) with the result tuple when the stop message is received.
        May be `None`.

    stop_on_exit: bool, default False
        If `True`, stop monitoring after the stop message has been handled.

    strip_timestamps: bool, default False
        If `True`, pass each line through `strip_timestamp_from_line()`
        before handing it to `callback_function`.

    accept_input: bool, default True
        Not referenced in this function's body.

    debug: bool, default False
        Not referenced in this function's body.
    """
    import traceback
    from meerschaum.jobs import StopMonitoringLogs
    from meerschaum.utils.formatting._jobs import strip_timestamp_from_line

    websockets, websockets_exceptions = mrsm.attempt_import('websockets', 'websockets.exceptions')
    # Use plain `ws` only when the connector's URI is plain HTTP.
    protocol = 'ws' if self.URI.startswith('http://') else 'wss'
    # When no port is set on the instance, the URI is built with an empty port (`host:`).
    port = self.port if 'port' in self.__dict__ else ''
    uri = f"{protocol}://{self.host}:{port}{LOGS_ENDPOINT}/{name}/ws"

    async def _stdin_callback(client):
        # Fetch input from the caller-provided callback and send it over the websocket.
        if input_callback_function is None:
            return

        if asyncio.iscoroutinefunction(input_callback_function):
            data = await input_callback_function()
        else:
            data = input_callback_function()

        await client.send(data)

    async def _stop_callback(client):
        # The message following the stop message is read as a JSON-encoded result;
        # any failure to read or parse it is converted into a failed result tuple.
        try:
            result = tuple(json.loads(await client.recv()))
        except Exception as e:
            warn(traceback.format_exc())
            result = False, str(e)

        if stop_callback_function is not None:
            if asyncio.iscoroutinefunction(stop_callback_function):
                await stop_callback_function(result)
            else:
                stop_callback_function(result)

        # Raising here breaks out of the receive loop below.
        if stop_on_exit:
            raise StopMonitoringLogs

    # Special messages are dispatched to handlers instead of the log callback.
    message_callbacks = {
        JOBS_STDIN_MESSAGE: _stdin_callback,
        JOBS_STOP_MESSAGE: _stop_callback,
    }

    async with websockets.connect(uri) as websocket:
        # The first message sent is the connector's token (or a placeholder if there is none).
        try:
            await websocket.send(self.token or 'no-login')
        except websockets_exceptions.ConnectionClosedOK:
            pass

        while True:
            try:
                response = await websocket.recv()
                callback = message_callbacks.get(response, None)
                if callback is not None:
                    await callback(websocket)
                    continue

                if strip_timestamps:
                    response = strip_timestamp_from_line(response)

                if asyncio.iscoroutinefunction(callback_function):
                    await callback_function(response)
                else:
                    callback_function(response)
            except (KeyboardInterrupt, StopMonitoringLogs):
                # Close the websocket and stop monitoring; other exceptions propagate.
                await websocket.close()
                break
Monitor a job's log files and await a callback with the changes.
def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a remote job is blocking on stdin.

    `False` is returned when the request fails;
    otherwise the decoded response body is returned.
    """
    response = self.get(JOBS_ENDPOINT + f'/{name}/is_blocking_on_stdin', debug=debug)
    if response:
        return response.json()

    return False
Return whether a remote job is blocking on stdin.
def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `began` timestamp, if it exists.

    The value is read from the `daemon` entry of the job's properties;
    `None` is returned when it is absent.
    """
    daemon_props = self.get_job_properties(name, debug=debug).get('daemon', {})
    return daemon_props.get('began', None)
Return a job's `began` timestamp, if it exists.
def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `ended` timestamp, if it exists.

    The value is read from the `daemon` entry of the job's properties;
    `None` is returned when it is absent.
    """
    daemon_props = self.get_job_properties(name, debug=debug).get('daemon', {})
    return daemon_props.get('ended', None)
Return a job's `ended` timestamp, if it exists.
def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `paused` timestamp, if it exists.

    The value is read from the `daemon` entry of the job's properties;
    `None` is returned when it is absent.
    """
    daemon_props = self.get_job_properties(name, debug=debug).get('daemon', {})
    return daemon_props.get('paused', None)
Return a job's `paused` timestamp, if it exists.
def get_job_status(self, name: str, debug: bool = False) -> str:
    """
    Return the job's status.

    The status is read from the job's metadata and falls back to `'stopped'`
    when the metadata has no `status` key.
    """
    job_meta = self.get_job_metadata(name, debug=debug)
    if 'status' in job_meta:
        return job_meta['status']
    return 'stopped'
Return the job's status.