db_utils.py 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. import operator
  2. from fate_arch.common.base_utils import current_timestamp
  3. from fate_arch.metastore.db_models import DB, StorageConnectorModel
  4. class StorageConnector:
  5. def __init__(self, connector_name, engine=None, connector_info=None):
  6. self.name = connector_name
  7. self.engine = engine
  8. self.connector_info = connector_info
  9. @DB.connection_context()
  10. def create_or_update(self):
  11. defaults = {
  12. "f_name": self.name,
  13. "f_engine": self.engine,
  14. "f_connector_info": self.connector_info,
  15. "f_create_time": current_timestamp(),
  16. }
  17. connector, status = StorageConnectorModel.get_or_create(
  18. f_name=self.name,
  19. defaults=defaults)
  20. if status is False:
  21. for key in defaults:
  22. setattr(connector, key, defaults[key])
  23. connector.save(force_insert=False)
  24. @DB.connection_context()
  25. def get_info(self):
  26. connectors = [connector for connector in StorageConnectorModel.select().where(
  27. operator.attrgetter("f_name")(StorageConnectorModel) == self.name)]
  28. if connectors:
  29. return connectors[0].f_connector_info
  30. else:
  31. return {}