import abc
import copy
import functools
import typing
from abc import ABC

import numpy as np
from numpy import random

from federatedml.ensemble.basic_algorithms import BasicAlgorithms
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import FairLoss
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import HuberLoss
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import LeastAbsoluteErrorLoss
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import LeastSquaredErrorLoss
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import LogCoshLoss
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import SigmoidBinaryCrossEntropyLoss
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import SoftmaxCrossEntropyLoss
from federatedml.ensemble.basic_algorithms.decision_tree.tree_core.loss import TweedieLoss
from federatedml.ensemble.boosting.predict_cache import PredictDataCache
from federatedml.feature.fate_element_type import NoneType
from federatedml.feature.sparse_vector import SparseVector
from federatedml.model_base import ModelBase
from federatedml.model_selection import start_cross_validation
from federatedml.optim.convergence import converge_func_factory
from federatedml.param.boosting_param import BoostingParam, ObjectiveParam
from federatedml.param.evaluation_param import EvaluateParam
from federatedml.param.feature_binning_param import FeatureBinningParam
from federatedml.param.predict_param import PredictParam
from federatedml.statistic import data_overview
from federatedml.statistic.data_overview import get_anonymous_header
from federatedml.util import LOGGER
from federatedml.util import abnormal_detection
from federatedml.util import consts


class Boosting(ModelBase, ABC):

    def __init__(self):
        super(Boosting, self).__init__()

        # input hyper parameters
        self.task_type = None
        self.learning_rate = None
        self.start_round = None
        self.boosting_round = None
        self.n_iter_no_change = None
        self.tol = 0.0
        self.bin_num = None
        self.calculated_mode = None
        self.cv_param = None
        self.validation_freqs = None
        self.feature_name_fid_mapping = {}
        self.mode = None
        self.predict_param = PredictParam()
        self.objective_param = ObjectiveParam()
        self.model_param = BoostingParam()
        self.subsample_feature_rate = 1.0
        self.subsample_random_seed = None
        self.model_name = 'default'  # model name
        self.early_stopping_rounds = None
        self.use_first_metric_only = False
        self.binning_error = consts.DEFAULT_RELATIVE_ERROR

        # running variables
        # random seed
        self.random_seed = 100

        # feature anonymous header
        self.anonymous_header = None

        # data
        self.data_inst = None  # original input data
        self.binning_class = None  # class used for data binning
        self.binning_obj = None  # instance of self.binning_class
        self.data_bin = None  # data with transformed features
        self.bin_split_points = None  # feature split points
        self.bin_sparse_points = None  # feature sparse points
        self.use_missing = False  # whether to handle missing values
        self.zero_as_missing = False  # whether to treat zero as a missing value

        # booster
        self.booster_dim = 1  # booster dimension
        self.booster_meta = None  # booster's hyper parameters
        self.boosting_model_list = []  # list that holds boosters

        # training
        self.feature_num = None  # number of features
        self.init_score = None  # init score
        self.num_classes = 1  # number of classes
        self.convergence = None  # function to check loss convergence
        self.classes_ = []  # list of class indices
        self.y = None  # label
        self.y_hat = None  # accumulated predict value
        self.loss = None  # loss function
        self.predict_y_hat = None  # accumulated predict value in predicting mode
        self.history_loss = []  # list that holds loss history
        self.metrics = None
        self.is_converged = False
        self.is_warm_start = False  # warm start parameter
        self.on_training = False

        # cache and header alignment
        self.predict_data_cache = PredictDataCache()
        self.data_alignment_map = {}

        # federation
        self.transfer_variable = None

    def _init_model(self, boosting_param: BoostingParam):
        self.task_type = boosting_param.task_type
        self.objective_param = boosting_param.objective_param
        self.learning_rate = boosting_param.learning_rate
        self.boosting_round = boosting_param.num_trees
        self.n_iter_no_change = boosting_param.n_iter_no_change
        self.tol = boosting_param.tol
        self.bin_num = boosting_param.bin_num
        self.predict_param = boosting_param.predict_param
        self.cv_param = boosting_param.cv_param
        self.validation_freqs = boosting_param.validation_freqs
        self.metrics = boosting_param.metrics
        self.subsample_feature_rate = boosting_param.subsample_feature_rate
        self.binning_error = boosting_param.binning_error
        self.is_warm_start = self.component_properties.is_warm_start
        LOGGER.debug('warm start is {}'.format(self.is_warm_start))

        if boosting_param.random_seed is not None:
            self.random_seed = boosting_param.random_seed

        # initialize the random seed here
        np.random.seed(self.random_seed)
        LOGGER.debug('random seed set to {}'.format(self.random_seed))
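        # note: seeding numpy's global RNG also makes the random.choice call in
        # sample_valid_features reproducible, since `from numpy import random`
        # draws from the same global state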
- """
- Data Processing
- """

    @staticmethod
    def data_format_transform(row):
        """
        transform data into sparse format
        """
        if type(row.features).__name__ != consts.SPARSE_VECTOR:
            # dense input: keep non-zero entries, mark NaN entries as NoneType
            feature_shape = row.features.shape[0]
            indices = []
            data = []

            for i in range(feature_shape):
                if np.isnan(row.features[i]):
                    indices.append(i)
                    data.append(NoneType())
                elif np.abs(row.features[i]) < consts.FLOAT_ZERO:
                    continue
                else:
                    indices.append(i)
                    data.append(row.features[i])

            new_row = copy.deepcopy(row)
            new_row.features = SparseVector(indices, data, feature_shape)
            return new_row
        else:
            # sparse input: replace NaN (and NoneType) entries with NoneType in a copy
            sparse_vec = row.features.get_sparse_vector()
            replace_key = []
            for key in sparse_vec:
                if sparse_vec.get(key) == NoneType() or np.isnan(sparse_vec.get(key)):
                    replace_key.append(key)
            if len(replace_key) == 0:
                return row
            else:
                new_row = copy.deepcopy(row)
                new_sparse_vec = new_row.features.get_sparse_vector()
                for key in replace_key:
                    new_sparse_vec[key] = NoneType()
                return new_row
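    # illustrative example (dense 3-dim instance):
    #   row.features = [0.0, nan, 3.5]
    #   -> SparseVector(indices=[1, 2], data=[NoneType(), 3.5], shape=3)
    # i.e. exact zeros are dropped and NaNs become NoneType placeholders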

    def convert_feature_to_bin(self, data_instance, handle_missing_value=False):
        """
        convert feature values into bin indices
        """
        LOGGER.info("convert feature to bins")
        param_obj = FeatureBinningParam(bin_num=self.bin_num, error=self.binning_error)

        if handle_missing_value:
            self.binning_obj = self.binning_class(param_obj, abnormal_list=[NoneType()])
        else:
            self.binning_obj = self.binning_class(param_obj)

        self.binning_obj.fit_split_points(data_instance)
        rs = self.binning_obj.convert_feature_to_bin(data_instance)
        LOGGER.info("convert feature to bins over")
        return rs

    def sample_valid_features(self):
        LOGGER.info("sample valid features")
        self.feature_num = self.bin_split_points.shape[0]
        choose_feature = random.choice(range(0, self.feature_num),
                                       max(1, int(self.subsample_feature_rate * self.feature_num)),
                                       replace=False)

        valid_features = [False for i in range(self.feature_num)]
        for fid in choose_feature:
            valid_features[fid] = True
        return valid_features
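    # illustrative example: with feature_num = 5 and subsample_feature_rate = 0.4,
    # int(0.4 * 5) = 2 feature ids are drawn without replacement, e.g. [0, 3]
    #   -> valid_features = [True, False, False, True, False]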

    @staticmethod
    def data_alignment(data_inst):
        """
        align data: abnormal detection and transform data to sparse format
        """
        abnormal_detection.empty_table_detection(data_inst)
        abnormal_detection.empty_feature_detection(data_inst)

        schema = data_inst.schema
        new_data_inst = data_inst.mapValues(lambda row: Boosting.data_format_transform(row))
        new_data_inst.schema = schema
        return new_data_inst

    def data_and_header_alignment(self, data_inst):
        """
        transform data to sparse format and align the data table header with the training header,
        caching the processed table per dataset key
        """
        cache_dataset_key = self.predict_data_cache.get_data_key(data_inst)

        if cache_dataset_key in self.data_alignment_map:
            processed_data = self.data_alignment_map[cache_dataset_key]
        else:
            data_inst_tmp = self.data_alignment(data_inst)
            header = [None] * len(self.feature_name_fid_mapping)
            for idx, col in self.feature_name_fid_mapping.items():
                header[idx] = col
            processed_data = data_overview.header_alignment(data_inst_tmp, header,
                                                            pre_anonymous_header=get_anonymous_header(data_inst))
            self.data_alignment_map[cache_dataset_key] = processed_data

        return processed_data

    @staticmethod
    def gen_feature_fid_mapping(schema):
        """
        generate {idx: feature_name} mapping
        """
        header = schema.get("header")
        feature_name_fid_mapping = dict(zip(range(len(header)), header))
        LOGGER.debug("fid_mapping is {}".format(feature_name_fid_mapping))
        return feature_name_fid_mapping
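    # illustrative example: schema = {"header": ["x0", "x1", "x2"]}
    #   -> {0: "x0", 1: "x1", 2: "x2"}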

    def prepare_data(self, data_inst):
        """
        prepare data: data alignment, then transform features to bin indices
        Args:
            data_inst: training data
        Returns: data_bin, bin_split_points, bin_sparse_points
        """
        # to sparse vec
        data_inst = self.data_alignment(data_inst)
        # binning
        return self.convert_feature_to_bin(data_inst, self.use_missing)

    @abc.abstractmethod
    def check_label(self, *args) -> typing.Tuple[typing.List[int], int, int]:
        """
        Returns: class indices, number of classes, and booster dimension
        """
        raise NotImplementedError()

    @staticmethod
    def get_label(data_bin):
        """
        extract y label from Table
        """
        y = data_bin.mapValues(lambda instance: instance.label)
        return y

    """
    Functions
    """

    def cross_validation(self, data_instances):
        return start_cross_validation.run(self, data_instances)

    def feat_name_check(self, data_inst, feat_name_fid_mapping):
        previous_model_feat_name = set(feat_name_fid_mapping.values())
        cur_data_feat_name = set(data_inst.schema['header'])
        assert previous_model_feat_name == cur_data_feat_name, \
            'feature alignment failed, diff: {}'.format(
                previous_model_feat_name.symmetric_difference(cur_data_feat_name))
        LOGGER.debug('warm start feat name {}, {}'.format(previous_model_feat_name, cur_data_feat_name))

    def get_loss_function(self):
        loss_type = self.objective_param.objective
        params = self.objective_param.params
        LOGGER.info("set objective, objective is {}".format(loss_type))

        if self.task_type == consts.CLASSIFICATION:
            if loss_type == "cross_entropy":
                if self.num_classes == 2:
                    loss_func = SigmoidBinaryCrossEntropyLoss()
                else:
                    loss_func = SoftmaxCrossEntropyLoss()
            else:
                raise NotImplementedError("objective %s not supported yet" % loss_type)
        elif self.task_type == consts.REGRESSION:
            if loss_type == "lse":
                loss_func = LeastSquaredErrorLoss()
            elif loss_type == "lae":
                loss_func = LeastAbsoluteErrorLoss()
            elif loss_type == "huber":
                loss_func = HuberLoss(params[0])
            elif loss_type == "fair":
                loss_func = FairLoss(params[0])
            elif loss_type == "tweedie":
                loss_func = TweedieLoss(params[0])
            elif loss_type == "log_cosh":
                loss_func = LogCoshLoss()
            else:
                raise NotImplementedError("objective %s not supported yet" % loss_type)
        else:
            raise NotImplementedError("objective %s not supported yet" % loss_type)

        return loss_func
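    # illustrative example: task_type = consts.REGRESSION, objective = "huber",
    # params = [1.345] -> HuberLoss(1.345); for huber/fair/tweedie, params[0]
    # is the single loss hyper parameter passed through from ObjectiveParam
    # (the value 1.345 here is hypothetical)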

    def get_metrics_param(self):
        """
        this interface gives the evaluation type; called by the validation strategy
        """
        if self.task_type == consts.CLASSIFICATION:
            if self.num_classes == 2:
                return EvaluateParam(eval_type="binary",
                                     pos_label=self.classes_[1], metrics=self.metrics)
            else:
                return EvaluateParam(eval_type="multi", metrics=self.metrics)
        else:
            return EvaluateParam(eval_type="regression", metrics=self.metrics)

    def compute_loss(self, y_hat, y):
        """
        compute loss given predicted y and real y
        """
        LOGGER.info("compute loss")
        if self.task_type == consts.CLASSIFICATION:
            loss_method = self.loss
            y_predict = y_hat.mapValues(lambda val: loss_method.predict(val))
            loss = loss_method.compute_loss(y, y_predict)
        elif self.task_type == consts.REGRESSION:
            # "fair" included here: like the other raw-score losses, it would
            # otherwise leave `loss` unbound and crash the final return
            if self.objective_param.objective in ["lse", "lae", "logcosh", "log_cosh", "huber", "fair"]:
                loss_method = self.loss
                loss = loss_method.compute_loss(y, y_hat)
            elif self.objective_param.objective in ['tweedie']:
                loss_method = self.loss
                y_predict = y_hat.mapValues(lambda val: loss_method.predict(val))
                loss = loss_method.compute_loss(y, y_predict)
        return float(loss)
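    # note: for classification (and tweedie), the loss is computed on the
    # transformed predictions returned by loss.predict (e.g. probabilities),
    # while the other regression losses are computed on the raw scores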

    def check_convergence(self, loss):
        """
        check if the loss converges
        """
        LOGGER.info("check convergence")
        if self.convergence is None:
            self.convergence = converge_func_factory("diff", self.tol)

        return self.convergence.is_converge(loss)
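    # assumption: the "diff" criterion declares convergence when the absolute
    # change of the loss between two consecutive rounds falls below self.tol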

    @staticmethod
    def accumulate_y_hat(val, new_val, lr=0.1, idx=0):
        # vector sum
        if isinstance(new_val, np.ndarray) and len(new_val) == len(val):
            return val + new_val * lr

        # accumulate by dimension
        z_vec = np.zeros(len(val))
        z_vec[idx] = lr * new_val
        return z_vec + val
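    # illustrative example (multi-class, accumulate dimension 1):
    #   val = np.array([0.1, 0.2]), new_val = 0.5, lr = 0.1, idx = 1
    #   -> np.array([0.1, 0.25])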

    def generate_flowid(self, round_num, dim):
        LOGGER.info("generate flowid, flowid {}".format(self.flowid))
        return ".".join(map(str, [self.flowid, round_num, dim]))

    def get_new_predict_score(self, y_hat, cur_sample_weights, dim=0):
        func = functools.partial(self.accumulate_y_hat, lr=self.learning_rate, idx=dim)
        return y_hat.join(cur_sample_weights, func)
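    # the join applies accumulate_y_hat sample-wise: for each key k,
    #   new_y_hat[k] = accumulate_y_hat(y_hat[k], cur_sample_weights[k], lr, dim)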

    def _get_cv_param(self):
        self.model_param.cv_param.role = self.role
        self.model_param.cv_param.mode = self.mode
        return self.model_param.cv_param

    """
    fit and predict
    """

    @abc.abstractmethod
    def fit(self, data_inst, validate_data=None):
        raise NotImplementedError()

    @abc.abstractmethod
    def predict(self, data_inst):
        raise NotImplementedError()

    @abc.abstractmethod
    def generate_summary(self) -> dict:
        """
        return model summary
        """
        raise NotImplementedError()
- """
- Training Procedure
- """
- def get_init_score(self, y, num_classes: int):
- if num_classes > 2:
- y_hat, init_score = self.loss.initialize(y, num_classes)
- else:
- y_hat, init_score = self.loss.initialize(y)
- return y_hat, init_score
- @abc.abstractmethod
- def fit_a_learner(self, *args) -> BasicAlgorithms:
- """
- fit a booster and return it
- """
- raise NotImplementedError()
- """
- Prediction Procedure
- """
- @abc.abstractmethod
- def load_learner(self, *args):
- """
- load a booster
- """
- raise NotImplementedError()

    def score_to_predict_result(self, data_inst, y_hat):
        """
        given binary/multi-class/regression prediction scores, output results in the standard format
        """
        predicts = None
        loss_method = self.loss
        if self.task_type == consts.CLASSIFICATION:
            if self.num_classes == 2:
                predicts = y_hat.mapValues(lambda f: float(loss_method.predict(f)))
            else:
                predicts = y_hat.mapValues(lambda f: loss_method.predict(f).tolist())
        elif self.task_type == consts.REGRESSION:
            if self.objective_param.objective in ["tweedie"]:
                predicts = y_hat.mapValues(lambda f: [float(loss_method.predict(f))])
            elif self.objective_param.objective in ["lse", "lae", "huber", "log_cosh", "fair"]:
                predicts = y_hat
            else:
                raise NotImplementedError("objective {} not supported yet".format(self.objective_param.objective))

        if self.task_type == consts.CLASSIFICATION:
            predict_result = self.predict_score_to_output(data_inst, predict_score=predicts, classes=self.classes_,
                                                          threshold=self.predict_param.threshold)
        elif self.task_type == consts.REGRESSION:
            predicts = predicts.mapValues(lambda x: x[0])
            predict_result = self.predict_score_to_output(data_inst, predict_score=predicts, classes=None)
        else:
            raise NotImplementedError("task type {} not supported yet".format(self.task_type))

        return predict_result
- """
- Model IO
- """
- @abc.abstractmethod
- def get_model_meta(self):
- raise NotImplementedError()
- @abc.abstractmethod
- def get_model_param(self):
- raise NotImplementedError()
- @abc.abstractmethod
- def set_model_meta(self, model_meta):
- raise NotImplementedError()
- @abc.abstractmethod
- def set_model_param(self, model_param):
- raise NotImplementedError()
- def preprocess(self):
- pass
- def postprocess(self):
- pass
- def get_cur_model(self):
- meta_name, meta_protobuf = self.get_model_meta()
- param_name, param_protobuf = self.get_model_param()
- return {meta_name: meta_protobuf,
- param_name: param_protobuf
- }
- def export_model(self):
- if self.need_cv:
- return None
- return self.get_cur_model()
- def load_model(self, model_dict, model_key="model"):
- model_param = None
- model_meta = None
- for _, value in model_dict[model_key].items():
- for model in value:
- if model.endswith("Meta"):
- model_meta = value[model]
- if model.endswith("Param"):
- model_param = value[model]
- LOGGER.info("load model")
- self.set_model_meta(model_meta)
- self.set_model_param(model_param)
- def predict_proba(self, data_inst):
- pass
- def save_data(self):
- return self.data_output
- def save_model(self):
- pass
|