123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451 |
- import copy
- import time
- import unittest
- import numpy as np
- from fate_arch.session import computing_session as session
- from sklearn.preprocessing import StandardScaler as SSL
- from federatedml.feature.feature_scale.standard_scale import StandardScale
- from federatedml.feature.instance import Instance
- from federatedml.param.scale_param import ScaleParam
- from federatedml.util.param_extract import ParamExtract
- class TestStandardScaler(unittest.TestCase):
- def setUp(self):
- self.test_data = [
- [0, 1.0, 10, 2, 3, 1],
- [1.0, 2, 9, 2, 4, 2],
- [0, 3.0, 8, 3, 3, 3],
- [1.0, 4, 7, 4, 4, 4],
- [1.0, 5, 6, 5, 5, 5],
- [1.0, 6, 5, 6, 6, -100],
- [0, 7.0, 4, 7, 7, 7],
- [0, 8, 3.0, 8, 6, 8],
- [0, 9, 2, 9.0, 9, 9],
- [0, 10, 1, 10.0, 10, 10]
- ]
- str_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
- session.init(str_time)
- self.test_instance = []
- for td in self.test_data:
- self.test_instance.append(Instance(features=np.array(td)))
- self.table_instance = self.data_to_table(self.test_instance)
- self.table_instance.schema['header'] = ["fid" + str(i) for i in range(len(self.test_data[0]))]
- self.table_instance.schema['anonymous_header'] = [
- "guest_9999_x" + str(i) for i in range(len(self.test_data[0]))]
- def print_table(self, table):
- for v in (list(table.collect())):
- print(v[1].features)
- def data_to_table(self, data, partition=10):
- data_table = session.parallelize(data, include_key=False, partition=partition)
- return data_table
- def get_table_instance_feature(self, table_instance):
- res_list = []
- for k, v in list(table_instance.collect()):
- res_list.append(list(np.around(v.features, 4)))
- return res_list
- def get_scale_param(self):
- component_param = {
- "method": "standard_scale",
- "mode": "normal",
- "scale_col_indexes": [],
- "with_mean": True,
- "with_std": True,
- }
- scale_param = ScaleParam()
- param_extracter = ParamExtract()
- param_extracter.parse_param_from_config(scale_param, component_param)
- return scale_param
- # test with (with_mean=True, with_std=True):
- def test_fit1(self):
- scale_param = self.get_scale_param()
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- mean = standard_scaler.mean
- std = standard_scaler.std
- scaler = SSL()
- scaler.fit(self.test_data)
- self.assertListEqual(self.get_table_instance_feature(fit_instance),
- np.around(scaler.transform(self.test_data), 4).tolist())
- self.assertListEqual(list(np.around(mean, 4)), list(np.around(scaler.mean_, 4)))
- self.assertListEqual(list(np.around(std, 4)), list(np.around(scaler.scale_, 4)))
- # test with (with_mean=False, with_std=True):
- def test_fit2(self):
- scale_param = self.get_scale_param()
- scale_param.with_mean = False
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- mean = standard_scaler.mean
- std = standard_scaler.std
- scaler = SSL(with_mean=False)
- scaler.fit(self.test_data)
- self.assertListEqual(self.get_table_instance_feature(fit_instance),
- np.around(scaler.transform(self.test_data), 4).tolist())
- self.assertListEqual(list(np.around(mean, 4)), [0 for _ in mean])
- self.assertListEqual(list(np.around(std, 4)), list(np.around(scaler.scale_, 4)))
- # test with (with_mean=True, with_std=False):
- def test_fit3(self):
- scale_param = self.get_scale_param()
- scale_param.with_std = False
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- mean = standard_scaler.mean
- std = standard_scaler.std
- scaler = SSL(with_std=False)
- scaler.fit(self.test_data)
- self.assertListEqual(self.get_table_instance_feature(fit_instance),
- np.around(scaler.transform(self.test_data), 4).tolist())
- self.assertListEqual(list(np.around(mean, 4)), list(np.around(scaler.mean_, 4)))
- self.assertListEqual(list(np.around(std, 4)), [1 for _ in std])
- # test with (with_mean=False, with_std=False):
- def test_fit4(self):
- scale_param = self.get_scale_param()
- scale_param.with_std = False
- scale_param.with_mean = False
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- mean = standard_scaler.mean
- std = standard_scaler.std
- scaler = SSL(with_mean=False, with_std=False)
- scaler.fit(self.test_data)
- self.assertListEqual(self.get_table_instance_feature(fit_instance),
- np.around(scaler.transform(self.test_data), 4).tolist())
- self.assertEqual(mean, [0 for _ in range(len(self.test_data[0]))])
- self.assertEqual(std, [1 for _ in range(len(self.test_data[0]))])
- # test with (area="all", scale_column_idx=[], with_mean=True, with_std=True):
- def test_fit5(self):
- scale_param = self.get_scale_param()
- scale_param.scale_column_idx = []
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- mean = standard_scaler.mean
- std = standard_scaler.std
- scaler = SSL()
- scaler.fit(self.test_data)
- self.assertListEqual(self.get_table_instance_feature(fit_instance),
- np.around(scaler.transform(self.test_data), 4).tolist())
- self.assertListEqual(list(np.around(mean, 4)), list(np.around(scaler.mean_, 4)))
- self.assertListEqual(list(np.around(std, 4)), list(np.around(scaler.scale_, 4)))
- # test with (area="col", scale_column_idx=[], with_mean=True, with_std=True):
- def test_fit6(self):
- scale_param = self.get_scale_param()
- scale_param.scale_col_indexes = []
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- mean = standard_scaler.mean
- std = standard_scaler.std
- scaler = SSL()
- scaler.fit(self.test_data)
- self.assertListEqual(self.get_table_instance_feature(fit_instance),
- np.around(self.test_data, 4).tolist())
- self.assertListEqual(list(np.around(mean, 4)), list(np.around(scaler.mean_, 4)))
- self.assertListEqual(list(np.around(std, 4)), list(np.around(scaler.scale_, 4)))
- # test with (area="all", upper=2, lower=1, with_mean=False, with_std=False):
- def test_fit7(self):
- scale_param = self.get_scale_param()
- scale_param.scale_column_idx = []
- scale_param.feat_upper = 2
- scale_param.feat_lower = 1
- scale_param.with_mean = False
- scale_param.with_std = False
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- mean = standard_scaler.mean
- std = standard_scaler.std
- column_max_value = standard_scaler.column_max_value
- column_min_value = standard_scaler.column_min_value
- for i, line in enumerate(self.test_data):
- for j, value in enumerate(line):
- if value > 2:
- self.test_data[i][j] = 2
- elif value < 1:
- self.test_data[i][j] = 1
- self.assertListEqual(self.get_table_instance_feature(fit_instance),
- np.around(self.test_data, 4).tolist())
- self.assertEqual(mean, [0 for _ in range(len(self.test_data[0]))])
- self.assertEqual(std, [1 for _ in range(len(self.test_data[0]))])
- self.assertEqual(column_max_value, [1, 2, 2, 2, 2, 2])
- self.assertEqual(column_min_value, [1, 1, 1, 2, 2, 1])
- # test with (area="all", upper=[2,2,2,2,2,2], lower=[1,1,1,1,1,1], with_mean=False, with_std=False):
- def test_fit8(self):
- scale_param = self.get_scale_param()
- scale_param.scale_column_idx = []
- scale_param.feat_upper = [2, 2, 2, 2, 2, 2]
- scale_param.feat_lower = [1, 1, 1, 1, 1, 1]
- scale_param.with_mean = False
- scale_param.with_std = False
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- mean = standard_scaler.mean
- std = standard_scaler.std
- column_max_value = standard_scaler.column_max_value
- column_min_value = standard_scaler.column_min_value
- for i, line in enumerate(self.test_data):
- for j, value in enumerate(line):
- if value > 2:
- self.test_data[i][j] = 2
- elif value < 1:
- self.test_data[i][j] = 1
- self.assertListEqual(self.get_table_instance_feature(fit_instance),
- np.around(self.test_data, 4).tolist())
- self.assertEqual(mean, [0 for _ in range(len(self.test_data[0]))])
- self.assertEqual(std, [1 for _ in range(len(self.test_data[0]))])
- self.assertEqual(column_max_value, [1, 2, 2, 2, 2, 2])
- self.assertEqual(column_min_value, [1, 1, 1, 2, 2, 1])
- # test with (area="col", upper=[2,2,2,2,2,2], lower=[1,1,1,1,1,1],
- # scale_column_idx=[1,2,4], with_mean=True, with_std=True):
- def test_fit9(self):
- scale_column_idx = [1, 2, 4]
- scale_param = self.get_scale_param()
- scale_param.feat_upper = [2, 2, 2, 2, 2, 2]
- scale_param.feat_lower = [1, 1, 1, 1, 1, 1]
- scale_param.with_mean = True
- scale_param.with_std = True
- scale_param.scale_col_indexes = scale_column_idx
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- mean = standard_scaler.mean
- std = standard_scaler.std
- column_max_value = standard_scaler.column_max_value
- column_min_value = standard_scaler.column_min_value
- raw_data = copy.deepcopy(self.test_data)
- for i, line in enumerate(self.test_data):
- for j, value in enumerate(line):
- if j in scale_column_idx:
- if value > 2:
- self.test_data[i][j] = 2
- elif value < 1:
- self.test_data[i][j] = 1
- scaler = SSL(with_mean=True, with_std=True)
- scaler.fit(self.test_data)
- transform_data = np.around(scaler.transform(self.test_data), 4).tolist()
- for i, line in enumerate(transform_data):
- for j, cols in enumerate(line):
- if j not in scale_column_idx:
- transform_data[i][j] = raw_data[i][j]
- self.assertListEqual(self.get_table_instance_feature(fit_instance),
- transform_data)
- self.assertListEqual(list(np.around(mean, 6)), list(np.around(scaler.mean_, 6)))
- self.assertListEqual(list(np.around(std, 6)), list(np.around(scaler.scale_, 6)))
- self.assertEqual(column_max_value, [1, 2, 2, 10, 2, 10])
- self.assertEqual(column_min_value, [0, 1, 1, 2, 2, -100])
- raw_data_transform = standard_scaler.transform(self.table_instance)
- self.assertListEqual(self.get_table_instance_feature(fit_instance),
- self.get_table_instance_feature(raw_data_transform))
- # test with (mode="cap", area="col", upper=0.8, lower=0.2, scale_column_idx=[1,2,4], with_mean=True, with_std=True):
- def test_fit10(self):
- scale_column_idx = [1, 2, 4]
- scale_param = self.get_scale_param()
- scale_param.scale_col_indexes = []
- scale_param.feat_upper = 0.8
- scale_param.feat_lower = 0.2
- scale_param.with_mean = True
- scale_param.with_std = True
- scale_param.mode = "cap"
- scale_param.scale_col_indexes = scale_column_idx
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- mean = standard_scaler.mean
- std = standard_scaler.std
- column_max_value = standard_scaler.column_max_value
- column_min_value = standard_scaler.column_min_value
- gt_cap_lower_list = [0, 2, 2, 2, 3, 1]
- gt_cap_upper_list = [1, 8, 8, 8, 7, 8]
- raw_data = copy.deepcopy(self.test_data)
- for i, line in enumerate(self.test_data):
- for j, value in enumerate(line):
- if j in scale_column_idx:
- if value > gt_cap_upper_list[j]:
- self.test_data[i][j] = gt_cap_upper_list[j]
- elif value < gt_cap_lower_list[j]:
- self.test_data[i][j] = gt_cap_lower_list[j]
- scaler = SSL(with_mean=True, with_std=True)
- scaler.fit(self.test_data)
- transform_data = np.around(scaler.transform(self.test_data), 4).tolist()
- for i, line in enumerate(transform_data):
- for j, cols in enumerate(line):
- if j not in scale_column_idx:
- transform_data[i][j] = raw_data[i][j]
- self.assertListEqual(self.get_table_instance_feature(fit_instance),
- transform_data)
- self.assertEqual(column_max_value, gt_cap_upper_list)
- self.assertEqual(column_min_value, gt_cap_lower_list)
- self.assertListEqual(list(np.around(mean, 6)), list(np.around(scaler.mean_, 6)))
- self.assertListEqual(list(np.around(std, 6)), list(np.around(scaler.scale_, 6)))
- raw_data_transform = standard_scaler.transform(self.table_instance)
- self.assertListEqual(self.get_table_instance_feature(fit_instance),
- self.get_table_instance_feature(raw_data_transform))
- # test with (with_mean=True, with_std=True):
- def test_transform1(self):
- scale_param = self.get_scale_param()
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- transform_data = standard_scaler.transform(self.table_instance)
- self.assertListEqual(self.get_table_instance_feature(transform_data),
- self.get_table_instance_feature(fit_instance))
- # test with (with_mean=True, with_std=False):
- def test_transform2(self):
- scale_param = self.get_scale_param()
- scale_param.with_std = False
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- transform_data = standard_scaler.transform(self.table_instance)
- self.assertListEqual(self.get_table_instance_feature(transform_data),
- self.get_table_instance_feature(fit_instance))
- # test with (with_mean=False, with_std=True):
- def test_transform3(self):
- scale_param = self.get_scale_param()
- scale_param.with_mean = False
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- transform_data = standard_scaler.transform(self.table_instance)
- self.assertListEqual(self.get_table_instance_feature(transform_data),
- self.get_table_instance_feature(fit_instance))
- # test with (with_mean=False, with_std=False):
- def test_transform4(self):
- scale_param = self.get_scale_param()
- scale_param.with_mean = False
- scale_param.with_std = False
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- transform_data = standard_scaler.transform(self.table_instance)
- self.assertListEqual(self.get_table_instance_feature(transform_data),
- self.get_table_instance_feature(fit_instance))
- # test with (area='all', scale_column_idx=[], with_mean=False, with_std=False):
- def test_transform5(self):
- scale_param = self.get_scale_param()
- scale_param.with_mean = False
- scale_param.with_std = False
- scale_param.scale_column_idx = []
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- transform_data = standard_scaler.transform(self.table_instance)
- self.assertListEqual(self.get_table_instance_feature(transform_data),
- self.get_table_instance_feature(fit_instance))
- # test with (area='col', with_mean=[], with_std=False):
- def test_transform6(self):
- scale_param = self.get_scale_param()
- scale_param.with_mean = False
- scale_param.with_std = False
- scale_param.scale_column_idx = []
- standard_scaler = StandardScale(scale_param)
- fit_instance = standard_scaler.fit(self.table_instance)
- transform_data = standard_scaler.transform(self.table_instance)
- self.assertListEqual(self.get_table_instance_feature(transform_data),
- self.get_table_instance_feature(fit_instance))
- def test_cols_select_fit_and_transform(self):
- scale_param = self.get_scale_param()
- scale_param.scale_column_idx = [1, 2, 4]
- standard_scaler = StandardScale(scale_param)
- fit_data = standard_scaler.fit(self.table_instance)
- scale_column_idx = standard_scaler.scale_column_idx
- scaler = SSL(with_mean=True, with_std=True)
- scaler.fit(self.test_data)
- transform_data = np.around(scaler.transform(self.test_data), 4).tolist()
- for i, line in enumerate(transform_data):
- for j, cols in enumerate(line):
- if j not in scale_column_idx:
- transform_data[i][j] = self.test_data[i][j]
- self.assertListEqual(self.get_table_instance_feature(fit_data),
- transform_data)
- std_scale_transform_data = standard_scaler.transform(self.table_instance)
- self.assertListEqual(self.get_table_instance_feature(std_scale_transform_data),
- transform_data)
- def test_cols_select_fit_and_transform_repeat(self):
- scale_param = self.get_scale_param()
- scale_param.scale_column_idx = [1, 1, 2, 2, 4, 5, 5]
- standard_scaler = StandardScale(scale_param)
- fit_data = standard_scaler.fit(self.table_instance)
- scale_column_idx = standard_scaler.scale_column_idx
- scaler = SSL(with_mean=True, with_std=True)
- scaler.fit(self.test_data)
- transform_data = np.around(scaler.transform(self.test_data), 4).tolist()
- for i, line in enumerate(transform_data):
- for j, cols in enumerate(line):
- if j not in scale_column_idx:
- transform_data[i][j] = self.test_data[i][j]
- self.assertListEqual(self.get_table_instance_feature(fit_data),
- transform_data)
- std_scale_transform_data = standard_scaler.transform(self.table_instance)
- self.assertListEqual(self.get_table_instance_feature(std_scale_transform_data),
- transform_data)
- def tearDown(self):
- session.stop()
- if __name__ == "__main__":
- unittest.main()
|