import heapq
import logging
import math

import numpy as np

SIMULATE_IID = "iid"
SIMULATE_NIID_DIR = "dir"
SIMULATE_NIID_CLASS = "class"

logger = logging.getLogger(__name__)


def shuffle(data_x, data_y):
    """Shuffle data and labels together so that (data, label) pairs stay aligned."""
    num_of_data = len(data_y)
    data_x = np.array(data_x)
    data_y = np.array(data_y)
    index = list(range(num_of_data))
    np.random.shuffle(index)
    data_x = data_x[index]
    data_y = data_y[index]
    return data_x, data_y


def equal_division(num_groups, data_x, data_y=None):
    """Partition data into multiple clients with equal quantity.

    Args:
        num_groups (int): The number of groups to partition to.
        data_x (list[Object]): A list of elements to be divided.
        data_y (list[Object], optional): A list of data labels to be divided together with the data.

    Returns:
        list[list]: A list where each element is a list of data of a group/client.
        list[list]: A list where each element is a list of data labels of a group/client.

    Example:
        >>> equal_division(3, list(range(9)))
        ([[0, 4, 2], [3, 1, 7], [6, 5, 8]], [])
    """
    if data_y is not None:
        assert len(data_x) == len(data_y)
        data_x, data_y = shuffle(data_x, data_y)
    else:
        np.random.shuffle(data_x)
    num_of_data = len(data_x)
    assert num_of_data > 0
    data_per_client = num_of_data // num_groups
    large_group_num = num_of_data - num_groups * data_per_client
    small_group_num = num_groups - large_group_num
    splitted_data_x = []
    splitted_data_y = []
    for i in range(small_group_num):
        base_index = data_per_client * i
        splitted_data_x.append(data_x[base_index: base_index + data_per_client])
        if data_y is not None:
            splitted_data_y.append(data_y[base_index: base_index + data_per_client])
    small_size = data_per_client * small_group_num
    # The remaining groups each receive one extra element.
    data_per_client += 1
    for i in range(large_group_num):
        base_index = small_size + data_per_client * i
        splitted_data_x.append(data_x[base_index: base_index + data_per_client])
        if data_y is not None:
            splitted_data_y.append(data_y[base_index: base_index + data_per_client])
    return splitted_data_x, splitted_data_y
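
# Usage sketch (illustrative, not part of the original module): splitting ten
# items into three groups yields group sizes 3/3/4, since the one remaining
# element goes to a later, larger group:
#
#   xs, ys = equal_division(3, list(range(10)), list(range(10)))
#   assert sorted(len(g) for g in xs) == [3, 3, 4]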


def quantity_hetero(weights, data_x, data_y=None):
    """Partition data into multiple clients with different quantities.

    The number of groups is the same as the number of elements of `weights`.
    The quantity of each group depends on the values of `weights`.

    Args:
        weights (list[float]): The targeted distribution of data quantities.
            The values should sum up to 1, e.g., [0.1, 0.2, 0.7].
        data_x (list[Object]): A list of elements to be divided.
        data_y (list[Object], optional): A list of data labels to be divided together with the data.

    Returns:
        list[list]: A list where each element is a list of data of a group/client.
        list[list]: A list where each element is a list of data labels of a group/client.

    Example:
        >>> quantity_hetero([0.1, 0.2, 0.7], list(range(0, 10)))
        ([[4], [8, 9], [6, 0, 1, 7, 3, 2, 5]], [])
    """
    # Round before comparing because of floating-point error in Python,
    # e.g., sum([0.1, 0.2, 0.4, 0.2, 0.1]) is not exactly 1 but 1.0000000000000002.
    assert round(sum(weights), 3) == 1
    if data_y is not None:
        assert len(data_x) == len(data_y)
        data_x, data_y = shuffle(data_x, data_y)
    else:
        np.random.shuffle(data_x)
    data_size = len(data_x)
    i = 0
    splitted_data_x = []
    splitted_data_y = []
    for w in weights:
        size = math.floor(data_size * w)
        # Convert each slice to a list so the remainder loop below can append to it,
        # even when the inputs are numpy arrays (numpy arrays have no `append`).
        splitted_data_x.append(list(data_x[i:i + size]))
        if data_y is not None:
            splitted_data_y.append(list(data_y[i:i + size]))
        i += size
    parts = len(weights)
    if i < data_size:
        # Distribute the leftover elements across groups in a round-robin fashion.
        remain = data_size - i
        for i in range(-remain, 0, 1):
            splitted_data_x[(-i) % parts].append(data_x[i])
            if data_y is not None:
                splitted_data_y[(-i) % parts].append(data_y[i])
    return splitted_data_x, splitted_data_y
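
# Usage sketch (illustrative): with weights [0.5, 0.3, 0.2] and 11 items, the
# floor sizes are 5/3/2 and the single leftover element is appended to the
# second group by the round-robin step, giving sizes 5/4/2:
#
#   xs, _ = quantity_hetero([0.5, 0.3, 0.2], list(range(11)))
#   assert [len(g) for g in xs] == [5, 4, 2]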


def iid(data_x, data_y, num_of_clients, x_dtype, y_dtype):
    """Partition a dataset into multiple clients with equal data quantity (the difference is less than 1) randomly.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        num_of_clients (int): The number of clients to partition to.
        x_dtype (numpy.dtype): The type of data.
        y_dtype (numpy.dtype): The type of data label.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data. e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    data_x, data_y = shuffle(data_x, data_y)
    x_divided_list, y_divided_list = equal_division(num_of_clients, data_x, data_y)
    clients = []
    federated_data = {}
    for i in range(num_of_clients):
        client_id = "f%07.0f" % i
        temp_client = {}
        temp_client['x'] = np.array(x_divided_list[i]).astype(x_dtype)
        temp_client['y'] = np.array(y_divided_list[i]).astype(y_dtype)
        federated_data[client_id] = temp_client
        clients.append(client_id)
    return clients, federated_data
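
# Usage sketch (illustrative; `x` and `y` stand for numpy arrays of data and
# labels): with 100 samples and 8 clients, `iid` produces client datasets of
# size 12 or 13, since 100 // 8 = 12 with remainder 4:
#
#   clients, fed = iid(x, y, 8, x.dtype, y.dtype)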


def non_iid_dirichlet(data_x, data_y, num_of_clients, alpha, min_size, x_dtype, y_dtype):
    """Partition a dataset into multiple clients following the Dirichlet process.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        num_of_clients (int): The number of clients to partition to.
        alpha (float): The parameter for Dirichlet process simulation.
        min_size (int): The minimum data size of a client.
        x_dtype (numpy.dtype): The type of data.
        y_dtype (numpy.dtype): The type of data label.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data. e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    current_min_size = 0
    num_class = np.amax(data_y) + 1
    data_size = data_y.shape[0]
    net_dataidx_map = {}

    # Resample until every client holds at least `min_size` samples.
    while current_min_size < min_size:
        idx_batch = [[] for _ in range(num_of_clients)]
        for k in range(num_class):
            idx_k = np.where(data_y == k)[0]
            np.random.shuffle(idx_k)
            proportions = np.random.dirichlet(np.repeat(alpha, num_of_clients))
            # Using the proportions from the Dirichlet distribution, only select
            # those clients whose data amount is less than the average.
            proportions = np.array(
                [p * (len(idx_j) < data_size / num_of_clients) for p, idx_j in zip(proportions, idx_batch)])
            # Rescale the remaining proportions so they sum to 1.
            proportions = proportions / proportions.sum()
            proportions = (np.cumsum(proportions) * len(idx_k)).astype(int)[:-1]
            idx_batch = [idx_j + idx.tolist() for idx_j, idx in zip(idx_batch, np.split(idx_k, proportions))]
        current_min_size = min([len(idx_j) for idx_j in idx_batch])

    federated_data = {}
    clients = []
    for j in range(num_of_clients):
        np.random.shuffle(idx_batch[j])
        client_id = "f%07.0f" % j
        clients.append(client_id)
        temp = {}
        temp['x'] = np.array(data_x[idx_batch[j]]).astype(x_dtype)
        temp['y'] = np.array(data_y[idx_batch[j]]).astype(y_dtype)
        federated_data[client_id] = temp
        net_dataidx_map[client_id] = idx_batch[j]
    print_data_distribution(data_y, net_dataidx_map)
    return clients, federated_data
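
# Usage sketch (illustrative): a smaller `alpha` produces more skewed per-client
# class distributions, while a larger `alpha` approaches an IID split, e.g.:
#
#   x = np.random.rand(1000, 4)
#   y = np.random.randint(0, 10, size=1000)
#   clients, fed = non_iid_dirichlet(x, y, num_of_clients=5, alpha=0.5,
#                                    min_size=10, x_dtype=np.float32, y_dtype=np.int64)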


def non_iid_class(data_x, data_y, class_per_client, num_of_clients, x_dtype, y_dtype, stack_x=True):
    """Partition a dataset into multiple clients based on label classes.

    Each client contains between 1 and n classes, where n is the number of classes of the dataset.

    Note: Each class is divided into `ceil(class_per_client * num_of_clients / num_class)` parts
        and each client takes `class_per_client` parts, drawn from the least-consumed classes.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        class_per_client (int): The number of classes in each client.
        num_of_clients (int): The number of clients to partition to.
        x_dtype (numpy.dtype): The type of data.
        y_dtype (numpy.dtype): The type of data label.
        stack_x (bool, optional): A flag to indicate whether to use np.vstack or np.append to construct the dataset.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data. e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    num_class = np.amax(data_y) + 1
    all_index = []
    clients = []
    data_index_map = {}
    for i in range(num_class):
        # Collect the indexes of all data with label i at position i of all_index.
        all_index.append(np.where(data_y == i)[0].tolist())

    federated_data = {}
    # The total number of parts across all clients.
    total_amount = class_per_client * num_of_clients
    # The number of parts each class should be divided into.
    parts_per_class = math.ceil(total_amount / num_class)

    for i in range(num_of_clients):
        client_id = "f%07.0f" % i
        clients.append(client_id)
        data_index_map[client_id] = []
        data = {}
        data['x'] = np.array([])
        data['y'] = np.array([])
        federated_data[client_id] = data

    class_map = {}
    # A min-heap of (parts consumed, class index) pairs to balance class usage across clients.
    parts_consumed = []
    for i in range(num_class):
        class_map[i], _ = equal_division(parts_per_class, all_index[i])
        heapq.heappush(parts_consumed, (0, i))
    for i in clients:
        for j in range(class_per_client):
            class_chosen = heapq.heappop(parts_consumed)
            part_indexes = class_map[class_chosen[1]].pop(0)
            if len(federated_data[i]['x']) != 0:
                if stack_x:
                    federated_data[i]['x'] = np.vstack((federated_data[i]['x'], data_x[part_indexes])).astype(x_dtype)
                else:
                    federated_data[i]['x'] = np.append(federated_data[i]['x'], data_x[part_indexes]).astype(x_dtype)
                federated_data[i]['y'] = np.append(federated_data[i]['y'], data_y[part_indexes]).astype(y_dtype)
            else:
                federated_data[i]['x'] = data_x[part_indexes].astype(x_dtype)
                federated_data[i]['y'] = data_y[part_indexes].astype(y_dtype)
            heapq.heappush(parts_consumed, (class_chosen[0] + 1, class_chosen[1]))
            data_index_map[i].extend(part_indexes)
    print_data_distribution(data_y, data_index_map)
    return clients, federated_data
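
# Usage sketch (illustrative): with 10 classes, 5 clients, and class_per_client=2,
# each class is split into ceil(2 * 5 / 10) = 1 part, so every client receives
# exactly two whole classes and no class is shared between clients.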


def data_simulation(data_x, data_y, num_of_clients, data_distribution, weights=None, alpha=0.5, min_size=10,
                    class_per_client=1, stack_x=True):
    """Simulate federated learning datasets by partitioning a dataset into multiple clients using different strategies.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        num_of_clients (int): The number of clients to partition to.
        data_distribution (str): The way to partition the dataset, options:
            `iid`: Partition dataset into multiple clients with equal quantity (difference is less than 1) randomly.
            `dir`: Partition dataset into multiple clients following the Dirichlet process.
            `class`: Partition dataset into multiple clients based on classes.
        weights (list[float], optional): The targeted distribution of data quantities,
            used to simulate data quantity heterogeneity.
            The values should sum up to 1, e.g., [0.1, 0.2, 0.7].
            When `weights=None`, the data quantity of clients only depends on data_distribution.
            Note: `num_of_clients` should be divisible by `len(weights)`.
        alpha (float, optional): The parameter for Dirichlet process simulation.
            It is only applicable when data_distribution is `dir`.
        min_size (int, optional): The minimum data size of a client.
            It is only applicable when data_distribution is `dir`.
        class_per_client (int, optional): The number of classes in each client.
            It is only applicable when data_distribution is `class`.
        stack_x (bool, optional): A flag to indicate whether to use np.vstack or np.append to construct the dataset.
            It is only applicable when data_distribution is `class`.

    Raises:
        ValueError: When the simulation method `data_distribution` is not supported.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data. e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    data_x = np.array(data_x)
    data_y = np.array(data_y)
    x_dtype = data_x.dtype
    y_dtype = data_y.dtype
    if weights is not None:
        # Partition into groups first; each group is later split into
        # len(weights) clients with heterogeneous quantities.
        assert num_of_clients % len(weights) == 0
        num_of_clients = num_of_clients // len(weights)

    if data_distribution == SIMULATE_IID:
        group_client_list, group_federated_data = iid(data_x, data_y, num_of_clients, x_dtype, y_dtype)
    elif data_distribution == SIMULATE_NIID_DIR:
        group_client_list, group_federated_data = non_iid_dirichlet(data_x, data_y, num_of_clients, alpha, min_size,
                                                                    x_dtype, y_dtype)
    elif data_distribution == SIMULATE_NIID_CLASS:
        group_client_list, group_federated_data = non_iid_class(data_x, data_y, class_per_client, num_of_clients,
                                                                x_dtype, y_dtype, stack_x=stack_x)
    else:
        raise ValueError("Simulation type not supported")
    if weights is None:
        return group_client_list, group_federated_data

    clients = []
    federated_data = {}
    cur_key = 0
    for i in group_client_list:
        current_client = group_federated_data[i]
        input_lists, label_lists = quantity_hetero(weights, current_client['x'], current_client['y'])
        for j in range(len(input_lists)):
            client_id = "f%07.0f" % cur_key
            temp_client = {}
            temp_client['x'] = np.array(input_lists[j]).astype(x_dtype)
            temp_client['y'] = np.array(label_lists[j]).astype(y_dtype)
            federated_data[client_id] = temp_client
            clients.append(client_id)
            cur_key += 1
    return clients, federated_data


def print_data_distribution(data_y, data_index_map):
    """Log the label distribution of client datasets."""
    data_distribution = {}
    for index, dataidx in data_index_map.items():
        unique_values, counts = np.unique(data_y[dataidx], return_counts=True)
        distribution = {unique_values[i]: counts[i] for i in range(len(unique_values))}
        data_distribution[index] = distribution
    logger.info(data_distribution)
    return data_distribution
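

if __name__ == "__main__":
    # Minimal smoke test (an illustrative sketch, not part of the library API):
    # partition a synthetic dataset with each of the three supported strategies
    # and log the resulting per-client dataset sizes.
    logging.basicConfig(level=logging.INFO)
    demo_x = np.random.rand(300, 8)
    demo_y = np.random.randint(0, 10, size=300)
    for dist in (SIMULATE_IID, SIMULATE_NIID_DIR, SIMULATE_NIID_CLASS):
        demo_clients, demo_data = data_simulation(demo_x, demo_y, num_of_clients=5, data_distribution=dist)
        sizes = {cid: len(demo_data[cid]["y"]) for cid in demo_clients}
        logger.info("%s partition sizes: %s", dist, sizes)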