#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import functools
import json
from collections import Counter

import numpy as np

from federatedml.feature.instance import Instance
from federatedml.util import LOGGER
from federatedml.util import consts

def get_features_shape(data_instances):
    one_feature = data_instances.first()
    if one_feature is None:
        return None
    instance = one_feature[1]
    if instance is None:
        return None
    if type(instance.features).__name__ == consts.SPARSE_VECTOR:
        return instance.features.get_shape()
    else:
        return instance.features.shape[0]

def get_instance_shape(instance):
    if instance is None:
        return None
    if type(instance.features).__name__ == consts.SPARSE_VECTOR:
        return instance.features.get_shape()
    else:
        return instance.features.shape[0]

def get_anonymous_header(data_instances):
    anonymous_header = data_instances.schema.get('anonymous_header')  # anonymous column names aligned with 'header'
    return anonymous_header

def look_up_names_from_header(name_list, source_header, transform_header):
    """
    Translate feature name(s) from one header into the corresponding name(s) in another header.

    Parameters
    ----------
    name_list: str or list of str, feature name(s) to look up in source_header
    source_header: table header containing name_list
    transform_header: table header into which name_list is to be transformed

    Returns
    -------
    list of feature names taken from transform_header
    """
    if name_list is None:
        return
    if len(source_header) != len(transform_header):
        raise ValueError("Length of source header and transform header do not match, please check.")
    if not isinstance(name_list, list):
        name_list = [name_list]
    name_set = set(name_list)
    # provided name list contains repeated name(s); keep duplicates when matching
    if len(name_set) < len(name_list):
        LOGGER.debug(f"Repeated name(s) found in provided name_list: {name_list}.")
        name_set = name_list
    feature_names = [f_name for i, f_name in enumerate(transform_header) if source_header[i] in name_set]
    if len(feature_names) < len(name_set):
        raise ValueError(f"Cannot match all provided names from: {name_list} to given header, "
                         f"please check.")
    return feature_names

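# Example usage of look_up_names_from_header (illustrative; the headers below are
# assumptions, not taken from any particular dataset):
#   look_up_names_from_header(["x1", "x3"],
#                             source_header=["x0", "x1", "x2", "x3"],
#                             transform_header=["a0", "a1", "a2", "a3"])
#   -> ["a1", "a3"]
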
def max_abs_sample_weight_map_func(kv_iter):
    max_weight = -1
    for k, inst in kv_iter:
        if np.abs(inst.weight) > max_weight:
            max_weight = np.abs(inst.weight)
    return max_weight


def max_sample_weight_cmp(v1, v2):
    return v1 if v1 > v2 else v2


def get_max_sample_weight(data_inst_with_weight):
    inter_rs = data_inst_with_weight.applyPartitions(max_abs_sample_weight_map_func)
    max_weight = inter_rs.reduce(max_sample_weight_cmp)
    return max_weight

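# Sketch of what get_max_sample_weight computes (illustrative, assuming a Table of
# weighted Instance objects): it returns max(|inst.weight|) over all rows, e.g.
# weights [0.5, -2.0, 1.0] -> 2.0.
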
def check_negative_sample_weight(kv_iterator):
    for k, v in kv_iterator:
        if isinstance(v, Instance) and v.weight is not None:
            if v.weight < 0:
                return True
    return False

def header_alignment(data_instances, pre_header, pre_anonymous_header=None):
    header = [col.strip() for col in data_instances.schema["header"]]
    if len((set(header) & set(pre_header))) != len(pre_header):
        raise ValueError(f"fit & transform data's headers should contain the same columns! "
                         f"Previous header: {pre_header}. "
                         f"Current header: {header}.")
    if pre_header == header:
        if pre_anonymous_header:
            data_instances.schema["anonymous_header"] = pre_anonymous_header
        return data_instances

    if len(pre_header) != len(header):
        LOGGER.warning(
            "header in prediction stage is a superset of the training-stage header; "
            "predict size is {}, training header size is {}".format(len(header), len(pre_header)))
    else:
        LOGGER.warning("header in prediction stage will be shuffled to match the header of training stage")

    # map each training-stage column name to its position in the training header
    header_idx_mapping = dict(zip(pre_header, [i for i in range(len(pre_header))]))

    # map prediction-stage column position -> training-stage column position
    header_correct = {}
    for i in range(len(header)):
        col = header[i]
        if col not in header_idx_mapping:
            continue
        header_correct[i] = header_idx_mapping[col]

    def align_header(inst, header_pos=None):
        if type(inst.features).__name__ == consts.SPARSE_VECTOR:
            shape = len(header_pos)
            new_data = {}
            for k, v in inst.features.get_all_data():
                if k not in header_pos:
                    continue
                new_data[header_pos.get(k)] = v
            inst_new = copy.deepcopy(inst)
            inst_new.features.set_shape(shape)
            inst_new.features.set_sparse_vector(new_data)
        else:
            col_order = [None] * len(header_pos)
            for k, v in header_pos.items():
                col_order[v] = k
            inst_new = copy.deepcopy(inst)
            inst_new.features = inst.features[col_order]
        return inst_new

    correct_schema = data_instances.schema
    correct_schema["header"] = pre_header
    if pre_anonymous_header:
        correct_schema["anonymous_header"] = pre_anonymous_header
    data_instances = data_instances.mapValues(lambda inst: align_header(inst, header_pos=header_correct))
    data_instances.schema = correct_schema
    return data_instances

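# Illustrative effect of header_alignment (column names are assumptions): if training used
# header ["x1", "x2", "x3"] and prediction data arrives with header ["x2", "x1", "x3"],
# each instance's feature columns are reordered to match ["x1", "x2", "x3"] and the
# schema's header is reset to the training-stage header.
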
def get_data_shape(data):
    one_feature = data.first()
    if one_feature is not None:
        return len(list(one_feature[1]))
    else:
        return None

def get_header(data_instances):
    header = data_instances.schema.get('header')  # ['x1', 'x2', 'x3' ... ]
    return header

def is_empty_feature(data_instances):
    shape_of_feature = get_features_shape(data_instances)
    if shape_of_feature is None or shape_of_feature == 0:
        return True
    return False

def is_sparse_data(data_instance):
    first_data = data_instance.first()
    if type(first_data[1]).__name__ in ['ndarray', 'list', 'tuple']:
        return False
    data_feature = first_data[1].features
    if type(data_feature).__name__ == "ndarray":
        return False
    else:
        return True

def count_labels(data_instance):
    def _count_labels(instances):
        labels = set()
        for idx, instance in instances:
            label = instance.label
            labels.add(label)
        return labels

    label_set = data_instance.applyPartitions(_count_labels)
    label_set = label_set.reduce(lambda x1, x2: x1.union(x2))
    return len(label_set)
    # if len(label_set) != 2:
    #     return False
    # return True

def with_weight(data_instances):
    first_entry = data_instances.first()[1]
    if isinstance(first_entry, Instance) and first_entry.weight is not None:
        return True
    return False

def get_class_dict(kv_iterator):
    class_dict = {}
    for _, inst in kv_iterator:
        count = class_dict.get(inst.label, 0)
        class_dict[inst.label] = count + 1
    if len(class_dict.keys()) > consts.MAX_CLASSNUM:
        raise ValueError("In Classify Task, the number of distinct classes should be no more than %d" % (consts.MAX_CLASSNUM))
    return class_dict

def get_label_count(data_instances):
    class_weight = data_instances.mapPartitions(get_class_dict).reduce(
        lambda x, y: dict(Counter(x) + Counter(y)))
    return class_weight

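# Sketch of get_label_count's output (illustrative numbers): per-partition label counts are
# merged by Counter addition, so a binary-labelled table with 120 negatives and 80 positives
# would yield {0: 120, 1: 80}.
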
def get_predict_result_labels(data):
    def _get_labels(score_inst):
        labels = set()
        for idx, result in score_inst:
            # predict-result rows store the true label and the predicted label
            # in the first two feature positions
            true_label = result.features[0]
            predict_label = result.features[1]
            labels.add(true_label)
            labels.add(predict_label)
        return labels

    label_set = data.applyPartitions(_get_labels)
    label_set = label_set.reduce(lambda x1, x2: x1.union(x2))
    if len(label_set) > consts.MAX_CLASSNUM:
        raise ValueError("In Classify Task, the number of distinct classes should be no more than %d" % (consts.MAX_CLASSNUM))
    return label_set

def rubbish_clear(rubbish_list):
    """
    Temporary routine for resource recovery. This will be discarded in the next version
    because of our new resource recovery plan.

    Parameters
    ----------
    rubbish_list: list of Table, each Table in this list will be destroyed
    """
    for r in rubbish_list:
        try:
            if r is None:
                continue
            r.destroy()
        except Exception as e:
            LOGGER.warning("destroy table error: {}, but this can be ignored sometimes".format(e))

def check_with_inst_id(data_instances):
    instance = data_instances.first()[1]
    if isinstance(instance, Instance) and instance.with_inst_id:
        return True
    return False

def predict_detail_dict_to_str(result_dict):
    return "\"" + json.dumps(result_dict).replace("\"", "\'") + "\""


def predict_detail_str_to_dict(result_dict_str):
    return json.loads(json.loads(result_dict_str).replace("\'", "\""))

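# Round-trip sketch for the two helpers above (illustrative values):
#   s = predict_detail_dict_to_str({"0": 0.3, "1": 0.7})
#   # s == '"{\'0\': 0.3, \'1\': 0.7}"'
#   predict_detail_str_to_dict(s)
#   # -> {"0": 0.3, "1": 0.7}
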
def scale_sample_weight(data_instances):
    data_count = data_instances.count()

    def _sum_all_weight(kv_iterator):
        weight_sum = 0
        for _, v in kv_iterator:
            weight_sum += v.weight
        return weight_sum

    total_weight = data_instances.mapPartitions(_sum_all_weight).reduce(lambda x, y: x + y)
    # LOGGER.debug(f"weight_sum is : {total_weight}")
    scale_factor = data_count / total_weight
    # LOGGER.debug(f"scale factor is : {scale_factor}")

    def _replace_weight(instance):
        new_weight = instance.weight * scale_factor
        instance.set_weight(new_weight)
        return instance

    scaled_instances = data_instances.mapValues(lambda v: _replace_weight(v))
    return scaled_instances

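# Sketch of scale_sample_weight's effect (illustrative numbers): for 4 instances with
# weights [0.1, 0.3, 0.4, 0.2] (sum 1.0), scale_factor = 4 / 1.0 = 4, so the rescaled
# weights [0.4, 1.2, 1.6, 0.8] sum to the instance count.
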
class DataStatistics(object):
    def __init__(self):
        self.multivariate_statistic_obj = None

    def static_all_values(self, data_instances, static_col_indexes, is_sparse: bool = False):
        if not is_sparse:
            f = functools.partial(self.__dense_values_set,
                                  static_col_indexes=static_col_indexes)
        else:
            f = functools.partial(self.__sparse_values_set,
                                  static_col_indexes=static_col_indexes)
        result_sets = data_instances.applyPartitions(f).reduce(self.__reduce_set_results)
        result = [sorted(list(x)) for x in result_sets]
        return result

    @staticmethod
    def __dense_values_set(instances, static_col_indexes: list):
        result = [set() for _ in static_col_indexes]
        for _, instance in instances:
            for idx, col_index in enumerate(static_col_indexes):
                value_set = result[idx]
                value_set.add(instance.features[col_index])
        return result

    @staticmethod
    def __sparse_values_set(instances, static_col_indexes: list):
        tmp_result = {idx: set() for idx in static_col_indexes}
        for _, instance in instances:
            data_generator = instance.features.get_all_data()
            for idx, value in data_generator:
                if idx not in tmp_result:
                    continue
                tmp_result[idx].add(value)
        result = [tmp_result[x] for x in static_col_indexes]
        return result

    @staticmethod
    def __reduce_set_results(result_set_a, result_set_b):
        final_result_sets = []
        for set_a, set_b in zip(result_set_a, result_set_b):
            final_result_sets.append(set_a.union(set_b))
        return final_result_sets

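# Sketch of DataStatistics.static_all_values (illustrative, assuming a Table of dense
# Instance rows): for static_col_indexes=[0, 2] it returns one sorted list of distinct
# values per requested column, e.g. [[0.0, 1.0], [1.5, 2.5, 3.0]].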
|