#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
################################################################################
#
#
################################################################################

import copy
import functools

import numpy as np

from federatedml.feature.instance import Instance
from federatedml.feature.sparse_vector import SparseVector
from federatedml.model_base import ModelBase
from federatedml.protobuf.generated.data_transform_meta_pb2 import DataTransformMeta
from federatedml.protobuf.generated.data_transform_meta_pb2 import DataTransformImputerMeta
from federatedml.protobuf.generated.data_transform_meta_pb2 import DataTransformOutlierMeta
from federatedml.protobuf.generated.data_transform_param_pb2 import DataTransformParam
from federatedml.protobuf.generated.data_transform_param_pb2 import DataTransformImputerParam
from federatedml.protobuf.generated.data_transform_param_pb2 import DataTransformOutlierParam
from federatedml.statistic import data_overview
from federatedml.util import abnormal_detection
from federatedml.util import consts
from federatedml.util import LOGGER
from federatedml.util.io_check import assert_io_num_rows_equal
from federatedml.util.data_format_preprocess import DataFormatPreProcess
from federatedml.util.anonymous_generator_util import Anonymous


# =============================================================================
# DenseFeatureTransformer
# =============================================================================
class DenseFeatureTransformer(object):
    def __init__(self, data_transform_param):
        self.delimitor = data_transform_param.delimitor
        self.data_type = data_transform_param.data_type
        self.missing_fill = data_transform_param.missing_fill
        self.default_value = data_transform_param.default_value
        self.missing_fill_method = data_transform_param.missing_fill_method
        self.missing_impute = data_transform_param.missing_impute
        self.outlier_replace = data_transform_param.outlier_replace
        self.outlier_replace_method = data_transform_param.outlier_replace_method
        self.outlier_impute = data_transform_param.outlier_impute
        self.outlier_replace_value = data_transform_param.outlier_replace_value
        self.with_label = data_transform_param.with_label
        self.label_name = data_transform_param.label_name.lower() if self.with_label else None
        self.label_type = data_transform_param.label_type if self.with_label else None
        self.output_format = data_transform_param.output_format
        self.missing_impute_rate = None
        self.outlier_replace_rate = None
        self.header = None
        self.sid_name = None
        self.exclusive_data_type_fid_map = {}
        self.match_id_name = data_transform_param.match_id_name
        self.match_id_index = 0
        self.with_match_id = data_transform_param.with_match_id
        self.anonymous_generator = None
        self.anonymous_header = None

        if data_transform_param.exclusive_data_type:
            self.exclusive_data_type = dict([(k.lower(), v) for k, v in
                                             data_transform_param.exclusive_data_type.items()])
        else:
            self.exclusive_data_type = None
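    # A minimal construction sketch (an assumption for local experimentation, not part of
    # FATE's public API): __init__ only reads plain attributes from `data_transform_param`,
    # so a SimpleNamespace carrying the same field names can stand in for a full
    # DataTransformParam object when trying the transformer out by hand.
    #
    #     from types import SimpleNamespace
    #     dense_param = SimpleNamespace(
    #         delimitor=",", data_type="float", output_format="dense",
    #         missing_fill=False, default_value=0, missing_fill_method=None, missing_impute=None,
    #         outlier_replace=False, outlier_replace_method=None, outlier_impute=None,
    #         outlier_replace_value=0,
    #         with_label=True, label_name="y", label_type="int",
    #         with_match_id=False, match_id_name="", exclusive_data_type=None)
    #     transformer = DenseFeatureTransformer(dense_param)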
    def _update_param(self, schema):
        meta = schema["meta"]
        self.delimitor = meta.get("delimiter", ",")
        self.data_type = meta.get("data_type")
        self.with_label = meta.get("with_label", False)
        if self.with_label:
            self.label_type = meta.get("label_type", "int")
            self.label_name = meta.get("label_name", '')
        self.with_match_id = meta.get("with_match_id", False)

        if self.with_match_id:
            match_id_name = schema.get("match_id_name", [])
            if not self.match_id_name:
                if isinstance(match_id_name, list) and len(match_id_name) > 1:
                    raise ValueError("Multiple Match ID exist, please specify the one to use")
                self.match_id_name = match_id_name[0] if isinstance(match_id_name, list) else match_id_name
                self.match_id_index = schema["original_index_info"]["match_id_index"][0]
            else:
                try:
                    idx = match_id_name.index(self.match_id_name)
                except ValueError:
                    raise ValueError(f"Can not find {self.match_id_name} in {match_id_name}")
                self.match_id_index = schema["original_index_info"]["match_id_index"][idx]

            schema["match_id_name"] = self.match_id_name

        header = schema["header"]
        exclusive_data_type = meta.get("exclusive_data_type", None)
        if exclusive_data_type:
            self.exclusive_data_type = dict([(k.lower(), v) for k, v in exclusive_data_type.items()])
            for idx, col_name in enumerate(header):
                if col_name in self.exclusive_data_type:
                    self.exclusive_data_type_fid_map[idx] = self.exclusive_data_type[col_name]

    def extract_feature_value(self, value, header_index=None):
        if not header_index:
            return []

        value = value.split(self.delimitor, -1)
        if len(value) <= header_index[-1]:
            raise ValueError("Feature shape is smaller than header shape")

        feature_values = []
        for idx in header_index:
            feature_values.append(value[idx])

        return feature_values

    def read_data(self, input_data, mode="fit"):
        LOGGER.info("start to read dense data and change data to instance")
        abnormal_detection.empty_table_detection(input_data)
        schema = copy.deepcopy(input_data.schema)
        if not schema.get("meta"):
            LOGGER.warning("Data meta is supposed to be set with data uploading or binding, "
                           "please refer to the data transform usage guide.")

            meta = dict(input_format="dense",
                        delimiter=self.delimitor,
                        with_label=self.with_label,
                        label_name=self.label_name,
                        with_match_id=self.with_match_id,
                        data_type=self.data_type,
                        )

            if mode == "transform" and self.with_label \
                    and self.label_name not in schema["header"].split(self.delimitor, -1):
                del meta["label_name"]
                del meta["with_label"]

            schema["meta"] = meta
            generated_header = DataFormatPreProcess.generate_header(input_data, schema)
            schema.update(generated_header)
            schema = self.anonymous_generator.generate_anonymous_header(schema)
            set_schema(input_data, schema)
        else:
            self._update_param(schema)

        header = schema["header"]
        anonymous_header = schema["anonymous_header"]
        training_header = self.header
        if mode == "transform":
            if (set(self.header) & set(header)) != set(self.header):
                raise ValueError(f"Transform Data's header is {header}, expect {self.header}")
            self.header = header
            if not self.anonymous_header:
                self.anonymous_header = anonymous_header
        else:
            self.header = header
            self.anonymous_header = anonymous_header

        header_index = schema["original_index_info"]["header_index"]
        extract_feature_func = functools.partial(self.extract_feature_value,
                                                 header_index=header_index)
        input_data_features = input_data.mapValues(extract_feature_func)
        # input_data_features.schema = input_data.schema
        input_data_features.schema = schema

        input_data_labels = None
        input_data_match_id = None
if "label_name" in schema: label_index = schema["original_index_info"]["label_index"] input_data_labels = input_data.mapValues(lambda value: value.split(self.delimitor, -1)[label_index]) if self.with_match_id: input_data_match_id = input_data.mapValues( lambda value: value.split(self.delimitor, -1)[self.match_id_index]) if mode == "fit": data_instance = self.fit(input_data, input_data_features, input_data_labels, input_data_match_id) set_schema(data_instance, schema) else: data_instance = self.transform(input_data_features, input_data_labels, input_data_match_id) data_instance = data_overview.header_alignment(data_instance, training_header, self.anonymous_header) self.header = training_header return data_instance def fit(self, input_data, input_data_features, input_data_labels, input_data_match_id): input_data_features = self.fill_missing_value(input_data_features, "fit") input_data_features = self.replace_outlier_value(input_data_features, "fit") data_instance = self.gen_data_instance(input_data_features, input_data_labels, input_data_match_id) return data_instance @assert_io_num_rows_equal def transform(self, input_data_features, input_data_labels, input_data_match_id): schema = input_data_features.schema input_data_features = self.fill_missing_value(input_data_features, "transform") input_data_features = self.replace_outlier_value(input_data_features, "transform") data_instance = self.gen_data_instance(input_data_features, input_data_labels, input_data_match_id) data_instance.schema = schema return data_instance def fill_missing_value(self, input_data_features, mode="fit"): if self.missing_fill: from federatedml.feature.imputer import Imputer imputer_processor = Imputer(self.missing_impute) if mode == "fit": input_data_features, self.default_value = imputer_processor.fit(input_data_features, replace_method=self.missing_fill_method, replace_value=self.default_value) if self.missing_impute is None: self.missing_impute = imputer_processor.get_missing_value_list() else: input_data_features = imputer_processor.transform(input_data_features, transform_value=self.default_value) if self.missing_impute is None: self.missing_impute = imputer_processor.get_missing_value_list() self.missing_impute_rate = imputer_processor.get_impute_rate(mode) return input_data_features def replace_outlier_value(self, input_data_features, mode="fit"): if self.outlier_replace: from federatedml.feature.imputer import Imputer imputer_processor = Imputer(self.outlier_impute) if mode == "fit": input_data_features, self.outlier_replace_value = \ imputer_processor.fit(input_data_features, replace_method=self.outlier_replace_method, replace_value=self.outlier_replace_value) if self.outlier_impute is None: self.outlier_impute = imputer_processor.get_missing_value_list() else: input_data_features = imputer_processor.transform(input_data_features, transform_value=self.outlier_replace_value) self.outlier_replace_rate = imputer_processor.get_impute_rate(mode) return input_data_features def gen_data_instance(self, input_data_features, input_data_labels, input_data_match_id): if input_data_labels: data_instance = input_data_features.join(input_data_labels, lambda features, label: self.to_instance(features, label)) else: data_instance = input_data_features.mapValues(lambda features: self.to_instance(features)) if self.with_match_id: data_instance = data_instance.join(input_data_match_id, self.append_match_id) return data_instance def append_match_id(self, inst, match_id): inst.inst_id = match_id return inst def to_instance(self, 
    def to_instance(self, features, label=None):
        if self.header is None and len(features) != 0:
            raise ValueError("features shape {} not equal to header shape 0".format(len(features)))
        elif self.header is not None and len(self.header) != len(features):
            raise ValueError("features shape {} not equal to header shape {}".format(len(features),
                                                                                     len(self.header)))

        if label is not None:
            if self.label_type == 'int':
                label = int(label)
            elif self.label_type in ["float", "float64"]:
                label = float(label)

            format_features = DenseFeatureTransformer.gen_output_format(features, self.data_type,
                                                                        self.exclusive_data_type_fid_map,
                                                                        self.output_format,
                                                                        missing_impute=self.missing_impute)
        else:
            format_features = DenseFeatureTransformer.gen_output_format(features, self.data_type,
                                                                        self.exclusive_data_type_fid_map,
                                                                        self.output_format,
                                                                        missing_impute=self.missing_impute)

        return Instance(inst_id=None,
                        features=format_features,
                        label=label)

    @staticmethod
    def gen_output_format(features, data_type='float', exclusive_data_type_fid_map=None,
                          output_format='dense', missing_impute=None):

        if output_format not in ["dense", "sparse"]:
            raise ValueError("output format {} is not defined".format(output_format))

        missing_impute_dtype_set = {"int", "int64", "long", "float", "float64", "double"}
        missing_impute_value_set = {'', 'NULL', 'null', "NA"}
        type_mapping = dict()

        if output_format == "dense":
            # format_features = copy.deepcopy(features)
            format_features = [None] * len(features)
            for fid in range(len(features)):
                if exclusive_data_type_fid_map is not None and fid in exclusive_data_type_fid_map:
                    dtype = exclusive_data_type_fid_map[fid]
                else:
                    dtype = data_type

                if dtype in missing_impute_dtype_set:
                    if (missing_impute is not None and features[fid] in missing_impute) or \
                            (missing_impute is None and features[fid] in missing_impute_value_set):
                        format_features[fid] = np.nan
                        continue

                format_features[fid] = features[fid]
                if exclusive_data_type_fid_map:
                    if dtype not in type_mapping:
                        np_type = getattr(np, dtype)
                        type_mapping[dtype] = np_type

                    format_features[fid] = type_mapping[dtype](format_features[fid])

            if exclusive_data_type_fid_map:
                return np.asarray(format_features, dtype=object)
            else:
                return np.asarray(format_features, dtype=data_type)

        indices = []
        data = []
        column_shape = len(features)
        non_zero = 0

        for i in range(column_shape):
            if (missing_impute is not None and features[i] in missing_impute) or \
                    (missing_impute is None and features[i] in missing_impute_value_set):
                indices.append(i)
                data.append(np.nan)
                non_zero += 1
            elif data_type in ['float', 'float64', "double"]:
                if np.fabs(float(features[i])) < consts.FLOAT_ZERO:
                    continue

                indices.append(i)
                data.append(float(features[i]))
                non_zero += 1
            elif data_type in ['int', "int64", "long"]:
                if int(features[i]) == 0:
                    continue
                indices.append(i)
                data.append(int(features[i]))
            else:
                indices.append(i)
                data.append(features[i])

        return SparseVector(indices, data, column_shape)
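    # Behaviour sketch for gen_output_format (illustrative values only):
    #
    #     DenseFeatureTransformer.gen_output_format(["1.0", "", "2.5"],
    #                                               data_type="float", output_format="dense")
    #     # -> np.array([1.0, nan, 2.5]); the empty string counts as missing because no
    #     #    explicit `missing_impute` list was supplied.
    #
    #     DenseFeatureTransformer.gen_output_format(["0", "3.5", "NA"],
    #                                               data_type="float", output_format="sparse")
    #     # -> SparseVector with indices [1, 2], data [3.5, nan], shape 3; zeros are dropped.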
outlier_replace_summary["outlier_replace_rate"] = dict(zip(self.header, self.outlier_replace_rate)) summary_buf["outlier_replace_rate"] = outlier_replace_summary return summary_buf def save_model(self): transform_meta, transform_param = save_data_transform_model(input_format="dense", delimitor=self.delimitor, data_type=self.data_type, exclusive_data_type=self.exclusive_data_type, with_label=self.with_label, label_type=self.label_type, output_format=self.output_format, header=self.header, sid_name=self.sid_name, label_name=self.label_name, with_match_id=self.with_match_id, model_name="DenseFeatureTransformer", anonymous_header=self.anonymous_header) missing_imputer_meta, missing_imputer_param = save_missing_imputer_model(self.missing_fill, self.missing_fill_method, self.missing_impute, self.default_value, self.missing_impute_rate, self.header, "Imputer") transform_meta.imputer_meta.CopyFrom(missing_imputer_meta) transform_param.imputer_param.CopyFrom(missing_imputer_param) outlier_meta, outlier_param = save_outlier_model(self.outlier_replace, self.outlier_replace_method, self.outlier_impute, self.outlier_replace_value, self.outlier_replace_rate, self.header, "Outlier") transform_meta.outlier_meta.CopyFrom(outlier_meta) transform_param.outlier_param.CopyFrom(outlier_param) return {"DataTransformMeta": transform_meta, "DataTransformParam": transform_param } def load_model(self, model_meta, model_param): self.delimitor, self.data_type, self.exclusive_data_type, _1, _2, self.with_label, \ self.label_type, self.output_format, self.header, self.sid_name, self.label_name, self.with_match_id, self.anonymous_header = \ load_data_transform_model("DenseFeatureTransformer", model_meta, model_param) self.missing_fill, self.missing_fill_method, \ self.missing_impute, self.default_value = load_missing_imputer_model(self.header, "Imputer", model_meta.imputer_meta, model_param.imputer_param) self.outlier_replace, self.outlier_replace_method, \ self.outlier_impute, self.outlier_replace_value = load_outlier_model(self.header, "Outlier", model_meta.outlier_meta, model_param.outlier_param) # ============================================================================= # SparseFeatureTransformer: mainly for libsvm input format # ============================================================================= class SparseFeatureTransformer(object): def __init__(self, data_transform_param): self.delimitor = data_transform_param.delimitor self.data_type = data_transform_param.data_type self.label_type = data_transform_param.label_type self.output_format = data_transform_param.output_format self.header = None self.sid_name = "sid" self.with_match_id = data_transform_param.with_match_id self.match_id_name = "match_id" if self.with_match_id else None self.match_id_index = data_transform_param.match_id_index self.with_label = data_transform_param.with_label self.label_name = data_transform_param.label_name.lower() if self.with_label else None self.anonymous_generator = None self.anonymous_header = None def _update_param(self, schema): meta = schema["meta"] self.delimitor = meta.get("delimiter", ",") self.data_type = meta.get("data_type") self.with_label = meta.get("with_label", False) if self.with_label: self.label_type = meta.get("label_type", "int") self.label_name = meta.get("label_name", "") self.with_match_id = meta.get("with_match_id", False) if self.with_match_id: match_id_name = schema.get("match_id_name") if isinstance(match_id_name, list): self.match_id_name = match_id_name[self.match_id_index] else: 
                self.match_id_name = match_id_name
            schema["match_id_name"] = self.match_id_name

    def read_data(self, input_data, mode="fit"):
        LOGGER.info("start to read sparse data and change data to instance")
        abnormal_detection.empty_table_detection(input_data)
        schema = copy.deepcopy(input_data.schema)
        if not schema.get("meta", {}):
            LOGGER.warning("Data meta is supposed to be set with data uploading or binding, "
                           "please refer to the data transform usage guide.")

            meta = dict(input_format="sparse",
                        delimiter=self.delimitor,
                        with_label=self.with_label,
                        with_match_id=self.with_match_id,
                        data_type=self.data_type)

            schema["meta"] = meta
            generated_header = DataFormatPreProcess.generate_header(input_data, schema)
            schema.update(generated_header)
            schema = self.anonymous_generator.generate_anonymous_header(schema)
            set_schema(input_data, schema)
        else:
            self._update_param(schema)

        if mode == "fit":
            self.header = schema["header"]
            self.anonymous_header = schema["anonymous_header"]
            data_instance = self.fit(input_data)
        else:
            if not self.anonymous_header:
                header_set = set(self.header)
                self.anonymous_header = []
                for column, anonymous_column in zip(schema["header"], schema["anonymous_header"]):
                    if column not in header_set:
                        continue
                    self.anonymous_header.append(anonymous_column)

            schema["header"] = self.header
            schema["anonymous_header"] = self.anonymous_header
            set_schema(input_data, schema)
            data_instance = self.transform(input_data)

        set_schema(data_instance, schema)
        return data_instance

    def fit(self, input_data):
        max_feature = len(self.header)

        if max_feature == 0:
            raise ValueError("no feature value in input data, please check!")

        data_instance = self.gen_data_instance(input_data, max_feature)
        return data_instance

    def transform(self, input_data):
        max_feature = len(self.header)
        data_instance = self.gen_data_instance(input_data, max_feature)
        return data_instance

    def gen_data_instance(self, input_data, max_feature):
        id_range = input_data.schema["meta"].get("id_range", 0)
        params = [self.delimitor, self.data_type,
                  self.label_type, self.with_match_id,
                  self.match_id_index, id_range,
                  self.output_format, self.with_label, max_feature]

        to_instance_with_param = functools.partial(self.to_instance, params)
        data_instance = input_data.mapValues(to_instance_with_param)

        return data_instance

    @staticmethod
    def to_instance(param_list, value):
        delimitor = param_list[0]
        data_type = param_list[1]
        label_type = param_list[2]
        with_match_id = param_list[3]
        match_id_index = param_list[4]
        id_range = param_list[5]
        output_format = param_list[6]
        with_label = param_list[7]
        max_fid = param_list[8]

        if output_format not in ["dense", "sparse"]:
            raise ValueError("output format {} is not defined".format(output_format))

        cols = value.split(delimitor, -1)
        offset = 0
        if with_match_id:
            offset = id_range if id_range else 1
            match_id = cols[match_id_index]
        else:
            match_id = None

        label = None
        if with_label:
            label = cols[offset]
            if label_type == 'int':
                label = int(label)
            elif label_type in ["float", "float64"]:
                label = float(label)
            offset += 1

        fid_value = []
        for i in range(offset, len(cols)):
            fid, val = cols[i].split(":", -1)

            fid = int(fid)
            if data_type in ["float", "float64"]:
                val = float(val)
            elif data_type in ["int", "int64"]:
                val = int(val)

            fid_value.append((fid, val))

        if output_format == "dense":
            features = [0 for i in range(max_fid)]
            for fid, val in fid_value:
                features[fid] = val

            features = np.asarray(features, dtype=data_type)
        else:
            indices = []
            data = []
            for fid, val in fid_value:
                indices.append(fid)
                data.append(val)

            features = SparseVector(indices, data, max_fid)

        return Instance(inst_id=match_id,
                        features=features,
                        label=label)
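    # Illustrative parse of one libsvm-style row (made-up values), using
    # params = [" ", "float", "int", False, 0, 0, "dense", True, 6]:
    #
    #     inst = SparseFeatureTransformer.to_instance(params, "1 2:0.3 4:1.5")
    #     # inst.label == 1, inst.inst_id is None,
    #     # inst.features == np.array([0., 0., 0.3, 0., 1.5, 0.])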
    def save_model(self):
        transform_meta, transform_param = save_data_transform_model(input_format="sparse",
                                                                    delimitor=self.delimitor,
                                                                    data_type=self.data_type,
                                                                    label_type=self.label_type,
                                                                    output_format=self.output_format,
                                                                    header=self.header,
                                                                    sid_name=self.sid_name,
                                                                    label_name=self.label_name,
                                                                    with_match_id=self.with_match_id,
                                                                    with_label=self.with_label,
                                                                    model_name="SparseFeatureTransformer",
                                                                    anonymous_header=self.anonymous_header)

        missing_imputer_meta, missing_imputer_param = save_missing_imputer_model(missing_fill=False,
                                                                                 model_name="Imputer")
        transform_meta.imputer_meta.CopyFrom(missing_imputer_meta)
        transform_param.imputer_param.CopyFrom(missing_imputer_param)

        outlier_meta, outlier_param = save_outlier_model(outlier_replace=False,
                                                         model_name="Outlier")
        transform_meta.outlier_meta.CopyFrom(outlier_meta)
        transform_param.outlier_param.CopyFrom(outlier_param)

        return {"DataTransformMeta": transform_meta,
                "DataTransformParam": transform_param
                }

    def load_model(self, model_meta, model_param):
        self.delimitor, self.data_type, _0, _1, _2, self.with_label, self.label_type, self.output_format, \
            self.header, self.sid_name, self.label_name, self.with_match_id, self.anonymous_header = \
            load_data_transform_model("SparseFeatureTransformer", model_meta, model_param)


# =============================================================================
# SparseTagTransformer: mainly for tag data
# =============================================================================
class SparseTagTransformer(object):
    def __init__(self, data_transform_param):
        self.delimitor = data_transform_param.delimitor
        self.data_type = data_transform_param.data_type
        self.tag_with_value = data_transform_param.tag_with_value
        self.tag_value_delimitor = data_transform_param.tag_value_delimitor
        self.with_label = data_transform_param.with_label
        self.label_type = data_transform_param.label_type if self.with_label else None
        self.output_format = data_transform_param.output_format
        self.header = None
        self.sid_name = "sid"
        self.label_name = data_transform_param.label_name.lower() if data_transform_param.label_name else None
        self.missing_fill = data_transform_param.missing_fill
        self.missing_fill_method = data_transform_param.missing_fill_method
        self.default_value = data_transform_param.default_value
        self.with_match_id = data_transform_param.with_match_id
        self.match_id_index = data_transform_param.match_id_index
        self.match_id_name = "match_id" if self.with_match_id else None
        self.missing_impute_rate = None
        self.missing_impute = None
        self.anonymous_generator = None
        self.anonymous_header = None

    def _update_param(self, schema):
        meta = schema["meta"]
        self.delimitor = meta.get("delimiter", ",")
        self.data_type = meta.get("data_type")
        self.tag_with_value = meta.get("tag_with_value")
        self.tag_value_delimitor = meta.get("tag_value_delimiter", ":")
        self.with_label = meta.get("with_label", False)
        if self.with_label:
            self.label_type = meta.get("label_type", "int")
            self.label_name = meta.get("label_name")
        self.with_match_id = meta.get("with_match_id", False)

        if self.with_match_id:
            match_id_name = schema.get("match_id_name")
            if isinstance(match_id_name, list):
                if not isinstance(self.match_id_index, int) or self.match_id_index >= len(match_id_name):
                    raise ValueError(f"match id index should be between 0 and {len(match_id_name) - 1}, "
                                     f"but {self.match_id_index} is given")
                self.match_id_name = match_id_name[self.match_id_index]
            else:
                if self.match_id_index != 0:
                    raise ValueError("Only one match_id exists, match_id_index should be 0")
match_id_index should be 0") self.match_id_name = match_id_name schema["match_id_name"] = self.match_id_name def read_data(self, input_data, mode="fit"): LOGGER.info("start to read sparse data and change data to instance") abnormal_detection.empty_table_detection(input_data) schema = copy.deepcopy(input_data.schema) if not schema.get("meta", {}): LOGGER.warning("Data meta is supported to be set with data uploading or binding, " "please refer to data transform using guides.") meta = dict(input_format="tag", delimiter=self.delimitor, with_label=self.with_label, with_match_id=self.with_match_id, tag_with_value=self.tag_with_value, tag_value_delimiter=self.tag_value_delimitor, data_type=self.data_type) schema["meta"] = meta generated_header = DataFormatPreProcess.generate_header(input_data, schema) schema.update(generated_header) schema = self.anonymous_generator.generate_anonymous_header(schema) set_schema(input_data, schema) else: self._update_param(schema) if mode == "fit": self.header = schema["header"] self.anonymous_header = schema["anonymous_header"] data_instance = self.fit(input_data) else: if not self.anonymous_header: header_set = set(self.header) self.anonymous_header = [] for column, anonymous_column in zip(schema["header"], schema["anonymous_header"]): if column not in header_set: continue self.anonymous_header.append(anonymous_column) schema["header"] = self.header schema["anonymous_header"] = self.anonymous_header set_schema(input_data, schema) data_instance = self.transform(input_data) set_schema(data_instance, schema) return data_instance @staticmethod def change_tag_to_str(value, tags_dict=None, delimitor=",", feature_offset=0, tag_value_delimitor=":"): vals = value.split(delimitor, -1) ret = [''] * len(tags_dict) vals = vals[feature_offset:] for i in range(len(vals)): tag, value = vals[i].split(tag_value_delimitor, -1) idx = tags_dict.get(tag, None) if idx is not None: ret[idx] = value return ret @staticmethod def change_str_to_tag(value, tags_dict=None, delimitor=",", tag_value_delimitor=":"): ret = [None] * len(tags_dict) tags = sorted(list(tags_dict.keys())) for i in range(len(value)): tag, val = tags[i], value[i] ret[i] = tag_value_delimitor.join([tag, val]) return delimitor.join(ret) def fill_missing_value(self, input_data, tags_dict, schema, mode="fit"): feature_offset = DataFormatPreProcess.get_feature_offset(schema) str_trans_method = functools.partial(self.change_tag_to_str, tags_dict=tags_dict, delimitor=self.delimitor, feature_offset=feature_offset, tag_value_delimitor=self.tag_value_delimitor) input_data = input_data.mapValues(str_trans_method) set_schema(input_data, schema) from federatedml.feature.imputer import Imputer imputer_processor = Imputer() if mode == "fit": data, self.default_value = imputer_processor.fit(input_data, replace_method=self.missing_fill_method, replace_value=self.default_value) LOGGER.debug("self.default_value is {}".format(self.default_value)) else: data = imputer_processor.transform(input_data, transform_value=self.default_value) if self.missing_impute is None: self.missing_impute = imputer_processor.get_missing_value_list() LOGGER.debug("self.missing_impute is {}".format(self.missing_impute)) self.missing_impute_rate = imputer_processor.get_impute_rate(mode) str_trans_tag_method = functools.partial(self.change_str_to_tag, tags_dict=tags_dict, delimitor=self.delimitor, tag_value_delimitor=self.tag_value_delimitor) data = data.mapValues(str_trans_tag_method) return data def fit(self, input_data): schema = input_data.schema tags_dict = 
dict(zip(schema["header"], range(len(schema["header"])))) if self.tag_with_value and self.missing_fill: input_data = self.fill_missing_value(input_data, tags_dict, schema, mode="fit") data_instance = self.gen_data_instance(input_data, schema["meta"], tags_dict) return data_instance def transform(self, input_data): schema = input_data.schema tags_dict = dict(zip(self.header, range(len(self.header)))) if self.tag_with_value and self.missing_fill: input_data = self.fill_missing_value(input_data, tags_dict, schema, mode="transform") data_instance = self.gen_data_instance(input_data, schema["meta"], tags_dict) return data_instance def gen_data_instance(self, input_data, meta, tags_dict): params = [self.delimitor, self.data_type, self.tag_with_value, self.tag_value_delimitor, self.with_label, self.with_match_id, self.match_id_index, meta.get("id_range", 0), self.label_type, self.output_format, tags_dict] to_instance_with_param = functools.partial(self.to_instance, params) data_instance = input_data.mapValues(to_instance_with_param) return data_instance def get_summary(self): if not self.missing_fill: return {} missing_summary = dict() missing_summary["missing_value"] = list(self.missing_impute) missing_summary["missing_impute_value"] = dict(zip(self.header, self.default_value)) missing_summary["missing_impute_rate"] = dict(zip(self.header, self.missing_impute_rate)) summary_buf = {"missing_fill_info": missing_summary} return summary_buf @staticmethod def to_instance(param_list, value): delimitor = param_list[0] data_type = param_list[1] tag_with_value = param_list[2] tag_value_delimitor = param_list[3] with_label = param_list[4] with_match_id = param_list[5] match_id_index = param_list[6] id_range = param_list[7] label_type = param_list[8] output_format = param_list[9] tags_dict = param_list[10] if output_format not in ["dense", "sparse"]: raise ValueError("output format {} is not define".format(output_format)) cols = value.split(delimitor, -1) offset = 0 label = None match_id = None if with_match_id: offset = id_range if id_range else 1 if offset == 0: offset = 1 match_id = cols[match_id_index] if with_label: label = cols[offset] offset += 1 if label_type == 'int': label = int(label) elif label_type in ["float", "float64"]: label = float(label) if output_format == "dense": features = [0 for i in range(len(tags_dict))] for fea in cols[offset:]: if tag_with_value: _tag, _val = fea.split(tag_value_delimitor, -1) if _tag in tags_dict: features[tags_dict.get(_tag)] = _val else: if fea in tags_dict: features[tags_dict.get(fea)] = 1 features = np.asarray(features, dtype=data_type) else: indices = [] data = [] for fea in cols[offset:]: if tag_with_value: _tag, _val = fea.split(tag_value_delimitor, -1) else: _tag = fea _val = 1 if _tag not in tags_dict: continue indices.append(tags_dict.get(_tag)) if data_type in ["float", "float64"]: _val = float(_val) elif data_type in ["int", "int64", "long"]: _val = int(_val) elif data_type == "str": _val = str(_val) data.append(_val) features = SparseVector(indices, data, len(tags_dict)) return Instance(inst_id=match_id, features=features, label=label) def save_model(self): transform_meta, transform_param = save_data_transform_model(input_format="tag", delimitor=self.delimitor, data_type=self.data_type, tag_with_value=self.tag_with_value, tag_value_delimitor=self.tag_value_delimitor, with_label=self.with_label, label_type=self.label_type, with_match_id=self.with_match_id, output_format=self.output_format, header=self.header, sid_name=self.sid_name, 
    def save_model(self):
        transform_meta, transform_param = save_data_transform_model(input_format="tag",
                                                                    delimitor=self.delimitor,
                                                                    data_type=self.data_type,
                                                                    tag_with_value=self.tag_with_value,
                                                                    tag_value_delimitor=self.tag_value_delimitor,
                                                                    with_label=self.with_label,
                                                                    label_type=self.label_type,
                                                                    with_match_id=self.with_match_id,
                                                                    output_format=self.output_format,
                                                                    header=self.header,
                                                                    sid_name=self.sid_name,
                                                                    label_name=self.label_name,
                                                                    model_name="Transformer",
                                                                    anonymous_header=self.anonymous_header)

        missing_imputer_meta, missing_imputer_param = save_missing_imputer_model(self.missing_fill,
                                                                                 self.missing_fill_method,
                                                                                 self.missing_impute,
                                                                                 self.default_value,
                                                                                 self.missing_impute_rate,
                                                                                 self.header,
                                                                                 "Imputer")
        transform_meta.imputer_meta.CopyFrom(missing_imputer_meta)
        transform_param.imputer_param.CopyFrom(missing_imputer_param)

        outlier_meta, outlier_param = save_outlier_model(outlier_replace=False,
                                                         model_name="Outlier")
        transform_meta.outlier_meta.CopyFrom(outlier_meta)
        transform_param.outlier_param.CopyFrom(outlier_param)

        return {"DataTransformMeta": transform_meta,
                "DataTransformParam": transform_param
                }

    def load_model(self, model_meta, model_param):
        self.delimitor, self.data_type, _0, self.tag_with_value, self.tag_value_delimitor, self.with_label, \
            self.label_type, self.output_format, self.header, self.sid_name, self.label_name, \
            self.with_match_id, self.anonymous_header = \
            load_data_transform_model("SparseTagTransformer", model_meta, model_param)

        self.missing_fill, self.missing_fill_method, \
            self.missing_impute, self.default_value = load_missing_imputer_model(self.header,
                                                                                 "Imputer",
                                                                                 model_meta.imputer_meta,
                                                                                 model_param.imputer_param)


class DataTransform(ModelBase):
    def __init__(self):
        super(DataTransform, self).__init__()
        self.transformer = None
        from federatedml.param.data_transform_param import DataTransformParam
        self.model_param = DataTransformParam()
        self._input_model_meta = None
        self._input_model_param = None

    def _load_reader(self, schema=None):
        if schema is None or not schema.get("meta", {}):
            input_format = self.model_param.input_format
        else:
            input_format = schema["meta"].get("input_format")

        if input_format == "dense":
            self.transformer = DenseFeatureTransformer(self.model_param)
        elif input_format == "sparse" or input_format == "svmlight":
            self.transformer = SparseFeatureTransformer(self.model_param)
        elif input_format == "tag":
            self.transformer = SparseTagTransformer(self.model_param)
        else:
            raise ValueError("Cannot recognize input format")

        if self._input_model_meta:
            self.transformer.load_model(self._input_model_meta, self._input_model_param)
            self._input_model_meta, self._input_model_param = None, None

        self.transformer.anonymous_generator = Anonymous(self.role, self.component_properties.local_partyid)

    def _init_model(self, model_param):
        self.model_param = model_param

    def load_model(self, model_dict):
        for _, value in model_dict["model"].items():
            for model in value:
                if model.endswith("Meta"):
                    self._input_model_meta = value[model]
                if model.endswith("Param"):
                    self._input_model_param = value[model]

    def fit(self, data):
        self._load_reader(data.schema)
        data_inst = self.transformer.read_data(data, "fit")
        if isinstance(self.transformer, (DenseFeatureTransformer, SparseTagTransformer)):
            summary_buf = self.transformer.get_summary()
            if summary_buf:
                self.set_summary(summary_buf)

        clear_schema(data_inst)
        return data_inst

    def transform(self, data):
        self._load_reader(data.schema)
        data_inst = self.transformer.read_data(data, "transform")
        clear_schema(data_inst)
        return data_inst

    def export_model(self):
        if not self.need_run:
            model_meta = DataTransformMeta()
            model_meta.need_run = False
            model_param = DataTransformParam()
            model_dict = dict(DataTransformMeta=model_meta,
                              DataTransformParam=model_param)
        else:
            model_dict = self.transformer.save_model()

        return model_dict
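# A minimal driving sketch (assumptions: `table` is a FATE DTable whose schema already
# carries a "meta" section, the component runs inside a FATE job so `role` and
# `component_properties` are populated, and `dense_param` is any DataTransformParam-like
# object such as the SimpleNamespace shown earlier):
#
#     component = DataTransform()
#     component._init_model(dense_param)
#     train_inst = component.fit(table)            # builds a transformer and emits Instances
#     model_dict = component.export_model()        # {"DataTransformMeta": ..., "DataTransformParam": ...}
#     component.load_model({"model": {"data_transform": model_dict}})
#     test_inst = component.transform(table)       # replays the fitted transformation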
"anonymous_label", "match_id_name"} for key in data_inst.schema: if key not in key_words: del ret_schema[key] data_inst.schema = ret_schema def set_schema(data_instance, schema): data_instance.schema = schema def save_data_transform_model(input_format="dense", delimitor=",", data_type="str", exclusive_data_type=None, tag_with_value=False, tag_value_delimitor=":", with_label=False, label_name='', label_type="int", output_format="dense", header=None, sid_name=None, with_match_id=False, model_name="DataTransform", anonymous_header=None): model_meta = DataTransformMeta() model_param = DataTransformParam() model_meta.input_format = input_format model_meta.delimitor = delimitor model_meta.data_type = data_type model_meta.tag_with_value = tag_with_value model_meta.tag_value_delimitor = tag_value_delimitor model_meta.with_label = with_label if with_label: model_meta.label_name = label_name model_meta.label_type = label_type model_meta.output_format = output_format model_meta.with_match_id = with_match_id if header is not None: model_param.header.extend(header) if anonymous_header is not None: model_param.anonymous_header.extend(anonymous_header) if sid_name: model_param.sid_name = sid_name if label_name: model_param.label_name = label_name if exclusive_data_type is not None: model_meta.exclusive_data_type.update(exclusive_data_type) return model_meta, model_param def load_data_transform_model(model_name="DataTransform", model_meta=None, model_param=None): delimitor = model_meta.delimitor data_type = model_meta.data_type tag_with_value = model_meta.tag_with_value tag_value_delimitor = model_meta.tag_value_delimitor with_label = model_meta.with_label label_name = model_meta.label_name if with_label else None label_type = model_meta.label_type if with_label else None try: with_match_id = model_meta.with_match_id except AttributeError: with_match_id = False output_format = model_meta.output_format header = list(model_param.header) or None try: anonymous_header = list(model_param.anonymous_header) except AttributeError: anonymous_header = None sid_name = None if model_param.sid_name: sid_name = model_param.sid_name exclusive_data_type = None if model_meta.exclusive_data_type: exclusive_data_type = {} for col_name in model_meta.exclusive_data_type: exclusive_data_type[col_name] = model_meta.exclusive_data_type.get(col_name) return delimitor, data_type, exclusive_data_type, tag_with_value, tag_value_delimitor, with_label, \ label_type, output_format, header, sid_name, label_name, with_match_id, anonymous_header def save_missing_imputer_model(missing_fill=False, missing_replace_method=None, missing_impute=None, missing_fill_value=None, missing_replace_rate=None, header=None, model_name="Imputer"): model_meta = DataTransformImputerMeta() model_param = DataTransformImputerParam() model_meta.is_imputer = missing_fill if missing_fill: if missing_replace_method: model_meta.strategy = str(missing_replace_method) if missing_impute is not None: model_meta.missing_value.extend(map(str, missing_impute)) if missing_fill_value is not None: feature_value_dict = dict(zip(header, map(str, missing_fill_value))) model_param.missing_replace_value.update(feature_value_dict) if missing_replace_rate is not None: missing_replace_rate_dict = dict(zip(header, missing_replace_rate)) model_param.missing_value_ratio.update(missing_replace_rate_dict) return model_meta, model_param def load_missing_imputer_model(header=None, model_name="Imputer", model_meta=None, model_param=None): missing_fill = model_meta.is_imputer 
    missing_replace_method = model_meta.strategy
    missing_value = model_meta.missing_value
    missing_fill_value = model_param.missing_replace_value

    if missing_fill:
        if not missing_replace_method:
            missing_replace_method = None

        if not missing_value:
            missing_value = None
        else:
            missing_value = list(missing_value)

        if missing_fill_value:
            missing_fill_value = [missing_fill_value.get(head) for head in header]
        else:
            missing_fill_value = None
    else:
        missing_replace_method = None
        missing_value = None
        missing_fill_value = None

    return missing_fill, missing_replace_method, missing_value, missing_fill_value


def save_outlier_model(outlier_replace=False,
                       outlier_replace_method=None,
                       outlier_impute=None,
                       outlier_replace_value=None,
                       outlier_replace_rate=None,
                       header=None,
                       model_name="Outlier"):
    model_meta = DataTransformOutlierMeta()
    model_param = DataTransformOutlierParam()

    model_meta.is_outlier = outlier_replace
    if outlier_replace:
        if outlier_replace_method:
            model_meta.strategy = str(outlier_replace_method)

        if outlier_impute:
            model_meta.outlier_value.extend(map(str, outlier_impute))

        if outlier_replace_value:
            outlier_value_dict = dict(zip(header, map(str, outlier_replace_value)))
            model_param.outlier_replace_value.update(outlier_value_dict)

        if outlier_replace_rate:
            outlier_value_ratio_dict = dict(zip(header, outlier_replace_rate))
            model_param.outlier_value_ratio.update(outlier_value_ratio_dict)

    return model_meta, model_param


def load_outlier_model(header=None,
                       model_name="Outlier",
                       model_meta=None,
                       model_param=None):
    outlier_replace = model_meta.is_outlier
    outlier_replace_method = model_meta.strategy
    outlier_value = model_meta.outlier_value
    outlier_replace_value = model_param.outlier_replace_value

    if outlier_replace:
        if not outlier_replace_method:
            outlier_replace_method = None

        if not outlier_value:
            outlier_value = None
        else:
            outlier_value = list(outlier_value)

        if outlier_replace_value:
            outlier_replace_value = [outlier_replace_value.get(head) for head in header]
        else:
            outlier_replace_value = None
    else:
        outlier_replace_method = None
        outlier_value = None
        outlier_replace_value = None

    return outlier_replace, outlier_replace_method, outlier_value, outlier_replace_value
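# Round-trip sketch for the imputer helper pair above (illustrative values only):
#
#     meta, param = save_missing_imputer_model(
#         missing_fill=True, missing_replace_method="designated",
#         missing_impute=["", "NA"], missing_fill_value=[0, 0],
#         missing_replace_rate=[0.1, 0.0], header=["x0", "x1"])
#     load_missing_imputer_model(header=["x0", "x1"], model_meta=meta, model_param=param)
#     # -> (True, "designated", ["", "NA"], ["0", "0"])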