import os
import sys

from fate_arch import storage
from fate_arch.common import EngineType
from fate_arch.computing import ComputingEngine

from fate_flow.controller.job_controller import JobController
from fate_flow.db.dependence_registry import DependenceRegistry
from fate_flow.entity import ComponentProvider
from fate_flow.entity.run_status import JobInheritanceStatus, TaskStatus
from fate_flow.entity.types import FateDependenceName, ComponentProviderName, FateDependenceStorageEngine, WorkerName
from fate_flow.manager.provider_manager import ProviderManager
from fate_flow.manager.worker_manager import WorkerManager
from fate_flow.operation.job_saver import JobSaver
from fate_flow.settings import DEPENDENT_DISTRIBUTION, FATE_FLOW_UPDATE_CHECK, ENGINES
from fate_flow.utils import schedule_utils, job_utils, process_utils
from fate_flow.utils.log_utils import schedule_logger
from fate_flow.worker.job_inheritor import JobInherit

class DependenceManager:
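    """Gate job scheduling on two preconditions: completion of job inheritance
    (reusing outputs from a previous job) and, for Spark jobs, availability of
    the FATE dependence packages in distributed storage.
    """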

    @classmethod
    def check_job_dependence(cls, job):
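        """Return True only when both the inheritance and Spark dependence checks pass."""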
        return cls.check_job_inherit_dependence(job) and cls.check_spark_dependence(job)

    @classmethod
    def check_job_inherit_dependence(cls, job):
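        """Check whether a job that inherits outputs from a previous job is ready.

        Starts the inheritance worker on first sight (WAITING), reports
        not-ready while it runs, and raises if inheritance has FAILED.
        """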
        schedule_logger(job.f_job_id).info(
            f"check job inherit dependence: {job.f_inheritance_info}, {job.f_inheritance_status}")
        if job.f_inheritance_info:
            if job.f_inheritance_status == JobInheritanceStatus.WAITING:
                cls.start_inheriting_job(job)
                return False
            elif job.f_inheritance_status == JobInheritanceStatus.RUNNING:
                return False
            elif job.f_inheritance_status == JobInheritanceStatus.FAILED:
                raise Exception("job inheritance failed")
            else:
                return True
        else:
            return True

    @classmethod
    def component_check(cls, job, check_type="inheritance"):
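        """Verify that the output data of previously successful components still exists.

        For "rerun", returns the components whose outputs are missing (they must
        be re-executed); for "inheritance", returns the components that can still
        be reused from the source job.
        """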
- if check_type == "rerun":
- task_list = JobSaver.query_task(job_id=job.f_job_id, party_id=job.f_party_id, role=job.f_role,
- status=TaskStatus.SUCCESS, only_latest=True)
- tasks = {}
- for task in task_list:
- tasks[task.f_component_name] = task
- else:
- tasks = JobController.load_tasks(component_list=job.f_inheritance_info.get("component_list", []),
- job_id=job.f_inheritance_info.get("job_id"),
- role=job.f_role,
- party_id=job.f_party_id)
- tracker_dict = JobController.load_task_tracker(tasks)
- missing_dependence_component_list = []
- # data dependence
- for tracker in tracker_dict.values():
- table_infos = tracker.get_output_data_info()
- for table in table_infos:
- table_meta = storage.StorageTableMeta(name=table.f_table_name, namespace=table.f_table_namespace)
- if not table_meta:
- missing_dependence_component_list.append(tracker.component_name)
- continue
- if check_type == "rerun":
- return missing_dependence_component_list
- elif check_type == "inheritance":
- # reload component list
- return list(set(job.f_inheritance_info.get("component_list", [])) - set(missing_dependence_component_list))

    @classmethod
    def start_inheriting_job(cls, job):
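        """Mark the job's inheritance as RUNNING and launch the JobInherit worker
        in a subprocess to copy the reusable outputs from the source job.
        """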
        JobSaver.update_job(job_info={"job_id": job.f_job_id, "role": job.f_role, "party_id": job.f_party_id,
                                      "inheritance_status": JobInheritanceStatus.RUNNING})
        conf_dir = job_utils.get_job_directory(job_id=job.f_job_id)
        os.makedirs(conf_dir, exist_ok=True)
        process_cmd = [
            sys.executable or 'python3',
            sys.modules[JobInherit.__module__].__file__,
            '--job_id', job.f_job_id,
            '--role', job.f_role,
            '--party_id', job.f_party_id,
        ]
        log_dir = os.path.join(job_utils.get_job_log_directory(job_id=job.f_job_id), "job_inheritance")
        process_utils.run_subprocess(job_id=job.f_job_id, config_dir=conf_dir, process_cmd=process_cmd,
                                     log_dir=log_dir, process_name="job_inheritance")

    @classmethod
    def check_spark_dependence(cls, job):
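        """For Spark jobs, ensure the FATE/FATE Flow dependence packages have been
        uploaded to distributed storage; trigger uploads for anything missing.

        Returns True when all dependencies are in place (or the check is disabled).
        """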
        if not DEPENDENT_DISTRIBUTION:
            return True
        engine_name = ENGINES.get(EngineType.COMPUTING)
        schedule_logger(job.f_job_id).info(f"job engine name: {engine_name}")
        if engine_name != ComputingEngine.SPARK:
            return True
        dsl_parser = schedule_utils.get_job_dsl_parser(dsl=job.f_dsl, runtime_conf=job.f_runtime_conf,
                                                       train_runtime_conf=job.f_train_runtime_conf)
        provider_group = ProviderManager.get_job_provider_group(dsl_parser=dsl_parser,
                                                                runtime_conf=job.f_runtime_conf_on_party,
                                                                role=job.f_role,
                                                                party_id=job.f_party_id)
        version_provider_info = {}
        fate_flow_version_provider_info = {}
        schedule_logger(job.f_job_id).info(f"group_info: {provider_group}")
        # collect one provider per distinct version, separately for the fate and fate_flow providers
        for group_key, group_info in provider_group.items():
            if group_info["provider"]["name"] == ComponentProviderName.FATE_FLOW.value and \
                    group_info["provider"]["version"] not in fate_flow_version_provider_info:
                fate_flow_version_provider_info[group_info["provider"]["version"]] = group_info["provider"]
            if group_info["provider"]["name"] == ComponentProviderName.FATE.value and \
                    group_info["provider"]["version"] not in version_provider_info:
                version_provider_info[group_info["provider"]["version"]] = group_info["provider"]
        schedule_logger(job.f_job_id).info(f"version_provider_info: {version_provider_info}")
        schedule_logger(job.f_job_id).info(f"fate_flow_version_provider_info: {fate_flow_version_provider_info}")
        if not version_provider_info:
            # the job uses only fate_flow components
            version_provider_info = fate_flow_version_provider_info
        check_tag, upload_tag, upload_details = cls.check_upload(job.f_job_id, version_provider_info,
                                                                 fate_flow_version_provider_info)
        if upload_tag:
            cls.upload_spark_dependence(job, upload_details)
        return check_tag

    @classmethod
    def check_upload(cls, job_id, provider_group, fate_flow_version_provider_info,
                     storage_engine=FateDependenceStorageEngine.HDFS.value):
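        """Decide, per version and dependence type, whether an upload is needed.

        Returns (check_tag, upload_tag, upload_details): check_tag is True when
        everything is already uploaded, upload_tag is True when at least one
        upload must be started, and upload_details maps version -> type -> provider.
        """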
- schedule_logger(job_id).info("start Check if need to upload dependencies")
- schedule_logger(job_id).info(f"{provider_group}")
- upload_details = {}
- check_tag = True
- upload_total = 0
- for version, provider_info in provider_group.items():
- upload_details[version] = {}
- provider = ComponentProvider(**provider_info)
- for dependence_type in [FateDependenceName.Fate_Source_Code.value, FateDependenceName.Python_Env.value]:
- schedule_logger(job_id).info(f"{dependence_type}")
- dependencies_storage_info = DependenceRegistry.get_dependencies_storage_meta(
- storage_engine=storage_engine,
- version=provider.version,
- type=dependence_type,
- get_or_one=True
- )
- need_upload = False
- if dependencies_storage_info:
- if dependencies_storage_info.f_upload_status:
- # version dependence uploading
- check_tag = False
- continue
- elif not dependencies_storage_info.f_storage_path:
- need_upload = True
- upload_total += 1
- elif dependence_type == FateDependenceName.Fate_Source_Code.value:
- if provider.name == ComponentProviderName.FATE.value:
- check_fate_flow_provider_status = False
- if fate_flow_version_provider_info.values():
- flow_provider = ComponentProvider(**list(fate_flow_version_provider_info.values())[0])
- check_fate_flow_provider_status = DependenceRegistry.get_modify_time(flow_provider.path) \
- != dependencies_storage_info.f_fate_flow_snapshot_time
- if FATE_FLOW_UPDATE_CHECK and check_fate_flow_provider_status:
- need_upload = True
- upload_total += 1
- elif DependenceRegistry.get_modify_time(provider.path) != \
- dependencies_storage_info.f_snapshot_time:
- need_upload = True
- upload_total += 1
- elif provider.name == ComponentProviderName.FATE_FLOW.value and FATE_FLOW_UPDATE_CHECK:
- if DependenceRegistry.get_modify_time(provider.path) != \
- dependencies_storage_info.f_fate_flow_snapshot_time:
- need_upload = True
- upload_total += 1
- else:
- need_upload = True
- upload_total += 1
- if need_upload:
- upload_details[version][dependence_type] = provider
- if upload_total > 0:
- check_tag = False
- schedule_logger(job_id).info(f"check dependencies result: {check_tag}, {upload_details}")
- return check_tag, upload_total > 0, upload_details

    @classmethod
    def upload_spark_dependence(cls, job, upload_details, storage_engine=FateDependenceStorageEngine.HDFS.value):
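        """Mark each missing dependence as uploading and start a DEPENDENCE_UPLOAD
        worker for it; the worker reports back through `record_upload_process`.
        """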
- schedule_logger(job.f_job_id).info(f"start upload dependence: {upload_details}")
- for version, type_provider in upload_details.items():
- for dependence_type, provider in type_provider.items():
- storage_meta = {
- "f_storage_engine": storage_engine,
- "f_type": dependence_type,
- "f_version": version,
- "f_upload_status": True
- }
- schedule_logger(job.f_job_id).info(f"update dependence storage meta:{storage_meta}")
- DependenceRegistry.save_dependencies_storage_meta(storage_meta, status_check=True)
- WorkerManager.start_general_worker(worker_name=WorkerName.DEPENDENCE_UPLOAD, job_id=job.f_job_id,
- role=job.f_role, party_id=job.f_party_id, provider=provider,
- dependence_type=dependence_type, callback=cls.record_upload_process,
- callback_param=["dependence_type", "pid", "provider"])

    @classmethod
    def record_upload_process(cls, provider, dependence_type, pid,
                              storage_engine=FateDependenceStorageEngine.HDFS.value):
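        """Callback from the upload worker: persist the worker pid so the upload
        can be tracked (and killed via `kill_upload_process` if necessary).
        """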
        storage_meta = {
            "f_storage_engine": storage_engine,
            "f_type": dependence_type,
            "f_version": provider.version,
            "f_pid": pid,
            "f_upload_status": True
        }
        DependenceRegistry.save_dependencies_storage_meta(storage_meta)

    @classmethod
    def kill_upload_process(cls, version, storage_engine, dependence_type):
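        """Reset the upload flag and pid so a stuck or killed upload can be retried."""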
        storage_meta = {
            "f_storage_engine": storage_engine,
            "f_type": dependence_type,
            "f_version": version,
            "f_upload_status": False,
            "f_pid": 0
        }
        DependenceRegistry.save_dependencies_storage_meta(storage_meta)
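

# A minimal usage sketch (illustrative, not part of the module): the FATE Flow
# scheduler is expected to gate task scheduling on this check, retrying on the
# next scheduling round while inheritance or dependence uploads are in flight.
# The `job` object below is assumed to be a fate_flow Job model instance.
#
#     if DependenceManager.check_job_dependence(job):
#         pass  # dependencies are ready: proceed to schedule the job's tasks
#     else:
#         pass  # not ready yet: skip this round and check again later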