123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import bisect
- import functools
- import math
- import random
- import copy
- import numpy as np
- from federatedml.feature.binning.bin_inner_param import BinInnerParam
- from federatedml.feature.binning.bin_result import BinColResults, SplitPointsResult
- from federatedml.statistic.data_overview import get_header, get_anonymous_header
- from federatedml.feature.sparse_vector import SparseVector
- from federatedml.param.feature_binning_param import FeatureBinningParam
- from federatedml.statistic import data_overview
- from federatedml.util import LOGGER
- from federatedml.feature.fate_element_type import NoneType
- # from federatedml.statistic import statics
class BaseBinning(object):
    """
    Base class for feature binning.

    Discretizes continuous features so the bins can be used for data
    transformation or feature selection.
    """

    def __init__(self, params=None, abnormal_list=None, labels=None):
        """
        Parameters
        ----------
        params : FeatureBinningParam or duck-typed object with ``bin_num``, optional
        abnormal_list : list, optional
            Values to be left untouched during transformation.
        labels : unused here; kept for subclass compatibility.
        """
        self.bin_inner_param: BinInnerParam = None
        self.is_multi_class = False
        self.bin_results = SplitPointsResult()
        # Fix: set these unconditionally so the early return below cannot
        # leave a half-initialized object (previously, params=None skipped
        # them and later accesses raised AttributeError).
        if abnormal_list is None:
            self.abnormal_list = []
        else:
            self.abnormal_list = abnormal_list
        self.split_points = None
        if params is None:
            return
        # A real FeatureBinningParam is stored; any other duck-typed param
        # object is still consulted for bin_num but not kept.
        if isinstance(params, FeatureBinningParam):
            self.params = params
        else:
            self.params = None
        self.bin_num = params.bin_num
- @property
- def header(self):
- return self.bin_inner_param.header
- # @property
- # def split_points(self):
- # return self.bin_results.all_split_points
- def _default_setting(self, header, anonymous_header):
- if self.bin_inner_param is not None:
- return
- self.bin_inner_param = BinInnerParam()
- self.bin_inner_param.set_header(header, anonymous_header)
- if self.params.bin_indexes == -1:
- self.bin_inner_param.set_bin_all()
- else:
- self.bin_inner_param.add_bin_indexes(self.params.bin_indexes)
- self.bin_inner_param.add_bin_names(self.params.bin_names)
- self.bin_inner_param.add_category_indexes(self.params.category_indexes)
- self.bin_inner_param.add_category_names(self.params.category_names)
- if self.params.transform_param.transform_cols == -1:
- self.bin_inner_param.set_transform_all()
- else:
- self.bin_inner_param.add_transform_bin_indexes(self.params.transform_param.transform_cols)
- self.bin_inner_param.add_transform_bin_names(self.params.transform_param.transform_names)
- def fit_split_points(self, data_instances):
- """
- Get split points
- Parameters
- ----------
- data_instances : Table
- The input data
- Returns
- -------
- split_points : dict.
- Each value represent for the split points for a feature. The element in each row represent for
- the corresponding split point.
- e.g.
- split_points = {'x1': [0.1, 0.2, 0.3, 0.4 ...], # The first feature
- 'x2': [1, 2, 3, 4, ...], # The second feature
- ...] # Other features
- """
- pass
- def fit_category_features(self, data_instances):
- is_sparse = data_overview.is_sparse_data(data_instances)
- if len(self.bin_inner_param.category_indexes) > 0:
- statics_obj = data_overview.DataStatistics()
- category_col_values = statics_obj.static_all_values(data_instances,
- self.bin_inner_param.category_indexes,
- is_sparse)
- for col_name, split_points in zip(self.bin_inner_param.category_names, category_col_values):
- self.bin_results.put_col_split_points(col_name, split_points)
- self.bin_results.put_col_optimal_metric_array(col_name, None)
- def set_bin_inner_param(self, bin_inner_param):
- self.bin_inner_param = bin_inner_param
- def transform(self, data_instances, transform_type):
- # self._init_cols(data_instances)
- for col_name in self.bin_inner_param.transform_bin_names:
- if col_name not in self.bin_inner_param.col_name_maps:
- raise ValueError("Transform col_name: {} is not existed".format(col_name))
- if transform_type == 'bin_num':
- data_instances, _, _ = self.convert_feature_to_bin(data_instances)
- elif transform_type == 'woe':
- data_instances = self.convert_feature_to_woe(data_instances)
- return data_instances
- @staticmethod
- def get_data_bin(data_instances, split_points, bin_cols_map):
- """
- Apply the binning method
- Parameters
- ----------
- data_instances : Table
- The input data
- split_points : dict.
- Each value represent for the split points for a feature. The element in each row represent for
- the corresponding split point.
- e.g.
- split_points = {'x1': [0.1, 0.2, 0.3, 0.4 ...], # The first feature
- 'x2': [1, 2, 3, 4, ...], # The second feature
- ...] # Other features
- Returns
- -------
- data_bin_table : Table.
- Each element represent for the corresponding bin number this feature belongs to.
- e.g. it could be:
- [{'x1': 1, 'x2': 5, 'x3': 2}
- ...
- ]
- """
- # self._init_cols(data_instances)
- is_sparse = data_overview.is_sparse_data(data_instances)
- header = data_instances.schema.get('header')
- f = functools.partial(BaseBinning.bin_data,
- split_points=split_points,
- cols_dict=bin_cols_map,
- header=header,
- is_sparse=is_sparse)
- data_bin_dict = data_instances.mapValues(f)
- return data_bin_dict
- def convert_feature_to_woe(self, data_instances):
- is_sparse = data_overview.is_sparse_data(data_instances)
- schema = data_instances.schema
- if is_sparse:
- f = functools.partial(self._convert_sparse_data,
- bin_inner_param=self.bin_inner_param,
- bin_results=self.bin_results,
- abnormal_list=self.abnormal_list,
- convert_type='woe'
- )
- new_data = data_instances.mapValues(f)
- else:
- f = functools.partial(self._convert_dense_data,
- bin_inner_param=self.bin_inner_param,
- bin_results=self.bin_results,
- abnormal_list=self.abnormal_list,
- convert_type='woe')
- new_data = data_instances.mapValues(f)
- new_data.schema = schema
- return new_data
- def convert_feature_to_bin(self, data_instances, split_points=None):
- is_sparse = data_overview.is_sparse_data(data_instances)
- schema = data_instances.schema
- if split_points is None:
- split_points = self.bin_results.all_split_points
- else:
- for col_name, sp in split_points.items():
- self.bin_results.put_col_split_points(col_name, sp)
- if is_sparse:
- f = functools.partial(self._convert_sparse_data,
- bin_inner_param=self.bin_inner_param,
- bin_results=self.bin_results,
- abnormal_list=self.abnormal_list,
- convert_type='bin_num'
- )
- new_data = data_instances.mapValues(f)
- else:
- f = functools.partial(self._convert_dense_data,
- bin_inner_param=self.bin_inner_param,
- bin_results=self.bin_results,
- abnormal_list=self.abnormal_list,
- convert_type='bin_num')
- new_data = data_instances.mapValues(f)
- new_data.schema = schema
- header = get_header(data_instances)
- bin_sparse = self.get_sparse_bin(self.bin_inner_param.transform_bin_indexes, split_points, header)
- split_points_result = self.bin_results.get_split_points_array(self.bin_inner_param.transform_bin_names)
- return new_data, split_points_result, bin_sparse
- def _setup_bin_inner_param(self, data_instances, params):
- if self.bin_inner_param is not None:
- return
- self.bin_inner_param = BinInnerParam()
- header = get_header(data_instances)
- anonymous_header = get_anonymous_header(data_instances)
- LOGGER.debug("_setup_bin_inner_param, get header length: {}".format(len(self.header)))
- self.schema = data_instances.schema
- self.bin_inner_param.set_header(header, anonymous_header)
- if params.bin_indexes == -1:
- self.bin_inner_param.set_bin_all()
- else:
- self.bin_inner_param.add_bin_indexes(params.bin_indexes)
- self.bin_inner_param.add_bin_names(params.bin_names)
- self.bin_inner_param.add_category_indexes(params.category_indexes)
- self.bin_inner_param.add_category_names(params.category_names)
- if params.transform_param.transform_cols == -1:
- self.bin_inner_param.set_transform_all()
- else:
- self.bin_inner_param.add_transform_bin_indexes(params.transform_param.transform_cols)
- self.bin_inner_param.add_transform_bin_names(params.transform_param.transform_names)
- self.set_bin_inner_param(self.bin_inner_param)
- @staticmethod
- def _convert_sparse_data(instances, bin_inner_param: BinInnerParam, bin_results: SplitPointsResult,
- abnormal_list: list, convert_type: str = 'bin_num'):
- instances = copy.deepcopy(instances)
- all_data = instances.features.get_all_data()
- data_shape = instances.features.get_shape()
- indice = []
- sparse_value = []
- transform_cols_idx_set = bin_inner_param.transform_bin_indexes_added_set
- split_points_dict = bin_results.all_split_points
- for col_idx, col_value in all_data:
- if col_idx in transform_cols_idx_set:
- if col_value in abnormal_list:
- indice.append(col_idx)
- sparse_value.append(col_value)
- continue
- # Maybe it is because missing value add in sparse value, but
- col_name = bin_inner_param.header[col_idx]
- split_points = split_points_dict[col_name]
- bin_num = BaseBinning.get_bin_num(col_value, split_points)
- indice.append(col_idx)
- if convert_type == 'bin_num':
- sparse_value.append(bin_num)
- else:
- sparse_value.append(col_value)
- else:
- indice.append(col_idx)
- sparse_value.append(col_value)
- sparse_vector = SparseVector(indice, sparse_value, data_shape)
- instances.features = sparse_vector
- return instances
- @staticmethod
- def get_sparse_bin(transform_cols_idx, split_points_dict, header):
- """
- Get which bins the 0 located at for each column.
- Returns
- -------
- Dict of sparse bin num
- {0: 2, 1: 3, 2:5 ... }
- """
- result = {}
- for col_idx in transform_cols_idx:
- col_name = header[col_idx]
- split_points = split_points_dict[col_name]
- sparse_bin_num = BaseBinning.get_bin_num(0, split_points)
- result[col_idx] = sparse_bin_num
- return result
- @staticmethod
- def _convert_dense_data(instances, bin_inner_param: BinInnerParam, bin_results: SplitPointsResult,
- abnormal_list: list, convert_type: str = 'bin_num'):
- instances = copy.deepcopy(instances)
- features = instances.features
- transform_cols_idx_set = bin_inner_param.transform_bin_indexes_added_set
- split_points_dict = bin_results.all_split_points
- for col_idx, col_value in enumerate(features):
- if col_idx in transform_cols_idx_set:
- if col_value in abnormal_list:
- features[col_idx] = col_value
- continue
- col_name = bin_inner_param.header[col_idx]
- split_points = split_points_dict[col_name]
- bin_num = BaseBinning.get_bin_num(col_value, split_points)
- if convert_type == 'bin_num':
- features[col_idx] = bin_num
- else:
- features[col_idx] = col_value
- instances.features = features
- return instances
- @staticmethod
- def convert_bin_counts_table(result_counts, idx):
- """
- Given event count information calculate iv information
- Parameters
- ----------
- result_counts: table.
- It is like:
- ('x1': [[label_0_count, label_1_count, ...], [label_0_count, label_1_count, ...] ... ],
- 'x2': [[label_0_count, label_1_count, ...], [label_0_count, label_1_count, ...] ... ],
- ...
- )
- idx: int
- Returns
- -------
- ('x1': [[event_count, non_event_count], [event_count, non_event_count] ... ],
- 'x2': [[event_count, non_event_count], [event_count, non_event_count] ... ],
- ...
- )
- """
- def _convert(list_counts):
- res = []
- for c_array in list_counts:
- event_count = c_array[idx]
- non_event_count = np.sum(c_array) - event_count
- res.append([event_count, non_event_count])
- return res
- return result_counts.mapValues(_convert)
- @staticmethod
- def fill_sparse_result(col_name, static_nums, sparse_bin_points, label_counts):
- """
- Parameters
- ----------
- static_nums : list.
- It is like:
- [[event_count, total_num], [event_count, total_num] ... ]
- sparse_bin_points : dict
- Dict of sparse bin num
- {"x1": 2, "x2": 3, "x3": 5 ... }
- label_counts: np.array
- eg. [100, 200, ...]
- Returns
- -------
- The format is same as result_counts.
- """
- curt_all = functools.reduce(lambda x, y: x + y, static_nums)
- sparse_bin = sparse_bin_points.get(col_name)
- static_nums[sparse_bin] = label_counts - curt_all
- return col_name, static_nums
- @staticmethod
- def bin_data(instance, split_points, cols_dict, header, is_sparse):
- """
- Apply the binning method
- Parameters
- ----------
- instance : Table
- The input data
- split_points : dict.
- Each value represent for the split points for a feature. The element in each row represent for
- the corresponding split point.
- e.g.
- split_points = {'x1': [0.1, 0.2, 0.3, 0.4 ...], # The first feature
- 'x2': [1, 2, 3, 4, ...], # The second feature
- ...] # Other features
- cols_dict: dict
- Record key, value pairs where key is cols' name, and value is cols' index.
- header: list
- header of Table
- is_sparse: bool
- Specify whether it is sparse data or not
- Returns
- -------
- result_bin_dict : dict.
- Each element represent for the corresponding bin number this feature belongs to.
- e.g. it could be:
- [{1: 1, 2: 5, 3: 2}
- ...
- ] # Each number represent for the bin number it belongs to.
- """
- result_bin_nums = {}
- if is_sparse:
- sparse_data = instance.features.get_all_data()
- for col_idx, col_value in sparse_data:
- col_name = header[col_idx]
- if col_name in cols_dict:
- col_split_points = split_points[col_name]
- col_bin_num = BaseBinning.get_bin_num(col_value, col_split_points)
- result_bin_nums[col_name] = col_bin_num
- return result_bin_nums
- # For dense data
- for col_name, col_index in cols_dict.items():
- col_split_points = split_points[col_name]
- value = instance.features[col_index]
- col_bin_num = BaseBinning.get_bin_num(value, col_split_points)
- result_bin_nums[col_name] = col_bin_num
- return result_bin_nums
- @staticmethod
- def get_bin_num(value, split_points):
- if np.isnan(value):
- return len(split_points)
- sp = split_points[:-1]
- col_bin_num = bisect.bisect_left(sp, value)
- # col_bin_num = bisect.bisect_left(split_points, value)
- return col_bin_num
- @staticmethod
- def add_label_in_partition_bak(data_bin_with_table, sparse_bin_points):
- """
- Add all label, so that become convenient to calculate woe and iv
- Parameters
- ----------
- data_bin_with_table : Table
- The input data, the Table is like:
- (id, {'x1': 1, 'x2': 5, 'x3': 2}, y)
- sparse_bin_points: dict
- Dict of sparse bin num
- {0: 2, 1: 3, 2:5 ... }
- Returns
- -------
- result_sum: the result Table. It is like:
- {'x1': [[event_count, total_num], [event_count, total_num] ... ],
- 'x2': [[event_count, total_num], [event_count, total_num] ... ],
- ...
- }
- """
- result_sum = {}
- for _, datas in data_bin_with_table:
- bin_idx_dict = datas[0]
- y = datas[1]
- # y = y_combo[0]
- # inverse_y = y_combo[1]
- for col_name, bin_idx in bin_idx_dict.items():
- result_sum.setdefault(col_name, [])
- col_sum = result_sum[col_name]
- while bin_idx >= len(col_sum):
- col_sum.append([0, 0])
- if bin_idx == sparse_bin_points[col_name]:
- continue
- label_sum = col_sum[bin_idx]
- label_sum[0] = label_sum[0] + y
- label_sum[1] = label_sum[1] + 1
- col_sum[bin_idx] = label_sum
- result_sum[col_name] = col_sum
- return list(result_sum.items())
|