
feat: add comments

shellmiao · 1 year ago
commit cd47153881
5 files changed, 42 insertions and 26 deletions
  1. GlobalModel_generated.py (+11 -4)
  2. client_data_allocation.py (+8 -13)
  3. fed_utils/client.py (+2 -0)
  4. fed_utils/model_aggregation.py (+9 -3)
  5. main.py (+12 -6)

+ 11 - 4
GlobalModel_generated.py

@@ -17,6 +17,8 @@ from peft import (
 from transformers import GenerationConfig, LlamaForCausalLM, LlamaTokenizer, AutoTokenizer
 from utils.callbacks import Iteratorize, Stream
 from utils.prompter import Prompter
+
+# Check which device is available: CPU or GPU, or mps if available
 if torch.cuda.is_available():
     device = "cuda"
 else:
@@ -38,6 +40,7 @@ def main(
     server_name: str = "127.0.0.1",
     share_gradio: bool = False,
 ):
+    # Get the base model name from the command-line argument or the environment variable
     base_model = base_model or os.environ.get("BASE_MODEL", "")
     assert (
         base_model
@@ -95,16 +98,18 @@ def main(
 
 
     # unwind broken decapoda-research config
+    # Apply model config fixes
     model.config.pad_token_id = tokenizer.pad_token_id = 0  # unk
     model.config.bos_token_id = 1
     model.config.eos_token_id = 2
 
+    # Cast to fp16 when not loading in 8-bit (fixes bugs for some users)
     if not load_8bit:
         model.half()  # seems to fix bugs for some users.
-
+    # Put the model in evaluation mode
     model.eval()
 
-
+    # Define the evaluate function: convert the input text into model inputs, then generate a response
     def evaluate(
         instruction,
         input=None,
@@ -116,9 +121,11 @@ def main(
         stream_output=True,
         **kwargs,
     ):
+        # Build the prompt and convert it into the input format the model expects
         prompt = prompter.generate_prompt(instruction, input)
         inputs = tokenizer(prompt, return_tensors="pt")
         input_ids = inputs["input_ids"].to(device)
+        # Configure the generation parameters
         generation_config = GenerationConfig(
             temperature=temperature,
             top_p=top_p,
@@ -134,7 +141,7 @@ def main(
             "output_scores": True,
             "max_new_tokens": max_new_tokens,
         }
-
+        # If stream_output is True, generate and return the response as a stream
         if stream_output:
             # Stream the reply 1 token at a time.
             # This is based on the trick of using 'stopping_criteria' to create an iterator,
@@ -154,7 +161,7 @@ def main(
                 return Iteratorize(
                     generate_with_callback, kwargs, callback=None
                 )
-
+            # Generate the response through the iterator
             with generate_with_streaming(**generate_params) as generator:
                 for output in generator:
                     # new_tokens = len(output) - len(input_ids[0])

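Note on the streaming branch above: utils.callbacks.Iteratorize turns a callback-based generate call into an iterator. A minimal stand-in sketch using only the standard library (iterate_stream and its signature are hypothetical, not the repo's API):

import queue
import threading

def iterate_stream(generate_fn, **kwargs):
    # Run generate_fn in a background thread and turn its callback
    # invocations into an iterator that yields one item at a time.
    q = queue.Queue()
    sentinel = object()

    def worker():
        try:
            generate_fn(callback=q.put, **kwargs)
        finally:
            q.put(sentinel)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = q.get()
        if item is sentinel:
            return
        yield item

# Toy usage: prints "a", "b", "c" as they are "generated".
for tok in iterate_stream(lambda callback: [callback(t) for t in ("a", "b", "c")]):
    print(tok)
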
+ 8 - 13
client_data_allocation.py

@@ -6,14 +6,17 @@ import os
 import json
 import pdb
 
+# Get the number of clients and the quantity-difference flag from the command-line arguments
 num_clients = int(sys.argv[1])
 diff_quantity = int(sys.argv[2])
 
+# Set the random seeds to make the results reproducible
 np.random.seed(42)
 random.seed(42)
 
-# Divide the entire dataset into a training set and a test set.
 
+# Read the dataset from the JSON file, then sort it and group it by category
+# Randomly sample 10 examples from each group as the test set; the rest become the training set
 df = pd.read_json("new-databricks-dolly-15k.json", orient='records')
 sorted_df = df.sort_values(by=['category'])
 grouped = sorted_df.groupby('category')
@@ -21,32 +24,28 @@ sampled_df = grouped.apply(lambda x: x.sample(n=10))
 sampled_df = sampled_df.reset_index(level=0, drop=True)
 remaining_df = sorted_df.drop(index=sampled_df.index)
 
+# Reset the indices and save the test set and training set as JSON files
 sampled_df = sampled_df.reset_index().drop('index', axis=1)
 remaining_df = remaining_df.reset_index().drop('index', axis=1)
 data_path = os.path.join("data", str(num_clients))
-
 os.makedirs(data_path,exist_ok=True)
-
 remaining_df_dic = remaining_df.to_dict(orient='records')
 with open(os.path.join(data_path, "global_training.json"), 'w') as outfile:
     json.dump(remaining_df_dic, outfile)
-
 sampled_df_dic = sampled_df.to_dict(orient='records')
 with open(os.path.join(data_path, "global_test.json"), 'w') as outfile:
     json.dump(sampled_df_dic, outfile)
 
-# Partition the global training data into smaller subsets for each client's local training dataset
-
+# If diff_quantity is set, generate training subsets of different sizes per category
+# Otherwise, split the training set evenly into num_clients subsets
 if diff_quantity:
     min_size = 0
     min_require_size = 40
     alpha = 0.5
-
     N = len(remaining_df)
     net_dataidx_map = {}
     category_uniques = remaining_df['category'].unique().tolist()
     while min_size < min_require_size:
-
         idx_partition = [[] for _ in range(num_clients)]
         for k in range(len(category_uniques)):
             category_rows_k = remaining_df.loc[remaining_df['category'] == category_uniques[k]]
@@ -59,20 +58,16 @@ if diff_quantity:
             idx_partition = [idx_j + idx.tolist() for idx_j, idx in
                              zip(idx_partition, np.split(category_rows_k_index, proportions))]
             min_size = min([len(idx_j) for idx_j in idx_partition])
-
         print(min_size)
-
-
 else:
     num_shards_per_clients = 2
     remaining_df_index = remaining_df.index.values
     shards = np.array_split(remaining_df_index, int(num_shards_per_clients * num_clients))
     random.shuffle(shards)
-
     shards = [shards[i:i + num_shards_per_clients] for i in range(0, len(shards), num_shards_per_clients)]
     idx_partition = [np.concatenate(shards[n]).tolist() for n in range(num_clients)]
 
-
+# Generate each client's local training dataset and save it as a JSON file
 for client_id, idx in enumerate(idx_partition):
     print(
         "\n Generating the local training dataset of Client_{}".format(client_id)

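Note on the diff_quantity branch above: the proportions fed to np.split come from a per-category Dirichlet draw with alpha = 0.5; the draw itself falls outside the shown hunk. A self-contained sketch of that partition scheme, assuming the split points are the cumulative proportions scaled by the category size:

import numpy as np

def dirichlet_partition(labels, num_clients, alpha=0.5, seed=42):
    # Split sample indices across clients, drawing each category's
    # client proportions from Dirichlet(alpha); smaller alpha gives a
    # more skewed (non-IID) quantity distribution.
    labels = np.asarray(labels)
    rng = np.random.default_rng(seed)
    idx_partition = [[] for _ in range(num_clients)]
    for category in np.unique(labels):
        idx_k = np.where(labels == category)[0]
        rng.shuffle(idx_k)
        proportions = rng.dirichlet(alpha * np.ones(num_clients))
        cuts = (np.cumsum(proportions)[:-1] * len(idx_k)).astype(int)
        for client_id, idx in enumerate(np.split(idx_k, cuts)):
            idx_partition[client_id].extend(idx.tolist())
    return idx_partition
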
+ 2 - 0
fed_utils/client.py

@@ -74,11 +74,13 @@ class GeneralClient:
 
     def initiate_local_training(self):
         self.model.config.use_cache = False
+        # Snapshot the model's "default" adapter parameters
         self.params_dict_old = copy.deepcopy(
             OrderedDict((name, param.detach()) for name, param in self.model.named_parameters() if
                         "default" in name))
         self.params_dict_new = OrderedDict((name, param.detach()) for name, param in self.model.named_parameters() if
                                            "default" in name)
+        # Override the model's state_dict method so it returns only the adapter weights
         self.model.state_dict = (
             lambda instance, *_, **__: get_peft_model_state_dict(
                 instance, self.params_dict_new, "default"

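Note on the state_dict override above: it makes a later torch.save store only the "default" LoRA adapter tensors instead of the full model (the lambda is presumably bound to the instance further down, outside this hunk). A toy illustration of rebinding state_dict on an instance, with hypothetical class and attribute names:

from collections import OrderedDict

class TinyModel:
    def __init__(self):
        self.adapter_weight = 0.5  # stand-in for a LoRA tensor

def adapter_only_state_dict(instance, *args, **kwargs):
    # Mirror of the lambda in the diff: return only the adapter entries.
    return OrderedDict(adapter_weight=instance.adapter_weight)

model = TinyModel()
model.state_dict = adapter_only_state_dict.__get__(model, TinyModel)
print(model.state_dict())  # OrderedDict([('adapter_weight', 0.5)])
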
+ 9 - 3
fed_utils/model_aggregation.py

@@ -5,17 +5,23 @@ import torch
 import os
 from torch.nn.functional import normalize
 
-
+# Federated averaging (FedAvg) algorithm
 def FedAvg(model, selected_clients_set, output_dir, local_dataset_len_dict, epoch):
+    # Normalize the clients' local dataset sizes to serve as the aggregation weights
+    # Each client's dataset size is converted to a tensor and L1-normalized along dim 0
     weights_array = normalize(
         torch.tensor([local_dataset_len_dict[client_id] for client_id in selected_clients_set],
                      dtype=torch.float32),
         p=1, dim=0)
-
+    # Iterate over the set of selected clients
     for k, client_id in enumerate(selected_clients_set):
+        # For each selected client, load the model weights it trained locally
+        # Build the path to the client's weight file and load the weights
         single_output_dir = os.path.join(output_dir, str(epoch), "local_output_{}".format(client_id),
                                          "pytorch_model.bin")
         single_weights = torch.load(single_output_dir)
+        # For the first client, multiply its weights by the corresponding normalized weight
+        # Otherwise, multiply by the normalized weight and accumulate into the running total
         if k == 0:
             weighted_single_weights = {key: single_weights[key] * (weights_array[k]) for key in
                                        single_weights.keys()}
@@ -23,7 +29,7 @@ def FedAvg(model, selected_clients_set, output_dir, local_dataset_len_dict, epoc
             weighted_single_weights = {key: weighted_single_weights[key] + single_weights[key] * (weights_array[k])
                                        for key in
                                        single_weights.keys()}
-
+    # Set the global model's state from the computed weighted-average weights
     set_peft_model_state_dict(model, weighted_single_weights, "default")
 
     return model

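A quick numeric check of the aggregation rule above: two clients holding 100 and 300 local examples get L1-normalized weights 0.25 and 0.75, so each parameter lands at the weighted mean (the key name below is illustrative):

import torch
from torch.nn.functional import normalize

sizes = torch.tensor([100.0, 300.0])
weights = normalize(sizes, p=1, dim=0)   # tensor([0.2500, 0.7500])

client_a = {"lora.weight": torch.tensor([1.0, 1.0])}
client_b = {"lora.weight": torch.tensor([3.0, 3.0])}
avg = {key: client_a[key] * weights[0] + client_b[key] * weights[1]
       for key in client_a}
print(avg["lora.weight"])                # tensor([2.5000, 2.5000])
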
+ 12 - 6
main.py

@@ -15,7 +15,7 @@ from utils.prompter import Prompter
 
 datasets.utils.logging.set_verbosity_error()
 
-
+# Define the federated learning fine-tuning function
 def fl_finetune(
         # model/data params
         global_model: str = '',
@@ -81,6 +81,7 @@ def fl_finetune(
     assert os.path.exists(data_path), "Please generate the data files for each client"
 
     # set up the global model & tokenizer
+    # Set up the global model and tokenizer
     gradient_accumulation_steps = local_batch_size // local_micro_batch_size
     prompter = Prompter(prompt_template_name)
     device_map = "auto"
@@ -90,13 +91,14 @@ def fl_finetune(
         device_map = {"": int(os.environ.get("LOCAL_RANK") or 0)}
         gradient_accumulation_steps = gradient_accumulation_steps // world_size
 
+    # Load the model from the pretrained weights
     model = LlamaForCausalLM.from_pretrained(
         global_model,
         load_in_8bit=True,
         torch_dtype=torch.float16,
         device_map=device_map,
     )
-
+    # Load the tokenizer from the pretrained model
     tokenizer = LlamaTokenizer.from_pretrained(global_model)
     tokenizer.pad_token_id = (
         0
@@ -143,8 +145,9 @@ def fl_finetune(
                                                                     user_prompt_len:
                                                                     ]  # could be sped up, probably
         return tokenized_full_prompt
-
+    # Prepare the model for int8 training
     model = prepare_model_for_int8_training(model)
+    # Set up the LoRA configuration
     config = LoraConfig(
         r=lora_r,
         lora_alpha=lora_alpha,
@@ -153,17 +156,19 @@ def fl_finetune(
         bias="none",
         task_type="CAUSAL_LM",
     )
+    # Get the peft model
     model = get_peft_model(model, config)
+    # If not using DDP and multiple GPUs are available, enable model parallelism
     if not ddp and torch.cuda.device_count() > 1:
         model.is_parallelizable = True
         model.model_parallel = True
-
+    # Start the federated training process
     print("The process of federated instruction-tuning has started..")
     previously_selected_clients_set = set()
     last_client_id = None
     local_dataset_len_dict = dict()
     output_dir = os.path.join(output_dir, str(num_clients))
-
+    # Run multiple communication rounds of federated training
     for epoch in tqdm(range(num_communication_rounds)):
 
         print("\nConducting the client selection")
@@ -193,7 +198,7 @@ def fl_finetune(
             model, local_dataset_len_dict, previously_selected_clients_set, last_client_id = client.terminate_local_training(
                 epoch, local_dataset_len_dict, previously_selected_clients_set)
             del client
-
+        # Collect the clients' weights and perform aggregation
         print("Collecting the weights of clients and performing aggregation")
         model = FedAvg(model,
                        selected_clients_set,
@@ -201,6 +206,7 @@ def fl_finetune(
                        local_dataset_len_dict,
                        epoch,
                        )
+        # Save the model state
         torch.save(model.state_dict(), os.path.join(output_dir, str(epoch), "adapter_model.bin"))
         config.save_pretrained(output_dir)
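
Note on the training loop above: client_selection is not part of this diff, so the fraction-based sampling below is only an assumption about its behaviour, sketched for orientation:

import numpy as np

def select_clients(num_clients, frac, seed):
    # Sample a fraction of the client ids, without replacement,
    # always picking at least one client.
    rng = np.random.default_rng(seed)
    k = max(int(frac * num_clients), 1)
    return set(rng.choice(num_clients, size=k, replace=False).tolist())

print(select_clients(num_clients=10, frac=0.2, seed=0))  # a set of 2 client ids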