data_io.py 47 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. #
  4. # Copyright 2019 The FATE Authors. All Rights Reserved.
  5. #
  6. # Licensed under the Apache License, Version 2.0 (the "License");
  7. # you may not use this file except in compliance with the License.
  8. # You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. #
  18. ################################################################################
  19. #
  20. #
  21. ################################################################################
  22. import copy
  23. import functools
  24. import numpy as np
  25. from federatedml.feature.instance import Instance
  26. from federatedml.feature.sparse_vector import SparseVector
  27. from federatedml.model_base import ModelBase
  28. from federatedml.protobuf.generated.data_io_meta_pb2 import DataIOMeta
  29. from federatedml.protobuf.generated.data_io_meta_pb2 import ImputerMeta
  30. from federatedml.protobuf.generated.data_io_meta_pb2 import OutlierMeta
  31. from federatedml.protobuf.generated.data_io_param_pb2 import DataIOParam
  32. from federatedml.protobuf.generated.data_io_param_pb2 import ImputerParam
  33. from federatedml.protobuf.generated.data_io_param_pb2 import OutlierParam
  34. from federatedml.statistic import data_overview
  35. from federatedml.util import abnormal_detection
  36. from federatedml.util import consts
  37. from federatedml.util import LOGGER
  38. from federatedml.util.io_check import assert_io_num_rows_equal
  39. """
  40. # =============================================================================
  41. # DenseFeatureReader
  42. # =============================================================================
  43. class DenseFeatureReader(object):
  44. def __init__(self, data_io_param):
  45. self.delimitor = data_io_param.delimitor
  46. self.data_type = data_io_param.data_type
  47. self.exclusive_data_type = data_io_param.exclusive_data_type
  48. self.missing_fill = data_io_param.missing_fill
  49. self.default_value = data_io_param.default_value
  50. self.missing_fill_method = data_io_param.missing_fill_method
  51. self.missing_impute = data_io_param.missing_impute
  52. self.outlier_replace = data_io_param.outlier_replace
  53. self.outlier_replace_method = data_io_param.outlier_replace_method
  54. self.outlier_impute = data_io_param.outlier_impute
  55. self.outlier_replace_value = data_io_param.outlier_replace_value
  56. self.with_label = data_io_param.with_label
  57. self.label_name = data_io_param.label_name if self.with_label else None
  58. self.label_type = data_io_param.label_type if self.with_label else None
  59. self.output_format = data_io_param.output_format
  60. self.missing_impute_rate = None
  61. self.outlier_replace_rate = None
  62. self.label_idx = None
  63. self.header = None
  64. self.sid_name = None
  65. self.exclusive_data_type_fid_map = {}
  66. def generate_header(self, input_data, mode="fit"):
  67. header = input_data.schema["header"]
  68. sid_name = input_data.schema["sid"]
  69. LOGGER.debug("header is {}".format(header))
  70. LOGGER.debug("sid_name is {}".format(sid_name))
  71. if not header and not sid_name:
  72. raise ValueError("dense input-format should have header schema")
  73. header_gen = None
  74. if self.with_label:
  75. if mode == "fit":
  76. if not header:
  77. raise ValueError("dense input-format for fit stage should not be None if with_label is true")
  78. self.label_idx = header.split(self.delimitor, -1).index(self.label_name)
  79. header_gen = header.split(self.delimitor, -1)[: self.label_idx] + \
  80. header.split(self.delimitor, -1)[self.label_idx + 1:] or None
  81. elif header:
  82. header_list = header.split(self.delimitor, -1)
  83. if self.label_name in header_list:
  84. self.label_idx = header_list.index(self.label_name)
  85. header_gen = header.split(self.delimitor, -1)[: self.label_idx] + \
  86. header.split(self.delimitor, -1)[self.label_idx + 1:] or None
  87. else:
  88. self.label_idx = None
  89. header_gen = header.split(self.delimitor, -1)
  90. elif header:
  91. header_gen = header.split(self.delimitor, -1)
  92. self.header = header_gen
  93. self.sid_name = sid_name
  94. if header_gen:
  95. for i in range(len(header_gen)):
  96. col_name = header_gen[i]
  97. if self.exclusive_data_type is not None and col_name in self.exclusive_data_type:
  98. self.exclusive_data_type_fid_map[i] = self.exclusive_data_type[col_name]
  99. def get_schema(self):
  100. schema = make_schema(self.header, self.sid_name, self.label_name)
  101. return schema
  102. def read_data(self, input_data, mode="fit"):
  103. LOGGER.info("start to read dense data and change data to instance")
  104. abnormal_detection.empty_table_detection(input_data)
  105. input_data_labels = None
  106. fit_header = None
  107. if mode == "transform":
  108. fit_header = self.header
  109. self.generate_header(input_data, mode=mode)
  110. if self.label_idx is not None:
  111. data_shape = data_overview.get_data_shape(input_data)
  112. if not data_shape or self.label_idx >= data_shape:
  113. raise ValueError("input data's value is empty, it does not contain a label")
  114. input_data_features = input_data.mapValues(
  115. lambda value: [] if data_shape == 1 else value.split(self.delimitor, -1)[:self.label_idx] + value.split(
  116. self.delimitor, -1)[self.label_idx + 1:])
  117. input_data_labels = input_data.mapValues(lambda value: value.split(self.delimitor, -1)[self.label_idx])
  118. else:
  119. input_data_features = input_data.mapValues(
  120. lambda value: [] if not self.header else value.split(self.delimitor, -1))
  121. if mode == "fit":
  122. data_instance = self.fit(input_data, input_data_features, input_data_labels)
  123. else:
  124. data_instance = self.transform(input_data_features, input_data_labels)
  125. # data_instance = ModelBase.align_data_header(data_instance, fit_header)
  126. data_instance = data_overview.header_alignment(data_instance, fit_header)
  127. return data_instance
  128. def fit(self, input_data, input_data_features, input_data_labels):
  129. raise ValueError("In Fate-v1.9 or later version, DataIO is deprecated, use DataTransform instead.")
  130. schema = self.get_schema()
  131. set_schema(input_data_features, schema)
  132. input_data_features = self.fill_missing_value(input_data_features, "fit")
  133. input_data_features = self.replace_outlier_value(input_data_features, "fit")
  134. data_instance = self.gen_data_instance(input_data_features, input_data_labels)
  135. set_schema(data_instance, schema)
  136. return data_instance
  137. @assert_io_num_rows_equal
  138. def transform(self, input_data_features, input_data_labels):
  139. schema = make_schema(self.header, self.sid_name, self.label_name)
  140. set_schema(input_data_features, schema)
  141. input_data_features = self.fill_missing_value(input_data_features, "transform")
  142. input_data_features = self.replace_outlier_value(input_data_features, "transform")
  143. data_instance = self.gen_data_instance(input_data_features, input_data_labels)
  144. set_schema(data_instance, schema)
  145. return data_instance
  146. def fill_missing_value(self, input_data_features, mode="fit"):
  147. if self.missing_fill:
  148. from federatedml.feature.imputer import Imputer
  149. imputer_processor = Imputer(self.missing_impute)
  150. if mode == "fit":
  151. input_data_features, self.default_value = imputer_processor.fit(input_data_features,
  152. replace_method=self.missing_fill_method,
  153. replace_value=self.default_value)
  154. if self.missing_impute is None:
  155. self.missing_impute = imputer_processor.get_missing_value_list()
  156. else:
  157. input_data_features = imputer_processor.transform(input_data_features,
  158. transform_value=self.default_value)
  159. if self.missing_impute is None:
  160. self.missing_impute = imputer_processor.get_missing_value_list()
  161. self.missing_impute_rate = imputer_processor.get_impute_rate(mode)
  162. return input_data_features
  163. def replace_outlier_value(self, input_data_features, mode="fit"):
  164. if self.outlier_replace:
  165. from federatedml.feature.imputer import Imputer
  166. imputer_processor = Imputer(self.outlier_impute)
  167. if mode == "fit":
  168. input_data_features, self.outlier_replace_value = \
  169. imputer_processor.fit(input_data_features,
  170. replace_method=self.outlier_replace_method,
  171. replace_value=self.outlier_replace_value)
  172. if self.outlier_impute is None:
  173. self.outlier_impute = imputer_processor.get_missing_value_list()
  174. else:
  175. input_data_features = imputer_processor.transform(input_data_features,
  176. transform_value=self.outlier_replace_value)
  177. self.outlier_replace_rate = imputer_processor.get_impute_rate(mode)
  178. return input_data_features
  179. def gen_data_instance(self, input_data_features, input_data_labels):
  180. if self.label_idx is not None:
  181. data_instance = input_data_features.join(input_data_labels,
  182. lambda features, label:
  183. self.to_instance(features, label))
  184. else:
  185. data_instance = input_data_features.mapValues(lambda features: self.to_instance(features))
  186. return data_instance
  187. def to_instance(self, features, label=None):
  188. if self.header is None and len(features) != 0:
  189. raise ValueError("features shape {} not equal to header shape 0".format(len(features)))
  190. elif self.header is not None and len(self.header) != len(features):
  191. raise ValueError("features shape {} not equal to header shape {}".format(len(features), len(self.header)))
  192. if self.label_idx is not None:
  193. if self.label_type == 'int':
  194. label = int(label)
  195. elif self.label_type in ["float", "float64"]:
  196. label = float(label)
  197. format_features = DenseFeatureReader.gen_output_format(
  198. features,
  199. self.data_type,
  200. self.exclusive_data_type_fid_map,
  201. self.output_format,
  202. missing_impute=self.missing_impute)
  203. else:
  204. format_features = DenseFeatureReader.gen_output_format(
  205. features,
  206. self.data_type,
  207. self.exclusive_data_type_fid_map,
  208. self.output_format,
  209. missing_impute=self.missing_impute)
  210. return Instance(inst_id=None,
  211. features=format_features,
  212. label=label)
  213. @staticmethod
  214. def gen_output_format(features, data_type='float', exclusive_data_type_fid_map=None,
  215. output_format='dense', missing_impute=None):
  216. if output_format not in ["dense", "sparse"]:
  217. raise ValueError("output format {} is not define".format(output_format))
  218. if output_format == "dense":
  219. format_features = copy.deepcopy(features)
  220. if data_type in ["int", "int64", "long", "float", "float64", "double"]:
  221. for i in range(len(features)):
  222. if (missing_impute is not None and features[i] in missing_impute) or \
  223. (missing_impute is None and features[i] in ['', 'NULL', 'null', "NA"]):
  224. format_features[i] = np.nan
  225. if exclusive_data_type_fid_map:
  226. for fid in range(len(features)):
  227. if fid in exclusive_data_type_fid_map:
  228. dtype = exclusive_data_type_fid_map[fid]
  229. else:
  230. dtype = data_type
  231. format_features[fid] = getattr(np, dtype)(features[fid])
  232. return np.asarray(format_features, dtype=object)
  233. else:
  234. return np.asarray(format_features, dtype=data_type)
  235. indices = []
  236. data = []
  237. column_shape = len(features)
  238. non_zero = 0
  239. for i in range(column_shape):
  240. if (missing_impute is not None and features[i] in missing_impute) or \
  241. (missing_impute is None and features[i] in ['', 'NULL', 'null', "NA"]):
  242. indices.append(i)
  243. data.append(np.nan)
  244. non_zero += 1
  245. elif data_type in ['float', 'float64', "double"]:
  246. if np.fabs(float(features[i])) < consts.FLOAT_ZERO:
  247. continue
  248. indices.append(i)
  249. data.append(float(features[i]))
  250. non_zero += 1
  251. elif data_type in ['int', "int64", "long"]:
  252. if int(features[i]) == 0:
  253. continue
  254. indices.append(i)
  255. data.append(int(features[i]))
  256. else:
  257. indices.append(i)
  258. data.append(features[i])
  259. return SparseVector(indices, data, column_shape)
  260. def get_summary(self):
  261. if not self.missing_fill and not self.outlier_replace:
  262. return {}
  263. summary_buf = {}
  264. if self.missing_fill:
  265. missing_summary = dict()
  266. missing_summary["missing_value"] = list(self.missing_impute)
  267. missing_summary["missing_impute_value"] = dict(zip(self.header, self.default_value))
  268. missing_summary["missing_impute_rate"] = dict(zip(self.header, self.missing_impute_rate))
  269. summary_buf["missing_fill_info"] = missing_summary
  270. if self.outlier_replace:
  271. outlier_replace_summary = dict()
  272. outlier_replace_summary["outlier_value"] = list(self.outlier_impute)
  273. outlier_replace_summary["outlier_replace_value"] = dict(zip(self.header, self.outlier_replace_value))
  274. outlier_replace_summary["outlier_replace_rate"] = dict(zip(self.header, self.outlier_replace_rate))
  275. summary_buf["outlier_replace_rate"] = outlier_replace_summary
  276. return summary_buf
  277. def save_model(self):
  278. dataio_meta, dataio_param = save_data_io_model(input_format="dense",
  279. delimitor=self.delimitor,
  280. data_type=self.data_type,
  281. exclusive_data_type=self.exclusive_data_type,
  282. with_label=self.with_label,
  283. label_type=self.label_type,
  284. output_format=self.output_format,
  285. header=self.header,
  286. sid_name=self.sid_name,
  287. label_name=self.label_name,
  288. model_name="DenseFeatureReader")
  289. missing_imputer_meta, missing_imputer_param = save_missing_imputer_model(self.missing_fill,
  290. self.missing_fill_method,
  291. self.missing_impute,
  292. self.default_value,
  293. self.missing_impute_rate,
  294. self.header,
  295. "Imputer")
  296. dataio_meta.imputer_meta.CopyFrom(missing_imputer_meta)
  297. dataio_param.imputer_param.CopyFrom(missing_imputer_param)
  298. outlier_meta, outlier_param = save_outlier_model(self.outlier_replace,
  299. self.outlier_replace_method,
  300. self.outlier_impute,
  301. self.outlier_replace_value,
  302. self.outlier_replace_rate,
  303. self.header,
  304. "Outlier")
  305. dataio_meta.outlier_meta.CopyFrom(outlier_meta)
  306. dataio_param.outlier_param.CopyFrom(outlier_param)
  307. return {"DataIOMeta": dataio_meta,
  308. "DataIOParam": dataio_param
  309. }
  310. def load_model(self, model_meta, model_param):
  311. self.delimitor, self.data_type, self.exclusive_data_type, _1, _2, self.with_label, \
  312. self.label_type, self.output_format, self.header, self.sid_name, self.label_name = \
  313. load_data_io_model("DenseFeatureReader", model_meta, model_param)
  314. self.missing_fill, self.missing_fill_method, \
  315. self.missing_impute, self.default_value = load_missing_imputer_model(self.header,
  316. "Imputer",
  317. model_meta.imputer_meta,
  318. model_param.imputer_param)
  319. self.outlier_replace, self.outlier_replace_method, \
  320. self.outlier_impute, self.outlier_replace_value = load_outlier_model(self.header,
  321. "Outlier",
  322. model_meta.outlier_meta,
  323. model_param.outlier_param)
  324. # =============================================================================
  325. # SparseFeatureReader: mainly for libsvm input format
  326. # =============================================================================
  327. class SparseFeatureReader(object):
  328. def __init__(self, data_io_param):
  329. self.delimitor = data_io_param.delimitor
  330. self.data_type = data_io_param.data_type
  331. self.label_type = data_io_param.label_type
  332. self.output_format = data_io_param.output_format
  333. self.header = None
  334. self.sid_name = "sid"
  335. self.label_name = data_io_param.label_name
  336. def get_max_feature_index(self, line, delimitor=' '):
  337. if line.strip() == '':
  338. raise ValueError("find an empty line, please check!!!")
  339. cols = line.split(delimitor, -1)
  340. if len(cols) <= 1:
  341. return -1
  342. return max([int(fid_value.split(":", -1)[0]) for fid_value in cols[1:]])
  343. def generate_header(self, max_feature):
  344. self.header = [str(i) for i in range(max_feature + 1)]
  345. def read_data(self, input_data, mode="fit"):
  346. LOGGER.info("start to read sparse data and change data to instance")
  347. abnormal_detection.empty_table_detection(input_data)
  348. if not data_overview.get_data_shape(input_data):
  349. raise ValueError("input data's value is empty, it does not contain a label")
  350. if mode == "fit":
  351. data_instance = self.fit(input_data)
  352. else:
  353. data_instance = self.transform(input_data)
  354. schema = make_schema(self.header, self.sid_name, self.label_name)
  355. set_schema(data_instance, schema)
  356. return data_instance
  357. def fit(self, input_data):
  358. get_max_fid = functools.partial(self.get_max_feature_index, delimitor=self.delimitor)
  359. max_feature = input_data.mapValues(get_max_fid).reduce(lambda max_fid1, max_fid2: max(max_fid1, max_fid2))
  360. if max_feature == -1:
  361. raise ValueError("no feature value in input data, please check!")
  362. self.generate_header(max_feature)
  363. data_instance = self.gen_data_instance(input_data, max_feature)
  364. return data_instance
  365. def transform(self, input_data):
  366. max_feature = len(self.header)
  367. data_instance = self.gen_data_instance(input_data, max_feature)
  368. return data_instance
  369. def gen_data_instance(self, input_data, max_feature):
  370. params = [self.delimitor, self.data_type,
  371. self.label_type,
  372. self.output_format, max_feature]
  373. to_instance_with_param = functools.partial(self.to_instance, params)
  374. data_instance = input_data.mapValues(to_instance_with_param)
  375. return data_instance
  376. @staticmethod
  377. def to_instance(param_list, value):
  378. delimitor = param_list[0]
  379. data_type = param_list[1]
  380. label_type = param_list[2]
  381. output_format = param_list[3]
  382. max_fid = param_list[4]
  383. if output_format not in ["dense", "sparse"]:
  384. raise ValueError("output format {} is not define".format(output_format))
  385. cols = value.split(delimitor, -1)
  386. label = cols[0]
  387. if label_type == 'int':
  388. label = int(label)
  389. elif label_type in ["float", "float64"]:
  390. label = float(label)
  391. fid_value = []
  392. for i in range(1, len(cols)):
  393. fid, val = cols[i].split(":", -1)
  394. fid = int(fid)
  395. if data_type in ["float", "float64"]:
  396. val = float(val)
  397. elif data_type in ["int", "int64"]:
  398. val = int(val)
  399. fid_value.append((fid, val))
  400. if output_format == "dense":
  401. features = [0 for i in range(max_fid + 1)]
  402. for fid, val in fid_value:
  403. features[fid] = val
  404. features = np.asarray(features, dtype=data_type)
  405. else:
  406. indices = []
  407. data = []
  408. for fid, val in fid_value:
  409. indices.append(fid)
  410. data.append(val)
  411. features = SparseVector(indices, data, max_fid + 1)
  412. return Instance(inst_id=None,
  413. features=features,
  414. label=label)
  415. def save_model(self):
  416. dataio_meta, dataio_param = save_data_io_model(input_format="sparse",
  417. delimitor=self.delimitor,
  418. data_type=self.data_type,
  419. label_type=self.label_type,
  420. output_format=self.output_format,
  421. header=self.header,
  422. sid_name=self.sid_name,
  423. label_name=self.label_name,
  424. model_name="SparseFeatureReader")
  425. missing_imputer_meta, missing_imputer_param = save_missing_imputer_model(missing_fill=False,
  426. model_name="Imputer")
  427. dataio_meta.imputer_meta.CopyFrom(missing_imputer_meta)
  428. dataio_param.imputer_param.CopyFrom(missing_imputer_param)
  429. outlier_meta, outlier_param = save_outlier_model(outlier_replace=False,
  430. model_name="Outlier")
  431. dataio_meta.outlier_meta.CopyFrom(outlier_meta)
  432. dataio_param.outlier_param.CopyFrom(outlier_param)
  433. return {"DataIOMeta": dataio_meta,
  434. "DataIOParam": dataio_param
  435. }
  436. def load_model(self, model_meta, model_param):
  437. self.delimitor, self.data_type, _0, _1, _2, _3, \
  438. self.label_type, self.output_format, self.header, self.sid_name, self.label_name = load_data_io_model(
  439. "SparseFeatureReader",
  440. model_meta,
  441. model_param)
  442. # =============================================================================
  443. # SparseTagReader: mainly for tag data
  444. # =============================================================================
  445. class SparseTagReader(object):
  446. def __init__(self, data_io_param):
  447. self.delimitor = data_io_param.delimitor
  448. self.data_type = data_io_param.data_type
  449. self.tag_with_value = data_io_param.tag_with_value
  450. self.tag_value_delimitor = data_io_param.tag_value_delimitor
  451. self.with_label = data_io_param.with_label
  452. self.label_type = data_io_param.label_type if self.with_label else None
  453. self.output_format = data_io_param.output_format
  454. self.header = None
  455. self.sid_name = "sid"
  456. self.label_name = data_io_param.label_name if self.with_label else None
  457. self.missing_fill = data_io_param.missing_fill
  458. self.missing_fill_method = data_io_param.missing_fill_method
  459. self.default_value = data_io_param.default_value
  460. self.missing_impute_rate = None
  461. self.missing_impute = None
  462. @staticmethod
  463. def agg_tag(kvs, delimitor=' ', with_label=True, tag_with_value=False, tag_value_delimitor=":"):
  464. tags_set = set()
  465. for key, value in kvs:
  466. if with_label:
  467. cols = value.split(delimitor, -1)[1:]
  468. else:
  469. cols = value.split(delimitor, -1)[0:]
  470. if tag_with_value is False:
  471. tags = cols
  472. else:
  473. tags = [fea_value.split(tag_value_delimitor, -1)[0] for fea_value in cols]
  474. tags_set |= set(tags)
  475. return tags_set
  476. def generate_header(self, tags):
  477. self.header = tags
  478. def read_data(self, input_data, mode="fit"):
  479. LOGGER.info("start to read sparse data and change data to instance")
  480. abnormal_detection.empty_table_detection(input_data)
  481. if mode == "fit":
  482. data_instance = self.fit(input_data)
  483. if self.with_label:
  484. self.label_name = "label"
  485. else:
  486. data_instance = self.transform(input_data)
  487. schema = make_schema(self.header, self.sid_name, self.label_name)
  488. set_schema(data_instance, schema)
  489. return data_instance
  490. @staticmethod
  491. def change_tag_to_str(value, tags_dict=None, delimitor=",", with_label=False, tag_value_delimitor=":"):
  492. vals = value.split(delimitor, -1)
  493. ret = [''] * len(tags_dict)
  494. if with_label:
  495. vals = vals[1:]
  496. for i in range(len(vals)):
  497. tag, value = vals[i].split(tag_value_delimitor, -1)
  498. idx = tags_dict.get(tag, None)
  499. if idx is not None:
  500. ret[idx] = value
  501. return ret
  502. @staticmethod
  503. def change_str_to_tag(value, tags_dict=None, delimitor=",", tag_value_delimitor=":"):
  504. ret = [None] * len(tags_dict)
  505. tags = sorted(list(tags_dict.keys()))
  506. for i in range(len(value)):
  507. tag, val = tags[i], value[i]
  508. ret[i] = tag_value_delimitor.join([tag, val])
  509. return delimitor.join(ret)
  510. def fill_missing_value(self, input_data, tags_dict, mode="fit"):
  511. str_trans_method = functools.partial(self.change_tag_to_str,
  512. tags_dict=tags_dict,
  513. delimitor=self.delimitor,
  514. with_label=self.with_label,
  515. tag_value_delimitor=self.tag_value_delimitor)
  516. input_data = input_data.mapValues(str_trans_method)
  517. schema = make_schema(self.header, self.sid_name, self.label_name)
  518. set_schema(input_data, schema)
  519. from federatedml.feature.imputer import Imputer
  520. imputer_processor = Imputer()
  521. if mode == "fit":
  522. data, self.default_value = imputer_processor.fit(input_data,
  523. replace_method=self.missing_fill_method,
  524. replace_value=self.default_value)
  525. LOGGER.debug("self.default_value is {}".format(self.default_value))
  526. else:
  527. data = imputer_processor.transform(input_data,
  528. transform_value=self.default_value)
  529. if self.missing_impute is None:
  530. self.missing_impute = imputer_processor.get_missing_value_list()
  531. LOGGER.debug("self.missing_impute is {}".format(self.missing_impute))
  532. self.missing_impute_rate = imputer_processor.get_impute_rate(mode)
  533. str_trans_tag_method = functools.partial(self.change_str_to_tag,
  534. tags_dict=tags_dict,
  535. delimitor=self.delimitor,
  536. tag_value_delimitor=self.tag_value_delimitor)
  537. data = data.mapValues(str_trans_tag_method)
  538. return data
  539. def fit(self, input_data):
  540. tag_aggregator = functools.partial(SparseTagReader.agg_tag,
  541. delimitor=self.delimitor,
  542. with_label=self.with_label,
  543. tag_with_value=self.tag_with_value,
  544. tag_value_delimitor=self.tag_value_delimitor)
  545. tags_set_list = list(input_data.applyPartitions(tag_aggregator).collect())
  546. tags_set = set()
  547. for _, _tags_set in tags_set_list:
  548. tags_set |= _tags_set
  549. tags = list(tags_set)
  550. tags = sorted(tags)
  551. tags_dict = dict(zip(tags, range(len(tags))))
  552. self.generate_header(tags)
  553. if self.tag_with_value and self.missing_fill:
  554. input_data = self.fill_missing_value(input_data, tags_dict, mode="fit")
  555. data_instance = self.gen_data_instance(input_data, tags_dict)
  556. return data_instance
  557. def transform(self, input_data):
  558. tags_dict = dict(zip(self.header, range(len(self.header))))
  559. if self.tag_with_value and self.missing_fill:
  560. input_data = self.fill_missing_value(input_data, tags_dict, mode="transform")
  561. data_instance = self.gen_data_instance(input_data, tags_dict)
  562. return data_instance
  563. def gen_data_instance(self, input_data, tags_dict):
  564. params = [self.delimitor,
  565. self.data_type,
  566. self.tag_with_value,
  567. self.tag_value_delimitor,
  568. self.with_label,
  569. self.label_type,
  570. self.output_format,
  571. tags_dict]
  572. to_instance_with_param = functools.partial(self.to_instance, params)
  573. data_instance = input_data.mapValues(to_instance_with_param)
  574. return data_instance
  575. def get_summary(self):
  576. if not self.missing_fill:
  577. return {}
  578. missing_summary = dict()
  579. missing_summary["missing_value"] = list(self.missing_impute)
  580. missing_summary["missing_impute_value"] = dict(zip(self.header, self.default_value))
  581. missing_summary["missing_impute_rate"] = dict(zip(self.header, self.missing_impute_rate))
  582. summary_buf = {"missing_fill_info": missing_summary}
  583. return summary_buf
  584. @staticmethod
  585. def to_instance(param_list, value):
  586. delimitor = param_list[0]
  587. data_type = param_list[1]
  588. tag_with_value = param_list[2]
  589. tag_value_delimitor = param_list[3]
  590. with_label = param_list[4]
  591. label_type = param_list[5]
  592. output_format = param_list[6]
  593. tags_dict = param_list[7]
  594. if output_format not in ["dense", "sparse"]:
  595. raise ValueError("output format {} is not define".format(output_format))
  596. cols = value.split(delimitor, -1)
  597. start_pos = 0
  598. label = None
  599. if with_label:
  600. start_pos = 1
  601. label = cols[0]
  602. if label_type == 'int':
  603. label = int(label)
  604. elif label_type in ["float", "float64"]:
  605. label = float(label)
  606. if output_format == "dense":
  607. features = [0 for i in range(len(tags_dict))]
  608. for fea in cols[start_pos:]:
  609. if tag_with_value:
  610. _tag, _val = fea.split(tag_value_delimitor, -1)
  611. if _tag in tags_dict:
  612. features[tags_dict.get(_tag)] = _val
  613. else:
  614. if fea in tags_dict:
  615. features[tags_dict.get(fea)] = 1
  616. features = np.asarray(features, dtype=data_type)
  617. else:
  618. indices = []
  619. data = []
  620. for fea in cols[start_pos:]:
  621. if tag_with_value:
  622. _tag, _val = fea.split(tag_value_delimitor, -1)
  623. else:
  624. _tag = fea
  625. _val = 1
  626. if _tag not in tags_dict:
  627. continue
  628. indices.append(tags_dict.get(_tag))
  629. if data_type in ["float", "float64"]:
  630. _val = float(_val)
  631. elif data_type in ["int", "int64", "long"]:
  632. _val = int(_val)
  633. elif data_type == "str":
  634. _val = str(_val)
  635. data.append(_val)
  636. features = SparseVector(indices, data, len(tags_dict))
  637. return Instance(inst_id=None,
  638. features=features,
  639. label=label)
  640. def save_model(self):
  641. dataio_meta, dataio_param = save_data_io_model(input_format="tag",
  642. delimitor=self.delimitor,
  643. data_type=self.data_type,
  644. tag_with_value=self.tag_with_value,
  645. tag_value_delimitor=self.tag_value_delimitor,
  646. with_label=self.with_label,
  647. label_type=self.label_type,
  648. output_format=self.output_format,
  649. header=self.header,
  650. sid_name=self.sid_name,
  651. label_name=self.label_name,
  652. model_name="Reader")
  653. missing_imputer_meta, missing_imputer_param = save_missing_imputer_model(self.missing_fill,
  654. self.missing_fill_method,
  655. self.missing_impute,
  656. self.default_value,
  657. self.missing_impute_rate,
  658. self.header,
  659. "Imputer")
  660. dataio_meta.imputer_meta.CopyFrom(missing_imputer_meta)
  661. dataio_param.imputer_param.CopyFrom(missing_imputer_param)
  662. outlier_meta, outlier_param = save_outlier_model(outlier_replace=False,
  663. model_name="Outlier")
  664. dataio_meta.outlier_meta.CopyFrom(outlier_meta)
  665. dataio_param.outlier_param.CopyFrom(outlier_param)
  666. return {"DataIOMeta": dataio_meta,
  667. "DataIOParam": dataio_param
  668. }
  669. def load_model(self, model_meta, model_param):
  670. self.delimitor, self.data_type, _0, self.tag_with_value, self.tag_value_delimitor, self.with_label, \
  671. self.label_type, self.output_format, self.header, self.sid_name, self.label_name = load_data_io_model(
  672. "SparseTagReader",
  673. model_meta,
  674. model_param)
  675. self.missing_fill, self.missing_fill_method, \
  676. self.missing_impute, self.default_value = load_missing_imputer_model(self.header,
  677. "Imputer",
  678. model_meta.imputer_meta,
  679. model_param.imputer_param)
  680. class DataIO(ModelBase):
  681. def __init__(self):
  682. super(DataIO, self).__init__()
  683. self.reader = None
  684. from federatedml.param.dataio_param import DataIOParam
  685. self.model_param = DataIOParam()
  686. def _init_model(self, model_param):
  687. LOGGER.warning('DataIO is deprecated, and will be removed in 1.7, use DataTransform module instead')
  688. if model_param.input_format == "dense":
  689. self.reader = DenseFeatureReader(self.model_param)
  690. elif model_param.input_format == "sparse":
  691. self.reader = SparseFeatureReader(self.model_param)
  692. elif model_param.input_format == "tag":
  693. self.reader = SparseTagReader(self.model_param)
  694. self.model_param = model_param
  695. def load_model(self, model_dict):
  696. input_model_param = None
  697. input_model_meta = None
  698. for _, value in model_dict["model"].items():
  699. for model in value:
  700. if model.endswith("Meta"):
  701. input_model_meta = value[model]
  702. if model.endswith("Param"):
  703. input_model_param = value[model]
  704. if input_model_meta.input_format == "dense":
  705. self.reader = DenseFeatureReader(self.model_param)
  706. elif input_model_meta.input_format == "sparse":
  707. self.reader = SparseFeatureReader(self.model_param)
  708. elif input_model_meta.input_format == "tag":
  709. self.reader = SparseTagReader(self.model_param)
  710. self.reader.load_model(input_model_meta, input_model_param)
  711. def fit(self, data_inst):
  712. data_inst = self.reader.read_data(data_inst, "fit")
  713. if isinstance(self.reader, (DenseFeatureReader, SparseTagReader)):
  714. summary_buf = self.reader.get_summary()
  715. if summary_buf:
  716. self.set_summary(summary_buf)
  717. return data_inst
  718. def transform(self, data_inst):
  719. return self.reader.read_data(data_inst, "transform")
  720. def export_model(self):
  721. model_dict = self.reader.save_model()
  722. model_dict["DataIOMeta"].need_run = self.need_run
  723. return model_dict
  724. def make_schema(header=None, sid_name=None, label_name=None):
  725. schema = {}
  726. if header:
  727. schema["header"] = header
  728. if sid_name:
  729. schema["sid_name"] = sid_name
  730. if label_name:
  731. schema["label_name"] = label_name
  732. ModelBase.check_schema_content(schema)
  733. return schema
  734. def set_schema(data_instance, schema):
  735. data_instance.schema = schema
  736. def save_data_io_model(input_format="dense",
  737. delimitor=",",
  738. data_type="str",
  739. exclusive_data_type=None,
  740. tag_with_value=False,
  741. tag_value_delimitor=":",
  742. with_label=False,
  743. label_name='',
  744. label_type="int",
  745. output_format="dense",
  746. header=None,
  747. sid_name=None,
  748. model_name="DataIO"):
  749. model_meta = DataIOMeta()
  750. model_param = DataIOParam()
  751. model_meta.input_format = input_format
  752. model_meta.delimitor = delimitor
  753. model_meta.data_type = data_type
  754. model_meta.tag_with_value = tag_with_value
  755. model_meta.tag_value_delimitor = tag_value_delimitor
  756. model_meta.with_label = with_label
  757. if with_label:
  758. model_meta.label_name = label_name
  759. model_meta.label_type = label_type
  760. model_meta.output_format = output_format
  761. if header is not None:
  762. model_param.header.extend(header)
  763. if sid_name:
  764. model_param.sid_name = sid_name
  765. if label_name:
  766. model_param.label_name = label_name
  767. if exclusive_data_type is not None:
  768. model_meta.exclusive_data_type.update(exclusive_data_type)
  769. return model_meta, model_param
  770. def load_data_io_model(model_name="DataIO",
  771. model_meta=None,
  772. model_param=None):
  773. delimitor = model_meta.delimitor
  774. data_type = model_meta.data_type
  775. tag_with_value = model_meta.tag_with_value
  776. tag_value_delimitor = model_meta.tag_value_delimitor
  777. with_label = model_meta.with_label
  778. label_name = model_meta.label_name if with_label else None
  779. label_type = model_meta.label_type if with_label else None
  780. output_format = model_meta.output_format
  781. header = list(model_param.header) or None
  782. sid_name = None
  783. if model_param.sid_name:
  784. sid_name = model_param.sid_name
  785. exclusive_data_type = None
  786. if model_meta.exclusive_data_type:
  787. exclusive_data_type = {}
  788. for col_name in model_meta.exclusive_data_type:
  789. exclusive_data_type[col_name] = model_meta.exclusive_data_type.get(col_name)
  790. return delimitor, data_type, exclusive_data_type, tag_with_value, tag_value_delimitor, with_label, \
  791. label_type, output_format, header, sid_name, label_name
  792. def save_missing_imputer_model(missing_fill=False,
  793. missing_replace_method=None,
  794. missing_impute=None,
  795. missing_fill_value=None,
  796. missing_replace_rate=None,
  797. header=None,
  798. model_name="Imputer"):
  799. model_meta = ImputerMeta()
  800. model_param = ImputerParam()
  801. model_meta.is_imputer = missing_fill
  802. if missing_fill:
  803. if missing_replace_method:
  804. model_meta.strategy = str(missing_replace_method)
  805. if missing_impute is not None:
  806. model_meta.missing_value.extend(map(str, missing_impute))
  807. if missing_fill_value is not None:
  808. feature_value_dict = dict(zip(header, map(str, missing_fill_value)))
  809. model_param.missing_replace_value.update(feature_value_dict)
  810. if missing_replace_rate is not None:
  811. missing_replace_rate_dict = dict(zip(header, missing_replace_rate))
  812. model_param.missing_value_ratio.update(missing_replace_rate_dict)
  813. return model_meta, model_param
  814. def load_missing_imputer_model(header=None,
  815. model_name="Imputer",
  816. model_meta=None,
  817. model_param=None):
  818. missing_fill = model_meta.is_imputer
  819. missing_replace_method = model_meta.strategy
  820. missing_value = model_meta.missing_value
  821. missing_fill_value = model_param.missing_replace_value
  822. if missing_fill:
  823. if not missing_replace_method:
  824. missing_replace_method = None
  825. if not missing_value:
  826. missing_value = None
  827. else:
  828. missing_value = list(missing_value)
  829. if missing_fill_value:
  830. missing_fill_value = [missing_fill_value.get(head) for head in header]
  831. else:
  832. missing_fill_value = None
  833. else:
  834. missing_replace_method = None
  835. missing_value = None
  836. missing_fill_value = None
  837. return missing_fill, missing_replace_method, missing_value, missing_fill_value
  838. def save_outlier_model(outlier_replace=False,
  839. outlier_replace_method=None,
  840. outlier_impute=None,
  841. outlier_replace_value=None,
  842. outlier_replace_rate=None,
  843. header=None,
  844. model_name="Outlier"):
  845. model_meta = OutlierMeta()
  846. model_param = OutlierParam()
  847. model_meta.is_outlier = outlier_replace
  848. if outlier_replace:
  849. if outlier_replace_method:
  850. model_meta.strategy = str(outlier_replace_method)
  851. if outlier_impute:
  852. model_meta.outlier_value.extend(map(str, outlier_impute))
  853. if outlier_replace_value:
  854. outlier_value_dict = dict(zip(header, map(str, outlier_replace_value)))
  855. model_param.outlier_replace_value.update(outlier_value_dict)
  856. if outlier_replace_rate:
  857. outlier_value_ratio_dict = dict(zip(header, outlier_replace_rate))
  858. model_param.outlier_value_ratio.update(outlier_value_ratio_dict)
  859. return model_meta, model_param
  860. def load_outlier_model(header=None,
  861. model_name="Outlier",
  862. model_meta=None,
  863. model_param=None):
  864. outlier_replace = model_meta.is_outlier
  865. outlier_replace_method = model_meta.strategy
  866. outlier_value = model_meta.outlier_value
  867. outlier_replace_value = model_param.outlier_replace_value
  868. if outlier_replace:
  869. if not outlier_replace_method:
  870. outlier_replace_method = None
  871. if not outlier_value:
  872. outlier_value = None
  873. else:
  874. outlier_value = list(outlier_value)
  875. if outlier_replace_value:
  876. outlier_replace_value = [outlier_replace_value.get(head) for head in header]
  877. else:
  878. outlier_replace_value = None
  879. else:
  880. outlier_replace_method = None
  881. outlier_value = None
  882. outlier_replace_value = None
  883. return outlier_replace, outlier_replace_method, outlier_value, outlier_replace_value
  884. """
  885. class DataIO(ModelBase):
  886. def __init__(self):
  887. super(DataIO, self).__init__()
  888. from federatedml.param.data_transform_param import DataTransformParam
  889. from federatedml.util.data_transform import DataTransform
  890. self.model_param = DataTransformParam()
  891. self._transformer = DataTransform()
  892. def _init_model(self, model_param):
  893. LOGGER.warning('DataIO is deprecated, use DataTransform module instead')
  894. self._transformer._init_model(model_param)
  895. def load_model(self, model_dict):
  896. self._transformer.load_model(model_dict)
  897. def fit(self, data_inst):
  898. raise ValueError("In Fate-v1.9 or later version, DataIO is deprecated, use DataTransform instead.")
  899. def transform(self, data_inst):
  900. self._transformer.role = self.role
  901. self._transformer.component_properties = self.component_properties
  902. return self._transformer.transform(data_inst)
  903. def export_model(self):
  904. return self._transformer.export_model()