123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- import os
- import time
- import unittest
- import requests
- from fate_flow.entity.run_status import JobStatus
- from fate_flow.utils.base_utils import get_fate_flow_directory
- from fate_flow.settings import API_VERSION, HOST, HTTP_PORT
class TestDataAccess(unittest.TestCase):
    """Integration tests for the FATE Flow data upload/download HTTP API.

    Every test sends real HTTP requests to the URL built from ``HOST``,
    ``HTTP_PORT`` and ``API_VERSION`` in ``fate_flow.settings``.
    NOTE(review): presumably a FATE Flow server must already be listening on
    that address for these tests to pass -- confirm with the test runbook.
    """

    # Upper bound on the number of job-status polls after an upload, and the
    # pause (in seconds) between two polls.
    JOB_POLL_ATTEMPTS = 60
    JOB_POLL_INTERVAL = 1

    def setUp(self):
        """Build the request payloads and the base server URL."""
        self.data_dir = os.path.join(get_fate_flow_directory(), "examples", "data")
        self.upload_guest_config = {
            "file": os.path.join(self.data_dir, "breast_hetero_guest.csv"),
            "head": 1,
            "partition": 10,
            "namespace": "experiment",
            "table_name": "breast_hetero_guest",
            "use_local_data": 0,
            "drop": 1,
            "backend": 0,
            "id_delimiter": ",",
        }
        self.upload_host_config = {
            "file": os.path.join(self.data_dir, "breast_hetero_host.csv"),
            "head": 1,
            "partition": 10,
            "namespace": "experiment",
            "table_name": "breast_hetero_host",
            "use_local_data": 0,
            "drop": 1,
            "backend": 0,
            "id_delimiter": ",",
        }
        self.download_config = {
            "output_path": os.path.join(
                get_fate_flow_directory(),
                "fate_flow/fate_flow_unittest_breast_b.csv",
            ),
            "namespace": "experiment",
            "table_name": "breast_hetero_guest",
        }
        self.server_url = "http://{}:{}/{}".format(HOST, HTTP_PORT, API_VERSION)

    def _post(self, path_parts, payload):
        """POST ``payload`` as JSON to ``server_url`` joined with ``path_parts``.

        Args:
            path_parts: sequence of URL path segments appended to the base URL.
            payload: JSON-serialisable request body.

        Returns:
            The ``requests`` response object.
        """
        url = "/".join([self.server_url] + list(path_parts))
        return requests.post(url, json=payload)

    def _assert_request_ok(self, response):
        """Assert an HTTP 200/201 status and a ``retcode`` of 0 in the body."""
        self.assertIn(response.status_code, [200, 201])
        self.assertEqual(int(response.json()['retcode']), 0)

    def _upload_and_wait(self, upload_config):
        """Submit an upload job and poll until it reports success.

        Args:
            upload_config: JSON body for the ``data/upload`` endpoint.

        Fails the test if the job has not reached ``JobStatus.SUCCESS`` after
        ``JOB_POLL_ATTEMPTS`` polls. Previously the loop simply ran out and
        the test passed even though the upload never finished.
        """
        response = self._post(['data', 'upload'], upload_config)
        self._assert_request_ok(response)
        job_id = response.json()['jobId']

        last_status = None
        for _ in range(self.JOB_POLL_ATTEMPTS):
            query_body = self._post(['job', 'query'], {'job_id': job_id}).json()
            self.assertEqual(int(query_body['retcode']), 0)
            last_status = query_body['data'][0]['f_status']
            if last_status == JobStatus.SUCCESS:
                break
            time.sleep(self.JOB_POLL_INTERVAL)
        else:
            # Loop exhausted without a ``break``: the job never succeeded.
            self.fail(
                "upload job {} did not reach {} after {} polls (last status: {})".format(
                    job_id, JobStatus.SUCCESS, self.JOB_POLL_ATTEMPTS, last_status
                )
            )

    def test_upload_guest(self):
        """Uploading the guest data set is accepted and the job succeeds."""
        self._upload_and_wait(self.upload_guest_config)

    def test_upload_host(self):
        """Uploading the host data set is accepted and the job succeeds."""
        self._upload_and_wait(self.upload_host_config)

    def test_upload_history(self):
        """The upload-history endpoint answers a limited query successfully."""
        response = self._post(['data', 'upload/history'], {'limit': 2})
        self._assert_request_ok(response)

    def test_download(self):
        """The download endpoint accepts the download request."""
        response = self._post(['data', 'download'], self.download_config)
        self._assert_request_ok(response)
if __name__ == '__main__':
    # Executed as a script: let unittest discover and run this module's tests.
    unittest.main()
|