# simulation.py

import heapq
import logging
import math

import numpy as np

SIMULATE_IID = "iid"
SIMULATE_NIID_DIR = "dir"
SIMULATE_NIID_CLASS = "class"

logger = logging.getLogger(__name__)


def shuffle(data_x, data_y):
    """Shuffle data and labels together, keeping each (data, label) pair aligned."""
    num_of_data = len(data_y)
    data_x = np.array(data_x)
    data_y = np.array(data_y)
    index = list(range(num_of_data))
    np.random.shuffle(index)
    data_x = data_x[index]
    data_y = data_y[index]
    return data_x, data_y


def equal_division(num_groups, data_x, data_y=None):
    """Partition data into multiple clients with equal quantity.

    Args:
        num_groups (int): The number of groups to partition into.
        data_x (list[Object]): A list of elements to be divided.
        data_y (list[Object], optional): A list of data labels to be divided together with the data.

    Returns:
        list[list]: A list where each element is a list of data of a group/client.
        list[list]: A list where each element is a list of data labels of a group/client.

    Example:
        >>> equal_division(3, list(range(9)))
        ([[0, 4, 2], [3, 1, 7], [6, 5, 8]], [])
    """
    if data_y is not None:
        assert len(data_x) == len(data_y)
        data_x, data_y = shuffle(data_x, data_y)
    else:
        np.random.shuffle(data_x)
    num_of_data = len(data_x)
    assert num_of_data > 0
    data_per_client = num_of_data // num_groups
    large_group_num = num_of_data - num_groups * data_per_client
    small_group_num = num_groups - large_group_num
    splitted_data_x = []
    splitted_data_y = []
    for i in range(small_group_num):
        base_index = data_per_client * i
        splitted_data_x.append(data_x[base_index: base_index + data_per_client])
        if data_y is not None:
            splitted_data_y.append(data_y[base_index: base_index + data_per_client])
    small_size = data_per_client * small_group_num
    # each of the remaining groups takes one extra element
    data_per_client += 1
    for i in range(large_group_num):
        base_index = small_size + data_per_client * i
        splitted_data_x.append(data_x[base_index: base_index + data_per_client])
        if data_y is not None:
            splitted_data_y.append(data_y[base_index: base_index + data_per_client])
    return splitted_data_x, splitted_data_y


def quantity_hetero(weights, data_x, data_y=None):
    """Partition data into multiple clients with different quantities.

    The number of groups is the same as the number of elements of `weights`.
    The quantity of each group depends on the values of `weights`.

    Args:
        weights (list[float]): The targeted distribution of data quantities.
            The values should sum up to 1, e.g., [0.1, 0.2, 0.7].
        data_x (list[Object]): A list of elements to be divided.
        data_y (list[Object], optional): A list of data labels to be divided together with the data.

    Returns:
        list[list]: A list where each element is a list of data of a group/client.
        list[list]: A list where each element is a list of data labels of a group/client.

    Example:
        >>> quantity_hetero([0.1, 0.2, 0.7], list(range(0, 10)))
        ([[4], [8, 9], [6, 0, 1, 7, 3, 2, 5]], [])
    """
    # Rounding works around floating-point imprecision in Python,
    # e.g., sum([0.1, 0.2, 0.4, 0.2, 0.1]) is not exactly 1 but 1.0000000000000002.
    assert round(sum(weights), 3) == 1
    if data_y is not None:
        assert len(data_x) == len(data_y)
        data_x, data_y = shuffle(data_x, data_y)
    else:
        np.random.shuffle(data_x)
    data_size = len(data_x)
    i = 0
    splitted_data_x = []
    splitted_data_y = []
    for w in weights:
        size = math.floor(data_size * w)
        # convert slices to lists so that leftover elements can be appended below,
        # even when `data_x` is a numpy array (numpy arrays have no `.append`)
        splitted_data_x.append(list(data_x[i:i + size]))
        if data_y is not None:
            splitted_data_y.append(list(data_y[i:i + size]))
        i += size
    parts = len(weights)
    if i < data_size:
        # distribute the leftover elements (caused by flooring) round-robin
        remain = data_size - i
        for i in range(-remain, 0, 1):
            splitted_data_x[(-i) % parts].append(data_x[i])
            if data_y is not None:
                splitted_data_y[(-i) % parts].append(data_y[i])
    return splitted_data_x, splitted_data_y


def iid(data_x, data_y, num_of_clients, x_dtype, y_dtype):
    """Partition a dataset into multiple clients with equal data quantity (difference is less than 1) randomly.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        num_of_clients (int): The number of clients to partition to.
        x_dtype (numpy.dtype): The type of data.
        y_dtype (numpy.dtype): The type of data label.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data,
            e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    data_x, data_y = shuffle(data_x, data_y)
    x_divided_list, y_divided_list = equal_division(num_of_clients, data_x, data_y)
    clients = []
    federated_data = {}
    for i in range(num_of_clients):
        client_id = "f%07.0f" % i
        temp_client = {}
        temp_client['x'] = np.array(x_divided_list[i]).astype(x_dtype)
        temp_client['y'] = np.array(y_divided_list[i]).astype(y_dtype)
        federated_data[client_id] = temp_client
        clients.append(client_id)
    return clients, federated_data
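
# A minimal usage sketch (illustrative, with hypothetical toy arrays; exact
# contents vary with the random shuffle):
#
#   clients, federated_data = iid(
#       list(range(10)), [0, 1] * 5,
#       num_of_clients=2, x_dtype=np.int64, y_dtype=np.int64)
#   # clients -> ['f0000000', 'f0000001'], each holding 5 samples in 'x' and 'y'.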


def non_iid_dirichlet(data_x, data_y, num_of_clients, alpha, min_size, x_dtype, y_dtype):
    """Partition a dataset into multiple clients following the Dirichlet process.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        num_of_clients (int): The number of clients to partition to.
        alpha (float): The parameter for Dirichlet process simulation.
        min_size (int): The minimum data size of a client.
        x_dtype (numpy.dtype): The type of data.
        y_dtype (numpy.dtype): The type of data label.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data,
            e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    current_min_size = 0
    num_class = np.amax(data_y) + 1
    data_size = data_y.shape[0]
    net_dataidx_map = {}
    while current_min_size < min_size:
        idx_batch = [[] for _ in range(num_of_clients)]
        for k in range(num_class):
            idx_k = np.where(data_y == k)[0]
            np.random.shuffle(idx_k)
            proportions = np.random.dirichlet(np.repeat(alpha, num_of_clients))
            # using the proportions from the Dirichlet distribution, only select
            # clients whose current data amount is less than the average
            proportions = np.array(
                [p * (len(idx_j) < data_size / num_of_clients) for p, idx_j in zip(proportions, idx_batch)])
            # re-normalize the proportions
            proportions = proportions / proportions.sum()
            proportions = (np.cumsum(proportions) * len(idx_k)).astype(int)[:-1]
            idx_batch = [idx_j + idx.tolist() for idx_j, idx in zip(idx_batch, np.split(idx_k, proportions))]
            current_min_size = min([len(idx_j) for idx_j in idx_batch])
    federated_data = {}
    clients = []
    for j in range(num_of_clients):
        np.random.shuffle(idx_batch[j])
        client_id = "f%07.0f" % j
        clients.append(client_id)
        temp = {}
        temp['x'] = np.array(data_x[idx_batch[j]]).astype(x_dtype)
        temp['y'] = np.array(data_y[idx_batch[j]]).astype(y_dtype)
        federated_data[client_id] = temp
        net_dataidx_map[client_id] = idx_batch[j]
    print_data_distribution(data_y, net_dataidx_map)
    return clients, federated_data
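
# A minimal usage sketch (illustrative, with a hypothetical toy dataset).
# Smaller `alpha` produces more skewed per-client label distributions; the
# exact split varies with randomness:
#
#   x = np.arange(100).reshape(100, 1)
#   y = np.repeat(np.arange(5), 20)
#   clients, federated_data = non_iid_dirichlet(
#       x, y, num_of_clients=4, alpha=0.5, min_size=5,
#       x_dtype=np.int64, y_dtype=np.int64)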


def non_iid_class(data_x, data_y, class_per_client, num_of_clients, x_dtype, y_dtype, stack_x=True):
    """Partition a dataset into multiple clients based on label classes.

    Each client contains [1, n] classes, where n is the number of classes of a dataset.

    Note: Each class is divided into `ceil(class_per_client * num_of_clients / num_class)` parts
    and each client takes `class_per_client` parts, drawn from the least-consumed classes,
    to construct its dataset.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        class_per_client (int): The number of classes in each client.
        num_of_clients (int): The number of clients to partition to.
        x_dtype (numpy.dtype): The type of data.
        y_dtype (numpy.dtype): The type of data label.
        stack_x (bool, optional): A flag to indicate whether using np.vstack or append to construct dataset.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data,
            e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    num_class = np.amax(data_y) + 1
    all_index = []
    clients = []
    data_index_map = {}
    for i in range(num_class):
        # store the indexes of all data with label i at position i of all_index
        all_index.append(np.where(data_y == i)[0].tolist())
    federated_data = {}
    # total number of parts
    total_amount = class_per_client * num_of_clients
    # number of parts each class should be divided into
    parts_per_class = math.ceil(total_amount / num_class)
    for i in range(num_of_clients):
        client_id = "f%07.0f" % i
        clients.append(client_id)
        data_index_map[client_id] = []
        data = {}
        data['x'] = np.array([])
        data['y'] = np.array([])
        federated_data[client_id] = data
    class_map = {}
    parts_consumed = []
    for i in range(num_class):
        class_map[i], _ = equal_division(parts_per_class, all_index[i])
        heapq.heappush(parts_consumed, (0, i))
    for i in clients:
        for j in range(class_per_client):
            # always take a part from the least-consumed class
            class_chosen = heapq.heappop(parts_consumed)
            part_indexes = class_map[class_chosen[1]].pop(0)
            if len(federated_data[i]['x']) != 0:
                if stack_x:
                    federated_data[i]['x'] = np.vstack((federated_data[i]['x'], data_x[part_indexes])).astype(x_dtype)
                else:
                    federated_data[i]['x'] = np.append(federated_data[i]['x'], data_x[part_indexes]).astype(x_dtype)
                federated_data[i]['y'] = np.append(federated_data[i]['y'], data_y[part_indexes]).astype(y_dtype)
            else:
                federated_data[i]['x'] = data_x[part_indexes].astype(x_dtype)
                federated_data[i]['y'] = data_y[part_indexes].astype(y_dtype)
            heapq.heappush(parts_consumed, (class_chosen[0] + 1, class_chosen[1]))
            data_index_map[i].extend(part_indexes)
    print_data_distribution(data_y, data_index_map)
    return clients, federated_data
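
# A minimal usage sketch (illustrative, with a hypothetical toy dataset).
# With 10 classes, 5 clients, and class_per_client=2, each class is cut into
# ceil(2 * 5 / 10) = 1 part and every client ends up with 2 classes:
#
#   x = np.arange(200).reshape(200, 1)
#   y = np.repeat(np.arange(10), 20)
#   clients, federated_data = non_iid_class(
#       x, y, class_per_client=2, num_of_clients=5,
#       x_dtype=np.int64, y_dtype=np.int64)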


def data_simulation(data_x, data_y, num_of_clients, data_distribution, weights=None, alpha=0.5, min_size=10,
                    class_per_client=1, stack_x=True):
    """Simulate federated learning datasets by partitioning a dataset into multiple clients using different strategies.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        num_of_clients (int): The number of clients to partition to.
        data_distribution (str): The way to partition the dataset, options:
            `iid`: Partition the dataset into multiple clients with equal quantity (difference is less than 1) randomly.
            `dir`: Partition the dataset into multiple clients following the Dirichlet process.
            `class`: Partition the dataset into multiple clients based on classes.
        weights (list[float], optional): The targeted distribution of data quantities,
            used to simulate data quantity heterogeneity. The values should sum up to 1,
            e.g., [0.1, 0.2, 0.7]. When `weights=None`, the data quantity of clients only
            depends on `data_distribution`. Note: `num_of_clients` should be divisible by `len(weights)`.
        alpha (float, optional): The parameter for Dirichlet process simulation.
            It is only applicable when data_distribution is `dir`.
        min_size (int, optional): The minimum data size of a client.
            It is only applicable when data_distribution is `dir`.
        class_per_client (int): The number of classes in each client.
            It is only applicable when data_distribution is `class`.
        stack_x (bool, optional): A flag to indicate whether using np.vstack or append to construct dataset.
            It is only applicable when data_distribution is `class`.

    Raises:
        ValueError: When the simulation method `data_distribution` is not supported.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data,
            e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    data_x = np.array(data_x)
    data_y = np.array(data_y)
    x_dtype = data_x.dtype
    y_dtype = data_y.dtype
    if weights is not None:
        assert num_of_clients % len(weights) == 0
        num_of_clients = num_of_clients // len(weights)
    if data_distribution == SIMULATE_IID:
        group_client_list, group_federated_data = iid(data_x, data_y, num_of_clients, x_dtype, y_dtype)
    elif data_distribution == SIMULATE_NIID_DIR:
        group_client_list, group_federated_data = non_iid_dirichlet(data_x, data_y, num_of_clients, alpha, min_size,
                                                                    x_dtype, y_dtype)
    elif data_distribution == SIMULATE_NIID_CLASS:
        group_client_list, group_federated_data = non_iid_class(data_x, data_y, class_per_client, num_of_clients,
                                                                x_dtype, y_dtype, stack_x=stack_x)
    else:
        raise ValueError("Simulation type not supported")
    if weights is None:
        return group_client_list, group_federated_data
    # re-partition each group's data with the targeted quantity weights
    clients = []
    federated_data = {}
    cur_key = 0
    for i in group_client_list:
        current_client = group_federated_data[i]
        input_lists, label_lists = quantity_hetero(weights, current_client['x'], current_client['y'])
        for j in range(len(input_lists)):
            client_id = "f%07.0f" % cur_key
            temp_client = {}
            temp_client['x'] = np.array(input_lists[j]).astype(x_dtype)
            temp_client['y'] = np.array(label_lists[j]).astype(y_dtype)
            federated_data[client_id] = temp_client
            clients.append(client_id)
            cur_key += 1
    return clients, federated_data


def print_data_distribution(data_y, data_index_map):
    """Log the distribution of client datasets."""
    data_distribution = {}
    for index, dataidx in data_index_map.items():
        unique_values, counts = np.unique(data_y[dataidx], return_counts=True)
        distribution = {unique_values[i]: counts[i] for i in range(len(unique_values))}
        data_distribution[index] = distribution
    logger.info(data_distribution)
    return data_distribution
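

# Minimal self-test sketch (an illustrative addition, not part of the library
# API): running this file directly partitions a synthetic dataset IID across
# 4 clients, then with quantity heterogeneity via `weights`.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    toy_x = np.arange(100).reshape(100, 1)
    toy_y = np.repeat(np.arange(5), 20)

    # IID split: every client receives 25 samples.
    clients, federated_data = data_simulation(toy_x, toy_y, 4, SIMULATE_IID)
    print({c: len(federated_data[c]['y']) for c in clients})

    # Quantity-heterogeneous split: 4 clients from 2 IID groups of 50 samples,
    # each group re-partitioned with the 30/70 weights below.
    clients, federated_data = data_simulation(
        toy_x, toy_y, 4, SIMULATE_IID, weights=[0.3, 0.7])
    print({c: len(federated_data[c]['y']) for c in clients})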