Coverage for fingerprint_server_sdk / sealed.py: 98%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-20 11:58 +0000

1import json 

2import zlib 

3 

4from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 

5 

6from fingerprint_server_sdk.models.event import Event 

7 

# Four-byte prefix that sealed data must start with to be accepted.
SEALED_HEADER = bytes.fromhex('9e85dced')

# Maps algorithm names to the identifier strings that a DecryptionKey's
# ``algorithm`` attribute is compared against when unsealing.
DecryptionAlgorithm = dict(Aes256Gcm='aes-256-gcm')

12 

13 

class DecryptionKey:
    """Pair a raw decryption key with the algorithm it is meant for.

    Attributes:
        key: Raw key bytes handed to the cipher.
        algorithm: Algorithm identifier string (for example ``'aes-256-gcm'``).
    """

    key: bytes
    algorithm: str

    def __init__(self, key: bytes, algorithm: str):
        # Values are stored as given; no validation happens here.
        self.key, self.algorithm = key, algorithm

23 

24 

class UnsealError(Exception):
    """Failure to unseal data with one specific decryption key.

    Attributes:
        exception: The underlying exception raised while decrypting.
        key: The decryption key that was in use when the failure occurred.
    """

    exception: Exception
    key: DecryptionKey

    def __init__(self, exception: Exception, key: DecryptionKey):
        # Pass both values to the base class explicitly so ``args`` stays
        # ``(exception, key)``, which keeps copy/pickle reconstruction working.
        super().__init__(exception, key)
        self.exception = exception
        self.key = key

    def __str__(self) -> str:
        """Return a readable message naming the algorithm and the cause.

        Without this override ``str()`` would be the repr of the
        ``(exception, key)`` tuple. The key bytes are never included.
        """
        algorithm = getattr(self.key, 'algorithm', 'unknown')
        cause = type(self.exception).__name__
        detail = str(self.exception)
        suffix = f': {detail}' if detail else ''
        return f'Unsealing with {algorithm} key failed ({cause}{suffix})'

34 

35 

class UnsealAggregateError(Exception):
    """Raised when unsealing did not succeed with any of the supplied keys.

    Attributes:
        errors: The per-key failures collected while trying each key.
    """

    errors: list[UnsealError]

    def __init__(self, errors: list[UnsealError]):
        # The message is fixed; per-key details are available via ``errors``.
        super().__init__('Unable to decrypt sealed data')
        self.errors = errors

44 

45 

def unseal_event_response(sealed_data: bytes, decryption_keys: list[DecryptionKey]) -> Event:
    """Decrypt a sealed payload and parse it into an ``Event``.

    Args:
        sealed_data: The sealed payload bytes.
        decryption_keys: Candidate keys, tried in the order given.

    Returns:
        The event parsed from the decrypted payload.

    Raises:
        ValueError: If the header is invalid, a key uses an unsupported
            algorithm, or the decrypted payload is not a valid event response.
        UnsealAggregateError: If none of the keys could decrypt the payload.
    """
    return __parse_event_response(__unseal(sealed_data, decryption_keys))

50 

51 

def __parse_event_response(unsealed: str) -> Event:
    """Parse an event response from unsealed JSON text.

    Args:
        unsealed: Decrypted payload as a JSON document.

    Returns:
        The ``Event`` built from the decoded JSON object.

    Raises:
        ValueError: If the text is not valid JSON (``json.JSONDecodeError`` is
            a ``ValueError`` subclass), if the document is not a JSON object
            containing ``event_id``, or if ``Event.from_dict`` returns ``None``.
    """
    json_data = json.loads(unsealed)

    # Valid JSON may also be a list, string, or number. Reject anything that
    # is not an object up front, so a bare number does not raise ``TypeError``
    # from the membership test and a string that merely contains
    # "event_id" is not passed on to ``Event.from_dict``.
    if not isinstance(json_data, dict) or 'event_id' not in json_data:
        raise ValueError('Sealed data is not valid event response')

    result = Event.from_dict(json_data)
    if result is None:
        raise ValueError('Failed to parse event response')
    return result

63 

64 

def __unseal(sealed_data: bytes, decryption_keys: list[DecryptionKey]) -> str:
    """Decrypt sealed data with the first key that works.

    Args:
        sealed_data: Payload that must begin with ``SEALED_HEADER``.
        decryption_keys: Candidate keys, tried in the order given.

    Returns:
        The decrypted payload as text.

    Raises:
        ValueError: If the header does not match, or as soon as a key with an
            unsupported algorithm is encountered.
        UnsealAggregateError: If every key failed (or no keys were given);
            carries one ``UnsealError`` per attempted key.
    """
    actual_header = sealed_data[: len(SEALED_HEADER)]
    if actual_header.hex() != SEALED_HEADER.hex():
        raise ValueError('Invalid sealed data header')

    failures = []
    for candidate in decryption_keys:
        # An unsupported algorithm aborts the whole attempt immediately.
        if candidate.algorithm != DecryptionAlgorithm['Aes256Gcm']:
            raise ValueError(f'Unsupported decryption algorithm: {candidate.algorithm}')

        try:
            return __unseal_aes256gcm(sealed_data, candidate.key)
        except Exception as exc:
            # Record the failure and move on to the next candidate key.
            failures.append(UnsealError(exc, candidate))

    raise UnsealAggregateError(failures)

82 

83 

def __unseal_aes256gcm(sealed_data: bytes, decryption_key: bytes) -> str:
    """Unseal data with AES-256-GCM.

    The payload is read as ``header | nonce (12 bytes) | ciphertext |
    auth tag (16 bytes)``. The decrypted bytes are inflated as a raw deflate
    stream and decoded as UTF-8.

    Args:
        sealed_data: Full sealed payload, including the header.
        decryption_key: Raw AES key bytes.

    Returns:
        The decrypted, decompressed payload as text.

    Raises:
        ValueError: If the payload is too short to hold the header, nonce and
            authentication tag.
        Exception: Errors from the cipher (for example a failed tag check),
            from ``zlib`` or from UTF-8 decoding propagate unchanged.
    """
    header_length = len(SEALED_HEADER)
    nonce_length = 12
    auth_tag_length = 16

    # Without this guard, the trailing tag slice of a truncated payload would
    # overlap the nonce/header and fail later with a misleading tag or nonce
    # error.
    if len(sealed_data) < header_length + nonce_length + auth_tag_length:
        raise ValueError('Sealed data is too short')

    nonce_end = header_length + nonce_length
    nonce = sealed_data[header_length:nonce_end]
    auth_tag = sealed_data[-auth_tag_length:]
    ciphertext = sealed_data[nonce_end:-auth_tag_length]

    decipher = Cipher(algorithms.AES(decryption_key), modes.GCM(nonce, auth_tag)).decryptor()

    compressed = decipher.update(ciphertext) + decipher.finalize()

    # Negative wbits selects a raw deflate stream (no zlib header/trailer).
    payload = zlib.decompress(compressed, -zlib.MAX_WBITS)

    return payload.decode('utf-8')