123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import numpy as np
- from federatedml.feature.fate_element_type import NoneType
- from federatedml.model_base import ModelBase
- from federatedml.param.statistics_param import StatisticsParam
- from federatedml.protobuf.generated import statistic_meta_pb2, statistic_param_pb2
- from federatedml.statistic.data_overview import get_header
- from federatedml.statistic.statics import MultivariateStatisticalSummary
- from federatedml.util import LOGGER
- from federatedml.util import abnormal_detection
- from federatedml.util import consts
- MODEL_PARAM_NAME = 'StatisticParam'
- MODEL_META_NAME = 'StatisticMeta'
- SYSTEM_ABNORMAL_VALUES = [None, np.nan, NoneType]
- class StatisticInnerParam(object):
- def __init__(self):
- self.col_name_maps = {}
- self.header = []
- self.static_indices = []
- self.static_indices_set = set()
- self.static_names = []
- def set_header(self, header):
- self.header = header
- for idx, col_name in enumerate(self.header):
- self.col_name_maps[col_name] = idx
- def set_static_all(self):
- self.static_indices = [i for i in range(len(self.header))]
- self.static_indices_set = set(self.static_indices)
- self.static_names = self.header
- def add_static_indices(self, static_indices):
- if static_indices is None:
- return
- for idx in static_indices:
- if idx >= len(self.header):
- LOGGER.warning("Adding indices that out of header's bound")
- continue
- if idx not in self.static_indices_set:
- self.static_indices_set.add(idx)
- self.static_indices.append(idx)
- self.static_names.append(self.header[idx])
- def add_static_names(self, static_names):
- if static_names is None:
- return
- for col_name in static_names:
- idx = self.col_name_maps.get(col_name)
- if idx is None:
- LOGGER.warning(f"Adding col_name: {col_name} that is not exist in header")
- continue
- if idx not in self.static_indices_set:
- self.static_indices_set.add(idx)
- self.static_indices.append(idx)
- self.static_names.append(self.header[idx])
- class DataStatistics(ModelBase):
- def __init__(self):
- super().__init__()
- self.model_param = StatisticsParam()
- self.inner_param = None
- self.schema = None
- self.statistic_obj: MultivariateStatisticalSummary = None
- self._result_dict = {}
- self._numeric_statics = []
- self._quantile_statics = []
- self.feature_value_pb = []
- def _init_model(self, model_param):
- self.model_param = model_param
- for stat_name in self.model_param.statistics:
- if stat_name in self.model_param.LEGAL_STAT:
- self._numeric_statics.append(stat_name)
- else:
- self._quantile_statics.append(stat_name)
- def _init_param(self, data_instances):
- if self.schema is None or len(self.schema) == 0:
- self.schema = data_instances.schema
- if self.inner_param is not None:
- return
- self.inner_param = StatisticInnerParam()
- # self.schema = data_instances.schema
- LOGGER.debug("In _init_params, schema is : {}".format(self.schema))
- header = get_header(data_instances)
- self.inner_param.set_header(header)
- if self.model_param.column_indexes == -1:
- self.inner_param.set_static_all()
- else:
- self.inner_param.add_static_indices(self.model_param.column_indexes)
- self.inner_param.add_static_names(self.model_param.column_names)
- LOGGER.debug(f"column_indexes: {self.model_param.column_indexes}, inner_param"
- f" static_indices: {self.inner_param.static_indices}")
- return self
- @staticmethod
- def _merge_abnormal_list(abnormal_list):
- if abnormal_list is None:
- return SYSTEM_ABNORMAL_VALUES
- return abnormal_list + SYSTEM_ABNORMAL_VALUES
- def fit(self, data_instances):
- self._init_param(data_instances)
- self._abnormal_detection(data_instances)
- if consts.KURTOSIS in self.model_param.statistics:
- stat_order = 4
- elif consts.SKEWNESS in self.model_param.statistics:
- stat_order = 3
- else:
- stat_order = 2
- abnormal_list = self._merge_abnormal_list(self.model_param.abnormal_list)
- self.statistic_obj = MultivariateStatisticalSummary(data_instances,
- cols_index=self.inner_param.static_indices,
- abnormal_list=abnormal_list,
- error=self.model_param.quantile_error,
- stat_order=stat_order,
- bias=self.model_param.bias)
- results = None
- for stat_name in self._numeric_statics:
- stat_res = self.statistic_obj.get_statics(stat_name)
- LOGGER.debug(f"state_name: {stat_name}, stat_res: {stat_res}")
- self.feature_value_pb.append(self._convert_pb(stat_res, stat_name))
- if results is None:
- results = {k: {stat_name: v} for k, v in stat_res.items()}
- else:
- for k, v in results.items():
- results[k] = dict(**v, **{stat_name: stat_res[k]})
- for query_point in self._quantile_statics:
- q = float(query_point[:-1]) / 100
- res = self.statistic_obj.get_quantile_point(q)
- self.feature_value_pb.append(self._convert_pb(res, query_point))
- if results is None:
- results = res
- else:
- for k, v in res.items():
- results[k][query_point] = v
- for k, v in results.items():
- # new_dict = {}
- # for stat_name, value in v.items():
- # LOGGER.debug(f"stat_name: {stat_name}, value: {value}, type: {type(value)}")
- self.add_summary(k, v)
- LOGGER.debug(f"Before return, summary: {self.summary()}")
- def _convert_pb(self, stat_res, stat_name):
- values = [stat_res[col_name] for col_name in self.inner_param.static_names]
- return statistic_param_pb2.StatisticSingleFeatureValue(
- values=values,
- col_names=self.inner_param.static_names,
- value_name=stat_name
- )
- def export_model(self):
- if self.model_output is not None:
- return self.model_output
- meta_obj = self._get_meta()
- param_obj = self._get_param()
- result = {
- MODEL_META_NAME: meta_obj,
- MODEL_PARAM_NAME: param_obj
- }
- self.model_output = result
- return result
- def _get_meta(self):
- return statistic_meta_pb2.StatisticMeta(
- statistics=self.model_param.statistics,
- static_columns=self.inner_param.static_names,
- quantile_error=self.model_param.quantile_error,
- need_run=self.model_param.need_run
- )
- def _get_param(self):
- all_result = statistic_param_pb2.StatisticOnePartyResult(
- results=self.feature_value_pb
- )
- return statistic_param_pb2.ModelParam(
- self_values=all_result,
- model_name=consts.STATISTIC_MODEL
- )
- def _abnormal_detection(self, data_instances):
- """
- Make sure input data_instances is valid.
- """
- abnormal_detection.empty_table_detection(data_instances)
- abnormal_detection.empty_feature_detection(data_instances)
- self.check_schema_content(data_instances.schema)
|