site_authentication.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536
  1. import base64
  2. import json
  3. from Crypto.PublicKey import RSA
  4. from Crypto.Signature import PKCS1_v1_5
  5. from Crypto.Hash import MD5
  6. from fate_flow.db.key_manager import RsaKeyManager
  7. from fate_flow.entity import RetCode
  8. from fate_flow.entity.types import SiteKeyName
  9. from fate_flow.hook import HookManager
  10. from fate_flow.hook.common.parameters import SignatureParameters, AuthenticationParameters, AuthenticationReturn, \
  11. SignatureReturn
  12. from fate_flow.settings import PARTY_ID
  13. @HookManager.register_site_signature_hook
  14. def signature(parm: SignatureParameters) -> SignatureReturn:
  15. private_key = RsaKeyManager.get_key(parm.party_id, key_name=SiteKeyName.PRIVATE.value)
  16. if not private_key:
  17. raise Exception(f"signature error: no found party id {parm.party_id} private key")
  18. sign= PKCS1_v1_5.new(RSA.importKey(private_key)).sign(MD5.new(json.dumps(parm.body).encode()))
  19. return SignatureReturn(site_signature=base64.b64encode(sign).decode())
  20. @HookManager.register_site_authentication_hook
  21. def authentication(parm: AuthenticationParameters) -> AuthenticationReturn:
  22. party_id = parm.src_party_id if parm.src_party_id and str(parm.src_party_id) != "0" else PARTY_ID
  23. public_key = RsaKeyManager.get_key(party_id=party_id, key_name=SiteKeyName.PUBLIC.value)
  24. if not public_key:
  25. raise Exception(f"signature error: no found party id {party_id} public key")
  26. verifier = PKCS1_v1_5.new(RSA.importKey(public_key))
  27. if verifier.verify(MD5.new(json.dumps(parm.body).encode()), base64.b64decode(parm.site_signature)) is True:
  28. return AuthenticationReturn()
  29. else:
  30. return AuthenticationReturn(code=RetCode.AUTHENTICATION_ERROR, message="authentication failed")