#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import functools

import numpy as np
from sklearn.model_selection import KFold as sk_KFold

from fate_arch.session import computing_session as session
from federatedml.evaluation.evaluation import Evaluation
from federatedml.model_selection.cross_validate import BaseCrossValidator
from federatedml.model_selection.indices import collect_index
from federatedml.transfer_variable.transfer_class.cross_validation_transfer_variable import \
    CrossValidationTransferVariable
from federatedml.util import LOGGER
from federatedml.util import consts


class KFold(BaseCrossValidator):
    def __init__(self):
        super(KFold, self).__init__()
        self.model_param = None
        self.n_splits = 1
        self.shuffle = True
        self.random_seed = 1
        self.fold_history = None

    def _init_model(self, param):
        self.model_param = param
        self.n_splits = param.n_splits
        self.mode = param.mode
        self.role = param.role
        self.shuffle = param.shuffle
        self.random_seed = param.random_seed
        self.output_fold_history = param.output_fold_history
        self.history_value_type = param.history_value_type
        # self.evaluate_param = param.evaluate_param
        # np.random.seed(self.random_seed)

    def split(self, data_inst):
        """Split data_inst by sample id into n_splits (train, test) table pairs."""
        # header = data_inst.schema.get('header')
        schema = data_inst.schema

        data_sids_iter, data_size = collect_index(data_inst)
        data_sids = []
        key_type = None
        for sid, _ in data_sids_iter:
            if key_type is None:
                key_type = type(sid)
            data_sids.append(sid)
        data_sids = np.array(data_sids)
        # if self.shuffle:
        #     np.random.shuffle(data_sids)

        random_state = self.random_seed if self.shuffle else None
        kf = sk_KFold(n_splits=self.n_splits, shuffle=self.shuffle, random_state=random_state)

        for train, test in kf.split(data_sids):
            train_sids = data_sids[train]
            test_sids = data_sids[test]

            train_sids_table = [(key_type(x), 1) for x in train_sids]
            test_sids_table = [(key_type(x), 1) for x in test_sids]
            train_table = session.parallelize(train_sids_table,
                                              include_key=True,
                                              partition=data_inst.partitions)
            train_data = data_inst.join(train_table, lambda x, y: x)

            test_table = session.parallelize(test_sids_table,
                                             include_key=True,
                                             partition=data_inst.partitions)
            test_data = data_inst.join(test_table, lambda x, y: x)

            train_data.schema = schema
            test_data.schema = schema
            yield train_data, test_data
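
    # Illustrative usage sketch (this is how `run` below consumes the generator
    # on the guest side or in HOMO mode; nothing here executes on its own):
    #
    #     for train_data, test_data in self.split(data_inst):
    #         ...  # fit on train_data, validate on test_data
    #
    # Within a fold the two tables keep the original schema, share no sample id,
    # and together cover the full input data set.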

    @staticmethod
    def generate_new_id(id, fold_num, data_type):
        return f"{id}#fold{fold_num}#{data_type}"
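
    # Example of the generated fold-history id (the sid value is made up):
    #     generate_new_id("alice_7", 2, "validate") -> "alice_7#fold2#validate"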

    def transform_history_data(self, data, predict_data, fold_num, data_type):
        if self.history_value_type == "score":
            if predict_data is not None:
                history_data = predict_data.map(lambda k, v: (KFold.generate_new_id(k, fold_num, data_type), v))
                history_data.schema = copy.deepcopy(predict_data.schema)
            else:
                history_data = data.map(lambda k, v: (KFold.generate_new_id(k, fold_num, data_type), fold_num))
                schema = copy.deepcopy(data.schema)
                schema["header"] = ["fold_num"]
                history_data.schema = schema
        elif self.history_value_type == "instance":
            history_data = data.map(lambda k, v: (KFold.generate_new_id(k, fold_num, data_type), v))
            history_data.schema = copy.deepcopy(data.schema)
        else:
            raise ValueError(f"unknown history value type: {self.history_value_type}")
        return history_data

    @staticmethod
    def _append_name(instance, name):
        new_inst = copy.deepcopy(instance)
        new_inst.features.append(name)
        return new_inst
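
    # The appended name ('train' or 'validate') tags each predicted instance with
    # the data set it came from, so downstream evaluation can tell them apart.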

    def run(self, component_parameters, data_inst, original_model, host_do_evaluate):
        """Run cross validation: split data, fit/predict per fold, evaluate and summarize."""
        self._init_model(component_parameters)

        if data_inst is None:
            self._arbiter_run(original_model)
            return
        total_data_count = data_inst.count()
        LOGGER.debug(f"data_inst count: {total_data_count}")
        if self.output_fold_history:
            if total_data_count * self.n_splits > consts.MAX_SAMPLE_OUTPUT_LIMIT:
                LOGGER.warning(
                    f"max sample output limit {consts.MAX_SAMPLE_OUTPUT_LIMIT} exceeded "
                    f"with n_splits ({self.n_splits}) * instance_count ({total_data_count})")

        if self.mode == consts.HOMO or self.role == consts.GUEST:
            data_generator = self.split(data_inst)
        else:
            data_generator = [(data_inst, data_inst)] * self.n_splits

        fold_num = 0
        summary_res = {}
        for train_data, test_data in data_generator:
            model = copy.deepcopy(original_model)
            LOGGER.debug("In CV, set_flowid flowid is : {}".format(fold_num))
            model.set_flowid(fold_num)
            model.set_cv_fold(fold_num)
            LOGGER.info("KFold fold_num is: {}".format(fold_num))

            if self.mode == consts.HETERO:
                train_data = self._align_data_index(train_data, model.flowid, consts.TRAIN_DATA)
                LOGGER.info("Train data synchronized")
                test_data = self._align_data_index(test_data, model.flowid, consts.TEST_DATA)
                LOGGER.info("Test data synchronized")

            train_data_count = train_data.count()
            test_data_count = test_data.count()
            LOGGER.debug(f"train_data count: {train_data_count}")
            if train_data_count + test_data_count != total_data_count:
                raise EnvironmentError(
                    "In cv fold: {}, train count: {}, test count: {}, original data count: {}. "
                    "Thus, 'train count + test count = total count' condition is not satisfied."
                    .format(fold_num, train_data_count, test_data_count, total_data_count))

            this_flowid = 'train.' + str(fold_num)
            LOGGER.debug("In CV, set_flowid flowid is : {}".format(this_flowid))
            model.set_flowid(this_flowid)
            model.fit(train_data, test_data)

            this_flowid = 'predict_train.' + str(fold_num)
            LOGGER.debug("In CV, set_flowid flowid is : {}".format(this_flowid))
            model.set_flowid(this_flowid)
            train_pred_res = model.predict(train_data)

            # if train_pred_res is not None:
            if self.role == consts.GUEST or host_do_evaluate:
                fold_name = "_".join(['train', 'fold', str(fold_num)])
                f = functools.partial(self._append_name, name='train')
                train_pred_res = train_pred_res.mapValues(f)
                train_pred_res = model.set_predict_data_schema(train_pred_res, train_data.schema)
                # LOGGER.debug(f"train_pred_res schema: {train_pred_res.schema}")
                self.evaluate(train_pred_res, fold_name, model)

            this_flowid = 'predict_validate.' + str(fold_num)
            LOGGER.debug("In CV, set_flowid flowid is : {}".format(this_flowid))
            model.set_flowid(this_flowid)
            test_pred_res = model.predict(test_data)

            # if test_pred_res is not None:
            if self.role == consts.GUEST or host_do_evaluate:
                fold_name = "_".join(['validate', 'fold', str(fold_num)])
                f = functools.partial(self._append_name, name='validate')
                test_pred_res = test_pred_res.mapValues(f)
                test_pred_res = model.set_predict_data_schema(test_pred_res, test_data.schema)
                # LOGGER.debug(f"test_pred_res schema: {test_pred_res.schema}")
                self.evaluate(test_pred_res, fold_name, model)
            LOGGER.debug("Finish fold: {}".format(fold_num))

            if self.output_fold_history:
                LOGGER.debug(f"generating fold history for fold {fold_num}")
                fold_train_data = self.transform_history_data(train_data, train_pred_res, fold_num, "train")
                fold_validate_data = self.transform_history_data(test_data, test_pred_res, fold_num, "validate")

                fold_history_data = fold_train_data.union(fold_validate_data)
                fold_history_data.schema = fold_train_data.schema
                if self.fold_history is None:
                    self.fold_history = fold_history_data
                else:
                    new_fold_history = self.fold_history.union(fold_history_data)
                    new_fold_history.schema = fold_history_data.schema
                    self.fold_history = new_fold_history

            summary_res[f"fold_{fold_num}"] = model.summary()
            fold_num += 1

        summary_res['fold_num'] = fold_num
        LOGGER.debug("Finish all fold running")
        original_model.set_summary(summary_res)

        if self.output_fold_history:
            LOGGER.debug(f"output data schema: {self.fold_history.schema}")
            # LOGGER.debug(f"output data: {list(self.fold_history.collect())}")
            # LOGGER.debug(f"output data is: {self.fold_history}")
            return self.fold_history
        else:
            return data_inst
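
    # Per-fold flowids follow a fixed naming scheme so that messages exchanged in
    # different folds and phases do not collide:
    #     fit               -> "train.<fold_num>"
    #     predict (train)   -> "predict_train.<fold_num>"
    #     predict (validate)-> "predict_validate.<fold_num>"
    # _arbiter_run below mirrors the same scheme so the arbiter stays in step
    # with guest and host.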

    def _arbiter_run(self, original_model):
        for fold_num in range(self.n_splits):
            LOGGER.info("KFold flowid is: {}".format(fold_num))
            model = copy.deepcopy(original_model)
            this_flowid = 'train.' + str(fold_num)
            model.set_flowid(this_flowid)
            model.set_cv_fold(fold_num)
            model.fit(None)

            this_flowid = 'predict_train.' + str(fold_num)
            model.set_flowid(this_flowid)
            model.predict(None)

            this_flowid = 'predict_validate.' + str(fold_num)
            model.set_flowid(this_flowid)
            model.predict(None)

    def _align_data_index(self, data_instance, flowid, data_application=None):
        """Synchronize fold sample ids from guest to host so both sides use the same split."""
        schema = data_instance.schema

        if data_application is None:
            # LOGGER.warning("not data_application!")
            # return
            raise ValueError("In _align_data_index, data_application should be provided.")

        transfer_variable = CrossValidationTransferVariable()
        if data_application == consts.TRAIN_DATA:
            transfer_id = transfer_variable.train_sid
        elif data_application == consts.TEST_DATA:
            transfer_id = transfer_variable.test_sid
        else:
            raise ValueError("In _align_data_index, data_application should be TRAIN_DATA or TEST_DATA.")

        if self.role == consts.GUEST:
            data_sid = data_instance.mapValues(lambda v: 1)
            transfer_id.remote(data_sid,
                               role=consts.HOST,
                               idx=-1,
                               suffix=(flowid,))
            LOGGER.info("remote {} to host".format(data_application))
            return data_instance
        elif self.role == consts.HOST:
            data_sid = transfer_id.get(idx=0,
                                       suffix=(flowid,))
            LOGGER.info("get {} from guest".format(data_application))

            join_data_insts = data_sid.join(data_instance, lambda s, d: d)
            join_data_insts.schema = schema
            return join_data_insts

    def evaluate(self, validate_data, fold_name, model):
        if validate_data is None:
            return

        eval_obj = Evaluation()
        # LOGGER.debug("In KFold, evaluate_param is: {}".format(self.evaluate_param.__dict__))
        # eval_obj._init_model(self.evaluate_param)
        eval_param = model.get_metrics_param()
        eval_param.check_single_value_default_metric()
        eval_obj._init_model(eval_param)
        eval_obj.set_tracker(model.tracker)
        validate_data = {fold_name: validate_data}
        eval_obj.fit(validate_data)
        eval_obj.save_data()
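

if __name__ == "__main__":
    # Minimal local sketch: it does not exercise the federated flow above; it only
    # illustrates how sample ids are partitioned by sklearn's KFold and what the
    # fold-history ids from KFold.generate_new_id look like. The sid values are
    # made up for illustration, and the federatedml imports at the top of this
    # module still need to resolve for the file to be importable.
    demo_sids = np.array([f"sid_{i}" for i in range(10)])
    demo_kf = sk_KFold(n_splits=5, shuffle=True, random_state=1)
    for fold, (train_idx, test_idx) in enumerate(demo_kf.split(demo_sids)):
        print(f"fold {fold}: train={list(demo_sids[train_idx])}, test={list(demo_sids[test_idx])}")
        print("  history id example:", KFold.generate_new_id(demo_sids[test_idx][0], fold, "validate"))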