123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- import base64
- from Crypto.PublicKey import RSA
- from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
- from Crypto.Cipher import AES
- from functools import wraps
- from utils.http import make_json_response
- from urllib import parse
- from django.http import JsonResponse
- from django.conf import settings
- import json
# Fixed initialisation vector used by the AES-CBC transport cipher built in
# secure_transport(); it is a 16-character ASCII string, i.e. 16 bytes.
IV = '16-Bytes--String'

# RSA public key: read from the working directory at import time and wrapped
# in a PKCS#1 v1.5 cipher. The PEM export is echoed to stdout on import.
with open('public.rsa') as f:
    key = f.read()
public_key = RSA.import_key(key)
public_cipher = PKCS1_cipher.new(public_key)
print(public_key.exportKey().decode('utf-8'))

# RSA private key: loaded the same way; it is never printed.
with open('private.rsa') as f:
    key = f.read()
private_key = RSA.import_key(key)
private_cipher = PKCS1_cipher.new(private_key)
def get_padding(content, block_size=16):
    """Return the PKCS#7 padding bytes that must be appended to *content*.

    The padding consists of ``n`` bytes, each with value ``n``, where ``n``
    is the number of bytes needed to reach the next multiple of
    *block_size*. When *content* is already aligned a full block of padding
    is returned, so the padding can always be removed unambiguously.

    Args:
        content: The bytes-like (or any sized) payload about to be encrypted.
        block_size: Cipher block size in bytes. Defaults to 16 (AES).

    Returns:
        A ``bytes`` object of length 1..block_size.

    Raises:
        ValueError: If *block_size* is outside 1..255, the range in which a
            single byte can encode the padding length.
    """
    if not 1 <= block_size <= 255:
        raise ValueError('block_size must be between 1 and 255')
    padding = block_size - len(content) % block_size
    return bytes([padding]) * padding
def get_file_encrypt_cipher(key):
    """Build an AES-CBC cipher object for file encryption.

    Args:
        key: The AES key as a ``str``; it is encoded with the default
            (UTF-8) codec before use.

    Returns:
        A new AES cipher in CBC mode whose IV is the encoded value of
        ``settings.FILE_ENCRYPT_IV``.
    """
    key_bytes = key.encode()
    iv_bytes = settings.FILE_ENCRYPT_IV.encode()
    return AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
def _bad_request():
    """Return the generic 400 response used for any undecryptable payload."""
    # One generic message for every failure mode, so the response does not
    # reveal which decryption step rejected the input.
    return JsonResponse({'error': 'invalid encrypted payload'}, status=400)


def _strip_padding(padded, block_size=16):
    """Remove the trailing padding whose length is stored in the last byte.

    Raises:
        ValueError: If *padded* is empty or the length byte is not in
            1..block_size (or exceeds the data length).
    """
    if not padded:
        raise ValueError('empty payload')
    pad_len = padded[-1]
    # A length byte of 0 would make ``padded[:-0]`` silently drop everything,
    # and an oversized one would eat real data, so reject both.
    if not 1 <= pad_len <= block_size or pad_len > len(padded):
        raise ValueError('invalid padding length')
    return padded[:-pad_len]


def _parse_plain_text(plain_text):
    """Turn the decrypted text into a dict of request fields.

    The text is first tried as JSON; if that fails it is treated as a
    URL-encoded ``k=v&k2=v2`` string.

    Raises:
        ValueError: If the text is valid JSON but not a JSON object.
    """
    try:
        loaded = json.loads(plain_text)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        print('不是json')
        # NOTE(review): the whole string is unquoted before splitting, as the
        # original code did, so a percent-encoded '&' or '=' inside a value is
        # treated as a separator. Kept for compatibility with existing
        # clients -- confirm what the client actually sends before changing.
        fields = {}
        for pair in parse.unquote(plain_text).split('&'):
            if not pair:
                continue
            # partition() keeps a pair without '=' intact as the field name
            # (value ''), instead of mis-slicing it via find() == -1.
            name, _, value = pair.partition('=')
            fields[name] = value
        return fields
    if not isinstance(loaded, dict):
        raise ValueError('JSON payload must be an object')
    return loaded


# Secure-transport decorator.
def secure_transport(view_func):
    """Decorate a Django view so POST data and JSON responses are encrypted.

    The wrapped view reads two POST fields:

    * ``enc_key``: base64 of the AES key, RSA-encrypted (PKCS#1 v1.5) for
      the module's ``private_cipher``.
    * ``cipher_text``: base64 of the AES-CBC encrypted payload (IV is the
      module-level ``IV``), holding either a JSON object or a URL-encoded
      ``k=v&...`` string, padded with a trailing length byte.

    Behaviour:

    * If either field is missing or empty, the view is called unchanged.
    * If the fields cannot be decrypted or parsed, a 400 ``JsonResponse``
      is returned and the view is not called.
    * Otherwise ``request.POST`` is replaced by a plain ``dict`` made of
      ``{'key': <aes key>}``, the original POST fields and the decrypted
      fields (later entries win), and the view is called.
    * A ``JsonResponse`` from the view is re-wrapped as ``{"data": ...}``,
      padded, AES-encrypted with the same key, base64-encoded and returned
      via ``make_json_response(enc_content=...)``. Any other response type
      is returned untouched.

    The AES key is never printed; plaintext and ciphertext dumps are only
    printed when ``settings.DEBUG`` is true.
    """
    @wraps(view_func)
    def _wrapped_view(request, *args, **kwargs):
        data = request.POST
        enc_key = data.get('enc_key')
        cipher_text = data.get('cipher_text')
        if not enc_key or not cipher_text:
            print('无加密')
            return view_func(request, *args, **kwargs)

        try:
            # On RSA failure decrypt() returns the b'error' sentinel, which
            # is not a valid AES key length and is rejected by AES.new below.
            aes_key = private_cipher.decrypt(
                base64.b64decode(enc_key.encode('utf-8')), b'error'
            ).decode('utf-8')
        except ValueError:  # bad base64, wrong RSA length, non-UTF-8 key
            return _bad_request()

        def get_aes_cipher():
            # A fresh cipher object is built for every encrypt/decrypt call.
            return AES.new(aes_key.encode('utf-8'), AES.MODE_CBC, IV.encode('utf-8'))

        try:
            padded = get_aes_cipher().decrypt(
                base64.b64decode(cipher_text.encode('utf-8'))
            )
            plain_text = _strip_padding(padded).decode('utf-8')
            loaded = _parse_plain_text(plain_text)
        except ValueError:  # bad key length, bad base64/length/padding/UTF-8
            return _bad_request()
        if settings.DEBUG:
            print(plain_text)

        # ``**request.POST`` on a QueryDict copies the underlying value
        # *lists*; reading through .get() yields the scalar (last) value per
        # key and also works if POST is already a plain dict.
        original_fields = {name: data.get(name) for name in data}
        dec_request = request
        dec_request.POST = {'key': aes_key, **original_fields, **loaded}

        raw_response = view_func(dec_request, *args, **kwargs)
        if not isinstance(raw_response, JsonResponse):
            return raw_response

        content = json.dumps({'data': json.loads(raw_response.content)}).encode('utf-8')
        content += get_padding(content)
        enc_content = base64.b64encode(get_aes_cipher().encrypt(content)).decode('utf-8')
        if settings.DEBUG:
            print(content)
            print(enc_content)
            print(get_aes_cipher().decrypt(base64.b64decode(enc_content.encode('utf-8'))))
        return make_json_response(enc_content=enc_content)
    return _wrapped_view
def test():
    """Round-trip a sample JSON string through the module's RSA ciphers.

    Encrypts the sample with ``public_cipher``, prints the base64 form of
    the ciphertext, then decrypts it with ``private_cipher`` (sentinel
    ``b'error'``) and prints the recovered text.
    """
    plain_text = '{"username": "user1"}'
    rsa_output = public_cipher.encrypt(plain_text.encode('utf8'))
    encoded = base64.b64encode(rsa_output)
    print(encoded.decode('utf8'))

    recovered = private_cipher.decrypt(base64.b64decode(encoded), b'error')
    print(recovered.decode('utf8'))
- # print(public_key.exportKey().decode('utf-8'))
- # random_generator = Random.new().read
- # rsa = RSA.generate(2048, random_generator)
- # private_key = rsa.exportKey()
- # print(private_key.decode('utf-8'))
|