py_aes_encryption.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. #
  4. # Copyright 2019 The FATE Authors. All Rights Reserved.
  5. #
  6. # Licensed under the Apache License, Version 2.0 (the "License");
  7. # you may not use this file except in compliance with the License.
  8. # You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. #
  18. import os
  19. import numpy as np
  20. from federatedml.secureprotol.symmetric_encryption.py_aes_core import AESModeOfOperationOFB
  21. from federatedml.secureprotol.symmetric_encryption.symmetric_encryption import SymmetricKey
  22. from federatedml.util import conversion
  23. class AESKey(SymmetricKey):
  24. """
  25. Note that a key cannot used for both encryption and decryption scenarios
  26. """
  27. def __init__(self, key, nonce=None):
  28. """
  29. :param key: bytes, must be 16, 24 or 32 bytes long
  30. :param nonce: bytes, must be 16 bytes long
  31. """
  32. super(AESKey, self).__init__()
  33. if nonce is None:
  34. self.nonce = os.urandom(16)
  35. self.key = key
  36. self.cipher_core = AESModeOfOperationOFB(key=self.key, iv=self.nonce)
  37. else:
  38. self.nonce = nonce
  39. self.key = key
  40. self.cipher_core = AESModeOfOperationOFB(key=self.key, iv=self.nonce)
  41. def _renew(self):
  42. """
  43. Self renew cipher_core after encryption and decryption
  44. :return:
  45. """
  46. self.cipher_core = AESModeOfOperationOFB(key=self.key, iv=self.nonce)
  47. class AESEncryptKey(AESKey):
  48. """
  49. AES encryption scheme
  50. Note that the ciphertext size is affected only by that of the plaintext, instead of the key length
  51. """
  52. def __init__(self, key):
  53. super(AESEncryptKey, self).__init__(key=key)
  54. def encrypt(self, plaintext):
  55. if isinstance(plaintext, list):
  56. return [self.encrypt_single_val(p) for p in plaintext]
  57. else:
  58. return self.encrypt_single_val(plaintext)
  59. def encrypt_single_val(self, plaintext):
  60. if not isinstance(plaintext, bytes):
  61. plaintext = self._all_to_bytes(plaintext)
  62. elif isinstance(plaintext, bytes):
  63. pass
  64. else:
  65. raise TypeError("AES encryptor supports bytes/int/float/str")
  66. ciphertext = self.cipher_core.encrypt(plaintext)
  67. self._renew()
  68. return ciphertext
  69. @staticmethod
  70. def _all_to_bytes(message):
  71. """
  72. Convert an int/float/str to bytes, e.g., 1.65 -> b'1.65', 'hello -> b'hello'
  73. :param message: int/float/str
  74. :return: -1 if type error, otherwise str
  75. """
  76. if isinstance(message, int) or isinstance(message, float):
  77. return conversion.str_to_bytes(str(message))
  78. elif isinstance(message, str):
  79. return conversion.str_to_bytes(message)
  80. else:
  81. return -1
  82. def get_nonce(self):
  83. return self.nonce
  84. class AESDecryptKey(AESKey):
  85. """
  86. AES decryption scheme
  87. """
  88. def __init__(self, key, nonce):
  89. super(AESDecryptKey, self).__init__(key=key, nonce=nonce)
  90. def decrypt(self, ciphertext):
  91. if isinstance(ciphertext, list):
  92. return np.array([self.decrypt_single_val(p) for p in ciphertext])
  93. else:
  94. return self.decrypt_single_val(ciphertext)
  95. def decrypt_single_val(self, ciphertext):
  96. """
  97. :param ciphertext: bytes
  98. :return: str
  99. """
  100. if not isinstance(ciphertext, bytes):
  101. raise TypeError("AES decryptor supports bytes only")
  102. plaintext = conversion.bytes_to_str(self.cipher_core.decrypt(ciphertext))
  103. self._renew()
  104. return plaintext