# dependence_upload.py
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import functools
  17. import os
  18. import shutil
  19. import zipfile
  20. import subprocess
  21. from fate_arch.common import file_utils
  22. from fate_flow.utils.log_utils import getLogger
  23. from fate_flow.db.db_models import ComponentProviderInfo
  24. from fate_flow.db.dependence_registry import DependenceRegistry
  25. from fate_flow.db.service_registry import ServerRegistry
  26. from fate_flow.entity import ComponentProvider
  27. from fate_flow.entity.types import FateDependenceName, ComponentProviderName, FateDependenceStorageEngine
  28. from fate_flow.settings import FATE_VERSION_DEPENDENCIES_PATH
  29. from fate_flow.worker.base_worker import BaseWorker
  30. from fate_flow.utils.base_utils import get_fate_flow_python_directory
  31. LOGGER = getLogger()
  32. def upload_except_exit(func):
  33. @functools.wraps(func)
  34. def _wrapper(*args, **kwargs):
  35. try:
  36. return func(*args, **kwargs)
  37. except Exception as e:
  38. provider = kwargs.get("provider")
  39. dependence_type = kwargs.get("dependence_type")
  40. storage_engine = FateDependenceStorageEngine.HDFS.value
  41. storage_meta = {
  42. "f_storage_engine": storage_engine,
  43. "f_type": dependence_type,
  44. "f_version": provider.version,
  45. "f_upload_status": False
  46. }
  47. DependenceRegistry.save_dependencies_storage_meta(storage_meta)
  48. raise e
  49. return _wrapper
  50. class DependenceUpload(BaseWorker):
  51. def _run(self):
  52. provider = ComponentProvider(**self.args.config.get("provider"))
  53. dependence_type = self.args.dependence_type
  54. self.upload_dependencies_to_hadoop(provider=provider, dependence_type=dependence_type)
  55. @classmethod
  56. @upload_except_exit
  57. def upload_dependencies_to_hadoop(cls, provider, dependence_type, storage_engine=FateDependenceStorageEngine.HDFS.value):
  58. LOGGER.info(f'upload {dependence_type} dependencies to hadoop')
  59. LOGGER.info(f'dependencies loading ...')
  60. if dependence_type == FateDependenceName.Python_Env.value:
  61. # todo: version python env
  62. target_file = os.path.join(FATE_VERSION_DEPENDENCIES_PATH, provider.version, "python_env.tar.gz")
  63. venv_pack_path = os.path.join(os.getenv("VIRTUAL_ENV"), "bin/venv-pack")
  64. subprocess.run([venv_pack_path, "-o", target_file])
  65. source_path = os.path.dirname(os.path.dirname(os.getenv("VIRTUAL_ENV")))
  66. cls.rewrite_pyvenv_cfg(os.path.join(os.getenv("VIRTUAL_ENV"), "pyvenv.cfg"), "python_env")
  67. dependencies_conf = {"executor_python": f"./{dependence_type}/bin/python",
  68. "driver_python": f"{os.path.join(os.getenv('VIRTUAL_ENV'), 'bin', 'python')}"}
  69. else:
  70. fate_code_dependencies = {
  71. "fate_flow": get_fate_flow_python_directory("fate_flow"),
  72. "fate_arch": file_utils.get_fate_python_directory("fate_arch"),
  73. "conf": file_utils.get_project_base_directory("conf")
  74. }
  75. fate_flow_snapshot_time = DependenceRegistry.get_modify_time(fate_code_dependencies["fate_flow"])
  76. fate_code_base_dir = os.path.join(FATE_VERSION_DEPENDENCIES_PATH, provider.version, "fate_code", "fate")
  77. python_base_dir = os.path.join(fate_code_base_dir, "python")
  78. if os.path.exists(os.path.dirname(python_base_dir)):
  79. shutil.rmtree(os.path.dirname(python_base_dir))
  80. for key, path in fate_code_dependencies.items():
  81. cls.copy_dir(path, os.path.join(python_base_dir, key))
  82. if key == "conf":
  83. cls.move_dir(os.path.join(python_base_dir, key), os.path.dirname(fate_code_base_dir))
  84. if provider.name == ComponentProviderName.FATE.value:
  85. source_path = provider.path
  86. else:
  87. source_path = ComponentProviderInfo.get_or_none(
  88. ComponentProviderInfo.f_version == provider.version,
  89. ComponentProviderInfo.f_provider_name == ComponentProviderName.FATE.value
  90. ).f_path
  91. cls.copy_dir(source_path, os.path.join(python_base_dir, "federatedml"))
  92. target_file = os.path.join(FATE_VERSION_DEPENDENCIES_PATH, provider.version, "fate.zip")
  93. cls.zip_dir(os.path.dirname(fate_code_base_dir), target_file)
  94. dependencies_conf = {"executor_env_pythonpath": f"./{dependence_type}/fate/python:$PYTHONPATH"}
  95. LOGGER.info(f'dependencies loading success')
  96. LOGGER.info(f'start upload')
  97. snapshot_time = DependenceRegistry.get_modify_time(source_path)
  98. hdfs_address = ServerRegistry.FATE_ON_SPARK.get("hdfs", {}).get("name_node")
  99. LOGGER.info(f'hdfs address: {hdfs_address}')
  100. storage_dir = f"/fate_dependence/{provider.version}"
  101. os.system(f" {os.getenv('HADOOP_HOME')}/bin/hdfs dfs -mkdir -p {hdfs_address}{storage_dir}")
  102. status = os.system(f"{os.getenv('HADOOP_HOME')}/bin/hdfs dfs -put -f {target_file} {hdfs_address}{storage_dir}")
  103. LOGGER.info(f'upload end, status is {status}')
  104. if status == 0:
  105. storage_path = os.path.join(storage_dir, os.path.basename(target_file))
  106. storage_meta = {
  107. "f_storage_engine": storage_engine,
  108. "f_type": dependence_type,
  109. "f_version": provider.version,
  110. "f_storage_path": storage_path,
  111. "f_snapshot_time": snapshot_time,
  112. "f_fate_flow_snapshot_time": fate_flow_snapshot_time if dependence_type == FateDependenceName.Fate_Source_Code.value else None,
  113. "f_dependencies_conf": {"archives": "#".join([storage_path, dependence_type])},
  114. "f_upload_status": False,
  115. "f_pid": 0
  116. }
  117. storage_meta["f_dependencies_conf"].update(dependencies_conf)
  118. DependenceRegistry.save_dependencies_storage_meta(storage_meta)
  119. else:
  120. raise Exception(f"{os.getenv('HADOOP_HOME')}/bin/hdfs dfs -put {target_file} {storage_dir} failed status: {status}")
  121. return storage_meta
  122. @classmethod
  123. def zip_dir(cls, input_dir_path, output_zip_full_name, dir_list=None):
  124. with zipfile.ZipFile(output_zip_full_name, "w", zipfile.ZIP_DEFLATED) as zip_object:
  125. if not dir_list:
  126. cls.zip_write(zip_object, input_dir_path, input_dir_path)
  127. else:
  128. for dir_name in dir_list:
  129. dir_path = os.path.join(input_dir_path, dir_name)
  130. cls.zip_write(zip_object, dir_path, input_dir_path)
  131. @classmethod
  132. def zip_write(cls, zip_object, dir_path, input_dir_path):
  133. for path, dirnames, filenames in os.walk(dir_path):
  134. fpath = path.replace(input_dir_path, '')
  135. for filename in filenames:
  136. if os.path.exists(os.path.join(path, filename)):
  137. zip_object.write(os.path.join(path, filename), os.path.join(fpath, filename))
  138. @staticmethod
  139. def copy_dir(source_path, target_path):
  140. if os.path.exists(target_path):
  141. shutil.rmtree(target_path)
  142. shutil.copytree(source_path, target_path)
  143. @staticmethod
  144. def move_dir(source_path, target_path):
  145. shutil.move(source_path, target_path)
  146. @classmethod
  147. def rewrite_pyvenv_cfg(cls, file, dir_name):
  148. import re
  149. bak_file = file + '.bak'
  150. shutil.copyfile(file, bak_file)
  151. with open(file, "w") as fw:
  152. with open(bak_file, 'r') as fr:
  153. lines = fr.readlines()
  154. match_str = None
  155. for line in lines:
  156. change_line = re.findall(".*=(.*)miniconda.*", line)
  157. if change_line:
  158. if not match_str:
  159. match_str = change_line[0]
  160. line = re.sub(match_str, f" ./{dir_name}/", line)
  161. fw.write(line)
  162. if __name__ == '__main__':
  163. DependenceUpload().run()