import os

import fire
import gradio as gr
import torch
import transformers
from peft import (
    PeftModel,
    LoraConfig,
    get_peft_model,
    get_peft_model_state_dict,
    prepare_model_for_int8_training,
    set_peft_model_state_dict,
)
from transformers import (
    GenerationConfig,
    LlamaForCausalLM,
    LlamaTokenizer,
    AutoTokenizer,
)

from utils.callbacks import Iteratorize, Stream
from utils.prompter import Prompter

# Pick the compute device once at import time: CUDA if present, else CPU,
# with Apple MPS taking precedence whenever the backend reports available.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

try:
    if torch.backends.mps.is_available():
        device = "mps"
except AttributeError:
    # This torch build has no `torch.backends.mps`; keep the choice above.
    pass


def _load_model(
    base_model: str,
    lora_weights_path: str,
    lora_config_path: str,
    load_8bit: bool,
):
    """Load the base LLaMA model and attach the LoRA adapter to it.

    Two adapter layouts are handled:

    * ``lora_weights_path`` does not end in ``.bin``: it is handed to
      ``PeftModel.from_pretrained``; the base model loading options depend
      on the module-level ``device``.
    * ``lora_weights_path`` ends in ``.bin``: the file is read with
      ``torch.load`` and applied with ``set_peft_model_state_dict`` to a
      ``PeftModel`` built from the config found at ``lora_config_path``.

    Args:
        base_model: Model identifier or path for ``from_pretrained``.
        lora_weights_path: Adapter location (see the two layouts above).
        lora_config_path: Location passed to ``LoraConfig.from_pretrained``;
            only read in the ``.bin`` layout.
        load_8bit: Forwarded as ``load_in_8bit`` in the CUDA,
            non-``.bin`` path only.

    Returns:
        The PEFT-wrapped model.
    """
    if lora_weights_path.endswith(".bin"):
        # NOTE(review): this path always loads in 8-bit and ignores
        # `load_8bit` — confirm that is intended.
        model = LlamaForCausalLM.from_pretrained(
            base_model,
            load_in_8bit=True,
            torch_dtype=torch.float16,
            device_map="auto",
        )
        model = prepare_model_for_int8_training(model)
        config = LoraConfig.from_pretrained(lora_config_path)
        # SECURITY: torch.load unpickles the file; only load adapter weights
        # obtained from a trusted source. Loading to CPU first avoids failing
        # on checkpoints saved from a device this host does not have; the
        # state-dict load below copies values into the existing parameters.
        lora_weights = torch.load(lora_weights_path, map_location="cpu")
        model = PeftModel(model, config)
        set_peft_model_state_dict(model, lora_weights, "default")
        del lora_weights  # release the raw state dict as soon as it is applied
        return model

    if device == "cuda":
        model = LlamaForCausalLM.from_pretrained(
            base_model,
            load_in_8bit=load_8bit,
            torch_dtype=torch.float16,
            device_map="auto",
        )
        return PeftModel.from_pretrained(
            model,
            lora_weights_path,
            torch_dtype=torch.float16,
        )

    if device == "mps":
        model = LlamaForCausalLM.from_pretrained(
            base_model,
            device_map={"": device},
            torch_dtype=torch.float16,
        )
        return PeftModel.from_pretrained(
            model,
            lora_weights_path,
            device_map={"": device},
            torch_dtype=torch.float16,
        )

    model = LlamaForCausalLM.from_pretrained(
        base_model, device_map={"": device}, low_cpu_mem_usage=True
    )
    return PeftModel.from_pretrained(
        model,
        lora_weights_path,
        device_map={"": device},
    )


def main(
    load_8bit: bool = False,
    base_model: str = "",
    lora_weights_path: str = "",
    # Directory holding 'adapter_config.json' (do not include the file name).
    lora_config_path: str = "",
    # Prompt template name handed to Prompter; per the original note an empty
    # value falls back to the alpaca template.
    prompt_template: str = "",
    server_name: str = "127.0.0.1",
    share_gradio: bool = False,
) -> None:
    """Load a LoRA-adapted LLaMA model and serve it through a Gradio UI.

    Args:
        load_8bit: Load the base model in 8-bit (CUDA, non-``.bin`` adapter
            path). When false, ``model.half()`` is applied after loading.
        base_model: Base model identifier or path. Falls back to the
            ``BASE_MODEL`` environment variable when empty.
        lora_weights_path: Adapter directory/identifier, or a ``.bin`` file
            holding the adapter state dict.
        lora_config_path: Location of the adapter config; only read when
            ``lora_weights_path`` is a ``.bin`` file.
        prompt_template: Template name passed to ``Prompter``.
        server_name: Host/interface the Gradio server binds to.
        share_gradio: Whether Gradio should create a public share link.

    Raises:
        AssertionError: If no base model is given by argument or environment.
    """
    base_model = base_model or os.environ.get("BASE_MODEL", "")
    assert (
        base_model
    ), "Please specify a --base_model, e.g. --base_model='huggyllama/llama-7b'"

    prompter = Prompter(prompt_template)
    tokenizer = LlamaTokenizer.from_pretrained(base_model)
    model = _load_model(
        base_model, lora_weights_path, lora_config_path, load_8bit
    )

    # unwind broken decapoda-research config
    model.config.pad_token_id = tokenizer.pad_token_id = 0  # unk
    model.config.bos_token_id = 1
    model.config.eos_token_id = 2

    if not load_8bit:
        # NOTE(review): the '.bin' adapter path always loads in 8-bit, so this
        # can run on an 8-bit model when load_8bit is False — confirm intended.
        model.half()  # seems to fix bugs for some users.

    model.eval()

    def evaluate(
        instruction,
        input=None,
        temperature=0.1,
        top_p=0.75,
        top_k=40,
        num_beams=4,
        max_new_tokens=128,
        stream_output=True,
        **kwargs,
    ):
        """Generate a response for one instruction; yields text for Gradio.

        With ``stream_output`` true, yields the response extracted from each
        partial sequence as generation proceeds and stops at the EOS token.
        Otherwise yields the response once, after generation completes.
        Extra ``kwargs`` are forwarded to ``GenerationConfig``.
        """
        prompt = prompter.generate_prompt(instruction, input)
        inputs = tokenizer(prompt, return_tensors="pt")
        input_ids = inputs["input_ids"].to(device)
        generation_config = GenerationConfig(
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            num_beams=num_beams,
            **kwargs,
        )

        generate_params = {
            "input_ids": input_ids,
            "generation_config": generation_config,
            "return_dict_in_generate": True,
            "output_scores": True,
            "max_new_tokens": max_new_tokens,
        }

        if stream_output:
            # Stream the reply 1 token at a time.
            # This is based on the trick of using 'stopping_criteria' to
            # create an iterator, from
            # https://github.com/oobabooga/text-generation-webui/blob/ad37f396fc8bcbab90e11ecf17c56c97bfbd4a9c/modules/text_generation.py#L216-L243.

            def generate_with_callback(callback=None, **kwargs):
                # Append a Stream criterion so `callback` is driven by the
                # stopping-criteria hook during generation.
                kwargs.setdefault(
                    "stopping_criteria", transformers.StoppingCriteriaList()
                )
                kwargs["stopping_criteria"].append(
                    Stream(callback_func=callback)
                )
                with torch.no_grad():
                    model.generate(**kwargs)

            def generate_with_streaming(**kwargs):
                return Iteratorize(
                    generate_with_callback, kwargs, callback=None
                )

            with generate_with_streaming(**generate_params) as generator:
                for output in generator:
                    # Stop before decoding: a sequence ending in EOS is never
                    # yielded, so decoding it would be wasted work.
                    if output[-1] == tokenizer.eos_token_id:
                        break

                    decoded_output = tokenizer.decode(output)
                    yield prompter.get_response(decoded_output)
            return  # early return for stream_output

        # Without streaming
        with torch.no_grad():
            generation_output = model.generate(**generate_params)
        sequence = generation_output.sequences[0]
        output = tokenizer.decode(sequence)
        yield prompter.get_response(output)

    shepherd_ui = gr.Interface(
        fn=evaluate,
        inputs=[
            gr.components.Textbox(
                lines=2,
                label="Instruction",
                placeholder="Tell me about alpacas.",
            ),
            gr.components.Textbox(lines=2, label="Input", placeholder="none"),
            gr.components.Slider(
                minimum=0, maximum=1, value=0.1, label="Temperature"
            ),
            gr.components.Slider(
                minimum=0, maximum=1, value=0.75, label="Top p"
            ),
            gr.components.Slider(
                minimum=0, maximum=100, step=1, value=40, label="Top k"
            ),
            gr.components.Slider(
                minimum=1, maximum=4, step=1, value=4, label="Beams"
            ),
            gr.components.Slider(
                minimum=1, maximum=2000, step=1, value=128, label="Max tokens"
            ),
            gr.components.Checkbox(label="Stream output"),
        ],
        outputs=[
            # Same component namespace as the inputs above.
            gr.components.Textbox(
                lines=5,
                label="Output",
            )
        ],
        title="FederatedGPT-shepherd",
        description="Shepherd is a LLM that has been fine-tuned in a federated manner ",
    ).queue()
    # Honor the CLI options instead of always opening a public share link.
    shepherd_ui.launch(server_name=server_name, share=share_gradio)


if __name__ == "__main__":
    fire.Fire(main)