meerschaum.connectors.api
Interact with the Meerschaum API (send commands, pull data, etc.)
class APIConnector(InstanceConnector):
    """
    Connect to a Meerschaum API instance.

    The request, action, pipe, plugin, user, token, and job methods are
    implemented in sibling modules and bound onto the class by the
    class-level imports below.
    """

    IS_THREAD_SAFE: bool = False
    OPTIONAL_ATTRIBUTES: List[str] = ['port', 'client_secret', 'client_id', 'api_key']

    from ._request import (
        make_request,
        get,
        post,
        put,
        patch,
        delete,
        wget,
    )
    from ._actions import (
        get_actions,
        do_action,
        do_action_async,
        do_action_legacy,
    )
    from ._misc import get_mrsm_version, get_chaining_status
    from ._pipes import (
        get_pipe_instance_keys,
        register_pipe,
        fetch_pipes_keys,
        edit_pipe,
        sync_pipe,
        delete_pipe,
        get_pipe_data,
        get_pipe_id,
        get_pipe_attributes,
        get_sync_time,
        pipe_exists,
        create_metadata,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_columns_types,
        get_pipe_columns_indices,
    )
    from ._fetch import fetch
    from ._plugins import (
        register_plugin,
        install_plugin,
        delete_plugin,
        get_plugins,
        get_plugin_attributes,
    )
    from ._login import login, test_connection
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._tokens import (
        register_token,
        get_token_model,
        get_tokens,
        edit_token,
        invalidate_token,
        get_token_scopes,
        token_exists,
        delete_token,
    )
    from ._uri import from_uri
    from ._jobs import (
        get_jobs,
        get_job,
        get_job_metadata,
        get_job_properties,
        get_job_exists,
        delete_job,
        start_job,
        create_job,
        stop_job,
        pause_job,
        get_logs,
        get_job_stop_time,
        monitor_logs,
        monitor_logs_async,
        get_job_is_blocking_on_stdin,
        get_job_began,
        get_job_ended,
        get_job_paused,
        get_job_status,
    )

    def __init__(
        self,
        label: Optional[str] = None,
        wait: bool = False,
        debug: bool = False,
        **kw
    ):
        """
        Build the connector from keyword attributes or from a `uri`.

        Parameters
        ----------
        label: Optional[str], default None
            The connector's label. If omitted and a `uri` is given,
            the label parsed from the URI (if any) is used.

        wait: bool, default False
            Accepted for interface compatibility; not read by this method.

        debug: bool, default False
            Accepted for interface compatibility; not read by this method.

        **kw
            Connector attributes (e.g. `uri`, `host`, `port`, `protocol`),
            forwarded to the parent constructor.

        Raises
        ------
        Exception
            If a `uri` was provided but no host could be parsed from it.
        """
        if 'uri' in kw:
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        super().__init__('api', label=label, **kw)
        if 'protocol' not in self.__dict__:
            # Infer the protocol from the URI's scheme, defaulting to plain HTTP.
            self.protocol = (
                'https' if self.__dict__.get('uri', '').startswith('https')
                else 'http'
            )

        if 'uri' not in self.__dict__:
            self.verify_attributes(required_attributes)
        else:
            from meerschaum.connectors.sql import SQLConnector
            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
            if 'host' not in conn_attrs:
                raise Exception(f"Invalid URI for '{self}'.")
            self.__dict__.update(conn_attrs)

        # Base URL (without credentials) against which relative endpoints are joined.
        self.url = (
            self.protocol + '://' +
            self.host
            + (
                (':' + str(self.port))
                if self.__dict__.get('port', None)
                else ''
            )
        )
        self._token = None
        self._expires = None
        self._session = None
        self._instance_keys = self.__dict__.get('instance_keys', None)

    @property
    def URI(self) -> str:
        """
        Return the fully qualified URI
        (`protocol://[username:password@]host[:port]`).

        Credentials are included only when both `username` and `password` are set.
        """
        username = self.__dict__.get('username', None)
        password = self.__dict__.get('password', None)
        # NOTE(review): credentials are inserted verbatim (not percent-encoded);
        # confirm that the URI parser expects them unescaped.
        creds = (username + ':' + password + '@') if username and password else ''
        port_suffix = (
            (':' + str(self.port))
            if self.__dict__.get('port', None)
            else ''
        )
        return self.protocol + '://' + creds + self.host + port_suffix

    @property
    def session(self):
        """
        Return the cached `requests.Session`, creating it on first access.
        """
        if self._session is None:
            _ = attempt_import('certifi', lazy=False)
            requests = attempt_import('requests', lazy=False)
            if requests:
                self._session = requests.Session()
        if self._session is None:
            error("Failed to import requests. Is requests installed?")
        return self._session

    @property
    def token(self):
        """
        Return the value to send as the bearer token.

        For the `'api_key'` login scheme this is the API key itself.
        Otherwise `login()` is called when no token is cached or when
        `_expires` is unset or less than one minute away; a failed login
        emits a warning only once per connector.
        """
        if self.login_scheme == 'api_key':
            return self.api_key

        expired = (
            True if self._expires is None else (
                (
                    self._expires
                    <
                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
                )
            )
        )

        if self._token is None or expired:
            success, msg = self.login()
            if not success and not self.__dict__.get('_emitted_warning'):
                warn(msg, stack=False)
                self._emitted_warning = True
        return self._token

    @property
    def instance_keys(self) -> Union[str, None]:
        """
        Return the instance keys to be sent alongside pipe requests.
        """
        return self._instance_keys

    @property
    def login_scheme(self) -> str:
        """
        Return the login scheme to use based on the configured credentials.

        Precedence: `username` (`'password'`), then `client_id`
        (`'client_credentials'`), then `api_key` (`'api_key'`);
        falls back to `'password'`.
        """
        if 'username' in self.__dict__:
            return 'password'
        if 'client_id' in self.__dict__:
            return 'client_credentials'
        if 'api_key' in self.__dict__:
            return 'api_key'

        return 'password'
Connect to a Meerschaum API instance.
def __init__(
    self,
    label: Optional[str] = None,
    wait: bool = False,
    debug: bool = False,
    **kw
):
    """
    Build the connector from keyword attributes or from a `uri`.

    Parameters
    ----------
    label: Optional[str], default None
        The connector's label. A label parsed from `uri` is used as a fallback.

    wait: bool, default False
        Accepted for interface compatibility; not read by this method.

    debug: bool, default False
        Accepted for interface compatibility; not read by this method.

    **kw
        Connector attributes forwarded to the parent constructor.
    """
    if 'uri' in kw:
        uri_params = self.from_uri(kw['uri'], as_dict=True)
        # The label is passed separately, so strip it from the parsed attributes.
        uri_label = uri_params.pop('label', None)
        label = label or uri_label
        kw.update(uri_params)

    super().__init__('api', label=label, **kw)

    attrs = self.__dict__
    if 'protocol' not in attrs:
        uses_https = attrs.get('uri', '').startswith('https')
        self.protocol = 'https' if uses_https else 'http'

    if 'uri' in attrs:
        from meerschaum.connectors.sql import SQLConnector
        conn_attrs = SQLConnector.parse_uri(attrs['uri'])
        if 'host' not in conn_attrs:
            raise Exception(f"Invalid URI for '{self}'.")
        attrs.update(conn_attrs)
    else:
        self.verify_attributes(required_attributes)

    # Base URL (without credentials) against which relative endpoints are joined.
    port_suffix = (':' + str(self.port)) if attrs.get('port', None) else ''
    self.url = self.protocol + '://' + self.host + port_suffix

    self._token = None
    self._expires = None
    self._session = None
    self._instance_keys = attrs.get('instance_keys', None)
@property
def URI(self) -> str:
    """
    Return the fully qualified URI
    (`protocol://[username:password@]host[:port]`).

    Credentials are included only when both `username` and `password` are set,
    and the port only when a truthy `port` attribute is set.
    """
    attrs = self.__dict__
    username = attrs.get('username', None)
    password = attrs.get('password', None)
    # NOTE(review): credentials are inserted verbatim (not percent-encoded);
    # confirm that the URI parser expects them unescaped.
    creds = (username + ':' + password + '@') if username and password else ''
    port_suffix = (':' + str(self.port)) if attrs.get('port', None) else ''
    return self.protocol + '://' + creds + self.host + port_suffix
Return the fully qualified URI.
@property
def session(self):
    """
    Return the cached `requests.Session`, creating it on first access.
    """
    if self._session is not None:
        return self._session

    _ = attempt_import('certifi', lazy=False)
    requests = attempt_import('requests', lazy=False)
    if requests:
        self._session = requests.Session()

    if self._session is None:
        error("Failed to import requests. Is requests installed?")
    return self._session
@property
def token(self):
    """
    Return the value to send as the bearer token.

    For the `'api_key'` login scheme this is the API key itself.
    Otherwise `login()` is called when no token is cached or when
    `_expires` is unset or less than one minute away; a failed login
    emits a warning only once per connector.
    """
    if self.login_scheme == 'api_key':
        return self.api_key

    if self._expires is None:
        expired = True
    else:
        # `_expires` is compared against a naive UTC timestamp one minute ahead.
        cutoff = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
        expired = self._expires < cutoff

    if self._token is None or expired:
        success, msg = self.login()
        if not (success or self.__dict__.get('_emitted_warning')):
            warn(msg, stack=False)
            self._emitted_warning = True

    return self._token
@property
def instance_keys(self) -> Union[str, None]:
    """
    Return the instance keys to be sent alongside pipe requests
    (the value cached in `_instance_keys`).
    """
    keys = self._instance_keys
    return keys
Return the instance keys to be sent alongside pipe requests.
@property
def login_scheme(self) -> str:
    """
    Return the login scheme to use based on the configured credentials.

    The first attribute present wins: `username` (`'password'`),
    `client_id` (`'client_credentials'`), `api_key` (`'api_key'`).
    Falls back to `'password'` when none are set.
    """
    attrs = self.__dict__
    schemes_by_attribute = (
        ('username', 'password'),
        ('client_id', 'client_credentials'),
        ('api_key', 'api_key'),
    )
    for attribute, scheme in schemes_by_attribute:
        if attribute in attrs:
            return scheme

    return 'password'
Return the login scheme to use based on the configured credentials.
def make_request(
    self,
    method: str,
    r_url: str,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kwargs: Any
) -> 'requests.Response':
    """
    Make a request to this APIConnector's endpoint using the in-memory session.

    Parameters
    ----------
    method: str
        The kind of request to make (case-insensitive).
        Must be one of the supported `METHODS`.

    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    headers: Optional[Dict[str, Any]], default None
        The headers to use for the request.
        A deep copy is made, so the caller's dictionary is never modified.

    use_token: bool, default True
        If `True`, add the authorization token to the headers.

    debug: bool, default False
        Verbosity toggle.

    kwargs: Any
        All other keyword arguments are passed to the session's `request()`.
        `verify` defaults to the connector's boolean `verify` attribute (if set)
        and `timeout` to the configured default timeout.

    Returns
    -------
    A `requests.Response` object.

    Raises
    ------
    ValueError
        If `method` is not a supported HTTP method.
    """
    http_method = method.upper()
    if http_method not in METHODS:
        raise ValueError(f"Method '{method}' is not supported.")

    verify = self.__dict__.get('verify', None)
    if isinstance(verify, bool) and 'verify' not in kwargs:
        kwargs['verify'] = verify

    request_headers = copy.deepcopy(headers) if isinstance(headers, dict) else {}
    if use_token:
        request_headers['Authorization'] = f'Bearer {self.token}'

    if 'timeout' not in kwargs:
        kwargs['timeout'] = STATIC_CONFIG['api']['default_timeout']

    request_url = urllib.parse.urljoin(self.url, r_url)
    if debug:
        dprint(f"[{self}] Sending a '{http_method}' request to {request_url}")

    return self.session.request(
        http_method,
        request_url,
        headers=request_headers,
        **kwargs
    )
Make a request to this APIConnector's endpoint using the in-memory session.
Parameters
- method (str):
The kind of request to make.
Accepted values:
`'GET'`, `'OPTIONS'`, `'HEAD'`, `'POST'`, `'PUT'`, `'PATCH'`, `'DELETE'`.
- r_url (str):
The relative URL for the endpoint (e.g. `'/pipes'`).
- headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If `use_token` is `True`, the authorization token will be added to a copy of these headers.
- use_token (bool, default True):
If `True`, add the authorization token to the headers.
- debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to `requests.request`.
Returns
- A `requests.Response` object.
def get(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `GET` request via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request`
        (e.g. `headers`, `use_token`, `debug`, and any request options).

    Returns
    -------
    A `requests.Response` object.
    """
    response = self.make_request('GET', r_url, **kwargs)
    return response
Wrapper for requests.get.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g. `'/pipes'`).
- headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If `use_token` is `True`, the authorization token will be added to a copy of these headers.
- use_token (bool, default True):
If `True`, add the authorization token to the headers.
- debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to `requests.request`.
Returns
- A `requests.Response` object.
def post(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `POST` request via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request`
        (e.g. `headers`, `use_token`, `debug`, and any request options).

    Returns
    -------
    A `requests.Response` object.
    """
    response = self.make_request('POST', r_url, **kwargs)
    return response
Wrapper for requests.post.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g. `'/pipes'`).
- headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If `use_token` is `True`, the authorization token will be added to a copy of these headers.
- use_token (bool, default True):
If `True`, add the authorization token to the headers.
- debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to `requests.request`.
Returns
- A `requests.Response` object.
def put(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `PUT` request via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request`
        (e.g. `headers`, `use_token`, `debug`, and any request options).

    Returns
    -------
    A `requests.Response` object.
    """
    response = self.make_request('PUT', r_url, **kwargs)
    return response
Wrapper for requests.put.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g. `'/pipes'`).
- headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If `use_token` is `True`, the authorization token will be added to a copy of these headers.
- use_token (bool, default True):
If `True`, add the authorization token to the headers.
- debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to `requests.request`.
Returns
- A `requests.Response` object.
def patch(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `PATCH` request via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request`
        (e.g. `headers`, `use_token`, `debug`, and any request options).

    Returns
    -------
    A `requests.Response` object.
    """
    response = self.make_request('PATCH', r_url, **kwargs)
    return response
Wrapper for requests.patch.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g. `'/pipes'`).
- headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If `use_token` is `True`, the authorization token will be added to a copy of these headers.
- use_token (bool, default True):
If `True`, add the authorization token to the headers.
- debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to `requests.request`.
Returns
- A `requests.Response` object.
def delete(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `DELETE` request via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request`
        (e.g. `headers`, `use_token`, `debug`, and any request options).

    Returns
    -------
    A `requests.Response` object.
    """
    response = self.make_request('DELETE', r_url, **kwargs)
    return response
Wrapper for requests.delete.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g. `'/pipes'`).
- headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If `use_token` is `True`, the authorization token will be added to a copy of these headers.
- use_token (bool, default True):
If `True`, add the authorization token to the headers.
- debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to `requests.request`.
Returns
- A `requests.Response` object.
def wget(
    self,
    r_url: str,
    dest: Optional[Union[str, pathlib.Path]] = None,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kw: Any
) -> pathlib.Path:
    """
    Mimic wget with requests.

    Parameters
    ----------
    r_url: str
        The relative URL of the file to download.

    dest: Optional[Union[str, pathlib.Path]], default None
        The destination, passed through to `meerschaum.utils.misc.wget`.

    headers: Optional[Dict[str, Any]], default None
        The headers to use for the request.
        The authorization token is added to a copy, so the caller's
        dictionary is never modified.

    use_token: bool, default True
        If `True`, add the authorization token to the headers.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        All other keyword arguments are passed to `meerschaum.utils.misc.wget`.

    Returns
    -------
    The value returned by `meerschaum.utils.misc.wget`
    (annotated as a `pathlib.Path`).
    """
    from meerschaum.utils.misc import wget

    # Copy so that injecting the token does not leak into the caller's dict.
    request_headers = dict(headers) if headers is not None else {}
    if use_token:
        request_headers['Authorization'] = f'Bearer {self.token}'

    request_url = urllib.parse.urljoin(self.url, r_url)
    if debug:
        dprint(
            f"[{self}] Downloading {request_url}"
            + (f' to {dest}' if dest is not None else '')
            + "..."
        )
    return wget(request_url, dest=dest, headers=request_headers, **kw)
Mimic wget with requests.
def get_actions(self):
    """
    Get available actions from the API instance.

    Returns
    -------
    The response of a `GET` request to the actions endpoint
    (returned as-is, without parsing).
    """
    response = self.get(ACTIONS_ENDPOINT)
    return response
Get available actions from the API instance.
def do_action(self, sysargs: List[str]) -> SuccessTuple:
    """
    Execute a Meerschaum action remotely.

    This is the blocking counterpart of `do_action_async()`:
    the coroutine is driven to completion with `asyncio.run()`.

    Parameters
    ----------
    sysargs: List[str]
        The command-line arguments of the action to run.

    Returns
    -------
    A `SuccessTuple` of the action's result.
    """
    action_coroutine = self.do_action_async(sysargs)
    return asyncio.run(action_coroutine)
Execute a Meerschaum action remotely.
async def do_action_async(
    self,
    sysargs: List[str],
    callback_function: Callable[[str], None] = partial(print, end=''),
) -> SuccessTuple:
    """
    Execute an action as a temporary remote job.

    The job is started on this connector, its logs are streamed to
    `callback_function` until it exits, and it is deleted afterwards.

    Parameters
    ----------
    sysargs: List[str]
        The command-line arguments of the action to run.

    callback_function: Callable[[str], None], default partial(print, end='')
        Called with the job's log output.

    Returns
    -------
    The start failure if the job could not be started,
    otherwise the job's result `SuccessTuple`.
    """
    from meerschaum._internal.arguments import remove_api_executor_keys
    from meerschaum.utils.misc import generate_password

    cleaned_sysargs = remove_api_executor_keys(sysargs)
    temp_job = mrsm.Job(
        TEMP_PREFIX + generate_password(12),
        cleaned_sysargs,
        executor_keys=str(self),
    )

    started, start_msg = temp_job.start()
    if not started:
        return started, start_msg

    await temp_job.monitor_logs_async(
        callback_function=callback_function,
        stop_on_exit=True,
        strip_timestamps=True,
    )

    # Read the result before deleting the temporary job.
    outcome, outcome_msg = temp_job.result
    temp_job.delete()
    return outcome, outcome_msg
Execute an action as a temporary remote job.
def do_action_legacy(
    self,
    action: Optional[List[str]] = None,
    sysargs: Optional[List[str]] = None,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    NOTE: This method is deprecated.
    Please use `do_action()` or `do_action_async()`.

    Execute a Meerschaum action remotely.

    If `sysargs` are provided (and `action` begins with an empty string),
    parse those instead. Otherwise infer everything from keyword arguments.

    Parameters
    ----------
    action: Optional[List[str]], default None
        The action to execute; the first element is the root action.
        The caller's list is not modified.

    sysargs: Optional[List[str]], default None
        Raw command-line arguments to parse.

    debug: bool, default False
        Verbosity toggle.

    **kw
        Additional action arguments, sent in the request body.

    Returns
    -------
    A `SuccessTuple` of the action's result.

    Examples
    --------
    >>> conn = mrsm.get_connector('api:main')
    >>> conn.do_action_legacy(['show', 'pipes'])
    (True, "Success")
    >>> conn.do_action_legacy(['show', 'arguments'], name='test')
    (True, "Success")
    """
    import sys
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum._internal.static import STATIC_CONFIG
    from meerschaum.utils.misc import json_serialize_datetime
    if action is None:
        action = []

    if sysargs is not None and action and action[0] == '':
        from meerschaum._internal.arguments import parse_arguments
        if debug:
            dprint(f"Parsing sysargs:\n{sysargs}")
        json_dict = parse_arguments(sysargs)
    else:
        json_dict = kw
        json_dict['action'] = action
        if 'noask' not in kw:
            json_dict['noask'] = True
        if 'yes' not in kw:
            json_dict['yes'] = True
        if debug:
            json_dict['debug'] = debug

    actions = json_dict.get('action') or []
    if not actions:
        return False, "No action was provided."

    # Split off the root action without mutating the caller's `action` list.
    root_action, *sub_actions = actions
    json_dict['action'] = sub_actions
    r_url = f"{STATIC_CONFIG['api']['endpoints']['actions']}/{root_action}"

    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending data to '{self.url + r_url}':")
        pprint(json_dict, stream=sys.stderr)

    response = self.post(
        r_url,
        data=json.dumps(json_dict, default=json_serialize_datetime),
        debug=debug,
    )
    try:
        response_list = json.loads(response.text)
        if isinstance(response_list, dict) and 'detail' in response_list:
            return False, response_list['detail']
    except Exception as e:
        print(f"Invalid response: {response}")
        print(e)
        return False, response.text
    if debug:
        dprint(response)
    try:
        return response_list[0], response_list[1]
    except Exception:
        return False, f"Failed to parse result from action '{root_action}'"
NOTE: This method is deprecated.
Please use do_action() or do_action_async().
Execute a Meerschaum action remotely.
If sysargs are provided, parse those instead.
Otherwise infer everything from keyword arguments.
Examples
>>> conn = mrsm.get_connector('api:main')
>>> conn.do_action(['show', 'pipes'])
(True, "Success")
>>> conn.do_action(['show', 'arguments'], name='test')
(True, "Success")
def get_mrsm_version(self, **kw) -> Optional[str]:
    """
    Return the Meerschaum version of the API instance.

    The request is sent without an authorization token.

    Parameters
    ----------
    **kw
        Keyword arguments passed to `get()`.

    Returns
    -------
    The decoded JSON payload of the version endpoint,
    or `None` if the request fails or the payload is an error (`detail`).
    """
    from meerschaum._internal.static import STATIC_CONFIG
    try:
        version_url = STATIC_CONFIG['api']['endpoints']['version'] + '/mrsm'
        payload = self.get(version_url, use_token=False, **kw).json()
    except Exception:
        return None

    is_error_payload = isinstance(payload, dict) and 'detail' in payload
    return None if is_error_payload else payload
Return the Meerschaum version of the API instance.
def get_chaining_status(self, **kw) -> Optional[bool]:
    """
    Fetch the chaining status of the API instance.

    Parameters
    ----------
    **kw
        Keyword arguments passed to `get()`.

    Returns
    -------
    The decoded JSON payload of the chaining endpoint, or `None` if the
    request raises, the response is falsy, or the body is not valid JSON.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    try:
        response = self.get(
            STATIC_CONFIG['api']['endpoints']['chaining'],
            use_token=True,
            **kw
        )
        if not response:
            return None
        # Decode inside the `try` so a malformed body yields `None`
        # like every other failure, rather than raising.
        return response.json()
    except Exception:
        return None
Fetch the chaining status of the API instance.
def get_pipe_instance_keys(self, pipe: mrsm.Pipe) -> Union[str, None]:
    """
    Return the instance keys to use for a pipe.

    The pipe's own `instance_keys` parameter takes precedence;
    otherwise this connector's default `instance_keys` are returned.
    """
    default_keys = self.instance_keys
    return pipe.parameters.get('instance_keys', default_keys)
Return the configured instance keys for a pipe if set,
else fall back to the default instance_keys for this APIConnector.
def register_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """
    Submit a POST to the API to register a new Pipe object.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to register. Its `parameters` are sent as the request body.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of (success_bool, message).
    """
    from meerschaum.utils.debug import dprint
    r_url = pipe_r_url(pipe)
    response = self.post(
        r_url + '/register',
        json=pipe._attributes.get('parameters', {}),
        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
        debug=debug,
    )
    if debug:
        dprint(response.text)

    if not response:
        return False, response.text

    # Decode the body once; fall back to the raw text if it is not JSON.
    try:
        response_data = response.json()
    except ValueError:
        return bool(response), response.text

    if isinstance(response_data, list):
        return response_data[0], response_data[1]
    if isinstance(response_data, dict) and 'detail' in response_data:
        return bool(response), response_data['detail']
    return bool(response), response.text
Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> List[
        Union[
            Tuple[str, str, Union[str, None]],
            Tuple[str, str, Union[str, None], List[str]],
            Tuple[str, str, Union[str, None], Dict[str, Any]]
        ]
    ]:
    """
    Fetch registered Pipes' keys from the API.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        The connector keys for the query.

    metric_keys: Optional[List[str]], default None
        The metric keys for the query.

    location_keys: Optional[List[str]], default None
        The location keys for the query.

    tags: Optional[List[str]], default None
        A list of tags for the query.

    params: Optional[Dict[str, Any]], default None
        A parameters dictionary for filtering against the `pipes` table
        (e.g. `{'connector_keys': 'plugin:foo'}`).
        Not recommended to be used.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of tuples containing pipes' keys.
    """
    from meerschaum._internal.static import STATIC_CONFIG

    connector_keys = [] if connector_keys is None else connector_keys
    metric_keys = [] if metric_keys is None else metric_keys
    location_keys = [] if location_keys is None else location_keys
    tags = [] if tags is None else tags

    r_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
    try:
        # Each filter is JSON-encoded into its own query parameter.
        query = {
            'connector_keys': json.dumps(connector_keys),
            'metric_keys': json.dumps(metric_keys),
            'location_keys': json.dumps(location_keys),
            'tags': json.dumps(tags),
            'params': json.dumps(params),
            'instance_keys': self.instance_keys,
        }
        j = self.get(r_url, params=query, debug=debug).json()
    except Exception as e:
        import traceback
        traceback.print_exc()
        error(str(e))

    if 'detail' in j:
        error(j['detail'], stack=False)
    return list(map(tuple, j))
Fetch registered Pipes' keys from the API.
Parameters
- connector_keys (Optional[List[str]], default None): The connector keys for the query.
- metric_keys (Optional[List[str]], default None): The metric keys for the query.
- location_keys (Optional[List[str]], default None): The location keys for the query.
- tags (Optional[List[str]], default None): A list of tags for the query.
- params (Optional[Dict[str, Any]], default None):
A parameters dictionary for filtering against the `pipes` table (e.g. `{'connector_keys': 'plugin:foo'}`).
Not recommended to be used.
- debug (bool, default False): Verbosity toggle.
Returns
- A list of tuples containing pipes' keys.
def edit_pipe(
    self,
    pipe: mrsm.Pipe,
    patch: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Submit a PATCH to the API to edit an existing Pipe object.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to edit. Its parameters (without applied symlinks)
        are sent as the request body.

    patch: bool, default False
        Sent to the API as the `patch` query parameter.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of (success_bool, message).
    """
    from meerschaum.utils.debug import dprint
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.
    r_url = pipe_r_url(pipe)
    response = self.patch(
        r_url + '/edit',
        params={'patch': patch, 'instance_keys': self.get_pipe_instance_keys(pipe)},
        json=pipe.get_parameters(apply_symlinks=False),
        debug=debug,
    )
    if debug:
        dprint(response.text)

    # Decode the body once; a non-JSON body (e.g. a proxy error page)
    # is reported through the raw text instead of raising.
    try:
        response_data = response.json()
    except ValueError:
        return bool(response), response.text

    if isinstance(response_data, list):
        return response_data[0], response_data[1]
    if isinstance(response_data, dict) and 'detail' in response_data:
        return bool(response), response_data['detail']
    return bool(response), response.text
Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]] = None,
    chunksize: Optional[int] = -1,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Sync a DataFrame into a Pipe by posting it to the API in chunks.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to sync into.

    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]], default None
        The data to sync: a DataFrame, a dictionary of columns
        (presumably mapping column names to lists of values; verify against callers),
        a list of rows, or a JSON string which decodes to one of the latter two.

    chunksize: Optional[int], default -1
        The number of rows per request.
        `-1` uses the configured SQL chunksize; `None` sends one row at a time.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Additional query parameters to send with every chunk.

    Returns
    -------
    A `SuccessTuple`. Syncing stops at the first chunk which fails.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.dtypes import json_serialize_value
    from meerschaum.utils.misc import interval_str
    from meerschaum.config import get_config
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dataframe import to_json
    begin = time.perf_counter()
    more_itertools = attempt_import('more_itertools')
    if df is None:
        msg = f"DataFrame is `None`. Cannot sync {pipe}."
        return False, msg

    def get_json_str(c):
        """Serialize a chunk into the JSON request body."""
        if isinstance(c, str):
            return c
        if isinstance(c, (dict, list, tuple)):
            return json.dumps(c, default=json_serialize_value)
        return to_json(c, orient='columns', geometry_format='wkb_hex')

    def get_chunk_rowcount(c):
        """Count the rows in a chunk (a dict chunk maps columns to lists of values)."""
        if isinstance(c, dict):
            return max((len(vals) for vals in c.values()), default=0)
        return len(c)

    df = json.loads(df) if isinstance(df, str) else df

    _chunksize: Optional[int] = (1 if chunksize is None else (
        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
        else chunksize
    ))

    # Dispatch on plain containers first: lists also expose an `index` attribute,
    # and neither dicts nor lists have `columns`.
    if isinstance(df, dict):
        ### `chunks` is a list of dicts (one dict of column sub-lists per chunk).
        ### e.g. {'a': [1, 2, 3]} with a chunksize of 2 -> [{'a': [1, 2]}, {'a': [3]}]
        chunks = []
        for col, vals in df.items():
            for i, vals_chunk in enumerate(more_itertools.chunked(vals, _chunksize)):
                if i < len(chunks):
                    chunks[i][col] = vals_chunk
                else:
                    chunks.append({col: vals_chunk})
    elif isinstance(df, list):
        chunks = more_itertools.chunked(df, _chunksize)
    elif hasattr(df, 'index'):
        df = df.reset_index(drop=True)
        is_dask = 'dask' in df.__module__
        chunks = (
            (df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize))
            if not is_dask
            else [partition.compute() for partition in df.partitions]
        )
    else:
        return False, f"Cannot sync data of type '{type(df).__name__}' to {pipe}."

    ### Send columns in case the user has defined them locally.
    request_params = kw.copy()
    if pipe.columns:
        request_params['columns'] = json.dumps(pipe.columns)
    request_params['instance_keys'] = self.get_pipe_instance_keys(pipe)
    r_url = pipe_r_url(pipe) + '/data'

    rowcount = 0
    num_success_chunks = 0
    for i, c in enumerate(chunks):
        if debug:
            dprint(f"[{self}] Posting chunk {i} to {r_url}...")
        chunk_rowcount = get_chunk_rowcount(c)
        if chunk_rowcount == 0:
            if debug:
                dprint(f"[{self}] Skipping empty chunk...")
            continue
        json_str = get_json_str(c)

        try:
            response = self.post(
                r_url,
                params=request_params,
                data=json_str,
                debug=debug,
            )
        except Exception as e:
            msg = f"Failed to post a chunk to {pipe}:\n{e}"
            warn(msg)
            return False, msg

        if not response:
            return False, f"Failed to sync a chunk:\n{response.text}"

        try:
            j = json.loads(response.text)
        except Exception as e:
            return False, f"Failed to parse response from syncing {pipe}:\n{e}"

        if isinstance(j, dict) and 'detail' in j:
            return False, j['detail']

        try:
            j = tuple(j)
        except Exception:
            return False, response.text

        if debug:
            dprint("Received response: " + str(j))
        if not j[0]:
            return j

        rowcount += chunk_rowcount
        num_success_chunks += 1

    success_tuple = True, (
        f"It took {interval_str(timedelta(seconds=(time.perf_counter() - begin)))} "
        + f"to sync {rowcount:,} row"
        + ('s' if rowcount != 1 else '')
        + f" across {num_success_chunks:,} chunk" + ('s' if num_success_chunks != 1 else '') +
        f" to {pipe}."
    )
    return success_tuple
Sync a DataFrame into a Pipe.
def delete_pipe(
    self,
    pipe: Optional[mrsm.Pipe] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Delete a Pipe and drop its table.

    Parameters
    ----------
    pipe: Optional[mrsm.Pipe], default None
        The pipe to delete. `error()` is called if this is `None`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and message.
    """
    if pipe is None:
        error("Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/delete',
        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
        debug=debug,
    )
    if debug:
        dprint(response.text)

    ### Parse the body once and fail gracefully on non-JSON bodies.
    try:
        response_data = response.json()
    except Exception:
        return False, f"Failed to delete {pipe}."

    if isinstance(response_data, list):
        return response_data[0], response_data[1]
    if isinstance(response_data, dict) and 'detail' in response_data:
        return bool(response), response_data['detail']
    return bool(response), response.text
Delete a Pipe and drop its table.
def get_pipe_data(
    self,
    pipe: mrsm.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    as_chunks: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[pandas.DataFrame, None]:
    """
    Fetch data from the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose data to fetch.

    select_columns: Optional[List[str]], default None
        Sent to the API as the JSON-encoded `select_columns` query parameter.

    omit_columns: Optional[List[str]], default None
        Sent to the API as the JSON-encoded `omit_columns` query parameter.

    begin: Union[str, datetime, int, None], default None
        Sent to the API as the `begin` query parameter.

    end: Union[str, datetime, int, None], default None
        Sent to the API as the `end` query parameter.

    params: Optional[Dict[str, Any]], default None
        Sent to the API as the JSON-encoded `params` query parameter.

    as_chunks: bool, default False
        Sent to the API as the `as_chunks` query parameter.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A DataFrame of the pipe's data, or `None` if the request or parsing failed.
    """
    from meerschaum.utils.dataframe import parse_df_datetimes, add_missing_cols_to_df
    from meerschaum.utils.dtypes import are_dtypes_equal
    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/data",
            params={
                'select_columns': json.dumps(select_columns),
                'omit_columns': json.dumps(omit_columns),
                'begin': begin,
                'end': end,
                'params': json.dumps(params, default=str),
                'instance': self.get_pipe_instance_keys(pipe),
                'as_chunks': as_chunks,
            },
            debug=debug
        )
        if not response.ok:
            return None
        j = response.json()
    except Exception as e:
        warn(f"Failed to get data for {pipe}:\n{e}")
        return None

    ### An error payload must not leak out as a `(False, msg)` tuple:
    ### callers expect a DataFrame or `None`.
    if isinstance(j, dict) and 'detail' in j:
        warn(f"Failed to get data for {pipe}:\n{j['detail']}")
        return None

    try:
        df = parse_df_datetimes(
            j,
            ### Only attempt datetime parsing on columns typed as datetimes.
            ignore_cols=[
                col
                for col, dtype in pipe.dtypes.items()
                if not are_dtypes_equal(str(dtype), 'datetime')
            ],
            strip_timezone=(pipe.tzinfo is None),
            debug=debug,
        )
    except Exception as e:
        warn(f"Failed to parse response for {pipe}:\n{e}")
        return None

    if len(df.columns) == 0:
        return add_missing_cols_to_df(df, pipe.dtypes)

    return df
Fetch data from the API.
def get_pipe_id(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union[int, str, None]:
    """
    Get a Pipe's ID from the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose ID to look up.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An integer if the response text is integer-like,
    the raw text if it is non-empty and does not begin with `{`,
    otherwise `None`.
    """
    from meerschaum.utils.misc import is_int
    id_url = pipe_r_url(pipe) + '/id'
    query = {'instance': self.get_pipe_instance_keys(pipe)}
    response = self.get(id_url, params=query, debug=debug)
    if debug:
        dprint(f"Got pipe ID: {response.text}")

    pipe_id = None
    try:
        text = response.text
        if is_int(text):
            pipe_id = int(text)
        elif text and not text.startswith('{'):
            ### A leading brace means a JSON object (e.g. an error payload), not an ID.
            pipe_id = text
    except Exception as e:
        warn(f"Failed to get the ID for {pipe}:\n{e}")
    return pipe_id
Get a Pipe's ID from the API.
def get_pipe_attributes(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Dict[str, Any]:
    """
    Fetch a pipe's attributes from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose attributes we are fetching.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The decoded JSON body of the response.
    If the body cannot be decoded, warn and return an empty dictionary.
    """
    attributes_url = pipe_r_url(pipe) + '/attributes'
    query = {'instance': self.get_pipe_instance_keys(pipe)}
    response = self.get(attributes_url, params=query, debug=debug)
    try:
        attributes = json.loads(response.text)
    except Exception as e:
        warn(f"Failed to get the attributes for {pipe}:\n{e}")
        attributes = {}
    return attributes
Get a Pipe's attributes from the API
Parameters
- pipe (meerschaum.Pipe): The pipe whose attributes we are fetching.
Returns
- A dictionary of a pipe's attributes.
- If the pipe does not exist, return an empty dictionary.
def get_sync_time(
    self,
    pipe: mrsm.Pipe,
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    debug: bool = False,
) -> Union[datetime, int, None]:
    """
    Ask the API for a pipe's sync time.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to select from.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary, sent as the JSON body of the request.

    newest: bool, default True
        Forwarded as the `newest` query parameter.
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime.

    debug: bool, default False
        Verbosity toggle (also forwarded as a query parameter).

    Returns
    -------
    A `datetime` parsed from the ISO-formatted response, an `int` if the
    response is integer-like, or `None` if the request or parsing failed.
    """
    from meerschaum.utils.misc import is_int
    from meerschaum.utils.warnings import warn
    sync_time_url = pipe_r_url(pipe) + '/sync_time'
    query = {
        'instance': self.get_pipe_instance_keys(pipe),
        'newest': newest,
        'debug': debug,
    }
    response = self.get(sync_time_url, json=params, params=query, debug=debug)
    if not response:
        warn(f"Failed to get the sync time for {pipe}:\n" + response.text)
        return None

    j = response.json()
    if j is None:
        return None

    try:
        if is_int(j):
            return int(j)
        return datetime.fromisoformat(j)
    except Exception as e:
        warn(f"Failed to parse the sync time '{j}' for {pipe}:\n{e}")
        return None
Get a Pipe's most recent datetime value from the API.
Parameters
- pipe (meerschaum.Pipe): The pipe to select from.
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause.
- newest (bool, default True): If `True`, get the most recent datetime (honoring `params`). If `False`, get the oldest datetime (ASC instead of DESC).
Returns
- The most recent (or oldest if `newest` is `False`) datetime of a pipe, rounded down to the closest minute.
def pipe_exists(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> bool:
    """
    Check the API to see if a Pipe exists.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe which we are querying.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating whether a pipe's underlying table exists.
    Return `False` if the request fails or the API responds with an error detail.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe)
    response = self.get(
        r_url + '/exists',
        params={
            'instance': self.get_pipe_instance_keys(pipe),
        },
        debug=debug,
    )
    if not response:
        warn(f"Failed to check if {pipe} exists:\n{response.text}")
        return False
    if debug:
        dprint("Received response: " + str(response.text))
    j = response.json()
    if isinstance(j, dict) and 'detail' in j:
        ### An error payload is a non-empty dict (truthy), so it must not be returned as-is.
        warn(j['detail'])
        return False
    return j
Check the API to see if a Pipe exists.
Parameters
- pipe ('meerschaum.Pipe'): The pipe which we are querying.
Returns
- A bool indicating whether a pipe's underlying table exists.
def create_metadata(
    self,
    debug: bool = False
) -> bool:
    """
    Create metadata tables.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating success: `True` when the API responds successfully
    with a JSON body, otherwise `False`.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum._internal.static import STATIC_CONFIG
    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
    response = self.post(r_url, debug=debug)
    if debug:
        dprint(f"Create metadata response: {response.text}")
    try:
        _ = json.loads(response.text)
    except Exception as e:
        warn(f"Failed to create metadata on {self}:\n{e}")
        return False
    ### A parseable body only counts as success if the request itself succeeded.
    return bool(response)
Create metadata tables.
Returns
- A bool indicating success.
def get_pipe_rowcount(
    self,
    pipe: mrsm.Pipe,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    remote: bool = False,
    debug: bool = False,
) -> int:
    """
    Ask the API for a pipe's row count.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe whose rows we are counting.

    begin: Union[str, datetime, int, None], default None
        If provided, bound the count by this datetime.

    end: Union[str, datetime, int, None], default None
        If provided, bound the count by this datetime.

    params: Optional[Dict[str, Any]], default None
        If provided, bound the count by these parameters (sent as the JSON body).

    remote: bool, default False
        If `True`, return the rowcount for the fetch definition.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The row count reported by the API.
    If the request fails or the body is not an integer, warn and return 0.
    """
    rowcount_url = pipe_r_url(pipe) + "/rowcount"
    query = {
        'begin': begin,
        'end': end,
        'remote': remote,
        'instance': self.get_pipe_instance_keys(pipe),
    }
    response = self.get(rowcount_url, json=params, params=query, debug=debug)
    if not response:
        warn(f"Failed to get the rowcount for {pipe}:\n{response.text}")
        return 0

    try:
        rowcount = int(json.loads(response.text))
    except Exception as e:
        warn(f"Failed to get the rowcount for {pipe}:\n{e}")
        rowcount = 0
    return rowcount
Get a pipe's row count from the API.
Parameters
- pipe ('meerschaum.Pipe'): The pipe whose rows we are counting.
- begin (Union[str, datetime, int, None], default None): If provided, bound the count by this datetime.
- end (Union[str, datetime, int, None], default None): If provided, bound the count by this datetime.
- params (Optional[Dict[str, Any]], default None): If provided, bound the count by these parameters.
- remote (bool, default False): If `True`, return the rowcount for the fetch definition.
Returns
- The number of rows in the pipe's table, bounded by the given parameters.
- If the table does not exist, return 0.
def drop_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """
    Drop a pipe's table but maintain its registration.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to be dropped. `error()` is called if this is `None`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple (bool, str).
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    if pipe is None:
        error("Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/drop',
        params={
            'instance': self.get_pipe_instance_keys(pipe),
        },
        debug=debug,
    )
    if debug:
        dprint(response.text)

    try:
        data = response.json()
    except Exception:
        return False, f"Failed to drop {pipe}."

    if isinstance(data, list):
        return data[0], data[1]
    ### Guard the membership test: `in` raises on non-container payloads (e.g. `null`).
    if isinstance(data, dict) and 'detail' in data:
        return bool(response), data['detail']
    return bool(response), response.text
Drop a pipe's table but maintain its registration.
Parameters
- pipe (meerschaum.Pipe): The pipe to be dropped.
Returns
- A success tuple (bool, str).
def clear_pipe(
    self,
    pipe: mrsm.Pipe,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Delete rows in a pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe with rows to be deleted.

    begin: Union[str, datetime, int, None], default None
        Sent to the API as the `begin` query parameter.

    end: Union[str, datetime, int, None], default None
        Sent to the API as the `end` query parameter.

    params: Optional[Dict[str, Any]], default None
        Sent to the API as the JSON-encoded `params` query parameter.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple.
    """
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/clear',
        params={
            'begin': begin,
            'end': end,
            'params': json.dumps(params),
            'instance': self.get_pipe_instance_keys(pipe),
        },
        debug=debug,
    )
    if debug:
        dprint(response.text)

    try:
        data = response.json()
    except Exception:
        return False, f"Failed to clear {pipe} with constraints {begin=}, {end=}, {params=}."

    if isinstance(data, list):
        return data[0], data[1]
    ### Guard the membership test: `in` raises on non-container payloads (e.g. `null`).
    if isinstance(data, dict) and 'detail' in data:
        return bool(response), data['detail']
    return bool(response), response.text
Delete rows in a pipe's table.
Parameters
- pipe (meerschaum.Pipe): The pipe with rows to be deleted.
Returns
- A success tuple.
def get_pipe_columns_types(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union[Dict[str, str], None]:
    """
    Fetch the columns and types of the pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose columns are to be queried.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to their database types,
    or `None` (with a warning) if the API returns an error detail or a non-dictionary.

    Examples
    --------
    >>> {
    ...     'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    ...     'id': 'BIGINT',
    ...     'val': 'DOUBLE PRECISION',
    ... }
    >>>
    """
    types_url = pipe_r_url(pipe) + '/columns/types'
    query = {'instance': self.get_pipe_instance_keys(pipe)}
    response = self.get(types_url, params=query, debug=debug)
    payload = response.json()
    if not isinstance(payload, dict):
        warn(response.text)
        return None
    ### A lone `detail` key is an error message rather than a column named "detail".
    if len(payload) == 1 and 'detail' in payload:
        warn(payload['detail'])
        return None
    return payload
Fetch the columns and types of the pipe's table.
Parameters
- pipe (meerschaum.Pipe): The pipe whose columns are to be queried.
Returns
- A dictionary mapping column names to their database types.
Examples
>>> {
... 'dt': 'TIMESTAMP WITHOUT TIMEZONE',
... 'id': 'BIGINT',
... 'val': 'DOUBLE PRECISION',
... }
>>>
def get_pipe_columns_indices(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union[Dict[str, str], None]:
    """
    Fetch the index information for a pipe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose columns are to be queried.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to a list of associated index information,
    or `None` (with a warning) if the API returns an error detail or a non-dictionary.
    """
    indices_url = pipe_r_url(pipe) + '/columns/indices'
    query = {'instance': self.get_pipe_instance_keys(pipe)}
    response = self.get(indices_url, params=query, debug=debug)
    payload = response.json()
    if not isinstance(payload, dict):
        warn(response.text)
        return None
    ### A lone `detail` key is an error message rather than a column named "detail".
    if len(payload) == 1 and 'detail' in payload:
        warn(payload['detail'])
        return None
    return payload
Fetch the index information for a pipe.
Parameters
- pipe (mrsm.Pipe): The pipe whose columns are to be queried.
Returns
- A dictionary mapping column names to a list of associated index information.
def fetch(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, str, int] = '',
    end: Union[datetime, int] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw: Any
) -> Iterator['pd.DataFrame']:
    """
    Get the Pipe data from the remote Pipe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose `fetch` parameters describe the remote source pipe.

    begin: Union[datetime, str, int], default ''
        Passed through to the source pipe's `get_data()`.

    end: Union[datetime, int], default None
        Passed through to the source pipe's `get_data()`.

    params: Optional[Dict[str, Any]], default None
        Base parameters, patched with `fetch:params` before being passed through.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The result of the source pipe's `get_data(as_iterator=True)`,
    or `None` (with a warning) if the `fetch` parameters are missing or incomplete.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config._patch import apply_patch_to_config

    fetch_params = pipe.parameters.get('fetch', {})
    if not fetch_params:
        warn(f"Missing 'fetch' parameters for {pipe}.", stack=False)
        return None

    ### Work on a copy so that the connector object injected below
    ### never ends up inside the pipe's stored parameters.
    pipe_meta = dict(fetch_params.get('pipe') or {})
    ### Legacy: check for `connector_keys`, etc. at the root.
    if not pipe_meta:
        ck, mk, lk = (
            fetch_params.get('connector_keys', None),
            fetch_params.get('metric_key', None),
            fetch_params.get('location_key', None),
        )
        if not ck or not mk:
            warn(f"Missing `fetch:pipe` keys for {pipe}.", stack=False)
            return None

        pipe_meta.update({
            'connector': ck,
            'metric': mk,
            'location': lk,
        })

    pipe_meta['instance'] = self
    source_pipe = mrsm.Pipe(**pipe_meta)

    _params = copy.deepcopy(params) if params is not None else {}
    _params = apply_patch_to_config(_params, fetch_params.get('params', {}))
    select_columns = fetch_params.get('select_columns', [])
    omit_columns = fetch_params.get('omit_columns', [])

    return source_pipe.get_data(
        select_columns=select_columns,
        omit_columns=omit_columns,
        begin=begin,
        end=end,
        params=_params,
        debug=debug,
        as_iterator=True,
    )
Get the Pipe data from the remote Pipe.
def register_plugin(
    self,
    plugin: mrsm.core.Plugin,
    make_archive: bool = True,
    debug: bool = False,
) -> SuccessTuple:
    """
    Register a plugin and upload its archive.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to register.

    make_archive: bool, default True
        If `True`, build the archive with `plugin.make_tar()`.
        Otherwise upload the existing file at `plugin.archive_path`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and message.
    """
    import json
    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
    metadata = {
        'version': plugin.version,
        'attributes': json.dumps(plugin.attributes),
    }
    r_url = plugin_r_url(plugin)

    ### Open the archive only once everything else is ready,
    ### and let the context manager close it on every exit path.
    try:
        with open(archive_path, 'rb') as file_pointer:
            response = self.post(
                r_url,
                files={'archive': file_pointer},
                params=metadata,
                debug=debug,
            )
    except OSError:
        ### Failing to open the archive propagates, as it always has.
        raise
    except Exception:
        return False, f"Failed to register plugin '{plugin}'."

    try:
        success, msg = json.loads(response.text)
    except Exception:
        return False, response.text

    return success, msg
Register a plugin and upload its archive.
def install_plugin(
    self,
    name: str,
    skip_deps: bool = False,
    force: bool = False,
    debug: bool = False
) -> SuccessTuple:
    """
    Download and attempt to install a plugin from the API.

    Parameters
    ----------
    name: str
        The name of the plugin to download.

    skip_deps: bool, default False
        Passed through to `Plugin.install()`.

    force: bool, default False
        Passed through to `Plugin.install()`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and message.
    """
    import os
    import pathlib
    import json
    from meerschaum.core import Plugin
    import meerschaum.config.paths as paths
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
    r_url = plugin_r_url(name)
    dest = pathlib.Path(os.path.join(paths.PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
    if debug:
        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
    archive_path = self.wget(r_url, dest, debug=debug)
    is_binary = binaryornot_check.is_binary(str(archive_path))
    if not is_binary:
        ### A text file here means the API sent a JSON message instead of an archive.
        fail_msg = f"Failed to download binary for plugin '{name}'."
        ### Default to failure so that unexpected JSON shapes cannot leave these unbound.
        success, msg = False, fail_msg
        try:
            with open(archive_path, 'r') as f:
                j = json.load(f)
            if isinstance(j, list):
                success, msg = tuple(j)
        except Exception:
            success, msg = False, fail_msg
        return success, msg
    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
    return plugin.install(skip_deps=skip_deps, force=force, debug=debug)
Download and attempt to install a plugin from the API.
def delete_plugin(
    self,
    plugin: mrsm.core.Plugin,
    debug: bool = False
) -> SuccessTuple:
    """
    Delete a plugin from an API repository.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to delete.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` decoded from the response body.
    If the request raises or the body cannot be unpacked, return a failure tuple.
    """
    import json
    plugin_url = plugin_r_url(plugin)
    try:
        response = self.delete(plugin_url, debug=debug)
    except Exception:
        return False, f"Failed to delete plugin '{plugin}'."

    try:
        success, msg = json.loads(response.text)
    except Exception:
        success, msg = False, response.text
    return success, msg
Delete a plugin from an API repository.
def get_plugins(
    self,
    user_id: Optional[int] = None,
    search_term: Optional[str] = None,
    debug: bool = False
) -> List[str]:
    """
    Return a list of registered plugin names.

    Parameters
    ----------
    user_id: Optional[int], default None
        If specified, return all plugins from a certain user.

    search_term: Optional[str], default None
        If specified, return plugins beginning with this string.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of plugin names (empty if the request fails).
    """
    import json
    from meerschaum.utils.warnings import error
    from meerschaum._internal.static import STATIC_CONFIG
    plugins_endpoint = STATIC_CONFIG['api']['endpoints']['plugins']
    query = {'user_id': user_id, 'search_term': search_term}
    response = self.get(plugins_endpoint, params=query, use_token=True, debug=debug)
    if not response:
        return []

    plugin_names = json.loads(response.text)
    if isinstance(plugin_names, list):
        return plugin_names
    error(response.text)
    return plugin_names
Return a list of registered plugin names.
Parameters
- user_id (Optional[int], default None): If specified, return all plugins from a certain user.
- search_term (Optional[str], default None): If specified, return plugins beginning with this string.
Returns
- A list of plugin names.
def get_plugin_attributes(
    self,
    plugin: mrsm.core.Plugin,
    debug: bool = False
) -> Dict[str, Any]:
    """
    Return a plugin's attributes.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin whose attributes to fetch.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of the plugin's attributes.
    If the request failed with an error detail, warn and return an empty dictionary.
    """
    import json
    from meerschaum.utils.warnings import warn, error
    attributes_url = plugin_r_url(plugin) + '/attributes'
    response = self.get(attributes_url, use_token=True, debug=debug)
    attrs = response.json()

    ### The payload may itself be a JSON-encoded string, so try to decode it once more.
    if isinstance(attrs, str) and attrs.startswith('{'):
        try:
            attrs = json.loads(attrs)
        except Exception:
            pass

    if not isinstance(attrs, dict):
        error(response.text)
        return attrs

    if not response and 'detail' in attrs:
        warn(attrs['detail'])
        return {}
    return attrs
Return a plugin's attributes.
def login(
    self,
    debug: bool = False,
    warn: bool = True,
    **kw: Any
) -> SuccessTuple:
    """
    Log in and set the session token.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    warn: bool, default True
        If `True`, emit a warning on the first failed login for this connector.

    Returns
    -------
    A `SuccessTuple` of success and message.
    For the `api_key` scheme, the key is only validated and no token is stored.
    """
    if self.login_scheme == 'api_key':
        validate_response = self.post(
            STATIC_CONFIG['api']['endpoints']['tokens'] + '/validate',
            headers={'Authorization': f'Bearer {self.api_key}'},
            use_token=False,
            debug=debug,
        )
        if not validate_response:
            return False, "API key is not valid."
        return True, "API key is valid."

    ### Default to empty so that an unrecognized scheme falls through to the login prompt.
    login_data = {}
    try:
        if self.login_scheme == 'password':
            login_data = {
                'username': self.username,
                'password': self.password,
            }
        elif self.login_scheme == 'client_credentials':
            login_data = {
                'client_id': self.client_id,
                'client_secret': self.client_secret,
            }
    except AttributeError:
        login_data = {}

    if not login_data:
        return False, f"Please login with the command `login {self}`."

    ### Only the `password` scheme carries a username to mention.
    login_scheme_msg = (
        f" as user '{login_data['username']}'"
        if self.login_scheme == 'password'
        else ''
    )

    response = self.post(
        STATIC_CONFIG['api']['endpoints']['login'],
        data=login_data,
        use_token=False,
        debug=debug,
    )
    if response:
        msg = f"Successfully logged into '{self}'{login_scheme_msg}."
        token_data = json.loads(response.text)
        self._token = token_data['access_token']
        self._expires = datetime.datetime.strptime(
            token_data['expires'],
            '%Y-%m-%dT%H:%M:%S.%f'
        )
    else:
        msg = (
            f"Failed to log into '{self}'{login_scheme_msg}.\n" +
            f"    Please verify login details for connector '{self}'."
        )
        ### Only warn once per connector instance.
        if warn and not self.__dict__.get('_emitted_warning', False):
            _warn(msg, stack=False)
            self._emitted_warning = True

    return bool(response), msg
Log in and set the session token.
def test_connection(
    self,
    **kw: Any
) -> Union[bool, None]:
    """
    Test if a successful connection to the API may be made.

    Parameters
    ----------
    **kw: Any
        Overrides for the keyword arguments passed to `retry_connect()`.

    Returns
    -------
    The result of `retry_connect()`, or `False` if it raises.
    """
    from meerschaum.connectors.poll import retry_connect
    connect_kwargs = {
        'max_retries': 1,
        'retry_wait': 0,
        'warn': False,
        'connector': self,
        'enforce_chaining': False,
        'enforce_login': False,
        ### Caller-provided values take precedence over the defaults above.
        **kw,
    }
    try:
        result = retry_connect(**connect_kwargs)
    except Exception:
        result = False
    return result
Test if a successful connection to the API may be made.
def register_user(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Register a new user.

    Parameters
    ----------
    user: mrsm.core.User
        The user to register.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and message.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    register_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/register"
    form = {
        'username': user.username,
        'password': user.password,
        'attributes': json.dumps(user.attributes),
    }
    ### `type` and `email` are only sent when set.
    for field in ('type', 'email'):
        value = getattr(user, field)
        if value:
            form[field] = value

    response = self.post(register_url, data=form, debug=debug)
    try:
        payload = json.loads(response.text)
        if isinstance(payload, dict) and 'detail' in payload:
            return False, payload['detail']
        result = tuple(payload)
    except Exception:
        if response:
            return False, response.text
        return False, f"Failed to register user '{user}'."

    return result
Register a new user.
def get_user_id(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> Union[int, str, UUID, None]:
    """
    Get a user's ID.

    Parameters
    ----------
    user: mrsm.core.User
        The user whose ID to look up (by `user.username`).

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Additional keyword arguments are passed to `self.get()`.

    Returns
    -------
    An `int` or `UUID` if the response is shaped like one, otherwise the ID as a string.
    Return `None` if the response is `null`, an error object, or cannot be decoded.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    from meerschaum.utils.misc import is_int, is_uuid
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/id"
    response = self.get(r_url, debug=debug, **kw)
    try:
        payload = json.loads(response.text)
        ### A `null` body or an error object is not an ID:
        ### do not stringify it into a bogus (truthy) value like 'None'.
        if payload is None or isinstance(payload, dict):
            return None
        id_text = str(payload)
        if is_int(id_text):
            return int(id_text)
        if is_uuid(id_text):
            return UUID(id_text)
        return id_text
    except Exception:
        return None
Get a user's ID.
def get_users(
    self,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    Return a list of registered usernames.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The decoded JSON body of the response,
    or an empty list if the request fails or the body cannot be decoded.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    users_url = f"{STATIC_CONFIG['api']['endpoints']['users']}"
    response = self.get(users_url, debug=debug, use_token=True)
    if not response:
        return []

    try:
        usernames = response.json()
    except Exception as e:
        usernames = []
    return usernames
Return a list of registered usernames.
def edit_user(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit an existing user.

    Parameters
    ----------
    user: mrsm.core.User
        The user whose fields are posted to the API.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and message.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    edit_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/edit"
    form = {
        'username': user.username,
        'password': user.password,
        'type': user.type,
        'email': user.email,
        'attributes': json.dumps(user.attributes),
    }
    response = self.post(edit_url, data=form, debug=debug)
    try:
        payload = json.loads(response.text)
        if isinstance(payload, dict) and 'detail' in payload:
            return False, payload['detail']
        result = tuple(payload)
    except Exception:
        if response:
            return False, response.text
        return False, f"Failed to edit user '{user}'."

    return result
Edit an existing user.
def delete_user(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Delete a user.

    Parameters
    ----------
    user: mrsm.core.User
        The user to delete (by `user.username`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and message.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    user_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}"
    response = self.delete(user_url, debug=debug)
    try:
        payload = json.loads(response.text)
        if isinstance(payload, dict) and 'detail' in payload:
            return False, payload['detail']
        return tuple(payload)
    except Exception:
        return False, f"Failed to delete user '{user.username}'."
Delete a user.
def get_user_password_hash(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """
    If configured, get a user's password hash.

    Parameters
    ----------
    user: mrsm.core.User
        The user whose password hash to request (by `user.username`).

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Additional keyword arguments are passed to `self.get()`.

    Returns
    -------
    The decoded JSON body of the response, or `None` if the request fails.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    hash_url = '/'.join((users_endpoint, user.username, 'password_hash'))
    response = self.get(hash_url, debug=debug, **kw)
    return response.json() if response else None
If configured, get a user's password hash.
def get_user_type(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """
    If configured, get a user's type.

    Parameters
    ----------
    user: mrsm.core.User
        The user whose type to request (by `user.username`).

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Additional keyword arguments are passed to `self.get()`.

    Returns
    -------
    The decoded JSON body of the response, or `None` if the request fails.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    type_url = '/'.join((users_endpoint, user.username, 'type'))
    response = self.get(type_url, debug=debug, **kw)
    return response.json() if response else None
If configured, get a user's type.
def get_user_attributes(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw
) -> 'Optional[Dict[str, Any]]':
    """
    Get a user's attributes.

    Parameters
    ----------
    user: mrsm.core.User
        The user whose attributes to request (by `user.username`).

    debug: bool, default False
        Verbosity toggle.

    **kw
        Additional keyword arguments are passed to `self.get()`.

    Returns
    -------
    The decoded JSON body of the response, or `None` if it cannot be decoded.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/attributes"
    response = self.get(r_url, debug=debug, **kw)
    try:
        attributes = json.loads(response.text)
    except Exception:
        attributes = None
    return attributes
Get a user's attributes.
def register_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
    """
    Register the provided token to the API.

    On success, the token's `label`, `secret`, `id`, and (if returned)
    `expiration` are updated in place from the response.
    """
    from meerschaum.utils.dtypes import json_serialize_value
    body = json.dumps(
        {
            'label': token.label,
            'scopes': token.scopes,
            'expiration': token.expiration,
        },
        default=json_serialize_value,
    )
    response = self.post(tokens_endpoint + '/register', data=body, debug=debug)
    if not response:
        return False, f"Failed to register token:\n{response.text}"

    registered = response.json()
    token.label = registered['label']
    token.secret = registered['secret']
    token.id = uuid.UUID(registered['id'])
    expiration = registered.get('expiration', None)
    if expiration:
        token.expiration = datetime.fromisoformat(expiration)

    return True, f"Registered token '{token.label}'."
Register the provided token to the API.
def get_token_model(self, token_id: uuid.UUID, debug: bool = False) -> 'Union[TokenModel, None]':
    """
    Return a token's model from the API instance.

    Return `None` if the request fails.
    """
    from meerschaum.models import TokenModel
    response = self.get(tokens_endpoint + f'/{token_id}', debug=debug)
    if response:
        return TokenModel(**response.json())
    return None
Return a token's model from the API instance.
def get_tokens(self, labels: Optional[List[str]] = None, debug: bool = False) -> List[Token]:
    """
    Return the tokens registered to the current user.

    Parameters
    ----------
    labels: Optional[List[str]], default None
        If provided, request only the tokens with these labels
        (sent as a comma-separated `labels` query parameter).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of `Token` objects (empty if the request fails).
    """
    from meerschaum.utils.warnings import warn
    r_url = tokens_endpoint
    params = {}
    if labels:
        params['labels'] = ','.join(labels)
    ### NOTE(review): The joined `params` was previously built but never sent
    ### (the raw list was passed instead). Confirm the endpoint expects the joined form.
    response = self.get(r_url, params=params, debug=debug)
    if not response:
        warn(f"Could not get tokens from '{self}':\n{response.text}")
        return []

    tokens = [
        Token(instance=self, **payload)
        for payload in response.json()
    ]
    return tokens
Return the tokens registered to the current user.
def edit_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
    """
    Persist the token's in-memory state to the API.

    Parameters
    ----------
    token: Token
        The token whose `creation`, `expiration`, `label`, `is_valid`, and `scopes` are sent.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and message.
    """
    r_url = tokens_endpoint + f"/{token.id}/edit"
    response = self.post(
        r_url,
        json={
            'creation': token.creation.isoformat() if token.creation else None,
            'expiration': token.expiration.isoformat() if token.expiration else None,
            'label': token.label,
            'is_valid': token.is_valid,
            'scopes': token.scopes,
        },
        ### Forward `debug` like the other token requests do.
        debug=debug,
    )
    if not response:
        return False, f"Failed to edit token:\n{response.text}"

    success, msg = response.json()
    return success, msg
Persist the token's in-memory state to the API.
def invalidate_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
    """
    Invalidate the token, disabling it for future requests.

    Parameters
    ----------
    token: Token
        The token to invalidate (by `token.id`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and message.
    """
    r_url = tokens_endpoint + f"/{token.id}/invalidate"
    ### Forward `debug` like the other token requests do.
    response = self.post(r_url, debug=debug)
    if not response:
        return False, f"Failed to invalidate token:\n{response.text}"

    success, msg = response.json()
    return success, msg
Invalidate the token, disabling it for future requests.
def get_token_scopes(self, token_id: Union[uuid.UUID, Token], debug: bool = False) -> List[str]:
    """
    Return the scopes for a token.

    Parameters
    ----------
    token_id: Union[uuid.UUID, Token]
        The token's ID, or a `Token` whose `id` is used.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The `scopes` of the token's model, or an empty list if the model could not be fetched.
    """
    _token_id = (token_id.id if isinstance(token_id, Token) else token_id)
    ### Keep the model itself: reading `.scopes` here and again below
    ### always produced an empty list (and crashed when the model was `None`).
    model = self.get_token_model(_token_id, debug=debug)
    return getattr(model, 'scopes', [])
Return the scopes for a token.
def token_exists(self, token_id: Union[uuid.UUID, Token], debug: bool = False) -> bool:
    """
    Return `True` if a token exists.

    A token is considered to exist when its model can be fetched
    and the model's `creation` is not `None`.
    """
    lookup_id = token_id.id if isinstance(token_id, Token) else token_id
    token_model = self.get_token_model(lookup_id, debug=debug)
    return token_model is not None and token_model.creation is not None
Return True if a token exists.
def delete_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
    """
    Delete the token from the API.

    Return the success tuple from the response body,
    or a failure tuple with the response text if the request fails.
    """
    response = self.delete(tokens_endpoint + f"/{token.id}", debug=debug)
    if response:
        success, msg = response.json()
        return success, msg
    return False, f"Failed to delete token:\n{response.text}"
Delete the token from the API.
@classmethod
def from_uri(
    cls,
    uri: str,
    label: Optional[str] = None,
    as_dict: bool = False,
) -> Union[
    'meerschaum.connectors.APIConnector',
    Dict[str, Union[str, int]],
]:
    """
    Create a new APIConnector from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise build the label from the parsed URI as the lowercased
        `username@host` (or just the host if no username was parsed).

    as_dict: bool, default False
        If `True`, return a dictionary of the keyword arguments
        necessary to create a new `APIConnector`, otherwise create a new object.

    Returns
    -------
    A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
    """
    from meerschaum.connectors.sql import SQLConnector

    params = SQLConnector.parse_uri(uri)
    if 'host' not in params:
        error("No host was found in the provided URI.")

    # The parsed URI scheme is stored under 'flavor'; this connector calls it 'protocol'.
    params['protocol'] = params.pop('flavor')

    if label:
        params['label'] = label
    else:
        user_prefix = (params['username'] + '@') if 'username' in params else ''
        params['label'] = (user_prefix + params['host']).lower()

    if as_dict:
        return params
    return cls(**params)
Create a new APIConnector from a URI string.
Parameters
- uri (str): The URI connection string.
- label (Optional[str], default None): If provided, use this as the connector label. Otherwise build the label from the URI as the lowercased `username@host` (or just the host when no username is present).
- as_dict (bool, default False): If `True`, return a dictionary of the keyword arguments necessary to create a new `APIConnector`, otherwise create a new object.
Returns
- A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
    """
    Return a dictionary of remote jobs.

    Each key is a job name and each value is a `Job` built from the job's
    `sysargs` and daemon properties, with this connector as its executor.
    If the request fails, emit a warning and return an empty dictionary.
    """
    response = self.get(JOBS_ENDPOINT, debug=debug)
    if not response:
        warn(f"Failed to get remote jobs from {self}.")
        return {}

    jobs = {}
    for job_name, job_meta in response.json().items():
        jobs[job_name] = Job(
            job_name,
            job_meta['sysargs'],
            executor_keys=str(self),
            _properties=job_meta['daemon']['properties']
        )
    return jobs
Return a dictionary of remote jobs.
def get_job(self, name: str, debug: bool = False) -> Job:
    """
    Return a single Job object.

    Raises
    ------
    ValueError
        If no metadata could be retrieved for the job.
    """
    job_meta = self.get_job_metadata(name, debug=debug)
    if job_meta:
        return Job(
            name,
            job_meta['sysargs'],
            executor_keys=str(self),
            _properties=job_meta['daemon']['properties'],
        )

    raise ValueError(f"Job '{name}' does not exist.")
Return a single Job object.
def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return the metadata for a single job.

    Successful responses are cached per job name on the connector and reused
    for `JOB_METADATA_CACHE_SECONDS` seconds (measured with `time.perf_counter()`).
    Failed requests are not cached and yield an empty dictionary.
    """
    started = time.perf_counter()
    cache = self.__dict__.get('_job_metadata_cache', None)
    if cache is None:
        cached_at = None
    else:
        cached_at = cache.get(name, {}).get('timestamp', None)

    # Serve from the cache while the entry is still fresh.
    if cached_at is not None and (started - cached_at) < JOB_METADATA_CACHE_SECONDS:
        if debug:
            dprint(f"Returning cached metadata for job '{name}'.")
        return cache[name]['metadata']

    response = self.get(JOBS_ENDPOINT + f"/{name}", debug=debug)
    if not response:
        if debug:
            if 'detail' in response.text:
                msg = response.json()['detail']
            else:
                msg = response.text
            warn(f"Failed to get metadata for job '{name}':\n{msg}")
        return {}

    metadata = response.json()

    # Create the cache on first use, then stamp the entry with the pre-request time.
    if cache is None:
        self._job_metadata_cache = {}
    self._job_metadata_cache[name] = {
        'timestamp': started,
        'metadata': metadata,
    }
    return metadata
Return the metadata for a single job.
def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return the daemon properties for a single job.

    The properties are read from `metadata['daemon']['properties']`;
    an empty dictionary is returned if either key is absent.
    """
    job_meta = self.get_job_metadata(name, debug=debug)
    daemon_meta = job_meta.get('daemon', {})
    return daemon_meta.get('properties', {})
Return the daemon properties for a single job.
def get_job_exists(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a job exists.

    The decoded response body is returned as-is.
    If the request fails, emit a warning and return `False`.
    """
    response = self.get(JOBS_ENDPOINT + f'/{name}/exists', debug=debug)
    if response:
        return response.json()

    warn(f"Failed to determine whether job '{name}' exists.")
    return False
Return whether a job exists.
def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Delete a job.

    Returns the success tuple decoded from the response body. On a failed request,
    the message is the response's `detail` field if the text mentions one,
    otherwise the raw response text.
    """
    response = self.delete(JOBS_ENDPOINT + f"/{name}", debug=debug)
    if response:
        return tuple(response.json())

    failure_msg = (
        response.json()['detail']
        if 'detail' in response.text
        else response.text
    )
    return False, failure_msg
Delete a job.
def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Start a job.

    Returns the success tuple decoded from the response body. On a failed request,
    the message is the response's `detail` field if the text mentions one,
    otherwise the raw response text.
    """
    start_url = JOBS_ENDPOINT + f"/{name}/start"
    response = self.post(start_url, debug=debug)
    if response:
        return tuple(response.json())

    if 'detail' not in response.text:
        return False, response.text
    return False, response.json()['detail']
Start a job.
def create_job(
    self,
    name: str,
    sysargs: List[str],
    properties: Optional[Dict[str, str]] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Create a job.

    The job's `sysargs` and `properties` are posted as the JSON body.
    Returns the success tuple decoded from the response body. On a failed request,
    the message is the response's `detail` field if the text mentions one,
    otherwise the raw response text.
    """
    job_url = JOBS_ENDPOINT + f"/{name}"
    payload = {
        'sysargs': sysargs,
        'properties': properties,
    }
    response = self.post(job_url, json=payload, debug=debug)
    if response:
        return tuple(response.json())

    failure_msg = (
        response.json()['detail']
        if 'detail' in response.text
        else response.text
    )
    return False, failure_msg
Create a job.
def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Stop a job.

    Returns the success tuple decoded from the response body. On a failed request,
    the message is the response's `detail` field if the text mentions one,
    otherwise the raw response text.
    """
    stop_url = JOBS_ENDPOINT + f"/{name}/stop"
    response = self.post(stop_url, debug=debug)
    if response:
        return tuple(response.json())

    if 'detail' not in response.text:
        return False, response.text
    return False, response.json()['detail']
Stop a job.
def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Pause a job.

    Returns the success tuple decoded from the response body. On a failed request,
    the message is the response's `detail` field if the text mentions one,
    otherwise the raw response text.
    """
    pause_url = JOBS_ENDPOINT + f"/{name}/pause"
    response = self.post(pause_url, debug=debug)
    if response:
        return tuple(response.json())

    failure_msg = (
        response.json()['detail']
        if 'detail' in response.text
        else response.text
    )
    return False, failure_msg
Pause a job.
def get_logs(self, name: str, debug: bool = False) -> str:
    """
    Return the logs for a job.

    Parameters
    ----------
    name: str
        The name of the job.

    debug: bool, default False
        Verbosity toggle, forwarded to the underlying request.

    Returns
    -------
    The decoded response body from the job's logs endpoint.

    Raises
    ------
    ValueError
        If the request for the logs fails.
    """
    # Forward the flag so `debug=True` is honored like in the other request helpers.
    response = self.get(LOGS_ENDPOINT + f"/{name}", debug=debug)
    if not response:
        raise ValueError(f"Cannot fetch logs for job '{name}':\n{response.text}")

    return response.json()
Return the logs for a job.
def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
    """
    Return the job's manual stop time.

    Parameters
    ----------
    name: str
        The name of the job.

    debug: bool, default False
        Verbosity toggle, forwarded to the underlying request.

    Returns
    -------
    The stop time parsed from the ISO-formatted response,
    or `None` if the request fails or the API returns null.
    """
    # Forward the flag so `debug=True` is honored like in the other request helpers.
    response = self.get(JOBS_ENDPOINT + f"/{name}/stop_time", debug=debug)
    if not response:
        warn(f"Failed to get stop time for job '{name}':\n{response.text}")
        return None

    data = response.json()
    if data is None:
        return None

    return datetime.fromisoformat(data)
Return the job's manual stop time.
def monitor_logs(
    self,
    name: str,
    callback_function: Callable[[Any], Any],
    input_callback_function: Callable[[None], str],
    stop_callback_function: Callable[[None], str],
    stop_on_exit: bool = False,
    strip_timestamps: bool = False,
    accept_input: bool = True,
    debug: bool = False,
):
    """
    Monitor a job's log files and execute a callback with the changes.

    This is the blocking counterpart to `monitor_logs_async()`:
    it builds that coroutine with the same arguments and runs it
    to completion via `asyncio.run()`, returning its result.
    """
    monitor_coroutine = self.monitor_logs_async(
        name,
        callback_function,
        input_callback_function=input_callback_function,
        stop_callback_function=stop_callback_function,
        stop_on_exit=stop_on_exit,
        strip_timestamps=strip_timestamps,
        accept_input=accept_input,
        debug=debug
    )
    return asyncio.run(monitor_coroutine)
Monitor a job's log files and execute a callback with the changes.
async def monitor_logs_async(
    self,
    name: str,
    callback_function: Callable[[Any], Any],
    input_callback_function: Callable[[], str],
    stop_callback_function: Callable[[SuccessTuple], str],
    stop_on_exit: bool = False,
    strip_timestamps: bool = False,
    accept_input: bool = True,
    debug: bool = False,
):
    """
    Monitor a job's log files and await a callback with the changes.

    Parameters
    ----------
    name: str
        The name of the job whose logs websocket should be followed.

    callback_function: Callable[[Any], Any]
        Called (or awaited, if it is a coroutine function) with each received
        message that is not one of the special control messages.

    input_callback_function: Callable[[], str]
        Called (or awaited) when the `JOBS_STDIN_MESSAGE` control message arrives;
        its return value is sent back over the websocket. May be `None`.

    stop_callback_function: Callable[[SuccessTuple], str]
        Called (or awaited) with the parsed result tuple when the
        `JOBS_STOP_MESSAGE` control message arrives. May be `None`.

    stop_on_exit: bool, default False
        If `True`, stop monitoring after the stop message has been handled.

    strip_timestamps: bool, default False
        If `True`, pass each message through `strip_timestamp_from_line()`
        before handing it to `callback_function`.

    accept_input: bool, default True
        Not referenced in this function's body.

    debug: bool, default False
        Not referenced in this function's body.
    """
    import traceback
    from meerschaum.jobs import StopMonitoringLogs
    from meerschaum.utils.formatting._jobs import strip_timestamp_from_line

    websockets, websockets_exceptions = mrsm.attempt_import('websockets', 'websockets.exceptions')
    # Use plain `ws` only when the connector's URI is explicitly `http://`; otherwise `wss`.
    protocol = 'ws' if self.URI.startswith('http://') else 'wss'
    # Fall back to an empty port when no `port` attribute was set on this instance.
    port = self.port if 'port' in self.__dict__ else ''
    uri = f"{protocol}://{self.host}:{port}{LOGS_ENDPOINT}/{name}/ws"

    async def _stdin_callback(client):
        # Handle a stdin request: fetch input from the caller and send it over the websocket.
        if input_callback_function is None:
            return

        if asyncio.iscoroutinefunction(input_callback_function):
            data = await input_callback_function()
        else:
            data = input_callback_function()

        await client.send(data)

    async def _stop_callback(client):
        # Handle a stop notice: the next message is parsed as a JSON result tuple.
        try:
            result = tuple(json.loads(await client.recv()))
        except Exception as e:
            # Surface the failure as a warning and report it as an unsuccessful result.
            warn(traceback.format_exc())
            result = False, str(e)

        if stop_callback_function is not None:
            if asyncio.iscoroutinefunction(stop_callback_function):
                await stop_callback_function(result)
            else:
                stop_callback_function(result)

        if stop_on_exit:
            # Caught by the receive loop below, which closes the socket and exits.
            raise StopMonitoringLogs

    # Control messages which trigger a handler instead of being treated as log output.
    message_callbacks = {
        JOBS_STDIN_MESSAGE: _stdin_callback,
        JOBS_STOP_MESSAGE: _stop_callback,
    }

    async with websockets.connect(uri) as websocket:
        # The first message sent is the connector's token (or 'no-login' if there is none).
        try:
            await websocket.send(self.token or 'no-login')
        except websockets_exceptions.ConnectionClosedOK:
            pass

        while True:
            try:
                response = await websocket.recv()
                callback = message_callbacks.get(response, None)
                if callback is not None:
                    # Control message: run its handler and skip the log callback.
                    await callback(websocket)
                    continue

                if strip_timestamps:
                    response = strip_timestamp_from_line(response)

                if asyncio.iscoroutinefunction(callback_function):
                    await callback_function(response)
                else:
                    callback_function(response)
            except (KeyboardInterrupt, StopMonitoringLogs):
                await websocket.close()
                break
Monitor a job's log files and await a callback with the changes.
def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a remote job is blocking on stdin.

    The decoded response body is returned as-is;
    a failed request yields `False`.
    """
    response = self.get(JOBS_ENDPOINT + f'/{name}/is_blocking_on_stdin', debug=debug)
    if response:
        return response.json()

    return False
Return whether a remote job is blocking on stdin.
def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `began` timestamp, if it exists.

    The value is read from the `began` key under `daemon` in the job's properties;
    `None` is returned when it is absent.
    """
    job_properties = self.get_job_properties(name, debug=debug)
    daemon_properties = job_properties.get('daemon', {})
    return daemon_properties.get('began', None)
Return a job's began timestamp, if it exists.
def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `ended` timestamp, if it exists.

    The value is read from the `ended` key under `daemon` in the job's properties;
    `None` is returned when it is absent.
    """
    job_properties = self.get_job_properties(name, debug=debug)
    daemon_properties = job_properties.get('daemon', {})
    return daemon_properties.get('ended', None)
Return a job's ended timestamp, if it exists.
def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `paused` timestamp, if it exists.

    The value is read from the `paused` key under `daemon` in the job's properties;
    `None` is returned when it is absent.
    """
    job_properties = self.get_job_properties(name, debug=debug)
    daemon_properties = job_properties.get('daemon', {})
    return daemon_properties.get('paused', None)
Return a job's paused timestamp, if it exists.
def get_job_status(self, name: str, debug: bool = False) -> str:
    """
    Return the job's status.

    The status is read from the job's metadata and
    defaults to `'stopped'` when the metadata has no `status` key.
    """
    job_meta = self.get_job_metadata(name, debug=debug)
    status = job_meta.get('status', 'stopped')
    return status
Return the job's status.