Coverage for fingerprint_pro_server_api_sdk/sealed.py: 100%

58 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-09 17:50 +0000

1import json 

2from typing import List 

3 

4from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 

5from cryptography.hazmat.backends import default_backend 

6import zlib 

7 

8from fingerprint_pro_server_api_sdk.api_client import ApiClientDeserializer 

9from fingerprint_pro_server_api_sdk.models.events_get_response import EventsGetResponse 

10 

# Four-byte prefix that sealed payloads are checked against before any
# decryption is attempted.
SEALED_HEADER = bytes.fromhex('9e85dced')

# Known decryption algorithms: symbolic name -> identifier string that
# ``DecryptionKey.algorithm`` is compared with during unsealing.
DecryptionAlgorithm = {'Aes256Gcm': 'aes-256-gcm'}

15 

16 

class DecryptionKey:
    """Key for decryption of sealed data.

    Attributes:
        key: Raw key bytes passed to the cipher.
        algorithm: Algorithm identifier; unsealing compares it against the
            values of ``DecryptionAlgorithm``.
    """
    # The attributes declared here are the ones ``__init__`` actually sets.
    key: bytes
    algorithm: str

    def __init__(self, key: bytes, algorithm: str):
        """Store the key material and the algorithm it belongs to."""
        self.key = key
        self.algorithm = algorithm

25 

26 

class UnsealError(Exception):
    """Failure of a single unseal attempt.

    Pairs the exception that was raised with the decryption key that was
    being tried when it happened.
    """
    exception: Exception
    key: DecryptionKey

    def __init__(self, exception: Exception, key: DecryptionKey):
        """Record the underlying exception and the key that triggered it."""
        self.exception, self.key = exception, key

35 

36 

class UnsealAggregateError(Exception):
    """Collection of the per-key ``UnsealError`` failures from one unseal call.

    The exception message is always "Unable to decrypt sealed data"; the
    individual failures are available through ``errors``.
    """
    errors: List[UnsealError]

    def __init__(self, errors: List[UnsealError]):
        """Set the fixed message and keep the list of per-key errors."""
        super().__init__("Unable to decrypt sealed data")
        self.errors = errors

44 

45 

def unseal_event_response(sealed_data: bytes, decryption_keys: List[DecryptionKey]) -> EventsGetResponse:
    """Unseal a sealed payload and parse it as an event response.

    Args:
        sealed_data: Sealed bytes, starting with ``SEALED_HEADER``.
        decryption_keys: Keys to try, in order, until one succeeds.

    Returns:
        The event response deserialized from the unsealed JSON text.

    Raises:
        ValueError: If the header is wrong, a key names an unsupported
            algorithm, or the unsealed JSON has no ``products`` entry.
        UnsealAggregateError: If no key was able to unseal the data.
    """
    # Decrypt first, then hand the resulting JSON text to the parser.
    return __parse_event_response(__unseal(sealed_data, decryption_keys))

50 

51 

def __parse_event_response(unsealed: str) -> EventsGetResponse:
    """Parse event response from unsealed data.

    Args:
        unsealed: JSON text obtained from unsealing.

    Returns:
        The result of deserializing the decoded JSON as
        ``EventsGetResponse``.

    Raises:
        ValueError: If the text is not valid JSON (``json.JSONDecodeError``
            is a ``ValueError`` subclass), or if the decoded value is not a
            JSON object containing a ``products`` key.
    """
    json_data = json.loads(unsealed)

    # Valid JSON is not necessarily an object. A bare number, ``null`` or
    # boolean would make the ``in`` test raise TypeError, and a list or
    # string could satisfy it by accident, so reject anything but a dict.
    if not isinstance(json_data, dict) or 'products' not in json_data:
        raise ValueError('Sealed data is not valid event response')

    result: EventsGetResponse = ApiClientDeserializer.deserialize(json_data, 'EventsGetResponse')
    return result

61 

62 

def __unseal(sealed_data: bytes, decryption_keys: List[DecryptionKey]) -> str:
    """Try each key in order and return the first successful unsealing.

    Args:
        sealed_data: Sealed bytes; must begin with ``SEALED_HEADER``.
        decryption_keys: Candidate keys, tried in the order given.

    Returns:
        The unsealed text produced by the first key that works.

    Raises:
        ValueError: If the header does not match, or a key whose algorithm
            is not AES-256-GCM is reached before any key succeeds.
        UnsealAggregateError: If every key was tried and failed (also the
            outcome for an empty key list).
    """
    header_size = len(SEALED_HEADER)
    if sealed_data[:header_size].hex() != SEALED_HEADER.hex():
        raise ValueError('Invalid sealed data header')

    failures = []
    for candidate in decryption_keys:
        # An unknown algorithm aborts the whole call immediately.
        if candidate.algorithm != DecryptionAlgorithm['Aes256Gcm']:
            raise ValueError(f"Unsupported decryption algorithm: {candidate.algorithm}")

        try:
            return __unseal_aes256gcm(sealed_data, candidate.key)
        except Exception as exc:
            # Any failure with this key is recorded and the next key is tried.
            failures.append(UnsealError(exc, candidate))

    raise UnsealAggregateError(failures)

80 

81 

def __unseal_aes256gcm(sealed_data: bytes, decryption_key: bytes) -> str:
    """Decrypt AES-GCM sealed data, inflate it, and return it as text.

    Layout read from ``sealed_data``: ``len(SEALED_HEADER)`` header bytes,
    a 12-byte nonce, the ciphertext, and a trailing 16-byte authentication
    tag.

    Args:
        sealed_data: Complete sealed payload, header included.
        decryption_key: Key bytes for the AES cipher.

    Returns:
        The decrypted, decompressed payload decoded as UTF-8.
    """
    nonce_size, tag_size = 12, 16
    nonce_start = len(SEALED_HEADER)
    body_start = nonce_start + nonce_size

    nonce = sealed_data[nonce_start:body_start]
    ciphertext = sealed_data[body_start:-tag_size]
    auth_tag = sealed_data[-tag_size:]

    aes = algorithms.AES(decryption_key)
    gcm = modes.GCM(nonce, auth_tag)
    decryptor = Cipher(aes, gcm, backend=default_backend()).decryptor()

    # finalize() completes the decryption; the tag was supplied to GCM above.
    compressed = decryptor.update(ciphertext) + decryptor.finalize()

    # Negative wbits: the data is a raw deflate stream with no zlib header
    # or trailer.
    return zlib.decompress(compressed, -zlib.MAX_WBITS).decode('utf-8')

101 

102 return payload.decode('utf-8')