job_clean.py 4.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from fate_flow.manager.data_manager import delete_tables_by_table_infos, delete_metric_data
  17. from fate_flow.operation.job_tracker import Tracker
  18. from fate_flow.operation.job_saver import JobSaver
  19. from fate_flow.settings import stat_logger
  20. from fate_flow.utils.job_utils import start_session_stop
  21. class JobClean(object):
  22. @classmethod
  23. def clean_table(cls, job_id, role, party_id, component_name):
  24. # clean data table
  25. stat_logger.info('start delete {} {} {} {} data table'.format(job_id, role, party_id, component_name))
  26. tracker = Tracker(job_id=job_id, role=role, party_id=party_id, component_name=component_name)
  27. output_data_table_infos = tracker.get_output_data_info()
  28. if output_data_table_infos:
  29. delete_tables_by_table_infos(output_data_table_infos)
  30. stat_logger.info('delete {} {} {} {} data table success'.format(job_id, role, party_id, component_name))
  31. @classmethod
  32. def start_clean_job(cls, **kwargs):
  33. tasks = JobSaver.query_task(**kwargs)
  34. if tasks:
  35. for task in tasks:
  36. try:
  37. # clean session
  38. stat_logger.info('start {} {} {} {} session stop'.format(task.f_job_id, task.f_role,
  39. task.f_party_id, task.f_component_name))
  40. start_session_stop(task)
  41. stat_logger.info('stop {} {} {} {} session success'.format(task.f_job_id, task.f_role,
  42. task.f_party_id, task.f_component_name))
  43. except Exception as e:
  44. stat_logger.info(f'start_session_stop occur exception: {e}')
  45. try:
  46. # clean data table
  47. JobClean.clean_table(job_id=task.f_job_id, role=task.f_role, party_id=task.f_party_id,
  48. component_name=task.f_component_name)
  49. except Exception as e:
  50. stat_logger.info('delete {} {} {} {} data table failed'.format(task.f_job_id, task.f_role,
  51. task.f_party_id,
  52. task.f_component_name))
  53. stat_logger.exception(e)
  54. try:
  55. # clean metric data
  56. stat_logger.info('start delete {} {} {} {} metric data'.format(task.f_job_id, task.f_role,
  57. task.f_party_id,
  58. task.f_component_name))
  59. delete_metric_data({'job_id': task.f_job_id,
  60. 'role': task.f_role,
  61. 'party_id': task.f_party_id,
  62. 'component_name': task.f_component_name})
  63. stat_logger.info('delete {} {} {} {} metric data success'.format(task.f_job_id, task.f_role,
  64. task.f_party_id,
  65. task.f_component_name))
  66. except Exception as e:
  67. stat_logger.info('delete {} {} {} {} metric data failed'.format(task.f_job_id, task.f_role,
  68. task.f_party_id,
  69. task.f_component_name))
  70. stat_logger.exception(e)
  71. else:
  72. raise Exception('no found task')