@@ -0,0 +1,350 @@
+import heapq
+import logging
+import math
+
+import numpy as np
+
+SIMULATE_IID = "iid"
+SIMULATE_NIID_DIR = "dir"
+SIMULATE_NIID_CLASS = "class"
+
+logger = logging.getLogger(__name__)
+
+
+def shuffle(data_x, data_y):
+    """Shuffle data and labels in unison and return them as numpy arrays."""
+    num_of_data = len(data_y)
+    data_x = np.array(data_x)
+    data_y = np.array(data_y)
+    index = [i for i in range(num_of_data)]
+    np.random.shuffle(index)
+    data_x = data_x[index]
+    data_y = data_y[index]
+    return data_x, data_y
+
+
+def equal_division(num_groups, data_x, data_y=None):
+    """Partition data into multiple clients with equal quantity.
+
+    Args:
+        num_groups (int): The number of groups to partition into.
+        data_x (list[Object]): A list of elements to be divided.
+        data_y (list[Object], optional): A list of data labels to be divided together with the data.
+
+    Returns:
+        list[list]: A list where each element is a list of data of a group/client.
+        list[list]: A list where each element is a list of data labels of a group/client.
+
+    Example:
+        >>> equal_division(3, list(range(9)))
+        ([[0, 4, 2], [3, 1, 7], [6, 5, 8]], [])
+    """
+    if data_y is not None:
+        assert len(data_x) == len(data_y)
+        data_x, data_y = shuffle(data_x, data_y)
+    else:
+        np.random.shuffle(data_x)
+    num_of_data = len(data_x)
+    assert num_of_data > 0
+    data_per_client = num_of_data // num_groups
+    large_group_num = num_of_data - num_groups * data_per_client
+    small_group_num = num_groups - large_group_num
+    splitted_data_x = []
+    splitted_data_y = []
+    for i in range(small_group_num):
+        base_index = data_per_client * i
+        splitted_data_x.append(data_x[base_index: base_index + data_per_client])
+        if data_y is not None:
+            splitted_data_y.append(data_y[base_index: base_index + data_per_client])
+    small_size = data_per_client * small_group_num
+    data_per_client += 1
+    for i in range(large_group_num):
+        base_index = small_size + data_per_client * i
+        splitted_data_x.append(data_x[base_index: base_index + data_per_client])
+        if data_y is not None:
+            splitted_data_y.append(data_y[base_index: base_index + data_per_client])
+
+    return splitted_data_x, splitted_data_y
+
+
+def quantity_hetero(weights, data_x, data_y=None):
+    """Partition data into multiple clients with different quantities.
+    The number of groups equals the number of elements of `weights`.
+    The quantity of each group depends on the values of `weights`.
+
+    Args:
+        weights (list[float]): The targeted distribution of data quantities.
+            The values should sum up to 1, e.g., [0.1, 0.2, 0.7].
+        data_x (list[Object]): A list of elements to be divided.
+        data_y (list[Object], optional): A list of data labels to be divided together with the data.
+
+    Returns:
+        list[list]: A list where each element is a list of data of a group/client.
+        list[list]: A list where each element is a list of data labels of a group/client.
+
+    Example:
+        >>> quantity_hetero([0.1, 0.2, 0.7], list(range(0, 10)))
+        ([[4], [8, 9], [6, 0, 1, 7, 3, 2, 5]], [])
+    """
+    # Round before comparing because of floating-point error in Python,
+    # e.g., sum([0.1, 0.2, 0.4, 0.2, 0.1]) is 1.0000000000000002, not exactly 1.
+    assert round(sum(weights), 3) == 1
+
+    if data_y is not None:
+        assert len(data_x) == len(data_y)
+        data_x, data_y = shuffle(data_x, data_y)
+    else:
+        np.random.shuffle(data_x)
+    data_size = len(data_x)
+
+    i = 0
+
+    splitted_data_x = []
+    splitted_data_y = []
+    for w in weights:
+        size = math.floor(data_size * w)
+        splitted_data_x.append(data_x[i:i + size])
+        if data_y is not None:
+            splitted_data_y.append(data_y[i:i + size])
+        i += size
+
+    # Distribute any leftover elements (due to flooring) round-robin across groups.
+    parts = len(weights)
+    if i < data_size:
+        remain = data_size - i
+        for i in range(-remain, 0, 1):
+            splitted_data_x[(-i) % parts].append(data_x[i])
+            if data_y is not None:
+                splitted_data_y[(-i) % parts].append(data_y[i])
+
+    return splitted_data_x, splitted_data_y
+
+
+def iid(data_x, data_y, num_of_clients, x_dtype, y_dtype):
+    """Randomly partition a dataset into multiple clients with equal data quantity (sizes differ by at most 1).
+
+    Args:
+        data_x (list[Object]): A list of data.
+        data_y (list[Object]): A list of dataset labels.
+        num_of_clients (int): The number of clients to partition to.
+        x_dtype (numpy.dtype): The type of data.
+        y_dtype (numpy.dtype): The type of data label.
+
+    Returns:
+        list[str]: A list of client ids.
+        dict: The partitioned data, key is client id, value is the client data. e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
+    """
+    data_x, data_y = shuffle(data_x, data_y)
+    x_divided_list, y_divided_list = equal_division(num_of_clients, data_x, data_y)
+    clients = []
+    federated_data = {}
+    for i in range(num_of_clients):
+        client_id = "f%07.0f" % (i)
+        temp_client = {}
+        temp_client['x'] = np.array(x_divided_list[i]).astype(x_dtype)
+        temp_client['y'] = np.array(y_divided_list[i]).astype(y_dtype)
+        federated_data[client_id] = temp_client
+        clients.append(client_id)
+    return clients, federated_data
+
+
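+# A minimal usage sketch for `iid` (illustrative only; the toy arrays below are
+# assumptions, not part of the original module):
+#
+#     x = np.random.rand(10, 3)
+#     y = np.random.randint(0, 2, size=10)
+#     clients, federated_data = iid(x, y, 2, x.dtype, y.dtype)
+#     # clients == ['f0000000', 'f0000001']; each client holds 5 samples.
+
+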
+def non_iid_dirichlet(data_x, data_y, num_of_clients, alpha, min_size, x_dtype, y_dtype):
+    """Partition a dataset into multiple clients following the Dirichlet process.
+
+    Args:
+        data_x (list[Object]): A list of data.
+        data_y (list[Object]): A list of dataset labels.
+        num_of_clients (int): The number of clients to partition to.
+        alpha (float): The parameter for Dirichlet process simulation.
+        min_size (int): The minimum number of data size of a client.
+        x_dtype (numpy.dtype): The type of data.
+        y_dtype (numpy.dtype): The type of data label.
+
+    Returns:
+        list[str]: A list of client ids.
+        dict: The partitioned data, key is client id, value is the client data. e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
+    """
+    n_train = data_x.shape[0]
+    current_min_size = 0
+    num_class = np.amax(data_y) + 1
+    data_size = data_y.shape[0]
+    net_dataidx_map = {}
+
+    while current_min_size < min_size:
+        idx_batch = [[] for _ in range(num_of_clients)]
+        for k in range(num_class):
+            idx_k = np.where(data_y == k)[0]
+            np.random.shuffle(idx_k)
+            proportions = np.random.dirichlet(np.repeat(alpha, num_of_clients))
+            # Using the proportions from Dirichlet, only select clients whose current data amount is below average.
+            proportions = np.array(
+                [p * (len(idx_j) < data_size / num_of_clients) for p, idx_j in zip(proportions, idx_batch)])
+            # Rescale the remaining proportions so they sum to 1.
+            proportions = proportions / proportions.sum()
+            # Turn the proportions into split points over the shuffled indices of class k.
+            proportions = (np.cumsum(proportions) * len(idx_k)).astype(int)[:-1]
+            idx_batch = [idx_j + idx.tolist() for idx_j, idx in zip(idx_batch, np.split(idx_k, proportions))]
+        current_min_size = min([len(idx_j) for idx_j in idx_batch])
+
+    federated_data = {}
+    clients = []
+    for j in range(num_of_clients):
+        np.random.shuffle(idx_batch[j])
+        client_id = "f%07.0f" % j
+        clients.append(client_id)
+        temp = {}
+        temp['x'] = np.array(data_x[idx_batch[j]]).astype(x_dtype)
+        temp['y'] = np.array(data_y[idx_batch[j]]).astype(y_dtype)
+        federated_data[client_id] = temp
+        net_dataidx_map[client_id] = idx_batch[j]
+    print_data_distribution(data_y, net_dataidx_map)
+    return clients, federated_data
+
+
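+# A worked illustration of the Dirichlet split above (numbers are hypothetical):
+# with num_of_clients=3 and alpha=0.5, np.random.dirichlet may draw proportions
+# like [0.7, 0.2, 0.1] for one class; for 100 shuffled indices of that class,
+# np.cumsum gives split points [70, 90], so np.split hands the three clients
+# 70, 20, and 10 samples of that class. Smaller alpha yields more skewed draws.
+
+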
+def non_iid_class(data_x, data_y, class_per_client, num_of_clients, x_dtype, y_dtype, stack_x=True):
+    """Partition dataset into multiple clients based on label classes.
+    Each client contains [1, n] classes, where n is the number of classes of a dataset.
+
+    Note: Each class is divided into `ceil(class_per_client * num_of_clients / num_class)` parts
+    and each client takes `class_per_client` parts, each drawn from a least-consumed class, to construct its dataset.
+
+    Args:
+        data_x (list[Object]): A list of data.
+        data_y (list[Object]): A list of dataset labels.
+        class_per_client (int): The number of classes in each client.
+        num_of_clients (int): The number of clients to partition to.
+        x_dtype (numpy.dtype): The type of data.
+        y_dtype (numpy.dtype): The type of data label.
+        stack_x (bool, optional): A flag to indicate whether using np.vstack or append to construct dataset.
+
+    Returns:
+        list[str]: A list of client ids.
+        dict: The partitioned data, key is client id, value is the client data. e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
+    """
+    num_class = np.amax(data_y) + 1
+    all_index = []
+    clients = []
+    data_index_map = {}
+    for i in range(num_class):
+        # Get indexes of all data with current label i at index i in all_index.
+        all_index.append(np.where(data_y == i)[0].tolist())
+
+    federated_data = {}
+
+    # Total number of parts.
+    total_amount = class_per_client * num_of_clients
+    # Number of parts each class should be divided into.
+    parts_per_class = math.ceil(total_amount / num_class)
+
+    for i in range(num_of_clients):
+        client_id = "f%07.0f" % (i)
+        clients.append(client_id)
+        data_index_map[client_id] = []
+        data = {}
+        data['x'] = np.array([])
+        data['y'] = np.array([])
+        federated_data[client_id] = data
+
+    class_map = {}
+    parts_consumed = []
+    for i in range(num_class):
+        class_map[i], _ = equal_division(parts_per_class, all_index[i])
+        heapq.heappush(parts_consumed, (0, i))
+    for i in clients:
+        for j in range(class_per_client):
+            class_chosen = heapq.heappop(parts_consumed)
+            part_indexes = class_map[class_chosen[1]].pop(0)
+            if len(federated_data[i]['x']) != 0:
+                if stack_x:
+                    federated_data[i]['x'] = np.vstack((federated_data[i]['x'], data_x[part_indexes])).astype(x_dtype)
+                else:
+                    federated_data[i]['x'] = np.append(federated_data[i]['x'], data_x[part_indexes]).astype(x_dtype)
+                federated_data[i]['y'] = np.append(federated_data[i]['y'], data_y[part_indexes]).astype(y_dtype)
+            else:
+                federated_data[i]['x'] = data_x[part_indexes].astype(x_dtype)
+                federated_data[i]['y'] = data_y[part_indexes].astype(y_dtype)
+            heapq.heappush(parts_consumed, (class_chosen[0] + 1, class_chosen[1]))
+            data_index_map[i].extend(part_indexes)
+    print_data_distribution(data_y, data_index_map)
+    return clients, federated_data
+
+
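+# Worked arithmetic for the class-based split (hypothetical numbers): with
+# num_class=10, num_of_clients=10, and class_per_client=2, there are
+# 2 * 10 = 20 parts in total, so each class is divided into ceil(20 / 10) = 2
+# parts; every client then receives 2 parts, each popped from a class whose
+# parts have been consumed the least so far.
+
+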
+def data_simulation(data_x, data_y, num_of_clients, data_distribution, weights=None, alpha=0.5, min_size=10,
+                    class_per_client=1, stack_x=True):
+    """Simulate federated learning datasets by partitioning a dataset into multiple clients using different strategies.
+
+    Args:
+        data_x (list[Object]): A list of data.
+        data_y (list[Object]): A list of dataset labels.
+        num_of_clients (int): The number of clients to partition to.
+        data_distribution (str): The way to partition the dataset, options:
+            `iid`: Partition dataset into multiple clients with equal quantity (difference is less than 1) randomly.
+            `dir`: Partition dataset into multiple clients following the Dirichlet process.
+            `class`: Partition dataset into multiple clients based on classes.
+        weights (list[float], optional): The targeted distribution of data quantities, used to simulate
+            data quantity heterogeneity. The values should sum up to 1, e.g., [0.1, 0.2, 0.7].
+            When `weights=None`, the data quantity of clients only depends on `data_distribution`.
+            Note: `num_of_clients` should be divisible by `len(weights)`.
+        alpha (float, optional): The parameter for Dirichlet process simulation.
+            It is only applicable when data_distribution is `dir`.
+        min_size (int, optional): The minimum number of data size of a client.
+            It is only applicable when data_distribution is `dir`.
+        class_per_client (int, optional): The number of classes in each client.
+            It is only applicable when data_distribution is `class`.
+        stack_x (bool, optional): A flag to indicate whether using np.vstack or append to construct dataset.
+            It is only applicable when data_distribution is `class`.
+
+    Raises:
+        ValueError: When the simulation method `data_distribution` is not supported.
+
+    Returns:
+        list[str]: A list of client ids.
+        dict: The partitioned data, key is client id, value is the client data. e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
+    """
+    data_x = np.array(data_x)
+    data_y = np.array(data_y)
+    x_dtype = data_x.dtype
+    y_dtype = data_y.dtype
+    if weights is not None:
+        assert num_of_clients % len(weights) == 0
+        num_of_clients = num_of_clients // len(weights)
+
+    if data_distribution == SIMULATE_IID:
+        group_client_list, group_federated_data = iid(data_x, data_y, num_of_clients, x_dtype, y_dtype)
+    elif data_distribution == SIMULATE_NIID_DIR:
+        group_client_list, group_federated_data = non_iid_dirichlet(data_x, data_y, num_of_clients, alpha, min_size,
+                                                                    x_dtype, y_dtype)
+    elif data_distribution == SIMULATE_NIID_CLASS:
+        group_client_list, group_federated_data = non_iid_class(data_x, data_y, class_per_client, num_of_clients,
+                                                                x_dtype, y_dtype, stack_x=stack_x)
+    else:
+        raise ValueError("Simulation type not supported")
+    if weights is None:
+        return group_client_list, group_federated_data
+
+    # Split each group's data further by `weights` to simulate quantity heterogeneity.
+    clients = []
+    federated_data = {}
+    cur_key = 0
+    for i in group_client_list:
+        current_client = group_federated_data[i]
+        input_lists, label_lists = quantity_hetero(weights, current_client['x'], current_client['y'])
+        for j in range(len(input_lists)):
+            client_id = "f%07.0f" % (cur_key)
+            temp_client = {}
+            temp_client['x'] = np.array(input_lists[j]).astype(x_dtype)
+            temp_client['y'] = np.array(label_lists[j]).astype(y_dtype)
+            federated_data[client_id] = temp_client
+            clients.append(client_id)
+            cur_key += 1
+    return clients, federated_data
+
+
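+# How `weights` interacts with `num_of_clients` above (hypothetical numbers):
+# calling data_simulation(x, y, 9, "iid", weights=[0.1, 0.2, 0.7]) first builds
+# 9 // 3 = 3 IID groups, then splits each group by the weights, yielding
+# 3 * 3 = 9 clients whose sizes within each group follow roughly 10%/20%/70%.
+
+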
+def print_data_distribution(data_y, data_index_map):
+    """Log the distribution of client datasets."""
+    data_distribution = {}
+    for index, dataidx in data_index_map.items():
+        unique_values, counts = np.unique(data_y[dataidx], return_counts=True)
+        distribution = {unique_values[i]: counts[i] for i in range(len(unique_values))}
+        data_distribution[index] = distribution
+    logger.info(data_distribution)
+    return data_distribution
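+
+
+# A minimal end-to-end sketch (illustrative assumption: a toy random dataset,
+# not part of the original module). Running the file directly partitions 100
+# samples into 5 IID clients and prints the resulting client array shapes.
+if __name__ == "__main__":
+    toy_x = np.random.rand(100, 3)
+    toy_y = np.random.randint(0, 10, size=100)
+    client_ids, fed_data = data_simulation(toy_x, toy_y, 5, SIMULATE_IID)
+    for cid in client_ids:
+        print(cid, fed_data[cid]['x'].shape, fed_data[cid]['y'].shape)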