data_overview.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import functools
import json
from collections import Counter

import numpy as np

from federatedml.feature.instance import Instance
from federatedml.util import LOGGER
from federatedml.util import consts


def get_features_shape(data_instances):
    one_feature = data_instances.first()
    if one_feature is None:
        return None
    instance = one_feature[1]
    if instance is None:
        return None
    if type(instance.features).__name__ == consts.SPARSE_VECTOR:
        return instance.features.get_shape()
    else:
        return instance.features.shape[0]


def get_instance_shape(instance):
    if instance is None:
        return None

    if type(instance.features).__name__ == consts.SPARSE_VECTOR:
        return instance.features.get_shape()
    else:
        return instance.features.shape[0]


def get_anonymous_header(data_instances):
    anonymous_header = data_instances.schema.get('anonymous_header')  # ['x1', 'x2', 'x3' ... ]
    return anonymous_header


def look_up_names_from_header(name_list, source_header, transform_header):
    """
    Parameters
    ----------
    name_list: list or str, list of feature name(s)
    source_header: table header containing name_list
    transform_header: table header into which name_list is to be transformed

    Returns
    -------
    list of plaintext feature names
    """
    if name_list is None:
        return
    if len(source_header) != len(transform_header):
        raise ValueError("Length of source header and transform header do not match, please check.")
    if not isinstance(name_list, list):
        name_list = [name_list]
    name_set = set(name_list)
    # name_list contains repeated name(s)
    if len(name_set) < len(name_list):
        LOGGER.debug(f"Repeated name(s) found in provided name_list: {name_list}.")
        name_set = name_list
    feature_names = [f_name for i, f_name in enumerate(transform_header) if source_header[i] in name_set]
    if len(feature_names) < len(name_set):
        raise ValueError(f"Cannot match all provided names from: {name_list} to given header, "
                         f"please check.")
    return feature_names
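
# Illustrative example (hypothetical names, not part of the original module): translating a
# plaintext feature name into its anonymous counterpart, and vice versa.
# >>> plain = ["x0", "x1", "x2"]
# >>> anonymous = ["host_0_x0", "host_0_x1", "host_0_x2"]
# >>> look_up_names_from_header("x1", plain, anonymous)
# ['host_0_x1']
# >>> look_up_names_from_header(["host_0_x0"], anonymous, plain)
# ['x0']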


def max_abs_sample_weight_map_func(kv_iter):
    max_weight = -1
    for k, inst in kv_iter:
        if np.abs(inst.weight) > max_weight:
            max_weight = np.abs(inst.weight)
    return max_weight


def max_sample_weight_cmp(v1, v2):
    return v1 if v1 > v2 else v2


def get_max_sample_weight(data_inst_with_weight):
    inter_rs = data_inst_with_weight.applyPartitions(max_abs_sample_weight_map_func)
    max_weight = inter_rs.reduce(max_sample_weight_cmp)
    return max_weight
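
# The three helpers above follow a map-partitions / reduce pattern: each partition reports its
# local maximum absolute weight, and the pairwise comparison keeps the larger value, yielding the
# global max |weight|. Illustrative call (hypothetical table handle, not part of the original module):
# >>> global_max = get_max_sample_weight(data_with_weight)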


def check_negative_sample_weight(kv_iterator):
    for k, v in kv_iterator:
        if isinstance(v, Instance) and v.weight is not None:
            if v.weight < 0:
                return True
    return False


def header_alignment(data_instances, pre_header, pre_anonymous_header=None):
    header = [col.strip() for col in data_instances.schema["header"]]
    if len((set(header) & set(pre_header))) != len(pre_header):
        raise ValueError(f"fit & transform data's headers should be the same! "
                         f"Previous header: {pre_header}. "
                         f"Current header: {header}.")

    if pre_header == header:
        if pre_anonymous_header:
            data_instances.schema["anonymous_header"] = pre_anonymous_header
        return data_instances

    if len(pre_header) != len(header):
        LOGGER.warning(
            "header in prediction stage is a super-set of the training-stage header, "
            "predict header size is {}, training header size is {}".format(
                len(header), len(pre_header)))
    else:
        LOGGER.warning("header in prediction stage will be shuffled to match the header of training stage")

    header_idx_mapping = dict(zip(pre_header, [i for i in range(len(pre_header))]))
    header_correct = {}
    for i in range(len(header)):
        col = header[i]
        if col not in header_idx_mapping:
            continue
        header_correct[i] = header_idx_mapping[col]

    def align_header(inst, header_pos=None):
        if type(inst.features).__name__ == consts.SPARSE_VECTOR:
            shape = len(header_pos)
            new_data = {}
            for k, v in inst.features.get_all_data():
                if k not in header_pos:
                    continue
                new_data[header_pos.get(k)] = v
            inst_new = copy.deepcopy(inst)
            inst_new.features.set_shape(shape)
            inst_new.features.set_sparse_vector(new_data)
        else:
            col_order = [None] * len(header_pos)
            for k, v in header_pos.items():
                col_order[v] = k
            inst_new = copy.deepcopy(inst)
            inst_new.features = inst.features[col_order]
        return inst_new

    correct_schema = data_instances.schema
    correct_schema["header"] = pre_header
    if pre_anonymous_header:
        correct_schema["anonymous_header"] = pre_anonymous_header
    data_instances = data_instances.mapValues(lambda inst: align_header(inst, header_pos=header_correct))
    data_instances.schema = correct_schema
    return data_instances
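
# Illustrative sketch (hypothetical headers, not part of the original module): if training used
# header ["a", "b"] and prediction data arrives with header ["b", "a"], header_correct becomes
# {0: 1, 1: 0}; each instance's features are then reordered (dense) or re-indexed (sparse) back
# into the training column order, and the schema header is reset to the training header.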


def get_data_shape(data):
    one_feature = data.first()
    if one_feature is not None:
        return len(list(one_feature[1]))
    else:
        return None


def get_header(data_instances):
    header = data_instances.schema.get('header')  # ['x1', 'x2', 'x3' ... ]
    return header


def is_empty_feature(data_instances):
    shape_of_feature = get_features_shape(data_instances)
    if shape_of_feature is None or shape_of_feature == 0:
        return True
    return False


def is_sparse_data(data_instance):
    first_data = data_instance.first()
    if type(first_data[1]).__name__ in ['ndarray', 'list', 'tuple']:
        return False

    data_feature = first_data[1].features
    if type(data_feature).__name__ == "ndarray":
        return False
    else:
        return True


def count_labels(data_instance):
    def _count_labels(instances):
        labels = set()
        for idx, instance in instances:
            label = instance.label
            labels.add(label)
        return labels

    label_set = data_instance.applyPartitions(_count_labels)
    label_set = label_set.reduce(lambda x1, x2: x1.union(x2))
    return len(label_set)
    # if len(label_set) != 2:
    #     return False
    # return True


def with_weight(data_instances):
    first_entry = data_instances.first()[1]
    if isinstance(first_entry, Instance) and first_entry.weight is not None:
        return True
    return False


def get_class_dict(kv_iterator):
    class_dict = {}
    for _, inst in kv_iterator:
        count = class_dict.get(inst.label, 0)
        class_dict[inst.label] = count + 1
    if len(class_dict.keys()) > consts.MAX_CLASSNUM:
        raise ValueError("In classification tasks, the number of distinct classes should be no more than %d"
                         % consts.MAX_CLASSNUM)
    return class_dict


def get_label_count(data_instances):
    class_weight = data_instances.mapPartitions(get_class_dict).reduce(
        lambda x, y: dict(Counter(x) + Counter(y)))
    return class_weight
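
# Per-partition label counts are merged with collections.Counter, whose "+" operator adds counts
# key-wise. Illustrative merge of two partition results (hypothetical values):
# >>> dict(Counter({0: 3, 1: 2}) + Counter({1: 4, 2: 1}))
# {0: 3, 1: 6, 2: 1}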


def get_predict_result_labels(data):
    def _get_labels(score_inst):
        labels = set()
        for idx, result in score_inst:
            true_label = result.features[0]
            predict_label = result.features[1]
            labels.add(true_label)
            labels.add(predict_label)
        return labels

    label_set = data.applyPartitions(_get_labels)
    label_set = label_set.reduce(lambda x1, x2: x1.union(x2))
    if len(label_set) > consts.MAX_CLASSNUM:
        raise ValueError("In classification tasks, the number of distinct classes should be no more than %d"
                         % consts.MAX_CLASSNUM)
    return label_set


def rubbish_clear(rubbish_list):
    """
    Temporary procedure for resource recovery. This will be discarded in the next version
    because of our new resource recovery plan.

    Parameters
    ----------
    rubbish_list: list of Table, each Table in this list will be destroyed
    """
    for r in rubbish_list:
        try:
            if r is None:
                continue
            r.destroy()
        except Exception as e:
            LOGGER.warning("destroy table error: {}, but this can be ignored sometimes".format(e))


def check_with_inst_id(data_instances):
    instance = data_instances.first()[1]
    if isinstance(instance, Instance) and instance.with_inst_id:
        return True
    return False


def predict_detail_dict_to_str(result_dict):
    return "\"" + json.dumps(result_dict).replace("\"", "\'") + "\""


def predict_detail_str_to_dict(result_dict_str):
    return json.loads(json.loads(result_dict_str).replace("\'", "\""))
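
# The two helpers above are inverses: the dict is JSON-encoded, double quotes are swapped for
# single quotes, and the result is wrapped in double quotes; decoding unwraps the outer quotes,
# swaps the quotes back and parses the JSON. Illustrative round trip (hypothetical values):
# >>> s = predict_detail_dict_to_str({"0": 0.3, "1": 0.7})
# >>> predict_detail_str_to_dict(s)
# {'0': 0.3, '1': 0.7}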


def scale_sample_weight(data_instances):
    data_count = data_instances.count()

    def _sum_all_weight(kv_iterator):
        weight_sum = 0
        for _, v in kv_iterator:
            weight_sum += v.weight
        return weight_sum

    total_weight = data_instances.mapPartitions(_sum_all_weight).reduce(lambda x, y: x + y)
    # LOGGER.debug(f"weight_sum is : {total_weight}")
    scale_factor = data_count / total_weight
    # LOGGER.debug(f"scale factor is : {scale_factor}")

    def _replace_weight(instance):
        new_weight = instance.weight * scale_factor
        instance.set_weight(new_weight)
        return instance

    scaled_instances = data_instances.mapValues(lambda v: _replace_weight(v))
    return scaled_instances
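
# Scaling note: with n instances and total weight W, each weight w_i becomes w_i * n / W, so the
# rescaled weights sum to n while their relative proportions are preserved. Example (hypothetical
# values): weights [1, 3] over 2 samples are scaled by 2 / 4 = 0.5 to [0.5, 1.5], which sum to 2.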


class DataStatistics(object):
    def __init__(self):
        self.multivariate_statistic_obj = None

    def static_all_values(self, data_instances, static_col_indexes, is_sparse: bool = False):
        if not is_sparse:
            f = functools.partial(self.__dense_values_set,
                                  static_col_indexes=static_col_indexes)
        else:
            f = functools.partial(self.__sparse_values_set,
                                  static_col_indexes=static_col_indexes)
        result_sets = data_instances.applyPartitions(f).reduce(self.__reduce_set_results)
        result = [sorted(list(x)) for x in result_sets]
        return result

    @staticmethod
    def __dense_values_set(instances, static_col_indexes: list):
        result = [set() for _ in static_col_indexes]
        for _, instance in instances:
            for idx, col_index in enumerate(static_col_indexes):
                value_set = result[idx]
                value_set.add(instance.features[col_index])
        return result

    @staticmethod
    def __sparse_values_set(instances, static_col_indexes: list):
        tmp_result = {idx: set() for idx in static_col_indexes}
        for _, instance in instances:
            data_generator = instance.features.get_all_data()
            for idx, value in data_generator:
                if idx not in tmp_result:
                    continue
                tmp_result[idx].add(value)
        result = [tmp_result[x] for x in static_col_indexes]
        return result

    @staticmethod
    def __reduce_set_results(result_set_a, result_set_b):
        final_result_sets = []
        for set_a, set_b in zip(result_set_a, result_set_b):
            final_result_sets.append(set_a.union(set_b))
        return final_result_sets
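

# Illustrative usage of DataStatistics (hypothetical table handle, not part of the original
# module): collect the sorted distinct values seen in columns 0 and 2, using the sparse code
# path when the feature vectors are stored as SparseVector.
# >>> stats = DataStatistics()
# >>> unique_values = stats.static_all_values(data_instances, static_col_indexes=[0, 2],
# ...                                         is_sparse=is_sparse_data(data_instances))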