data_format_preprocess.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. #
  4. # Copyright 2019 The FATE Authors. All Rights Reserved.
  5. #
  6. # Licensed under the Apache License, Version 2.0 (the "License");
  7. # you may not use this file except in compliance with the License.
  8. # You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. #
  18. import copy
  19. import functools
  20. import numpy as np
  21. DEFAULT_LABEL_NAME = "label"
  22. DEFAULT_MATCH_ID_PREFIX = "match_id"
  23. SVMLIGHT_COLUMN_PREFIX = "x"
  24. DEFAULT_SID_NAME = "sid"
  25. DELIMITER = ","
  26. class DataFormatPreProcess(object):
  27. @staticmethod
  28. def get_feature_offset(meta):
  29. """
  30. works for sparse/svmlight/tag value data
  31. """
  32. with_label = meta.get("with_label", False)
  33. with_match_id = meta.get("with_match_id", False)
  34. id_range = meta.get("id_range", 0)
  35. if with_match_id:
  36. if not id_range:
  37. id_range = 1
  38. offset = id_range
  39. if with_label:
  40. offset += 1
  41. return offset
  42. @staticmethod
  43. def agg_partition_tags(kvs, delimiter=",", offset=0, tag_with_value=True, tag_value_delimiter=":"):
  44. tag_set = set()
  45. for _, value in kvs:
  46. cols = value.split(delimiter, -1)[offset:]
  47. if tag_with_value:
  48. tag_set |= set([col.split(tag_value_delimiter, -1)[0] for col in cols])
  49. else:
  50. tag_set |= set(cols)
  51. return tag_set
  52. @staticmethod
  53. def get_tag_list(data, schema):
  54. if "meta" not in schema:
  55. raise ValueError("Meta not in schema")
  56. meta = schema["meta"]
  57. if meta["input_format"] != "tag":
  58. raise ValueError("Input DataFormat Should Be Tag Or Tag Value")
  59. delimiter = meta["delimiter"]
  60. tag_with_value = meta["tag_with_value"]
  61. if not isinstance(tag_with_value, bool):
  62. raise ValueError(f"tag with value should be bool, bug {tag_with_value} find")
  63. tag_value_delimiter = meta["tag_value_delimiter"]
  64. offset = DataFormatPreProcess.get_feature_offset(meta)
  65. agg_func = functools.partial(DataFormatPreProcess.agg_partition_tags,
  66. delimiter=delimiter,
  67. offset=offset,
  68. tag_with_value=tag_with_value,
  69. tag_value_delimiter=tag_value_delimiter)
  70. agg_tags = data.applyPartitions(agg_func).reduce(lambda tag_set1, tag_set2: tag_set1 | tag_set2)
  71. return sorted(agg_tags)
  72. @staticmethod
  73. def get_lib_svm_dim(data, schema):
  74. if "meta" not in schema:
  75. raise ValueError("Meta not in schema")
  76. meta = schema["meta"]
  77. if "input_format" == ["sparse", "svmlight"]:
  78. raise ValueError("Input DataFormat Should Be SVMLight")
  79. delimiter = meta.get("delimiter", " ")
  80. offset = DataFormatPreProcess.get_feature_offset(meta)
  81. max_dim = data.\
  82. mapValues(
  83. lambda value:
  84. max([int(fid_value.split(":", -1)[0]) for fid_value in value.split(delimiter, -1)[offset:]])).\
  85. reduce(lambda x, y: max(x, y))
  86. return max_dim
  87. @staticmethod
  88. def generate_header(data, schema):
  89. if not schema.get('meta'):
  90. raise ValueError("Meta not in schema")
  91. meta = schema["meta"]
  92. generated_header = dict(original_index_info=dict(), meta=meta)
  93. input_format = meta.get("input_format")
  94. delimiter = meta.get("delimiter", ",")
  95. if not input_format:
  96. raise ValueError("InputFormat should be configured.")
  97. if input_format == "dense":
  98. if "header" not in schema:
  99. raise ValueError("Dense input data must have schema")
  100. header = schema["header"].strip().split(delimiter, -1)
  101. header = list(map(lambda col: col.strip(), header))
  102. header_index_mapping = dict(zip(header, range(len(header))))
  103. with_label = meta.get("with_label", False)
  104. if not isinstance(with_label, bool):
  105. raise ValueError("with_label should be True or False")
  106. id_list = meta.get("id_list", [])
  107. if not isinstance(id_list, (type(None), list)):
  108. raise ValueError("id_list should be list type or None")
  109. with_match_id = meta.get("with_match_id", False)
  110. filter_ids = set()
  111. if with_match_id:
  112. if not id_list:
  113. match_id_name = header[0]
  114. match_id_index = [0]
  115. filter_ids.add(0)
  116. else:
  117. match_id_name = []
  118. match_id_index = []
  119. for _id in id_list:
  120. if _id in header_index_mapping:
  121. match_id_name.append(_id)
  122. match_id_index.append(header_index_mapping[_id])
  123. filter_ids.add(match_id_index[-1])
  124. else:
  125. raise ValueError(f"Can not find {_id} in id_list in data's header")
  126. generated_header["match_id_name"] = match_id_name
  127. generated_header["original_index_info"]["match_id_index"] = match_id_index
  128. if with_label:
  129. label_name = meta["label_name"]
  130. label_index = header_index_mapping[label_name]
  131. generated_header["label_name"] = label_name
  132. generated_header["original_index_info"]["label_index"] = label_index
  133. filter_ids.add(label_index)
  134. header_ids = list(filter(lambda ids: ids not in filter_ids, range(len(header))))
  135. generated_header["original_index_info"]["header_index"] = header_ids
  136. generated_header["header"] = np.array(header)[header_ids].tolist()
  137. else:
  138. if input_format == "tag":
  139. sorted_tag_list = DataFormatPreProcess.get_tag_list(data, schema)
  140. generated_header["header"] = sorted_tag_list
  141. elif input_format in ["sparse", "svmlight"]:
  142. max_dim = DataFormatPreProcess.get_lib_svm_dim(data, schema)
  143. generated_header["header"] = [SVMLIGHT_COLUMN_PREFIX + str(i) for i in range(max_dim + 1)]
  144. else:
  145. raise NotImplementedError(f"InputFormat {input_format} is not implemented")
  146. with_label = meta.get("with_label", False)
  147. with_match_id = meta.get("with_match_id", False)
  148. id_range = meta.get("id_range", 0)
  149. if id_range and not with_match_id:
  150. raise ValueError(f"id_range {id_range} != 0, with_match_id should be true")
  151. if with_match_id:
  152. if not id_range:
  153. id_range = 1
  154. if id_range == 1:
  155. generated_header["match_id_name"] = DEFAULT_MATCH_ID_PREFIX
  156. else:
  157. generated_header["match_id_name"] = [DEFAULT_MATCH_ID_PREFIX + str(i) for i in range(id_range)]
  158. if with_label:
  159. generated_header["label_name"] = DEFAULT_LABEL_NAME
  160. if id_range:
  161. generated_header["meta"]["id_range"] = id_range
  162. generated_header["is_display"] = False
  163. sid = schema.get("sid")
  164. if sid is None or sid == "":
  165. sid = DEFAULT_SID_NAME
  166. generated_header["sid"] = sid.strip()
  167. return generated_header
  168. @staticmethod
  169. def reconstruct_header(schema):
  170. original_index_info = schema.get("original_index_info")
  171. if not original_index_info:
  172. return schema["header"]
  173. header_index_mapping = dict()
  174. if "header_index" in original_index_info and original_index_info["header_index"]:
  175. for idx, col_name in zip(original_index_info["header_index"], schema["header"]):
  176. header_index_mapping[idx] = col_name
  177. if original_index_info.get("match_id_index") is not None:
  178. match_id_name = schema["match_id_name"]
  179. match_id_index = original_index_info["match_id_index"]
  180. if isinstance(match_id_name, str):
  181. header_index_mapping[match_id_index[0]] = match_id_name
  182. else:
  183. for idx, col_name in zip(match_id_index, match_id_name):
  184. header_index_mapping[idx] = col_name
  185. if original_index_info.get("label_index") is not None:
  186. header_index_mapping[original_index_info["label_index"]] = schema["label_name"]
  187. original_header = [None] * len(header_index_mapping)
  188. for idx, col_name in header_index_mapping.items():
  189. original_header[idx] = col_name
  190. return original_header
  191. @staticmethod
  192. def extend_header(schema, columns):
  193. schema = copy.deepcopy(schema)
  194. original_index_info = schema.get("original_index_info")
  195. columns = list(map(lambda column: column.strip(), columns))
  196. header = schema["header"]
  197. if isinstance(header, list):
  198. header.extend(columns)
  199. schema["header"] = header
  200. if original_index_info and "header_index" in original_index_info:
  201. header_index = original_index_info["header_index"]
  202. if header_index:
  203. pre_max_col_idx = max(header_index)
  204. else:
  205. pre_max_col_idx = -1
  206. if original_index_info.get("label_index") is not None:
  207. pre_max_col_idx = max(original_index_info["label_index"], pre_max_col_idx)
  208. if original_index_info.get("match_id_index") is not None:
  209. pre_max_col_idx = max(max(original_index_info["match_id_index"]), pre_max_col_idx)
  210. append_header_index = [i + pre_max_col_idx + 1 for i in range(len(columns))]
  211. schema["original_index_info"]["header_index"] = header_index + append_header_index
  212. else:
  213. if len(header) == 0:
  214. new_header = DELIMITER.join(columns)
  215. else:
  216. new_header = DELIMITER.join(header.split(DELIMITER, -1) + columns)
  217. schema["header"] = new_header
  218. if schema.get("sid") is not None:
  219. schema["sid"] = schema["sid"].strip()
  220. return schema
  221. @staticmethod
  222. def clean_header(schema):
  223. schema = copy.deepcopy(schema)
  224. header = schema["header"]
  225. if "label_name" in schema:
  226. del schema["label_name"]
  227. if "anonymous_header" in schema:
  228. del schema["anonymous_header"]
  229. if "anonymous_label" in schema:
  230. del schema["anonymous_label"]
  231. if isinstance(header, list):
  232. schema["header"] = []
  233. original_index_info = schema.get("original_index_info")
  234. if original_index_info:
  235. del schema["original_index_info"]
  236. if "match_id_name" in schema:
  237. del schema["match_id_name"]
  238. if "match_id_index" in schema:
  239. del schema["match_id_index"]
  240. else:
  241. schema["header"] = ""
  242. return schema
  243. @staticmethod
  244. def recover_schema(schema):
  245. if not schema.get('meta'):
  246. raise ValueError("Meta not in schema, can not recover meta")
  247. recovery_schema = copy.deepcopy(schema)
  248. meta = schema["meta"]
  249. input_format = meta.get("input_format", "dense")
  250. if input_format == "dense":
  251. """schema has not been processed yet"""
  252. if "original_index_info" not in schema:
  253. return recovery_schema
  254. header_list = DataFormatPreProcess.reconstruct_header(schema)
  255. del recovery_schema["original_index_info"]
  256. delimiter = schema.get("delimiter", ",")
  257. header = "" if not header_list else delimiter.join(header_list)
  258. recovery_schema["header"] = header
  259. if "label_name" in recovery_schema:
  260. del recovery_schema["label_name"]
  261. if meta.get("with_match_id"):
  262. del recovery_schema["match_id_name"]
  263. else:
  264. recovery_schema["header"] = ""
  265. if "label_name" in recovery_schema:
  266. del recovery_schema["label_name"]
  267. if meta.get("id_range"):
  268. recovery_schema["meta"]["id_range"] = 0
  269. if meta.get("with_label"):
  270. del recovery_schema["meta"]["label_name"]
  271. del recovery_schema["is_display"]
  272. if meta.get("with_match_id"):
  273. del recovery_schema["match_id_name"]
  274. if "anonymous_header" in schema:
  275. del recovery_schema["anonymous_header"]
  276. if "anonymous_label" in schema:
  277. del recovery_schema["anonymous_label"]
  278. return recovery_schema