iv_calculator.py

#
#  Copyright 2019 The FATE Authors. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import functools
import math
import operator

import numpy as np

from federatedml.feature.binning.base_binning import BaseBinning
from federatedml.feature.binning.bin_result import BinColResults, MultiClassBinResult
from federatedml.statistic import data_overview
from federatedml.feature.sparse_vector import SparseVector
from federatedml.cipher_compressor.compressor import PackingCipherTensor
from federatedml.util import LOGGER


class IvCalculator(object):
    def __init__(self, adjustment_factor, role, party_id):
        self.adjustment_factor = adjustment_factor
        self.role = role
        self.party_id = party_id

    def cal_local_iv(self, data_instances, split_points,
                     labels=None, label_counts=None, bin_cols_map=None,
                     label_table=None):
        """
        Calculate local IV/WOE results for every feature to be binned.

        data_bin_table : Table.
            Each element represents the bin number each feature value falls into,
            e.g.:
            [{'x1': 1, 'x2': 5, 'x3': 2},
             ...
            ]

        Returns:
            MultiClassBinResult object
        """
        header = data_instances.schema.get("header")
        if bin_cols_map is None:
            bin_cols_map = {name: idx for idx, name in enumerate(header)}
            bin_indexes = [idx for idx, _ in enumerate(header)]
        else:
            bin_indexes = []
            for h in header:
                if h in bin_cols_map:
                    bin_indexes.append(bin_cols_map[h])
        if label_counts is None:
            label_counts = data_overview.get_label_count(data_instances)
            labels = sorted(label_counts.keys())
            labels.reverse()
            label_counts = [label_counts[k] for k in labels]

        data_bin_table = BaseBinning.get_data_bin(data_instances, split_points, bin_cols_map)
        sparse_bin_points = BaseBinning.get_sparse_bin(bin_indexes, split_points, header)
        sparse_bin_points = {header[k]: v for k, v in sparse_bin_points.items()}

        if label_table is None:
            label_table = self.convert_label(data_instances, labels)
        result_counts = self.cal_bin_label(data_bin_table, sparse_bin_points, label_table, label_counts)
        multi_bin_res = self.cal_iv_from_counts(result_counts, labels,
                                                role=self.role,
                                                party_id=self.party_id)
        for col_name, sp in split_points.items():
            multi_bin_res.put_col_split_points(col_name, sp)
        return multi_bin_res
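
    # Usage sketch (illustrative only; constructing `data_instances` and
    # `split_points` through FATE's computing session is assumed and omitted):
    #   calculator = IvCalculator(adjustment_factor=0.5, role="guest", party_id=9999)
    #   bin_res = calculator.cal_local_iv(data_instances, split_points)
    #   iv_of_x1 = bin_res.bin_results[0].all_cols_results["x1"].iv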

    def cal_iv_from_counts(self, result_counts, labels, role, party_id):
        result = MultiClassBinResult(labels)
        result.set_role_party(role, party_id)
        if len(labels) == 2:
            col_result_obj_dict = self.cal_single_label_iv_woe(result_counts,
                                                               self.adjustment_factor)
            for col_name, bin_col_result in col_result_obj_dict.items():
                result.put_col_results(col_name=col_name, col_results=bin_col_result)
        else:
            for label_idx, y in enumerate(labels):
                this_result_counts = self.mask_label(result_counts, label_idx)
                col_result_obj_dict = self.cal_single_label_iv_woe(this_result_counts,
                                                                   self.adjustment_factor)
                for col_name, bin_col_result in col_result_obj_dict.items():
                    result.put_col_results(col_name=col_name, col_results=bin_col_result, label_idx=label_idx)
        return result

    @staticmethod
    def mask_label(result_counts, label_idx):
        # Collapse multi-class counts into binary one-vs-rest counts: column
        # `label_idx` becomes the event count, all other labels are summed
        # into the non-event count.
        def _mask(counts):
            res = []
            for c in counts:
                res.append(np.array([c[label_idx], np.sum(c) - c[label_idx]]))
            return res

        return result_counts.mapValues(_mask)
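
    # Example (illustrative): for a 3-class bin count c = np.array([10, 20, 30])
    # and label_idx = 1, _mask produces np.array([20, 40]): 20 events for label 1
    # and 40 samples for the remaining classes combined.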

    def cal_bin_label(self, data_bin_table, sparse_bin_points, label_table, label_counts):
        """
        data_bin_table : Table.
            Each element represents the bin number each feature value falls into,
            e.g.:
            [{'x1': 1, 'x2': 5, 'x3': 2},
             ...
            ]

        sparse_bin_points : dict
            Dict of sparse bin nums:
            {"x0": 2, "x1": 3, "x2": 5 ... }

        label_table : Table
            id with labels

        Returns:
            Table whose values are per-feature, per-bin label sums:
            [[label_0_sum, label_1_sum, ...], [label_0_sum, label_1_sum, ...] ... ]
        """
        data_bin_with_label = data_bin_table.join(label_table, lambda x, y: (x, y))
        f = functools.partial(self.add_label_in_partition,
                              sparse_bin_points=sparse_bin_points)
        result_counts = data_bin_with_label.mapReducePartitions(f, self.aggregate_partition_label)
        return result_counts
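
    # Data-flow sketch (illustrative): after the join, each record looks like
    #   (id, ({'x1': 1, 'x2': 0}, np.array([0., 1.])))
    # and mapReducePartitions turns the whole table into per-feature bin sums:
    #   ('x1', [np.array([label_0_sum, label_1_sum]), ...])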

    def cal_single_label_iv_woe(self, result_counts, adjustment_factor):
        """
        Given event count information, calculate IV and WOE values.

        Parameters
        ----------
        result_counts : dict or Table.
            It is like:
            {'x1': [[event_count, non_event_count], [event_count, non_event_count] ... ],
             'x2': [[event_count, non_event_count], [event_count, non_event_count] ... ],
             ...
            }

        adjustment_factor : float
            The adjustment factor used when calculating WOE.

        Returns
        -------
        Dict of IVAttributes objects:
            {'x1': attr_obj,
             'x2': attr_obj
             ...
            }
        """
        if isinstance(result_counts, dict):
            col_result_obj_dict = {}
            for col_name, data_event_count in result_counts.items():
                col_result_obj = self.woe_1d(data_event_count, adjustment_factor)
                col_result_obj_dict[col_name] = col_result_obj
        else:
            woe_1d = functools.partial(self.woe_1d, adjustment_factor=adjustment_factor)
            col_result_obj_dict = dict(result_counts.mapValues(woe_1d).collect())
        return col_result_obj_dict
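
    # Example (illustrative, binary labels):
    #   result_counts = {'x1': [np.array([10, 40]), np.array([30, 20])]}
    #   res = IvCalculator(0.5, "guest", 9999).cal_single_label_iv_woe(result_counts, 0.5)
    #   # res['x1'] is a BinColResults holding woe_array, iv_array and the total iv.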

    @staticmethod
    def fill_sparse_result(col_name, static_nums, sparse_bin_points, label_counts):
        """
        Parameters
        ----------
        col_name : str
            Current column name, used to look up the sparse point.

        static_nums : list.
            It is like:
            [[label_0_sum, label_1_sum, ...], [label_0_sum, label_1_sum, ...] ... ]
            where the bin that the sparse point falls into is still empty.

        sparse_bin_points : dict
            Dict of sparse bin nums:
            {"x1": 2, "x2": 3, "x3": 5 ... }

        label_counts : np.array
            e.g. [100, 200, ...]

        Returns
        -------
        The same format as static_nums, with the sparse bin filled in.
        """
        curt_all = functools.reduce(lambda x, y: x + y, static_nums)
        sparse_bin = sparse_bin_points.get(col_name)
        static_nums[sparse_bin] = label_counts - curt_all
        return col_name, static_nums
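
    # Worked example (illustrative): with
    #   static_nums = [np.array([10, 20]), np.array([0, 0]), np.array([5, 5])],
    #   sparse_bin_points = {"x1": 1} and label_counts = np.array([30, 40]),
    # the counted total is np.array([15, 25]), so the sparse bin (index 1) is
    # filled with label_counts - total = np.array([15, 15]).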

    @staticmethod
    def combine_labels(result_counts, idx):
        """
        result_counts : Table
            [[label_0_sum, label_1_sum, ...], [label_0_sum, label_1_sum, ...] ... ]
        idx : int

        Returns:
        """

    @staticmethod
    def add_label_in_partition(data_bin_with_table, sparse_bin_points):
        """
        Sum up the one-hot labels of each bin so that WOE and IV can be
        calculated conveniently afterwards.

        Parameters
        ----------
        data_bin_with_table : Table
            The input data; each element is like:
            (id, ({'x1': 1, 'x2': 5, 'x3': 2}, y))
            where y = [is_label_0, is_label_1, ...] is the one-hot encoding of the label.

        sparse_bin_points : dict
            Dict of sparse bin nums:
            {0: 2, 1: 3, 2: 5 ... }

        Returns
        -------
        [('x1', [[label_0_sum, label_1_sum, ...], [label_0_sum, label_1_sum, ...] ... ]),
         ('x2', [[label_0_sum, label_1_sum, ...], [label_0_sum, label_1_sum, ...] ... ]),
         ...
        ]
        """
        result_sum = {}
        for _, datas in data_bin_with_table:
            bin_idx_dict = datas[0]
            y = datas[1]
            for col_name, bin_idx in bin_idx_dict.items():
                result_sum.setdefault(col_name, [])
                col_sum = result_sum[col_name]
                while bin_idx >= len(col_sum):
                    if isinstance(y, PackingCipherTensor):
                        zero_y = np.zeros(y.dim)
                        col_sum.append(PackingCipherTensor(zero_y.tolist()))
                    else:
                        col_sum.append(np.zeros(len(y)))
                # if bin_idx == sparse_bin_points[col_name]:
                #     continue
                col_sum[bin_idx] = col_sum[bin_idx] + y
        return list(result_sum.items())
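
    # Note (illustrative): bin lists grow lazily. If a partition first sees
    # bin_idx = 3 for 'x1', three zero vectors are appended before y is added at
    # index 3; zeros are np.zeros(len(y)), or a zero PackingCipherTensor when the
    # labels arrive encrypted.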

    @staticmethod
    def aggregate_partition_label(sum1, sum2):
        """
        Reduce function: aggregate the results calculated from each partition.

        Parameters
        ----------
        sum1 : list.
            It is like:
            [[label_0_sum, label_1_sum, ...], [label_0_sum, label_1_sum, ...] ... ]

        sum2 : list
            Same format as sum1.

        Returns
        -------
        Merged sum, in the same format as sum1.
        """
        if sum1 is None and sum2 is None:
            return None
        if sum1 is None:
            return sum2
        if sum2 is None:
            return sum1
        for idx, label_sum2 in enumerate(sum2):
            if idx >= len(sum1):
                sum1.append(label_sum2)
            else:
                sum1[idx] = sum1[idx] + label_sum2
        return sum1
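
    # Example (illustrative): partial sums with different bin counts merge
    # element-wise, keeping the longer tail:
    #   sum1 = [np.array([1, 2])]
    #   sum2 = [np.array([3, 4]), np.array([5, 6])]
    #   aggregate_partition_label(sum1, sum2)
    #   -> [np.array([4, 6]), np.array([5, 6])]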

    @staticmethod
    def woe_1d(data_event_count, adjustment_factor):
        """
        Given the event and non-event counts of one column, calculate its WOE values.

        Parameters
        ----------
        data_event_count : list
            [(event_sum, non_event_sum), (same sums in the second bin), (in the third bin) ...]

        adjustment_factor : float
            The adjustment factor used when calculating WOE.

        Returns
        -------
        IVAttributes : object
            Stores the IV- and WOE-related information.
        """
        event_total = 0
        non_event_total = 0
        for bin_res in data_event_count:
            if len(bin_res) != 2:
                raise ValueError(f"bin_res should have a length of 2,"
                                 f" data_event_count: {data_event_count}, bin_res: {bin_res}")
            event_total += bin_res[0]
            non_event_total += bin_res[1]

        if event_total == 0:
            # raise ValueError("NO event label in target data")
            event_total = 1
        if non_event_total == 0:
            # raise ValueError("NO non-event label in target data")
            non_event_total = 1

        iv = 0
        event_count_array = []
        non_event_count_array = []
        event_rate_array = []
        non_event_rate_array = []
        woe_array = []
        iv_array = []
        for event_count, non_event_count in data_event_count:
            if event_count == 0 or non_event_count == 0:
                event_rate = 1.0 * (event_count + adjustment_factor) / event_total
                non_event_rate = 1.0 * (non_event_count + adjustment_factor) / non_event_total
            else:
                event_rate = 1.0 * event_count / event_total
                non_event_rate = 1.0 * non_event_count / non_event_total
            woe_i = math.log(event_rate / non_event_rate)

            event_count_array.append(int(event_count))
            non_event_count_array.append(int(non_event_count))
            event_rate_array.append(event_rate)
            non_event_rate_array.append(non_event_rate)
            woe_array.append(woe_i)
            iv_i = (event_rate - non_event_rate) * woe_i
            iv_array.append(iv_i)
            iv += iv_i
        return BinColResults(woe_array=woe_array, iv_array=iv_array, event_count_array=event_count_array,
                             non_event_count_array=non_event_count_array,
                             event_rate_array=event_rate_array, non_event_rate_array=non_event_rate_array, iv=iv)
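
    # Worked example (illustrative): data_event_count = [(10, 40), (30, 20)]
    # gives event_total = 40 and non_event_total = 60. For the first bin:
    #   event_rate = 10 / 40 = 0.25, non_event_rate = 40 / 60 ~= 0.6667
    #   woe = ln(0.25 / 0.6667) ~= -0.9808
    #   iv contribution = (0.25 - 0.6667) * (-0.9808) ~= 0.4087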

    @staticmethod
    def statistic_label(data_instances):
        label_counts = data_overview.get_label_count(data_instances)
        label_elements = list(label_counts.keys())
        label_counts = [label_counts[k] for k in label_elements]
        return label_elements, label_counts

    @staticmethod
    def convert_label(data_instances, label_elements):
        def _convert(instance):
            res_labels = np.zeros(len(label_elements))
            res_labels[label_elements.index(instance.label)] = 1
            return res_labels

        label_table = data_instances.mapValues(_convert)
        return label_table
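
    # Example (illustrative): with label_elements = [1, 0], an instance whose
    # label is 0 is mapped to the one-hot vector np.array([0., 1.]).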

    @staticmethod
    def woe_transformer(data_instances, bin_inner_param, multi_class_bin_res: MultiClassBinResult,
                        abnormal_list=None):
        if abnormal_list is None:
            abnormal_list = []
        if len(multi_class_bin_res.bin_results) > 1:
            raise ValueError("WOE transform of multi-class-labeled data is not supported. Please check!")
        bin_res = multi_class_bin_res.bin_results[0]
        transform_cols_idx = bin_inner_param.transform_bin_indexes
        split_points_dict = bin_res.all_split_points
        is_sparse = data_overview.is_sparse_data(data_instances)

        def convert(instances):
            if is_sparse:
                all_data = instances.features.get_all_data()
                indice = []
                sparse_value = []
                data_shape = instances.features.get_shape()
                for col_idx, col_value in all_data:
                    if col_idx in transform_cols_idx:
                        if col_value in abnormal_list:
                            indice.append(col_idx)
                            sparse_value.append(col_value)
                            continue
                        # Note: missing values may have been added into the sparse values.
                        col_name = bin_inner_param.header[col_idx]
                        split_points = split_points_dict[col_name]
                        bin_num = BaseBinning.get_bin_num(col_value, split_points)
                        indice.append(col_idx)
                        col_results = bin_res.all_cols_results.get(col_name)
                        woe_value = col_results.woe_array[bin_num]
                        sparse_value.append(woe_value)
                    else:
                        indice.append(col_idx)
                        sparse_value.append(col_value)
                sparse_vector = SparseVector(indice, sparse_value, data_shape)
                instances.features = sparse_vector
            else:
                features = instances.features
                assert isinstance(features, np.ndarray)
                transform_cols_idx_set = set(transform_cols_idx)
                for col_idx, col_value in enumerate(features):
                    if col_idx in transform_cols_idx_set:
                        if col_value in abnormal_list:
                            features[col_idx] = col_value
                            continue
                        col_name = bin_inner_param.header[col_idx]
                        split_points = split_points_dict[col_name]
                        bin_num = BaseBinning.get_bin_num(col_value, split_points)
                        col_results = bin_res.all_cols_results.get(col_name)
                        if np.isnan(col_value) and len(col_results.woe_array) == bin_num:
                            raise ValueError("Missing value found in transform data, "
                                             "but the training data does not have missing values; "
                                             "this is not supported.")
                        woe_value = col_results.woe_array[bin_num]
                        features[col_idx] = woe_value
                instances.features = features
            return instances

        return data_instances.mapValues(convert)
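
    # Usage sketch (illustrative): replace every raw feature value with the WOE
    # of the bin it falls into, using a fitted binary binning result:
    #   transformed = IvCalculator.woe_transformer(data_instances, bin_inner_param,
    #                                              multi_class_bin_res)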

    @staticmethod
    def check_containing_missing_value(data_instances):
        is_sparse = data_overview.is_sparse_data(data_instances)

        def _sparse_check(instance):
            result = set()
            sparse_data = instance.features.get_all_data()
            for col_idx, col_value in sparse_data:
                if np.isnan(col_value):
                    result.add(col_idx)
            return result

        if is_sparse:
            has_missing_value = data_instances.mapValues(_sparse_check).reduce(
                lambda a, b: a.union(b)
            )
        else:
            # Summing the feature vectors propagates NaN, so a column's sum is
            # NaN iff that column contains at least one missing value.
            has_missing_value = data_instances.mapValues(lambda x: x.features).reduce(operator.add)
            has_missing_value = {idx for idx, value in enumerate(has_missing_value) if np.isnan(value)}
        return has_missing_value
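
    # Example (illustrative): for dense rows [1.0, np.nan] and [2.0, 3.0], the
    # column sums are [3.0, nan], so check_containing_missing_value returns {1}.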