pipelined_model_test.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. import unittest
  2. from unittest.mock import patch
  3. import os
  4. import io
  5. import shutil
  6. import hashlib
  7. import concurrent.futures
  8. from pathlib import Path
  9. from copy import deepcopy
  10. from zipfile import ZipFile
  11. from ruamel import yaml
  12. from fate_flow.pipelined_model.pipelined_model import PipelinedModel
  13. from fate_flow.settings import TEMP_DIRECTORY
  14. with open(Path(__file__).parent.parent / 'misc' / 'define_meta.yaml', encoding='utf8') as _f:
  15. data_define_meta = yaml.safe_load(_f)
  16. args_update_component_meta = [
  17. 'dataio_0',
  18. 'DataIO',
  19. 'dataio',
  20. {
  21. 'DataIOMeta': 'DataIOMeta',
  22. 'DataIOParam': 'DataIOParam',
  23. },
  24. ]
  25. class TestPipelinedModel(unittest.TestCase):
  26. def setUp(self):
  27. shutil.rmtree(TEMP_DIRECTORY, True)
  28. self.pipelined_model = PipelinedModel('foobar', 'v1')
  29. shutil.rmtree(self.pipelined_model.model_path, True)
  30. self.pipelined_model.create_pipelined_model()
  31. with open(self.pipelined_model.define_meta_path, 'w', encoding='utf8') as f:
  32. yaml.dump(data_define_meta, f)
  33. def tearDown(self):
  34. shutil.rmtree(TEMP_DIRECTORY, True)
  35. shutil.rmtree(self.pipelined_model.model_path, True)
  36. def test_write_read_file_same_time(self):
  37. fw = open(self.pipelined_model.define_meta_path, 'r+', encoding='utf8')
  38. self.assertEqual(yaml.safe_load(fw), data_define_meta)
  39. fw.seek(0)
  40. fw.write('foobar')
  41. with open(self.pipelined_model.define_meta_path, encoding='utf8') as fr:
  42. self.assertEqual(yaml.safe_load(fr), data_define_meta)
  43. fw.truncate()
  44. with open(self.pipelined_model.define_meta_path, encoding='utf8') as fr:
  45. self.assertEqual(fr.read(), 'foobar')
  46. fw.seek(0)
  47. fw.write('abc')
  48. fw.close()
  49. with open(self.pipelined_model.define_meta_path, encoding='utf8') as fr:
  50. self.assertEqual(fr.read(), 'abcbar')
  51. def test_update_component_meta_with_changes(self):
  52. with patch('ruamel.yaml.dump', side_effect=yaml.dump) as yaml_dump:
  53. self.pipelined_model.update_component_meta(
  54. 'dataio_0', 'DataIO_v0', 'dataio', {
  55. 'DataIOMeta': 'DataIOMeta_v0',
  56. 'DataIOParam': 'DataIOParam_v0',
  57. }
  58. )
  59. yaml_dump.assert_called_once()
  60. with open(self.pipelined_model.define_meta_path, encoding='utf8') as tmp:
  61. define_index = yaml.safe_load(tmp)
  62. _data = deepcopy(data_define_meta)
  63. _data['component_define']['dataio_0']['module_name'] = 'DataIO_v0'
  64. _data['model_proto']['dataio_0']['dataio'] = {
  65. 'DataIOMeta': 'DataIOMeta_v0',
  66. 'DataIOParam': 'DataIOParam_v0',
  67. }
  68. self.assertEqual(define_index, _data)
  69. def test_update_component_meta_without_changes(self):
  70. with open(self.pipelined_model.define_meta_path, 'w', encoding='utf8') as f:
  71. yaml.dump(data_define_meta, f, Dumper=yaml.RoundTripDumper)
  72. with patch('ruamel.yaml.dump', side_effect=yaml.dump) as yaml_dump:
  73. self.pipelined_model.update_component_meta(*args_update_component_meta)
  74. yaml_dump.assert_not_called()
  75. with open(self.pipelined_model.define_meta_path, encoding='utf8') as tmp:
  76. define_index = yaml.safe_load(tmp)
  77. self.assertEqual(define_index, data_define_meta)
  78. def test_update_component_meta_multi_thread(self):
  79. with patch('ruamel.yaml.safe_load', side_effect=yaml.safe_load) as yaml_load, \
  80. patch('ruamel.yaml.dump', side_effect=yaml.dump) as yaml_dump, \
  81. concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
  82. for _ in range(100):
  83. executor.submit(self.pipelined_model.update_component_meta, *args_update_component_meta)
  84. self.assertEqual(yaml_load.call_count, 100)
  85. self.assertEqual(yaml_dump.call_count, 0)
  86. with open(self.pipelined_model.define_meta_path, encoding='utf8') as tmp:
  87. define_index = yaml.safe_load(tmp)
  88. self.assertEqual(define_index, data_define_meta)
  89. def test_update_component_meta_empty_file(self):
  90. open(self.pipelined_model.define_meta_path, 'w').close()
  91. with self.assertRaisesRegex(ValueError, 'Invalid meta file'):
  92. self.pipelined_model.update_component_meta(*args_update_component_meta)
  93. def test_packaging_model(self):
  94. archive_file_path = self.pipelined_model.packaging_model()
  95. self.assertEqual(archive_file_path, self.pipelined_model.archive_model_file_path)
  96. self.assertTrue(Path(archive_file_path).is_file())
  97. self.assertTrue(Path(archive_file_path + '.sha1').is_file())
  98. with ZipFile(archive_file_path) as z:
  99. with io.TextIOWrapper(z.open('define/define_meta.yaml'), encoding='utf8') as f:
  100. define_index = yaml.safe_load(f)
  101. self.assertEqual(define_index, data_define_meta)
  102. with open(archive_file_path, 'rb') as f, open(archive_file_path + '.sha1', encoding='utf8') as g:
  103. sha1 = hashlib.sha1(f.read()).hexdigest()
  104. sha1_orig = g.read().strip()
  105. self.assertEqual(sha1, sha1_orig)
  106. def test_packaging_model_not_exists(self):
  107. shutil.rmtree(self.pipelined_model.model_path, True)
  108. with self.assertRaisesRegex(FileNotFoundError, 'Can not found foobar v1 model local cache'):
  109. self.pipelined_model.packaging_model()
  110. def test_unpack_model(self):
  111. archive_file_path = self.pipelined_model.packaging_model()
  112. self.assertTrue(Path(archive_file_path + '.sha1').is_file())
  113. shutil.rmtree(self.pipelined_model.model_path, True)
  114. self.assertFalse(Path(self.pipelined_model.model_path).exists())
  115. self.pipelined_model.unpack_model(archive_file_path)
  116. with open(self.pipelined_model.define_meta_path, encoding='utf8') as tmp:
  117. define_index = yaml.safe_load(tmp)
  118. self.assertEqual(define_index, data_define_meta)
  119. def test_unpack_model_local_cache_exists(self):
  120. archive_file_path = self.pipelined_model.packaging_model()
  121. with self.assertRaisesRegex(FileExistsError, 'Model foobar v1 local cache already existed'):
  122. self.pipelined_model.unpack_model(archive_file_path)
  123. def test_unpack_model_no_hash_file(self):
  124. archive_file_path = self.pipelined_model.packaging_model()
  125. Path(archive_file_path + '.sha1').unlink()
  126. self.assertFalse(Path(archive_file_path + '.sha1').exists())
  127. shutil.rmtree(self.pipelined_model.model_path, True)
  128. self.assertFalse(os.path.exists(self.pipelined_model.model_path))
  129. self.pipelined_model.unpack_model(archive_file_path)
  130. with open(self.pipelined_model.define_meta_path, encoding='utf8') as tmp:
  131. define_index = yaml.safe_load(tmp)
  132. self.assertEqual(define_index, data_define_meta)
  133. def test_unpack_model_hash_not_match(self):
  134. archive_file_path = self.pipelined_model.packaging_model()
  135. self.assertTrue(Path(archive_file_path + '.sha1').is_file())
  136. with open(archive_file_path + '.sha1', 'w', encoding='utf8') as f:
  137. f.write('abc123')
  138. shutil.rmtree(self.pipelined_model.model_path, True)
  139. self.assertFalse(Path(self.pipelined_model.model_path).exists())
  140. with self.assertRaisesRegex(ValueError, 'Hash not match.'):
  141. self.pipelined_model.unpack_model(archive_file_path)
  142. if __name__ == '__main__':
  143. unittest.main()