hetero_stepwise.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import itertools
  17. import uuid
  18. import numpy as np
  19. from google.protobuf.json_format import MessageToDict
  20. from sklearn import metrics
  21. from sklearn.linear_model import LogisticRegression, LinearRegression
  22. from federatedml.model_base import Metric, MetricMeta
  23. from federatedml.evaluation.metrics.regression_metric import IC, IC_Approx
  24. from federatedml.model_selection.stepwise.step import Step
  25. from federatedml.statistic import data_overview
  26. from federatedml.transfer_variable.transfer_class.stepwise_transfer_variable import StepwiseTransferVariable
  27. from federatedml.util import consts
  28. from federatedml.util import LOGGER
  29. class ModelInfo(object):
  30. def __init__(self, n_step, n_model, score, loss, direction):
  31. self.score = score
  32. self.n_step = n_step
  33. self.n_model = n_model
  34. self.direction = direction
  35. self.loss = loss
  36. self.uid = str(uuid.uuid1())
  37. def get_score(self):
  38. return self.score
  39. def get_loss(self):
  40. return self.loss
  41. def get_key(self):
  42. return self.uid
  43. class HeteroStepwise(object):
  44. def __init__(self):
  45. self.mode = None
  46. self.role = None
  47. self.forward = False
  48. self.backward = False
  49. self.n_step = 0
  50. self.has_test = False
  51. self.n_count = 0
  52. self.stop_stepwise = False
  53. self.models = None
  54. self.metric_namespace = "train"
  55. self.metric_type = "STEPWISE"
  56. self.intercept = None
  57. self.models = {}
  58. self.models_trained = {}
  59. self.IC_computer = None
  60. self.step_direction = None
  61. self.anonymous_header_guest = None
  62. self.anonymous_header_host = None
  63. def _init_model(self, param):
  64. self.model_param = param
  65. self.mode = param.mode
  66. self.role = param.role
  67. self.score_name = param.score_name
  68. self.direction = param.direction
  69. self.max_step = param.max_step
  70. self.nvmin = param.nvmin
  71. self.nvmax = param.nvmax
  72. self.transfer_variable = StepwiseTransferVariable()
  73. self._get_direction()
  74. def _get_direction(self):
  75. if self.direction == "forward":
  76. self.forward = True
  77. elif self.direction == "backward":
  78. self.backward = True
  79. elif self.direction == "both":
  80. self.forward = True
  81. self.backward = True
  82. else:
  83. raise ValueError("Wrong stepwise direction given.")
  84. def _put_model(self, key, model):
  85. """
  86. wrapper to put key, model dict pair into models dict
  87. """
  88. model_dict = {'model': {'stepwise': model.export_model()}}
  89. self.models[key] = model_dict
  90. def _get_model(self, key):
  91. """
  92. wrapper to get value of a given model key from models dict
  93. """
  94. value = self.models.get(key)
  95. return value
  96. def _set_k(self):
  97. """
  98. Helper function, get the penalty coefficient for AIC/BIC calculation.
  99. """
  100. if self.score_name == "aic":
  101. self.k = 2
  102. elif self.score_name == "bic":
  103. self.k = np.log(self.n_count)
  104. else:
  105. raise ValueError("Wrong score name given: {}. Only 'aic' or 'bic' acceptable.".format(self.score_name))
  106. @staticmethod
  107. def get_dfe(model, str_mask):
  108. dfe = sum(HeteroStepwise.string2mask(str_mask))
  109. if model.fit_intercept:
  110. dfe += 1
  111. LOGGER.debug("fit_intercept detected, 1 is added to dfe")
  112. return dfe
  113. def get_step_best(self, step_models):
  114. best_score = None
  115. best_model = ""
  116. for model in step_models:
  117. model_info = self.models_trained[model]
  118. score = model_info.get_score()
  119. if score is None:
  120. continue
  121. if best_score is None or score < best_score:
  122. best_score = score
  123. best_model = model
  124. LOGGER.info(f"step {self.n_step}, best model {best_model}")
  125. return best_model
  126. @staticmethod
  127. def drop_one(mask_to_drop):
  128. for i in np.nonzero(mask_to_drop)[0]:
  129. new_mask = np.copy(mask_to_drop)
  130. new_mask[i] = 0
  131. if sum(new_mask) > 0:
  132. yield new_mask
  133. @staticmethod
  134. def add_one(mask_to_add):
  135. for i in np.where(mask_to_add < 1)[0]:
  136. new_mask = np.copy(mask_to_add)
  137. new_mask[i] = 1
  138. yield new_mask
  139. def check_stop(self, new_host_mask, new_guest_mask, host_mask, guest_mask):
  140. # initial step
  141. if self.n_step == 0:
  142. return False
  143. # if model not updated
  144. if np.array_equal(new_host_mask, host_mask) and np.array_equal(new_guest_mask, guest_mask):
  145. LOGGER.debug("masks not changed, check_stop returns True")
  146. return True
  147. # if full model is the best
  148. if sum(new_host_mask < 1) == 0 and sum(new_guest_mask < 1) == 0 and self.n_step > 0:
  149. LOGGER.debug("masks are full model, check_stop returns True")
  150. return True
  151. # if new best reach variable count lower limit
  152. new_total_nv = sum(new_host_mask) + sum(new_guest_mask)
  153. total_nv = sum(host_mask) + sum(guest_mask)
  154. if new_total_nv == self.nvmin and total_nv >= self.nvmin:
  155. LOGGER.debug("variable count min reached, check_stop returns True")
  156. return True
  157. # if new best reach variable count upper limit
  158. if self.nvmax is not None:
  159. if new_total_nv == self.nvmax and total_nv <= self.nvmax:
  160. LOGGER.debug("variable count max reached, check_stop returns True")
  161. return True
  162. # if reach max step
  163. if self.n_step >= self.max_step:
  164. LOGGER.debug("max step reached, check_stop returns True")
  165. return True
  166. return False
  167. def get_intercept_loss(self, model, data):
  168. y = np.array([x[1] for x in data.mapValues(lambda v: v.label).collect()])
  169. X = np.ones((len(y), 1))
  170. if model.model_name == 'HeteroLinearRegression' or model.model_name == 'HeteroPoissonRegression':
  171. intercept_model = LinearRegression(fit_intercept=False)
  172. trained_model = intercept_model.fit(X, y)
  173. pred = trained_model.predict(X)
  174. loss = metrics.mean_squared_error(y, pred) / 2
  175. elif model.model_name == 'HeteroLogisticRegression':
  176. intercept_model = LogisticRegression(penalty='l1', C=1e8, fit_intercept=False, solver='liblinear')
  177. trained_model = intercept_model.fit(X, y)
  178. pred = trained_model.predict(X)
  179. loss = metrics.log_loss(y, pred)
  180. else:
  181. raise ValueError("Unknown model received. Stepwise stopped.")
  182. self.intercept = intercept_model.intercept_
  183. return loss
  184. def get_ic_val(self, model, model_mask):
  185. if self.role != consts.ARBITER:
  186. return None, None
  187. if len(model.loss_history) == 0:
  188. raise ValueError("Arbiter has no loss history. Stepwise does not support model without total loss.")
  189. # get final loss from loss history for criteria calculation
  190. loss = model.loss_history[-1]
  191. dfe = HeteroStepwise.get_dfe(model, model_mask)
  192. ic_val = self.IC_computer.compute(self.k, self.n_count, dfe, loss)
  193. if np.isinf(ic_val):
  194. raise ValueError("Loss value of infinity obtained. Stepwise stopped.")
  195. return loss, ic_val
  196. def get_ic_val_guest(self, model, train_data):
  197. if not model.fit_intercept:
  198. return None, None
  199. loss = self.get_intercept_loss(model, train_data)
  200. # intercept only model has dfe = 1
  201. dfe = 1
  202. ic_val = self.IC_computer.compute(self.k, self.n_count, dfe, loss)
  203. return loss, ic_val
  204. def _run_step(self, model, train_data, validate_data, feature_mask, n_model, model_mask):
  205. if self.direction == 'forward' and self.n_step == 0:
  206. if self.role == consts.GUEST:
  207. loss, ic_val = self.get_ic_val_guest(model, train_data)
  208. LOGGER.info("step {} n_model {}".format(self.n_step, n_model))
  209. model_info = ModelInfo(self.n_step, n_model, ic_val, loss, self.step_direction)
  210. self.models_trained[model_mask] = model_info
  211. model_key = model_info.get_key()
  212. self._put_model(model_key, model)
  213. else:
  214. model_info = ModelInfo(self.n_step, n_model, None, None, self.step_direction)
  215. self.models_trained[model_mask] = model_info
  216. model_key = model_info.get_key()
  217. self._put_model(model_key, model)
  218. return
  219. curr_step = Step()
  220. curr_step.set_step_info((self.n_step, n_model))
  221. trained_model = curr_step.run(model, train_data, validate_data, feature_mask)
  222. loss, ic_val = self.get_ic_val(trained_model, model_mask)
  223. LOGGER.info("step {} n_model {}: ic_val {}".format(self.n_step, n_model, ic_val))
  224. model_info = ModelInfo(self.n_step, n_model, ic_val, loss, self.step_direction)
  225. self.models_trained[model_mask] = model_info
  226. model_key = model_info.get_key()
  227. self._put_model(model_key, trained_model)
  228. def sync_data_info(self, data):
  229. if self.role == consts.ARBITER:
  230. return self.arbiter_sync_data_info()
  231. else:
  232. return self.client_sync_data_info(data)
  233. def arbiter_sync_data_info(self):
  234. n_host, j_host, self.anonymous_header_host = self.transfer_variable.host_data_info.get(idx=0)
  235. n_guest, j_guest, self.anonymous_header_guest = self.transfer_variable.guest_data_info.get(idx=0)
  236. self.n_count = n_host
  237. return j_host, j_guest
  238. def client_sync_data_info(self, data):
  239. n, j = data.count(), data_overview.get_features_shape(data)
  240. anonymous_header = data_overview.get_anonymous_header(data)
  241. self.n_count = n
  242. if self.role == consts.HOST:
  243. self.transfer_variable.host_data_info.remote((n, j, anonymous_header), role=consts.ARBITER, idx=0)
  244. self.transfer_variable.host_data_info.remote((n, j, anonymous_header), role=consts.GUEST, idx=0)
  245. j_host = j
  246. n_guest, j_guest, self.anonymous_header_guest = self.transfer_variable.guest_data_info.get(idx=0)
  247. self.anonymous_header_host = anonymous_header
  248. else:
  249. self.transfer_variable.guest_data_info.remote((n, j, anonymous_header), role=consts.ARBITER, idx=0)
  250. self.transfer_variable.guest_data_info.remote((n, j, anonymous_header), role=consts.HOST, idx=0)
  251. j_guest = j
  252. n_host, j_host, self.anonymous_header_host = self.transfer_variable.host_data_info.get(idx=0)
  253. self.anonymous_header_guest = anonymous_header
  254. return j_host, j_guest
  255. def get_to_enter(self, host_mask, guest_mask, all_features):
  256. if self.role == consts.GUEST:
  257. to_enter = [all_features[i] for i in np.where(guest_mask < 1)[0]]
  258. elif self.role == consts.HOST:
  259. to_enter = [all_features[i] for i in np.where(host_mask < 1)[0]]
  260. else:
  261. to_enter = []
  262. return to_enter
  263. def update_summary_client(self, model, host_mask, guest_mask, unilateral_features, host_anonym, guest_anonym):
  264. step_summary = {}
  265. if self.role == consts.GUEST:
  266. guest_features = [unilateral_features[i] for i in np.where(guest_mask == 1)[0]]
  267. host_features = [host_anonym[i] for i in np.where(host_mask == 1)[0]]
  268. elif self.role == consts.HOST:
  269. guest_features = [guest_anonym[i] for i in np.where(guest_mask == 1)[0]]
  270. host_features = [unilateral_features[i] for i in np.where(host_mask == 1)[0]]
  271. else:
  272. raise ValueError(f"upload summary on client only applies to host or guest.")
  273. step_summary["guest_features"] = guest_features
  274. step_summary["host_features"] = host_features
  275. model.add_summary(f"step_{self.n_step}", step_summary)
  276. def update_summary_arbiter(self, model, loss, ic_val):
  277. step_summary = {}
  278. step_summary["loss"] = loss
  279. step_summary["ic_val"] = ic_val
  280. model.add_summary(f"step_{self.n_step}", step_summary)
  281. def record_step_best(self, step_best, host_mask, guest_mask, data_instances, model):
  282. metas = {"host_mask": host_mask.tolist(), "guest_mask": guest_mask.tolist(),
  283. "score_name": self.score_name}
  284. metas["number_in"] = int(sum(host_mask) + sum(guest_mask))
  285. metas["direction"] = self.direction
  286. metas["n_count"] = int(self.n_count)
  287. """host_anonym = [
  288. anonymous_generator.generate_anonymous(
  289. fid=i,
  290. role='host',
  291. model=model) for i in range(
  292. len(host_mask))]
  293. guest_anonym = [
  294. anonymous_generator.generate_anonymous(
  295. fid=i,
  296. role='guest',
  297. model=model) for i in range(
  298. len(guest_mask))]
  299. metas["host_features_anonym"] = host_anonym
  300. metas["guest_features_anonym"] = guest_anonym
  301. """
  302. metas["host_features_anonym"] = self.anonymous_header_host
  303. metas["guest_features_anonym"] = self.anonymous_header_guest
  304. model_info = self.models_trained[step_best]
  305. loss = model_info.get_loss()
  306. ic_val = model_info.get_score()
  307. metas["loss"] = loss
  308. metas["current_ic_val"] = ic_val
  309. metas["fit_intercept"] = model.fit_intercept
  310. model_key = model_info.get_key()
  311. model_dict = self._get_model(model_key)
  312. if self.role != consts.ARBITER:
  313. all_features = data_instances.schema.get('header')
  314. metas["all_features"] = all_features
  315. metas["to_enter"] = self.get_to_enter(host_mask, guest_mask, all_features)
  316. model_param = list(model_dict.get('model').values())[0].get(
  317. model.model_param_name)
  318. param_dict = MessageToDict(model_param)
  319. metas["intercept"] = param_dict.get("intercept", None)
  320. metas["weight"] = param_dict.get("weight", {})
  321. metas["header"] = param_dict.get("header", [])
  322. if self.n_step == 0 and self.direction == "forward":
  323. metas["intercept"] = self.intercept
  324. self.update_summary_client(model,
  325. host_mask,
  326. guest_mask,
  327. all_features,
  328. self.anonymous_header_host,
  329. self.anonymous_header_guest)
  330. else:
  331. self.update_summary_arbiter(model, loss, ic_val)
  332. metric_name = f"stepwise_{self.n_step}"
  333. metric = [Metric(metric_name, float(self.n_step))]
  334. model.callback_metric(metric_name=metric_name, metric_namespace=self.metric_namespace, metric_data=metric)
  335. model.tracker.set_metric_meta(metric_name=metric_name, metric_namespace=self.metric_namespace,
  336. metric_meta=MetricMeta(name=metric_name, metric_type=self.metric_type,
  337. extra_metas=metas))
  338. LOGGER.info(f"metric_name: {metric_name}, metas: {metas}")
  339. return
  340. def sync_step_best(self, step_models):
  341. if self.role == consts.ARBITER:
  342. step_best = self.get_step_best(step_models)
  343. self.transfer_variable.step_best.remote(step_best, role=consts.HOST, suffix=(self.n_step,))
  344. self.transfer_variable.step_best.remote(step_best, role=consts.GUEST, suffix=(self.n_step,))
  345. LOGGER.info(f"step {self.n_step}, step_best sent is {step_best}")
  346. else:
  347. step_best = self.transfer_variable.step_best.get(suffix=(self.n_step,))[0]
  348. LOGGER.info(f"step {self.n_step}, step_best received is {step_best}")
  349. return step_best
  350. @staticmethod
  351. def mask2string(host_mask, guest_mask):
  352. mask = np.append(host_mask, guest_mask)
  353. string_repr = ''.join('1' if i else '0' for i in mask)
  354. return string_repr
  355. @staticmethod
  356. def string2mask(string_repr):
  357. mask = np.fromiter(map(int, string_repr), dtype=bool)
  358. return mask
  359. @staticmethod
  360. def predict(data_instances, model):
  361. if data_instances is None:
  362. return
  363. pred_result = model.predict(data_instances)
  364. return pred_result
  365. def get_IC_computer(self, model):
  366. if model.model_name == 'HeteroLinearRegression':
  367. return IC_Approx()
  368. else:
  369. return IC()
  370. def run(self, component_parameters, train_data, validate_data, model):
  371. LOGGER.info("Enter stepwise")
  372. self._init_model(component_parameters)
  373. j_host, j_guest = self.sync_data_info(train_data)
  374. if train_data is not None:
  375. self.anonymous_header = data_overview.get_anonymous_header(train_data)
  376. if self.backward:
  377. host_mask, guest_mask = np.ones(j_host, dtype=bool), np.ones(j_guest, dtype=bool)
  378. else:
  379. host_mask, guest_mask = np.zeros(j_host, dtype=bool), np.zeros(j_guest, dtype=bool)
  380. self.IC_computer = self.get_IC_computer(model)
  381. self._set_k()
  382. while self.n_step <= self.max_step:
  383. LOGGER.info("Enter step {}".format(self.n_step))
  384. step_models = set()
  385. step_models.add(HeteroStepwise.mask2string(host_mask, guest_mask))
  386. n_model = 0
  387. if self.backward:
  388. self.step_direction = "backward"
  389. LOGGER.info("step {}, direction: {}".format(self.n_step, self.step_direction))
  390. if self.n_step == 0:
  391. backward_gen = [[host_mask, guest_mask]]
  392. else:
  393. backward_host, backward_guest = HeteroStepwise.drop_one(host_mask), HeteroStepwise.drop_one(
  394. guest_mask)
  395. backward_gen = itertools.chain(zip(backward_host, itertools.cycle([guest_mask])),
  396. zip(itertools.cycle([host_mask]), backward_guest))
  397. for curr_host_mask, curr_guest_mask in backward_gen:
  398. model_mask = HeteroStepwise.mask2string(curr_host_mask, curr_guest_mask)
  399. step_models.add(model_mask)
  400. if model_mask not in self.models_trained:
  401. if self.role == consts.ARBITER:
  402. feature_mask = None
  403. elif self.role == consts.HOST:
  404. feature_mask = curr_host_mask
  405. else:
  406. feature_mask = curr_guest_mask
  407. self._run_step(model, train_data, validate_data, feature_mask, n_model, model_mask)
  408. n_model += 1
  409. if self.forward:
  410. self.step_direction = "forward"
  411. LOGGER.info("step {}, direction: {}".format(self.n_step, self.step_direction))
  412. forward_host, forward_guest = HeteroStepwise.add_one(host_mask), HeteroStepwise.add_one(guest_mask)
  413. if sum(guest_mask) + sum(host_mask) == 0:
  414. if self.n_step == 0:
  415. forward_gen = [[host_mask, guest_mask]]
  416. else:
  417. forward_gen = itertools.product(list(forward_host), list(forward_guest))
  418. else:
  419. forward_gen = itertools.chain(zip(forward_host, itertools.cycle([guest_mask])),
  420. zip(itertools.cycle([host_mask]), forward_guest))
  421. for curr_host_mask, curr_guest_mask in forward_gen:
  422. model_mask = HeteroStepwise.mask2string(curr_host_mask, curr_guest_mask)
  423. step_models.add(model_mask)
  424. LOGGER.info(f"step {self.n_step}, mask {model_mask}")
  425. if model_mask not in self.models_trained:
  426. if self.role == consts.ARBITER:
  427. feature_mask = None
  428. elif self.role == consts.HOST:
  429. feature_mask = curr_host_mask
  430. else:
  431. feature_mask = curr_guest_mask
  432. self._run_step(model, train_data, validate_data, feature_mask, n_model, model_mask)
  433. n_model += 1
  434. # forward step 0
  435. if sum(host_mask) + sum(guest_mask) == 0 and self.n_step == 0:
  436. model_mask = HeteroStepwise.mask2string(host_mask, guest_mask)
  437. self.record_step_best(model_mask, host_mask, guest_mask, train_data, model)
  438. self.n_step += 1
  439. continue
  440. old_host_mask, old_guest_mask = host_mask, guest_mask
  441. step_best = self.sync_step_best(step_models)
  442. step_best_mask = HeteroStepwise.string2mask(step_best)
  443. host_mask, guest_mask = step_best_mask[:j_host], step_best_mask[j_host:]
  444. LOGGER.debug("step {}, best_host_mask {}, best_guest_mask {}".format(self.n_step, host_mask, guest_mask))
  445. self.stop_stepwise = self.check_stop(host_mask, guest_mask, old_host_mask, old_guest_mask)
  446. if self.stop_stepwise:
  447. break
  448. self.record_step_best(step_best, host_mask, guest_mask, train_data, model)
  449. self.n_step += 1
  450. mask_string = HeteroStepwise.mask2string(host_mask, guest_mask)
  451. model_info = self.models_trained[mask_string]
  452. best_model_key = model_info.get_key()
  453. best_model = self._get_model(best_model_key)
  454. model.load_model(best_model)