#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#  Copyright 2019 The FATE Authors. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import functools

import numpy as np
import scipy.sparse as sp

from federatedml.feature.sparse_vector import SparseVector
from federatedml.statistic import data_overview
from federatedml.util import LOGGER
from federatedml.util import consts
from federatedml.util import fate_operator
from federatedml.util.fixpoint_solver import FixedPointEncoder


class HeteroGradientBase(object):
    def __init__(self):
        self.use_async = False
        self.use_sample_weight = False
        self.fixed_point_encoder = None

    def compute_gradient_procedure(self, *args):
        raise NotImplementedError("Should not be called here")

    def set_total_batch_nums(self, total_batch_nums):
        """
        Use for sqn gradient.
        """
        pass

    def set_use_async(self):
        self.use_async = True

    def set_use_sync(self):
        self.use_async = False

    def set_use_sample_weight(self):
        self.use_sample_weight = True

    def set_fixed_float_precision(self, floating_point_precision):
        if floating_point_precision is not None:
            self.fixed_point_encoder = FixedPointEncoder(2 ** floating_point_precision)
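
    # Illustrative sketch (comments only): with floating_point_precision = 23, the
    # encoder above scales floats by 2 ** 23 before multiplication and scales the
    # product back afterwards, presumably so multiplications against encrypted values
    # operate on integer representations. Assuming the encode()/decode() methods used
    # later in this file, roughly:
    #
    #     encoder = FixedPointEncoder(2 ** 23)
    #     encoded = encoder.encode(np.array([0.25, -1.5]))   # integer representation
    #     restored = encoder.decode(encoded)                  # approximately [0.25, -1.5]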

    @staticmethod
    def __apply_cal_gradient(data, fixed_point_encoder, is_sparse):
        all_g = None
        for key, (feature, d) in data:
            if is_sparse:
                x = np.zeros(feature.get_shape())
                for idx, v in feature.get_all_data():
                    x[idx] = v
                feature = x
            if fixed_point_encoder:
                # g = (feature * 2 ** floating_point_precision).astype("int") * d
                g = fixed_point_encoder.encode(feature) * d
            else:
                g = feature * d
            if all_g is None:
                all_g = g
            else:
                all_g += g
        if all_g is None:
            return all_g
        elif fixed_point_encoder:
            all_g = fixed_point_encoder.decode(all_g)
        return all_g
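
    # For intuition only: over one partition the accumulation above is equivalent to
    # the dense computation below (plain float gradients, feature matrix X stacked
    # from the partition's rows, d the per-row fore_gradient values):
    #
    #     X = np.array([[1.0, 2.0], [3.0, 4.0]])
    #     d = np.array([0.5, -1.0])
    #     partition_sum = (X * d[:, None]).sum(axis=0)   # same as X.T @ d -> [-2.5, -3.0]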

    def compute_gradient(self, data_instances, fore_gradient, fit_intercept, need_average=True):
        """
        Compute hetero-regression gradient

        Parameters
        ----------
        data_instances: Table, input data
        fore_gradient: Table, fore_gradient
        fit_intercept: bool, if model has intercept or not
        need_average: bool, gradient needs to be averaged or not

        Returns
        ----------
        Table
            the hetero regression model's gradient
        """
        # feature_num = data_overview.get_features_shape(data_instances)
        # data_count = data_instances.count()
        is_sparse = data_overview.is_sparse_data(data_instances)

        LOGGER.debug("Use apply partitions")
        feat_join_grad = data_instances.join(fore_gradient,
                                             lambda d, g: (d.features, g))
        f = functools.partial(self.__apply_cal_gradient,
                              fixed_point_encoder=self.fixed_point_encoder,
                              is_sparse=is_sparse)
        gradient_sum = feat_join_grad.applyPartitions(f)
        gradient_sum = gradient_sum.reduce(lambda x, y: x + y)
        if fit_intercept:
            # bias_grad = np.sum(fore_gradient)
            bias_grad = fore_gradient.reduce(lambda x, y: x + y)
            gradient_sum = np.append(gradient_sum, bias_grad)
        if need_average:
            gradient = gradient_sum / data_instances.count()
        else:
            gradient = gradient_sum

        """
        else:
            LOGGER.debug(f"Original_method")
            feat_join_grad = data_instances.join(fore_gradient,
                                                 lambda d, g: (d.features, g))
            f = functools.partial(self.__compute_partition_gradient,
                                  fit_intercept=fit_intercept,
                                  is_sparse=is_sparse)
            gradient_partition = feat_join_grad.applyPartitions(f)
            gradient_partition = gradient_partition.reduce(lambda x, y: x + y)

            gradient = gradient_partition / data_count
        """
        return gradient
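
    # Worked example (plain numbers, no Table or encryption): for two samples with
    # features x1 = [1, 2], x2 = [3, 4], fore_gradient d = [0.5, -1.0],
    # fit_intercept=True and need_average=True, the result would be
    #     feature part: (0.5 * [1, 2] + (-1.0) * [3, 4]) / 2 = [-1.25, -1.5]
    #     bias part:    (0.5 + (-1.0)) / 2 = -0.25
    # i.e. gradient = [-1.25, -1.5, -0.25].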


class Guest(HeteroGradientBase):
    def __init__(self):
        super().__init__()
        self.half_d = None
        self.host_forwards = None
        self.forwards = None
        self.aggregated_forwards = None

    def _register_gradient_sync(self, host_forward_transfer, fore_gradient_transfer,
                                guest_gradient_transfer, guest_optim_gradient_transfer):
        self.host_forward_transfer = host_forward_transfer
        self.fore_gradient_transfer = fore_gradient_transfer
        self.unilateral_gradient_transfer = guest_gradient_transfer
        self.unilateral_optim_gradient_transfer = guest_optim_gradient_transfer

    def compute_and_aggregate_forwards(self, data_instances, model_weights,
                                       cipher, batch_index, current_suffix, offset=None):
        raise NotImplementedError("Function should not be called here")

    def compute_half_d(self, data_instances, w, cipher, batch_index, current_suffix):
        raise NotImplementedError("Function should not be called here")

    def _asynchronous_compute_gradient(self, data_instances, model_weights, cipher, current_suffix):
        LOGGER.debug("Called asynchronous gradient")
        encrypted_half_d = cipher.distribute_encrypt(self.half_d)
        self.remote_fore_gradient(encrypted_half_d, suffix=current_suffix)

        half_g = self.compute_gradient(data_instances, self.half_d, False)
        self.host_forwards = self.get_host_forward(suffix=current_suffix)
        host_forward = self.host_forwards[0]
        host_half_g = self.compute_gradient(data_instances, host_forward, False)
        unilateral_gradient = half_g + host_half_g
        if model_weights.fit_intercept:
            n = data_instances.count()
            intercept = (host_forward.reduce(lambda x, y: x + y) +
                         self.half_d.reduce(lambda x, y: x + y)) / n
            unilateral_gradient = np.append(unilateral_gradient, intercept)
        return unilateral_gradient
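
    # Equivalence sketch (single host, plaintext values), illustrating why the two
    # partial gradients above can simply be added:
    #
    #     combined_d = half_d.join(host_forward, lambda a, b: a + b)
    #     # compute_gradient(data_instances, combined_d, False) == half_g + host_half_g
    #     # because the gradient is linear in its fore_gradient argument; the intercept
    #     # term is just the average of the combined residuals, appended at the end.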

    def _centralized_compute_gradient(self, data_instances, model_weights, cipher, current_suffix, masked_index=None):
        self.host_forwards = self.get_host_forward(suffix=current_suffix)
        fore_gradient = self.half_d

        batch_size = data_instances.count()
        partial_masked_index_enc = None
        if masked_index:
            masked_index = masked_index.mapValues(lambda value: 0)
            masked_index_to_encrypt = masked_index.subtractByKey(self.half_d)
            partial_masked_index_enc = cipher.distribute_encrypt(masked_index_to_encrypt)

        for host_forward in self.host_forwards:
            if self.use_sample_weight:
                # host_forward = host_forward.join(data_instances, lambda h, v: h * v.weight)
                host_forward = data_instances.join(host_forward, lambda v, h: h * v.weight)
            fore_gradient = fore_gradient.join(host_forward, lambda x, y: x + y)

        def _apply_obfuscate(val):
            val.apply_obfuscator()
            return val

        fore_gradient = fore_gradient.mapValues(lambda val: _apply_obfuscate(val) / batch_size)

        if partial_masked_index_enc:
            masked_fore_gradient = partial_masked_index_enc.union(fore_gradient)
            self.remote_fore_gradient(masked_fore_gradient, suffix=current_suffix)
        else:
            self.remote_fore_gradient(fore_gradient, suffix=current_suffix)
        # self.remote_fore_gradient(fore_gradient, suffix=current_suffix)

        unilateral_gradient = self.compute_gradient(data_instances, fore_gradient,
                                                    model_weights.fit_intercept, need_average=False)
        return unilateral_gradient

    def compute_gradient_procedure(self, data_instances, cipher, model_weights, optimizer,
                                   n_iter_, batch_index, offset=None, masked_index=None):
        """
        Linear model gradient procedure
        Step 1: get host forwards, which differ between algorithms
            For Logistic Regression and Linear Regression: forwards = wx
            For Poisson Regression: forwards = exp(wx)

        Step 2: Compute self forwards, aggregate host forwards and get d = fore_gradient

        Step 3: Compute unilateral gradient = ∑ d * x

        Step 4: Send unilateral gradients to arbiter and receive the optimized and decrypted gradient.
        """
        current_suffix = (n_iter_, batch_index)
        # self.host_forwards = self.get_host_forward(suffix=current_suffix)

        # Compute Guest's partial d
        self.compute_half_d(data_instances, model_weights, cipher,
                            batch_index, current_suffix)
        if self.use_async:
            unilateral_gradient = self._asynchronous_compute_gradient(data_instances, model_weights,
                                                                      cipher=cipher,
                                                                      current_suffix=current_suffix)
        else:
            unilateral_gradient = self._centralized_compute_gradient(data_instances, model_weights,
                                                                     cipher=cipher,
                                                                     current_suffix=current_suffix,
                                                                     masked_index=masked_index)

        if optimizer is not None:
            unilateral_gradient = optimizer.add_regular_to_grad(unilateral_gradient, model_weights)

        optimized_gradient = self.update_gradient(unilateral_gradient, suffix=current_suffix)
        # LOGGER.debug(f"Before return, optimized_gradient: {optimized_gradient}")
        return optimized_gradient
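
    # Minimal usage sketch (comments only; the transfer variables, Paillier-style
    # cipher, model_weights and optimizer are all provided by the surrounding
    # training loop in practice, not by this class):
    #
    #     gradient_computer = Guest()
    #     # gradient_computer._register_gradient_sync(...)   # transfer variables
    #     # optimized_grad = gradient_computer.compute_gradient_procedure(
    #     #     batch_data, cipher, model_weights, optimizer,
    #     #     n_iter_=0, batch_index=0)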

    def get_host_forward(self, suffix=tuple()):
        host_forward = self.host_forward_transfer.get(idx=-1, suffix=suffix)
        return host_forward

    def remote_fore_gradient(self, fore_gradient, suffix=tuple()):
        self.fore_gradient_transfer.remote(obj=fore_gradient, role=consts.HOST, idx=-1, suffix=suffix)

    def update_gradient(self, unilateral_gradient, suffix=tuple()):
        self.unilateral_gradient_transfer.remote(unilateral_gradient, role=consts.ARBITER, idx=0, suffix=suffix)
        optimized_gradient = self.unilateral_optim_gradient_transfer.get(idx=0, suffix=suffix)
        return optimized_gradient


class Host(HeteroGradientBase):
    def __init__(self):
        super().__init__()
        self.forwards = None
        self.fore_gradient = None

    def _register_gradient_sync(self, host_forward_transfer, fore_gradient_transfer,
                                host_gradient_transfer, host_optim_gradient_transfer):
        self.host_forward_transfer = host_forward_transfer
        self.fore_gradient_transfer = fore_gradient_transfer
        self.unilateral_gradient_transfer = host_gradient_transfer
        self.unilateral_optim_gradient_transfer = host_optim_gradient_transfer

    def compute_forwards(self, data_instances, model_weights):
        raise NotImplementedError("Function should not be called here")

    def compute_unilateral_gradient(self, data_instances, fore_gradient, model_weights, optimizer):
        raise NotImplementedError("Function should not be called here")

    def _asynchronous_compute_gradient(self, data_instances, cipher, current_suffix):
        encrypted_forward = cipher.distribute_encrypt(self.forwards)
        self.remote_host_forward(encrypted_forward, suffix=current_suffix)

        half_g = self.compute_gradient(data_instances, self.forwards, False)
        guest_half_d = self.get_fore_gradient(suffix=current_suffix)
        guest_half_g = self.compute_gradient(data_instances, guest_half_d, False)
        unilateral_gradient = half_g + guest_half_g
        return unilateral_gradient

    def _centralized_compute_gradient(self, data_instances, cipher, current_suffix):
        encrypted_forward = cipher.distribute_encrypt(self.forwards)
        self.remote_host_forward(encrypted_forward, suffix=current_suffix)
        fore_gradient = self.fore_gradient_transfer.get(idx=0, suffix=current_suffix)

        # Host case, never fit-intercept
        unilateral_gradient = self.compute_gradient(data_instances, fore_gradient, False, need_average=False)
        return unilateral_gradient

    def compute_gradient_procedure(self, data_instances, cipher, model_weights,
                                   optimizer,
                                   n_iter_, batch_index):
        """
        Linear model gradient procedure
        Step 1: get host forwards, which differ between algorithms
            For Logistic Regression: forwards = wx
        """
        current_suffix = (n_iter_, batch_index)

        self.forwards = self.compute_forwards(data_instances, model_weights)

        if self.use_async:
            unilateral_gradient = self._asynchronous_compute_gradient(data_instances,
                                                                      cipher,
                                                                      current_suffix)
        else:
            unilateral_gradient = self._centralized_compute_gradient(data_instances,
                                                                     cipher,
                                                                     current_suffix)

        if optimizer is not None:
            unilateral_gradient = optimizer.add_regular_to_grad(unilateral_gradient, model_weights)

        optimized_gradient = self.update_gradient(unilateral_gradient, suffix=current_suffix)
        LOGGER.debug("Before return compute_gradient_procedure")
        return optimized_gradient

    def compute_sqn_forwards(self, data_instances, delta_s, cipher):
        """
        To compute the Hessian-vector product, y and s are needed.
        g = (1/N) * ∑(0.25 * wx - 0.5 * y) * x
        y = ∇²F(w_t) * s_t = g' * s = (1/N) * ∑(0.25 * x * s) * x
        define forward_hess = ∑(0.25 * x * s)
        """
        sqn_forwards = data_instances.mapValues(
            lambda v: cipher.encrypt(fate_operator.vec_dot(v.features, delta_s.coef_) + delta_s.intercept_))
        # forward_sum = sqn_forwards.reduce(reduce_add)
        return sqn_forwards

    def compute_forward_hess(self, data_instances, delta_s, forward_hess):
        """
        To compute the Hessian-vector product, y and s are needed.
        g = (1/N) * ∑(0.25 * wx - 0.5 * y) * x
        y = ∇²F(w_t) * s_t = g' * s = (1/N) * ∑(0.25 * x * s) * x
        define forward_hess = 0.25 * x * s
        """
        hess_vector = self.compute_gradient(data_instances,
                                            forward_hess,
                                            delta_s.fit_intercept)
        return np.array(hess_vector)
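
    # Worked sketch of the curvature pair (plaintext, single sample, logistic case):
    # for a sample x and direction s, forward_hess is 0.25 * (x · s) and that sample's
    # contribution to the Hessian-vector product is forward_hess * x. For example,
    #
    #     x = np.array([1.0, 2.0])
    #     s = np.array([0.1, -0.2])
    #     forward_hess = 0.25 * x.dot(s)        # 0.25 * (-0.3) = -0.075
    #     hess_vec_part = forward_hess * x      # [-0.075, -0.15]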

    def remote_host_forward(self, host_forward, suffix=tuple()):
        self.host_forward_transfer.remote(obj=host_forward, role=consts.GUEST, idx=0, suffix=suffix)

    def get_fore_gradient(self, suffix=tuple()):
        host_forward = self.fore_gradient_transfer.get(idx=0, suffix=suffix)
        return host_forward

    def update_gradient(self, unilateral_gradient, suffix=tuple()):
        self.unilateral_gradient_transfer.remote(unilateral_gradient, role=consts.ARBITER, idx=0, suffix=suffix)
        optimized_gradient = self.unilateral_optim_gradient_transfer.get(idx=0, suffix=suffix)
        return optimized_gradient


class Arbiter(HeteroGradientBase):
    def __init__(self):
        super().__init__()
        self.has_multiple_hosts = False

    def _register_gradient_sync(self, guest_gradient_transfer, host_gradient_transfer,
                                guest_optim_gradient_transfer, host_optim_gradient_transfer):
        self.guest_gradient_transfer = guest_gradient_transfer
        self.host_gradient_transfer = host_gradient_transfer
        self.guest_optim_gradient_transfer = guest_optim_gradient_transfer
        self.host_optim_gradient_transfer = host_optim_gradient_transfer

    def compute_gradient_procedure(self, cipher, optimizer, n_iter_, batch_index):
        """
        Compute gradients.
        Receive local gradients from guest and hosts, merge and optimize them,
        then split the result and send each party its share back.

        Parameters
        ----------
        cipher: cipher used to decrypt the aggregated gradient
        optimizer: optimizer that computes the delta gradient of this iteration
        n_iter_: int, current iteration number
        batch_index: int
        """
        current_suffix = (n_iter_, batch_index)

        host_gradients, guest_gradient = self.get_local_gradient(current_suffix)

        if len(host_gradients) > 1:
            self.has_multiple_hosts = True

        host_gradients = [np.array(h) for h in host_gradients]
        guest_gradient = np.array(guest_gradient)

        size_list = [h_g.shape[0] for h_g in host_gradients]
        size_list.append(guest_gradient.shape[0])

        gradient = np.hstack(host_gradients)
        gradient = np.hstack((gradient, guest_gradient))

        grad = np.array(cipher.decrypt_list(gradient))

        # LOGGER.debug("In arbiter compute_gradient_procedure, before apply grad: {}, size_list: {}".format(
        #     grad, size_list
        # ))

        delta_grad = optimizer.apply_gradients(grad)

        # LOGGER.debug("In arbiter compute_gradient_procedure, delta_grad: {}".format(
        #     delta_grad
        # ))
        separate_optim_gradient = self.separate(delta_grad, size_list)
        # LOGGER.debug("In arbiter compute_gradient_procedure, separated gradient: {}".format(
        #     separate_optim_gradient
        # ))
        host_optim_gradients = separate_optim_gradient[:-1]
        guest_optim_gradient = separate_optim_gradient[-1]

        self.remote_local_gradient(host_optim_gradients, guest_optim_gradient, current_suffix)
        return delta_grad

    @staticmethod
    def separate(value, size_list):
        """
        Split value into consecutive pieces according to size_list.

        Parameters
        ----------
        value: list or ndarray, input data
        size_list: list, the size of each piece

        Returns
        ----------
        list
            the pieces after splitting
        """
        separate_res = []
        cur = 0
        for size in size_list:
            separate_res.append(value[cur:cur + size])
            cur += size
        return separate_res
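
    # For example (plain list; sizes taken from two hosts with 2 features each and a
    # guest with 3 including the intercept):
    #
    #     Arbiter.separate([1, 2, 3, 4, 5, 6, 7], [2, 2, 3])
    #     # -> [[1, 2], [3, 4], [5, 6, 7]]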

    def get_local_gradient(self, suffix=tuple()):
        host_gradients = self.host_gradient_transfer.get(idx=-1, suffix=suffix)
        LOGGER.info("Get host_gradient from Host")

        guest_gradient = self.guest_gradient_transfer.get(idx=0, suffix=suffix)
        LOGGER.info("Get guest_gradient from Guest")
        return host_gradients, guest_gradient

    def remote_local_gradient(self, host_optim_gradients, guest_optim_gradient, suffix=tuple()):
        for idx, host_optim_gradient in enumerate(host_optim_gradients):
            self.host_optim_gradient_transfer.remote(host_optim_gradient,
                                                     role=consts.HOST,
                                                     idx=idx,
                                                     suffix=suffix)

        self.guest_optim_gradient_transfer.remote(guest_optim_gradient,
                                                  role=consts.GUEST,
                                                  idx=0,
                                                  suffix=suffix)