# boosting.py
import copy
import functools
import typing
from abc import ABC
import abc
from numpy import random
import numpy as np
from federatedml.param.boosting_param import BoostingParam, ObjectiveParam
from federatedml.param.predict_param import PredictParam
from federatedml.param.feature_binning_param import FeatureBinningParam
from federatedml.model_selection import start_cross_validation
from federatedml.util import abnormal_detection
from federatedml.util import consts
from federatedml.feature.sparse_vector import SparseVector
from federatedml.model_base import ModelBase
from federatedml.feature.fate_element_type import NoneType
from federatedml.ensemble.basic_algorithms import BasicAlgorithms
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import FairLoss
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import HuberLoss
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import LeastAbsoluteErrorLoss
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import LeastSquaredErrorLoss
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import LogCoshLoss
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import TweedieLoss
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import SigmoidBinaryCrossEntropyLoss
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import SoftmaxCrossEntropyLoss
from federatedml.param.evaluation_param import EvaluateParam
from federatedml.ensemble.boosting.predict_cache import PredictDataCache
from federatedml.statistic import data_overview
from federatedml.optim.convergence import converge_func_factory
from federatedml.statistic.data_overview import get_anonymous_header
from federatedml.util import LOGGER


class Boosting(ModelBase, ABC):

    def __init__(self):
        super(Boosting, self).__init__()

        # input hyper parameters
        self.task_type = None
        self.learning_rate = None
        self.start_round = None
        self.boosting_round = None
        self.n_iter_no_change = None
        self.tol = 0.0
        self.bin_num = None
        self.calculated_mode = None
        self.cv_param = None
        self.validation_freqs = None
        self.feature_name_fid_mapping = {}
        self.mode = None
        self.predict_param = PredictParam()
        self.objective_param = ObjectiveParam()
        self.model_param = BoostingParam()
        self.subsample_feature_rate = 1.0
        self.subsample_random_seed = None
        self.model_name = 'default'  # model name
        self.early_stopping_rounds = None
        self.use_first_metric_only = False
        self.binning_error = consts.DEFAULT_RELATIVE_ERROR

        # running variables
        # random seed
        self.random_seed = 100

        # feature anonymous header
        self.anonymous_header = None

        # data
        self.data_inst = None  # original input data
        self.binning_class = None  # class used for data binning
        self.binning_obj = None  # instance of self.binning_class
        self.data_bin = None  # data with transformed features
        self.bin_split_points = None  # feature split points
        self.bin_sparse_points = None  # feature sparse points
        self.use_missing = False  # whether to handle missing values
        self.zero_as_missing = False  # whether to treat zero as a missing value

        # booster
        self.booster_dim = 1  # booster dimension
        self.booster_meta = None  # booster's hyper parameters
        self.boosting_model_list = []  # list that holds boosters

        # training
        self.feature_num = None  # feature number
        self.init_score = None  # init score
        self.num_classes = 1  # number of classes
        self.convergence = None  # function to check loss convergence
        self.classes_ = []  # list of class indices
        self.y = None  # label
        self.y_hat = None  # accumulated predict value
        self.loss = None  # loss function
        self.predict_y_hat = None  # accumulated predict value in predicting mode
        self.history_loss = []  # list that holds loss history
        self.metrics = None
        self.is_converged = False
        self.is_warm_start = False  # warm start parameter
        self.on_training = False

        # cache and header alignment
        self.predict_data_cache = PredictDataCache()
        self.data_alignment_map = {}

        # federation
        self.transfer_variable = None

    def _init_model(self, boosting_param: BoostingParam):
        self.task_type = boosting_param.task_type
        self.objective_param = boosting_param.objective_param
        self.learning_rate = boosting_param.learning_rate
        self.boosting_round = boosting_param.num_trees
        self.n_iter_no_change = boosting_param.n_iter_no_change
        self.tol = boosting_param.tol
        self.bin_num = boosting_param.bin_num
        self.predict_param = boosting_param.predict_param
        self.cv_param = boosting_param.cv_param
        self.validation_freqs = boosting_param.validation_freqs
        self.metrics = boosting_param.metrics
        self.subsample_feature_rate = boosting_param.subsample_feature_rate
        self.binning_error = boosting_param.binning_error
        self.is_warm_start = self.component_properties.is_warm_start
        LOGGER.debug('warm start is {}'.format(self.is_warm_start))
        if boosting_param.random_seed is not None:
            self.random_seed = boosting_param.random_seed
        # initialize random seed here
        LOGGER.debug('setting random seed done, random seed is {}'.format(self.random_seed))
        np.random.seed(self.random_seed)
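        # note: np.random.seed seeds numpy's *global* RNG, which
        # sample_valid_features relies on via random.choice; reseeding numpy
        # elsewhere in the process will change the feature subsampling sequence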
  116. """
  117. Data Processing
  118. """

    @staticmethod
    def data_format_transform(row):
        """
        transform data into sparse format
        """
        if type(row.features).__name__ != consts.SPARSE_VECTOR:
            feature_shape = row.features.shape[0]
            indices = []
            data = []
            for i in range(feature_shape):
                if np.isnan(row.features[i]):
                    # missing values are stored as NoneType placeholders
                    indices.append(i)
                    data.append(NoneType())
                elif np.abs(row.features[i]) < consts.FLOAT_ZERO:
                    # exact zeros are dropped from the sparse representation
                    continue
                else:
                    indices.append(i)
                    data.append(row.features[i])
            new_row = copy.deepcopy(row)
            new_row.features = SparseVector(indices, data, feature_shape)
            return new_row
        else:
            sparse_vec = row.features.get_sparse_vector()
            replace_key = []
            for key in sparse_vec:
                if sparse_vec.get(key) == NoneType() or np.isnan(sparse_vec.get(key)):
                    replace_key.append(key)
            if len(replace_key) == 0:
                return row
            else:
                new_row = copy.deepcopy(row)
                new_sparse_vec = new_row.features.get_sparse_vector()
                for key in replace_key:
                    new_sparse_vec[key] = NoneType()
                return new_row
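
    # Example (hypothetical values): a dense row with features [0.0, 3.5, nan]
    # becomes a SparseVector with indices [1, 2] and data [3.5, NoneType()];
    # exact zeros are dropped and NaNs are stored as NoneType placeholders.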

    def convert_feature_to_bin(self, data_instance, handle_missing_value=False):
        """
        convert real feature values to bin indices
        """
        LOGGER.info("convert feature to bins")
        param_obj = FeatureBinningParam(bin_num=self.bin_num, error=self.binning_error)
        if handle_missing_value:
            self.binning_obj = self.binning_class(param_obj, abnormal_list=[NoneType()])
        else:
            self.binning_obj = self.binning_class(param_obj)
        self.binning_obj.fit_split_points(data_instance)
        rs = self.binning_obj.convert_feature_to_bin(data_instance)
        LOGGER.info("convert feature to bins over")
        return rs

    def sample_valid_features(self):
        LOGGER.info("sample valid features")
        self.feature_num = self.bin_split_points.shape[0]
        choose_feature = random.choice(range(0, self.feature_num),
                                       max(1, int(self.subsample_feature_rate * self.feature_num)),
                                       replace=False)
        valid_features = [False for i in range(self.feature_num)]
        for fid in choose_feature:
            valid_features[fid] = True
        return valid_features
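
    # Example (hypothetical values): with feature_num=5 and
    # subsample_feature_rate=0.4, random.choice picks 2 distinct feature ids,
    # e.g. [0, 3], yielding the mask [True, False, False, True, False];
    # the max(1, ...) guard ensures at least one feature is always kept.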

    @staticmethod
    def data_alignment(data_inst):
        """
        align data: abnormal detection and transform data to sparse format
        """
        abnormal_detection.empty_table_detection(data_inst)
        abnormal_detection.empty_feature_detection(data_inst)
        schema = data_inst.schema
        new_data_inst = data_inst.mapValues(lambda row: Boosting.data_format_transform(row))
        new_data_inst.schema = schema
        return new_data_inst

    def data_and_header_alignment(self, data_inst):
        """
        transform data to sparse format and align the data table header
        """
        cache_dataset_key = self.predict_data_cache.get_data_key(data_inst)
        if cache_dataset_key in self.data_alignment_map:
            processed_data = self.data_alignment_map[cache_dataset_key]
        else:
            data_inst_tmp = self.data_alignment(data_inst)
            header = [None] * len(self.feature_name_fid_mapping)
            for idx, col in self.feature_name_fid_mapping.items():
                header[idx] = col
            processed_data = data_overview.header_alignment(data_inst_tmp, header,
                                                            pre_anonymous_header=get_anonymous_header(data_inst))
            self.data_alignment_map[cache_dataset_key] = processed_data
        return processed_data

    @staticmethod
    def gen_feature_fid_mapping(schema):
        """
        generate {idx: feature_name} mapping
        """
        header = schema.get("header")
        feature_name_fid_mapping = dict(zip(range(len(header)), header))
        LOGGER.debug("fid_mapping is {}".format(feature_name_fid_mapping))
        return feature_name_fid_mapping
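
    # Example (hypothetical values): schema {"header": ["age", "income"]}
    # maps to {0: "age", 1: "income"}.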

    def prepare_data(self, data_inst):
        """
        prepare data: data alignment, and transform features to bin ids
        Args:
            data_inst: training data
        Returns: data_bin, data_split_points, data_sparse_point
        """
        # to sparse vec
        data_inst = self.data_alignment(data_inst)
        # binning
        return self.convert_feature_to_bin(data_inst, self.use_missing)

    @abc.abstractmethod
    def check_label(self, *args) -> typing.Tuple[typing.List[int], int, int]:
        """
        Returns: class indices, number of classes, and booster dimension
        """
        raise NotImplementedError()

    @staticmethod
    def get_label(data_bin):
        """
        extract y label from Table
        """
        y = data_bin.mapValues(lambda instance: instance.label)
        return y

    """
    Functions
    """

    def cross_validation(self, data_instances):
        return start_cross_validation.run(self, data_instances)

    def feat_name_check(self, data_inst, feat_name_fid_mapping):
        previous_model_feat_name = set(feat_name_fid_mapping.values())
        cur_data_feat_name = set(data_inst.schema['header'])
        assert previous_model_feat_name == cur_data_feat_name, \
            'feature alignment failed, diff: {}'.format(
                previous_model_feat_name.symmetric_difference(cur_data_feat_name))
        LOGGER.debug('warm start feat name {}, {}'.format(previous_model_feat_name, cur_data_feat_name))

    def get_loss_function(self):
        loss_type = self.objective_param.objective
        params = self.objective_param.params
        LOGGER.info("set objective, objective is {}".format(loss_type))
        if self.task_type == consts.CLASSIFICATION:
            if loss_type == "cross_entropy":
                if self.num_classes == 2:
                    loss_func = SigmoidBinaryCrossEntropyLoss()
                else:
                    loss_func = SoftmaxCrossEntropyLoss()
            else:
                raise NotImplementedError("objective %s not supported yet" % loss_type)
        elif self.task_type == consts.REGRESSION:
            if loss_type == "lse":
                loss_func = LeastSquaredErrorLoss()
            elif loss_type == "lae":
                loss_func = LeastAbsoluteErrorLoss()
            elif loss_type == "huber":
                loss_func = HuberLoss(params[0])
            elif loss_type == "fair":
                loss_func = FairLoss(params[0])
            elif loss_type == "tweedie":
                loss_func = TweedieLoss(params[0])
            elif loss_type == "log_cosh":
                loss_func = LogCoshLoss()
            else:
                raise NotImplementedError("objective %s not supported yet" % loss_type)
        else:
            raise NotImplementedError("objective %s not supported yet" % loss_type)
        return loss_func
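
    # Example (hypothetical values): with task_type=consts.REGRESSION,
    # objective="huber" and params=[1.0], this returns HuberLoss(1.0).
    # Classification only supports "cross_entropy", which resolves to the
    # sigmoid variant for 2 classes and softmax otherwise.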

    def get_metrics_param(self):
        """
        return evaluation parameters; called by the validation strategy
        """
        if self.task_type == consts.CLASSIFICATION:
            if self.num_classes == 2:
                return EvaluateParam(eval_type="binary",
                                     pos_label=self.classes_[1], metrics=self.metrics)
            else:
                return EvaluateParam(eval_type="multi", metrics=self.metrics)
        else:
            return EvaluateParam(eval_type="regression", metrics=self.metrics)

    def compute_loss(self, y_hat, y):
        """
        compute loss given predicted y and real y
        """
        LOGGER.info("compute loss")
        if self.task_type == consts.CLASSIFICATION:
            loss_method = self.loss
            y_predict = y_hat.mapValues(lambda val: loss_method.predict(val))
            loss = loss_method.compute_loss(y, y_predict)
        elif self.task_type == consts.REGRESSION:
            if self.objective_param.objective in ["lse", "lae", "logcosh", "log_cosh", "huber", "fair"]:
                loss_method = self.loss
                loss = loss_method.compute_loss(y, y_hat)
            elif self.objective_param.objective in ['tweedie']:
                loss_method = self.loss
                y_predict = y_hat.mapValues(lambda val: loss_method.predict(val))
                loss = loss_method.compute_loss(y, y_predict)
        return float(loss)

    def check_convergence(self, loss):
        """
        check if the loss converges
        """
        LOGGER.info("check convergence")
        if self.convergence is None:
            self.convergence = converge_func_factory("diff", self.tol)
        return self.convergence.is_converge(loss)
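
    # The "diff" criterion built by converge_func_factory is based on the
    # change of the loss between consecutive rounds: training is considered
    # converged once that change falls below self.tol.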

    @staticmethod
    def accumulate_y_hat(val, new_val, lr=0.1, idx=0):
        # vector sum
        if isinstance(new_val, np.ndarray) and len(new_val) == len(val):
            return val + new_val * lr
        # accumulate by dimension
        z_vec = np.zeros(len(val))
        z_vec[idx] = lr * new_val
        return z_vec + val
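
    # Example (hypothetical values): accumulate_y_hat(np.array([0.2, 0.5]),
    # 1.0, lr=0.1, idx=1) returns array([0.2, 0.6]): the new booster's score
    # is scaled by the learning rate and added to dimension 1 only.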

    def generate_flowid(self, round_num, dim):
        LOGGER.info("generate flowid, flowid {}".format(self.flowid))
        return ".".join(map(str, [self.flowid, round_num, dim]))
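
    # Example (hypothetical values): with self.flowid = '0', round_num=2 and
    # dim=1, the generated flowid is '0.2.1'.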

    def get_new_predict_score(self, y_hat, cur_sample_weights, dim=0):
        func = functools.partial(self.accumulate_y_hat, lr=self.learning_rate, idx=dim)
        return y_hat.join(cur_sample_weights, func)

    def _get_cv_param(self):
        self.model_param.cv_param.role = self.role
        self.model_param.cv_param.mode = self.mode
        return self.model_param.cv_param

    """
    fit and predict
    """

    @abc.abstractmethod
    def fit(self, data_inst, validate_data=None):
        raise NotImplementedError()

    @abc.abstractmethod
    def predict(self, data_inst):
        raise NotImplementedError()

    @abc.abstractmethod
    def generate_summary(self) -> dict:
        """
        return model summary
        """
        raise NotImplementedError()

    """
    Training Procedure
    """

    def get_init_score(self, y, num_classes: int):
        if num_classes > 2:
            y_hat, init_score = self.loss.initialize(y, num_classes)
        else:
            y_hat, init_score = self.loss.initialize(y)
        return y_hat, init_score

    @abc.abstractmethod
    def fit_a_learner(self, *args) -> BasicAlgorithms:
        """
        fit a booster and return it
        """
        raise NotImplementedError()

    """
    Prediction Procedure
    """

    @abc.abstractmethod
    def load_learner(self, *args):
        """
        load a booster
        """
        raise NotImplementedError()

    def score_to_predict_result(self, data_inst, y_hat):
        """
        given binary/multi-class/regression prediction scores, output results in the standard format
        """
        predicts = None
        loss_method = self.loss
        if self.task_type == consts.CLASSIFICATION:
            if self.num_classes == 2:
                predicts = y_hat.mapValues(lambda f: float(loss_method.predict(f)))
            else:
                predicts = y_hat.mapValues(lambda f: loss_method.predict(f).tolist())
        elif self.task_type == consts.REGRESSION:
            if self.objective_param.objective in ["tweedie"]:
                predicts = y_hat.mapValues(lambda f: [float(loss_method.predict(f))])
            elif self.objective_param.objective in ["lse", "lae", "huber", "log_cosh", "fair"]:
                predicts = y_hat
            else:
                raise NotImplementedError("objective {} not supported yet".format(self.objective_param.objective))
        if self.task_type == consts.CLASSIFICATION:
            predict_result = self.predict_score_to_output(data_inst, predict_score=predicts, classes=self.classes_,
                                                          threshold=self.predict_param.threshold)
        elif self.task_type == consts.REGRESSION:
            predicts = predicts.mapValues(lambda x: x[0])
            predict_result = self.predict_score_to_output(data_inst, predict_score=predicts, classes=None)
        else:
            raise NotImplementedError("task type {} not supported yet".format(self.task_type))
        return predict_result
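
    # Output shapes implied by the branches above: binary classification
    # yields one float score per sample, multi-class a list of per-class
    # scores, and regression a one-element value (the x[0] in the regression
    # branch unwraps it to a scalar before predict_score_to_output is called).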
  401. """
  402. Model IO
  403. """
  404. @abc.abstractmethod
  405. def get_model_meta(self):
  406. raise NotImplementedError()
  407. @abc.abstractmethod
  408. def get_model_param(self):
  409. raise NotImplementedError()
  410. @abc.abstractmethod
  411. def set_model_meta(self, model_meta):
  412. raise NotImplementedError()
  413. @abc.abstractmethod
  414. def set_model_param(self, model_param):
  415. raise NotImplementedError()

    def preprocess(self):
        pass

    def postprocess(self):
        pass

    def get_cur_model(self):
        meta_name, meta_protobuf = self.get_model_meta()
        param_name, param_protobuf = self.get_model_param()
        return {meta_name: meta_protobuf,
                param_name: param_protobuf}

    def export_model(self):
        if self.need_cv:
            return None
        return self.get_cur_model()

    def load_model(self, model_dict, model_key="model"):
        model_param = None
        model_meta = None
        for _, value in model_dict[model_key].items():
            for model in value:
                if model.endswith("Meta"):
                    model_meta = value[model]
                if model.endswith("Param"):
                    model_param = value[model]
        LOGGER.info("load model")
        self.set_model_meta(model_meta)
        self.set_model_param(model_param)

    def predict_proba(self, data_inst):
        pass

    def save_data(self):
        return self.data_output

    def save_model(self):
        pass
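

# Minimal subclass sketch (illustrative only; everything other than the
# abstract method names declared above is hypothetical). A concrete boosting
# class must provide:
#
#     class MyBoosting(Boosting):
#         def check_label(self, data_inst):
#             ...  # return class indices, number of classes, booster dimension
#         def fit(self, data_inst, validate_data=None):
#             ...  # e.g. prepare_data, get_init_score, then fit_a_learner
#             ...  # per boosting round and booster dimension
#         def predict(self, data_inst):
#             ...  # e.g. data_and_header_alignment, load_learner,
#             ...  # then score_to_predict_result
#         def fit_a_learner(self, *args):
#             ...  # train and return one booster
#         def load_learner(self, *args):
#             ...  # rebuild a booster from saved meta/param
#         def generate_summary(self) -> dict: ...
#         def get_model_meta(self): ...
#         def get_model_param(self): ...
#         def set_model_meta(self, model_meta): ...
#         def set_model_param(self, model_param): ...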