#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  17. import copy
  18. import numpy as np
  19. from sklearn.linear_model import LogisticRegression
  20. from federatedml.model_base import ModelBase
  21. from federatedml.param.local_baseline_param import LocalBaselineParam
  22. from federatedml.protobuf.generated import lr_model_meta_pb2, lr_model_param_pb2
  23. from federatedml.statistic import data_overview
  24. from federatedml.util import LOGGER
  25. from federatedml.util import abnormal_detection
  26. from federatedml.util.io_check import assert_io_num_rows_equal
  27. class LocalBaseline(ModelBase):
  28. def __init__(self):
  29. super(LocalBaseline, self).__init__()
  30. self.model_param = LocalBaselineParam()
  31. self.model_name = "LocalBaseline"
  32. self.metric_type = ""
  33. self.model_param_name = "LocalBaselineParam"
  34. self.model_meta_name = "LocalBaselineMeta"
  35. # one_ve_rest parameter
  36. self.need_one_vs_rest = None
  37. self.one_vs_rest_classes = []
  38. self.one_vs_rest_obj = None
  39. def _init_model(self, params):
  40. self.model_name = params.model_name
  41. self.model_opts = params.model_opts
  42. self.predict_param = params.predict_param
  43. self.model = None
  44. self.model_fit = None
  45. self.header = None
  46. self.model_weights = None
  47. def get_model(self):
  48. # extend in future with more model types
  49. model = LogisticRegression(**self.model_opts)
  50. self.model = copy.deepcopy(model)
  51. return model
  52. def _get_model_param(self):
  53. model = self.model_fit
  54. n_iter = int(model.n_iter_[0])
  55. is_converged = bool(n_iter < model.max_iter)
  56. coef = model.coef_[0]
  57. #LOGGER.debug(f"model coef len {coef.shape[0]}, value: {coef}")
  58. weight_dict = dict(zip(self.header, [float(i) for i in coef]))
  59. #LOGGER.debug(f"model weight dict {weight_dict}")
  60. # intercept is in array format if fit_intercept
  61. intercept = model.intercept_[0] if model.fit_intercept else model.intercept_
  62. result = {'iters': n_iter,
  63. 'is_converged': is_converged,
  64. 'weight': weight_dict,
  65. 'intercept': intercept,
  66. 'header': self.header,
  67. 'best_iteration': -1
  68. }
  69. return result
  70. def _get_model_param_ovr(self):
  71. model = self.model_fit
  72. n_iter = int(model.n_iter_[0])
  73. is_converged = bool(n_iter < model.max_iter)
  74. classes = model.classes_
  75. coef_all = model.coef_
  76. intercept_all = model.intercept_
  77. ovr_pb_objs = []
  78. ovr_pb_classes = []
  79. for i, label in enumerate(classes):
  80. coef = coef_all[i, ]
  81. weight_dict = dict(zip(self.header, list(coef)))
  82. intercept = intercept_all[i] if model.fit_intercept else intercept_all
  83. result = {'iters': n_iter,
  84. 'is_converged': is_converged,
  85. 'weight': weight_dict,
  86. 'intercept': intercept,
  87. 'header': self.header,
  88. 'best_iteration': -1
  89. }
  90. param_protobuf_obj = lr_model_param_pb2.SingleModel(**result)
  91. ovr_pb_objs.append(param_protobuf_obj)
  92. ovr_pb_classes.append(str(label))
  93. one_vs_rest_result = {
  94. 'completed_models': ovr_pb_objs,
  95. 'one_vs_rest_classes': ovr_pb_classes
  96. }
  97. param_result = {'one_vs_rest_result': one_vs_rest_result,
  98. 'need_one_vs_rest': True,
  99. 'header': self.header}
  100. return param_result
  101. def _get_param(self):
  102. header = self.header
  103. #LOGGER.debug("In get_param, header: {}".format(header))
  104. if header is None:
  105. param_protobuf_obj = lr_model_param_pb2.LRModelParam()
  106. return param_protobuf_obj
  107. if self.need_one_vs_rest:
  108. result = self._get_model_param_ovr()
  109. param_protobuf_obj = lr_model_param_pb2.LRModelParam(**result)
  110. else:
  111. result = self._get_model_param()
  112. param_protobuf_obj = lr_model_param_pb2.LRModelParam(**result)
  113. #LOGGER.debug("in _get_param, result: {}".format(result))
  114. return param_protobuf_obj
  115. def _get_meta(self):
  116. model = self.model_fit
  117. predict_param = lr_model_meta_pb2.PredictMeta(**{"threshold": self.predict_param.threshold})
  118. result = {'penalty': model.penalty,
  119. 'tol': model.tol,
  120. 'fit_intercept': model.fit_intercept,
  121. 'optimizer': model.solver,
  122. 'need_one_vs_rest': self.need_one_vs_rest,
  123. 'max_iter': model.max_iter,
  124. 'predict_param': predict_param
  125. }
  126. meta_protobuf_obj = lr_model_meta_pb2.LRModelMeta(**result)
  127. return meta_protobuf_obj
  128. def export_model(self):
  129. if not self.need_run:
  130. return
  131. meta_obj = self._get_meta()
  132. param_obj = self._get_param()
  133. result = {
  134. self.model_meta_name: meta_obj,
  135. self.model_param_name: param_obj
  136. }
  137. return result
  138. def get_model_summary(self):
  139. header = self.header
  140. if header is None:
  141. return {}
  142. if not self.need_one_vs_rest:
  143. param = self._get_model_param()
  144. summary = {
  145. 'coef': param['weight'],
  146. 'intercept': param['intercept'],
  147. 'is_converged': param['is_converged'],
  148. 'iters': param['iters'],
  149. 'one_vs_rest': False
  150. }
  151. else:
  152. model = self.model_fit
  153. n_iter = int(model.n_iter_[0])
  154. is_converged = bool(n_iter < model.max_iter)
  155. classes = model.classes_
  156. coef_all = model.coef_
  157. intercept_all = model.intercept_
  158. summary = {}
  159. for i, label in enumerate(classes):
  160. coef = coef_all[i, ]
  161. weight_dict = dict(zip(self.header, [float(i) for i in coef]))
  162. intercept = float(intercept_all[i]) if model.fit_intercept else float(intercept_all)
  163. single_summary = {
  164. 'coef': weight_dict,
  165. 'intercept': intercept,
  166. 'is_converged': is_converged,
  167. 'iters': n_iter
  168. }
  169. single_key = f"{label}"
  170. summary[single_key] = single_summary
  171. summary['one_vs_rest'] = True
  172. return summary
  173. @assert_io_num_rows_equal
  174. def _load_single_coef(self, result_obj):
  175. feature_shape = len(self.header)
  176. tmp_vars = np.zeros(feature_shape)
  177. weight_dict = dict(result_obj.weight)
  178. for idx, header_name in enumerate(self.header):
  179. tmp_vars[idx] = weight_dict.get(header_name)
  180. return tmp_vars
  181. def _load_single_model(self, result_obj):
  182. coef = self._load_single_coef(result_obj)
  183. self.model_fit.__setattr__('coef_', np.array([coef]))
  184. self.model_fit.__setattr__('intercept_', np.array([result_obj.intercept]))
  185. self.model_fit.__setattr__('classes_', np.array([0, 1]))
  186. self.model_fit.__setattr__('n_iter_', [result_obj.iters])
  187. return
  188. def _load_ovr_model(self, result_obj):
  189. one_vs_rest_result = result_obj.one_vs_rest_result
  190. classes = np.array([int(i) for i in one_vs_rest_result.one_vs_rest_classes])
  191. models = one_vs_rest_result.completed_models
  192. class_count, feature_shape = len(classes), len(self.header)
  193. coef_all = np.zeros((class_count, feature_shape))
  194. intercept_all = np.zeros(class_count)
  195. iters = -1
  196. for i, label in enumerate(classes):
  197. model = models[i]
  198. coef = self._load_single_coef(model)
  199. coef_all[i, ] = coef
  200. intercept_all[i] = model.intercept
  201. iters = model.iters
  202. self.model_fit.__setattr__('coef_', coef_all)
  203. self.model_fit.__setattr__('intercept_', intercept_all)
  204. self.model_fit.__setattr__('classes_', classes)
  205. self.model_fit.__setattr__('n_iter_', [iters])
  206. return
  207. def _load_model_meta(self, meta_obj):
  208. self.model_fit.__setattr__('penalty', meta_obj.penalty)
  209. self.model_fit.__setattr__('tol', meta_obj.tol)
  210. self.model_fit.__setattr__('fit_intercept', meta_obj.fit_intercept)
  211. self.model_fit.__setattr__('solver', meta_obj.optimizer)
  212. self.model_fit.__setattr__('max_iter', meta_obj.max_iter)
  213. def load_model(self, model_dict):
  214. result_obj = list(model_dict.get('model').values())[0].get(self.model_param_name)
  215. meta_obj = list(model_dict.get('model').values())[0].get(self.model_meta_name)
  216. self.model_fit = LogisticRegression()
  217. self._load_model_meta(meta_obj)
  218. self.header = list(result_obj.header)
  219. self.need_one_vs_rest = meta_obj.need_one_vs_rest
  220. LOGGER.debug("in _load_model need_one_vs_rest: {}".format(self.need_one_vs_rest))
  221. if self.need_one_vs_rest:
  222. self._load_ovr_model(result_obj)
  223. else:
  224. self._load_single_model(result_obj)
  225. return
  226. @assert_io_num_rows_equal
  227. def predict(self, data_instances):
  228. if not self.need_run:
  229. return
  230. model_fit = self.model_fit
  231. classes = [int(x) for x in model_fit.classes_]
  232. if self.need_one_vs_rest:
  233. pred_prob = data_instances.mapValues(lambda v: model_fit.predict_proba(v.features[None, :])[0])
  234. else:
  235. pred_prob = data_instances.mapValues(lambda v: model_fit.predict_proba(v.features[None, :])[0][1])
  236. predict_result = self.predict_score_to_output(data_instances=data_instances, predict_score=pred_prob,
  237. classes=classes, threshold=self.predict_param.threshold)
  238. return predict_result
  239. def fit(self, data_instances, validate_data=None):
  240. if not self.need_run:
  241. return
  242. # check if empty table
  243. LOGGER.info("Enter Local Baseline fit")
  244. abnormal_detection.empty_table_detection(data_instances)
  245. abnormal_detection.empty_feature_detection(data_instances)
  246. # get model
  247. model = self.get_model()
  248. # get header
  249. self.header = data_overview.get_header(data_instances)
  250. X_table = data_instances.mapValues(lambda v: v.features)
  251. y_table = data_instances.mapValues(lambda v: v.label)
  252. X = np.array([v[1] for v in list(X_table.collect())])
  253. y = np.array([v[1] for v in list(y_table.collect())])
  254. w = None
  255. if data_overview.with_weight(data_instances):
  256. LOGGER.info(f"Input Data with Weight. Weight will be used to fit model.")
  257. weight_table = data_instances.mapValues(lambda v: v.weight)
  258. w = np.array([v[1] for v in list(weight_table.collect())])
  259. self.model_fit = model.fit(X, y, w)
  260. self.need_one_vs_rest = len(self.model_fit.classes_) > 2
  261. self.set_summary(self.get_model_summary())