import torch
import numpy as np

from federatedml.util import consts
from federatedml.secureprotol.paillier_tensor import PaillierTensor
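
# These numpy dense layers re-implement the forward/backward pass of a
# torch.nn.Linear in numpy, so the pass can run on plain ndarrays (guest)
# or directly on encrypted PaillierTensor inputs (host), which torch
# layers cannot consume.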
class NumpyDenseLayer(object):
    """
    NumpyDenseLayer is designed for PaillierTensor computation.
    """

    def __init__(self):
        self.input = None
        self.model_weight = None
        self.model_shape = None
        self.bias = None
        self.lr = 1.0
        self.role = None
        self.is_empty_model = False
        self.activation_input = None
        self.input_cached = np.array([])
        self.activation_cached = np.array([])
        self.do_backward_selective_strategy = False
        self.batch_size = None

    def set_backward_selective_strategy(self):
        self.do_backward_selective_strategy = True

    def set_batch(self, batch_size):
        self.batch_size = batch_size

    def build(self, torch_linear: torch.nn.Linear):
        if torch_linear is None:
            if self.role == "host":
                raise ValueError("host input is empty!")
            self.is_empty_model = True
            return

        assert isinstance(torch_linear, torch.nn.Linear), \
            "must use a torch Linear to build this class, but got {}".format(torch_linear)

        # mirror the torch parameters as numpy arrays; the weight is stored
        # transposed, so a forward pass is a plain x @ W
        self.model_weight = torch_linear.weight.cpu().detach().numpy().transpose()
        if torch_linear.bias is not None:
            self.bias = torch_linear.bias.cpu().detach().numpy()

    def export_model(self):
        if self.is_empty_model:
            return "".encode()
        layer_weights = [self.model_weight]
        return layer_weights

    def get_selective_activation_input(self):
        # pop the first batch_size cached activations (FIFO order)
        self.activation_input = self.activation_cached[: self.batch_size]
        self.activation_cached = self.activation_cached[self.batch_size:]
        return self.activation_input

    def get_weight(self):
        return self.model_weight.transpose()

    def get_bias(self):
        return self.bias

    def set_learning_rate(self, lr):
        self.lr = lr

    def forward(self, x, **kwargs):
        pass

    def get_weight_gradient(self, delta):
        pass

    def restore_model(self, model_bytes):
        pass

    def update_weight(self, delta):
        pass

    def update_bias(self, delta):
        pass

    @property
    def empty(self):
        return self.is_empty_model

    @property
    def output_shape(self):
        return self.model_weight.shape[1:]

    def __repr__(self):
        return "model weights: {}, model bias: {}".format(
            self.model_weight, self.bias)

    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)
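
# Shape conventions, as set up by build(): torch.nn.Linear stores its weight
# as (out_features, in_features); build() transposes it, so model_weight is
# (in_features, out_features) and a dense forward pass is x @ W + b. A small
# illustration (the layer sizes are arbitrary):
#
#     linear = torch.nn.Linear(4, 2)   # torch weight: (2, 4)
#     layer = NumpyDenseLayerGuest()
#     layer.build(linear)              # model_weight: (4, 2)
#     layer(np.ones((8, 4)))           # output shape: (8, 2)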

class NumpyDenseLayerGuest(NumpyDenseLayer):

    def __init__(self):
        super(NumpyDenseLayerGuest, self).__init__()
        self.role = consts.GUEST

    def forward(self, x):
        if self.empty:
            return None
        self.input = x
        output = np.matmul(x, self.model_weight)
        if self.bias is not None:
            output += self.bias
        return output

    def select_backward_sample(self, selective_ids):
        # cache the selected input rows for a later selective backward pass
        if self.input_cached.shape[0] == 0:
            self.input_cached = self.input[selective_ids]
        else:
            self.input_cached = np.vstack(
                (self.input_cached, self.input[selective_ids])
            )

    def get_input_gradient(self, delta):
        if self.empty:
            return None
        error = np.matmul(delta, self.model_weight.T)
        return error

    def get_weight_gradient(self, delta):
        if self.empty:
            return None
        if self.do_backward_selective_strategy:
            # consume the first batch_size cached rows (FIFO order)
            self.input = self.input_cached[: self.batch_size]
            self.input_cached = self.input_cached[self.batch_size:]
        delta_w = np.matmul(delta.T, self.input)
        return delta_w

    def update_weight(self, delta):
        if self.empty:
            return None
        self.model_weight -= self.lr * delta.T

    def update_bias(self, delta):
        if self.bias is not None:
            self.bias -= np.sum(delta, axis=0) * self.lr
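
# A minimal sketch of the selective backward strategy implemented above
# (assumption: plain numpy inputs; the ids and batch size are illustrative).
# Rows picked by select_backward_sample() are cached after a forward pass and
# later consumed in FIFO order, batch_size rows at a time:
#
#     layer.set_backward_selective_strategy()
#     layer.set_batch(2)
#     layer.forward(x)                       # stores x as self.input
#     layer.select_backward_sample([0, 3])   # cache rows 0 and 3 of x
#     layer.get_weight_gradient(delta)       # consumes the 2 cached rows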

class NumpyDenseLayerHost(NumpyDenseLayer):
    """
    This dense layer can directly compute a Paillier-tensor forward pass.
    """

    def __init__(self):
        super(NumpyDenseLayerHost, self).__init__()
        self.role = consts.HOST

    def select_backward_sample(self, selective_ids):
        cached_shape = self.input_cached.shape[0]
        # re-key the selected samples so they are appended after the rows
        # already in the cache
        offsets = [i + cached_shape for i in range(len(selective_ids))]
        id_map = dict(zip(selective_ids, offsets))

        if cached_shape == 0:
            self.input_cached = (
                self.input.get_obj()
                .filter(lambda k, v: k in id_map)
                .map(lambda k, v: (id_map[k], v))
            )
            self.input_cached = PaillierTensor(self.input_cached)
            self.activation_cached = self.activation_input[selective_ids]
        else:
            selective_input = (
                self.input.get_obj()
                .filter(lambda k, v: k in id_map)
                .map(lambda k, v: (id_map[k], v))
            )
            self.input_cached = PaillierTensor(
                self.input_cached.get_obj().union(selective_input)
            )
            self.activation_cached = np.vstack(
                (self.activation_cached, self.activation_input[selective_ids])
            )

    def forward(self, x, encoder=None):
        # x may be a plain ndarray or an encrypted PaillierTensor; the
        # optional encoder encodes the plaintext parameters before they
        # are combined with x
        self.input = x
        if encoder is not None:
            output = x * encoder.encode(self.model_weight)
        else:
            output = x * self.model_weight

        if self.bias is not None:
            if encoder is not None:
                output += encoder.encode(self.bias)
            else:
                output += self.bias

        return output

    def get_input_gradient(self, delta, acc_noise, encoder=None):
        if not encoder:
            error = delta * self.model_weight.T + delta * acc_noise.T
        else:
            error = delta.encode(encoder) * (self.model_weight + acc_noise).T
        return error
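
    # Both branches above compute the same quantity, delta @ (W + acc_noise).T:
    # the plaintext branch distributes the product, while the encoder branch
    # encodes delta first. acc_noise carries the noise accumulated on the host
    # weights in FATE's hetero-NN protocol, so the gradient is taken against
    # the masked weights rather than the raw ones.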

    def get_weight_gradient(self, delta, encoder=None):
        if self.do_backward_selective_strategy:
            batch_size = self.batch_size
            # split the cache: keys below batch_size form the current batch;
            # the rest are re-keyed from zero and kept for later batches
            self.input = PaillierTensor(
                self.input_cached.get_obj().filter(lambda k, v: k < batch_size)
            )
            self.input_cached = PaillierTensor(
                self.input_cached.get_obj()
                .filter(lambda k, v: k >= batch_size)
                .map(lambda k, v: (k - batch_size, v))
            )
        if encoder:
            delta_w = self.input.fast_matmul_2d(encoder.encode(delta))
        else:
            delta_w = self.input.fast_matmul_2d(delta)
        return delta_w

    def update_weight(self, delta):
        self.model_weight -= delta * self.lr

    def update_bias(self, delta):
        if self.bias is not None:
            self.bias -= np.sum(delta, axis=0) * self.lr
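
# A minimal smoke test of the guest layer; a sketch, not part of the FATE code
# path. It assumes only that torch, numpy, and federatedml are importable; the
# shapes and learning rate are illustrative.
if __name__ == "__main__":
    linear = torch.nn.Linear(4, 2)
    layer = NumpyDenseLayerGuest()
    layer.build(linear)
    layer.set_learning_rate(0.01)

    x = np.random.rand(8, 4)
    out = layer(x)                                # x @ W + b
    assert out.shape == (8, 2)

    delta = np.random.rand(8, 2)                  # gradient w.r.t. the output
    delta_w = layer.get_weight_gradient(delta)    # shape (2, 4)
    error = layer.get_input_gradient(delta)       # gradient w.r.t. x
    assert error.shape == x.shape

    layer.update_weight(delta_w)                  # W -= lr * delta_w.T
    layer.update_bias(delta)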
|