clients_datasets.py

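"""Split new-databricks-dolly-15k.json into a global training set, a global
test set, and per-client local training sets.

Illustrative invocation (the two positional arguments are read below as
num_clients and diff_quantity; the values here are examples only):

    python clients_datasets.py 10 1
"""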
import sys
import pandas as pd
import numpy as np
import random
import os
import json

# Command-line arguments: number of clients and whether to use a
# heterogeneous (Dirichlet-based) quantity split across clients.
num_clients = int(sys.argv[1])
diff_quantity = int(sys.argv[2])

# Fix the random seeds so the partition is reproducible.
np.random.seed(42)
random.seed(42)
# Divide the entire dataset into a global training set and a global test set:
# sample 10 rows per category for the test set and keep the rest for training.
df = pd.read_json("new-databricks-dolly-15k.json", orient='records')
sorted_df = df.sort_values(by=['category'])
grouped = sorted_df.groupby('category')
sampled_df = grouped.apply(lambda x: x.sample(n=10))
sampled_df = sampled_df.reset_index(level=0, drop=True)
remaining_df = sorted_df.drop(index=sampled_df.index)

sampled_df = sampled_df.reset_index().drop('index', axis=1)
remaining_df = remaining_df.reset_index().drop('index', axis=1)

data_path = os.path.join("data", str(num_clients))
os.makedirs(data_path, exist_ok=True)

# Save the global training and test sets as JSON records.
remaining_df_dic = remaining_df.to_dict(orient='records')
with open(os.path.join(data_path, "global_training.json"), 'w') as outfile:
    json.dump(remaining_df_dic, outfile)

sampled_df_dic = sampled_df.to_dict(orient='records')
with open(os.path.join(data_path, "global_test.json"), 'w') as outfile:
    json.dump(sampled_df_dic, outfile)
# Partition the global training data into smaller subsets for each client's
# local training dataset.
if diff_quantity:
    # Heterogeneous quantities: draw per-client proportions for each category
    # from a Dirichlet distribution (concentration alpha) and resample until
    # every client receives at least min_require_size examples.
    min_size = 0
    min_require_size = 40
    alpha = 0.5

    N = len(remaining_df)
    category_uniques = remaining_df['category'].unique().tolist()

    while min_size < min_require_size:
        idx_partition = [[] for _ in range(num_clients)]

        for k in range(len(category_uniques)):
            # Shuffle the row indices belonging to category k.
            category_rows_k = remaining_df.loc[remaining_df['category'] == category_uniques[k]]
            category_rows_k_index = category_rows_k.index.values
            np.random.shuffle(category_rows_k_index)

            # Sample client proportions, zeroing out clients that already
            # hold their fair share (N / num_clients) of examples.
            proportions = np.random.dirichlet(np.repeat(alpha, num_clients))
            proportions = np.array([p * (len(idx_j) < N / num_clients)
                                    for p, idx_j in zip(proportions, idx_partition)])
            proportions = proportions / proportions.sum()

            # Turn the proportions into cumulative split points and hand the
            # category's rows out to the clients.
            proportions = (np.cumsum(proportions) * len(category_rows_k_index)).astype(int)[:-1]
            idx_partition = [idx_j + idx.tolist() for idx_j, idx in
                             zip(idx_partition, np.split(category_rows_k_index, proportions))]
            min_size = min([len(idx_j) for idx_j in idx_partition])

        print(min_size)
else:
    # Homogeneous quantities: split the training indices into
    # num_shards_per_clients * num_clients shards and give each client the
    # same number of shards.
    num_shards_per_clients = 2
    remaining_df_index = remaining_df.index.values
    shards = np.array_split(remaining_df_index, int(num_shards_per_clients * num_clients))
    random.shuffle(shards)
    shards = [shards[i:i + num_shards_per_clients] for i in range(0, len(shards), num_shards_per_clients)]
    idx_partition = [np.concatenate(shards[n]).tolist() for n in range(num_clients)]
# Write each client's local training subset to its own JSON file.
for client_id, idx in enumerate(idx_partition):
    print(
        "\n Generating the local training dataset of Client_{}".format(client_id)
    )
    sub_remaining_df = remaining_df.loc[idx]
    sub_remaining_df = sub_remaining_df.reset_index().drop('index', axis=1)
    sub_remaining_df_dic = sub_remaining_df.to_dict(orient='records')
    with open(os.path.join(data_path, "local_training_{}.json".format(client_id)), 'w') as outfile:
        json.dump(sub_remaining_df_dic, outfile)
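
# Optional sanity check (illustrative; assumes at least one client): reload
# client 0's local file and print its category distribution.
check_df = pd.read_json(os.path.join(data_path, "local_training_0.json"), orient='records')
print(check_df['category'].value_counts())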