Coverage for fingerprint_pro_server_api_sdk/sealed.py: 100%
58 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-09 17:50 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-09 17:50 +0000
1import json
2from typing import List
4from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
5from cryptography.hazmat.backends import default_backend
6import zlib
8from fingerprint_pro_server_api_sdk.api_client import ApiClientDeserializer
9from fingerprint_pro_server_api_sdk.models.events_get_response import EventsGetResponse
11SEALED_HEADER = bytes([0x9e, 0x85, 0xdc, 0xed])
12DecryptionAlgorithm = {
13 'Aes256Gcm': 'aes-256-gcm',
14}
17class DecryptionKey:
18 """Key for decryption of sealed data."""
19 exception: Exception
20 algorithm: str
22 def __init__(self, key: bytes, algorithm: str):
23 self.key = key
24 self.algorithm = algorithm
27class UnsealError(Exception):
28 """Error during unsealing."""
29 exception: Exception
30 key: DecryptionKey
32 def __init__(self, exception: Exception, key: DecryptionKey):
33 self.exception = exception
34 self.key = key
37class UnsealAggregateError(Exception):
38 """Aggregated error during unsealing."""
39 errors: List[UnsealError]
41 def __init__(self, errors: List[UnsealError]):
42 self.errors = errors
43 super().__init__("Unable to decrypt sealed data")
46def unseal_event_response(sealed_data: bytes, decryption_keys: List[DecryptionKey]) -> EventsGetResponse:
47 """Unseal event response with one of the provided keys."""
48 unsealed = __unseal(sealed_data, decryption_keys)
49 return __parse_event_response(unsealed)
52def __parse_event_response(unsealed: str) -> EventsGetResponse:
53 """Parse event response from unsealed data."""
54 json_data = json.loads(unsealed)
56 if 'products' not in json_data:
57 raise ValueError('Sealed data is not valid event response')
59 result: EventsGetResponse = ApiClientDeserializer.deserialize(json_data, 'EventsGetResponse')
60 return result
63def __unseal(sealed_data: bytes, decryption_keys: List[DecryptionKey]) -> str:
64 """Unseal data with one of the provided keys."""
65 if sealed_data[:len(SEALED_HEADER)].hex() != SEALED_HEADER.hex():
66 raise ValueError('Invalid sealed data header')
68 errors = []
69 for decryption_key in decryption_keys:
70 if decryption_key.algorithm == DecryptionAlgorithm['Aes256Gcm']:
71 try:
72 return __unseal_aes256gcm(sealed_data, decryption_key.key)
73 except Exception as e:
74 errors.append(UnsealError(e, decryption_key))
75 continue
76 else:
77 raise ValueError(f"Unsupported decryption algorithm: {decryption_key.algorithm}")
79 raise UnsealAggregateError(errors)
82def __unseal_aes256gcm(sealed_data: bytes, decryption_key: bytes) -> str:
83 """Unseal data with AES-256-GCM."""
84 nonce_length = 12
85 nonce = sealed_data[len(SEALED_HEADER):len(SEALED_HEADER) + nonce_length]
87 auth_tag_length = 16
88 auth_tag = sealed_data[-auth_tag_length:]
90 ciphertext = sealed_data[len(SEALED_HEADER) + nonce_length:-auth_tag_length]
92 decipher = Cipher(
93 algorithms.AES(decryption_key),
94 modes.GCM(nonce, auth_tag),
95 backend=default_backend()
96 ).decryptor()
98 compressed = decipher.update(ciphertext) + decipher.finalize()
100 payload = zlib.decompress(compressed, -zlib.MAX_WBITS)
102 return payload.decode('utf-8')