mini_batch_test.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import time
  17. import unittest
  18. import numpy as np
  19. from fate_arch.session import computing_session as session
  20. from federatedml.feature.instance import Instance
  21. from federatedml.model_selection import MiniBatch
  22. from federatedml.model_selection import indices
# Initialise the computing session once, at import time, before any test runs.
# NOTE(review): presumably this is required before session.parallelize (used
# in prepare_data) can be called, and "123" is the session id — confirm
# against the fate_arch computing_session API.
session.init("123")
  24. class TestMiniBatch(unittest.TestCase):
  25. def prepare_data(self, data_num, feature_num):
  26. final_result = []
  27. for i in range(data_num):
  28. tmp = i * np.ones(feature_num)
  29. inst = Instance(inst_id=i, features=tmp, label=0)
  30. tmp = (i, inst)
  31. final_result.append(tmp)
  32. table = session.parallelize(final_result,
  33. include_key=True,
  34. partition=3)
  35. return table
  36. def test_mini_batch_data_generator(self, data_num=100, batch_size=320):
  37. t0 = time.time()
  38. feature_num = 20
  39. expect_batches = data_num // batch_size
  40. # print("expect_batches: {}".format(expect_batches))
  41. data_instances = self.prepare_data(data_num=data_num, feature_num=feature_num)
  42. # print("Prepare data time: {}".format(time.time() - t0))
  43. mini_batch_obj = MiniBatch(data_inst=data_instances, batch_size=batch_size)
  44. batch_data_generator = mini_batch_obj.mini_batch_data_generator()
  45. batch_id = 0
  46. pre_time = time.time() - t0
  47. # print("Prepare mini batch time: {}".format(pre_time))
  48. total_num = 0
  49. for batch_data in batch_data_generator:
  50. batch_num = batch_data.count()
  51. if batch_id < expect_batches - 1:
  52. # print("In mini batch test, batch_num: {}, batch_size:{}".format(
  53. # batch_num, batch_size
  54. # ))
  55. self.assertEqual(batch_num, batch_size)
  56. batch_id += 1
  57. total_num += batch_num
  58. # curt_time = time.time()
  59. # print("One batch time: {}".format(curt_time - pre_time))
  60. # pre_time = curt_time
  61. self.assertEqual(total_num, data_num)
  62. def test_collect_index(self):
  63. data_num = 100
  64. feature_num = 20
  65. data_instances = self.prepare_data(data_num=data_num, feature_num=feature_num)
  66. # res = data_instances.mapValues(lambda x: x)
  67. data_sids_iter, data_size = indices.collect_index(data_instances)
  68. self.assertEqual(data_num, data_size)
  69. real_index_num = 0
  70. for sid, _ in data_sids_iter:
  71. real_index_num += 1
  72. self.assertEqual(data_num, real_index_num)
  73. def test_data_features(self):
  74. data_num = 100
  75. feature_num = 20
  76. data_instances = self.prepare_data(data_num=data_num, feature_num=feature_num)
  77. local_data = data_instances.collect()
  78. idx, data = local_data.__next__()
  79. features = data.features
  80. self.assertEqual(len(features), feature_num)
  81. def test_different_datasize_batch(self):
  82. data_nums = [10, 100]
  83. batch_size = [1, 2, 10, 32]
  84. for d_n in data_nums:
  85. for b_s in batch_size:
  86. # print("data_nums: {}, batch_size: {}".format(d_n, b_s))
  87. self.test_mini_batch_data_generator(data_num=d_n, batch_size=b_s)
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()