#
#  Copyright 2019 The FATE Authors. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
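"""
Worker that packages FATE job dependencies -- either the Python virtual
environment or the FATE source code -- and uploads the resulting archive to
HDFS so that Spark executors can pull it at job runtime.

This module is launched as a standalone worker process (see the __main__
block at the bottom); BaseWorker is expected to parse the command line into
self.args, which should carry the component provider config and the
dependence type.
"""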
import functools
import os
import shutil
import subprocess
import zipfile

from fate_arch.common import file_utils
from fate_flow.utils.log_utils import getLogger
from fate_flow.db.db_models import ComponentProviderInfo
from fate_flow.db.dependence_registry import DependenceRegistry
from fate_flow.db.service_registry import ServerRegistry
from fate_flow.entity import ComponentProvider
from fate_flow.entity.types import FateDependenceName, ComponentProviderName, FateDependenceStorageEngine
from fate_flow.settings import FATE_VERSION_DEPENDENCIES_PATH
from fate_flow.worker.base_worker import BaseWorker
from fate_flow.utils.base_utils import get_fate_flow_python_directory

LOGGER = getLogger()


def upload_except_exit(func):
    """On any failure of the wrapped upload, record the failed status in the dependence registry, then re-raise."""
    @functools.wraps(func)
    def _wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            provider = kwargs.get("provider")
            dependence_type = kwargs.get("dependence_type")
            storage_engine = FateDependenceStorageEngine.HDFS.value
            storage_meta = {
                "f_storage_engine": storage_engine,
                "f_type": dependence_type,
                "f_version": provider.version,
                "f_upload_status": False
            }
            DependenceRegistry.save_dependencies_storage_meta(storage_meta)
            raise e
    return _wrapper
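
# Note: upload_except_exit reads "provider" and "dependence_type" from kwargs,
# so decorated methods must be called with keyword arguments (as _run does below).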


class DependenceUpload(BaseWorker):
    def _run(self):
        provider = ComponentProvider(**self.args.config.get("provider"))
        dependence_type = self.args.dependence_type
        self.upload_dependencies_to_hadoop(provider=provider, dependence_type=dependence_type)

    @classmethod
    @upload_except_exit
    def upload_dependencies_to_hadoop(cls, provider, dependence_type, storage_engine=FateDependenceStorageEngine.HDFS.value):
        """Package the requested dependence, push it to HDFS and record the storage meta."""
        LOGGER.info(f'upload {dependence_type} dependencies to hadoop')
        LOGGER.info(f'dependencies loading ...')
        if dependence_type == FateDependenceName.Python_Env.value:
            # todo: version python env
            # Pack the current virtual environment into a relocatable tarball with venv-pack.
            target_file = os.path.join(FATE_VERSION_DEPENDENCIES_PATH, provider.version, "python_env.tar.gz")
            venv_pack_path = os.path.join(os.getenv("VIRTUAL_ENV"), "bin/venv-pack")
            subprocess.run([venv_pack_path, "-o", target_file])
            source_path = os.path.dirname(os.path.dirname(os.getenv("VIRTUAL_ENV")))
            cls.rewrite_pyvenv_cfg(os.path.join(os.getenv("VIRTUAL_ENV"), "pyvenv.cfg"), "python_env")
            dependencies_conf = {"executor_python": f"./{dependence_type}/bin/python",
                                 "driver_python": f"{os.path.join(os.getenv('VIRTUAL_ENV'), 'bin', 'python')}"}
        else:
            # Collect the FATE source code (fate_flow, fate_arch, conf and federatedml) into one zip archive.
            fate_code_dependencies = {
                "fate_flow": get_fate_flow_python_directory("fate_flow"),
                "fate_arch": file_utils.get_fate_python_directory("fate_arch"),
                "conf": file_utils.get_project_base_directory("conf")
            }
            fate_flow_snapshot_time = DependenceRegistry.get_modify_time(fate_code_dependencies["fate_flow"])
            fate_code_base_dir = os.path.join(FATE_VERSION_DEPENDENCIES_PATH, provider.version, "fate_code", "fate")
            python_base_dir = os.path.join(fate_code_base_dir, "python")
            if os.path.exists(os.path.dirname(python_base_dir)):
                shutil.rmtree(os.path.dirname(python_base_dir))
            for key, path in fate_code_dependencies.items():
                cls.copy_dir(path, os.path.join(python_base_dir, key))
                if key == "conf":
                    cls.move_dir(os.path.join(python_base_dir, key), os.path.dirname(fate_code_base_dir))
            if provider.name == ComponentProviderName.FATE.value:
                source_path = provider.path
            else:
                source_path = ComponentProviderInfo.get_or_none(
                    ComponentProviderInfo.f_version == provider.version,
                    ComponentProviderInfo.f_provider_name == ComponentProviderName.FATE.value
                ).f_path
            cls.copy_dir(source_path, os.path.join(python_base_dir, "federatedml"))
            target_file = os.path.join(FATE_VERSION_DEPENDENCIES_PATH, provider.version, "fate.zip")
            cls.zip_dir(os.path.dirname(fate_code_base_dir), target_file)
            dependencies_conf = {"executor_env_pythonpath": f"./{dependence_type}/fate/python:$PYTHONPATH"}
        LOGGER.info(f'dependencies loading success')

        LOGGER.info(f'start upload')
        snapshot_time = DependenceRegistry.get_modify_time(source_path)
        hdfs_address = ServerRegistry.FATE_ON_SPARK.get("hdfs", {}).get("name_node")
        LOGGER.info(f'hdfs address: {hdfs_address}')
        storage_dir = f"/fate_dependence/{provider.version}"
        # Upload the packaged archive to HDFS via the hadoop command line client.
        os.system(f"{os.getenv('HADOOP_HOME')}/bin/hdfs dfs -mkdir -p {hdfs_address}{storage_dir}")
        status = os.system(f"{os.getenv('HADOOP_HOME')}/bin/hdfs dfs -put -f {target_file} {hdfs_address}{storage_dir}")
        LOGGER.info(f'upload end, status is {status}')
        if status == 0:
            storage_path = os.path.join(storage_dir, os.path.basename(target_file))
            storage_meta = {
                "f_storage_engine": storage_engine,
                "f_type": dependence_type,
                "f_version": provider.version,
                "f_storage_path": storage_path,
                "f_snapshot_time": snapshot_time,
                "f_fate_flow_snapshot_time": fate_flow_snapshot_time
                if dependence_type == FateDependenceName.Fate_Source_Code.value else None,
                "f_dependencies_conf": {"archives": "#".join([storage_path, dependence_type])},
                "f_upload_status": False,
                "f_pid": 0
            }
            storage_meta["f_dependencies_conf"].update(dependencies_conf)
            DependenceRegistry.save_dependencies_storage_meta(storage_meta)
        else:
            raise Exception(f"{os.getenv('HADOOP_HOME')}/bin/hdfs dfs -put {target_file} {storage_dir} failed status: {status}")
        return storage_meta
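
    # Illustrative shape of the meta saved above for a source-code upload (paths and
    # version are hypothetical):
    #   f_storage_path:      /fate_dependence/<version>/fate.zip
    #   f_dependencies_conf: {"archives": "/fate_dependence/<version>/fate.zip#<dependence_type>",
    #                         "executor_env_pythonpath": "./<dependence_type>/fate/python:$PYTHONPATH"}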

    @classmethod
    def zip_dir(cls, input_dir_path, output_zip_full_name, dir_list=None):
        """Zip input_dir_path (or the given sub-directories of it) into output_zip_full_name."""
        with zipfile.ZipFile(output_zip_full_name, "w", zipfile.ZIP_DEFLATED) as zip_object:
            if not dir_list:
                cls.zip_write(zip_object, input_dir_path, input_dir_path)
            else:
                for dir_name in dir_list:
                    dir_path = os.path.join(input_dir_path, dir_name)
                    cls.zip_write(zip_object, dir_path, input_dir_path)

    @classmethod
    def zip_write(cls, zip_object, dir_path, input_dir_path):
        # Store each file under an archive name relative to input_dir_path.
        for path, dirnames, filenames in os.walk(dir_path):
            fpath = path.replace(input_dir_path, '')
            for filename in filenames:
                if os.path.exists(os.path.join(path, filename)):
                    zip_object.write(os.path.join(path, filename), os.path.join(fpath, filename))
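
    # Illustrative archive layout: zipping <FATE_VERSION_DEPENDENCIES_PATH>/<version>/fate_code
    # yields entries such as /fate/python/fate_flow/... and /fate/python/federatedml/...,
    # which is what the executor_env_pythonpath above points into once the archive is unpacked.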

    @staticmethod
    def copy_dir(source_path, target_path):
        if os.path.exists(target_path):
            shutil.rmtree(target_path)
        shutil.copytree(source_path, target_path)

    @staticmethod
    def move_dir(source_path, target_path):
        shutil.move(source_path, target_path)

    @classmethod
    def rewrite_pyvenv_cfg(cls, file, dir_name):
        """Rewrite pyvenv.cfg so interpreter paths point at the relocated env under ./<dir_name>/."""
        import re
        bak_file = file + '.bak'
        shutil.copyfile(file, bak_file)
        with open(file, "w") as fw:
            with open(bak_file, 'r') as fr:
                lines = fr.readlines()
                match_str = None
                for line in lines:
                    # Only lines referring to a miniconda install are rewritten; the path
                    # prefix in front of "miniconda" is replaced with ./<dir_name>/.
                    change_line = re.findall(".*=(.*)miniconda.*", line)
                    if change_line:
                        if not match_str:
                            match_str = change_line[0]
                        line = re.sub(match_str, f" ./{dir_name}/", line)
                    fw.write(line)
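
    # Illustrative effect of rewrite_pyvenv_cfg (assuming a conda-based install):
    # a pyvenv.cfg line such as
    #   home = /data/projects/miniconda3/bin
    # would become
    #   home = ./python_env/miniconda3/bin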


if __name__ == '__main__':
    DependenceUpload().run()