crypto.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import base64
  2. from Crypto.PublicKey import RSA
  3. from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
  4. from Crypto.Cipher import AES
  5. from functools import wraps
  6. from utils.http import make_json_response
  7. from urllib import parse
  8. from django.http import JsonResponse
  9. from django.conf import settings
  10. import json
  11. IV = '16-Bytes--String'
  12. with open('public.rsa') as f:
  13. key = f.read()
  14. public_key = RSA.import_key(key)
  15. public_cipher = PKCS1_cipher.new(public_key)
  16. print(public_key.exportKey().decode(encoding='utf-8'))
  17. with open('private.rsa') as f:
  18. key = f.read()
  19. private_key = RSA.import_key(key)
  20. private_cipher = PKCS1_cipher.new(private_key)
  21. # print(private_key.exportKey().decode(encoding='utf-8'))
  22. def get_padding(content):
  23. padding = 16 - len(content) % 16
  24. return bytes([padding] * padding)
  25. def get_file_encrypt_cipher(key):
  26. return AES.new(key.encode(), AES.MODE_CBC, settings.FILE_ENCRYPT_IV.encode())
  27. # 安全传输decorator
  28. def secure_transport(view_func):
  29. @wraps(view_func)
  30. def _wrapped_view(request, *args, **kwargs):
  31. data = request.POST
  32. enc_key = data.get('enc_key')
  33. cipher_text = data.get('cipher_text')
  34. if not enc_key or not cipher_text:
  35. print('无加密')
  36. return view_func(request, *args, **kwargs)
  37. aes_key = private_cipher.decrypt(base64.b64decode(enc_key.encode('utf-8')), b'error').decode('utf-8')
  38. print(f'key={aes_key}')
  39. def get_aes_cipher():
  40. return AES.new(aes_key.encode('utf-8'), AES.MODE_CBC, IV.encode('utf-8'))
  41. decrypted = get_aes_cipher().decrypt(base64.b64decode(cipher_text.encode('utf-8')))
  42. # print(decrypted)
  43. decrypted = decrypted[:-decrypted[-1]]
  44. # print(decrypted)
  45. plain_text = decrypted.decode('utf-8')
  46. print(plain_text)
  47. try:
  48. loaded = json.loads(plain_text)
  49. except:
  50. print('不是json')
  51. loaded = {}
  52. plain_text = parse.unquote(plain_text)
  53. print(plain_text)
  54. for kv in plain_text.split('&'):
  55. p = kv.find('=')
  56. loaded[kv[:p]] = kv[p+1:]
  57. dec_request = request
  58. dec_request.POST = {'key': aes_key, **request.POST, **loaded}
  59. raw_response = view_func(dec_request, *args, **kwargs)
  60. if not isinstance(raw_response, JsonResponse):
  61. return raw_response
  62. content = json.dumps({'data': json.loads(raw_response.content)}).encode('utf-8')
  63. content += get_padding(content)
  64. print(content)
  65. enc_content = base64.b64encode(get_aes_cipher().encrypt(content)).decode('utf-8')
  66. print(enc_content)
  67. if settings.DEBUG:
  68. print(get_aes_cipher().decrypt(base64.b64decode(enc_content.encode('utf-8'))))
  69. return make_json_response(enc_content=enc_content)
  70. return _wrapped_view
  71. def test():
  72. plain_text = '{"username": "user1"}'
  73. encrypted = public_cipher.encrypt(bytes(plain_text.encode('utf8')))
  74. cipher_text = base64.b64encode(encrypted)
  75. print(cipher_text.decode('utf8'))
  76. decrypted = private_cipher.decrypt(base64.b64decode(cipher_text), b'error')
  77. print(decrypted.decode('utf8'))
  78. # print(public_key.exportKey().decode('utf-8'))
  79. # random_generator = Random.new().read
  80. # rsa = RSA.generate(2048, random_generator)
  81. # private_key = rsa.exportKey()
  82. # print(private_key.decode('utf-8'))