123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- import json
- import os
- import time
- import unittest
- import requests
- from fate_flow.entity.run_status import EndStatus, JobStatus
- from fate_arch.common.file_utils import load_json_conf
- from fate_flow.utils.base_utils import get_fate_flow_python_directory
- from fate_flow.settings import API_VERSION, HOST, HTTP_PORT,IS_STANDALONE
- WORK_MODE = 1 if not IS_STANDALONE else 0
- class TestTracking(unittest.TestCase):
- def setUp(self):
- self.sleep_time = 10
- self.success_job_dir = './jobs/'
- self.dsl_path = 'fate_flow/examples/test_hetero_lr_job_dsl.json'
- self.config_path = 'fate_flow/examples/test_hetero_lr_job_conf.json'
- self.test_component_name = 'hetero_feature_selection_0'
- self.server_url = "http://{}:{}/{}".format(HOST, HTTP_PORT, API_VERSION)
- self.party_info = load_json_conf(os.path.abspath(os.path.join('./jobs', 'party_info.json'))) if WORK_MODE else None
- self.guest_party_id = self.party_info['guest'] if WORK_MODE else 9999
- self.host_party_id = self.party_info['host'] if WORK_MODE else 10000
- def test_tracking(self):
- with open(os.path.join(get_fate_flow_python_directory(), self.dsl_path), 'r') as f:
- dsl_data = json.load(f)
- with open(os.path.join(get_fate_flow_python_directory(), self.config_path), 'r') as f:
- config_data = json.load(f)
- config_data[ "initiator"]["party_id"] = self.guest_party_id
- config_data["role"] = {
- "guest": [self.guest_party_id],
- "host": [self.host_party_id],
- "arbiter": [self.host_party_id]
- }
- response = requests.post("/".join([self.server_url, 'job', 'submit']),
- json={'job_dsl': dsl_data, 'job_runtime_conf': config_data})
- self.assertTrue(response.status_code in [200, 201])
- self.assertTrue(int(response.json()['retcode']) == 0)
- job_id = response.json()['jobId']
- job_info = {'f_status': 'running'}
- for i in range(60):
- response = requests.post("/".join([self.server_url, 'job', 'query']), json={'job_id': job_id, 'role': 'guest'})
- self.assertTrue(response.status_code in [200, 201])
- job_info = response.json()['data'][0]
- if EndStatus.contains(job_info['f_status']):
- break
- time.sleep(self.sleep_time)
- print('waiting job run success, the job has been running for {}s'.format((i+1)*self.sleep_time))
- self.assertTrue(job_info['f_status'] == JobStatus.SUCCESS)
- os.makedirs(self.success_job_dir, exist_ok=True)
- with open(os.path.join(self.success_job_dir, job_id), 'w') as fw:
- json.dump(job_info, fw)
- self.assertTrue(os.path.exists(os.path.join(self.success_job_dir, job_id)))
- # test_component_parameters
- test_component(self, 'component/parameters')
- # test_component_metric_all
- test_component(self, 'component/metric/all')
- # test_component_metric
- test_component(self, 'component/metrics')
- # test_component_output_model
- test_component(self, 'component/output/model')
- # test_component_output_data_download
- test_component(self, 'component/output/data')
- # test_component_output_data_download
- test_component(self, 'component/output/data/download')
- # test_job_data_view
- test_component(self, 'job/data_view')
- def test_component(self, fun):
- job_id = os.listdir(os.path.abspath(os.path.join(self.success_job_dir)))[-1]
- job_info = load_json_conf(os.path.abspath(os.path.join(self.success_job_dir, job_id)))
- data = {'job_id': job_id, 'role': job_info['f_role'], 'party_id': job_info['f_party_id'], 'component_name': self.test_component_name}
- if 'download' in fun:
- response = requests.get("/".join([self.server_url, "tracking", fun]), json=data, stream=True)
- self.assertTrue(response.status_code in [200, 201])
- else:
- response = requests.post("/".join([self.server_url, 'tracking', fun]), json=data)
- self.assertTrue(response.status_code in [200, 201])
- self.assertTrue(int(response.json()['retcode']) == 0)
- if __name__ == '__main__':
- unittest.main()
|