Coverage for fingerprint_pro_server_api_sdk/sealed.py: 100%
52 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-03-27 22:39 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-03-27 22:39 +0000
1import json
2from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
3from cryptography.hazmat.backends import default_backend
4import zlib
6from fingerprint_pro_server_api_sdk.models.event_response import EventResponse
# Magic prefix checked by unseal(): payloads that do not start with these
# four bytes are rejected before any decryption is attempted.
SEALED_HEADER = bytes([0x9e, 0x85, 0xdc, 0xed])

# Supported decryption algorithms: SDK-facing name -> algorithm identifier
# that DecryptionKey.algorithm is compared against.
DecryptionAlgorithm = {
    'Aes256Gcm': 'aes-256-gcm',
}
class DecryptionKey:
    """A decryption key together with the algorithm it should be used with."""

    def __init__(self, key: bytes, algorithm: str):
        """Store the key material and its algorithm identifier.

        Args:
            key: Raw key material handed to the cipher when unsealing.
            algorithm: Algorithm identifier; expected to be one of the
                values of ``DecryptionAlgorithm`` (e.g. ``'aes-256-gcm'``).
        """
        self.key = key
        self.algorithm = algorithm
class UnsealError(Exception):
    """Failure to unseal a payload with one specific decryption key.

    Attributes are documented inline below.
    """

    # The underlying exception raised while decrypting/decompressing.
    exception: Exception
    # The key that was being tried when the failure happened.
    key: 'DecryptionKey'

    def __init__(self, exception: Exception, key: 'DecryptionKey'):
        """Record the original exception and the key that produced it.

        Args:
            exception: The error raised during the unseal attempt.
            key: The decryption key that was in use.
        """
        # Keep ``args`` as (exception, key) so the error stays
        # reconstructible from its args (e.g. when pickled).
        super().__init__(exception, key)
        self.exception = exception
        self.key = key

    def __str__(self):
        """Return a readable message instead of the raw ``args`` tuple repr."""
        # Deliberately omit the key object: its default repr is only a
        # memory address and adds nothing useful to the message.
        return f"Unable to unseal with the provided key: {self.exception!r}"
class UnsealAggregateError(Exception):
    """Raised when none of the supplied keys could unseal the payload."""

    def __init__(self, errors):
        """Collect the per-key failures.

        Args:
            errors: The errors gathered while trying each key, in the
                order the keys were tried.
        """
        self.errors = errors
        super().__init__("Unable to decrypt sealed data")
def parse_events_response(unsealed):
    """Parse an unsealed JSON document into an ``EventResponse``.

    Args:
        unsealed: JSON text (``str``/``bytes``) produced by unsealing.

    Returns:
        An ``EventResponse`` built from the document's ``'products'`` value.

    Raises:
        ValueError: If the text is not valid JSON (``json.JSONDecodeError``
            is a ``ValueError`` subclass) or the decoded value is not a
            JSON object containing a ``'products'`` key.
    """
    json_data = json.loads(unsealed)

    # Only a JSON object can carry a 'products' key. Without the type check,
    # ``null``/numbers raise TypeError and a JSON string containing
    # "products" passes the ``in`` test by substring match.
    if not isinstance(json_data, dict) or 'products' not in json_data:
        raise ValueError('Sealed data is not valid events response')

    return EventResponse(json_data['products'])
def unseal_events_response(sealed_data, decryption_keys):
    """Unseal a payload and parse it as an events response.

    Args:
        sealed_data: The sealed payload, passed unchanged to ``unseal``.
        decryption_keys: Iterable of keys to try, passed unchanged to
            ``unseal``.

    Returns:
        Whatever ``parse_events_response`` returns for the unsealed text.

    Raises:
        Any exception raised by ``unseal`` or ``parse_events_response``.
    """
    unsealed = unseal(sealed_data, decryption_keys)
    return parse_events_response(unsealed)
def unseal(sealed_data, decryption_keys):
    """Decrypt a sealed payload by trying each key in order.

    Args:
        sealed_data: Bytes-like sealed payload; must begin with
            ``SEALED_HEADER``.
        decryption_keys: Iterable of objects exposing ``.key`` and
            ``.algorithm``.

    Returns:
        The result of ``unseal_aes256gcm`` for the first key that succeeds.

    Raises:
        ValueError: If the header does not match, or a key with an
            unsupported algorithm is encountered before any key succeeds.
        UnsealAggregateError: If every key was tried and none succeeded
            (also when ``decryption_keys`` is empty).
    """
    header_length = len(SEALED_HEADER)
    # Compare the bytes directly; no need to round-trip through hex strings.
    if bytes(sealed_data[:header_length]) != SEALED_HEADER:
        raise ValueError('Invalid sealed data header')

    errors = []
    for decryption_key in decryption_keys:
        if decryption_key.algorithm != DecryptionAlgorithm['Aes256Gcm']:
            raise ValueError(f"Unsupported decryption algorithm: {decryption_key.algorithm}")

        try:
            return unseal_aes256gcm(sealed_data, decryption_key.key)
        except Exception as e:
            # Deliberately broad: any failure with this key is recorded and
            # the next key is tried; all failures surface together below.
            errors.append(UnsealError(e, decryption_key))

    raise UnsealAggregateError(errors)
def unseal_aes256gcm(sealed_data, decryption_key):
    """Decrypt and decompress one sealed payload with AES-GCM.

    The payload layout read by this function is:
    ``SEALED_HEADER | 12-byte nonce | ciphertext | 16-byte auth tag``.

    Args:
        sealed_data: Bytes-like sealed payload including the header.
        decryption_key: Raw AES key bytes.

    Returns:
        The decrypted, raw-deflate-decompressed payload decoded as UTF-8.

    Raises:
        ValueError: If the payload is too short to hold header, nonce and
            authentication tag.
        Exception: Errors from the cipher, ``zlib`` or UTF-8 decoding
            propagate unchanged.
    """
    nonce_length = 12
    auth_tag_length = 16
    header_length = len(SEALED_HEADER)

    # Without this check the nonce and tag slices silently overlap or come
    # out short, producing a misleading downstream failure.
    if len(sealed_data) < header_length + nonce_length + auth_tag_length:
        raise ValueError('Sealed data is too short')

    ciphertext_start = header_length + nonce_length
    nonce = sealed_data[header_length:ciphertext_start]
    auth_tag = sealed_data[-auth_tag_length:]
    ciphertext = sealed_data[ciphertext_start:-auth_tag_length]

    decipher = Cipher(
        algorithms.AES(decryption_key),
        modes.GCM(nonce, auth_tag),
        backend=default_backend()
    ).decryptor()

    # finalize() verifies the authentication tag and raises on mismatch.
    compressed = decipher.update(ciphertext) + decipher.finalize()

    # Negative wbits selects a raw deflate stream (no zlib header/trailer).
    payload = zlib.decompress(compressed, -zlib.MAX_WBITS)

    return payload.decode('utf-8')