123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- # Copyright 2019 The FATE Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- ################################################################################
- #
- #
- ################################################################################
- import copy
- import functools
- import numpy as np
- from federatedml.feature.instance import Instance
- from federatedml.feature.sparse_vector import SparseVector
- from federatedml.model_base import ModelBase
- from federatedml.protobuf.generated.data_io_meta_pb2 import DataIOMeta
- from federatedml.protobuf.generated.data_io_meta_pb2 import ImputerMeta
- from federatedml.protobuf.generated.data_io_meta_pb2 import OutlierMeta
- from federatedml.protobuf.generated.data_io_param_pb2 import DataIOParam
- from federatedml.protobuf.generated.data_io_param_pb2 import ImputerParam
- from federatedml.protobuf.generated.data_io_param_pb2 import OutlierParam
- from federatedml.statistic import data_overview
- from federatedml.util import abnormal_detection
- from federatedml.util import consts
- from federatedml.util import LOGGER
- from federatedml.util.io_check import assert_io_num_rows_equal
- """
- # =============================================================================
- # DenseFeatureReader
- # =============================================================================
- class DenseFeatureReader(object):
- def __init__(self, data_io_param):
- self.delimitor = data_io_param.delimitor
- self.data_type = data_io_param.data_type
- self.exclusive_data_type = data_io_param.exclusive_data_type
- self.missing_fill = data_io_param.missing_fill
- self.default_value = data_io_param.default_value
- self.missing_fill_method = data_io_param.missing_fill_method
- self.missing_impute = data_io_param.missing_impute
- self.outlier_replace = data_io_param.outlier_replace
- self.outlier_replace_method = data_io_param.outlier_replace_method
- self.outlier_impute = data_io_param.outlier_impute
- self.outlier_replace_value = data_io_param.outlier_replace_value
- self.with_label = data_io_param.with_label
- self.label_name = data_io_param.label_name if self.with_label else None
- self.label_type = data_io_param.label_type if self.with_label else None
- self.output_format = data_io_param.output_format
- self.missing_impute_rate = None
- self.outlier_replace_rate = None
- self.label_idx = None
- self.header = None
- self.sid_name = None
- self.exclusive_data_type_fid_map = {}
- def generate_header(self, input_data, mode="fit"):
- header = input_data.schema["header"]
- sid_name = input_data.schema["sid"]
- LOGGER.debug("header is {}".format(header))
- LOGGER.debug("sid_name is {}".format(sid_name))
- if not header and not sid_name:
- raise ValueError("dense input-format should have header schema")
- header_gen = None
- if self.with_label:
- if mode == "fit":
- if not header:
- raise ValueError("dense input-format for fit stage should not be None if with_label is true")
- self.label_idx = header.split(self.delimitor, -1).index(self.label_name)
- header_gen = header.split(self.delimitor, -1)[: self.label_idx] + \
- header.split(self.delimitor, -1)[self.label_idx + 1:] or None
- elif header:
- header_list = header.split(self.delimitor, -1)
- if self.label_name in header_list:
- self.label_idx = header_list.index(self.label_name)
- header_gen = header.split(self.delimitor, -1)[: self.label_idx] + \
- header.split(self.delimitor, -1)[self.label_idx + 1:] or None
- else:
- self.label_idx = None
- header_gen = header.split(self.delimitor, -1)
- elif header:
- header_gen = header.split(self.delimitor, -1)
- self.header = header_gen
- self.sid_name = sid_name
- if header_gen:
- for i in range(len(header_gen)):
- col_name = header_gen[i]
- if self.exclusive_data_type is not None and col_name in self.exclusive_data_type:
- self.exclusive_data_type_fid_map[i] = self.exclusive_data_type[col_name]
- def get_schema(self):
- schema = make_schema(self.header, self.sid_name, self.label_name)
- return schema
- def read_data(self, input_data, mode="fit"):
- LOGGER.info("start to read dense data and change data to instance")
- abnormal_detection.empty_table_detection(input_data)
- input_data_labels = None
- fit_header = None
- if mode == "transform":
- fit_header = self.header
- self.generate_header(input_data, mode=mode)
- if self.label_idx is not None:
- data_shape = data_overview.get_data_shape(input_data)
- if not data_shape or self.label_idx >= data_shape:
- raise ValueError("input data's value is empty, it does not contain a label")
- input_data_features = input_data.mapValues(
- lambda value: [] if data_shape == 1 else value.split(self.delimitor, -1)[:self.label_idx] + value.split(
- self.delimitor, -1)[self.label_idx + 1:])
- input_data_labels = input_data.mapValues(lambda value: value.split(self.delimitor, -1)[self.label_idx])
- else:
- input_data_features = input_data.mapValues(
- lambda value: [] if not self.header else value.split(self.delimitor, -1))
- if mode == "fit":
- data_instance = self.fit(input_data, input_data_features, input_data_labels)
- else:
- data_instance = self.transform(input_data_features, input_data_labels)
- # data_instance = ModelBase.align_data_header(data_instance, fit_header)
- data_instance = data_overview.header_alignment(data_instance, fit_header)
- return data_instance
- def fit(self, input_data, input_data_features, input_data_labels):
- raise ValueError("In Fate-v1.9 or later version, DataIO is deprecated, use DataTransform instead.")
- schema = self.get_schema()
- set_schema(input_data_features, schema)
- input_data_features = self.fill_missing_value(input_data_features, "fit")
- input_data_features = self.replace_outlier_value(input_data_features, "fit")
- data_instance = self.gen_data_instance(input_data_features, input_data_labels)
- set_schema(data_instance, schema)
- return data_instance
- @assert_io_num_rows_equal
- def transform(self, input_data_features, input_data_labels):
- schema = make_schema(self.header, self.sid_name, self.label_name)
- set_schema(input_data_features, schema)
- input_data_features = self.fill_missing_value(input_data_features, "transform")
- input_data_features = self.replace_outlier_value(input_data_features, "transform")
- data_instance = self.gen_data_instance(input_data_features, input_data_labels)
- set_schema(data_instance, schema)
- return data_instance
- def fill_missing_value(self, input_data_features, mode="fit"):
- if self.missing_fill:
- from federatedml.feature.imputer import Imputer
- imputer_processor = Imputer(self.missing_impute)
- if mode == "fit":
- input_data_features, self.default_value = imputer_processor.fit(input_data_features,
- replace_method=self.missing_fill_method,
- replace_value=self.default_value)
- if self.missing_impute is None:
- self.missing_impute = imputer_processor.get_missing_value_list()
- else:
- input_data_features = imputer_processor.transform(input_data_features,
- transform_value=self.default_value)
- if self.missing_impute is None:
- self.missing_impute = imputer_processor.get_missing_value_list()
- self.missing_impute_rate = imputer_processor.get_impute_rate(mode)
- return input_data_features
- def replace_outlier_value(self, input_data_features, mode="fit"):
- if self.outlier_replace:
- from federatedml.feature.imputer import Imputer
- imputer_processor = Imputer(self.outlier_impute)
- if mode == "fit":
- input_data_features, self.outlier_replace_value = \
- imputer_processor.fit(input_data_features,
- replace_method=self.outlier_replace_method,
- replace_value=self.outlier_replace_value)
- if self.outlier_impute is None:
- self.outlier_impute = imputer_processor.get_missing_value_list()
- else:
- input_data_features = imputer_processor.transform(input_data_features,
- transform_value=self.outlier_replace_value)
- self.outlier_replace_rate = imputer_processor.get_impute_rate(mode)
- return input_data_features
- def gen_data_instance(self, input_data_features, input_data_labels):
- if self.label_idx is not None:
- data_instance = input_data_features.join(input_data_labels,
- lambda features, label:
- self.to_instance(features, label))
- else:
- data_instance = input_data_features.mapValues(lambda features: self.to_instance(features))
- return data_instance
- def to_instance(self, features, label=None):
- if self.header is None and len(features) != 0:
- raise ValueError("features shape {} not equal to header shape 0".format(len(features)))
- elif self.header is not None and len(self.header) != len(features):
- raise ValueError("features shape {} not equal to header shape {}".format(len(features), len(self.header)))
- if self.label_idx is not None:
- if self.label_type == 'int':
- label = int(label)
- elif self.label_type in ["float", "float64"]:
- label = float(label)
- format_features = DenseFeatureReader.gen_output_format(
- features,
- self.data_type,
- self.exclusive_data_type_fid_map,
- self.output_format,
- missing_impute=self.missing_impute)
- else:
- format_features = DenseFeatureReader.gen_output_format(
- features,
- self.data_type,
- self.exclusive_data_type_fid_map,
- self.output_format,
- missing_impute=self.missing_impute)
- return Instance(inst_id=None,
- features=format_features,
- label=label)
- @staticmethod
- def gen_output_format(features, data_type='float', exclusive_data_type_fid_map=None,
- output_format='dense', missing_impute=None):
- if output_format not in ["dense", "sparse"]:
- raise ValueError("output format {} is not define".format(output_format))
- if output_format == "dense":
- format_features = copy.deepcopy(features)
- if data_type in ["int", "int64", "long", "float", "float64", "double"]:
- for i in range(len(features)):
- if (missing_impute is not None and features[i] in missing_impute) or \
- (missing_impute is None and features[i] in ['', 'NULL', 'null', "NA"]):
- format_features[i] = np.nan
- if exclusive_data_type_fid_map:
- for fid in range(len(features)):
- if fid in exclusive_data_type_fid_map:
- dtype = exclusive_data_type_fid_map[fid]
- else:
- dtype = data_type
- format_features[fid] = getattr(np, dtype)(features[fid])
- return np.asarray(format_features, dtype=object)
- else:
- return np.asarray(format_features, dtype=data_type)
- indices = []
- data = []
- column_shape = len(features)
- non_zero = 0
- for i in range(column_shape):
- if (missing_impute is not None and features[i] in missing_impute) or \
- (missing_impute is None and features[i] in ['', 'NULL', 'null', "NA"]):
- indices.append(i)
- data.append(np.nan)
- non_zero += 1
- elif data_type in ['float', 'float64', "double"]:
- if np.fabs(float(features[i])) < consts.FLOAT_ZERO:
- continue
- indices.append(i)
- data.append(float(features[i]))
- non_zero += 1
- elif data_type in ['int', "int64", "long"]:
- if int(features[i]) == 0:
- continue
- indices.append(i)
- data.append(int(features[i]))
- else:
- indices.append(i)
- data.append(features[i])
- return SparseVector(indices, data, column_shape)
- def get_summary(self):
- if not self.missing_fill and not self.outlier_replace:
- return {}
- summary_buf = {}
- if self.missing_fill:
- missing_summary = dict()
- missing_summary["missing_value"] = list(self.missing_impute)
- missing_summary["missing_impute_value"] = dict(zip(self.header, self.default_value))
- missing_summary["missing_impute_rate"] = dict(zip(self.header, self.missing_impute_rate))
- summary_buf["missing_fill_info"] = missing_summary
- if self.outlier_replace:
- outlier_replace_summary = dict()
- outlier_replace_summary["outlier_value"] = list(self.outlier_impute)
- outlier_replace_summary["outlier_replace_value"] = dict(zip(self.header, self.outlier_replace_value))
- outlier_replace_summary["outlier_replace_rate"] = dict(zip(self.header, self.outlier_replace_rate))
- summary_buf["outlier_replace_rate"] = outlier_replace_summary
- return summary_buf
- def save_model(self):
- dataio_meta, dataio_param = save_data_io_model(input_format="dense",
- delimitor=self.delimitor,
- data_type=self.data_type,
- exclusive_data_type=self.exclusive_data_type,
- with_label=self.with_label,
- label_type=self.label_type,
- output_format=self.output_format,
- header=self.header,
- sid_name=self.sid_name,
- label_name=self.label_name,
- model_name="DenseFeatureReader")
- missing_imputer_meta, missing_imputer_param = save_missing_imputer_model(self.missing_fill,
- self.missing_fill_method,
- self.missing_impute,
- self.default_value,
- self.missing_impute_rate,
- self.header,
- "Imputer")
- dataio_meta.imputer_meta.CopyFrom(missing_imputer_meta)
- dataio_param.imputer_param.CopyFrom(missing_imputer_param)
- outlier_meta, outlier_param = save_outlier_model(self.outlier_replace,
- self.outlier_replace_method,
- self.outlier_impute,
- self.outlier_replace_value,
- self.outlier_replace_rate,
- self.header,
- "Outlier")
- dataio_meta.outlier_meta.CopyFrom(outlier_meta)
- dataio_param.outlier_param.CopyFrom(outlier_param)
- return {"DataIOMeta": dataio_meta,
- "DataIOParam": dataio_param
- }
- def load_model(self, model_meta, model_param):
- self.delimitor, self.data_type, self.exclusive_data_type, _1, _2, self.with_label, \
- self.label_type, self.output_format, self.header, self.sid_name, self.label_name = \
- load_data_io_model("DenseFeatureReader", model_meta, model_param)
- self.missing_fill, self.missing_fill_method, \
- self.missing_impute, self.default_value = load_missing_imputer_model(self.header,
- "Imputer",
- model_meta.imputer_meta,
- model_param.imputer_param)
- self.outlier_replace, self.outlier_replace_method, \
- self.outlier_impute, self.outlier_replace_value = load_outlier_model(self.header,
- "Outlier",
- model_meta.outlier_meta,
- model_param.outlier_param)
- # =============================================================================
- # SparseFeatureReader: mainly for libsvm input format
- # =============================================================================
- class SparseFeatureReader(object):
- def __init__(self, data_io_param):
- self.delimitor = data_io_param.delimitor
- self.data_type = data_io_param.data_type
- self.label_type = data_io_param.label_type
- self.output_format = data_io_param.output_format
- self.header = None
- self.sid_name = "sid"
- self.label_name = data_io_param.label_name
- def get_max_feature_index(self, line, delimitor=' '):
- if line.strip() == '':
- raise ValueError("find an empty line, please check!!!")
- cols = line.split(delimitor, -1)
- if len(cols) <= 1:
- return -1
- return max([int(fid_value.split(":", -1)[0]) for fid_value in cols[1:]])
- def generate_header(self, max_feature):
- self.header = [str(i) for i in range(max_feature + 1)]
- def read_data(self, input_data, mode="fit"):
- LOGGER.info("start to read sparse data and change data to instance")
- abnormal_detection.empty_table_detection(input_data)
- if not data_overview.get_data_shape(input_data):
- raise ValueError("input data's value is empty, it does not contain a label")
- if mode == "fit":
- data_instance = self.fit(input_data)
- else:
- data_instance = self.transform(input_data)
- schema = make_schema(self.header, self.sid_name, self.label_name)
- set_schema(data_instance, schema)
- return data_instance
- def fit(self, input_data):
- get_max_fid = functools.partial(self.get_max_feature_index, delimitor=self.delimitor)
- max_feature = input_data.mapValues(get_max_fid).reduce(lambda max_fid1, max_fid2: max(max_fid1, max_fid2))
- if max_feature == -1:
- raise ValueError("no feature value in input data, please check!")
- self.generate_header(max_feature)
- data_instance = self.gen_data_instance(input_data, max_feature)
- return data_instance
- def transform(self, input_data):
- max_feature = len(self.header)
- data_instance = self.gen_data_instance(input_data, max_feature)
- return data_instance
- def gen_data_instance(self, input_data, max_feature):
- params = [self.delimitor, self.data_type,
- self.label_type,
- self.output_format, max_feature]
- to_instance_with_param = functools.partial(self.to_instance, params)
- data_instance = input_data.mapValues(to_instance_with_param)
- return data_instance
- @staticmethod
- def to_instance(param_list, value):
- delimitor = param_list[0]
- data_type = param_list[1]
- label_type = param_list[2]
- output_format = param_list[3]
- max_fid = param_list[4]
- if output_format not in ["dense", "sparse"]:
- raise ValueError("output format {} is not define".format(output_format))
- cols = value.split(delimitor, -1)
- label = cols[0]
- if label_type == 'int':
- label = int(label)
- elif label_type in ["float", "float64"]:
- label = float(label)
- fid_value = []
- for i in range(1, len(cols)):
- fid, val = cols[i].split(":", -1)
- fid = int(fid)
- if data_type in ["float", "float64"]:
- val = float(val)
- elif data_type in ["int", "int64"]:
- val = int(val)
- fid_value.append((fid, val))
- if output_format == "dense":
- features = [0 for i in range(max_fid + 1)]
- for fid, val in fid_value:
- features[fid] = val
- features = np.asarray(features, dtype=data_type)
- else:
- indices = []
- data = []
- for fid, val in fid_value:
- indices.append(fid)
- data.append(val)
- features = SparseVector(indices, data, max_fid + 1)
- return Instance(inst_id=None,
- features=features,
- label=label)
- def save_model(self):
- dataio_meta, dataio_param = save_data_io_model(input_format="sparse",
- delimitor=self.delimitor,
- data_type=self.data_type,
- label_type=self.label_type,
- output_format=self.output_format,
- header=self.header,
- sid_name=self.sid_name,
- label_name=self.label_name,
- model_name="SparseFeatureReader")
- missing_imputer_meta, missing_imputer_param = save_missing_imputer_model(missing_fill=False,
- model_name="Imputer")
- dataio_meta.imputer_meta.CopyFrom(missing_imputer_meta)
- dataio_param.imputer_param.CopyFrom(missing_imputer_param)
- outlier_meta, outlier_param = save_outlier_model(outlier_replace=False,
- model_name="Outlier")
- dataio_meta.outlier_meta.CopyFrom(outlier_meta)
- dataio_param.outlier_param.CopyFrom(outlier_param)
- return {"DataIOMeta": dataio_meta,
- "DataIOParam": dataio_param
- }
- def load_model(self, model_meta, model_param):
- self.delimitor, self.data_type, _0, _1, _2, _3, \
- self.label_type, self.output_format, self.header, self.sid_name, self.label_name = load_data_io_model(
- "SparseFeatureReader",
- model_meta,
- model_param)
- # =============================================================================
- # SparseTagReader: mainly for tag data
- # =============================================================================
- class SparseTagReader(object):
- def __init__(self, data_io_param):
- self.delimitor = data_io_param.delimitor
- self.data_type = data_io_param.data_type
- self.tag_with_value = data_io_param.tag_with_value
- self.tag_value_delimitor = data_io_param.tag_value_delimitor
- self.with_label = data_io_param.with_label
- self.label_type = data_io_param.label_type if self.with_label else None
- self.output_format = data_io_param.output_format
- self.header = None
- self.sid_name = "sid"
- self.label_name = data_io_param.label_name if self.with_label else None
- self.missing_fill = data_io_param.missing_fill
- self.missing_fill_method = data_io_param.missing_fill_method
- self.default_value = data_io_param.default_value
- self.missing_impute_rate = None
- self.missing_impute = None
- @staticmethod
- def agg_tag(kvs, delimitor=' ', with_label=True, tag_with_value=False, tag_value_delimitor=":"):
- tags_set = set()
- for key, value in kvs:
- if with_label:
- cols = value.split(delimitor, -1)[1:]
- else:
- cols = value.split(delimitor, -1)[0:]
- if tag_with_value is False:
- tags = cols
- else:
- tags = [fea_value.split(tag_value_delimitor, -1)[0] for fea_value in cols]
- tags_set |= set(tags)
- return tags_set
- def generate_header(self, tags):
- self.header = tags
- def read_data(self, input_data, mode="fit"):
- LOGGER.info("start to read sparse data and change data to instance")
- abnormal_detection.empty_table_detection(input_data)
- if mode == "fit":
- data_instance = self.fit(input_data)
- if self.with_label:
- self.label_name = "label"
- else:
- data_instance = self.transform(input_data)
- schema = make_schema(self.header, self.sid_name, self.label_name)
- set_schema(data_instance, schema)
- return data_instance
- @staticmethod
- def change_tag_to_str(value, tags_dict=None, delimitor=",", with_label=False, tag_value_delimitor=":"):
- vals = value.split(delimitor, -1)
- ret = [''] * len(tags_dict)
- if with_label:
- vals = vals[1:]
- for i in range(len(vals)):
- tag, value = vals[i].split(tag_value_delimitor, -1)
- idx = tags_dict.get(tag, None)
- if idx is not None:
- ret[idx] = value
- return ret
- @staticmethod
- def change_str_to_tag(value, tags_dict=None, delimitor=",", tag_value_delimitor=":"):
- ret = [None] * len(tags_dict)
- tags = sorted(list(tags_dict.keys()))
- for i in range(len(value)):
- tag, val = tags[i], value[i]
- ret[i] = tag_value_delimitor.join([tag, val])
- return delimitor.join(ret)
- def fill_missing_value(self, input_data, tags_dict, mode="fit"):
- str_trans_method = functools.partial(self.change_tag_to_str,
- tags_dict=tags_dict,
- delimitor=self.delimitor,
- with_label=self.with_label,
- tag_value_delimitor=self.tag_value_delimitor)
- input_data = input_data.mapValues(str_trans_method)
- schema = make_schema(self.header, self.sid_name, self.label_name)
- set_schema(input_data, schema)
- from federatedml.feature.imputer import Imputer
- imputer_processor = Imputer()
- if mode == "fit":
- data, self.default_value = imputer_processor.fit(input_data,
- replace_method=self.missing_fill_method,
- replace_value=self.default_value)
- LOGGER.debug("self.default_value is {}".format(self.default_value))
- else:
- data = imputer_processor.transform(input_data,
- transform_value=self.default_value)
- if self.missing_impute is None:
- self.missing_impute = imputer_processor.get_missing_value_list()
- LOGGER.debug("self.missing_impute is {}".format(self.missing_impute))
- self.missing_impute_rate = imputer_processor.get_impute_rate(mode)
- str_trans_tag_method = functools.partial(self.change_str_to_tag,
- tags_dict=tags_dict,
- delimitor=self.delimitor,
- tag_value_delimitor=self.tag_value_delimitor)
- data = data.mapValues(str_trans_tag_method)
- return data
- def fit(self, input_data):
- tag_aggregator = functools.partial(SparseTagReader.agg_tag,
- delimitor=self.delimitor,
- with_label=self.with_label,
- tag_with_value=self.tag_with_value,
- tag_value_delimitor=self.tag_value_delimitor)
- tags_set_list = list(input_data.applyPartitions(tag_aggregator).collect())
- tags_set = set()
- for _, _tags_set in tags_set_list:
- tags_set |= _tags_set
- tags = list(tags_set)
- tags = sorted(tags)
- tags_dict = dict(zip(tags, range(len(tags))))
- self.generate_header(tags)
- if self.tag_with_value and self.missing_fill:
- input_data = self.fill_missing_value(input_data, tags_dict, mode="fit")
- data_instance = self.gen_data_instance(input_data, tags_dict)
- return data_instance
- def transform(self, input_data):
- tags_dict = dict(zip(self.header, range(len(self.header))))
- if self.tag_with_value and self.missing_fill:
- input_data = self.fill_missing_value(input_data, tags_dict, mode="transform")
- data_instance = self.gen_data_instance(input_data, tags_dict)
- return data_instance
- def gen_data_instance(self, input_data, tags_dict):
- params = [self.delimitor,
- self.data_type,
- self.tag_with_value,
- self.tag_value_delimitor,
- self.with_label,
- self.label_type,
- self.output_format,
- tags_dict]
- to_instance_with_param = functools.partial(self.to_instance, params)
- data_instance = input_data.mapValues(to_instance_with_param)
- return data_instance
- def get_summary(self):
- if not self.missing_fill:
- return {}
- missing_summary = dict()
- missing_summary["missing_value"] = list(self.missing_impute)
- missing_summary["missing_impute_value"] = dict(zip(self.header, self.default_value))
- missing_summary["missing_impute_rate"] = dict(zip(self.header, self.missing_impute_rate))
- summary_buf = {"missing_fill_info": missing_summary}
- return summary_buf
- @staticmethod
- def to_instance(param_list, value):
- delimitor = param_list[0]
- data_type = param_list[1]
- tag_with_value = param_list[2]
- tag_value_delimitor = param_list[3]
- with_label = param_list[4]
- label_type = param_list[5]
- output_format = param_list[6]
- tags_dict = param_list[7]
- if output_format not in ["dense", "sparse"]:
- raise ValueError("output format {} is not define".format(output_format))
- cols = value.split(delimitor, -1)
- start_pos = 0
- label = None
- if with_label:
- start_pos = 1
- label = cols[0]
- if label_type == 'int':
- label = int(label)
- elif label_type in ["float", "float64"]:
- label = float(label)
- if output_format == "dense":
- features = [0 for i in range(len(tags_dict))]
- for fea in cols[start_pos:]:
- if tag_with_value:
- _tag, _val = fea.split(tag_value_delimitor, -1)
- if _tag in tags_dict:
- features[tags_dict.get(_tag)] = _val
- else:
- if fea in tags_dict:
- features[tags_dict.get(fea)] = 1
- features = np.asarray(features, dtype=data_type)
- else:
- indices = []
- data = []
- for fea in cols[start_pos:]:
- if tag_with_value:
- _tag, _val = fea.split(tag_value_delimitor, -1)
- else:
- _tag = fea
- _val = 1
- if _tag not in tags_dict:
- continue
- indices.append(tags_dict.get(_tag))
- if data_type in ["float", "float64"]:
- _val = float(_val)
- elif data_type in ["int", "int64", "long"]:
- _val = int(_val)
- elif data_type == "str":
- _val = str(_val)
- data.append(_val)
- features = SparseVector(indices, data, len(tags_dict))
- return Instance(inst_id=None,
- features=features,
- label=label)
- def save_model(self):
- dataio_meta, dataio_param = save_data_io_model(input_format="tag",
- delimitor=self.delimitor,
- data_type=self.data_type,
- tag_with_value=self.tag_with_value,
- tag_value_delimitor=self.tag_value_delimitor,
- with_label=self.with_label,
- label_type=self.label_type,
- output_format=self.output_format,
- header=self.header,
- sid_name=self.sid_name,
- label_name=self.label_name,
- model_name="Reader")
- missing_imputer_meta, missing_imputer_param = save_missing_imputer_model(self.missing_fill,
- self.missing_fill_method,
- self.missing_impute,
- self.default_value,
- self.missing_impute_rate,
- self.header,
- "Imputer")
- dataio_meta.imputer_meta.CopyFrom(missing_imputer_meta)
- dataio_param.imputer_param.CopyFrom(missing_imputer_param)
- outlier_meta, outlier_param = save_outlier_model(outlier_replace=False,
- model_name="Outlier")
- dataio_meta.outlier_meta.CopyFrom(outlier_meta)
- dataio_param.outlier_param.CopyFrom(outlier_param)
- return {"DataIOMeta": dataio_meta,
- "DataIOParam": dataio_param
- }
- def load_model(self, model_meta, model_param):
- self.delimitor, self.data_type, _0, self.tag_with_value, self.tag_value_delimitor, self.with_label, \
- self.label_type, self.output_format, self.header, self.sid_name, self.label_name = load_data_io_model(
- "SparseTagReader",
- model_meta,
- model_param)
- self.missing_fill, self.missing_fill_method, \
- self.missing_impute, self.default_value = load_missing_imputer_model(self.header,
- "Imputer",
- model_meta.imputer_meta,
- model_param.imputer_param)
- class DataIO(ModelBase):
- def __init__(self):
- super(DataIO, self).__init__()
- self.reader = None
- from federatedml.param.dataio_param import DataIOParam
- self.model_param = DataIOParam()
- def _init_model(self, model_param):
- LOGGER.warning('DataIO is deprecated, and will be removed in 1.7, use DataTransform module instead')
- if model_param.input_format == "dense":
- self.reader = DenseFeatureReader(self.model_param)
- elif model_param.input_format == "sparse":
- self.reader = SparseFeatureReader(self.model_param)
- elif model_param.input_format == "tag":
- self.reader = SparseTagReader(self.model_param)
- self.model_param = model_param
- def load_model(self, model_dict):
- input_model_param = None
- input_model_meta = None
- for _, value in model_dict["model"].items():
- for model in value:
- if model.endswith("Meta"):
- input_model_meta = value[model]
- if model.endswith("Param"):
- input_model_param = value[model]
- if input_model_meta.input_format == "dense":
- self.reader = DenseFeatureReader(self.model_param)
- elif input_model_meta.input_format == "sparse":
- self.reader = SparseFeatureReader(self.model_param)
- elif input_model_meta.input_format == "tag":
- self.reader = SparseTagReader(self.model_param)
- self.reader.load_model(input_model_meta, input_model_param)
- def fit(self, data_inst):
- data_inst = self.reader.read_data(data_inst, "fit")
- if isinstance(self.reader, (DenseFeatureReader, SparseTagReader)):
- summary_buf = self.reader.get_summary()
- if summary_buf:
- self.set_summary(summary_buf)
- return data_inst
- def transform(self, data_inst):
- return self.reader.read_data(data_inst, "transform")
- def export_model(self):
- model_dict = self.reader.save_model()
- model_dict["DataIOMeta"].need_run = self.need_run
- return model_dict
- def make_schema(header=None, sid_name=None, label_name=None):
- schema = {}
- if header:
- schema["header"] = header
- if sid_name:
- schema["sid_name"] = sid_name
- if label_name:
- schema["label_name"] = label_name
- ModelBase.check_schema_content(schema)
- return schema
- def set_schema(data_instance, schema):
- data_instance.schema = schema
- def save_data_io_model(input_format="dense",
- delimitor=",",
- data_type="str",
- exclusive_data_type=None,
- tag_with_value=False,
- tag_value_delimitor=":",
- with_label=False,
- label_name='',
- label_type="int",
- output_format="dense",
- header=None,
- sid_name=None,
- model_name="DataIO"):
- model_meta = DataIOMeta()
- model_param = DataIOParam()
- model_meta.input_format = input_format
- model_meta.delimitor = delimitor
- model_meta.data_type = data_type
- model_meta.tag_with_value = tag_with_value
- model_meta.tag_value_delimitor = tag_value_delimitor
- model_meta.with_label = with_label
- if with_label:
- model_meta.label_name = label_name
- model_meta.label_type = label_type
- model_meta.output_format = output_format
- if header is not None:
- model_param.header.extend(header)
- if sid_name:
- model_param.sid_name = sid_name
- if label_name:
- model_param.label_name = label_name
- if exclusive_data_type is not None:
- model_meta.exclusive_data_type.update(exclusive_data_type)
- return model_meta, model_param
- def load_data_io_model(model_name="DataIO",
- model_meta=None,
- model_param=None):
- delimitor = model_meta.delimitor
- data_type = model_meta.data_type
- tag_with_value = model_meta.tag_with_value
- tag_value_delimitor = model_meta.tag_value_delimitor
- with_label = model_meta.with_label
- label_name = model_meta.label_name if with_label else None
- label_type = model_meta.label_type if with_label else None
- output_format = model_meta.output_format
- header = list(model_param.header) or None
- sid_name = None
- if model_param.sid_name:
- sid_name = model_param.sid_name
- exclusive_data_type = None
- if model_meta.exclusive_data_type:
- exclusive_data_type = {}
- for col_name in model_meta.exclusive_data_type:
- exclusive_data_type[col_name] = model_meta.exclusive_data_type.get(col_name)
- return delimitor, data_type, exclusive_data_type, tag_with_value, tag_value_delimitor, with_label, \
- label_type, output_format, header, sid_name, label_name
- def save_missing_imputer_model(missing_fill=False,
- missing_replace_method=None,
- missing_impute=None,
- missing_fill_value=None,
- missing_replace_rate=None,
- header=None,
- model_name="Imputer"):
- model_meta = ImputerMeta()
- model_param = ImputerParam()
- model_meta.is_imputer = missing_fill
- if missing_fill:
- if missing_replace_method:
- model_meta.strategy = str(missing_replace_method)
- if missing_impute is not None:
- model_meta.missing_value.extend(map(str, missing_impute))
- if missing_fill_value is not None:
- feature_value_dict = dict(zip(header, map(str, missing_fill_value)))
- model_param.missing_replace_value.update(feature_value_dict)
- if missing_replace_rate is not None:
- missing_replace_rate_dict = dict(zip(header, missing_replace_rate))
- model_param.missing_value_ratio.update(missing_replace_rate_dict)
- return model_meta, model_param
- def load_missing_imputer_model(header=None,
- model_name="Imputer",
- model_meta=None,
- model_param=None):
- missing_fill = model_meta.is_imputer
- missing_replace_method = model_meta.strategy
- missing_value = model_meta.missing_value
- missing_fill_value = model_param.missing_replace_value
- if missing_fill:
- if not missing_replace_method:
- missing_replace_method = None
- if not missing_value:
- missing_value = None
- else:
- missing_value = list(missing_value)
- if missing_fill_value:
- missing_fill_value = [missing_fill_value.get(head) for head in header]
- else:
- missing_fill_value = None
- else:
- missing_replace_method = None
- missing_value = None
- missing_fill_value = None
- return missing_fill, missing_replace_method, missing_value, missing_fill_value
- def save_outlier_model(outlier_replace=False,
- outlier_replace_method=None,
- outlier_impute=None,
- outlier_replace_value=None,
- outlier_replace_rate=None,
- header=None,
- model_name="Outlier"):
- model_meta = OutlierMeta()
- model_param = OutlierParam()
- model_meta.is_outlier = outlier_replace
- if outlier_replace:
- if outlier_replace_method:
- model_meta.strategy = str(outlier_replace_method)
- if outlier_impute:
- model_meta.outlier_value.extend(map(str, outlier_impute))
- if outlier_replace_value:
- outlier_value_dict = dict(zip(header, map(str, outlier_replace_value)))
- model_param.outlier_replace_value.update(outlier_value_dict)
- if outlier_replace_rate:
- outlier_value_ratio_dict = dict(zip(header, outlier_replace_rate))
- model_param.outlier_value_ratio.update(outlier_value_ratio_dict)
- return model_meta, model_param
- def load_outlier_model(header=None,
- model_name="Outlier",
- model_meta=None,
- model_param=None):
- outlier_replace = model_meta.is_outlier
- outlier_replace_method = model_meta.strategy
- outlier_value = model_meta.outlier_value
- outlier_replace_value = model_param.outlier_replace_value
- if outlier_replace:
- if not outlier_replace_method:
- outlier_replace_method = None
- if not outlier_value:
- outlier_value = None
- else:
- outlier_value = list(outlier_value)
- if outlier_replace_value:
- outlier_replace_value = [outlier_replace_value.get(head) for head in header]
- else:
- outlier_replace_value = None
- else:
- outlier_replace_method = None
- outlier_value = None
- outlier_replace_value = None
- return outlier_replace, outlier_replace_method, outlier_value, outlier_replace_value
- """
- class DataIO(ModelBase):
- def __init__(self):
- super(DataIO, self).__init__()
- from federatedml.param.data_transform_param import DataTransformParam
- from federatedml.util.data_transform import DataTransform
- self.model_param = DataTransformParam()
- self._transformer = DataTransform()
- def _init_model(self, model_param):
- LOGGER.warning('DataIO is deprecated, use DataTransform module instead')
- self._transformer._init_model(model_param)
- def load_model(self, model_dict):
- self._transformer.load_model(model_dict)
- def fit(self, data_inst):
- raise ValueError("In Fate-v1.9 or later version, DataIO is deprecated, use DataTransform instead.")
- def transform(self, data_inst):
- self._transformer.role = self.role
- self._transformer.component_properties = self.component_properties
- return self._transformer.transform(data_inst)
- def export_model(self):
- return self._transformer.export_model()
|