Coverage for fingerprint_server_sdk / sealed.py: 98%

58 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-11 18:41 +0000

1import json 

2import zlib 

3 

4from cryptography.hazmat.backends import default_backend 

5from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 

6 

7from fingerprint_server_sdk.models.event import Event 

8 

# Leading bytes that every sealed payload must start with; checked before decryption.
SEALED_HEADER = bytes.fromhex('9E85DCED')

# Maps the algorithm's symbolic name to the identifier stored on a decryption key.
DecryptionAlgorithm = dict(Aes256Gcm='aes-256-gcm')

13 

14 

class DecryptionKey:
    """Pair raw key material with the name of the algorithm it belongs to.

    Both attributes are stored exactly as given; no validation happens here.
    """

    # Raw key bytes.
    key: bytes
    # Algorithm identifier string (for example 'aes-256-gcm').
    algorithm: str

    def __init__(self, key: bytes, algorithm: str):
        self.key, self.algorithm = key, algorithm

24 

25 

class UnsealError(Exception):
    """Failure to unseal data with one specific key.

    Attributes:
        exception: The exception raised while decrypting with ``key``.
        key: The decryption key that was being tried when ``exception`` occurred.
    """

    exception: Exception
    key: DecryptionKey

    def __init__(self, exception: Exception, key: DecryptionKey):
        # Initialise the base class explicitly; ``args`` stays ``(exception, key)``,
        # so copying/pickling by re-calling the constructor keeps working.
        super().__init__(exception, key)
        self.exception = exception
        self.key = key

    def __str__(self) -> str:
        """Return a readable message naming the algorithm and the underlying error."""
        # The default rendering would be the repr of the args tuple, which includes
        # the key object's opaque repr. Only the algorithm name is shown here, never
        # the key bytes. ``repr`` is used for the cause because some exceptions have
        # an empty ``str``.
        algorithm = getattr(self.key, 'algorithm', None)
        return f'Unsealing with {algorithm!r} key failed: {self.exception!r}'

35 

36 

class UnsealAggregateError(Exception):
    """Signals that sealed data could not be decrypted; carries the individual failures.

    The exception message is always 'Unable to decrypt sealed data'; inspect
    ``errors`` for the details.
    """

    # The UnsealError instances supplied by whoever raised this error.
    errors: list[UnsealError]

    def __init__(self, errors: list[UnsealError]):
        super().__init__('Unable to decrypt sealed data')
        self.errors = errors

45 

46 

def unseal_event_response(sealed_data: bytes, decryption_keys: list[DecryptionKey]) -> Event:
    """Decrypt a sealed event response and parse it into an ``Event``.

    Args:
        sealed_data: The sealed payload.
        decryption_keys: Keys to try when decrypting the payload.

    Returns:
        The ``Event`` parsed from the decrypted payload.
    """
    return __parse_event_response(__unseal(sealed_data, decryption_keys))

51 

52 

def __parse_event_response(unsealed: str) -> Event:
    """Build an ``Event`` from unsealed JSON text.

    Args:
        unsealed: JSON document to parse.

    Returns:
        The object produced by ``Event.from_dict``.

    Raises:
        ValueError: If the text is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError`` subclass), if the document is not a JSON object containing
            an ``event_id`` key, or if ``Event.from_dict`` returns ``None``.
    """
    json_data = json.loads(unsealed)

    # Valid JSON is not necessarily an object: json.loads may return a list, str,
    # number, bool or None. Without the type check, ``in`` would raise TypeError on
    # numbers/None, or do substring/element matching on str/list.
    if not isinstance(json_data, dict) or 'event_id' not in json_data:
        raise ValueError('Sealed data is not valid event response')

    result = Event.from_dict(json_data)
    if result is None:
        raise ValueError('Failed to parse event response')
    return result

64 

65 

def __unseal(sealed_data: bytes, decryption_keys: list[DecryptionKey]) -> str:
    """Try each key in order and return the first successfully unsealed payload.

    Args:
        sealed_data: Payload that must begin with ``SEALED_HEADER``.
        decryption_keys: Keys to try, in order.

    Returns:
        The unsealed text from the first key that works.

    Raises:
        ValueError: If the header does not match, or as soon as a key with an
            algorithm other than AES-256-GCM is reached.
        UnsealAggregateError: If every key was tried and none succeeded (also when
            no keys were given).
    """
    header_size = len(SEALED_HEADER)
    if sealed_data[:header_size].hex() != SEALED_HEADER.hex():
        raise ValueError('Invalid sealed data header')

    failures = []
    for candidate in decryption_keys:
        # An unknown algorithm aborts the whole attempt instead of being skipped.
        if candidate.algorithm != DecryptionAlgorithm['Aes256Gcm']:
            raise ValueError(f'Unsupported decryption algorithm: {candidate.algorithm}')

        try:
            return __unseal_aes256gcm(sealed_data, candidate.key)
        except Exception as exc:
            # Deliberately broad: any failure with this key is recorded and the
            # next key is tried.
            failures.append(UnsealError(exc, candidate))

    raise UnsealAggregateError(failures)

83 

84 

def __unseal_aes256gcm(sealed_data: bytes, decryption_key: bytes) -> str:
    """Decrypt an AES-GCM sealed payload, inflate it, and decode it as UTF-8.

    The payload is sliced as: header | 12-byte nonce | ciphertext | 16-byte auth tag.

    Args:
        sealed_data: The full sealed payload, header included.
        decryption_key: Raw AES key bytes.

    Returns:
        The decompressed plaintext decoded as UTF-8.
    """
    nonce_start = len(SEALED_HEADER)
    nonce_end = nonce_start + 12
    tag_start = -16

    iv = sealed_data[nonce_start:nonce_end]
    tag = sealed_data[tag_start:]
    encrypted = sealed_data[nonce_end:tag_start]

    # Build the algorithm first, then the mode, then the backend — the same
    # construction order as a single inline Cipher(...) call.
    aes = algorithms.AES(decryption_key)
    gcm = modes.GCM(iv, tag)
    decryptor = Cipher(aes, gcm, backend=default_backend()).decryptor()

    deflated = decryptor.update(encrypted) + decryptor.finalize()

    # Negative wbits tells zlib to expect a raw deflate stream (no zlib header/trailer).
    return zlib.decompress(deflated, -zlib.MAX_WBITS).decode('utf-8')