import os
import sys

from fate_arch import storage
from fate_arch.common import EngineType
from fate_flow.controller.job_controller import JobController
from fate_flow.entity.run_status import JobInheritanceStatus, TaskStatus
from fate_flow.operation.job_saver import JobSaver
from fate_flow.utils.log_utils import schedule_logger
from fate_arch.computing import ComputingEngine
from fate_flow.db.dependence_registry import DependenceRegistry
from fate_flow.entity import ComponentProvider
from fate_flow.entity.types import FateDependenceName, ComponentProviderName, FateDependenceStorageEngine, WorkerName
from fate_flow.manager.provider_manager import ProviderManager
from fate_flow.manager.worker_manager import WorkerManager
from fate_flow.settings import DEPENDENT_DISTRIBUTION, FATE_FLOW_UPDATE_CHECK, ENGINES
from fate_flow.utils import schedule_utils, job_utils, process_utils
from fate_flow.worker.job_inheritor import JobInherit


class DependenceManager:
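    """Check and prepare a job's dependencies before scheduling: job inheritance
    (reusing output data from a previous job) and Spark runtime dependencies
    (FATE source code and python environment uploaded to the dependence storage engine).
    """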
    @classmethod
    def check_job_dependence(cls, job):
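        """Return True only when both the job inheritance dependence and the Spark dependence checks pass."""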
        return cls.check_job_inherit_dependence(job) and cls.check_spark_dependence(job)

    @classmethod
    def check_job_inherit_dependence(cls, job):
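        """Check the job inheritance dependence.

        Returns True if the job has no inheritance info or the inheritance is in any
        other status (i.e. already finished). If inheritance is WAITING, start the
        inheritance worker and return False; if it is RUNNING, return False;
        if it FAILED, raise an exception.
        """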
        schedule_logger(job.f_job_id).info(
            f"check job inherit dependence: {job.f_inheritance_info}, {job.f_inheritance_status}")
        if job.f_inheritance_info:
            if job.f_inheritance_status == JobInheritanceStatus.WAITING:
                cls.start_inheriting_job(job)
                return False
            elif job.f_inheritance_status == JobInheritanceStatus.RUNNING:
                return False
            elif job.f_inheritance_status == JobInheritanceStatus.FAILED:
                raise Exception("job inheritance failed")
            else:
                return True
        else:
            return True

    @classmethod
    def component_check(cls, job, check_type="inheritance"):
        if check_type == "rerun":
            task_list = JobSaver.query_task(job_id=job.f_job_id, party_id=job.f_party_id, role=job.f_role,
                                            status=TaskStatus.SUCCESS, only_latest=True)
            tasks = {task.f_component_name: task for task in task_list}
        else:
            tasks = JobController.load_tasks(component_list=job.f_inheritance_info.get("component_list", []),
                                             job_id=job.f_inheritance_info.get("job_id"),
                                             role=job.f_role,
                                             party_id=job.f_party_id)
        tracker_dict = JobController.load_task_tracker(tasks)
        missing_dependence_component_list = []
        # data dependence
        for tracker in tracker_dict.values():
            table_infos = tracker.get_output_data_info()
            for table in table_infos:
                table_meta = storage.StorageTableMeta(name=table.f_table_name, namespace=table.f_table_namespace)
                if not table_meta:
                    missing_dependence_component_list.append(tracker.component_name)
                    continue
        if check_type == "rerun":
            return missing_dependence_component_list
        elif check_type == "inheritance":
            # return the components whose output data still exists and can therefore be inherited
            return list(set(job.f_inheritance_info.get("component_list", [])) - set(missing_dependence_component_list))

    @classmethod
    def start_inheriting_job(cls, job):
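        """Mark the job's inheritance status as RUNNING and launch the JobInherit worker in a subprocess."""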
        JobSaver.update_job(job_info={"job_id": job.f_job_id, "role": job.f_role, "party_id": job.f_party_id,
                                      "inheritance_status": JobInheritanceStatus.RUNNING})
        conf_dir = job_utils.get_job_directory(job_id=job.f_job_id)
        os.makedirs(conf_dir, exist_ok=True)
        process_cmd = [
            sys.executable or 'python3',
            sys.modules[JobInherit.__module__].__file__,
            '--job_id', job.f_job_id,
            '--role', job.f_role,
            '--party_id', job.f_party_id,
        ]
        log_dir = os.path.join(job_utils.get_job_log_directory(job_id=job.f_job_id), "job_inheritance")
        process_utils.run_subprocess(job_id=job.f_job_id, config_dir=conf_dir, process_cmd=process_cmd,
                                     log_dir=log_dir, process_name="job_inheritance")

    @classmethod
    def check_spark_dependence(cls, job):
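        """Check Spark runtime dependencies.

        Only applies when dependence distribution is enabled and the computing engine is Spark.
        Collects the FATE / FATE Flow provider versions used by the job, checks whether their
        dependencies are already uploaded to the dependence storage engine, and triggers uploads
        for any that are missing or out of date. Returns True when all dependencies are ready.
        """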
        if not DEPENDENT_DISTRIBUTION:
            return True
        engine_name = ENGINES.get(EngineType.COMPUTING)
        schedule_logger(job.f_job_id).info(f"job engine name: {engine_name}")
        if engine_name not in [ComputingEngine.SPARK]:
            return True
        dsl_parser = schedule_utils.get_job_dsl_parser(dsl=job.f_dsl, runtime_conf=job.f_runtime_conf,
                                                       train_runtime_conf=job.f_train_runtime_conf)
        provider_group = ProviderManager.get_job_provider_group(dsl_parser=dsl_parser,
                                                                runtime_conf=job.f_runtime_conf_on_party,
                                                                role=job.f_role,
                                                                party_id=job.f_party_id)
        version_provider_info = {}
        fate_flow_version_provider_info = {}
        schedule_logger(job.f_job_id).info(f'group_info:{provider_group}')
        for group_key, group_info in provider_group.items():
            if group_info["provider"]["name"] == ComponentProviderName.FATE_FLOW.value and \
                    group_info["provider"]["version"] not in fate_flow_version_provider_info:
                fate_flow_version_provider_info[group_info["provider"]["version"]] = group_info["provider"]
            if group_info["provider"]["name"] == ComponentProviderName.FATE.value and \
                    group_info["provider"]["version"] not in version_provider_info:
                version_provider_info[group_info["provider"]["version"]] = group_info["provider"]
            schedule_logger(job.f_job_id).info(f'version_provider_info:{version_provider_info}')
            schedule_logger(job.f_job_id).info(f'fate_flow_version_provider_info:{fate_flow_version_provider_info}')
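        # fall back to the FATE Flow provider info when the job does not use a FATE provider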
        if not version_provider_info:
            version_provider_info = fate_flow_version_provider_info
        check_tag, upload_tag, upload_details = cls.check_upload(job.f_job_id, version_provider_info,
                                                                 fate_flow_version_provider_info)
        if upload_tag:
            cls.upload_spark_dependence(job, upload_details)
        return check_tag

    @classmethod
    def check_upload(cls, job_id, provider_group, fate_flow_version_provider_info,
                     storage_engine=FateDependenceStorageEngine.HDFS.value):
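        """For each provider version, check whether the Fate_Source_Code and Python_Env dependencies
        are registered, stored and up to date; return (check_tag, need_upload, upload_details), where
        check_tag is True only when every dependence is ready and upload_details maps each version to
        the dependence types that still need to be uploaded.
        """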
        schedule_logger(job_id).info("start Check if need to upload dependencies")
        schedule_logger(job_id).info(f"{provider_group}")
        upload_details = {}
        check_tag = True
        upload_total = 0
        for version, provider_info in provider_group.items():
            upload_details[version] = {}
            provider = ComponentProvider(**provider_info)
            for dependence_type in [FateDependenceName.Fate_Source_Code.value, FateDependenceName.Python_Env.value]:
                schedule_logger(job_id).info(f"{dependence_type}")
                dependencies_storage_info = DependenceRegistry.get_dependencies_storage_meta(
                    storage_engine=storage_engine,
                    version=provider.version,
                    type=dependence_type,
                    get_or_one=True
                )
                need_upload = False
                if dependencies_storage_info:
                    if dependencies_storage_info.f_upload_status:
                        # version dependence uploading
                        check_tag = False
                        continue
                    elif not dependencies_storage_info.f_storage_path:
                        need_upload = True
                        upload_total += 1
                    elif dependence_type == FateDependenceName.Fate_Source_Code.value:
                        if provider.name == ComponentProviderName.FATE.value:
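                            # re-upload when the recorded snapshot time no longer matches the provider's
                            # current modify time (also checking the FATE Flow provider's modify time
                            # when FATE_FLOW_UPDATE_CHECK is enabled)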
                            check_fate_flow_provider_status = False
                            if fate_flow_version_provider_info.values():
                                flow_provider = ComponentProvider(**list(fate_flow_version_provider_info.values())[0])
                                check_fate_flow_provider_status = DependenceRegistry.get_modify_time(flow_provider.path) \
                                                                  != dependencies_storage_info.f_fate_flow_snapshot_time
                            if FATE_FLOW_UPDATE_CHECK and check_fate_flow_provider_status:
                                need_upload = True
                                upload_total += 1
                            elif DependenceRegistry.get_modify_time(provider.path) != \
                                    dependencies_storage_info.f_snapshot_time:
                                need_upload = True
                                upload_total += 1
                        elif provider.name == ComponentProviderName.FATE_FLOW.value and FATE_FLOW_UPDATE_CHECK:
                            if DependenceRegistry.get_modify_time(provider.path) != \
                                    dependencies_storage_info.f_fate_flow_snapshot_time:
                                need_upload = True
                                upload_total += 1
                else:
                    need_upload = True
                    upload_total += 1
                if need_upload:
                    upload_details[version][dependence_type] = provider
        if upload_total > 0:
            check_tag = False
        schedule_logger(job_id).info(f"check dependencies result: {check_tag}, {upload_details}")
        return check_tag, upload_total > 0, upload_details

    @classmethod
    def upload_spark_dependence(cls, job, upload_details, storage_engine=FateDependenceStorageEngine.HDFS.value):
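        """Mark each dependence as uploading in the dependence registry and start a
        DEPENDENCE_UPLOAD worker for every (version, dependence_type) pair in upload_details.
        """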
        schedule_logger(job.f_job_id).info(f"start upload dependence: {upload_details}")
        for version, type_provider in upload_details.items():
            for dependence_type, provider in type_provider.items():
                storage_meta = {
                    "f_storage_engine": storage_engine,
                    "f_type": dependence_type,
                    "f_version": version,
                    "f_upload_status": True
                }
                schedule_logger(job.f_job_id).info(f"update dependence storage meta:{storage_meta}")
                DependenceRegistry.save_dependencies_storage_meta(storage_meta, status_check=True)
                WorkerManager.start_general_worker(worker_name=WorkerName.DEPENDENCE_UPLOAD, job_id=job.f_job_id,
                                                   role=job.f_role, party_id=job.f_party_id, provider=provider,
                                                   dependence_type=dependence_type, callback=cls.record_upload_process,
                                                   callback_param=["dependence_type", "pid", "provider"])

    @classmethod
    def record_upload_process(cls, provider, dependence_type, pid,
                              storage_engine=FateDependenceStorageEngine.HDFS.value):
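        """Callback used by the upload worker: record the pid of the uploading process in the dependence registry."""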
        storage_meta = {
            "f_storage_engine": storage_engine,
            "f_type": dependence_type,
            "f_version": provider.version,
            "f_pid": pid,
            "f_upload_status": True
        }
        DependenceRegistry.save_dependencies_storage_meta(storage_meta)

    @classmethod
    def kill_upload_process(cls, version, storage_engine, dependence_type):
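        """Clear a dependence's upload record (upload status False, pid 0), e.g. after its upload process is killed."""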
        storage_meta = {
            "f_storage_engine": storage_engine,
            "f_type": dependence_type,
            "f_version": version,
            "f_upload_status": False,
            "f_pid": 0
        }
        DependenceRegistry.save_dependencies_storage_meta(storage_meta)