Coverage for fingerprint_server_sdk / sealed.py: 98%
58 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 18:41 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 18:41 +0000
1import json
2import zlib
4from cryptography.hazmat.backends import default_backend
5from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
7from fingerprint_server_sdk.models.event import Event
# Magic prefix expected at the start of every sealed payload.
SEALED_HEADER = b'\x9e\x85\xdc\xed'
# Maps the symbolic algorithm name to the identifier stored on a DecryptionKey.
DecryptionAlgorithm = {'Aes256Gcm': 'aes-256-gcm'}
class DecryptionKey:
    """Pairing of raw key material with the algorithm it is meant for."""

    # Raw key bytes handed to the cipher.
    key: bytes
    # Algorithm identifier; compared against the DecryptionAlgorithm values.
    algorithm: str

    def __init__(self, key: bytes, algorithm: str):
        self.key, self.algorithm = key, algorithm
class UnsealError(Exception):
    """Failure to unseal data with one specific key.

    Wraps the exception raised during a decryption attempt together with
    the key that was being tried.
    """

    # The exception raised while unsealing with ``key``.
    exception: Exception
    # The key whose unsealing attempt failed.
    key: DecryptionKey

    def __init__(self, exception: Exception, key: DecryptionKey):
        # Initialise the base class explicitly; ``args`` stays
        # ``(exception, key)`` exactly as it was before.
        super().__init__(exception, key)
        self.exception = exception
        self.key = key

    def __str__(self) -> str:
        """Describe the underlying failure without rendering the key object."""
        cause_name = type(self.exception).__name__
        detail = str(self.exception)
        return f'{cause_name}: {detail}' if detail else cause_name
class UnsealAggregateError(Exception):
    """Raised with a fixed message when sealed data could not be decrypted.

    Carries the list of individual ``UnsealError`` failures it was built from.
    """

    # Individual failures supplied by the caller.
    errors: list[UnsealError]

    def __init__(self, errors: list[UnsealError]):
        super().__init__('Unable to decrypt sealed data')
        self.errors = errors
def unseal_event_response(sealed_data: bytes, decryption_keys: list[DecryptionKey]) -> Event:
    """Decrypt a sealed payload and parse it into an ``Event``.

    The payload is unsealed with ``__unseal`` using the given keys, and the
    resulting text is handed to ``__parse_event_response``. Errors raised by
    either helper propagate unchanged.
    """
    return __parse_event_response(__unseal(sealed_data, decryption_keys))
def __parse_event_response(unsealed: str) -> Event:
    """Parse unsealed JSON text into an ``Event``.

    Args:
        unsealed: JSON document obtained from unsealing.

    Returns:
        The ``Event`` built by ``Event.from_dict``.

    Raises:
        ValueError: If the document is not a JSON object, has no
            ``event_id`` key, or ``Event.from_dict`` returns ``None``.
            Malformed JSON raises ``json.JSONDecodeError``, which is a
            ``ValueError`` subclass.
    """
    json_data = json.loads(unsealed)

    # json.loads can return any JSON value. Without the type check a number
    # would make the membership test raise TypeError, and a string or list
    # that merely contains 'event_id' would slip through to Event.from_dict.
    if not isinstance(json_data, dict) or 'event_id' not in json_data:
        raise ValueError('Sealed data is not valid event response')

    result = Event.from_dict(json_data)
    if result is None:
        raise ValueError('Failed to parse event response')
    return result
def __unseal(sealed_data: bytes, decryption_keys: list[DecryptionKey]) -> str:
    """Try each key in order and return the first successful plaintext.

    Raises:
        ValueError: If the payload does not start with ``SEALED_HEADER``, or
            a key with an algorithm other than AES-256-GCM is reached.
        UnsealAggregateError: If no key produced a result; it carries one
            ``UnsealError`` per failed attempt.
    """
    header_size = len(SEALED_HEADER)
    if sealed_data[:header_size].hex() != SEALED_HEADER.hex():
        raise ValueError('Invalid sealed data header')

    failures = []
    for candidate in decryption_keys:
        # Reject unknown algorithms immediately; this is not recorded as a
        # per-key failure.
        if candidate.algorithm != DecryptionAlgorithm['Aes256Gcm']:
            raise ValueError(f'Unsupported decryption algorithm: {candidate.algorithm}')
        try:
            return __unseal_aes256gcm(sealed_data, candidate.key)
        except Exception as exc:
            # Remember why this key failed and move on to the next one.
            failures.append(UnsealError(exc, candidate))

    raise UnsealAggregateError(failures)
def __unseal_aes256gcm(sealed_data: bytes, decryption_key: bytes) -> str:
    """Decrypt, inflate and decode a payload sealed with AES-256-GCM.

    The payload is sliced as: header, 12-byte nonce, ciphertext, and a
    trailing 16-byte authentication tag. The decrypted bytes are inflated as
    a raw deflate stream and decoded as UTF-8.
    """
    nonce_size = 12
    tag_size = 16

    nonce_start = len(SEALED_HEADER)
    nonce_end = nonce_start + nonce_size

    nonce = sealed_data[nonce_start:nonce_end]
    ciphertext = sealed_data[nonce_end:-tag_size]
    # The tag is taken relative to the end of the payload.
    auth_tag = sealed_data[-tag_size:]

    cipher = Cipher(
        algorithms.AES(decryption_key),
        modes.GCM(nonce, auth_tag),
        backend=default_backend(),
    )
    decryptor = cipher.decryptor()
    compressed = decryptor.update(ciphertext) + decryptor.finalize()

    # Negative wbits: raw deflate stream with no zlib header or trailer.
    inflated = zlib.decompress(compressed, wbits=-zlib.MAX_WBITS)
    return inflated.decode('utf-8')