123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import copy
- import numpy as np
- from federatedml.feature.binning.base_binning import BaseBinning
- from federatedml.feature.binning.bin_inner_param import BinInnerParam
- from federatedml.feature.binning.bucket_binning import BucketBinning
- from federatedml.feature.binning.optimal_binning.optimal_binning import OptimalBinning
- from federatedml.feature.binning.quantile_binning import QuantileBinning
- from federatedml.feature.binning.iv_calculator import IvCalculator
- from federatedml.feature.binning.bin_result import MultiClassBinResult
- from federatedml.feature.fate_element_type import NoneType
- from federatedml.feature.sparse_vector import SparseVector
- from federatedml.model_base import ModelBase
- from federatedml.param.feature_binning_param import HeteroFeatureBinningParam as FeatureBinningParam
- from federatedml.protobuf.generated import feature_binning_meta_pb2, feature_binning_param_pb2
- from federatedml.statistic.data_overview import get_header, get_anonymous_header
- from federatedml.transfer_variable.transfer_class.hetero_feature_binning_transfer_variable import \
- HeteroFeatureBinningTransferVariable
- from federatedml.util import LOGGER
- from federatedml.util import abnormal_detection
- from federatedml.util import consts
- from federatedml.util.anonymous_generator_util import Anonymous
- from federatedml.util.io_check import assert_io_num_rows_equal
- from federatedml.util.schema_check import assert_schema_consistent
- MODEL_PARAM_NAME = 'FeatureBinningParam'
- MODEL_META_NAME = 'FeatureBinningMeta'
- class BaseFeatureBinning(ModelBase):
- """
- Do binning method through guest and host
- """
- def __init__(self):
- super(BaseFeatureBinning, self).__init__()
- self.transfer_variable = HeteroFeatureBinningTransferVariable()
- self.binning_obj: BaseBinning = None
- self.header = None
- self.anonymous_header = None
- self.training_anonymous_header = None
- self.schema = None
- self.host_results = []
- self.transform_host_results = []
- self.transform_type = None
- self.model_param = FeatureBinningParam()
- self.bin_inner_param = BinInnerParam()
- self.bin_result = MultiClassBinResult(labels=[0, 1])
- self.transform_bin_result = MultiClassBinResult(labels=[0, 1])
- self.has_missing_value = False
- self.labels = []
- self.use_manual_split_points = False
- self.has_woe_array = False
- self._stage = "fit"
- def _init_model(self, params: FeatureBinningParam):
- self.model_param = params
- self.transform_type = self.model_param.transform_param.transform_type
- """
- if self.role == consts.HOST:
- if self.transform_type == "woe":
- raise ValueError("Host party do not support woe transform now.")
- """
- if self.model_param.method == consts.QUANTILE:
- self.binning_obj = QuantileBinning(self.model_param)
- elif self.model_param.method == consts.BUCKET:
- self.binning_obj = BucketBinning(self.model_param)
- elif self.model_param.method == consts.OPTIMAL:
- if self.role == consts.HOST:
- self.model_param.bin_num = self.model_param.optimal_binning_param.init_bin_nums
- self.binning_obj = QuantileBinning(self.model_param)
- else:
- self.binning_obj = OptimalBinning(self.model_param)
- else:
- raise ValueError(f"Binning method: {self.model_param.method} is not supported.")
- self.iv_calculator = IvCalculator(self.model_param.adjustment_factor,
- role=self.role,
- party_id=self.component_properties.local_partyid)
- def _get_manual_split_points(self, data_instances):
- data_index_to_col_name = dict(enumerate(data_instances.schema.get("header")))
- manual_split_points = {}
- if self.model_param.split_points_by_index is not None:
- manual_split_points = {
- data_index_to_col_name.get(int(k), None): v for k, v in self.model_param.split_points_by_index.items()
- }
- if None in manual_split_points.keys():
- raise ValueError(f"Index given in `split_points_by_index` not found in input data header."
- f"Please check.")
- if self.model_param.split_points_by_col_name is not None:
- for col_name, split_points in self.model_param.split_points_by_col_name.items():
- if manual_split_points.get(col_name) is not None:
- raise ValueError(f"Split points for feature {col_name} given in both "
- f"`split_points_by_index` and `split_points_by_col_name`. Please check.")
- manual_split_points[col_name] = split_points
- if set(self.bin_inner_param.bin_names) != set(manual_split_points.keys()):
- raise ValueError(f"Column set from provided split points dictionary does not match that of"
- f"`bin_names` or `bin_indexes. Please check.`")
- return manual_split_points
- @staticmethod
- def data_format_transform(row):
- """
- transform data into sparse format
- """
- if type(row.features).__name__ != consts.SPARSE_VECTOR:
- feature_shape = row.features.shape[0]
- indices = []
- data = []
- for i in range(feature_shape):
- if np.isnan(row.features[i]):
- indices.append(i)
- data.append(NoneType())
- elif np.abs(row.features[i]) < consts.FLOAT_ZERO:
- continue
- else:
- indices.append(i)
- data.append(row.features[i])
- new_row = copy.deepcopy(row)
- new_row.features = SparseVector(indices, data, feature_shape)
- return new_row
- else:
- sparse_vec = row.features.get_sparse_vector()
- replace_key = []
- for key in sparse_vec:
- if sparse_vec.get(key) == NoneType() or np.isnan(sparse_vec.get(key)):
- replace_key.append(key)
- if len(replace_key) == 0:
- return row
- else:
- new_row = copy.deepcopy(row)
- new_sparse_vec = new_row.features.get_sparse_vector()
- for key in replace_key:
- new_sparse_vec[key] = NoneType()
- return new_row
- def _setup_bin_inner_param(self, data_instances, params):
- if self.schema is not None:
- return
- self.header = get_header(data_instances)
- self.anonymous_header = get_anonymous_header(data_instances)
- LOGGER.debug("_setup_bin_inner_param, get header length: {}".format(len(self.header)))
- self.schema = data_instances.schema
- self.bin_inner_param.set_header(self.header, self.anonymous_header)
- if params.bin_indexes == -1:
- self.bin_inner_param.set_bin_all()
- else:
- self.bin_inner_param.add_bin_indexes(params.bin_indexes)
- self.bin_inner_param.add_bin_names(params.bin_names)
- self.bin_inner_param.add_category_indexes(params.category_indexes)
- self.bin_inner_param.add_category_names(params.category_names)
- if params.transform_param.transform_cols == -1:
- self.bin_inner_param.set_transform_all()
- else:
- self.bin_inner_param.add_transform_bin_indexes(params.transform_param.transform_cols)
- self.bin_inner_param.add_transform_bin_names(params.transform_param.transform_names)
- self.binning_obj.set_bin_inner_param(self.bin_inner_param)
- @assert_io_num_rows_equal
- @assert_schema_consistent
- def transform_data(self, data_instances):
- self._setup_bin_inner_param(data_instances, self.model_param)
- if self.transform_type != "woe":
- data_instances = self.binning_obj.transform(data_instances, self.transform_type)
- elif self.role == consts.HOST and not self.has_woe_array:
- raise ValueError("Woe transform is not available for host parties.")
- else:
- data_instances = self.iv_calculator.woe_transformer(data_instances, self.bin_inner_param,
- self.bin_result)
- self.set_schema(data_instances)
- self.data_output = data_instances
- return data_instances
- def _get_meta(self):
- # col_list = [str(x) for x in self.cols]
- transform_param = feature_binning_meta_pb2.TransformMeta(
- transform_cols=self.bin_inner_param.transform_bin_indexes,
- transform_type=self.model_param.transform_param.transform_type
- )
- optimal_metric_method = None
- if self.model_param.method == consts.OPTIMAL and not self.use_manual_split_points:
- optimal_metric_method = self.model_param.optimal_binning_param.metric_method
- meta_protobuf_obj = feature_binning_meta_pb2.FeatureBinningMeta(
- method=self.model_param.method,
- compress_thres=self.model_param.compress_thres,
- head_size=self.model_param.head_size,
- error=self.model_param.error,
- bin_num=self.model_param.bin_num,
- cols=self.bin_inner_param.bin_names,
- adjustment_factor=self.model_param.adjustment_factor,
- local_only=self.model_param.local_only,
- need_run=self.need_run,
- transform_param=transform_param,
- skip_static=self.model_param.skip_static,
- optimal_metric_method=optimal_metric_method
- )
- return meta_protobuf_obj
- def _get_param(self):
- split_points_result = self.binning_obj.bin_results.split_results
- multi_class_result = self.bin_result.generated_pb_list(split_points_result)
- # LOGGER.debug(f"split_points_result: {split_points_result}")
- host_multi_class_result = []
- host_single_results = []
- anonymous_dict_list = []
- if self._stage == "transform" and self._check_lower_version_anonymous():
- if self.role == consts.GUEST:
- anonymous_dict_list = self.transfer_variable.host_anonymous_header_dict.get(idx=-1)
- elif self.role == consts.HOST:
- anonymous_dict = dict(zip(self.training_anonymous_header, self.anonymous_header))
- self.transfer_variable.host_anonymous_header_dict.remote(
- anonymous_dict,
- role=consts.GUEST,
- idx=0
- )
- for idx, host_res in enumerate(self.host_results):
- if not anonymous_dict_list:
- host_multi_class_result.extend(host_res.generated_pb_list())
- host_single_results.append(host_res.bin_results[0].generated_pb())
- else:
- updated_anonymous_header = anonymous_dict_list[idx]
- host_res.update_anonymous(updated_anonymous_header)
- host_multi_class_result.extend(host_res.generated_pb_list())
- host_single_results.append(host_res.bin_results[0].generated_pb())
- has_host_result = True if len(host_multi_class_result) else False
- multi_pb = feature_binning_param_pb2.MultiClassResult(
- results=multi_class_result,
- labels=[str(x) for x in self.labels],
- host_results=host_multi_class_result,
- host_party_ids=[str(x) for x in self.component_properties.host_party_idlist],
- has_host_result=has_host_result
- )
- if self._stage == "fit":
- result_obj = feature_binning_param_pb2. \
- FeatureBinningParam(binning_result=multi_class_result[0],
- host_results=host_single_results,
- header=self.header,
- header_anonymous=self.anonymous_header,
- model_name=consts.BINNING_MODEL,
- multi_class_result=multi_pb)
- else:
- transform_multi_class_result = self.transform_bin_result.generated_pb_list(split_points_result)
- transform_host_single_results = []
- transform_host_multi_class_result = []
- for host_res in self.transform_host_results:
- transform_host_multi_class_result.extend(host_res.generated_pb_list())
- transform_host_single_results.append(host_res.bin_results[0].generated_pb())
- transform_multi_pb = feature_binning_param_pb2.MultiClassResult(
- results=transform_multi_class_result,
- labels=[str(x) for x in self.labels],
- host_results=transform_host_multi_class_result,
- host_party_ids=[str(x) for x in self.component_properties.host_party_idlist],
- has_host_result=has_host_result
- )
- result_obj = feature_binning_param_pb2. \
- FeatureBinningParam(binning_result=multi_class_result[0],
- host_results=host_single_results,
- header=self.header,
- header_anonymous=self.anonymous_header,
- model_name=consts.BINNING_MODEL,
- multi_class_result=multi_pb,
- transform_binning_result=transform_multi_class_result[0],
- transform_host_results=transform_host_single_results,
- transform_multi_class_result=transform_multi_pb)
- return result_obj
- def load_model(self, model_dict):
- model_param = list(model_dict.get('model').values())[0].get(MODEL_PARAM_NAME)
- model_meta = list(model_dict.get('model').values())[0].get(MODEL_META_NAME)
- self.bin_inner_param = BinInnerParam()
- multi_class_result = model_param.multi_class_result
- self.labels = list(map(int, multi_class_result.labels))
- if self.labels:
- self.bin_result = MultiClassBinResult.reconstruct(list(multi_class_result.results), self.labels)
- if self.role == consts.HOST:
- binning_result = dict(list(multi_class_result.results)[0].binning_result)
- woe_array = list(binning_result.values())[0].woe_array
- # if manual woe, reconstruct
- if woe_array:
- self.bin_result = MultiClassBinResult.reconstruct(list(multi_class_result.results))
- self.has_woe_array = True
- assert isinstance(model_meta, feature_binning_meta_pb2.FeatureBinningMeta)
- assert isinstance(model_param, feature_binning_param_pb2.FeatureBinningParam)
- self.header = list(model_param.header)
- self.training_anonymous_header = list(model_param.header_anonymous)
- self.bin_inner_param.set_header(self.header, self.training_anonymous_header)
- self.bin_inner_param.add_transform_bin_indexes(list(model_meta.transform_param.transform_cols))
- self.bin_inner_param.add_bin_names(list(model_meta.cols))
- self.transform_type = model_meta.transform_param.transform_type
- bin_method = str(model_meta.method)
- if bin_method == consts.QUANTILE:
- self.binning_obj = QuantileBinning(params=model_meta)
- elif bin_method == consts.OPTIMAL:
- self.binning_obj = OptimalBinning(params=model_meta)
- else:
- self.binning_obj = BucketBinning(params=model_meta)
- # self.binning_obj.set_role_party(self.role, self.component_properties.local_partyid)
- self.binning_obj.set_bin_inner_param(self.bin_inner_param)
- split_results = dict(model_param.binning_result.binning_result)
- for col_name, sr_pb in split_results.items():
- split_points = list(sr_pb.split_points)
- self.binning_obj.bin_results.put_col_split_points(col_name, split_points)
- # self.binning_obj.bin_results.reconstruct(model_param.binning_result)
- self.host_results = []
- host_pbs = list(model_param.multi_class_result.host_results)
- if len(host_pbs):
- if len(self.labels) == 2:
- for host_pb in host_pbs:
- self.host_results.append(MultiClassBinResult.reconstruct(
- host_pb, self.labels))
- else:
- assert len(host_pbs) % len(self.labels) == 0
- i = 0
- while i < len(host_pbs):
- this_pbs = host_pbs[i: i + len(self.labels)]
- self.host_results.append(MultiClassBinResult.reconstruct(this_pbs, self.labels))
- i += len(self.labels)
- """
- if list(model_param.header_anonymous):
- self.anonymous_header = list(model_param.anonymous_header)
- """
- self._stage = "transform"
- def export_model(self):
- if self.model_output is not None:
- return self.model_output
- meta_obj = self._get_meta()
- param_obj = self._get_param()
- result = {
- MODEL_META_NAME: meta_obj,
- MODEL_PARAM_NAME: param_obj
- }
- self.model_output = result
- return result
- def save_data(self):
- return self.data_output
- def set_schema(self, data_instance):
- self.schema['header'] = self.header
- data_instance.schema = self.schema
- # LOGGER.debug("After Binning, when setting schema, schema is : {}".format(data_instance.schema))
- def set_optimal_metric_array(self, optimal_metric_array_dict):
- # LOGGER.debug(f"optimal metric array dict: {optimal_metric_array_dict}")
- for col_name, optimal_metric_array in optimal_metric_array_dict.items():
- self.bin_result.put_optimal_metric_array(col_name, optimal_metric_array)
- # LOGGER.debug(f"after set optimal metric, self.bin_result metric is: {self.bin_result.all_optimal_metric}")
- def _abnormal_detection(self, data_instances):
- """
- Make sure input data_instances is valid.
- """
- abnormal_detection.empty_table_detection(data_instances)
- abnormal_detection.empty_feature_detection(data_instances)
- self.check_schema_content(data_instances.schema)
- def _check_lower_version_anonymous(self):
- return not self.training_anonymous_header or \
- Anonymous.is_old_version_anonymous_header(self.training_anonymous_header)
|