123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- import math
- import time
- import unittest
- import uuid
- import numpy as np
- from fate_arch.session import computing_session as session
- from federatedml.util import consts
- session.init("123")
- from federatedml.feature.instance import Instance
- from federatedml.statistic.statics import MultivariateStatisticalSummary
class TestStatistics(unittest.TestCase):
    """Compare MultivariateStatisticalSummary output with NumPy/SciPy references.

    Fixture tables hold random dense features and are parallelized through
    the computing session twice: once with each row wrapped in an
    ``Instance`` and once with the bare feature array.
    """

    def setUp(self):
        # Each test initialises the computing session under its own job id.
        self.job_id = str(uuid.uuid1())
        session.init(self.job_id)

        self.eps = 1e-5
        self.count = 1000
        self.feature_num = 100

        # Lazily filled by _gen_table_data so repeated calls reuse one fixture.
        self._dense_table = self._dense_not_inst_table = self._original_data = None

    def _parallelize_rows(self, keyed_rows):
        """Turn ``(key, features)`` pairs into an Instance table and a raw table.

        Both tables receive a schema whose header is ``x0 .. x{feature_num-1}``.
        Returns ``(instance_table, raw_table)``.
        """
        column_names = ['x' + str(i) for i in range(self.feature_num)]
        wrapped_rows = [(key, Instance(features=row)) for key, row in keyed_rows]

        instance_table = session.parallelize(wrapped_rows, include_key=True, partition=16)
        raw_table = session.parallelize(keyed_rows, include_key=True, partition=16)
        instance_table.schema = {'header': column_names}
        raw_table.schema = {'header': column_names}
        return instance_table, raw_table

    def _gen_table_data(self):
        """Return ``(instance_table, raw_table, ndarray)`` of random data.

        The fixture is generated on the first call and cached on the test
        instance; later calls hand back the cached objects.
        """
        if self._dense_table is None:
            original_data = 100 * np.random.random((self.count, self.feature_num))
            # original_data = 100 * np.zeros((self.count, self.feature_num))

            keyed_rows = [(row_idx, original_data[row_idx, :]) for row_idx in range(self.count)]
            self._dense_table, self._dense_not_inst_table = self._parallelize_rows(keyed_rows)
            self._original_data = original_data

        return self._dense_table, self._dense_not_inst_table, self._original_data

    def _gen_missing_table(self):
        """Return tables in which every even-indexed row is entirely NaN.

        The returned ndarray is the untouched random matrix; only the rows
        placed in the tables are replaced. Nothing is cached.
        """
        original_data = 100 * np.random.random((self.count, self.feature_num))

        keyed_rows = []
        for row_idx in range(self.count):
            if row_idx % 2 == 0:
                row = np.array([np.nan] * self.feature_num)
            else:
                row = original_data[row_idx, :]
            keyed_rows.append((row_idx, row))

        instance_table, raw_table = self._parallelize_rows(keyed_rows)
        return instance_table, raw_table, original_data

    def test_MultivariateStatisticalSummary(self):
        # The summary is computed from the Instance table only; the second
        # pass differs just in which table supplies the header.
        instance_table, raw_table, reference = self._gen_table_data()
        summary = MultivariateStatisticalSummary(instance_table)
        for table in (instance_table, raw_table):
            self._test_min_max(summary, reference, table)

    def _test_min_max(self, summary_obj, original_data, data_table):
        """Assert per-column max/min/mean/variance/std match the NumPy values.

        Max and min are compared exactly; the remaining statistics go
        through ``_float_equal``. Column names come from ``data_table``'s
        schema header.
        """
        # Reference statistics, one entry per column.
        expected_max = np.max(original_data, axis=0)
        expected_min = np.min(original_data, axis=0)
        expected_mean = np.mean(original_data, axis=0)
        expected_var = np.var(original_data, axis=0)
        expected_std = np.std(original_data, axis=0)

        started_at = time.time()
        columns = data_table.schema['header']
        for position, column in enumerate(columns):
            self.assertEqual(summary_obj.get_max()[column], expected_max[position])
            self.assertEqual(summary_obj.get_min()[column], expected_min[position])
            self.assertTrue(self._float_equal(summary_obj.get_mean()[column],
                                              expected_mean[position]))
            self.assertTrue(self._float_equal(summary_obj.get_variance()[column],
                                              expected_var[position]))
            self.assertTrue(self._float_equal(summary_obj.get_std_variance()[column],
                                              expected_std[position]))
        print("max value etc, total time: {}".format(time.time() - started_at))

    def _float_equal(self, x, y, error=1e-6):
        """Return True when ``|x - y|`` is strictly below ``error``.

        On a mismatch both values are printed before False is returned.
        """
        within_tolerance = math.fabs(x - y) < error
        if not within_tolerance:
            print(f"x: {x}, y: {y}")
            return False
        return True

    # Disabled tests, kept for reference.
    #
    # def test_median(self):
    #     error = 0
    #     dense_table, dense_not_inst_table, original_data = self._gen_table_data()
    #
    #     sorted_matrix = np.sort(original_data, axis=0)
    #     median_array = sorted_matrix[self.count // 2, :]
    #     header = dense_table.schema['header']
    #     summary_obj = MultivariateStatisticalSummary(dense_table, error=error)
    #     t0 = time.time()
    #
    #     for idx, col_name in enumerate(header):
    #         self.assertTrue(self._float_equal(summary_obj.get_median()[col_name],
    #                                           median_array[idx]))
    #     print("median interface, total time: {}".format(time.time() - t0))
    #
    #     summary_obj_2 = MultivariateStatisticalSummary(dense_not_inst_table, error=error)
    #     t0 = time.time()
    #     for idx, col_name in enumerate(header):
    #         self.assertTrue(self._float_equal(summary_obj_2.get_median()[col_name],
    #                                           median_array[idx]))
    #     print("median interface, total time: {}".format(time.time() - t0))
    #
    # def test_quantile_query(self):
    #
    #     dense_table, dense_not_inst_table, original_data = self._gen_table_data()
    #
    #     quantile_points = [0.25, 0.5, 0.75, 1.0]
    #     quantile_array = np.quantile(original_data, quantile_points, axis=0)
    #     summary_obj = MultivariateStatisticalSummary(dense_table, error=0)
    #     header = dense_table.schema['header']
    #
    #     t0 = time.time()
    #     for q_idx, q in enumerate(quantile_points):
    #         for idx, col_name in enumerate(header):
    #             self.assertTrue(self._float_equal(summary_obj.get_quantile_point(q)[col_name],
    #                                               quantile_array[q_idx][idx],
    #                                               error=3))
    #     print("quantile interface, total time: {}".format(time.time() - t0))
    #
    # def test_missing_value(self):
    #     dense_table, dense_not_inst_table, original_data = self._gen_missing_table()
    #     summary_obj = MultivariateStatisticalSummary(dense_table, error=0)
    #     t0 = time.time()
    #     missing_result = summary_obj.get_missing_ratio()
    #     for col_name, missing_ratio in missing_result.items():
    #         self.assertEqual(missing_ratio, 0.5, msg="missing ratio should be 0.5")
    #     print("calculate missing ratio, total time: {}".format(time.time() - t0))

    def test_moment(self):
        # Third/fourth moments, skewness and kurtosis are checked column by
        # column against scipy.stats.
        instance_table, _raw_table, reference = self._gen_table_data()
        summary = MultivariateStatisticalSummary(instance_table, error=0, stat_order=4, bias=False)
        columns = instance_table.schema['header']

        from scipy import stats

        # Insertion order fixes both the query order and the assertion order.
        expected = {
            "moment_3": stats.moment(reference, 3, axis=0),
            "moment_4": stats.moment(reference, 4, axis=0),
            "skewness": stats.skew(reference, axis=0, bias=False),
            "kurtosis": stats.kurtosis(reference, axis=0, bias=False),
        }
        observed = {stat_name: summary.get_statics(stat_name) for stat_name in expected}

        # print(f"moment: {summary_moment_4}, moment_2: {moment_4}")
        for position, column in enumerate(columns):
            for stat_name, reference_values in expected.items():
                self.assertTrue(self._float_equal(observed[stat_name][column],
                                                  reference_values[position]))

    def tearDown(self):
        # Shut the computing session down after every test.
        session.stop()
# Run the test suite when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
|