anonymous_generator_util.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import copy
  17. import numpy as np
  18. from federatedml.util.data_format_preprocess import DataFormatPreProcess
  ANONYMOUS_COLUMN_PREFIX = "x"  # prefix of generated feature columns: "x0", "x1", ...
  ANONYMOUS_LABEL = "y"  # generated name of the label column
  SPLICES = "_"  # separator joining role, party id and suffix


  class Anonymous(object):
      """Generate, parse and migrate anonymous column names.

      A fully qualified anonymous column has the form ``<role>_<party_id>_<suffix>``
      (for example ``guest_9999_x0``). When no role is configured the generated
      names are just the suffix (``x0``, ``y``).

      Args:
          role: role used when generating names; may be None.
          party_id: party id used when generating names; may be None.
          migrate_mapping: ``{role: {old_party_id: new_party_id}}``. The inner keys
              may be ints or strings; both forms are tried on lookup.
      """

      # Sentinel distinguishing "party is not in the mapping" from a mapped None.
      _MISSING = object()

      def __init__(self, role=None, party_id=None, migrate_mapping=None):
          self._role = role
          self._party_id = party_id
          self._migrate_mapping = migrate_mapping

      def _lookup_migrate_party_id(self, role, party_id, default=None):
          """Return the value mapped to (role, party_id), or *default* if absent.

          The integer form of *party_id* is tried first, then the value as given,
          so mappings keyed by either ints or strings are supported.
          """
          mapping = self._migrate_mapping
          if not mapping or role not in mapping:
              return default
          role_mapping = mapping[role]
          if not role_mapping:
              return default

          candidate_keys = [party_id]
          try:
              candidate_keys.insert(0, int(party_id))
          except (TypeError, ValueError):
              # Non-numeric party id: only the raw key can match.
              pass

          for key in candidate_keys:
              if key in role_mapping:
                  return role_mapping[key]
          return default

      def migrate_schema_anonymous(self, schema):
          """Migrate ``anonymous_header`` / ``anonymous_label`` of *schema* in place.

          Returns:
              The same *schema* object, after modification.
          """
          if "anonymous_header" in schema:
              schema["anonymous_header"] = self.migrate_anonymous(schema["anonymous_header"])

          if "anonymous_label" in schema:
              schema["anonymous_label"] = self.migrate_anonymous(schema["anonymous_label"])

          return schema

      def migrate_anonymous(self, anonymous_header):
          """Rewrite the party id of anonymous column(s) using the migrate mapping.

          Args:
              anonymous_header: a list of column names, or a single column name.

          Returns:
              A list when a list was given, otherwise a single column name.
              Columns that are not of the form ``role_party_suffix``, or whose
              role / party id is not present in the mapping, are returned unchanged.
          """
          ret_list = isinstance(anonymous_header, list)
          columns = anonymous_header if ret_list else [anonymous_header]

          migrated = []
          for column in columns:
              splits = column.split(SPLICES, 2)
              new_party_id = None
              if len(splits) == 3:
                  role, party_id, suf = splits
                  new_party_id = self._lookup_migrate_party_id(role, party_id)

              if new_party_id is None:
                  migrated.append(column)
              else:
                  migrated.append(self.generate_anonymous_column(role, new_party_id, suf))

          return migrated if ret_list else migrated[0]

      def is_anonymous(self, column):
          """Return True if *column* is ``role_party_...`` with role/party in the mapping."""
          splits = self.get_anonymous_column_splits(column)
          if len(splits) < 3:
              return False

          found = self._lookup_migrate_party_id(splits[0], splits[1], default=self._MISSING)
          return found is not self._MISSING

      def extend_columns(self, original_anonymous_header, extend_header):
          """Append one ``role_party_exp_<idx>`` column per entry of *extend_header*.

          Indices continue after the largest ``exp`` index already present in
          *original_anonymous_header* (starting from 0 when there is none).

          Returns:
              ``original_anonymous_header`` concatenated with the new columns.
          """
          exp_start_idx = 0
          for anonymous_col_name in original_anonymous_header:
              if self.is_expand_column(anonymous_col_name):
                  exp_start_idx = max(exp_start_idx, self.get_expand_idx(anonymous_col_name) + 1)

          extend_anonymous_header = [
              self.__generate_expand_anonymous_column(exp_start_idx + offset)
              for offset in range(len(extend_header))
          ]
          return original_anonymous_header + extend_anonymous_header

      @staticmethod
      def get_party_id_from_anonymous_column(anonymous_column):
          """Return the party id part (as a string) of ``role_party_suffix``.

          Raises:
              ValueError: if the column has fewer than three parts.
          """
          splits = Anonymous.get_anonymous_column_splits(anonymous_column)
          if len(splits) < 3:
              raise ValueError("This is not a anonymous_column")

          return splits[1]

      @staticmethod
      def get_role_from_anonymous_column(anonymous_column):
          """Return the role part of ``role_party_suffix``.

          Raises:
              ValueError: if the column has fewer than three parts.
          """
          splits = Anonymous.get_anonymous_column_splits(anonymous_column)
          if len(splits) < 3:
              raise ValueError("This is not a anonymous_column")

          return splits[0]

      @staticmethod
      def get_suffix_from_anonymous_column(anonymous_column):
          """Return everything after ``role_party_`` (the suffix may contain separators).

          Raises:
              ValueError: if the column has fewer than three parts.
          """
          splits = Anonymous.get_anonymous_column_splits(anonymous_column, num=2)
          if len(splits) < 3:
              raise ValueError("This is not a anonymous_column")

          return splits[-1]

      @staticmethod
      def get_anonymous_header(schema):
          """Return ``schema["anonymous_header"]``."""
          return schema["anonymous_header"]

      @staticmethod
      def filter_anonymous_header(schema, filter_ins):
          """Select entries of the anonymous header by index list or boolean mask.

          The header is converted to an ndarray first so that a plain list header
          can be indexed the same way as an array header.

          Returns:
              numpy.ndarray holding the selected column names.
          """
          selector = np.asarray(filter_ins)
          if selector.size == 0:
              # An empty selector defaults to a float dtype, which numpy rejects as an index.
              selector = selector.astype(int)
          return np.asarray(schema["anonymous_header"])[selector]

      @staticmethod
      def reset_anonymous_header(schema, anonymous_header):
          """Return a deep copy of *schema* with ``anonymous_header`` replaced."""
          new_schema = copy.deepcopy(schema)
          new_schema["anonymous_header"] = anonymous_header
          return new_schema

      @staticmethod
      def generate_derived_header(original_header, original_anonymous_header, derived_dict):
          """Build the anonymous header after some columns were expanded.

          A column listed in *derived_dict* is replaced by ``<anonymous>_<i>`` for
          each ``i`` in ``range(len(derived_dict[column]))``; other columns keep
          their anonymous name.
          """
          new_anonymous_header = []
          for column, anonymous_column in zip(original_header, original_anonymous_header):
              if column not in derived_dict:
                  new_anonymous_header.append(anonymous_column)
                  continue
              new_anonymous_header.extend(
                  SPLICES.join([anonymous_column, str(i)])
                  for i in range(len(derived_dict[column]))
              )

          return new_anonymous_header

      def __generate_expand_anonymous_column(self, fid):
          """Return ``<role>_<party_id>_exp_<fid>`` for this instance."""
          return SPLICES.join(map(str, [self._role, self._party_id, "exp", fid]))

      @staticmethod
      def generate_anonymous_column(role, party_id, suf):
          """Return ``<role>_<party_id>_<suf>``."""
          return SPLICES.join([role, str(party_id), suf])

      @staticmethod
      def get_anonymous_column_splits(column, num=-1):
          """Split *column* on the separator, at most *num* times (-1 means no limit)."""
          return column.split(SPLICES, num)

      @staticmethod
      def is_expand_column(column_name):
          """Return True if the second-to-last part of *column_name* is ``"exp"``."""
          splits = Anonymous.get_anonymous_column_splits(column_name)
          # Names without a separator (e.g. "x0") have no second-to-last part.
          return len(splits) >= 2 and splits[-2] == "exp"

      @staticmethod
      def get_expand_idx(column_name):
          """Return the last part of *column_name* converted to int."""
          return int(Anonymous.get_anonymous_column_splits(column_name)[-1])

      @staticmethod
      def update_anonymous_header_with_role(schema, role, party_id):
          """Return a deep copy of *schema* whose anonymous names are prefixed.

          Each existing anonymous column becomes ``<role>_<party_id>_<column>``;
          when the schema has a ``label_name`` the anonymous label is set to
          ``<role>_<party_id>_y``.
          """
          party_id = str(party_id)
          new_schema = copy.deepcopy(schema)
          if "anonymous_header" in schema:
              new_schema["anonymous_header"] = [
                  Anonymous.generate_anonymous_column(role, party_id, col_name)
                  for col_name in schema["anonymous_header"]
              ]

          if "label_name" in schema:
              new_schema["anonymous_label"] = Anonymous.generate_anonymous_column(role, party_id, ANONYMOUS_LABEL)

          return new_schema

      def generate_anonymous_header(self, schema):
          """Return a deep copy of *schema* with generated anonymous names.

          One ``x<i>`` name is generated per entry of ``schema["header"]``; when the
          schema has a ``label_name`` an anonymous label ``y`` is added as well.
          Names are prefixed with ``<role>_<party_id>_`` only if a role is set.
          """
          new_schema = copy.deepcopy(schema)
          suffixes = [ANONYMOUS_COLUMN_PREFIX + str(i) for i in range(len(schema["header"]))]

          if self._role:
              anonymous_header = [
                  Anonymous.generate_anonymous_column(self._role, self._party_id, suffix)
                  for suffix in suffixes
              ]
          else:
              anonymous_header = suffixes

          new_schema["anonymous_header"] = anonymous_header

          if "label_name" in schema:
              if self._role:
                  new_schema["anonymous_label"] = self.generate_anonymous_column(
                      self._role, self._party_id, ANONYMOUS_LABEL
                  )
              else:
                  new_schema["anonymous_label"] = ANONYMOUS_LABEL

          return new_schema

      def generated_compatible_anonymous_header_with_old_version(self, header):
          """Return old-style names ``<role>_<party_id>_<idx>``, one per header entry.

          Raises:
              ValueError: if role or party_id was not provided at construction.
          """
          if self._role is None or self._party_id is None:
              raise ValueError("Please init anonymous generator with role & party_id")
          return [SPLICES.join([self._role, str(self._party_id), str(idx)]) for idx in range(len(header))]

      @staticmethod
      def is_old_version_anonymous_header(anonymous_header):
          """Return True if every column is exactly ``a_b_<int>`` (old naming style).

          An empty header yields True because no column contradicts the format.
          """
          for anonymous_col in anonymous_header:
              splits = anonymous_col.split(SPLICES, -1)
              if len(splits) != 3:
                  return False

              try:
                  int(splits[2])
              except ValueError:
                  return False

          return True