meerschaum.connectors.api
Interact with the Meerschaum API (send commands, pull data, etc.)
class APIConnector(InstanceConnector):
    """
    Connect to a Meerschaum API instance.

    Request, action, pipe, plugin, user, token and job methods are implemented
    in sibling modules and bound to the class through the imports below.
    """

    IS_THREAD_SAFE: bool = False
    OPTIONAL_ATTRIBUTES: List[str] = ['port', 'client_secret', 'client_id', 'api_key']

    from ._request import (
        make_request,
        get,
        post,
        put,
        patch,
        delete,
        wget,
    )
    from ._actions import (
        get_actions,
        do_action,
        do_action_async,
        do_action_legacy,
    )
    from ._misc import get_mrsm_version, get_chaining_status
    from ._pipes import (
        get_pipe_instance_keys,
        register_pipe,
        fetch_pipes_keys,
        edit_pipe,
        sync_pipe,
        delete_pipe,
        delete_pipe_cache,
        get_pipe_data,
        get_pipe_id,
        get_pipe_attributes,
        get_sync_time,
        pipe_exists,
        create_metadata,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_columns_types,
        get_pipe_columns_indices,
        get_pipe_docs,
        get_pipe_size,
        compress_pipe,
        decompress_pipe,
        vacuum_pipe,
        analyze_pipe,
        partition_pipe,
    )
    from ._fetch import fetch
    from ._plugins import (
        register_plugin,
        install_plugin,
        delete_plugin,
        get_plugins,
        get_plugin_attributes,
    )
    from ._login import login, test_connection
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._tokens import (
        register_token,
        get_token_model,
        get_tokens,
        edit_token,
        invalidate_token,
        get_token_scopes,
        token_exists,
        delete_token,
    )
    from ._uri import from_uri
    from ._jobs import (
        get_jobs,
        get_job,
        get_job_metadata,
        get_job_properties,
        get_job_exists,
        delete_job,
        start_job,
        create_job,
        stop_job,
        pause_job,
        get_logs,
        get_job_stop_time,
        monitor_logs,
        monitor_logs_async,
        get_job_is_blocking_on_stdin,
        get_job_began,
        get_job_ended,
        get_job_paused,
        get_job_status,
    )

    def __init__(
        self,
        label: Optional[str] = None,
        wait: bool = False,
        debug: bool = False,
        **kw
    ):
        """
        Build the connector from keyword attributes or from a `uri`.

        Parameters
        ----------
        label: Optional[str], default None
            The connector's label. If omitted, a label parsed from `uri` is used.

        wait: bool, default False
            Accepted for compatibility; not read by this constructor.

        debug: bool, default False
            Accepted for compatibility; not read by this constructor.

        **kw
            Connector attributes (e.g. `host`, `port`, `protocol`, `uri`).
        """
        if 'uri' in kw:
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        super().__init__('api', label=label, **kw)
        if 'protocol' not in self.__dict__:
            # Infer the protocol from the URI scheme, falling back to plain HTTP.
            self.protocol = (
                'https' if self.__dict__.get('uri', '').startswith('https')
                else 'http'
            )

        if 'uri' not in self.__dict__:
            self.verify_attributes(required_attributes)
        else:
            from meerschaum.connectors.sql import SQLConnector
            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
            if 'host' not in conn_attrs:
                raise Exception(f"Invalid URI for '{self}'.")
            self.__dict__.update(conn_attrs)

        self.url = (
            self.protocol + '://' +
            self.host
            + (
                (':' + str(self.port))
                if self.__dict__.get('port', None)
                else ''
            )
        )
        self._token = None
        self._expires = None
        self._session = None
        self._instance_keys = self.__dict__.get('instance_keys', None)

    @property
    def URI(self) -> str:
        """
        Return the fully qualified URI.

        The result has the form `protocol://[username:password@]host[:port]`.
        Credentials are included only when both username and password are set.
        """
        # NOTE(review): credentials are concatenated verbatim (not percent-encoded).
        username = self.__dict__.get('username', None)
        password = self.__dict__.get('password', None)
        creds = (username + ':' + password + '@') if username and password else ''
        port = self.__dict__.get('port', None)
        port_str = (':' + str(self.port)) if port else ''
        return self.protocol + '://' + creds + self.host + port_str

    @property
    def session(self):
        """
        Return the lazily-created `requests.Session` for this connector.
        """
        if self._session is None:
            _ = attempt_import('certifi', lazy=False)
            requests = attempt_import('requests', lazy=False)
            if requests:
                self._session = requests.Session()
        if self._session is None:
            error("Failed to import requests. Is requests installed?")
        return self._session

    @property
    def token(self):
        """
        Return the bearer token, logging in again when it is missing or
        within one minute of expiring. For the `api_key` scheme, return the key.
        """
        if self.login_scheme == 'api_key':
            return self.api_key

        # `_expires` is compared against a naive UTC timestamp.
        expired = (
            True if self._expires is None else (
                self._expires
                < datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
            )
        )

        if self._token is None or expired:
            success, msg = self.login()
            # Only warn once per connector about a failed login.
            if not success and not self.__dict__.get('_emitted_warning'):
                warn(msg, stack=False)
                self._emitted_warning = True
        return self._token

    @property
    def instance_keys(self) -> Union[str, None]:
        """
        Return the instance keys to be sent alongside pipe requests.
        """
        return self._instance_keys

    @property
    def login_scheme(self) -> str:
        """
        Return the login scheme to use based on the configured credentials.

        Precedence: `username` -> `'password'`, `client_id` -> `'client_credentials'`,
        `api_key` -> `'api_key'`; defaults to `'password'`.
        """
        if 'username' in self.__dict__:
            return 'password'
        if 'client_id' in self.__dict__:
            return 'client_credentials'
        if 'api_key' in self.__dict__:
            return 'api_key'

        return 'password'
Connect to a Meerschaum API instance.
def __init__(
    self,
    label: Optional[str] = None,
    wait: bool = False,
    debug: bool = False,
    **kw
):
    """
    Build the connector from keyword attributes or from a `uri`.

    Parameters
    ----------
    label: Optional[str], default None
        The connector's label. If omitted, a label parsed from `uri` is used.

    wait: bool, default False
        Accepted for compatibility; not read by this constructor.

    debug: bool, default False
        Accepted for compatibility; not read by this constructor.

    **kw
        Connector attributes (e.g. `host`, `port`, `protocol`, `uri`).
    """
    if 'uri' in kw:
        uri_params = self.from_uri(kw['uri'], as_dict=True)
        # The label is handled separately so it is not passed twice below.
        uri_label = uri_params.pop('label', None)
        label = label or uri_label
        kw.update(uri_params)

    super().__init__('api', label=label, **kw)

    if 'protocol' not in self.__dict__:
        # Infer the protocol from the URI scheme, falling back to plain HTTP.
        is_https = self.__dict__.get('uri', '').startswith('https')
        self.protocol = 'https' if is_https else 'http'

    if 'uri' in self.__dict__:
        from meerschaum.connectors.sql import SQLConnector
        parsed_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
        if 'host' not in parsed_attrs:
            raise Exception(f"Invalid URI for '{self}'.")
        self.__dict__.update(parsed_attrs)
    else:
        self.verify_attributes(required_attributes)

    base_url = self.protocol + '://' + self.host
    if self.__dict__.get('port', None):
        base_url = base_url + (':' + str(self.port))
    self.url = base_url

    self._token = None
    self._expires = None
    self._session = None
    self._instance_keys = self.__dict__.get('instance_keys', None)
@property
def URI(self) -> str:
    """
    Return the fully qualified URI.

    The result has the form `protocol://[username:password@]host[:port]`.
    Credentials are included only when both username and password are set,
    and the port only when it is set to a truthy value.
    """
    # NOTE(review): credentials are concatenated verbatim (not percent-encoded).
    username = self.__dict__.get('username', None)
    password = self.__dict__.get('password', None)
    creds = (username + ':' + password + '@') if username and password else ''
    port_str = (':' + str(self.port)) if self.__dict__.get('port', None) else ''
    return self.protocol + '://' + creds + self.host + port_str
Return the fully qualified URI.
@property
def session(self):
    """
    Return the lazily-created `requests.Session` for this connector.

    The session is created on first access and cached on `self._session`.
    `error()` is called if `requests` could not be imported.
    """
    if self._session is not None:
        return self._session

    # `certifi` is imported eagerly before `requests`; its return value is unused.
    _ = attempt_import('certifi', lazy=False)
    requests = attempt_import('requests', lazy=False)
    if requests:
        self._session = requests.Session()

    if self._session is None:
        error("Failed to import requests. Is requests installed?")
    return self._session
@property
def token(self):
    """
    Return the token to send in the `Authorization` header.

    For the `api_key` login scheme, the API key itself is returned.
    Otherwise `self.login()` is called whenever no token is cached or the
    cached token expires within the next minute; a failed login emits at most
    one warning per connector.
    """
    if self.login_scheme == 'api_key':
        return self.api_key

    if self._expires is None:
        expired = True
    else:
        # `_expires` is compared against a naive UTC timestamp.
        cutoff = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
        expired = self._expires < cutoff

    if self._token is None or expired:
        success, msg = self.login()
        already_warned = self.__dict__.get('_emitted_warning')
        if not success and not already_warned:
            warn(msg, stack=False)
            self._emitted_warning = True

    return self._token
@property
def instance_keys(self) -> Union[str, None]:
    """
    Return the default instance keys sent alongside pipe requests
    (the value stored on `self._instance_keys`, which may be `None`).
    """
    keys = self._instance_keys
    return keys
Return the instance keys to be sent alongside pipe requests.
@property
def login_scheme(self) -> str:
    """
    Return the login scheme implied by the configured credentials.

    The first matching attribute wins: `username` -> `'password'`,
    `client_id` -> `'client_credentials'`, `api_key` -> `'api_key'`.
    When none are set, `'password'` is returned.
    """
    schemes_by_attribute = (
        ('username', 'password'),
        ('client_id', 'client_credentials'),
        ('api_key', 'api_key'),
    )
    for attribute, scheme in schemes_by_attribute:
        if attribute in self.__dict__:
            return scheme
    return 'password'
Return the login scheme to use based on the configured credentials.
def make_request(
    self,
    method: str,
    r_url: str,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kwargs: Any
) -> 'requests.Response':
    """
    Send a request to this APIConnector's endpoint through the in-memory session.

    Parameters
    ----------
    method: str
        The HTTP method (case-insensitive). It must be one of `METHODS`.

    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    headers: Optional[Dict[str, Any]], default None
        The headers to use for the request.
        A deep copy is made, so the caller's dictionary is never modified.

    use_token: bool, default True
        If `True`, add the `Authorization: Bearer ...` header.

    debug: bool, default False
        Verbosity toggle.

    kwargs: Any
        All other keyword arguments are passed to the session's `request()`.
        `verify` defaults to the connector's boolean `verify` attribute (if set),
        and `timeout` defaults to the configured API default.

    Returns
    -------
    A `requests.Response` object.

    Raises
    ------
    ValueError
        If `method` is not supported.
    """
    http_method = method.upper()
    if http_method not in METHODS:
        raise ValueError(f"Method '{method}' is not supported.")

    # Only a boolean `verify` attribute is forwarded; an explicit kwarg wins.
    configured_verify = self.__dict__.get('verify', None)
    if isinstance(configured_verify, bool) and 'verify' not in kwargs:
        kwargs['verify'] = configured_verify

    request_headers = copy.deepcopy(headers) if isinstance(headers, dict) else {}
    if use_token:
        request_headers['Authorization'] = f'Bearer {self.token}'

    if 'timeout' not in kwargs:
        kwargs['timeout'] = STATIC_CONFIG['api']['default_timeout']

    request_url = urllib.parse.urljoin(self.url, r_url)
    if debug:
        dprint(f"[{self}] Sending a '{http_method}' request to {request_url}")

    return self.session.request(
        http_method,
        request_url,
        headers=request_headers,
        **kwargs
    )
Make a request to this APIConnector's endpoint using the in-memory session.
Parameters
- method (str):
The kind of request to make.
Accepted values:
'GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE'
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token is True, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request.
Returns
- A
requests.Response object.
def get(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `GET` request via `make_request()`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request()`
        (e.g. `headers`, `use_token`, `debug`, and request options).

    Returns
    -------
    A `requests.Response` object.
    """
    http_method = 'GET'
    return self.make_request(http_method, r_url, **kwargs)
Wrapper for requests.get.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token is True, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request.
Returns
- A
requests.Response object.
def post(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `POST` request via `make_request()`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request()`
        (e.g. `headers`, `use_token`, `debug`, and request options).

    Returns
    -------
    A `requests.Response` object.
    """
    http_method = 'POST'
    return self.make_request(http_method, r_url, **kwargs)
Wrapper for requests.post.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token is True, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request.
Returns
- A
requests.Response object.
def put(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `PUT` request via `make_request()`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request()`
        (e.g. `headers`, `use_token`, `debug`, and request options).

    Returns
    -------
    A `requests.Response` object.
    """
    http_method = 'PUT'
    return self.make_request(http_method, r_url, **kwargs)
Wrapper for requests.put.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token is True, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request.
Returns
- A
requests.Response object.
def patch(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `PATCH` request via `make_request()`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request()`
        (e.g. `headers`, `use_token`, `debug`, and request options).

    Returns
    -------
    A `requests.Response` object.
    """
    http_method = 'PATCH'
    return self.make_request(http_method, r_url, **kwargs)
Wrapper for requests.patch.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token is True, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request.
Returns
- A
requests.Response object.
def delete(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `DELETE` request via `make_request()`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request()`
        (e.g. `headers`, `use_token`, `debug`, and request options).

    Returns
    -------
    A `requests.Response` object.
    """
    http_method = 'DELETE'
    return self.make_request(http_method, r_url, **kwargs)
Wrapper for requests.delete.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token is True, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request.
Returns
- A
requests.Response object.
def wget(
    self,
    r_url: str,
    dest: Optional[Union[str, pathlib.Path]] = None,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kw: Any
) -> pathlib.Path:
    """
    Mimic wget with requests.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint to download.

    dest: Optional[Union[str, pathlib.Path]], default None
        The destination path, passed through to `meerschaum.utils.misc.wget`.

    headers: Optional[Dict[str, Any]], default None
        The headers to use for the request.
        A copy is made, so the caller's dictionary is never modified.

    use_token: bool, default True
        If `True`, add the `Authorization: Bearer ...` header.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        All other keyword arguments are passed to `meerschaum.utils.misc.wget`.

    Returns
    -------
    The value returned by `meerschaum.utils.misc.wget`.
    """
    from meerschaum.utils.misc import wget
    # Copy the headers so adding the token does not leak into the caller's dict.
    request_headers = dict(headers) if headers is not None else {}
    if use_token:
        request_headers['Authorization'] = f'Bearer {self.token}'
    request_url = urllib.parse.urljoin(self.url, r_url)
    if debug:
        dprint(
            f"[{self}] Downloading {request_url}"
            + (f' to {dest}' if dest is not None else '')
            + "..."
        )
    return wget(request_url, dest=dest, headers=request_headers, **kw)
Mimic wget with requests.
def get_actions(self):
    """
    Request the available actions from the API instance.

    Returns the result of `self.get(ACTIONS_ENDPOINT)` unmodified.
    """
    endpoint = ACTIONS_ENDPOINT
    return self.get(endpoint)
Get available actions from the API instance.
def do_action(self, sysargs: List[str]) -> SuccessTuple:
    """
    Execute a Meerschaum action remotely.

    This is the blocking counterpart to `do_action_async()`:
    the coroutine is run to completion with `asyncio.run()`.
    """
    action_coroutine = self.do_action_async(sysargs)
    return asyncio.run(action_coroutine)
Execute a Meerschaum action remotely.
async def do_action_async(
    self,
    sysargs: List[str],
    callback_function: Callable[[str], None] = partial(print, end=''),
) -> SuccessTuple:
    """
    Execute an action as a temporary remote job.

    Parameters
    ----------
    sysargs: List[str]
        The action's arguments. API executor keys are removed before the job is created.

    callback_function: Callable[[str], None], default partial(print, end='')
        Passed to the job's `monitor_logs_async()`.

    Returns
    -------
    The failed start result if the job could not be started,
    otherwise the job's result. The temporary job is deleted afterwards.
    """
    from meerschaum._internal.arguments import remove_api_executor_keys
    from meerschaum.utils.misc import generate_password

    cleaned_sysargs = remove_api_executor_keys(sysargs)
    temp_job_name = TEMP_PREFIX + generate_password(12)
    job = mrsm.Job(temp_job_name, cleaned_sysargs, executor_keys=str(self))

    started, start_msg = job.start()
    if not started:
        return started, start_msg

    await job.monitor_logs_async(
        callback_function=callback_function,
        stop_on_exit=True,
        strip_timestamps=True,
    )

    # Read the result before the job is deleted.
    success, msg = job.result
    job.delete()
    return success, msg
Execute an action as a temporary remote job.
def do_action_legacy(
    self,
    action: Optional[List[str]] = None,
    sysargs: Optional[List[str]] = None,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    NOTE: This method is deprecated.
    Please use `do_action()` or `do_action_async()`.

    Execute a Meerschaum action remotely.

    If `sysargs` are provided (and `action` starts with an empty string), parse those instead.
    Otherwise infer everything from keyword arguments.

    The `action` list passed by the caller is not modified.

    Returns
    -------
    A `SuccessTuple`. If no action is given, `(False, <message>)` is returned.

    Examples
    --------
    >>> conn = mrsm.get_connector('api:main')
    >>> conn.do_action_legacy(['show', 'pipes'])
    (True, "Success")
    >>> conn.do_action_legacy(['show', 'arguments'], name='test')
    (True, "Success")
    """
    import sys
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum._internal.static import STATIC_CONFIG
    from meerschaum.utils.misc import json_serialize_datetime
    if action is None:
        action = []

    if sysargs is not None and action and action[0] == '':
        from meerschaum._internal.arguments import parse_arguments
        if debug:
            dprint(f"Parsing sysargs:\n{sysargs}")
        json_dict = parse_arguments(sysargs)
    else:
        json_dict = kw
        json_dict['action'] = action
        if 'noask' not in kw:
            json_dict['noask'] = True
        if 'yes' not in kw:
            json_dict['yes'] = True
        if debug:
            json_dict['debug'] = debug

    # Work on a copy so the caller's `action` list is left untouched.
    actions = list(json_dict.get('action') or [])
    if not actions:
        return False, "No action was provided."
    root_action, json_dict['action'] = actions[0], actions[1:]
    r_url = f"{STATIC_CONFIG['api']['endpoints']['actions']}/{root_action}"

    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending data to '{self.url + r_url}':")
        pprint(json_dict, stream=sys.stderr)

    response = self.post(
        r_url,
        data=json.dumps(json_dict, default=json_serialize_datetime),
        debug=debug,
    )
    try:
        response_list = json.loads(response.text)
        if isinstance(response_list, dict) and 'detail' in response_list:
            return False, response_list['detail']
    except Exception as e:
        print(f"Invalid response: {response}")
        print(e)
        return False, response.text
    if debug:
        dprint(response)
    try:
        return response_list[0], response_list[1]
    except Exception:
        return False, f"Failed to parse result from action '{root_action}'"
NOTE: This method is deprecated.
Please use do_action() or do_action_async().
Execute a Meerschaum action remotely.
If sysargs are provided, parse those instead.
Otherwise infer everything from keyword arguments.
Examples
>>> conn = mrsm.get_connector('api:main')
>>> conn.do_action(['show', 'pipes'])
(True, "Success")
>>> conn.do_action(['show', 'arguments'], name='test')
(True, "Success")
def get_mrsm_version(self, **kw) -> Optional[str]:
    """
    Return the Meerschaum version of the API instance.

    The request is sent without an authorization token.
    `None` is returned if the request or JSON decoding fails,
    or if the response is a dictionary containing `'detail'`.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    try:
        version_endpoint = STATIC_CONFIG['api']['endpoints']['version'] + '/mrsm'
        response = self.get(version_endpoint, use_token=False, **kw)
        version = response.json()
    except Exception:
        return None

    is_error_payload = isinstance(version, dict) and 'detail' in version
    return None if is_error_payload else version
Return the Meerschaum version of the API instance.
def get_chaining_status(self, **kw) -> Optional[bool]:
    """
    Fetch the chaining status of the API instance.

    `None` is returned if the request raises or the response is falsy;
    otherwise the decoded JSON body is returned.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    try:
        chaining_endpoint = STATIC_CONFIG['api']['endpoints']['chaining']
        response = self.get(chaining_endpoint, use_token=True, **kw)
        succeeded = bool(response)
    except Exception:
        return None

    if not succeeded:
        return None
    return response.json()
Fetch the chaining status of the API instance.
def get_pipe_instance_keys(self, pipe: mrsm.Pipe) -> Union[str, None]:
    """
    Return the instance keys configured in the pipe's parameters,
    falling back to this `APIConnector`'s default `instance_keys` when unset.
    """
    pipe_parameters = pipe.parameters
    return pipe_parameters.get('instance_keys', self.instance_keys)
Return the configured instance keys for a pipe if set,
else fall back to the default instance_keys for this APIConnector.
def register_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """
    Submit a POST to the API to register a new Pipe object.

    Returns a tuple of (success_bool, message):
    the server's own two-element result when it replies with a list,
    otherwise the response's truthiness paired with its `detail` or raw text.
    """
    from meerschaum.utils.debug import dprint
    register_url = pipe_r_url(pipe) + '/register'
    response = self.post(
        register_url,
        json=pipe._attributes.get('parameters', {}),
        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
        debug=debug,
    )
    if debug:
        dprint(response.text)

    if not response:
        return False, response.text

    payload = response.json()
    if isinstance(payload, list):
        return payload[0], payload[1]

    succeeded = bool(response)
    if 'detail' in payload:
        return succeeded, payload['detail']
    return succeeded, response.text
Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> Union[
    Dict[Union[int, str], Tuple[str, str, Union[str, None], Dict[str, Any]]],
    List[Tuple[str, str, Union[str, None]]],
]:
    """
    Fetch registered Pipes' keys from the API.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        The connector keys for the query.

    metric_keys: Optional[List[str]], default None
        The metric keys for the query.

    location_keys: Optional[List[str]], default None
        The location keys for the query.

    tags: Optional[List[str]], default None
        A list of tags for the query.

    params: Optional[Dict[str, Any]], default None
        A parameters dictionary for filtering against the `pipes` table
        (e.g. `{'connector_keys': 'plugin:foo'}`).
        Not recommended to be used.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping pipe IDs to key tuples, or a list of key tuples for older servers.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    connector_keys = [] if connector_keys is None else connector_keys
    metric_keys = [] if metric_keys is None else metric_keys
    location_keys = [] if location_keys is None else location_keys
    tags = [] if tags is None else tags

    r_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
    try:
        # The list-like filters are JSON-encoded into query parameters.
        query = {
            'connector_keys': json.dumps(connector_keys),
            'metric_keys': json.dumps(metric_keys),
            'location_keys': json.dumps(location_keys),
            'tags': json.dumps(tags),
            'params': json.dumps(params),
            'instance_keys': self.instance_keys,
            'as_dict': True,
        }
        response = self.get(r_url, params=query, debug=debug)
        j = response.json()
    except Exception as e:
        import traceback
        traceback.print_exc()
        error(str(e))

    if 'detail' in j:
        error(j['detail'], stack=False)

    if not isinstance(j, dict):
        return [tuple(row) for row in j]

    # Digit-only IDs are converted back to integers.
    return {
        (int(pipe_id) if str(pipe_id).isdigit() else pipe_id): tuple(keys)
        for pipe_id, keys in j.items()
    }
Fetch registered Pipes' keys from the API.
Parameters
- connector_keys (Optional[List[str]], default None): The connector keys for the query.
- metric_keys (Optional[List[str]], default None): The metric keys for the query.
- location_keys (Optional[List[str]], default None): The location keys for the query.
- tags (Optional[List[str]], default None): A list of tags for the query.
- params (Optional[Dict[str, Any]], default None):
A parameters dictionary for filtering against the
pipes table (e.g. {'connector_keys': 'plugin:foo'}). Not recommended to be used. - debug (bool, default False): Verbosity toggle.
Returns
- A dictionary mapping pipe IDs to key tuples, or a list of key tuples for older servers.
def edit_pipe(
    self,
    pipe: mrsm.Pipe,
    patch: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Submit a PATCH to the API to edit an existing Pipe object.

    Returns a tuple of (success_bool, message):
    the server's own two-element result when it replies with a list,
    otherwise the response's truthiness paired with its `detail` or raw text.
    """
    from meerschaum.utils.debug import dprint
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.
    edit_url = pipe_r_url(pipe) + '/edit'
    response = self.patch(
        edit_url,
        params={'patch': patch, 'instance_keys': self.get_pipe_instance_keys(pipe)},
        json=pipe.get_parameters(apply_symlinks=False),
        debug=debug,
    )
    if debug:
        dprint(response.text)

    payload = response.json()
    if isinstance(payload, list):
        return payload[0], payload[1]

    succeeded = bool(response)
    if 'detail' in payload:
        return succeeded, payload['detail']
    return succeeded, response.text
Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]] = None,
    chunksize: Optional[int] = -1,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Sync a DataFrame into a Pipe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to sync into.

    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]], default None
        The data to sync: a DataFrame, a column-oriented dictionary,
        a list of rows, or a JSON string of one of the latter two.

    chunksize: Optional[int], default -1
        The number of rows per posted chunk.
        `-1` uses the configured SQL chunksize and `None` uses `1`.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        Additional query parameters sent with each chunk.

    Returns
    -------
    A `SuccessTuple`. Syncing stops at the first failed chunk.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.dtypes import json_serialize_value
    from meerschaum.utils.misc import interval_str
    from meerschaum.config import get_config
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dataframe import to_json
    begin = time.perf_counter()
    more_itertools = attempt_import('more_itertools')
    if df is None:
        msg = f"DataFrame is `None`. Cannot sync {pipe}."
        return False, msg

    def get_json_str(c):
        """Serialize a chunk (string, plain container, or DataFrame) to JSON."""
        if isinstance(c, str):
            return c
        if isinstance(c, (dict, list, tuple)):
            return json.dumps(c, default=json_serialize_value)
        return to_json(c, orient='columns', geometry_format='wkb_hex')

    def get_chunk_rowcount(c):
        """Count rows: the longest column for dict chunks, else `len()`."""
        if isinstance(c, dict):
            return max((len(vals) for vals in c.values()), default=0)
        return len(c)

    df = json.loads(df) if isinstance(df, str) else df

    _chunksize: Optional[int] = (1 if chunksize is None else (
        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
        else chunksize
    ))

    ### NOTE: Check plain containers first: lists also have an `index` attribute,
    ###       and neither dicts nor lists have `columns`.
    if isinstance(df, dict):
        ### `chunks` is a list of column-oriented dicts,
        ### e.g. [{'a': [1, 2], 'b': [5, 6]}, {'a': [3, 4], 'b': [7, 8]}]
        chunks = []
        for key, values in df.items():
            for i, vals in enumerate(more_itertools.chunked(values, _chunksize)):
                if i < len(chunks):
                    chunks[i][key] = vals
                else:
                    chunks.append({key: vals})
    elif isinstance(df, list):
        chunks = more_itertools.chunked(df, _chunksize)
    elif hasattr(df, 'columns'):
        df = df.reset_index(drop=True)
        is_dask = 'dask' in df.__module__
        chunks = (
            (df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize))
            if not is_dask
            else [partition.compute() for partition in df.partitions]
        )
    else:
        return False, f"Cannot sync data of type '{type(df).__name__}' to {pipe}."

    ### Send columns in case the user has defined them locally.
    request_params = kw.copy()
    if pipe.columns:
        request_params['columns'] = json.dumps(pipe.columns)
    request_params['instance_keys'] = self.get_pipe_instance_keys(pipe)
    r_url = pipe_r_url(pipe) + '/data'

    rowcount = 0
    num_success_chunks = 0
    for i, c in enumerate(chunks):
        if debug:
            dprint(f"[{self}] Posting chunk {i} to {r_url}...")
        chunk_rowcount = get_chunk_rowcount(c)
        if chunk_rowcount == 0:
            if debug:
                dprint(f"[{self}] Skipping empty chunk...")
            continue
        json_str = get_json_str(c)

        try:
            response = self.post(
                r_url,
                params=request_params,
                data=json_str,
                debug=debug,
            )
        except Exception as e:
            msg = f"Failed to post a chunk to {pipe}:\n{e}"
            warn(msg)
            return False, msg

        if not response:
            return False, f"Failed to sync a chunk:\n{response.text}"

        try:
            j = json.loads(response.text)
        except Exception as e:
            return False, f"Failed to parse response from syncing {pipe}:\n{e}"

        if isinstance(j, dict) and 'detail' in j:
            return False, j['detail']

        try:
            j = tuple(j)
        except Exception:
            return False, response.text

        if debug:
            dprint("Received response: " + str(j))
        if not j[0]:
            return j

        rowcount += chunk_rowcount
        num_success_chunks += 1

    ### Invalidate the server-side cache now that new data has been posted.
    self.delete_pipe_cache(pipe, debug=debug)
    success_tuple = True, (
        f"It took {interval_str(timedelta(seconds=(time.perf_counter() - begin)))} "
        + f"to sync {rowcount:,} row"
        + ('s' if rowcount != 1 else '')
        + f" across {num_success_chunks:,} chunk" + ('s' if num_success_chunks != 1 else '') +
        f" to {pipe}."
    )
    return success_tuple
Sync a DataFrame into a Pipe.
def delete_pipe(
    self,
    pipe: Optional[mrsm.Pipe] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Delete a Pipe and drop its table.

    Parameters
    ----------
    pipe: Optional[mrsm.Pipe], default None
        The pipe to delete. `None` is reported through `error()`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and a message.
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    if pipe is None:
        error("Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/delete',
        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
        debug=debug,
    )
    if debug:
        dprint(response.text)

    ### Parse the body once and fail gracefully if it is not JSON.
    try:
        response_data = response.json()
    except Exception:
        return False, f"Failed to delete {pipe}."

    if isinstance(response_data, list):
        return response_data[0], response_data[1]
    ### Only dictionaries may carry a `detail` message.
    if isinstance(response_data, dict) and 'detail' in response_data:
        return response.__bool__(), response_data['detail']
    return response.__bool__(), response.text
Delete a Pipe and drop its table.
def delete_pipe_cache(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Invalidate the server-side cache for a pipe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose cache should be invalidated.

    Returns
    -------
    A `SuccessTuple`. A JSON list in the response is returned as the tuple;
    otherwise the response's status and text are returned.
    """
    cache_url = pipe_r_url(pipe) + '/cache'
    query = {'instance_keys': self.get_pipe_instance_keys(pipe)}
    response = self.delete(cache_url, params=query, debug=debug)
    if not response.ok:
        return False, f"Failed to invalidate cache for {pipe}: {response.text}"

    try:
        payload = response.json()
    except Exception:
        return response.ok, response.text

    if isinstance(payload, list):
        return tuple(payload)
    return response.ok, response.text
Invalidate the server-side cache for a pipe.
def get_pipe_data(
    self,
    pipe: mrsm.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    as_chunks: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[pandas.DataFrame, None]:
    """
    Fetch data from the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose data to fetch.

    select_columns: Optional[List[str]], default None
        Sent to the endpoint as JSON under `select_columns`.

    omit_columns: Optional[List[str]], default None
        Sent to the endpoint as JSON under `omit_columns`.

    begin: Union[str, datetime, int, None], default None
        Sent to the endpoint as the `begin` bound.

    end: Union[str, datetime, int, None], default None
        Sent to the endpoint as the `end` bound.

    params: Optional[Dict[str, Any]], default None
        Sent to the endpoint as JSON under `params`.

    as_chunks: bool, default False
        Forwarded to the endpoint as-is.

    Returns
    -------
    The parsed DataFrame, or `None` if the request or the parsing failed.
    """
    from meerschaum.utils.dataframe import parse_df_datetimes, add_missing_cols_to_df
    from meerschaum.utils.dtypes import are_dtypes_equal

    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/data",
            params={
                'select_columns': json.dumps(select_columns),
                'omit_columns': json.dumps(omit_columns),
                'begin': begin,
                'end': end,
                'params': json.dumps(params, default=str),
                'instance': self.get_pipe_instance_keys(pipe),
                'as_chunks': as_chunks,
            },
            debug=debug
        )
        if not response.ok:
            return None
        j = response.json()
    except Exception as e:
        warn(f"Failed to get data for {pipe}:\n{e}")
        return None

    ### An error payload is a failure: honor the documented return type
    ### (`None`) rather than leaking a success tuple to callers expecting a DataFrame.
    if isinstance(j, dict) and 'detail' in j:
        warn(f"Failed to get data for {pipe}:\n{j['detail']}")
        return None

    try:
        df = parse_df_datetimes(
            j,
            ### Only attempt datetime parsing on columns typed as datetimes.
            ignore_cols=[
                col
                for col, dtype in pipe.dtypes.items()
                if not are_dtypes_equal(str(dtype), 'datetime')
            ],
            strip_timezone=(pipe.tzinfo is None),
            debug=debug,
        )
    except Exception as e:
        warn(f"Failed to parse response for {pipe}:\n{e}")
        return None

    ### An empty frame still advertises the pipe's known columns.
    if len(df.columns) == 0:
        return add_missing_cols_to_df(df, pipe.dtypes)

    return df
Fetch data from the API.
def get_pipe_id(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union[int, str, None]:
    """
    Get a Pipe's ID from the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose ID to look up.

    Returns
    -------
    The ID as an `int` when the response text is an integer,
    the raw text when it is non-empty and does not begin with `{`,
    otherwise `None`.
    """
    from meerschaum.utils.misc import is_int
    id_url = pipe_r_url(pipe) + '/id'
    query = {'instance': self.get_pipe_instance_keys(pipe)}
    response = self.get(id_url, params=query, debug=debug)
    if debug:
        dprint(f"Got pipe ID: {response.text}")

    try:
        id_text = response.text
        if is_int(id_text):
            return int(id_text)
        ### A leading brace is taken to be a JSON object rather than an ID.
        if id_text and not id_text.startswith('{'):
            return id_text
    except Exception as e:
        warn(f"Failed to get the ID for {pipe}:\n{e}")
    return None
Get a Pipe's ID from the API.
def get_pipe_attributes(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Dict[str, Any]:
    """
    Get a Pipe's attributes from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose attributes we are fetching.

    Returns
    -------
    The decoded JSON body of the response.
    If the body cannot be decoded, warn and return an empty dictionary.
    """
    attributes_url = pipe_r_url(pipe) + '/attributes'
    query = {'instance': self.get_pipe_instance_keys(pipe)}
    response = self.get(attributes_url, params=query, debug=debug)
    try:
        attributes = json.loads(response.text)
    except Exception as e:
        warn(f"Failed to get the attributes for {pipe}:\n{e}")
        return {}
    return attributes
Get a Pipe's attributes from the API
Parameters
- pipe (meerschaum.Pipe): The pipe whose attributes we are fetching.
Returns
- A dictionary of a pipe's attributes.
- If the pipe does not exist, return an empty dictionary.
def get_sync_time(
    self,
    pipe: mrsm.Pipe,
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    debug: bool = False,
) -> Union[datetime, int, None]:
    """
    Get a Pipe's sync time from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to select from.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary, sent as the JSON body of the request.

    newest: bool, default True
        Forwarded to the endpoint as the `newest` query parameter.

    Returns
    -------
    An `int` if the returned value is an integer, a `datetime` parsed from
    its ISO format otherwise, or `None` if the request failed, the value is
    null, or it could not be parsed.
    """
    from meerschaum.utils.misc import is_int
    from meerschaum.utils.warnings import warn
    sync_time_url = pipe_r_url(pipe) + '/sync_time'
    query = {
        'instance': self.get_pipe_instance_keys(pipe),
        'newest': newest,
        'debug': debug,
    }
    response = self.get(sync_time_url, json=params, params=query, debug=debug)
    if not response:
        warn(f"Failed to get the sync time for {pipe}:\n" + response.text)
        return None

    j = response.json()
    if j is None:
        return None

    try:
        if is_int(j):
            return int(j)
        return datetime.fromisoformat(j)
    except Exception as e:
        warn(f"Failed to parse the sync time '{j}' for {pipe}:\n{e}")
    return None
Get a Pipe's most recent datetime value from the API.
Parameters
- pipe (meerschaum.Pipe): The pipe to select from.
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause.
- newest (bool, default True): If `True`, get the most recent datetime (honoring `params`). If `False`, get the oldest datetime (ASC instead of DESC).
Returns
- The most recent (or oldest if `newest` is `False`) datetime of a pipe, rounded down to the closest minute.
def pipe_exists(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> bool:
    """
    Check the API to see if a Pipe exists.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe which we are querying.

    Returns
    -------
    A bool indicating whether a pipe's underlying table exists.
    Failed requests and error payloads warn and return `False`.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe)
    response = self.get(
        r_url + '/exists',
        params={
            'instance': self.get_pipe_instance_keys(pipe),
        },
        debug=debug,
    )
    if not response:
        warn(f"Failed to check if {pipe} exists:\n{response.text}")
        return False
    if debug:
        dprint("Received response: " + str(response.text))
    j = response.json()
    if isinstance(j, dict) and 'detail' in j:
        warn(j['detail'])
        ### Returning the (truthy) error dictionary would read as "exists".
        return False
    return bool(j)
Check the API to see if a Pipe exists.
Parameters
- pipe ('meerschaum.Pipe'): The pipe which we are querying.
Returns
- A bool indicating whether a pipe's underlying table exists.
def create_metadata(
    self,
    debug: bool = False
) -> bool:
    """
    Create metadata tables.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating success:
    `False` if the response body is not valid JSON,
    otherwise the truthiness of the response.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum._internal.static import STATIC_CONFIG
    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
    response = self.post(r_url, debug=debug)
    if debug:
        ### NOTE: The f-prefix is required to interpolate the response text.
        dprint(f"Create metadata response: {response.text}")
    try:
        _ = json.loads(response.text)
    except Exception as e:
        warn(f"Failed to create metadata on {self}:\n{e}")
        return False
    ### Report success instead of never returning `True`.
    return bool(response)
Create metadata tables.
Returns
- A bool indicating success.
def get_pipe_rowcount(
    self,
    pipe: mrsm.Pipe,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    remote: bool = False,
    debug: bool = False,
) -> int:
    """
    Get a pipe's row count from the API.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe whose rows we are counting.

    begin: Union[str, datetime, int, None], default None
        If provided, bound the count by this datetime.

    end: Union[str, datetime, int, None], default None
        If provided, bound the count by this datetime.

    params: Optional[Dict[str, Any]], default None
        If provided, bound the count by these parameters (sent as the JSON body).

    remote: bool, default False
        If `True`, return the rowcount for the fetch definition.

    Returns
    -------
    The row count reported by the API.
    If the request fails or the body cannot be parsed, warn and return 0.
    """
    rowcount_url = pipe_r_url(pipe) + "/rowcount"
    query = {
        'begin': begin,
        'end': end,
        'remote': remote,
        'instance': self.get_pipe_instance_keys(pipe),
    }
    response = self.get(rowcount_url, json=params, params=query, debug=debug)
    if not response:
        warn(f"Failed to get the rowcount for {pipe}:\n{response.text}")
        return 0

    try:
        rowcount = int(json.loads(response.text))
    except Exception as e:
        warn(f"Failed to get the rowcount for {pipe}:\n{e}")
        rowcount = 0
    return rowcount
Get a pipe's row count from the API.
Parameters
- pipe ('meerschaum.Pipe'): The pipe whose rows we are counting.
- begin (Union[str, datetime, int, None], default None): If provided, bound the count by this datetime.
- end (Union[str, datetime, int, None], default None): If provided, bound the count by this datetime.
- params (Optional[Dict[str, Any]], default None): If provided, bound the count by these parameters.
- remote (bool, default False): If `True`, return the rowcount for the fetch definition.
Returns
- The number of rows in the pipe's table, bound by the given parameters.
- If the table does not exist, return 0.
def drop_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """
    Drop a pipe's table but maintain its registration.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to be dropped.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple (bool, str).
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    if pipe is None:
        error("Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/drop',
        params={
            'instance': self.get_pipe_instance_keys(pipe),
        },
        debug=debug,
    )
    if debug:
        dprint(response.text)

    try:
        data = response.json()
    except Exception:
        return False, f"Failed to drop {pipe}."

    if isinstance(data, list):
        return data[0], data[1]
    ### Only dictionaries may carry a `detail` message
    ### (a membership test on other JSON types is meaningless or raises).
    if isinstance(data, dict) and 'detail' in data:
        return response.__bool__(), data['detail']
    return response.__bool__(), response.text
Drop a pipe's table but maintain its registration.
Parameters
- pipe (meerschaum.Pipe): The pipe to be dropped.
Returns
- A success tuple (bool, str).
def clear_pipe(
    self,
    pipe: mrsm.Pipe,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Delete rows in a pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe with rows to be deleted.

    begin: Union[str, datetime, int, None], default None
        Sent to the endpoint as the `begin` bound.

    end: Union[str, datetime, int, None], default None
        Sent to the endpoint as the `end` bound.

    params: Optional[Dict[str, Any]], default None
        Sent to the endpoint as JSON under `params`.

    Returns
    -------
    A success tuple.
    """
    from meerschaum.utils.debug import dprint
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/clear',
        params={
            'begin': begin,
            'end': end,
            'params': json.dumps(params),
            'instance': self.get_pipe_instance_keys(pipe),
        },
        debug=debug,
    )
    if debug:
        dprint(response.text)

    try:
        data = response.json()
    except Exception:
        return False, f"Failed to clear {pipe} with constraints {begin=}, {end=}, {params=}."

    if isinstance(data, list):
        return data[0], data[1]
    ### Only dictionaries may carry a `detail` message
    ### (a membership test on other JSON types is meaningless or raises).
    if isinstance(data, dict) and 'detail' in data:
        return response.__bool__(), data['detail']
    return response.__bool__(), response.text
Delete rows in a pipe's table.
Parameters
- pipe (meerschaum.Pipe): The pipe with rows to be deleted.
Returns
- A success tuple.
def get_pipe_columns_types(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union[Dict[str, str], None]:
    """
    Fetch the columns and types of the pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose columns are to be queried.

    Returns
    -------
    A dictionary mapping column names to their database types,
    or `None` (with a warning) if the response is not a dictionary
    or consists solely of a `detail` message.

    Examples
    --------
    >>> {
    ...     'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    ...     'id': 'BIGINT',
    ...     'val': 'DOUBLE PRECISION',
    ... }
    >>>
    """
    types_url = pipe_r_url(pipe) + '/columns/types'
    query = {'instance': self.get_pipe_instance_keys(pipe)}
    response = self.get(types_url, params=query, debug=debug)
    cols_types = response.json()

    if not isinstance(cols_types, dict):
        warn(response.text)
        return None

    ### A lone `detail` key is an error message, not a column.
    if len(cols_types) == 1 and 'detail' in cols_types:
        warn(cols_types['detail'])
        return None

    return cols_types
Fetch the columns and types of the pipe's table.
Parameters
- pipe (meerschaum.Pipe): The pipe whose columns to be queried.
Returns
- A dictionary mapping column names to their database types.
Examples
>>> {
... 'dt': 'TIMESTAMP WITHOUT TIMEZONE',
... 'id': 'BIGINT',
... 'val': 'DOUBLE PRECISION',
... }
>>>
def get_pipe_columns_indices(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union[Dict[str, str], None]:
    """
    Fetch the index information for a pipe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose columns are to be queried.

    Returns
    -------
    The dictionary returned by the API (keyed by column name),
    or `None` (with a warning) if the response is not a dictionary
    or consists solely of a `detail` message.
    """
    indices_url = pipe_r_url(pipe) + '/columns/indices'
    query = {'instance': self.get_pipe_instance_keys(pipe)}
    response = self.get(indices_url, params=query, debug=debug)
    cols_indices = response.json()

    if not isinstance(cols_indices, dict):
        warn(response.text)
        return None

    ### A lone `detail` key is an error message, not a column.
    if len(cols_indices) == 1 and 'detail' in cols_indices:
        warn(cols_indices['detail'])
        return None

    return cols_indices
Fetch the index information for a pipe.
Parameters
- pipe (mrsm.Pipe): The pipe whose columns to be queried.
Returns
- A dictionary mapping column names to a list of associated index information.
def get_pipe_docs(
    self,
    pipe: mrsm.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    order: str = 'asc',
    limit: Optional[int] = None,
    debug: bool = False,
    **kw: Any
) -> List[Dict[str, Any]]:
    """
    Fetch a pipe's data as a list of documents from the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose documents to fetch.

    select_columns: Optional[List[str]], default None
        Sent to the endpoint as JSON under `select_columns`.

    omit_columns: Optional[List[str]], default None
        Sent to the endpoint as JSON under `omit_columns`.

    begin: Union[str, datetime, int, None], default None
        Sent to the endpoint as the `begin` bound.

    end: Union[str, datetime, int, None], default None
        Sent to the endpoint as the `end` bound.

    params: Optional[Dict[str, Any]], default None
        Sent to the endpoint as JSON under `params`.

    order: str, default 'asc'
        Forwarded to the endpoint as-is.

    limit: Optional[int], default None
        Forwarded to the endpoint as-is.

    Returns
    -------
    The list of documents, or an empty list if the request fails
    or the response is not a JSON list.
    """
    docs_url = pipe_r_url(pipe) + "/docs"
    try:
        query = {
            'select_columns': json.dumps(select_columns),
            'omit_columns': json.dumps(omit_columns),
            'begin': begin,
            'end': end,
            'params': json.dumps(params, default=str),
            'order': order,
            'limit': limit,
            'instance_keys': self.get_pipe_instance_keys(pipe),
        }
        response = self.get(docs_url, params=query, debug=debug)
        if not response.ok:
            warn(f"Failed to get docs for {pipe}:\n{response.text}")
            return []
        docs = response.json()
        return docs if isinstance(docs, list) else []
    except Exception as e:
        warn(f"Failed to get docs for {pipe}:\n{e}")
        return []
Fetch a pipe's data as a list of documents from the API.
def get_pipe_size(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kw: Any
) -> Union[int, None]:
    """
    Return the on-disk size of a pipe's target table in bytes via the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table size to measure.

    Returns
    -------
    An `int` parsed from the response,
    or `None` if the size could not be determined.
    """
    response = self.get(
        pipe_r_url(pipe) + '/size',
        params={'instance': self.get_pipe_instance_keys(pipe)},
        debug=debug,
    )
    if not response:
        warn(f"Failed to get the size for {pipe}:\n{response.text}")
        return None

    try:
        size = json.loads(response.text)
    except Exception as e:
        warn(f"Failed to parse the size for {pipe}:\n{e}")
        return None

    if size is None:
        return None

    if isinstance(size, dict) and 'detail' in size:
        warn(size['detail'])
        return None

    try:
        num_bytes = int(size)
    except Exception:
        num_bytes = None
    return num_bytes
Return the on-disk size of a pipe's target table in bytes via the API.
Parameters
- pipe (mrsm.Pipe): The pipe whose target table size to measure.
Returns
- An `int` of the number of bytes occupied by the target table, or `None` if the size could not be determined.
def compress_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Compress a pipe's target table via the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table to compress.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    response = self.post(
        pipe_r_url(pipe) + '/compress',
        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
        debug=debug,
    )
    if debug:
        dprint(response.text)

    try:
        payload = response.json()
    except Exception:
        return False, f"Failed to compress {pipe}."

    ### A list is the success tuple itself; a dictionary may carry a `detail` message.
    if isinstance(payload, list):
        success, msg = payload[0], payload[1]
    elif isinstance(payload, dict) and 'detail' in payload:
        success, msg = response.__bool__(), payload['detail']
    else:
        success, msg = response.__bool__(), response.text
    return success, msg
Compress a pipe's target table via the API.
Parameters
- pipe (mrsm.Pipe): The pipe whose target table to compress.
Returns
- A `SuccessTuple` indicating success.
def decompress_pipe(
    self,
    pipe: mrsm.Pipe,
    no_policy: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Decompress a pipe's target table via the API, the inverse of `compress_pipe()`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table to decompress.

    no_policy: bool, default False
        If `True`, decompress existing data now but leave the compression policy in place.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    decompress_url = pipe_r_url(pipe) + '/decompress'
    query = {
        'instance_keys': self.get_pipe_instance_keys(pipe),
        'no_policy': no_policy,
    }
    response = self.post(decompress_url, params=query, debug=debug)
    if debug:
        dprint(response.text)

    try:
        payload = response.json()
    except Exception:
        return False, f"Failed to decompress {pipe}."

    ### A list is the success tuple itself; a dictionary may carry a `detail` message.
    if isinstance(payload, list):
        success, msg = payload[0], payload[1]
    elif isinstance(payload, dict) and 'detail' in payload:
        success, msg = response.__bool__(), payload['detail']
    else:
        success, msg = response.__bool__(), response.text
    return success, msg
Decompress a pipe's target table via the API, the inverse of compress_pipe().
Parameters
- pipe (mrsm.Pipe): The pipe whose target table to decompress.
- no_policy (bool, default False): If `True`, decompress existing data now but leave the compression policy in place.
Returns
- A `SuccessTuple` indicating success.
def vacuum_pipe(
    self,
    pipe: mrsm.Pipe,
    full: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Vacuum a pipe's target table via the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table to vacuum.

    full: bool, default False
        If `True`, run `VACUUM FULL` (PostgreSQL family only).

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    vacuum_url = pipe_r_url(pipe) + '/vacuum'
    query = {
        'instance_keys': self.get_pipe_instance_keys(pipe),
        'full': full,
    }
    response = self.post(vacuum_url, params=query, debug=debug)
    if debug:
        dprint(response.text)

    try:
        payload = response.json()
    except Exception:
        return False, f"Failed to vacuum {pipe}."

    ### A list is the success tuple itself; a dictionary may carry a `detail` message.
    if isinstance(payload, list):
        success, msg = payload[0], payload[1]
    elif isinstance(payload, dict) and 'detail' in payload:
        success, msg = response.__bool__(), payload['detail']
    else:
        success, msg = response.__bool__(), response.text
    return success, msg
Vacuum a pipe's target table via the API.
Parameters
- pipe (mrsm.Pipe): The pipe whose target table to vacuum.
- full (bool, default False): If `True`, run `VACUUM FULL` (PostgreSQL family only).
Returns
- A `SuccessTuple` indicating success.
def analyze_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Analyze a pipe's target table via the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table to analyze.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    response = self.post(
        pipe_r_url(pipe) + '/analyze',
        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
        debug=debug,
    )
    if debug:
        dprint(response.text)

    try:
        payload = response.json()
    except Exception:
        return False, f"Failed to analyze {pipe}."

    ### A list is the success tuple itself; a dictionary may carry a `detail` message.
    if isinstance(payload, list):
        success, msg = payload[0], payload[1]
    elif isinstance(payload, dict) and 'detail' in payload:
        success, msg = response.__bool__(), payload['detail']
    else:
        success, msg = response.__bool__(), response.text
    return success, msg
Analyze a pipe's target table via the API.
Parameters
- pipe (mrsm.Pipe): The pipe whose target table to analyze.
Returns
- A `SuccessTuple` indicating success.
def partition_pipe(
    self,
    pipe: mrsm.Pipe,
    chunk_minutes: Optional[int] = None,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Repartition a pipe's target table to a new chunk width via the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The partitioned pipe whose target table to repartition.

    chunk_minutes: Optional[int], default None
        The new partition width in minutes.
        Omitted from the request when `None`.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    partition_url = pipe_r_url(pipe) + '/partition'
    query = {'instance_keys': self.get_pipe_instance_keys(pipe)}
    if chunk_minutes is not None:
        query['chunk_minutes'] = chunk_minutes
    response = self.post(partition_url, params=query, debug=debug)
    if debug:
        dprint(response.text)

    try:
        payload = response.json()
    except Exception:
        return False, f"Failed to repartition {pipe}."

    ### A list is the success tuple itself; a dictionary may carry a `detail` message.
    if isinstance(payload, list):
        success, msg = payload[0], payload[1]
    elif isinstance(payload, dict) and 'detail' in payload:
        success, msg = response.__bool__(), payload['detail']
    else:
        success, msg = response.__bool__(), response.text
    return success, msg
Repartition a pipe's target table to a new chunk width via the API.
Parameters
- pipe (mrsm.Pipe): The partitioned pipe whose target table to repartition.
- chunk_minutes (Optional[int], default None): The new partition width in minutes. Defaults to the pipe's `verify.chunk_minutes`.
Returns
- A `SuccessTuple` indicating success.
def fetch(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, str, int] = '',
    end: Union[datetime, int] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw: Any
) -> Iterator['pd.DataFrame']:
    """
    Get the Pipe data from the remote Pipe.

    The source pipe is described under the `fetch` key of `pipe.parameters`:
    either as a `pipe` dictionary of keys or (legacy) as `connector_keys`,
    `metric_key`, and `location_key` at the root of `fetch`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose `fetch` parameters describe the remote pipe.

    begin: Union[datetime, str, int], default ''
        Passed to the source pipe's `get_data()`.

    end: Union[datetime, int], default None
        Passed to the source pipe's `get_data()`.

    params: Optional[Dict[str, Any]], default None
        Filters for the source pipe; patched with `fetch:params` if present.

    Returns
    -------
    The result of the source pipe's `get_data(as_iterator=True)`,
    or `None` (with a warning) if the `fetch` parameters are missing or incomplete.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config._patch import apply_patch_to_config

    fetch_params = pipe.parameters.get('fetch', {})
    if not fetch_params:
        warn(f"Missing 'fetch' parameters for {pipe}.", stack=False)
        return None

    ### NOTE: Copy the keys so that adding `instance` below
    ### never writes the connector back into the pipe's own parameters.
    pipe_meta = dict(fetch_params.get('pipe', None) or {})
    ### Legacy: check for `connector_keys`, etc. at the root.
    if not pipe_meta:
        ck, mk, lk = (
            fetch_params.get('connector_keys', None),
            fetch_params.get('metric_key', None),
            fetch_params.get('location_key', None),
        )
        if not ck or not mk:
            warn(f"Missing `fetch:pipe` keys for {pipe}.", stack=False)
            return None

        pipe_meta.update({
            'connector': ck,
            'metric': mk,
            'location': lk,
        })

    pipe_meta['instance'] = self
    source_pipe = mrsm.Pipe(**pipe_meta)

    _params = copy.deepcopy(params) if params is not None else {}
    _params = apply_patch_to_config(_params, fetch_params.get('params', {}))
    select_columns = fetch_params.get('select_columns', [])
    omit_columns = fetch_params.get('omit_columns', [])

    return source_pipe.get_data(
        select_columns=select_columns,
        omit_columns=omit_columns,
        begin=begin,
        end=end,
        params=_params,
        debug=debug,
        as_iterator=True,
    )
Get the Pipe data from the remote Pipe.
def register_plugin(
    self,
    plugin: mrsm.core.Plugin,
    make_archive: bool = True,
    debug: bool = False,
) -> SuccessTuple:
    """
    Register a plugin and upload its archive.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to register.

    make_archive: bool, default True
        If `True`, build the archive with `plugin.make_tar()`.
        Otherwise upload the file at `plugin.archive_path`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and a message.
    """
    import json
    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
    metadata = {
        'version': plugin.version,
        'attributes': json.dumps(plugin.attributes),
    }
    r_url = plugin_r_url(plugin)

    ### Build the request first and hold the archive open only for the upload,
    ### so the handle is released on every path.
    with open(archive_path, 'rb') as file_pointer:
        try:
            response = self.post(
                r_url,
                files={'archive': file_pointer},
                params=metadata,
                debug=debug,
            )
        except Exception:
            return False, f"Failed to register plugin '{plugin}'."

    try:
        success, msg = json.loads(response.text)
    except Exception:
        return False, response.text

    return success, msg
Register a plugin and upload its archive.
def install_plugin(
    self,
    name: str,
    skip_deps: bool = False,
    force: bool = False,
    debug: bool = False
) -> SuccessTuple:
    """
    Download and attempt to install a plugin from the API.

    Parameters
    ----------
    name: str
        The name of the plugin to download.

    skip_deps: bool, default False
        Passed to `Plugin.install()`.

    force: bool, default False
        Passed to `Plugin.install()`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and a message.
    """
    import os
    import pathlib
    import json
    from meerschaum.core import Plugin
    import meerschaum.config.paths as paths
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
    r_url = plugin_r_url(name)
    dest = pathlib.Path(os.path.join(paths.PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
    if debug:
        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
    archive_path = self.wget(r_url, dest, debug=debug)
    is_binary = binaryornot_check.is_binary(str(archive_path))
    if not is_binary:
        ### A non-binary download is a JSON response rather than an archive.
        fail_msg = f"Failed to download binary for plugin '{name}'."
        ### Default to failure so unexpected payloads cannot leave the result unbound.
        success, msg = False, fail_msg
        try:
            with open(archive_path, 'r') as f:
                j = json.load(f)
            if isinstance(j, list):
                success, msg = tuple(j)
        except Exception:
            success, msg = False, fail_msg
        return success, msg
    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
    return plugin.install(skip_deps=skip_deps, force=force, debug=debug)
Download and attempt to install a plugin from the API.
def delete_plugin(
    self,
    plugin: mrsm.core.Plugin,
    debug: bool = False
) -> SuccessTuple:
    """
    Delete a plugin from an API repository.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to delete.

    Returns
    -------
    A `SuccessTuple` decoded from the response,
    or `False` and the raw response text if the body cannot be unpacked.
    """
    import json
    plugin_url = plugin_r_url(plugin)
    try:
        response = self.delete(plugin_url, debug=debug)
    except Exception:
        return False, f"Failed to delete plugin '{plugin}'."

    try:
        success, msg = json.loads(response.text)
    except Exception:
        success, msg = False, response.text
    return success, msg
Delete a plugin from an API repository.
def get_plugins(
    self,
    user_id: Optional[int] = None,
    search_term: Optional[str] = None,
    debug: bool = False
) -> List[str]:
    """
    Return a list of registered plugin names.

    Parameters
    ----------
    user_id: Optional[int], default None
        If specified, return all plugins from a certain user.

    search_term: Optional[str], default None
        If specified, return plugins beginning with this string.

    Returns
    -------
    A list of plugin names (empty if the request failed).
    """
    import json
    from meerschaum.utils.warnings import error
    from meerschaum._internal.static import STATIC_CONFIG
    plugins_endpoint = STATIC_CONFIG['api']['endpoints']['plugins']
    query = {'user_id': user_id, 'search_term': search_term}
    response = self.get(plugins_endpoint, params=query, use_token=True, debug=debug)
    if not response:
        return []

    plugins = json.loads(response.text)
    if isinstance(plugins, list):
        return plugins

    ### Anything other than a list is reported as an error.
    error(response.text)
    return plugins
Return a list of registered plugin names.
Parameters
- user_id (Optional[int], default None): If specified, return all plugins from a certain user.
- search_term (Optional[str], default None): If specified, return plugins beginning with this string.
Returns
- A list of plugin names.
def get_plugin_attributes(
    self,
    plugin: mrsm.core.Plugin,
    debug: bool = False
) -> Dict[str, Any]:
    """
    Return a plugin's attributes.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin whose attributes to fetch.

    Returns
    -------
    The attributes dictionary, or an empty dictionary (with a warning)
    if the request failed with a `detail` message.
    """
    import json
    from meerschaum.utils.warnings import warn, error
    attributes_url = plugin_r_url(plugin) + '/attributes'
    response = self.get(attributes_url, use_token=True, debug=debug)
    attributes = response.json()

    ### The attributes may arrive as a JSON object encoded within a string.
    if isinstance(attributes, str) and attributes.startswith('{'):
        try:
            attributes = json.loads(attributes)
        except Exception:
            pass

    if not isinstance(attributes, dict):
        error(response.text)
        return attributes

    if not response and 'detail' in attributes:
        warn(attributes['detail'])
        return {}

    return attributes
Return a plugin's attributes.
def login(
    self,
    debug: bool = False,
    warn: bool = True,
    **kw: Any
) -> SuccessTuple:
    """
    Log in and set the session token.

    With the `api_key` scheme the key is only validated.
    With `password` or `client_credentials`, the credentials are posted to the
    login endpoint and `_token` and `_expires` are set on success.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    warn: bool, default True
        If `True`, emit a warning (once per connector) when logging in fails.

    Returns
    -------
    A `SuccessTuple` of success and a message.
    """
    if self.login_scheme == 'api_key':
        validate_response = self.post(
            STATIC_CONFIG['api']['endpoints']['tokens'] + '/validate',
            headers={'Authorization': f'Bearer {self.api_key}'},
            use_token=False,
            debug=debug,
        )
        if not validate_response:
            return False, "API key is not valid."
        return True, "API key is valid."

    ### Default to empty so an unrecognized scheme falls through to the login prompt
    ### instead of raising on an unbound name.
    login_data = {}
    try:
        if self.login_scheme == 'password':
            login_data = {
                'username': self.username,
                'password': self.password,
            }
        elif self.login_scheme == 'client_credentials':
            login_data = {
                'client_id': self.client_id,
                'client_secret': self.client_secret,
            }
    except AttributeError:
        login_data = {}

    if not login_data:
        return False, f"Please login with the command `login {self}`."

    ### Only the `password` scheme has a username to mention.
    login_scheme_msg = (
        f" as user '{login_data['username']}'"
        if self.login_scheme == 'password'
        else ''
    )

    response = self.post(
        STATIC_CONFIG['api']['endpoints']['login'],
        data=login_data,
        use_token=False,
        debug=debug,
    )
    if response:
        msg = f"Successfully logged into '{self}'{login_scheme_msg}."
        response_data = json.loads(response.text)
        self._token = response_data['access_token']
        self._expires = datetime.datetime.strptime(
            response_data['expires'],
            '%Y-%m-%dT%H:%M:%S.%f'
        )
    else:
        msg = (
            f"Failed to log into '{self}'{login_scheme_msg}.\n" +
            f"    Please verify login details for connector '{self}'."
        )
        ### Only warn once per connector.
        if warn and not self.__dict__.get('_emitted_warning', False):
            _warn(msg, stack=False)
            self._emitted_warning = True

    return response.__bool__(), msg
Log in and set the session token.
def test_connection(
    self,
    **kw: Any
) -> Union[bool, None]:
    """
    Test if a successful connection to the API may be made.

    Keyword arguments override the defaults passed to `retry_connect`
    (a single attempt, no waiting, no warnings, no chaining or login enforcement).
    Any exception raised by `retry_connect` is reported as `False`.
    """
    from meerschaum.connectors.poll import retry_connect

    retry_kwargs = {
        'max_retries': 1,
        'retry_wait': 0,
        'warn': False,
        'connector': self,
        'enforce_chaining': False,
        'enforce_login': False,
        **kw,
    }
    try:
        result = retry_connect(**retry_kwargs)
    except Exception:
        return False
    return result
Test if a successful connection to the API may be made.
def register_user(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Register a new user.

    The username, password, and JSON-encoded attributes are always posted;
    `type` and `email` are included only when set on the user.

    Returns
    -------
    The `SuccessTuple` decoded from the response, or a failure tuple carrying the
    response's `detail` (or a fallback message when the body cannot be decoded).
    """
    from meerschaum._internal.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    payload = {
        'username': user.username,
        'password': user.password,
        'attributes': json.dumps(user.attributes),
    }
    if user.type:
        payload['type'] = user.type
    if user.email:
        payload['email'] = user.email

    response = self.post(f"{users_endpoint}/register", data=payload, debug=debug)
    try:
        parsed = json.loads(response.text)
        if isinstance(parsed, dict) and 'detail' in parsed:
            return False, parsed['detail']
        return tuple(parsed)
    except Exception:
        if response:
            return False, response.text
        return False, f"Failed to register user '{user}'."
Register a new user.
def get_user_id(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> Union[int, str, UUID, None]:
    """
    Get a user's ID.

    The decoded response is coerced to an `int` or `UUID` when it looks like one,
    otherwise it is returned as a string. `None` is returned if decoding fails.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    from meerschaum.utils.misc import is_int, is_uuid

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    response = self.get(f"{users_endpoint}/{user.username}/id", debug=debug, **kw)
    try:
        id_text = str(json.loads(response.text))
        if is_int(id_text):
            return int(id_text)
        if is_uuid(id_text):
            return UUID(id_text)
        return id_text
    except Exception:
        return None
Get a user's ID.
def get_users(
    self,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    Return a list of registered usernames.

    An empty list is returned if the request fails or the body is not valid JSON.
    """
    from meerschaum._internal.static import STATIC_CONFIG

    users_endpoint = f"{STATIC_CONFIG['api']['endpoints']['users']}"
    response = self.get(users_endpoint, debug=debug, use_token=True)
    if not response:
        return []

    try:
        usernames = response.json()
    except Exception:
        usernames = []
    return usernames
Return a list of registered usernames.
def edit_user(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit an existing user.

    Posts the user's username, password, type, email, and JSON-encoded attributes.

    Returns
    -------
    The `SuccessTuple` decoded from the response, or a failure tuple carrying the
    response's `detail` (or a fallback message when the body cannot be decoded).
    """
    from meerschaum._internal.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    payload = {
        'username': user.username,
        'password': user.password,
        'type': user.type,
        'email': user.email,
        'attributes': json.dumps(user.attributes),
    }
    response = self.post(f"{users_endpoint}/edit", data=payload, debug=debug)
    try:
        parsed = json.loads(response.text)
        if isinstance(parsed, dict) and 'detail' in parsed:
            return False, parsed['detail']
        return tuple(parsed)
    except Exception:
        if response:
            return False, response.text
        return False, f"Failed to edit user '{user}'."
Edit an existing user.
def delete_user(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Delete a user.

    Returns
    -------
    The `SuccessTuple` decoded from the response, a failure tuple carrying the
    response's `detail`, or a generic failure tuple if the body cannot be decoded.
    """
    from meerschaum._internal.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    response = self.delete(f"{users_endpoint}/{user.username}", debug=debug)
    try:
        parsed = json.loads(response.text)
        if isinstance(parsed, dict) and 'detail' in parsed:
            return False, parsed['detail']
        return tuple(parsed)
    except Exception:
        return False, f"Failed to delete user '{user.username}'."
Delete a user.
def get_user_password_hash(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """
    If configured, get a user's password hash.

    Returns `None` when the request is unsuccessful,
    otherwise the decoded JSON body of the response.
    """
    from meerschaum._internal.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    r_url = '/'.join((users_endpoint, user.username, 'password_hash'))
    response = self.get(r_url, debug=debug, **kw)
    return response.json() if response else None
If configured, get a user's password hash.
def get_user_type(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """
    If configured, get a user's type.

    Returns `None` when the request is unsuccessful,
    otherwise the decoded JSON body of the response.
    """
    from meerschaum._internal.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    r_url = '/'.join((users_endpoint, user.username, 'type'))
    response = self.get(r_url, debug=debug, **kw)
    return response.json() if response else None
If configured, get a user's type.
def get_user_attributes(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw
) -> Optional[dict]:
    """
    Get a user's attributes.

    Parameters
    ----------
    user: mrsm.core.User
        The user whose attributes are requested.

    debug: bool, default False
        Forwarded to the underlying GET request.

    Returns
    -------
    The JSON-decoded attributes from the response body,
    or `None` if the body cannot be decoded.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/attributes"
    response = self.get(r_url, debug=debug, **kw)
    try:
        attributes = json.loads(response.text)
    except Exception:
        # Best-effort: an undecodable (or missing) body means no attributes.
        attributes = None
    return attributes
Get a user's attributes.
def register_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
    """
    Register the provided token to the API.

    On success, the token's `label`, `secret`, `id`, and (when present in the
    response) `expiration` are updated in place from the response body.
    """
    from meerschaum.utils.dtypes import json_serialize_value

    body = json.dumps(
        {
            'label': token.label,
            'scopes': token.scopes,
            'expiration': token.expiration,
        },
        default=json_serialize_value,
    )
    response = self.post(tokens_endpoint + '/register', data=body, debug=debug)
    if not response:
        return False, f"Failed to register token:\n{response.text}"

    registered = response.json()
    token.label = registered['label']
    token.secret = registered['secret']
    token.id = uuid.UUID(registered['id'])

    expiration = registered.get('expiration', None)
    if expiration:
        token.expiration = datetime.fromisoformat(expiration)

    return True, f"Registered token '{token.label}'."
Register the provided token to the API.
def get_token_model(self, token_id: uuid.UUID, debug: bool = False) -> 'Union[TokenModel, None]':
    """
    Return a token's model from the API instance.

    `None` is returned when the request is unsuccessful; otherwise the
    response body is unpacked into a `TokenModel`.
    """
    from meerschaum.models import TokenModel

    response = self.get(tokens_endpoint + f'/{token_id}', debug=debug)
    if response:
        return TokenModel(**response.json())
    return None
Return a token's model from the API instance.
def get_tokens(self, labels: Optional[List[str]] = None, debug: bool = False) -> List[Token]:
    """
    Return the tokens registered to the current user.

    Parameters
    ----------
    labels: Optional[List[str]], default None
        If provided, only request tokens with these labels.
        The labels are sent as a single comma-separated `labels` query parameter.

    debug: bool, default False
        Forwarded to the underlying GET request.

    Returns
    -------
    A list of `Token` objects bound to this connector,
    or an empty list (after a warning) if the request fails.
    """
    from meerschaum.utils.warnings import warn

    # Send the comma-joined labels which were previously built but never used
    # (the raw list was passed instead).
    # NOTE(review): assumes the endpoint splits `labels` on commas — confirm against the API route.
    params = {'labels': ','.join(labels)} if labels else {}
    response = self.get(tokens_endpoint, params=params, debug=debug)
    if not response:
        warn(f"Could not get tokens from '{self}':\n{response.text}")
        return []

    return [
        Token(instance=self, **payload)
        for payload in response.json()
    ]
Return the tokens registered to the current user.
def edit_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
    """
    Persist the token's in-memory state to the API.

    Parameters
    ----------
    token: Token
        The token whose `creation`, `expiration`, `label`, `is_valid`,
        and `scopes` are posted to the API.

    debug: bool, default False
        Forwarded to the underlying POST request.

    Returns
    -------
    The `SuccessTuple` decoded from the response,
    or a failure tuple containing the response text.
    """
    r_url = tokens_endpoint + f"/{token.id}/edit"
    response = self.post(
        r_url,
        json={
            'creation': token.creation.isoformat() if token.creation else None,
            'expiration': token.expiration.isoformat() if token.expiration else None,
            'label': token.label,
            'is_valid': token.is_valid,
            'scopes': token.scopes,
        },
        # Previously dropped: honor the caller's `debug` flag like the sibling token methods.
        debug=debug,
    )
    if not response:
        return False, f"Failed to edit token:\n{response.text}"

    success, msg = response.json()
    return success, msg
Persist the token's in-memory state to the API.
def invalidate_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
    """
    Invalidate the token, disabling it for future requests.

    Parameters
    ----------
    token: Token
        The token to invalidate (identified by `token.id`).

    debug: bool, default False
        Forwarded to the underlying POST request.

    Returns
    -------
    The `SuccessTuple` decoded from the response,
    or a failure tuple containing the response text.
    """
    r_url = tokens_endpoint + f"/{token.id}/invalidate"
    # Previously dropped: honor the caller's `debug` flag like the sibling token methods.
    response = self.post(r_url, debug=debug)
    if not response:
        return False, f"Failed to invalidate token:\n{response.text}"

    success, msg = response.json()
    return success, msg
Invalidate the token, disabling it for future requests.
def get_token_scopes(self, token_id: Union[uuid.UUID, Token], debug: bool = False) -> List[str]:
    """
    Return the scopes for a token.

    Parameters
    ----------
    token_id: Union[uuid.UUID, Token]
        The token's ID, or a `Token` object (in which case its `id` is used).

    debug: bool, default False
        Forwarded to `get_token_model`.

    Returns
    -------
    The `scopes` of the token's model,
    or an empty list if the model could not be fetched.
    """
    _token_id = (token_id.id if isinstance(token_id, Token) else token_id)
    # Keep the model itself: reading `.scopes` here and again via `getattr` below
    # always produced `[]` (and raised `AttributeError` when the model was `None`).
    model = self.get_token_model(_token_id, debug=debug)
    return getattr(model, 'scopes', [])
Return the scopes for a token.
def token_exists(self, token_id: Union[uuid.UUID, Token], debug: bool = False) -> bool:
    """
    Return `True` if a token exists.

    A token is considered to exist when its model can be fetched
    and that model has a non-null `creation` value.
    """
    lookup_id = token_id.id if isinstance(token_id, Token) else token_id
    model = self.get_token_model(lookup_id, debug=debug)
    return model is not None and model.creation is not None
Return True if a token exists.
def delete_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
    """
    Delete the token from the API.

    Returns the `SuccessTuple` decoded from the response,
    or a failure tuple containing the response text.
    """
    response = self.delete(tokens_endpoint + f"/{token.id}", debug=debug)
    if response:
        success, msg = response.json()
        return success, msg

    return False, f"Failed to delete token:\n{response.text}"
Delete the token from the API.
@classmethod
def from_uri(
    cls,
    uri: str,
    label: Optional[str] = None,
    as_dict: bool = False,
) -> Union[
    'meerschaum.connectors.APIConnector',
    Dict[str, Union[str, int]],
]:
    """
    Create a new APIConnector from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise the label is the lowercased host,
        prefixed with `username@` when the URI contains a username.

    as_dict: bool, default False
        If `True`, return a dictionary of the keyword arguments
        necessary to create a new `APIConnector`, otherwise create a new object.

    Returns
    -------
    A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
    """
    from meerschaum.connectors.sql import SQLConnector

    params = SQLConnector.parse_uri(uri)
    if 'host' not in params:
        error("No host was found in the provided URI.")

    # The parsed URI scheme is exposed as `flavor`; this connector calls it `protocol`.
    params['protocol'] = params.pop('flavor')

    if label:
        params['label'] = label
    else:
        user_prefix = (params['username'] + '@') if 'username' in params else ''
        params['label'] = (user_prefix + params['host']).lower()

    if as_dict:
        return params
    return cls(**params)
Create a new APIConnector from a URI string.
Parameters
- uri (str): The URI connection string.
- label (Optional[str], default None): If provided, use this as the connector label. Otherwise the label is derived from the URI as the lowercased host, prefixed with `username@` when a username is present.
- as_dict (bool, default False): If `True`, return a dictionary of the keyword arguments necessary to create a new `APIConnector`, otherwise create a new object.
Returns
- A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
    """
    Return a dictionary of remote jobs.

    Each entry of the response is turned into a `Job` keyed by its name, built
    from the entry's `sysargs` and daemon properties with this connector as the
    executor. An empty dictionary is returned (after a warning) on failure.
    """
    response = self.get(JOBS_ENDPOINT, debug=debug)
    if not response:
        warn(f"Failed to get remote jobs from {self}.")
        return {}

    jobs = {}
    for job_name, job_meta in response.json().items():
        jobs[job_name] = Job(
            job_name,
            job_meta['sysargs'],
            executor_keys=str(self),
            _properties=job_meta['daemon']['properties']
        )
    return jobs
Return a dictionary of remote jobs.
def get_job(self, name: str, debug: bool = False) -> Job:
    """
    Return a single Job object.

    Raises
    ------
    ValueError
        If no metadata could be retrieved for the job.
    """
    job_meta = self.get_job_metadata(name, debug=debug)
    if job_meta:
        return Job(
            name,
            job_meta['sysargs'],
            executor_keys=str(self),
            _properties=job_meta['daemon']['properties'],
        )

    raise ValueError(f"Job '{name}' does not exist.")
Return a single Job object.
def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return the metadata for a single job.

    Metadata is cached per job name on the connector (`_job_metadata_cache`) and
    reused while it is younger than `JOB_METADATA_CACHE_SECONDS`.
    An empty dictionary is returned when the request fails (failures are not cached).
    """
    now = time.perf_counter()
    cache = self.__dict__.get('_job_metadata_cache', None)

    cached_timestamp = None
    if cache is not None:
        cached_timestamp = cache.get(name, {}).get('timestamp', None)

    cache_is_fresh = (
        cached_timestamp is not None
        and (now - cached_timestamp) < JOB_METADATA_CACHE_SECONDS
    )
    if cache_is_fresh:
        if debug:
            dprint(f"Returning cached metadata for job '{name}'.")
        return cache[name]['metadata']

    response = self.get(JOBS_ENDPOINT + f"/{name}", debug=debug)
    if not response:
        if debug:
            if 'detail' in response.text:
                msg = response.json()['detail']
            else:
                msg = response.text
            warn(f"Failed to get metadata for job '{name}':\n{msg}")
        return {}

    metadata = response.json()

    # Create the cache lazily on the first successful fetch.
    if cache is None:
        self._job_metadata_cache = {}

    self._job_metadata_cache[name] = {
        'timestamp': now,
        'metadata': metadata,
    }
    return metadata
Return the metadata for a single job.
def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return the daemon properties for a single job.

    Reads `daemon -> properties` from the job's metadata,
    falling back to an empty dictionary when either key is missing.
    """
    daemon_meta = self.get_job_metadata(name, debug=debug).get('daemon', {})
    return daemon_meta.get('properties', {})
Return the daemon properties for a single job.
def get_job_exists(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a job exists.

    If the request fails, a warning is emitted and `False` is returned;
    otherwise the decoded JSON body of the response is returned.
    """
    response = self.get(JOBS_ENDPOINT + f'/{name}/exists', debug=debug)
    if response:
        return response.json()

    warn(f"Failed to determine whether job '{name}' exists.")
    return False
Return whether a job exists.
def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Delete a job.

    Returns the `SuccessTuple` decoded from the response. On failure, the
    message is the response's `detail` when present, otherwise the raw text.
    """
    response = self.delete(JOBS_ENDPOINT + f"/{name}", debug=debug)
    if response:
        return tuple(response.json())

    if 'detail' in response.text:
        return False, response.json()['detail']

    return False, response.text
Delete a job.
def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Start a job.

    Returns the `SuccessTuple` decoded from the response. On failure, the
    message is the response's `detail` when present, otherwise the raw text.
    """
    response = self.post(JOBS_ENDPOINT + f"/{name}/start", debug=debug)
    if response:
        return tuple(response.json())

    if 'detail' in response.text:
        return False, response.json()['detail']

    return False, response.text
Start a job.
def create_job(
    self,
    name: str,
    sysargs: List[str],
    properties: Optional[Dict[str, str]] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Create a job.

    Posts the job's `sysargs` and `properties` as JSON to the job's endpoint.

    Returns the `SuccessTuple` decoded from the response. On failure, the
    message is the response's `detail` when present, otherwise the raw text.
    """
    job_payload = {
        'sysargs': sysargs,
        'properties': properties,
    }
    response = self.post(JOBS_ENDPOINT + f"/{name}", json=job_payload, debug=debug)
    if response:
        return tuple(response.json())

    if 'detail' in response.text:
        return False, response.json()['detail']

    return False, response.text
Create a job.
def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Stop a job.

    Returns the `SuccessTuple` decoded from the response. On failure, the
    message is the response's `detail` when present, otherwise the raw text.
    """
    response = self.post(JOBS_ENDPOINT + f"/{name}/stop", debug=debug)
    if response:
        return tuple(response.json())

    if 'detail' in response.text:
        return False, response.json()['detail']

    return False, response.text
Stop a job.
def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Pause a job.

    Returns the `SuccessTuple` decoded from the response. On failure, the
    message is the response's `detail` when present, otherwise the raw text.
    """
    response = self.post(JOBS_ENDPOINT + f"/{name}/pause", debug=debug)
    if response:
        return tuple(response.json())

    if 'detail' in response.text:
        return False, response.json()['detail']

    return False, response.text
Pause a job.
def get_logs(self, name: str, debug: bool = False) -> str:
    """
    Return the logs for a job.

    Parameters
    ----------
    name: str
        The name of the job.

    debug: bool, default False
        Forwarded to the underlying GET request.

    Returns
    -------
    The decoded JSON body of the logs response.

    Raises
    ------
    ValueError
        If the request is unsuccessful.
    """
    # Previously dropped: honor the caller's `debug` flag like the other job methods.
    response = self.get(LOGS_ENDPOINT + f"/{name}", debug=debug)
    if not response:
        raise ValueError(f"Cannot fetch logs for job '{name}':\n{response.text}")

    return response.json()
Return the logs for a job.
def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
    """
    Return the job's manual stop time.

    Parameters
    ----------
    name: str
        The name of the job.

    debug: bool, default False
        Forwarded to the underlying GET request.

    Returns
    -------
    The stop time parsed from the ISO-formatted response,
    or `None` if the request failed (after a warning) or the response body is null.
    """
    # Previously dropped: honor the caller's `debug` flag like the other job methods.
    response = self.get(JOBS_ENDPOINT + f"/{name}/stop_time", debug=debug)
    if not response:
        warn(f"Failed to get stop time for job '{name}':\n{response.text}")
        return None

    data = response.json()
    if data is None:
        return None

    return datetime.fromisoformat(data)
Return the job's manual stop time.
def monitor_logs(
    self,
    name: str,
    callback_function: Callable[[Any], Any],
    input_callback_function: Callable[[None], str],
    stop_callback_function: Callable[[None], str],
    stop_on_exit: bool = False,
    strip_timestamps: bool = False,
    accept_input: bool = True,
    debug: bool = False,
):
    """
    Monitor a job's log files and execute a callback with the changes.

    Synchronous wrapper which runs `monitor_logs_async` to completion
    with `asyncio.run` and returns its result.
    """
    monitor_coroutine = self.monitor_logs_async(
        name,
        callback_function,
        input_callback_function=input_callback_function,
        stop_callback_function=stop_callback_function,
        stop_on_exit=stop_on_exit,
        strip_timestamps=strip_timestamps,
        accept_input=accept_input,
        debug=debug
    )
    return asyncio.run(monitor_coroutine)
Monitor a job's log files and execute a callback with the changes.
async def monitor_logs_async(
    self,
    name: str,
    callback_function: Callable[[Any], Any],
    input_callback_function: Callable[[], str],
    stop_callback_function: Callable[[SuccessTuple], str],
    stop_on_exit: bool = False,
    strip_timestamps: bool = False,
    accept_input: bool = True,
    debug: bool = False,
):
    """
    Monitor a job's log files and await a callback with the changes.

    Opens a websocket to the job's logs endpoint and passes every received
    message to `callback_function`, except for the two control messages
    (`JOBS_STDIN_MESSAGE` and `JOBS_STOP_MESSAGE`), which are dispatched to
    internal handlers instead.

    Parameters
    ----------
    name: str
        The name of the job to monitor.

    callback_function: Callable[[Any], Any]
        Called (or awaited, if it is a coroutine function) with each log message.

    input_callback_function: Callable[[], str]
        Called (or awaited) when `JOBS_STDIN_MESSAGE` is received;
        its result is sent back over the websocket. May be `None` to send nothing.

    stop_callback_function: Callable[[SuccessTuple], str]
        Called (or awaited) with the result tuple read from the websocket after
        `JOBS_STOP_MESSAGE` is received. May be `None`.

    stop_on_exit: bool, default False
        If `True`, stop monitoring once the stop message has been handled.

    strip_timestamps: bool, default False
        If `True`, pass each message through `strip_timestamp_from_line`
        before handing it to `callback_function`.

    accept_input: bool, default True
        Not referenced in this function body.

    debug: bool, default False
        Not referenced in this function body.
    """
    import traceback
    from meerschaum.jobs import StopMonitoringLogs
    from meerschaum.utils.formatting._jobs import strip_timestamp_from_line

    websockets, websockets_exceptions = mrsm.attempt_import('websockets', 'websockets.exceptions')
    # Plain HTTP maps to `ws`; anything else is treated as secure (`wss`).
    protocol = 'ws' if self.URI.startswith('http://') else 'wss'
    # NOTE(review): when no port is set, the URI contains an empty port (`host:`).
    port = self.port if 'port' in self.__dict__ else ''
    uri = f"{protocol}://{self.host}:{port}{LOGS_ENDPOINT}/{name}/ws"

    async def _stdin_callback(client):
        # Fetch input from the caller-provided function and send it over the websocket.
        if input_callback_function is None:
            return

        if asyncio.iscoroutinefunction(input_callback_function):
            data = await input_callback_function()
        else:
            data = input_callback_function()

        await client.send(data)

    async def _stop_callback(client):
        # The message following the stop marker is read as a JSON-encoded result tuple.
        try:
            result = tuple(json.loads(await client.recv()))
        except Exception as e:
            warn(traceback.format_exc())
            result = False, str(e)

        if stop_callback_function is not None:
            if asyncio.iscoroutinefunction(stop_callback_function):
                await stop_callback_function(result)
            else:
                stop_callback_function(result)

        # Raising here is caught by the receive loop below, which closes the socket.
        if stop_on_exit:
            raise StopMonitoringLogs

    # Control messages which are handled internally rather than forwarded as log text.
    message_callbacks = {
        JOBS_STDIN_MESSAGE: _stdin_callback,
        JOBS_STOP_MESSAGE: _stop_callback,
    }

    async with websockets.connect(uri) as websocket:
        # The first message sent is the session token (or a placeholder when there is none).
        try:
            await websocket.send(self.token or 'no-login')
        except websockets_exceptions.ConnectionClosedOK:
            pass

        while True:
            try:
                response = await websocket.recv()
                callback = message_callbacks.get(response, None)
                if callback is not None:
                    await callback(websocket)
                    continue

                if strip_timestamps:
                    response = strip_timestamp_from_line(response)

                if asyncio.iscoroutinefunction(callback_function):
                    await callback_function(response)
                else:
                    callback_function(response)
            except (KeyboardInterrupt, StopMonitoringLogs):
                # Only these two exceptions end the loop here; any other propagates to the caller.
                await websocket.close()
                break
Monitor a job's log files and await a callback with the changes.
def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a remote job is blocking on stdin.

    `False` is returned when the request is unsuccessful;
    otherwise the decoded JSON body of the response is returned.
    """
    response = self.get(JOBS_ENDPOINT + f'/{name}/is_blocking_on_stdin', debug=debug)
    if response:
        return response.json()

    return False
Return whether a remote job is blocking on stdin.
def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `began` timestamp, if it exists.

    Reads `daemon -> began` from the job's properties; `None` if either key is missing.
    """
    daemon_properties = self.get_job_properties(name, debug=debug).get('daemon', {})
    return daemon_properties.get('began', None)
Return a job's began timestamp, if it exists.
def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `ended` timestamp, if it exists.

    Reads `daemon -> ended` from the job's properties; `None` if either key is missing.
    """
    daemon_properties = self.get_job_properties(name, debug=debug).get('daemon', {})
    return daemon_properties.get('ended', None)
Return a job's ended timestamp, if it exists.
def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `paused` timestamp, if it exists.

    Reads `daemon -> paused` from the job's properties; `None` if either key is missing.
    """
    daemon_properties = self.get_job_properties(name, debug=debug).get('daemon', {})
    return daemon_properties.get('paused', None)
Return a job's paused timestamp, if it exists.
def get_job_status(self, name: str, debug: bool = False) -> str:
    """
    Return the job's status.

    Falls back to `'stopped'` when the job's metadata has no `status` key.
    """
    job_meta = self.get_job_metadata(name, debug=debug)
    status = job_meta.get('status', 'stopped')
    return status
Return the job's status.