Coverage for fingerprint_pro_server_api_sdk/api_client.py: 65%

275 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-09 17:50 +0000

1# coding: utf-8 

2""" 

3 Fingerprint Pro Server API 

4 

5 Fingerprint Pro Server API allows you to get information about visitors and about individual events in a server environment. It can be used for data exports, decision-making, and data analysis scenarios. Server API is intended for server-side usage, it's not intended to be used from the client side, whether it's a browser or a mobile device. # noqa: E501 

6 

7 OpenAPI spec version: 3 

8 Contact: support@fingerprint.com 

9 Generated by: https://github.com/swagger-api/swagger-codegen.git 

10""" 

11import json 

12import mimetypes 

13import os 

14import re 

15import tempfile 

16 

17from urllib.parse import quote 

18from multiprocessing import Pool 

19from typing import Optional, Any, Union, Dict, List, Tuple 

20from datetime import date, datetime 

21 

22from fingerprint_pro_server_api_sdk.configuration import Configuration 

23import fingerprint_pro_server_api_sdk.models 

24from fingerprint_pro_server_api_sdk import rest 

25from fingerprint_pro_server_api_sdk.rest import ApiException, RESTResponse 

26from fingerprint_pro_server_api_sdk.base_model import BaseModel 

27 

28PRIMITIVE_TYPES = (float, bool, bytes, str, int) 

29NATIVE_TYPES_MAPPING = { 

30 'int': int, 

31 'long': int, 

32 'float': float, 

33 'str': str, 

34 'bool': bool, 

35 'date': date, 

36 'datetime': datetime, 

37 'object': object, 

38} 

39 

40 

41class ApiClient: 

42 """Generic API client for Swagger client library builds. 

43 

44 Swagger generic API client. This client handles the client- 

45 server communication, and is invariant across implementations. Specifics of 

46 the methods and models for each application are generated from the Swagger 

47 templates. 

48 

49 NOTE: This class is auto generated by the swagger code generator program. 

50 Ref: https://github.com/swagger-api/swagger-codegen 

51 Do not edit the class manually. 

52 

53 :param configuration: .Configuration object for this client 

54 :param header_name: a header to pass when making calls to the API. 

55 :param header_value: a header value to pass when making calls to 

56 the API. 

57 :param cookie: a cookie to include in the header when making calls 

58 to the API 

59 """ 

60 

61 def __init__(self, configuration: Optional[Configuration] = None, header_name: Optional[str] = None, 

62 header_value: Optional[str] = None, cookie: Optional[str] = None, pool: Optional[Pool] = None): 

63 if configuration is None: 

64 configuration = Configuration() 

65 self.configuration = configuration 

66 

67 if pool is None: 

68 try: 

69 from multiprocessing.pool import ThreadPool 

70 self.pool = ThreadPool() 

71 except (ImportError, OSError): 

72 from fingerprint_pro_server_api_sdk.dummy_pool import DummyPool 

73 self.pool = DummyPool() 

74 else: 

75 self.pool = pool 

76 self.rest_client = rest.RESTClientObject(configuration) 

77 self.default_headers = {} 

78 if header_name is not None: 

79 self.default_headers[header_name] = header_value 

80 self.cookie = cookie 

81 # Set default User-Agent. 

82 self.user_agent = 'Swagger-Codegen/8.1.0/python' 

83 

84 def __del__(self): 

85 self.pool.close() 

86 self.pool.join() 

87 

88 @property 

89 def user_agent(self) -> Optional[str]: 

90 """User agent for this API client""" 

91 return self.default_headers['User-Agent'] 

92 

93 @user_agent.setter 

94 def user_agent(self, value: str): 

95 self.default_headers['User-Agent'] = value 

96 

97 def set_default_header(self, header_name: str, header_value: str): 

98 self.default_headers[header_name] = header_value 

99 

100 def __call_api( 

101 self, resource_path: str, method: str, path_params: Optional[Dict[str, Any]] = None, 

102 query_params: Optional[List[Tuple[str, Any]]] = None, header_params: Optional[Dict[str, Any]] = None, 

103 body: Any = None, post_params: Optional[List[Tuple[str, Any]]] = None, 

104 files: Optional[Dict[str, Any]] = None, response_type: Optional[str] = None, 

105 auth_settings: Optional[List[str]] = None, _return_http_data_only: Optional[bool] = None, 

106 collection_formats: Optional[Dict[str, Any]] = None, _preload_content: bool = True, 

107 _request_timeout: Optional[int] = None): 

108 

109 config = self.configuration 

110 

111 # header parameters 

112 header_params = header_params or {} 

113 header_params.update(self.default_headers) 

114 if self.cookie: 

115 header_params['Cookie'] = self.cookie 

116 if header_params: 

117 header_params = self.sanitize_for_serialization(header_params) 

118 header_params = dict(self.parameters_to_tuples(header_params, 

119 collection_formats)) 

120 

121 # path parameters 

122 if path_params: 

123 path_params = self.sanitize_for_serialization(path_params) 

124 path_params = self.parameters_to_tuples(path_params, 

125 collection_formats) 

126 for k, v in path_params: 

127 # specified safe chars, encode everything 

128 resource_path = resource_path.replace( 

129 '{%s}' % k, 

130 quote(str(v), safe=config.safe_chars_for_path_param) 

131 ) 

132 

133 # query parameters 

134 if query_params: 

135 query_params = self.sanitize_for_serialization(query_params) 

136 query_params = self.parameters_to_tuples(query_params, 

137 collection_formats) 

138 

139 # post parameters 

140 if post_params or files: 

141 post_params = self.prepare_post_parameters(post_params, files) 

142 post_params = self.sanitize_for_serialization(post_params) 

143 post_params = self.parameters_to_tuples(post_params, 

144 collection_formats) 

145 

146 # auth setting 

147 self.update_params_for_auth(header_params, query_params, auth_settings) 

148 

149 # body 

150 if body: 

151 body = self.sanitize_for_serialization(body) 

152 

153 # request url 

154 url = self.configuration.host + resource_path 

155 

156 # perform request and return response 

157 response_data = self.request( 

158 method, url, query_params=query_params, headers=header_params, 

159 post_params=post_params, body=body, 

160 _preload_content=_preload_content, 

161 _request_timeout=_request_timeout) 

162 

163 self.last_response = response_data 

164 

165 return_data = response_data 

166 try: 

167 if _preload_content: 

168 # deserialize response data 

169 if response_type: 

170 return_data = self.deserialize(response_data, response_type) 

171 else: 

172 return_data = None 

173 except ValueError as e: 

174 error = ApiException(http_resp=response_data) 

175 error.reason = e 

176 raise error 

177 

178 if _return_http_data_only: 

179 return (return_data) 

180 else: 

181 return (return_data, response_data.status, 

182 response_data.getheaders()) 

183 

184 def sanitize_for_serialization(self, obj: Union[Dict[str, Any], List[Tuple[str, Any]], BaseModel]): 

185 """Builds a JSON POST object. 

186 

187 If obj is None, return None. 

188 If obj is str, int, long, float, bool, return directly. 

189 If obj is datetime.datetime, datetime.date 

190 convert to string in iso8601 format. 

191 If obj is list, sanitize each element in the list. 

192 If obj is dict, return the dict. 

193 If obj is swagger model, return the properties dict. 

194 

195 :param obj: The data to serialize. 

196 :return: The serialized form of data. 

197 """ 

198 if obj is None: 

199 return None 

200 elif isinstance(obj, PRIMITIVE_TYPES): 

201 return obj 

202 elif isinstance(obj, list): 

203 return [self.sanitize_for_serialization(sub_obj) 

204 for sub_obj in obj] 

205 elif isinstance(obj, tuple): 

206 return tuple(self.sanitize_for_serialization(sub_obj) 

207 for sub_obj in obj) 

208 elif isinstance(obj, (datetime, date)): 

209 return obj.isoformat() 

210 

211 if isinstance(obj, dict): 

212 obj_dict = obj 

213 else: 

214 # Convert model obj to dict except 

215 # attributes `swagger_types`, `attribute_map` 

216 # and attributes which value is not None. 

217 # Convert attribute name to json key in 

218 # model definition for request. 

219 obj_dict = {obj.attribute_map[attr]: getattr(obj, attr) 

220 for attr, _ in obj.swagger_types.items() 

221 if getattr(obj, attr) is not None} 

222 

223 return {key: self.sanitize_for_serialization(val) 

224 for key, val in obj_dict.items()} 

225 

226 def deserialize(self, response: Union[RESTResponse, ApiException], response_type: Any, is_error=False): 

227 """Deserializes response into an object. 

228 

229 :param response: RESTResponse object to be deserialized. 

230 :param response_type: class literal for 

231 deserialized object, or string of class name. 

232 :param is_error: boolean means deserialization should 

233 be made for `response.body` field 

234 

235 :return: deserialized object. 

236 """ 

237 # handle file downloading 

238 # save response body into a tmp file and return the instance 

239 if response_type == "file": 

240 return self.__deserialize_file(response) 

241 

242 if is_error: 

243 try: 

244 data = json.loads(response.body) 

245 except ValueError: 

246 data = response.body 

247 # fetch data from response object 

248 else: 

249 try: 

250 data = json.loads(response.data) 

251 except ValueError: 

252 data = response.data 

253 

254 return ApiClientDeserializer.deserialize(data, response_type) 

255 

256 def call_api(self, resource_path: str, method: str, path_params: Optional[Dict[str, Any]] = None, 

257 query_params: Optional[List[Tuple[str, Any]]] = None, header_params: Optional[Dict[str, Any]] = None, 

258 body: Any = None, post_params: Optional[List[Tuple[str, Any]]] = None, 

259 files: Optional[Dict[str, Any]] = None, response_type: Optional[str] = None, 

260 auth_settings: Optional[List[str]] = None, async_req: Optional[bool] = None, 

261 _return_http_data_only: Optional[bool] = None, collection_formats: Optional[Dict[str, Any]] = None, 

262 _preload_content: bool = True, _request_timeout: Optional[int] = None): 

263 """Makes the HTTP request (synchronous) and returns deserialized data. 

264 

265 To make an async request, set the async_req parameter. 

266 

267 :param resource_path: Path to method endpoint. 

268 :param method: Method to call. 

269 :param path_params: Path parameters in the url. 

270 :param query_params: Query parameters in the url. 

271 :param header_params: Header parameters to be 

272 placed in the request header. 

273 :param body: Request body. 

274 :param post_params: Request post form parameters, 

275 for `application/x-www-form-urlencoded`, `multipart/form-data`. 

276 :param auth_settings: Auth Settings names for the request. 

277 :param response_type: Response data type. 

278 :param files: key -> filename, value -> filepath, 

279 for `multipart/form-data`. 

280 :param async_req: execute request asynchronously 

281 :param _return_http_data_only: response data without head status code 

282 and headers 

283 :param collection_formats: dict of collection formats for path, query, 

284 header, and post parameters. 

285 :param _preload_content: if False, the urllib3.HTTPResponse object will 

286 be returned without reading/decoding response 

287 data. Default is True. 

288 :param _request_timeout: timeout setting for this request. If one 

289 number provided, it will be total request 

290 timeout. It can also be a pair (tuple) of 

291 (connection, read) timeouts. 

292 :return: 

293 If async_req parameter is True, 

294 the request will be called asynchronously. 

295 The method will return the request thread. 

296 If parameter async_req is False or missing, 

297 then the method will return the response directly. 

298 """ 

299 if not async_req: 

300 return self.__call_api(resource_path, method, 

301 path_params, query_params, header_params, 

302 body, post_params, files, 

303 response_type, auth_settings, 

304 _return_http_data_only, collection_formats, 

305 _preload_content, _request_timeout) 

306 else: 

307 thread = self.pool.apply_async(self.__call_api, (resource_path, 

308 method, path_params, query_params, 

309 header_params, body, 

310 post_params, files, 

311 response_type, auth_settings, 

312 _return_http_data_only, 

313 collection_formats, 

314 _preload_content, _request_timeout)) 

315 return thread 

316 

317 def request(self, method, url, query_params=None, headers=None, 

318 post_params=None, body=None, _preload_content=True, 

319 _request_timeout=None): 

320 """Makes the HTTP request using RESTClient.""" 

321 if method in ["GET", "HEAD", "OPTIONS", "POST", "PUT", "PATCH", "DELETE"]: 

322 return self.rest_client.request(method, url, 

323 query_params=query_params, 

324 headers=headers, 

325 post_params=post_params, 

326 _preload_content=_preload_content, 

327 _request_timeout=_request_timeout, 

328 body=body) 

329 else: 

330 raise ValueError( 

331 "http method must be `GET`, `HEAD`, `OPTIONS`," 

332 " `POST`, `PATCH`, `PUT` or `DELETE`." 

333 ) 

334 

335 def parameters_to_tuples(self, params, collection_formats): 

336 """Get parameters as list of tuples, formatting collections. 

337 

338 :param params: Parameters as dict or list of two-tuples 

339 :param dict collection_formats: Parameter collection formats 

340 :return: Parameters as list of tuples, collections formatted 

341 """ 

342 new_params = [] 

343 if collection_formats is None: 

344 collection_formats = {} 

345 for k, v in params.items() if isinstance(params, dict) else params: # noqa: E501 

346 if k in collection_formats: 

347 collection_format = collection_formats[k] 

348 if collection_format == 'multi': 

349 new_params.extend((k, value) for value in v) 

350 else: 

351 if collection_format == 'ssv': 

352 delimiter = ' ' 

353 elif collection_format == 'tsv': 

354 delimiter = '\t' 

355 elif collection_format == 'pipes': 

356 delimiter = '|' 

357 else: # csv is the default 

358 delimiter = ',' 

359 new_params.append( 

360 (k, delimiter.join(str(value) for value in v))) 

361 else: 

362 new_params.append((k, v)) 

363 return new_params 

364 

365 def prepare_post_parameters(self, post_params: Optional[List[Tuple[str, Any]]] = None, 

366 files: Optional[Dict[str, Any]] = None): 

367 """Builds form parameters. 

368 

369 :param post_params: Normal form parameters. 

370 :param files: File parameters. 

371 :return: Form parameters with files. 

372 """ 

373 params = [] 

374 

375 if post_params: 

376 params = post_params 

377 

378 if files: 

379 for k, v in files.items(): 

380 if not v: 

381 continue 

382 file_names = v if type(v) is list else [v] 

383 for n in file_names: 

384 with open(n, 'rb') as f: 

385 filename = os.path.basename(f.name) 

386 filedata = f.read() 

387 mimetype = (mimetypes.guess_type(filename)[0] or 

388 'application/octet-stream') 

389 params.append( 

390 tuple([k, tuple([filename, filedata, mimetype])])) 

391 

392 return params 

393 

394 def select_header_accept(self, accepts: List[str]) -> Optional[str]: 

395 """Returns `Accept` based on an array of accepts provided. 

396 

397 :param accepts: List of headers. 

398 :return: Accept (e.g. application/json). 

399 """ 

400 if not accepts: 

401 return 

402 

403 accepts = [x.lower() for x in accepts] 

404 

405 if 'application/json' in accepts: 

406 return 'application/json' 

407 else: 

408 return ', '.join(accepts) 

409 

410 def select_header_content_type(self, content_types: List[str]) -> str: 

411 """Returns `Content-Type` based on an array of content_types provided. 

412 

413 :param content_types: List of content-types. 

414 :return: Content-Type (e.g. application/json). 

415 """ 

416 if not content_types: 

417 return 'application/json' 

418 

419 content_types = [x.lower() for x in content_types] 

420 

421 if 'application/json' in content_types or '*/*' in content_types: 

422 return 'application/json' 

423 else: 

424 return content_types[0] 

425 

426 def update_params_for_auth(self, headers: Optional[Dict[str, Any]], queries: Optional[List[Tuple[str, Any]]], 

427 auth_settings: Optional[List[str]]): 

428 """Updates header and query params based on authentication setting. 

429 

430 :param headers: Header parameters dict to be updated. 

431 :param queries: Query parameters tuple list to be updated. 

432 :param auth_settings: Authentication setting identifiers list. 

433 """ 

434 if not auth_settings: 

435 return 

436 

437 for auth in auth_settings: 

438 auth_setting = self.configuration.auth_settings().get(auth) 

439 if auth_setting: 

440 if not auth_setting['value']: 

441 continue 

442 elif auth_setting['in'] == 'header': 

443 headers[auth_setting['key']] = auth_setting['value'] 

444 elif auth_setting['in'] == 'query': 

445 queries.append((auth_setting['key'], auth_setting['value'])) 

446 else: 

447 raise ValueError( 

448 'Authentication token must be in `query` or `header`' 

449 ) 

450 

451 def __deserialize_file(self, response): 

452 """Deserializes body to file 

453 

454 Saves response body into a file in a temporary folder, 

455 using the filename from the `Content-Disposition` header if provided. 

456 

457 :param response: RESTResponse. 

458 :return: file path. 

459 """ 

460 fd, path = tempfile.mkstemp(dir=self.configuration.temp_folder_path) 

461 os.close(fd) 

462 os.remove(path) 

463 

464 content_disposition = response.getheader("Content-Disposition") 

465 if content_disposition: 

466 filename = re.search(r'filename=[\'"]?([^\'"\s]+)[\'"]?', 

467 content_disposition).group(1) 

468 path = os.path.join(os.path.dirname(path), filename) 

469 response_data = response.data 

470 with open(path, "w") as f: 

471 if isinstance(response_data, str): 

472 # change str to bytes so we can write it 

473 response_data = response_data.encode('utf-8') 

474 f.write(response_data) 

475 else: 

476 f.write(response_data) 

477 return path 

478 

479 

480class ApiClientDeserializer: 

481 """Deserializes server response into appropriate type.""" 

482 @staticmethod 

483 def deserialize(data: Union[Dict, List, str], klass: Any): 

484 """Deserializes dict, list, str into an object. 

485 

486 :param data: dict, list or str. 

487 :param klass: class literal, or string of class name. 

488 

489 :return: object. 

490 """ 

491 if data is None: 

492 return None 

493 

494 if type(klass) == str: 

495 if klass.startswith('list['): 

496 sub_kls = re.match(r'list\[(.*)\]', klass).group(1) 

497 return [ApiClientDeserializer.deserialize(sub_data, sub_kls) 

498 for sub_data in data] 

499 

500 if klass.startswith('dict('): 

501 sub_kls = re.match(r'dict\(([^,]*), (.*)\)', klass).group(2) 

502 return {k: ApiClientDeserializer.deserialize(v, sub_kls) 

503 for k, v in data.items()} 

504 

505 # convert str to class 

506 if klass in NATIVE_TYPES_MAPPING: 

507 klass = NATIVE_TYPES_MAPPING[klass] 

508 else: 

509 klass = getattr(fingerprint_pro_server_api_sdk.models, klass) 

510 

511 if klass in PRIMITIVE_TYPES: 

512 return ApiClientDeserializer.__deserialize_primitive(data, klass) 

513 elif klass == object: 

514 return data 

515 elif klass == date: 

516 return ApiClientDeserializer.__deserialize_date(data) 

517 elif klass == datetime: 

518 return ApiClientDeserializer.__deserialize_datatime(data) 

519 else: 

520 return ApiClientDeserializer.__deserialize_model(data, klass) 

521 

522 @staticmethod 

523 def __deserialize_primitive(data, klass): 

524 """Deserializes string to primitive type. 

525 

526 :param data: str. 

527 :param klass: class literal. 

528 

529 :return: int, long, float, str, bool. 

530 """ 

531 try: 

532 return klass(data) 

533 except UnicodeEncodeError: 

534 return str(data) 

535 except TypeError: 

536 return data 

537 

538 @staticmethod 

539 def __deserialize_date(string: str) -> date: 

540 """Deserializes string to date. 

541 

542 :param string: str. 

543 """ 

544 try: 

545 from dateutil.parser import parse 

546 return parse(string).date() 

547 except ImportError: 

548 return string 

549 except ValueError: 

550 raise rest.ApiException( 

551 status=0, 

552 reason="Failed to parse `{0}` as date object".format(string) 

553 ) 

554 

555 @staticmethod 

556 def __deserialize_datatime(string: str) -> datetime: 

557 """Deserializes string to datetime. 

558 

559 The string should be in iso8601 datetime format. 

560 

561 :param string: str. 

562 """ 

563 try: 

564 from dateutil.parser import parse 

565 return parse(string) 

566 except ImportError: 

567 return string 

568 except ValueError: 

569 raise rest.ApiException( 

570 status=0, 

571 reason=( 

572 "Failed to parse `{0}` as datetime object" 

573 .format(string) 

574 ) 

575 ) 

576 

577 @staticmethod 

578 def __hasattr(object, name): 

579 return name in object.__class__.__dict__ 

580 

581 @staticmethod 

582 def __deserialize_model(data, klass): 

583 """Deserializes list or dict to model. 

584 

585 :param data: dict, list. 

586 :param klass: class literal. 

587 :return: model object. 

588 """ 

589 

590 if not klass.swagger_types and not ApiClientDeserializer.__hasattr(klass, 'get_real_child_model'): 

591 if hasattr(klass, '__parent_class__') and klass.__parent_class__ == 'dict': 

592 return klass(**data) 

593 return data 

594 

595 kwargs = {} 

596 if klass.swagger_types is not None: 

597 for attr, attr_type in klass.swagger_types.items(): 

598 if (data is not None and 

599 klass.attribute_map[attr] in data and 

600 isinstance(data, (list, dict))): 

601 value = data[klass.attribute_map[attr]] 

602 kwargs[attr] = ApiClientDeserializer.deserialize(value, attr_type) 

603 

604 instance = klass(**kwargs) 

605 

606 if (isinstance(instance, dict) and 

607 klass.swagger_types is not None and 

608 isinstance(data, dict)): 

609 for key, value in data.items(): 

610 if key not in klass.swagger_types: 

611 instance[key] = value 

612 if ApiClientDeserializer.__hasattr(instance, 'get_real_child_model'): 

613 klass_name = instance.get_real_child_model(data) 

614 if klass_name: 

615 instance = ApiClientDeserializer.deserialize(data, klass_name) 

616 return instance