Coverage for fingerprint_pro_server_api_sdk/sealed.py: 100%

52 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-03-27 22:39 +0000

1import json 

2from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 

3from cryptography.hazmat.backends import default_backend 

4import zlib 

5 

6from fingerprint_pro_server_api_sdk.models.event_response import EventResponse 

7 

8SEALED_HEADER = bytes([0x9e, 0x85, 0xdc, 0xed]) 

9DecryptionAlgorithm = { 

10 'Aes256Gcm': 'aes-256-gcm', 

11} 

12 

13 

14class DecryptionKey: 

15 def __init__(self, key, algorithm): 

16 self.key = key 

17 self.algorithm = algorithm 

18 

19 

20class UnsealError(Exception): 

21 exception: Exception 

22 key: DecryptionKey 

23 

24 def __init__(self, exception, key): 

25 self.exception = exception 

26 self.key = key 

27 

28 

29class UnsealAggregateError(Exception): 

30 def __init__(self, errors): 

31 self.errors = errors 

32 super().__init__("Unable to decrypt sealed data") 

33 

34 

35def parse_events_response(unsealed): 

36 json_data = json.loads(unsealed) 

37 

38 if 'products' not in json_data: 

39 raise ValueError('Sealed data is not valid events response') 

40 

41 return EventResponse(json_data['products']) 

42 

43 

44def unseal_events_response(sealed_data, decryption_keys): 

45 unsealed = unseal(sealed_data, decryption_keys) 

46 return parse_events_response(unsealed) 

47 

48 

49def unseal(sealed_data, decryption_keys): 

50 if sealed_data[:len(SEALED_HEADER)].hex() != SEALED_HEADER.hex(): 

51 raise ValueError('Invalid sealed data header') 

52 

53 errors = [] 

54 for decryption_key in decryption_keys: 

55 if decryption_key.algorithm == DecryptionAlgorithm['Aes256Gcm']: 

56 try: 

57 return unseal_aes256gcm(sealed_data, decryption_key.key) 

58 except Exception as e: 

59 errors.append(UnsealError(e, decryption_key)) 

60 continue 

61 else: 

62 raise ValueError(f"Unsupported decryption algorithm: {decryption_key.algorithm}") 

63 

64 raise UnsealAggregateError(errors) 

65 

66 

67def unseal_aes256gcm(sealed_data, decryption_key): 

68 nonce_length = 12 

69 nonce = sealed_data[len(SEALED_HEADER):len(SEALED_HEADER) + nonce_length] 

70 

71 auth_tag_length = 16 

72 auth_tag = sealed_data[-auth_tag_length:] 

73 

74 ciphertext = sealed_data[len(SEALED_HEADER) + nonce_length:-auth_tag_length] 

75 

76 decipher = Cipher( 

77 algorithms.AES(decryption_key), 

78 modes.GCM(nonce, auth_tag), 

79 backend=default_backend() 

80 ).decryptor() 

81 

82 compressed = decipher.update(ciphertext) + decipher.finalize() 

83 

84 payload = zlib.decompress(compressed, -zlib.MAX_WBITS) 

85 

86 return payload.decode('utf-8')