123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import copy
- import numpy as np
- from sklearn.linear_model import LogisticRegression
- from federatedml.model_base import ModelBase
- from federatedml.param.local_baseline_param import LocalBaselineParam
- from federatedml.protobuf.generated import lr_model_meta_pb2, lr_model_param_pb2
- from federatedml.statistic import data_overview
- from federatedml.util import LOGGER
- from federatedml.util import abnormal_detection
- from federatedml.util.io_check import assert_io_num_rows_equal
class LocalBaseline(ModelBase):
    """Train, export, reload and apply a local scikit-learn logistic regression.

    The fitted estimator is serialised with the LR protobuf messages
    (``LRModelParam`` / ``LRModelMeta``). When ``fit`` sees more than two
    classes the model is exported in the one-vs-rest layout, i.e. one
    ``SingleModel`` message per class.
    """

    def __init__(self):
        super().__init__()
        self.model_param = LocalBaselineParam()
        self.model_name = "LocalBaseline"
        self.metric_type = ""
        self.model_param_name = "LocalBaselineParam"
        self.model_meta_name = "LocalBaselineMeta"

        # one_vs_rest parameters
        self.need_one_vs_rest = None
        self.one_vs_rest_classes = []
        self.one_vs_rest_obj = None

    def _init_model(self, params):
        """Copy the user supplied parameters and reset all fitted state.

        Args:
            params: parameter object exposing ``model_name``, ``model_opts``
                and ``predict_param``.
        """
        self.model_name = params.model_name
        self.model_opts = params.model_opts
        self.predict_param = params.predict_param
        self.model = None
        self.model_fit = None
        self.header = None
        self.model_weights = None

    def get_model(self):
        """Build an unfitted estimator from ``self.model_opts``.

        A deep copy of the unfitted estimator is kept in ``self.model``; the
        instance that is returned is the one the caller fits.

        Returns:
            A new ``LogisticRegression`` instance.
        """
        # extend in future with more model types
        model = LogisticRegression(**self.model_opts)
        self.model = copy.deepcopy(model)
        return model

    @staticmethod
    def _intercept_for_class(model, class_idx):
        """Return the intercept belonging to ``class_idx`` as a plain float.

        ``model.intercept_`` may be a scalar, a length-1 array or an array
        with one entry per class, so it is flattened first instead of being
        indexed (or not) depending on ``fit_intercept``.
        """
        intercepts = np.ravel(model.intercept_)
        # A single value is shared by every class.
        if intercepts.size == 1:
            return float(intercepts[0])
        return float(intercepts[class_idx])

    def _build_single_result(self, class_idx):
        """Describe one row of the fitted model as a ``SingleModel``-style dict.

        Args:
            class_idx: row of ``coef_`` / ``intercept_`` to export.

        Returns:
            dict with keys ``iters``, ``is_converged``, ``weight``,
            ``intercept``, ``header`` and ``best_iteration``.
        """
        model = self.model_fit
        # Only the first entry of n_iter_ is reported, for every class.
        n_iter = int(model.n_iter_[0])
        # Plain floats keep the export independent of numpy scalar types.
        weights = [float(value) for value in model.coef_[class_idx]]
        return {
            'iters': n_iter,
            'is_converged': bool(n_iter < model.max_iter),
            'weight': dict(zip(self.header, weights)),
            'intercept': self._intercept_for_class(model, class_idx),
            'header': self.header,
            'best_iteration': -1,
        }

    def _get_model_param(self):
        """Return the parameter dict of the binary (single) model."""
        return self._build_single_result(0)

    def _get_model_param_ovr(self):
        """Return the parameter dict of a one-vs-rest model.

        Every class in ``model_fit.classes_`` is wrapped in its own
        ``SingleModel`` message; class labels are exported as strings.
        """
        completed_models = []
        class_names = []
        for class_idx, label in enumerate(self.model_fit.classes_):
            single_result = self._build_single_result(class_idx)
            completed_models.append(lr_model_param_pb2.SingleModel(**single_result))
            class_names.append(str(label))

        one_vs_rest_result = {
            'completed_models': completed_models,
            'one_vs_rest_classes': class_names,
        }
        return {
            'one_vs_rest_result': one_vs_rest_result,
            'need_one_vs_rest': True,
            'header': self.header,
        }

    def _get_param(self):
        """Build the ``LRModelParam`` message for export.

        Returns:
            An empty ``LRModelParam`` when no header is known (nothing was
            fitted or loaded), otherwise the populated message.
        """
        if self.header is None:
            return lr_model_param_pb2.LRModelParam()

        if self.need_one_vs_rest:
            result = self._get_model_param_ovr()
        else:
            result = self._get_model_param()
        return lr_model_param_pb2.LRModelParam(**result)

    def _get_meta(self):
        """Build the ``LRModelMeta`` message from the fitted estimator."""
        model = self.model_fit
        predict_param = lr_model_meta_pb2.PredictMeta(**{"threshold": self.predict_param.threshold})
        result = {
            'penalty': model.penalty,
            'tol': model.tol,
            'fit_intercept': model.fit_intercept,
            'optimizer': model.solver,
            'need_one_vs_rest': self.need_one_vs_rest,
            'max_iter': model.max_iter,
            'predict_param': predict_param,
        }
        return lr_model_meta_pb2.LRModelMeta(**result)

    def export_model(self):
        """Return the meta and param messages keyed by their export names.

        Returns:
            dict ``{model_meta_name: meta, model_param_name: param}``, or
            ``None`` when ``self.need_run`` is false.
        """
        if not self.need_run:
            return None
        meta_obj = self._get_meta()
        param_obj = self._get_param()
        return {
            self.model_meta_name: meta_obj,
            self.model_param_name: param_obj,
        }

    def get_model_summary(self):
        """Summarise the fitted model as plain Python values.

        Returns:
            ``{}`` when no header is known. For a binary model a dict with
            ``coef``, ``intercept``, ``is_converged``, ``iters`` and
            ``one_vs_rest=False``. For a one-vs-rest model one such entry
            per class (keyed by the class label as string) plus
            ``one_vs_rest=True``.
        """
        if self.header is None:
            return {}

        if not self.need_one_vs_rest:
            param = self._get_model_param()
            return {
                'coef': param['weight'],
                'intercept': param['intercept'],
                'is_converged': param['is_converged'],
                'iters': param['iters'],
                'one_vs_rest': False,
            }

        summary = {}
        for class_idx, label in enumerate(self.model_fit.classes_):
            param = self._build_single_result(class_idx)
            summary[f"{label}"] = {
                'coef': param['weight'],
                'intercept': param['intercept'],
                'is_converged': param['is_converged'],
                'iters': param['iters'],
            }
        summary['one_vs_rest'] = True
        return summary

    # NOTE(review): judging by its name this decorator compares input/output
    # row counts, which does not obviously apply to a protobuf -> ndarray
    # helper; it is kept to preserve existing behaviour - confirm and remove.
    @assert_io_num_rows_equal
    def _load_single_coef(self, result_obj):
        """Rebuild one coefficient vector, ordered like ``self.header``.

        Args:
            result_obj: message exposing a ``weight`` mapping of
                feature name -> coefficient.

        Returns:
            1-D float ndarray with one entry per header column.

        Raises:
            ValueError: if a header column has no stored weight. A missing
                entry would otherwise silently end up as a non-numeric
                coefficient in the restored model.
        """
        weight_dict = dict(result_obj.weight)
        missing = [name for name in self.header if name not in weight_dict]
        if missing:
            raise ValueError(f"model weights missing for features: {missing}")
        return np.array([weight_dict[name] for name in self.header], dtype=float)

    def _load_single_model(self, result_obj):
        """Restore the fitted attributes of a binary model onto ``model_fit``."""
        coef = self._load_single_coef(result_obj)
        self.model_fit.coef_ = np.array([coef])
        self.model_fit.intercept_ = np.array([result_obj.intercept])
        # The binary export does not carry class labels; 0/1 is assumed.
        self.model_fit.classes_ = np.array([0, 1])
        self.model_fit.n_iter_ = [result_obj.iters]

    def _load_ovr_model(self, result_obj):
        """Restore the fitted attributes of a one-vs-rest model onto ``model_fit``.

        Class labels are parsed back with ``int``; the iteration count of the
        last stored sub-model is used for ``n_iter_``.
        """
        one_vs_rest_result = result_obj.one_vs_rest_result
        classes = np.array([int(name) for name in one_vs_rest_result.one_vs_rest_classes])
        models = one_vs_rest_result.completed_models

        class_count, feature_shape = len(classes), len(self.header)
        coef_all = np.zeros((class_count, feature_shape))
        intercept_all = np.zeros(class_count)
        iters = -1
        for class_idx in range(class_count):
            single_model = models[class_idx]
            coef_all[class_idx, ] = self._load_single_coef(single_model)
            intercept_all[class_idx] = single_model.intercept
            iters = single_model.iters

        self.model_fit.coef_ = coef_all
        self.model_fit.intercept_ = intercept_all
        self.model_fit.classes_ = classes
        self.model_fit.n_iter_ = [iters]

    def _load_model_meta(self, meta_obj):
        """Copy the hyper-parameters stored in ``meta_obj`` onto ``model_fit``."""
        self.model_fit.penalty = meta_obj.penalty
        self.model_fit.tol = meta_obj.tol
        self.model_fit.fit_intercept = meta_obj.fit_intercept
        self.model_fit.solver = meta_obj.optimizer
        self.model_fit.max_iter = meta_obj.max_iter

    def load_model(self, model_dict):
        """Rebuild ``model_fit`` from an exported model dictionary.

        Args:
            model_dict: mapping whose ``'model'`` entry maps to dicts holding
                the param and meta messages; the first such dict is used.
        """
        # Look the stored model up once and read both messages from it.
        stored_model = list(model_dict.get('model').values())[0]
        result_obj = stored_model.get(self.model_param_name)
        meta_obj = stored_model.get(self.model_meta_name)

        self.model_fit = LogisticRegression()
        self._load_model_meta(meta_obj)
        self.header = list(result_obj.header)

        self.need_one_vs_rest = meta_obj.need_one_vs_rest
        LOGGER.debug("in _load_model need_one_vs_rest: {}".format(self.need_one_vs_rest))
        if self.need_one_vs_rest:
            self._load_ovr_model(result_obj)
        else:
            self._load_single_model(result_obj)

    @assert_io_num_rows_equal
    def predict(self, data_instances):
        """Score every instance with the fitted model.

        Args:
            data_instances: table whose values expose ``features``.

        Returns:
            The result of ``predict_score_to_output``, or ``None`` when
            ``self.need_run`` is false.
        """
        if not self.need_run:
            return None

        model_fit = self.model_fit
        classes = [int(x) for x in model_fit.classes_]
        if self.need_one_vs_rest:
            # Multi-class: keep the full probability vector of each row.
            pred_prob = data_instances.mapValues(
                lambda v: model_fit.predict_proba(v.features[None, :])[0])
        else:
            # Binary: keep only the probability of the second class.
            pred_prob = data_instances.mapValues(
                lambda v: model_fit.predict_proba(v.features[None, :])[0][1])

        predict_result = self.predict_score_to_output(
            data_instances=data_instances, predict_score=pred_prob,
            classes=classes, threshold=self.predict_param.threshold)
        return predict_result

    def fit(self, data_instances, validate_data=None):
        """Fit the local logistic regression on ``data_instances``.

        Args:
            data_instances: table whose values expose ``features``,
                ``label`` and, optionally, ``weight``.
            validate_data: accepted for interface compatibility; not used.
        """
        if not self.need_run:
            return

        LOGGER.info("Enter Local Baseline fit")
        # check if empty table
        abnormal_detection.empty_table_detection(data_instances)
        abnormal_detection.empty_feature_detection(data_instances)

        # get model
        model = self.get_model()
        # get header
        self.header = data_overview.get_header(data_instances)

        # Collect once so features, labels and weights of a row always come
        # from the same instance and cannot get out of step.
        instances = [instance for _, instance in data_instances.collect()]
        X = np.array([instance.features for instance in instances])
        y = np.array([instance.label for instance in instances])

        w = None
        if data_overview.with_weight(data_instances):
            LOGGER.info("Input Data with Weight. Weight will be used to fit model.")
            w = np.array([instance.weight for instance in instances])

        self.model_fit = model.fit(X, y, sample_weight=w)
        self.need_one_vs_rest = len(self.model_fit.classes_) > 2
        self.set_summary(self.get_model_summary())
|