# dependence_manager.py
import os
import sys

from fate_arch import storage
from fate_arch.common import EngineType
from fate_arch.computing import ComputingEngine

from fate_flow.controller.job_controller import JobController
from fate_flow.db.dependence_registry import DependenceRegistry
from fate_flow.entity import ComponentProvider
from fate_flow.entity.run_status import JobInheritanceStatus, TaskStatus
from fate_flow.entity.types import FateDependenceName, ComponentProviderName, FateDependenceStorageEngine, WorkerName
from fate_flow.manager.provider_manager import ProviderManager
from fate_flow.manager.worker_manager import WorkerManager
from fate_flow.operation.job_saver import JobSaver
from fate_flow.settings import DEPENDENT_DISTRIBUTION, FATE_FLOW_UPDATE_CHECK, ENGINES
from fate_flow.utils import schedule_utils, job_utils, process_utils
from fate_flow.utils.log_utils import schedule_logger
from fate_flow.worker.job_inheritor import JobInherit
  18. class DependenceManager:
  19. @classmethod
  20. def check_job_dependence(cls, job):
  21. if cls.check_job_inherit_dependence(job) and cls.check_spark_dependence(job):
  22. return True
  23. else:
  24. return False
  25. @classmethod
  26. def check_job_inherit_dependence(cls, job):
  27. schedule_logger(job.f_job_id).info(
  28. f"check job inherit dependence: {job.f_inheritance_info}, {job.f_inheritance_status}")
  29. if job.f_inheritance_info:
  30. if job.f_inheritance_status == JobInheritanceStatus.WAITING:
  31. cls.start_inheriting_job(job)
  32. return False
  33. elif job.f_inheritance_status == JobInheritanceStatus.RUNNING:
  34. return False
  35. elif job.f_inheritance_status == JobInheritanceStatus.FAILED:
  36. raise Exception("job inheritance failed")
  37. else:
  38. return True
  39. else:
  40. return True
  41. @classmethod
  42. def component_check(cls, job, check_type="inheritance"):
  43. if check_type == "rerun":
  44. task_list = JobSaver.query_task(job_id=job.f_job_id, party_id=job.f_party_id, role=job.f_role,
  45. status=TaskStatus.SUCCESS, only_latest=True)
  46. tasks = {}
  47. for task in task_list:
  48. tasks[task.f_component_name] = task
  49. else:
  50. tasks = JobController.load_tasks(component_list=job.f_inheritance_info.get("component_list", []),
  51. job_id=job.f_inheritance_info.get("job_id"),
  52. role=job.f_role,
  53. party_id=job.f_party_id)
  54. tracker_dict = JobController.load_task_tracker(tasks)
  55. missing_dependence_component_list = []
  56. # data dependence
  57. for tracker in tracker_dict.values():
  58. table_infos = tracker.get_output_data_info()
  59. for table in table_infos:
  60. table_meta = storage.StorageTableMeta(name=table.f_table_name, namespace=table.f_table_namespace)
  61. if not table_meta:
  62. missing_dependence_component_list.append(tracker.component_name)
  63. continue
  64. if check_type == "rerun":
  65. return missing_dependence_component_list
  66. elif check_type == "inheritance":
  67. # reload component list
  68. return list(set(job.f_inheritance_info.get("component_list", [])) - set(missing_dependence_component_list))
  69. @classmethod
  70. def start_inheriting_job(cls, job):
  71. JobSaver.update_job(job_info={"job_id": job.f_job_id, "role": job.f_role, "party_id": job.f_party_id,
  72. "inheritance_status": JobInheritanceStatus.RUNNING})
  73. conf_dir = job_utils.get_job_directory(job_id=job.f_job_id)
  74. os.makedirs(conf_dir, exist_ok=True)
  75. process_cmd = [
  76. sys.executable or 'python3',
  77. sys.modules[JobInherit.__module__].__file__,
  78. '--job_id', job.f_job_id,
  79. '--role', job.f_role,
  80. '--party_id', job.f_party_id,
  81. ]
  82. log_dir = os.path.join(job_utils.get_job_log_directory(job_id=job.f_job_id), "job_inheritance")
  83. process_utils.run_subprocess(job_id=job.f_job_id, config_dir=conf_dir, process_cmd=process_cmd,
  84. log_dir=log_dir, process_name="job_inheritance")
  85. @classmethod
  86. def check_spark_dependence(cls, job):
  87. if not DEPENDENT_DISTRIBUTION:
  88. return True
  89. engine_name = ENGINES.get(EngineType.COMPUTING)
  90. schedule_logger(job.f_job_id).info(f"job engine name: {engine_name}")
  91. if engine_name not in [ComputingEngine.SPARK]:
  92. return True
  93. dsl_parser = schedule_utils.get_job_dsl_parser(dsl=job.f_dsl, runtime_conf=job.f_runtime_conf,
  94. train_runtime_conf=job.f_train_runtime_conf)
  95. provider_group = ProviderManager.get_job_provider_group(dsl_parser=dsl_parser,
  96. runtime_conf=job.f_runtime_conf_on_party,
  97. role=job.f_role,
  98. party_id=job.f_party_id)
  99. version_provider_info = {}
  100. fate_flow_version_provider_info = {}
  101. schedule_logger(job.f_job_id).info(f'group_info:{provider_group}')
  102. for group_key, group_info in provider_group.items():
  103. if group_info["provider"]["name"] == ComponentProviderName.FATE_FLOW.value and \
  104. group_info["provider"]["version"] not in fate_flow_version_provider_info:
  105. fate_flow_version_provider_info[group_info["provider"]["version"]] = group_info["provider"]
  106. if group_info["provider"]["name"] == ComponentProviderName.FATE.value and \
  107. group_info["provider"]["version"] not in version_provider_info:
  108. version_provider_info[group_info["provider"]["version"]] = group_info["provider"]
  109. schedule_logger(job.f_job_id).info(f'version_provider_info:{version_provider_info}')
  110. schedule_logger(job.f_job_id).info(f'fate_flow_version_provider_info:{fate_flow_version_provider_info}')
  111. if not version_provider_info:
  112. version_provider_info = fate_flow_version_provider_info
  113. check_tag, upload_tag, upload_details = cls.check_upload(job.f_job_id, version_provider_info,
  114. fate_flow_version_provider_info)
  115. if upload_tag:
  116. cls.upload_spark_dependence(job, upload_details)
  117. return check_tag
    @classmethod
    def check_upload(cls, job_id, provider_group, fate_flow_version_provider_info,
                     storage_engine=FateDependenceStorageEngine.HDFS.value):
        """Decide which dependencies must be (re)uploaded to the storage engine.

        :param job_id: job id, used only for logging
        :param provider_group: {version: provider_info} of providers to check
        :param fate_flow_version_provider_info: {version: provider_info} for the
            FATE Flow providers; used to detect FATE Flow code changes when
            checking a FATE source-code dependence
        :param storage_engine: dependence storage engine name (defaults to HDFS)
        :return: (check_tag, need_upload, upload_details) — check_tag is True
            only when every dependence is already uploaded and nothing is in
            progress; need_upload is True when at least one upload must start;
            upload_details maps {version: {dependence_type: ComponentProvider}}
        """
        schedule_logger(job_id).info("start Check if need to upload dependencies")
        schedule_logger(job_id).info(f"{provider_group}")
        upload_details = {}
        check_tag = True   # flips to False if anything is uploading or missing
        upload_total = 0   # number of dependencies that need a fresh upload
        for version, provider_info in provider_group.items():
            upload_details[version] = {}
            provider = ComponentProvider(**provider_info)
            # Each provider version needs both its source code and its python env.
            for dependence_type in [FateDependenceName.Fate_Source_Code.value, FateDependenceName.Python_Env.value]:
                schedule_logger(job_id).info(f"{dependence_type}")
                dependencies_storage_info = DependenceRegistry.get_dependencies_storage_meta(
                    storage_engine=storage_engine,
                    version=provider.version,
                    type=dependence_type,
                    get_or_one=True
                )
                need_upload = False
                if dependencies_storage_info:
                    if dependencies_storage_info.f_upload_status:
                        # version dependence uploading: another worker is already
                        # on it, so just report not-ready without re-uploading
                        check_tag = False
                        continue
                    elif not dependencies_storage_info.f_storage_path:
                        # registered but never stored: upload from scratch
                        need_upload = True
                        upload_total += 1
                    elif dependence_type == FateDependenceName.Fate_Source_Code.value:
                        # code exists in storage; re-upload only if it changed on disk
                        if provider.name == ComponentProviderName.FATE.value:
                            check_fate_flow_provider_status = False
                            if fate_flow_version_provider_info.values():
                                # compare the FATE Flow code's mtime against the
                                # snapshot recorded with this dependence
                                flow_provider = ComponentProvider(**list(fate_flow_version_provider_info.values())[0])
                                check_fate_flow_provider_status = DependenceRegistry.get_modify_time(flow_provider.path) \
                                                                  != dependencies_storage_info.f_fate_flow_snapshot_time
                            if FATE_FLOW_UPDATE_CHECK and check_fate_flow_provider_status:
                                need_upload = True
                                upload_total += 1
                            elif DependenceRegistry.get_modify_time(provider.path) != \
                                    dependencies_storage_info.f_snapshot_time:
                                # FATE code itself changed since the stored snapshot
                                need_upload = True
                                upload_total += 1
                        elif provider.name == ComponentProviderName.FATE_FLOW.value and FATE_FLOW_UPDATE_CHECK:
                            if DependenceRegistry.get_modify_time(provider.path) != \
                                    dependencies_storage_info.f_fate_flow_snapshot_time:
                                need_upload = True
                                upload_total += 1
                else:
                    # no storage record at all for this (engine, type, version)
                    need_upload = True
                    upload_total += 1
                if need_upload:
                    upload_details[version][dependence_type] = provider
        if upload_total > 0:
            check_tag = False
        schedule_logger(job_id).info(f"check dependencies result: {check_tag}, {upload_details}")
        return check_tag, upload_total > 0, upload_details
  174. @classmethod
  175. def upload_spark_dependence(cls, job, upload_details, storage_engine=FateDependenceStorageEngine.HDFS.value):
  176. schedule_logger(job.f_job_id).info(f"start upload dependence: {upload_details}")
  177. for version, type_provider in upload_details.items():
  178. for dependence_type, provider in type_provider.items():
  179. storage_meta = {
  180. "f_storage_engine": storage_engine,
  181. "f_type": dependence_type,
  182. "f_version": version,
  183. "f_upload_status": True
  184. }
  185. schedule_logger(job.f_job_id).info(f"update dependence storage meta:{storage_meta}")
  186. DependenceRegistry.save_dependencies_storage_meta(storage_meta, status_check=True)
  187. WorkerManager.start_general_worker(worker_name=WorkerName.DEPENDENCE_UPLOAD, job_id=job.f_job_id,
  188. role=job.f_role, party_id=job.f_party_id, provider=provider,
  189. dependence_type=dependence_type, callback=cls.record_upload_process,
  190. callback_param=["dependence_type", "pid", "provider"])
  191. @classmethod
  192. def record_upload_process(cls, provider, dependence_type, pid,
  193. storage_engine=FateDependenceStorageEngine.HDFS.value):
  194. storage_meta = {
  195. "f_storage_engine": storage_engine,
  196. "f_type": dependence_type,
  197. "f_version": provider.version,
  198. "f_pid": pid,
  199. "f_upload_status": True
  200. }
  201. DependenceRegistry.save_dependencies_storage_meta(storage_meta)
  202. @classmethod
  203. def kill_upload_process(cls, version, storage_engine, dependence_type):
  204. storage_meta = {
  205. "f_storage_engine": storage_engine,
  206. "f_type": dependence_type,
  207. "f_version": version,
  208. "f_upload_status": False,
  209. "f_pid": 0
  210. }
  211. DependenceRegistry.save_dependencies_storage_meta(storage_meta)