#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools

import numpy as np
import scipy.sparse as sp

from federatedml.feature.sparse_vector import SparseVector
from federatedml.statistic import data_overview
from federatedml.util import LOGGER
from federatedml.util import consts
from federatedml.util import fate_operator
from federatedml.util.fixpoint_solver import FixedPointEncoder


class HeteroGradientBase(object):
    def __init__(self):
        self.use_async = False
        self.use_sample_weight = False
        self.fixed_point_encoder = None

    def compute_gradient_procedure(self, *args):
        raise NotImplementedError("Should not call here")

    def set_total_batch_nums(self, total_batch_nums):
        """
        Use for sqn gradient.
        """
        pass

    def set_use_async(self):
        self.use_async = True

    def set_use_sync(self):
        self.use_async = False

    def set_use_sample_weight(self):
        self.use_sample_weight = True

    def set_fixed_float_precision(self, floating_point_precision):
        if floating_point_precision is not None:
            self.fixed_point_encoder = FixedPointEncoder(2 ** floating_point_precision)
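
    # Illustrative note (not executed): judging from the commented-out line inside
    # __apply_cal_gradient below, the fixed-point encoder scales features by
    # 2 ** floating_point_precision and truncates them to integers before they are
    # multiplied with the (typically encrypted) fore_gradient, and decode() divides
    # the scale back out after the reduce. A rough sketch of that contract:
    #     scale = 2 ** 23
    #     encoded = (np.array([0.3, -1.2]) * scale).astype("int")  # integer multipliers
    #     decoded = encoded / scale                                # ~ [0.3, -1.2] again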

    @staticmethod
    def __apply_cal_gradient(data, fixed_point_encoder, is_sparse):
        all_g = None
        for key, (feature, d) in data:
            if is_sparse:
                x = np.zeros(feature.get_shape())
                for idx, v in feature.get_all_data():
                    x[idx] = v
                feature = x
            if fixed_point_encoder:
                # g = (feature * 2 ** floating_point_precision).astype("int") * d
                g = fixed_point_encoder.encode(feature) * d
            else:
                g = feature * d
            if all_g is None:
                all_g = g
            else:
                all_g += g
        if all_g is None:
            return all_g
        elif fixed_point_encoder:
            all_g = fixed_point_encoder.decode(all_g)
        return all_g
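
    # What __apply_cal_gradient computes, in plain terms: for one partition it
    # accumulates sum_i d_i * x_i, where x_i is the (densified) feature vector and
    # d_i the fore_gradient of sample i. A hand-worked example, illustrative only:
    #     x_1 = [1.0, 2.0], d_1 = 0.5   ->  g_1 = [0.5, 1.0]
    #     x_2 = [3.0, 0.0], d_2 = -1.0  ->  g_2 = [-3.0, 0.0]
    #     partition sum                 ->  [-2.5, 1.0]
    # compute_gradient() below then reduces these partial sums across partitions.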

    def compute_gradient(self, data_instances, fore_gradient, fit_intercept, need_average=True):
        """
        Compute hetero-regression gradient

        Parameters
        ----------
        data_instances: Table, input data
        fore_gradient: Table, fore_gradient
        fit_intercept: bool, if model has intercept or not
        need_average: bool, gradient needs to be averaged or not

        Returns
        ----------
        Table
            the hetero regression model's gradient
        """
        # feature_num = data_overview.get_features_shape(data_instances)
        # data_count = data_instances.count()
        is_sparse = data_overview.is_sparse_data(data_instances)

        LOGGER.debug("Use apply partitions")
        feat_join_grad = data_instances.join(fore_gradient,
                                             lambda d, g: (d.features, g))
        f = functools.partial(self.__apply_cal_gradient,
                              fixed_point_encoder=self.fixed_point_encoder,
                              is_sparse=is_sparse)
        gradient_sum = feat_join_grad.applyPartitions(f)
        gradient_sum = gradient_sum.reduce(lambda x, y: x + y)
        if fit_intercept:
            # bias_grad = np.sum(fore_gradient)
            bias_grad = fore_gradient.reduce(lambda x, y: x + y)
            gradient_sum = np.append(gradient_sum, bias_grad)
        if need_average:
            gradient = gradient_sum / data_instances.count()
        else:
            gradient = gradient_sum

        """
        else:
            LOGGER.debug(f"Original_method")
            feat_join_grad = data_instances.join(fore_gradient,
                                                 lambda d, g: (d.features, g))
            f = functools.partial(self.__compute_partition_gradient,
                                  fit_intercept=fit_intercept,
                                  is_sparse=is_sparse)
            gradient_partition = feat_join_grad.applyPartitions(f)
            gradient_partition = gradient_partition.reduce(lambda x, y: x + y)

            gradient = gradient_partition / data_count
        """
        return gradient
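
    # Putting compute_gradient together: the returned vector is
    #     grad = sum_i d_i * x_i
    # optionally appended with sum_i d_i as the intercept term when fit_intercept is
    # True, and divided by data_instances.count() when need_average is True. This is
    # the usual linear-model gradient, with the per-sample fore_gradient d_i playing
    # the role of the residual term.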


class Guest(HeteroGradientBase):
    def __init__(self):
        super().__init__()
        self.half_d = None
        self.host_forwards = None
        self.forwards = None
        self.aggregated_forwards = None

    def _register_gradient_sync(self, host_forward_transfer, fore_gradient_transfer,
                                guest_gradient_transfer, guest_optim_gradient_transfer):
        self.host_forward_transfer = host_forward_transfer
        self.fore_gradient_transfer = fore_gradient_transfer
        self.unilateral_gradient_transfer = guest_gradient_transfer
        self.unilateral_optim_gradient_transfer = guest_optim_gradient_transfer

    def compute_and_aggregate_forwards(self, data_instances, model_weights,
                                       cipher, batch_index, current_suffix, offset=None):
        raise NotImplementedError("Function should not be called here")

    def compute_half_d(self, data_instances, w, cipher, batch_index, current_suffix):
        raise NotImplementedError("Function should not be called here")

    def _asynchronous_compute_gradient(self, data_instances, model_weights, cipher, current_suffix):
        LOGGER.debug("Called asynchronous gradient")
        encrypted_half_d = cipher.distribute_encrypt(self.half_d)
        self.remote_fore_gradient(encrypted_half_d, suffix=current_suffix)

        half_g = self.compute_gradient(data_instances, self.half_d, False)
        self.host_forwards = self.get_host_forward(suffix=current_suffix)
        host_forward = self.host_forwards[0]
        host_half_g = self.compute_gradient(data_instances, host_forward, False)
        unilateral_gradient = half_g + host_half_g
        if model_weights.fit_intercept:
            n = data_instances.count()
            intercept = (host_forward.reduce(lambda x, y: x + y) + self.half_d.reduce(lambda x, y: x + y)) / n
            unilateral_gradient = np.append(unilateral_gradient, intercept)
        return unilateral_gradient
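
    # In the asynchronous path the guest never materialises the full fore_gradient:
    #     unilateral_gradient = (1/n) * [sum_i half_d_i * x_i + sum_i host_forward_i * x_i]
    # (both compute_gradient calls above use the default need_average=True), and the
    # intercept term appended when fit_intercept is True is the mean of the combined
    # residuals, taken from the two reduce() calls.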

    def _centralized_compute_gradient(self, data_instances, model_weights, cipher, current_suffix, masked_index=None):
        self.host_forwards = self.get_host_forward(suffix=current_suffix)
        fore_gradient = self.half_d

        batch_size = data_instances.count()
        partial_masked_index_enc = None
        if masked_index:
            masked_index = masked_index.mapValues(lambda value: 0)
            masked_index_to_encrypt = masked_index.subtractByKey(self.half_d)
            partial_masked_index_enc = cipher.distribute_encrypt(masked_index_to_encrypt)

        for host_forward in self.host_forwards:
            if self.use_sample_weight:
                # host_forward = host_forward.join(data_instances, lambda h, v: h * v.weight)
                host_forward = data_instances.join(host_forward, lambda v, h: h * v.weight)
            fore_gradient = fore_gradient.join(host_forward, lambda x, y: x + y)

        def _apply_obfuscate(val):
            val.apply_obfuscator()
            return val

        fore_gradient = fore_gradient.mapValues(lambda val: _apply_obfuscate(val) / batch_size)

        if partial_masked_index_enc:
            masked_fore_gradient = partial_masked_index_enc.union(fore_gradient)
            self.remote_fore_gradient(masked_fore_gradient, suffix=current_suffix)
        else:
            self.remote_fore_gradient(fore_gradient, suffix=current_suffix)
        # self.remote_fore_gradient(fore_gradient, suffix=current_suffix)

        unilateral_gradient = self.compute_gradient(data_instances, fore_gradient,
                                                    model_weights.fit_intercept, need_average=False)
        return unilateral_gradient
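
    # In the centralized path the guest holds the full (encrypted) residual:
    #     fore_gradient_i = half_d_i + sum over hosts of host_forward_i
    # which is obfuscated, divided by the batch size, and sent to the hosts, so every
    # party computes its unilateral gradient against the same already-averaged d.
    # That is why compute_gradient is called with need_average=False here.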

    def compute_gradient_procedure(self, data_instances, cipher, model_weights, optimizer,
                                   n_iter_, batch_index, offset=None, masked_index=None):
        """
        Linear model gradient procedure
        Step 1: get host forwards, which differ between algorithms
            For Logistic Regression and Linear Regression: forwards = wx
            For Poisson Regression: forwards = exp(wx)

        Step 2: Compute self forwards, aggregate them with host forwards, and obtain d = fore_gradient

        Step 3: Compute unilateral gradient = ∑d*x

        Step 4: Send unilateral gradients to arbiter and receive the optimized and decrypted gradient.
        """
        current_suffix = (n_iter_, batch_index)
        # self.host_forwards = self.get_host_forward(suffix=current_suffix)

        # Compute Guest's partial d
        self.compute_half_d(data_instances, model_weights, cipher,
                            batch_index, current_suffix)
        if self.use_async:
            unilateral_gradient = self._asynchronous_compute_gradient(data_instances, model_weights,
                                                                      cipher=cipher,
                                                                      current_suffix=current_suffix)
        else:
            unilateral_gradient = self._centralized_compute_gradient(data_instances, model_weights,
                                                                     cipher=cipher,
                                                                     current_suffix=current_suffix,
                                                                     masked_index=masked_index)

        if optimizer is not None:
            unilateral_gradient = optimizer.add_regular_to_grad(unilateral_gradient, model_weights)

        optimized_gradient = self.update_gradient(unilateral_gradient, suffix=current_suffix)
        # LOGGER.debug(f"Before return, optimized_gradient: {optimized_gradient}")
        return optimized_gradient

    def get_host_forward(self, suffix=tuple()):
        host_forward = self.host_forward_transfer.get(idx=-1, suffix=suffix)
        return host_forward

    def remote_fore_gradient(self, fore_gradient, suffix=tuple()):
        self.fore_gradient_transfer.remote(obj=fore_gradient, role=consts.HOST, idx=-1, suffix=suffix)

    def update_gradient(self, unilateral_gradient, suffix=tuple()):
        self.unilateral_gradient_transfer.remote(unilateral_gradient, role=consts.ARBITER, idx=0, suffix=suffix)
        optimized_gradient = self.unilateral_optim_gradient_transfer.get(idx=0, suffix=suffix)
        return optimized_gradient


class Host(HeteroGradientBase):
    def __init__(self):
        super().__init__()
        self.forwards = None
        self.fore_gradient = None

    def _register_gradient_sync(self, host_forward_transfer, fore_gradient_transfer,
                                host_gradient_transfer, host_optim_gradient_transfer):
        self.host_forward_transfer = host_forward_transfer
        self.fore_gradient_transfer = fore_gradient_transfer
        self.unilateral_gradient_transfer = host_gradient_transfer
        self.unilateral_optim_gradient_transfer = host_optim_gradient_transfer

    def compute_forwards(self, data_instances, model_weights):
        raise NotImplementedError("Function should not be called here")

    def compute_unilateral_gradient(self, data_instances, fore_gradient, model_weights, optimizer):
        raise NotImplementedError("Function should not be called here")

    def _asynchronous_compute_gradient(self, data_instances, cipher, current_suffix):
        encrypted_forward = cipher.distribute_encrypt(self.forwards)
        self.remote_host_forward(encrypted_forward, suffix=current_suffix)

        half_g = self.compute_gradient(data_instances, self.forwards, False)
        guest_half_d = self.get_fore_gradient(suffix=current_suffix)
        guest_half_g = self.compute_gradient(data_instances, guest_half_d, False)
        unilateral_gradient = half_g + guest_half_g
        return unilateral_gradient

    def _centralized_compute_gradient(self, data_instances, cipher, current_suffix):
        encrypted_forward = cipher.distribute_encrypt(self.forwards)
        self.remote_host_forward(encrypted_forward, suffix=current_suffix)
        fore_gradient = self.fore_gradient_transfer.get(idx=0, suffix=current_suffix)

        # Host case, never fit-intercept
        unilateral_gradient = self.compute_gradient(data_instances, fore_gradient, False, need_average=False)
        return unilateral_gradient

    def compute_gradient_procedure(self, data_instances, cipher, model_weights,
                                   optimizer,
                                   n_iter_, batch_index):
        """
        Linear model gradient procedure
        Step 1: get host forwards, which differ between algorithms
            For Logistic Regression: forwards = wx
        """
        current_suffix = (n_iter_, batch_index)

        self.forwards = self.compute_forwards(data_instances, model_weights)

        if self.use_async:
            unilateral_gradient = self._asynchronous_compute_gradient(data_instances,
                                                                      cipher,
                                                                      current_suffix)
        else:
            unilateral_gradient = self._centralized_compute_gradient(data_instances,
                                                                     cipher,
                                                                     current_suffix)

        if optimizer is not None:
            unilateral_gradient = optimizer.add_regular_to_grad(unilateral_gradient, model_weights)

        optimized_gradient = self.update_gradient(unilateral_gradient, suffix=current_suffix)
        LOGGER.debug("Before return compute_gradient_procedure")
        return optimized_gradient

    def compute_sqn_forwards(self, data_instances, delta_s, cipher):
        """
        To compute the Hessian matrix, y and s are needed.

        g = (1/N) * ∑(0.25 * wx - 0.5 * y) * x

        y = ∇²F(w_t) * s_t = g' * s = (1/N) * ∑(0.25 * x * s) * x

        define forward_hess = ∑(0.25 * x * s)
        """
        sqn_forwards = data_instances.mapValues(
            lambda v: cipher.encrypt(fate_operator.vec_dot(v.features, delta_s.coef_) + delta_s.intercept_))
        # forward_sum = sqn_forwards.reduce(reduce_add)
        return sqn_forwards
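
    # compute_sqn_forwards encrypts, per sample, the dot product x_i · s (plus the
    # intercept of delta_s). Together with compute_forward_hess below, this supplies
    # the forward_hess terms used to approximate y = ∇²F(w_t) * s_t for the SQN
    # update; any constant weighting (e.g. the 0.25 in the docstring) is assumed to
    # be applied by the caller rather than here.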

    def compute_forward_hess(self, data_instances, delta_s, forward_hess):
        """
        To compute the Hessian matrix, y and s are needed.

        g = (1/N) * ∑(0.25 * wx - 0.5 * y) * x

        y = ∇²F(w_t) * s_t = g' * s = (1/N) * ∑(0.25 * x * s) * x

        define forward_hess = (0.25 * x * s)
        """
        hess_vector = self.compute_gradient(data_instances,
                                            forward_hess,
                                            delta_s.fit_intercept)
        return np.array(hess_vector)

    def remote_host_forward(self, host_forward, suffix=tuple()):
        self.host_forward_transfer.remote(obj=host_forward, role=consts.GUEST, idx=0, suffix=suffix)

    def get_fore_gradient(self, suffix=tuple()):
        host_forward = self.fore_gradient_transfer.get(idx=0, suffix=suffix)
        return host_forward

    def update_gradient(self, unilateral_gradient, suffix=tuple()):
        self.unilateral_gradient_transfer.remote(unilateral_gradient, role=consts.ARBITER, idx=0, suffix=suffix)
        optimized_gradient = self.unilateral_optim_gradient_transfer.get(idx=0, suffix=suffix)
        return optimized_gradient


class Arbiter(HeteroGradientBase):
    def __init__(self):
        super().__init__()
        self.has_multiple_hosts = False

    def _register_gradient_sync(self, guest_gradient_transfer, host_gradient_transfer,
                                guest_optim_gradient_transfer, host_optim_gradient_transfer):
        self.guest_gradient_transfer = guest_gradient_transfer
        self.host_gradient_transfer = host_gradient_transfer
        self.guest_optim_gradient_transfer = guest_optim_gradient_transfer
        self.host_optim_gradient_transfer = host_optim_gradient_transfer

    def compute_gradient_procedure(self, cipher, optimizer, n_iter_, batch_index):
        """
        Compute gradients.
        Receive local gradients from guest and hosts, merge and optimize them, then separate
        the result and send each party its share back.

        Parameters
        ----------
        cipher: cipher used for decryption
        optimizer: optimizer that computes the delta gradient of this iteration
        n_iter_: int, current iteration number
        batch_index: int
        """
        current_suffix = (n_iter_, batch_index)

        host_gradients, guest_gradient = self.get_local_gradient(current_suffix)

        if len(host_gradients) > 1:
            self.has_multiple_hosts = True

        host_gradients = [np.array(h) for h in host_gradients]
        guest_gradient = np.array(guest_gradient)

        size_list = [h_g.shape[0] for h_g in host_gradients]
        size_list.append(guest_gradient.shape[0])

        gradient = np.hstack(host_gradients)
        gradient = np.hstack((gradient, guest_gradient))

        grad = np.array(cipher.decrypt_list(gradient))

        # LOGGER.debug("In arbiter compute_gradient_procedure, before apply grad: {}, size_list: {}".format(
        #     grad, size_list
        # ))

        delta_grad = optimizer.apply_gradients(grad)

        # LOGGER.debug("In arbiter compute_gradient_procedure, delta_grad: {}".format(
        #     delta_grad
        # ))

        separate_optim_gradient = self.separate(delta_grad, size_list)
        # LOGGER.debug("In arbiter compute_gradient_procedure, separated gradient: {}".format(
        #     separate_optim_gradient
        # ))

        host_optim_gradients = separate_optim_gradient[: -1]
        guest_optim_gradient = separate_optim_gradient[-1]

        self.remote_local_gradient(host_optim_gradients, guest_optim_gradient, current_suffix)
        return delta_grad

    @staticmethod
    def separate(value, size_list):
        """
        Split `value` into consecutive pieces whose lengths are given by size_list.

        Parameters
        ----------
        value: list or ndarray, input data
        size_list: list, the size of each piece

        Returns
        ----------
        list
            the pieces, in order
        """
        separate_res = []
        cur = 0
        for size in size_list:
            separate_res.append(value[cur:cur + size])
            cur += size
        return separate_res
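
    # Illustrative example (not executed): separate(np.arange(7), [3, 4]) returns
    # [array([0, 1, 2]), array([3, 4, 5, 6])]. In compute_gradient_procedure above,
    # the host gradients come first and the guest gradient is the final piece.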

    def get_local_gradient(self, suffix=tuple()):
        host_gradients = self.host_gradient_transfer.get(idx=-1, suffix=suffix)
        LOGGER.info("Get host_gradient from Host")

        guest_gradient = self.guest_gradient_transfer.get(idx=0, suffix=suffix)
        LOGGER.info("Get guest_gradient from Guest")
        return host_gradients, guest_gradient

    def remote_local_gradient(self, host_optim_gradients, guest_optim_gradient, suffix=tuple()):
        for idx, host_optim_gradient in enumerate(host_optim_gradients):
            self.host_optim_gradient_transfer.remote(host_optim_gradient,
                                                     role=consts.HOST,
                                                     idx=idx,
                                                     suffix=suffix)

        self.guest_optim_gradient_transfer.remote(guest_optim_gradient,
                                                  role=consts.GUEST,
                                                  idx=0,
                                                  suffix=suffix)
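

# A minimal, self-contained sketch (assuming an environment where the federatedml
# imports above resolve): it reproduces the core arithmetic of this module with
# plain numpy -- the per-sample accumulation sum_i d_i * x_i that
# __apply_cal_gradient performs, and the split that Arbiter.separate applies to the
# concatenated host/guest gradients. It is illustrative only and touches no
# transfer variables or ciphers.
if __name__ == "__main__":
    # Toy dense features and residuals (fore_gradient) for three samples.
    features = np.array([[1.0, 2.0],
                         [3.0, 0.0],
                         [0.5, 1.5]])
    residuals = np.array([0.5, -1.0, 2.0])

    # Same quantity __apply_cal_gradient accumulates, then averaged as in
    # compute_gradient(need_average=True).
    gradient = (features * residuals[:, None]).sum(axis=0) / len(residuals)
    print("averaged gradient:", gradient)

    # Arbiter.separate splits a concatenated gradient back into per-party pieces:
    # hosts first, guest last, mirroring compute_gradient_procedure.
    merged = np.hstack([np.array([0.1, 0.2]), np.array([0.3, 0.4, 0.5])])
    print("separated:", Arbiter.separate(merged, [2, 3]))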