data_access_test.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import os
  2. import time
  3. import unittest
  4. import requests
  5. from fate_flow.entity.run_status import JobStatus
  6. from fate_flow.utils.base_utils import get_fate_flow_directory
  7. from fate_flow.settings import API_VERSION, HOST, HTTP_PORT
  8. class TestDataAccess(unittest.TestCase):
  9. def setUp(self):
  10. self.data_dir = os.path.join(get_fate_flow_directory(), "examples", "data")
  11. self.upload_guest_config = {"file": os.path.join(self.data_dir, "breast_hetero_guest.csv"), "head": 1,
  12. "partition": 10, "namespace": "experiment",
  13. "table_name": "breast_hetero_guest", "use_local_data": 0, 'drop': 1, 'backend': 0, "id_delimiter": ',', }
  14. self.upload_host_config = {"file": os.path.join(self.data_dir, "breast_hetero_host.csv"), "head": 1,
  15. "partition": 10, "namespace": "experiment",
  16. "table_name": "breast_hetero_host", "use_local_data": 0, 'drop': 1, 'backend': 0, "id_delimiter": ',', }
  17. self.download_config = {"output_path": os.path.join(get_fate_flow_directory(),
  18. "fate_flow/fate_flow_unittest_breast_b.csv"),
  19. "namespace": "experiment",
  20. "table_name": "breast_hetero_guest"}
  21. self.server_url = "http://{}:{}/{}".format(HOST, HTTP_PORT, API_VERSION)
  22. def test_upload_guest(self):
  23. response = requests.post("/".join([self.server_url, 'data', 'upload']), json=self.upload_guest_config)
  24. self.assertTrue(response.status_code in [200, 201])
  25. self.assertTrue(int(response.json()['retcode']) == 0)
  26. job_id = response.json()['jobId']
  27. for i in range(60):
  28. response = requests.post("/".join([self.server_url, 'job', 'query']), json={'job_id': job_id})
  29. self.assertTrue(int(response.json()['retcode']) == 0)
  30. if response.json()['data'][0]['f_status'] == JobStatus.SUCCESS:
  31. break
  32. time.sleep(1)
  33. def test_upload_host(self):
  34. response = requests.post("/".join([self.server_url, 'data', 'upload']), json=self.upload_host_config)
  35. self.assertTrue(response.status_code in [200, 201])
  36. self.assertTrue(int(response.json()['retcode']) == 0)
  37. job_id = response.json()['jobId']
  38. for i in range(60):
  39. response = requests.post("/".join([self.server_url, 'job', 'query']), json={'job_id': job_id})
  40. self.assertTrue(int(response.json()['retcode']) == 0)
  41. if response.json()['data'][0]['f_status'] == JobStatus.SUCCESS:
  42. break
  43. time.sleep(1)
  44. def test_upload_history(self):
  45. response = requests.post("/".join([self.server_url, 'data', 'upload/history']), json={'limit': 2})
  46. self.assertTrue(response.status_code in [200, 201])
  47. self.assertTrue(int(response.json()['retcode']) == 0)
  48. def test_download(self):
  49. response = requests.post("/".join([self.server_url, 'data', 'download']), json=self.download_config)
  50. self.assertTrue(response.status_code in [200, 201])
  51. self.assertTrue(int(response.json()['retcode']) == 0)
  52. if __name__ == '__main__':
  53. unittest.main()