Coverage for fingerprint_server_sdk / api_client.py: 57%
327 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 18:41 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 18:41 +0000
1"""
2Server API
3Fingerprint Server API allows you to get, search, and update Events in a server environment. It can be used for data exports, decision-making, and data analysis scenarios.
4Server API is intended for server-side usage, it's not intended to be used from the client side, whether it's a browser or a mobile device.
6The version of the OpenAPI document: 4
7Contact: support@fingerprint.com
8Generated by OpenAPI Generator (https://openapi-generator.tech)
10Do not edit the class manually.
11""" # noqa: E501
13from __future__ import annotations
15import datetime
16import decimal
17import json
18import mimetypes
19import os
20import re
21import tempfile
22import uuid
23from enum import Enum
24from types import TracebackType
25from typing import Any, Optional, Union
26from urllib.parse import quote
28from dateutil.parser import parse
29from pydantic import SecretStr
31import fingerprint_server_sdk.models
32from fingerprint_server_sdk import __version__, rest
33from fingerprint_server_sdk.api_response import ApiResponse
34from fingerprint_server_sdk.api_response import T as ApiResponseT
35from fingerprint_server_sdk.configuration import Configuration
36from fingerprint_server_sdk.exceptions import (
37 ApiException,
38 ApiValueError,
39)
41RequestSerialized = tuple[str, str, dict[str, Any], Optional[Any], Any]
42FilesType = dict[
43 str, Union[str, bytes, list[str], list[bytes], tuple[str, bytes], list[tuple[str, bytes]]]
44]
47class ApiClient:
48 """Generic API client for OpenAPI client library builds.
50 OpenAPI generic API client. This client handles the client-
51 server communication, and is invariant across implementations. Specifics of
52 the methods and models for each application are generated from the OpenAPI
53 templates.
55 :param configuration: .Configuration object for this client
56 :param header_name: a header to pass when making calls to the API.
57 :param header_value: a header value to pass when making calls to
58 the API.
59 :param cookie: a cookie to include in the header when making calls
60 to the API
61 """
63 PRIMITIVE_TYPES = (float, bool, bytes, str, int)
64 NATIVE_TYPES_MAPPING = {
65 'int': int,
66 'float': float,
67 'str': str,
68 'bool': bool,
69 'date': datetime.date,
70 'datetime': datetime.datetime,
71 'decimal': decimal.Decimal,
72 'object': object,
73 }
75 def __init__(
76 self,
77 configuration: Configuration,
78 header_name: Optional[str] = None,
79 header_value: Optional[str] = None,
80 cookie: Optional[str] = None,
81 ) -> None:
82 self.configuration = configuration
84 self.rest_client = rest.RESTClientObject(configuration)
85 self.default_headers: dict[str, str] = {}
86 if header_name is not None and header_value is not None:
87 self.default_headers[header_name] = header_value
88 self.cookie = cookie
89 # Set default User-Agent.
90 self.user_agent = f'fingerprint-server-python-sdk/{__version__}'
91 self.client_side_validation = configuration.client_side_validation
93 def __enter__(self) -> ApiClient:
94 return self
96 def __exit__(
97 self,
98 exc_type: Optional[type[BaseException]],
99 exc_value: Optional[BaseException],
100 traceback: Optional[TracebackType],
101 ) -> None:
102 pass
104 @property
105 def user_agent(self) -> str:
106 """User agent for this API client"""
107 return self.default_headers['User-Agent']
109 @user_agent.setter
110 def user_agent(self, value: str) -> None:
111 self.default_headers['User-Agent'] = value
113 def set_default_header(self, header_name: str, header_value: str) -> None:
114 self.default_headers[header_name] = header_value
116 def param_serialize(
117 self,
118 method: str,
119 resource_path: str,
120 path_params: Optional[dict[str, Any]] = None,
121 query_params: Optional[list[tuple[str, Any]]] = None,
122 header_params: Optional[dict[str, Any]] = None,
123 body: Optional[Any] = None,
124 post_params: Optional[list[tuple[str, Any]]] = None,
125 files: Optional[FilesType] = None,
126 auth_settings: Optional[list[str]] = None,
127 collection_formats: Optional[dict[str, str]] = None,
128 _request_auth: Optional[dict[str, Any]] = None,
129 ) -> RequestSerialized:
130 """Builds the HTTP request params needed by the request.
131 :param method: Method to call.
132 :param resource_path: Path to method endpoint.
133 :param path_params: Path parameters in the url.
134 :param query_params: Query parameters in the url.
135 :param header_params: Header parameters to be
136 placed in the request header.
137 :param body: Request body.
138 :param post_params dict: Request post form parameters,
139 for `application/x-www-form-urlencoded`, `multipart/form-data`.
140 :param auth_settings list: Auth Settings names for the request.
141 :param files dict: key -> filename, value -> filepath,
142 for `multipart/form-data`.
143 :param collection_formats: dict of collection formats for path, query,
144 header, and post parameters.
145 :param _request_auth: set to override the auth_settings for an a single
146 request; this effectively ignores the authentication
147 in the spec for a single request.
148 :return: tuple of form (path, http_method, query_params, header_params,
149 body, post_params, files)
150 """
152 config = self.configuration
154 # header parameters
155 header_params = header_params or {}
156 header_params.update(self.default_headers)
157 if self.cookie:
158 header_params['Cookie'] = self.cookie
159 if header_params:
160 header_params = self.sanitize_for_serialization(header_params)
161 header_params = dict(self.parameters_to_tuples(header_params, collection_formats))
163 # path parameters
164 if path_params:
165 path_params_sanitized = self.sanitize_for_serialization(path_params)
166 path_params_tuples = self.parameters_to_tuples(
167 path_params_sanitized, collection_formats
168 )
169 for k, v in path_params_tuples:
170 # specified safe chars, encode everything
171 resource_path = resource_path.replace(
172 '{' + k + '}', quote(str(v), safe=config.safe_chars_for_path_param)
173 )
175 # post parameters
176 post_params_result: Any = None
177 if post_params or files:
178 post_params_list = post_params if post_params else []
179 post_params_sanitized = self.sanitize_for_serialization(post_params_list)
180 post_params_result = self.parameters_to_tuples(
181 post_params_sanitized, collection_formats
182 )
183 if files:
184 post_params_result.extend(self.files_parameters(files))
186 # auth setting
187 self.update_params_for_auth(
188 header_params,
189 query_params,
190 auth_settings,
191 resource_path,
192 method,
193 body,
194 request_auth=_request_auth,
195 )
197 # body
198 if body:
199 body = self.sanitize_for_serialization(body)
201 # request url
202 url = self.configuration.host + resource_path
204 query_params = list(query_params or [])
205 if getattr(self.configuration, 'default_query_params', None):
206 existing_keys = {k for k, _ in query_params}
207 for k, v in self.configuration.default_query_params:
208 if k not in existing_keys:
209 query_params.append((k, v))
211 # query parameters
212 if query_params:
213 query_params = self.sanitize_for_serialization(query_params)
214 url_query = self.parameters_to_url_query(query_params, collection_formats)
215 url += '?' + url_query
217 return method, url, header_params, body, post_params_result
219 def call_api(
220 self,
221 method: str,
222 url: str,
223 header_params: Optional[dict[str, Any]] = None,
224 body: Any = None,
225 post_params: Optional[list[Any]] = None,
226 _request_timeout: Optional[Union[int, float, tuple[float, float]]] = None,
227 ) -> rest.RESTResponse:
228 """Makes the HTTP request (synchronous)
229 :param method: Method to call.
230 :param url: Path to method endpoint.
231 :param header_params: Header parameters to be
232 placed in the request header.
233 :param body: Request body.
234 :param post_params: Request post form parameters,
235 for `application/x-www-form-urlencoded`, `multipart/form-data`.
236 :param _request_timeout: timeout setting for this request.
237 :return: RESTResponse
238 """
240 try:
241 # perform request and return response
242 response_data = self.rest_client.request(
243 method,
244 url,
245 headers=header_params,
246 body=body,
247 post_params=post_params,
248 _request_timeout=_request_timeout,
249 )
251 except ApiException as e:
252 raise e
254 return response_data
256 def response_deserialize(
257 self,
258 response_data: rest.RESTResponse,
259 response_types_map: Optional[dict[str, Any]] = None,
260 ) -> ApiResponse[ApiResponseT]:
261 """Deserializes response into an object.
262 :param response_data: RESTResponse object to be deserialized.
263 :param response_types_map: dict of response types.
264 :return: ApiResponse
265 """
267 msg = 'RESTResponse.read() must be called before passing it to response_deserialize()'
268 assert response_data.data is not None, msg
270 if response_types_map is None:
271 response_types_map = {}
273 response_type = response_types_map.get(str(response_data.status))
274 if (
275 not response_type
276 and isinstance(response_data.status, int)
277 and 100 <= response_data.status <= 599
278 ):
279 # if not found, look for '1XX', '2XX', etc.
280 response_type = response_types_map.get(str(response_data.status)[0] + 'XX')
282 # deserialize response data
283 response_text: Optional[str] = None
284 return_data: Any = None
285 try:
286 if response_type == 'bytearray':
287 return_data = response_data.data
288 elif response_type == 'file':
289 return_data = self.__deserialize_file(response_data)
290 elif response_type is not None:
291 match = None
292 content_type = response_data.getheader('content-type')
293 if content_type is not None:
294 match = re.search(r'charset=([a-zA-Z\-\d]+)[\s;]?', content_type)
295 encoding = match.group(1) if match else 'utf-8'
296 response_text = response_data.data.decode(encoding)
297 return_data = self.deserialize(response_text, response_type, content_type)
298 finally:
299 if not 200 <= response_data.status <= 299:
300 raise ApiException.from_response(
301 http_resp=response_data,
302 body=response_text,
303 data=return_data,
304 )
306 return ApiResponse(
307 status_code=response_data.status,
308 data=return_data,
309 headers=response_data.getheaders(),
310 raw_data=response_data.data,
311 )
313 def sanitize_for_serialization(self, obj: Any) -> Any:
314 """Builds a JSON POST object.
316 If obj is None, return None.
317 If obj is SecretStr, return obj.get_secret_value()
318 If obj is str, int, long, float, bool, return directly.
319 If obj is datetime.datetime, datetime.date
320 convert to string in iso8601 format.
321 If obj is decimal.Decimal return string representation.
322 If obj is list, sanitize each element in the list.
323 If obj is dict, return the dict.
324 If obj is OpenAPI model, return the properties dict.
326 :param obj: The data to serialize.
327 :return: The serialized form of data.
328 """
329 if obj is None:
330 return None
331 elif isinstance(obj, Enum):
332 return obj.value
333 elif isinstance(obj, SecretStr):
334 return obj.get_secret_value()
335 elif isinstance(obj, self.PRIMITIVE_TYPES):
336 return obj
337 elif isinstance(obj, uuid.UUID):
338 return str(obj)
339 elif isinstance(obj, list):
340 return [self.sanitize_for_serialization(sub_obj) for sub_obj in obj]
341 elif isinstance(obj, tuple):
342 return tuple(self.sanitize_for_serialization(sub_obj) for sub_obj in obj)
343 elif isinstance(obj, (datetime.datetime, datetime.date)):
344 return obj.isoformat()
345 elif isinstance(obj, decimal.Decimal):
346 return str(obj)
348 elif isinstance(obj, dict):
349 obj_dict = obj
350 else:
351 # Convert model obj to dict except
352 # attributes `openapi_types`, `attribute_map`
353 # and attributes which value is not None.
354 # Convert attribute name to json key in
355 # model definition for request.
356 if hasattr(obj, 'to_dict') and callable(obj.to_dict):
357 obj_dict = obj.to_dict()
358 else:
359 obj_dict = obj.__dict__
361 if isinstance(obj_dict, list):
362 # here we handle instances that can either be a list or something else,
363 # and only became a real list by calling to_dict()
364 return self.sanitize_for_serialization(obj_dict)
366 return {key: self.sanitize_for_serialization(val) for key, val in obj_dict.items()}
368 def deserialize(
369 self, response_text: str, response_type: str, content_type: Optional[str]
370 ) -> Any:
371 """Deserializes response into an object.
373 :param response: RESTResponse object to be deserialized.
374 :param response_type: class literal for
375 deserialized object, or string of class name.
376 :param content_type: content type of response.
378 :return: deserialized object.
379 """
381 # fetch data from response object
382 if content_type is None:
383 try:
384 data = json.loads(response_text)
385 except ValueError:
386 data = response_text
387 elif re.match(
388 r'^application/(json|[\w!#$&.+\-^_]+\+json)\s*(;|$)', content_type, re.IGNORECASE
389 ):
390 if response_text == '':
391 data = ''
392 else:
393 data = json.loads(response_text)
394 elif re.match(r'^text\/[a-z.+-]+\s*(;|$)', content_type, re.IGNORECASE):
395 data = response_text
396 else:
397 raise ApiException(status=0, reason=f'Unsupported content type: {content_type}')
399 return self.__deserialize(data, response_type)
401 def __deserialize(self, data: Any, klass: Any) -> Any:
402 """Deserializes dict, list, str into an object.
404 :param data: dict, list or str.
405 :param klass: class literal, or string of class name.
407 :return: object.
408 """
409 if data is None:
410 return None
412 if isinstance(klass, str):
413 if klass.startswith('List['):
414 m = re.match(r'List\[(.*)]', klass)
415 assert m is not None, 'Malformed List type definition'
416 sub_kls = m.group(1)
417 return [self.__deserialize(sub_data, sub_kls) for sub_data in data]
419 if klass.startswith('Dict['):
420 m = re.match(r'Dict\[([^,]*), (.*)]', klass)
421 assert m is not None, 'Malformed Dict type definition'
422 sub_kls = m.group(2)
423 return {k: self.__deserialize(v, sub_kls) for k, v in data.items()}
425 # convert str to class
426 if klass in self.NATIVE_TYPES_MAPPING:
427 klass = self.NATIVE_TYPES_MAPPING[klass]
428 else:
429 klass = getattr(fingerprint_server_sdk.models, klass)
431 if klass in self.PRIMITIVE_TYPES:
432 return self.__deserialize_primitive(data, klass)
433 elif klass is object:
434 return self.__deserialize_object(data)
435 elif klass is datetime.date:
436 return self.__deserialize_date(data)
437 elif klass is datetime.datetime:
438 return self.__deserialize_datetime(data)
439 elif klass is decimal.Decimal:
440 return decimal.Decimal(data)
441 elif issubclass(klass, Enum):
442 return self.__deserialize_enum(data, klass)
443 else:
444 return self.__deserialize_model(data, klass)
446 def parameters_to_tuples(
447 self,
448 params: Union[dict[str, Any], list[tuple[str, Any]]],
449 collection_formats: Optional[dict[str, str]],
450 ) -> list[tuple[str, str]]:
451 """Get parameters as list of tuples, formatting collections.
453 :param params: Parameters as dict or list of two-tuples
454 :param dict collection_formats: Parameter collection formats
455 :return: Parameters as list of tuples, collections formatted
456 """
457 new_params: list[tuple[str, str]] = []
458 if collection_formats is None:
459 collection_formats = {}
460 for k, v in params.items() if isinstance(params, dict) else params:
461 if k in collection_formats:
462 collection_format = collection_formats[k]
463 if collection_format == 'multi':
464 new_params.extend((k, value) for value in v)
465 else:
466 if collection_format == 'ssv':
467 delimiter = ' '
468 elif collection_format == 'tsv':
469 delimiter = '\t'
470 elif collection_format == 'pipes':
471 delimiter = '|'
472 else: # csv is the default
473 delimiter = ','
474 new_params.append((k, delimiter.join(str(value) for value in v)))
475 else:
476 new_params.append((k, v))
477 return new_params
479 def parameters_to_url_query(
480 self,
481 params: Union[dict[str, Any], list[tuple[str, Any]]],
482 collection_formats: Optional[dict[str, str]],
483 ) -> str:
484 """Get parameters as list of tuples, formatting collections.
486 :param params: Parameters as dict or list of two-tuples
487 :param dict collection_formats: Parameter collection formats
488 :return: URL query string (e.g. a=Hello%20World&b=123)
489 """
490 new_params: list[tuple[str, str]] = []
491 if collection_formats is None:
492 collection_formats = {}
493 for k, v in params.items() if isinstance(params, dict) else params:
494 if isinstance(v, bool):
495 v = str(v).lower()
496 if isinstance(v, (int, float)):
497 v = str(v)
498 if isinstance(v, dict):
499 v = json.dumps(v)
501 if k in collection_formats:
502 collection_format = collection_formats[k]
503 if collection_format == 'multi':
504 new_params.extend((k, quote(str(value))) for value in v)
505 else:
506 if collection_format == 'ssv':
507 delimiter = ' '
508 elif collection_format == 'tsv':
509 delimiter = '\t'
510 elif collection_format == 'pipes':
511 delimiter = '|'
512 else: # csv is the default
513 delimiter = ','
514 new_params.append((k, delimiter.join(quote(str(value)) for value in v)))
515 else:
516 new_params.append((k, quote(str(v))))
518 return '&'.join(['='.join(map(str, item)) for item in new_params])
520 def files_parameters(
521 self,
522 files: FilesType,
523 ) -> list[tuple[Any, Any]]:
524 """Builds form parameters.
526 :param files: File parameters.
527 :return: Form parameters with files.
528 """
529 params = []
530 for k, v in files.items():
531 if isinstance(v, str):
532 with open(v, 'rb') as f:
533 filename = os.path.basename(f.name)
534 filedata = f.read()
535 elif isinstance(v, bytes):
536 filename = k
537 filedata = v
538 elif isinstance(v, tuple):
539 filename, filedata = v
540 elif isinstance(v, list):
541 for file_param in v:
542 params.extend(self.files_parameters({k: file_param}))
543 continue
544 else:
545 raise ValueError('Unsupported file value')
546 mimetype = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
547 params.append((k, (filename, filedata, mimetype)))
548 return params
550 def select_header_accept(self, accepts: list[str]) -> Optional[str]:
551 """Returns `Accept` based on an array of accepts provided.
553 :param accepts: List of headers.
554 :return: Accept (e.g. application/json).
555 """
556 if not accepts:
557 return None
559 for accept in accepts:
560 if re.search('json', accept, re.IGNORECASE):
561 return accept
563 return accepts[0]
565 def select_header_content_type(self, content_types: list[str]) -> Optional[str]:
566 """Returns `Content-Type` based on an array of content_types provided.
568 :param content_types: List of content-types.
569 :return: Content-Type (e.g. application/json).
570 """
571 if not content_types:
572 return None
574 for content_type in content_types:
575 if re.search('json', content_type, re.IGNORECASE):
576 return content_type
578 return content_types[0]
580 def update_params_for_auth(
581 self,
582 headers: dict[str, str],
583 queries: Optional[list[tuple[str, Any]]],
584 auth_settings: Optional[list[str]],
585 resource_path: str,
586 method: str,
587 body: Any,
588 request_auth: Optional[dict[str, Any]] = None,
589 ) -> None:
590 """Updates header and query params based on authentication setting.
592 :param headers: Header parameters dict to be updated.
593 :param queries: Query parameters tuple list to be updated.
594 :param auth_settings: Authentication setting identifiers list.
595 :resource_path: A string representation of the HTTP request resource path.
596 :method: A string representation of the HTTP request method.
597 :body: A object representing the body of the HTTP request.
598 The object type is the return value of sanitize_for_serialization().
599 :param request_auth: if set, the provided settings will
600 override the token in the configuration.
601 """
602 if not auth_settings:
603 return
605 if request_auth:
606 self._apply_auth_params(headers, queries, resource_path, method, body, request_auth)
607 else:
608 for auth in auth_settings:
609 auth_setting = self.configuration.auth_settings().get(auth)
610 if auth_setting:
611 self._apply_auth_params(
612 headers,
613 queries,
614 resource_path,
615 method,
616 body,
617 auth_setting, # type: ignore[arg-type]
618 )
620 def _apply_auth_params(
621 self,
622 headers: dict[str, str],
623 queries: Optional[list[tuple[str, Any]]],
624 resource_path: str,
625 method: str,
626 body: Any,
627 auth_setting: dict[str, Any],
628 ) -> None:
629 """Updates the request parameters based on a single auth_setting
631 :param headers: Header parameters dict to be updated.
632 :param queries: Query parameters tuple list to be updated.
633 :resource_path: A string representation of the HTTP request resource path.
634 :method: A string representation of the HTTP request method.
635 :body: A object representing the body of the HTTP request.
636 The object type is the return value of sanitize_for_serialization().
637 :param auth_setting: auth settings for the endpoint
638 """
639 if auth_setting['in'] == 'cookie':
640 headers['Cookie'] = auth_setting['value']
641 elif auth_setting['in'] == 'header':
642 if auth_setting['type'] != 'http-signature':
643 headers[auth_setting['key']] = auth_setting['value']
644 elif auth_setting['in'] == 'query':
645 if queries is not None:
646 queries.append((auth_setting['key'], auth_setting['value']))
647 else:
648 raise ApiValueError('Authentication token must be in `query` or `header`')
650 def __deserialize_file(self, response: rest.RESTResponse) -> Any:
651 """Deserializes body to file
653 Saves response body into a file in a temporary folder,
654 using the filename from the `Content-Disposition` header if provided.
656 handle file downloading
657 save response body into a tmp file and return the instance
659 :param response: RESTResponse.
660 :return: file path.
661 """
662 fd, path = tempfile.mkstemp(dir=getattr(self.configuration, 'temp_folder_path', None))
663 os.close(fd)
664 os.remove(path)
666 content_disposition = response.getheader('Content-Disposition')
667 if content_disposition:
668 m = re.search(r'filename=[\'"]?([^\'"\s]+)[\'"]?', content_disposition)
669 assert m is not None, "Unexpected 'content-disposition' header value"
670 filename = m.group(1)
671 path = os.path.join(os.path.dirname(path), filename)
673 with open(path, 'wb') as f:
674 if response.data is not None:
675 f.write(response.data)
677 return path
679 def __deserialize_primitive(self, data: Any, klass: type) -> Any:
680 """Deserializes string to primitive type.
682 :param data: str.
683 :param klass: class literal.
685 :return: int, long, float, str, bool.
686 """
687 try:
688 return klass(data)
689 except UnicodeEncodeError:
690 return str(data)
691 except TypeError:
692 return data
694 def __deserialize_object(self, value: Any) -> Any:
695 """Return an original value.
697 :return: object.
698 """
699 return value
701 def __deserialize_date(self, string: str) -> datetime.date:
702 """Deserializes string to date.
704 :param string: str.
705 :return: date.
706 """
707 try:
708 return parse(string).date()
709 except ImportError:
710 return string # type: ignore[return-value]
711 except ValueError as err:
712 raise ApiException(
713 status=0, reason=f'Failed to parse `{string}` as date object'
714 ) from err
716 def __deserialize_datetime(self, string: str) -> datetime.datetime:
717 """Deserializes string to datetime.
719 The string should be in iso8601 datetime format.
721 :param string: str.
722 :return: datetime.
723 """
724 try:
725 return parse(string)
726 except ImportError:
727 return string # type: ignore[return-value]
728 except ValueError as err:
729 raise ApiException(
730 status=0, reason=(f'Failed to parse `{string}` as datetime object')
731 ) from err
733 def __deserialize_enum(self, data: Any, klass: type[Enum]) -> Enum:
734 """Deserializes primitive type to enum.
736 :param data: primitive type.
737 :param klass: class literal.
738 :return: enum value.
739 """
740 try:
741 return klass(data)
742 except ValueError as err:
743 raise ApiException(
744 status=0, reason=(f'Failed to parse `{data}` as `{klass}`')
745 ) from err
747 def __deserialize_model(self, data: Any, klass: Any) -> Any:
748 """Deserializes list or dict to model.
750 :param data: dict, list.
751 :param klass: class literal.
752 :return: model object.
753 """
755 return klass.from_dict(data)