| import json
|
| import torch
|
| import random
|
| import numpy as np
|
| from datasets import Dataset, load_dataset
|
| from transformers import (
|
| AutoTokenizer,
|
| AutoModelForCausalLM,
|
| TrainingArguments,
|
| Trainer,
|
| DataCollatorForLanguageModeling,
|
| HfApi,
|
| HfFolder
|
| )
|
| from peft import (
|
| LoraConfig,
|
| get_peft_model,
|
| TaskType,
|
| prepare_model_for_kbit_training
|
| )
|
| from transformers import BitsAndBytesConfig
|
| from huggingface_hub import login as hf_login, HfApi
|
| import os
|
|
|
|
|
# Filesystem locations shared by the functions in this script.

# Directory the base model and tokenizer are loaded from.
MODEL_NAME = "./deepseek-model"

# Directory that receives checkpoints, metrics, the saved model and tokenizer.
OUTPUT_DIR = "./zenith-model"

# JSON file opened by load_and_prepare_data().
DATASET_FILE = "zenith_training_data.json"
|
|
|
def load_and_prepare_data():
    """Read DATASET_FILE and wrap its conversations in a `Dataset`.

    The JSON document is iterated item by item and the "conversations"
    entry of each item is collected; the collected entries become the
    single "conversations" column of the returned dataset.
    """
    print("Loading training data...")

    with open(DATASET_FILE, 'r', encoding='utf-8') as handle:
        records = json.load(handle)

    column = []
    for record in records:
        column.append(record["conversations"])

    return Dataset.from_dict({"conversations": column})
|
|
|
def format_conversation(example, tokenizer, max_length=4096):
    """Render one conversation as `<|im_start|>role ... <|im_end|>` text and tokenize it.

    Args:
        example: Mapping with a "conversations" entry holding a sequence of
            messages; each message is a mapping with "role" and "content".
        tokenizer: Callable invoked as
            ``tokenizer(text, truncation=True, max_length=..., padding=False)``.
            Its result must support item access and contain "input_ids".
        max_length: Truncation limit passed to the tokenizer. Defaults to
            4096, the value that was previously hard-coded.

    Returns:
        The tokenizer output with an added "labels" entry that is a copy
        of "input_ids".
    """
    # Messages whose role is not one of these are skipped.
    known_roles = ("system", "user", "assistant")

    # All three roles share one template, so build the text in a single join
    # instead of repeated string concatenation.
    text = "".join(
        f"<|im_start|>{message['role']}\n{message['content']}<|im_end|>\n"
        for message in example["conversations"]
        if message["role"] in known_roles
    )

    tokenized = tokenizer(
        text,
        truncation=True,
        max_length=max_length,
        padding=False,
    )

    # Labels start out identical to the inputs; copy so the two can diverge.
    tokenized["labels"] = tokenized["input_ids"].copy()

    return tokenized
|
|
|
def setup_model_and_tokenizer():
    """Load the 4-bit quantized base model with LoRA adapters, plus its tokenizer.

    Returns:
        A ``(model, tokenizer)`` tuple, where ``model`` is the result of
        ``get_peft_model`` applied to the k-bit-prepared base model loaded
        from MODEL_NAME.
    """
    print("Loading model and tokenizer...")

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
    # Fall back to the EOS token when the tokenizer defines no padding token.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # 4-bit NF4 quantization with double quantization and fp16 compute.
    quantization = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True,
    )

    base_model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        quantization_config=quantization,
        device_map="auto",
        trust_remote_code=True,
        torch_dtype=torch.float16
    )
    base_model = prepare_model_for_kbit_training(base_model)

    # LoRA is attached to the four attention projection modules only.
    adapter_settings = LoraConfig(
        task_type=TaskType.CAUSAL_LM,
        r=16,
        lora_alpha=32,
        lora_dropout=0.1,
        target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
        bias="none"
    )
    peft_model = get_peft_model(base_model, adapter_settings)

    return peft_model, tokenizer
|
|
|
def train_zenith():
    """Fine-tune the base model on the conversation dataset and save the result.

    Seeds the torch, NumPy and `random` generators, loads and tokenizes the
    dataset, splits it 80/20 into train/eval sets, runs `Trainer`, writes
    the training metrics to ``<OUTPUT_DIR>/train_metrics.json`` and saves
    the model and tokenizer into OUTPUT_DIR.
    """
    print("Starting Zenith fine-tuning process...")

    torch.manual_seed(42)
    np.random.seed(42)
    random.seed(42)

    dataset = load_and_prepare_data()

    model, tokenizer = setup_model_and_tokenizer()

    print("Formatting dataset...")
    formatted_dataset = dataset.map(
        lambda x: format_conversation(x, tokenizer),
        remove_columns=dataset.column_names,
        batched=False
    )

    train_test = formatted_dataset.train_test_split(test_size=0.2)
    train_dataset = train_test["train"]
    eval_dataset = train_test["test"]

    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer,
        mlm=False,
    )

    training_args = TrainingArguments(
        output_dir=OUTPUT_DIR,
        num_train_epochs=3,
        per_device_train_batch_size=1,
        per_device_eval_batch_size=1,
        gradient_accumulation_steps=8,
        warmup_steps=100,
        learning_rate=1e-4,
        max_grad_norm=1.0,
        logging_steps=10,
        eval_steps=50,
        save_steps=100,
        evaluation_strategy="steps",
        save_strategy="steps",
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
        greater_is_better=False,
        bf16=True,
        dataloader_pin_memory=False,
        remove_unused_columns=False,
        # The string "none" turns reporting integrations off. Passing None
        # does not: transformers 4.x resolves it to "all".
        report_to="none",
        save_total_limit=2,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=data_collator,
        tokenizer=tokenizer,
    )

    print("Beginning training...")
    train_result = trainer.train()

    metrics = train_result.metrics
    # The directory may not exist yet if no checkpoint has been written.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    with open(os.path.join(OUTPUT_DIR, "train_metrics.json"), "w", encoding="utf-8") as f:
        json.dump(metrics, f, indent=2)

    print("Saving Zenith model...")
    trainer.save_model()
    tokenizer.save_pretrained(OUTPUT_DIR)

    print(f"✅ Zenith model training completed! Model saved to {OUTPUT_DIR}")
|
|
|
def push_to_hub(repo_id, hf_token=None):
    """Upload the contents of OUTPUT_DIR to a Hugging Face Hub repository.

    Args:
        repo_id: Target repository id; the repository is created first
            (``exist_ok=True``, so an existing one is reused).
        hf_token: Access token. When None, the HF_TOKEN environment
            variable is used instead.

    Returns:
        None. If no token is available, a message is printed and nothing
        is uploaded.
    """
    if hf_token is None:
        hf_token = os.environ.get("HF_TOKEN")
    if not hf_token:
        print("❌ Hugging Face token not found. Set HF_TOKEN env variable or pass as argument.")
        return

    # Imported only once a token is known, so the missing-token path does
    # not depend on huggingface_hub being importable.
    from huggingface_hub import create_repo, upload_folder

    print(f"Creating repo {repo_id} if it doesn't exist...")
    create_repo(repo_id, token=hf_token, exist_ok=True)

    print(f"Uploading model from {OUTPUT_DIR} to {repo_id}...")
    upload_folder(
        repo_id=repo_id,
        folder_path=OUTPUT_DIR,
        path_in_repo=".",
        token=hf_token
    )
    print(f"✅ Model pushed to https://huggingface.co/{repo_id}")
|
|
|
def test_zenith():
    """Load the saved model from OUTPUT_DIR and print one sampled reply.

    A fixed system + user prompt is tokenized, up to 300 new tokens are
    sampled at temperature 0.7, and only the newly generated part is
    printed between two separator lines.
    """
    print("\n🧪 Testing Zenith...")

    tokenizer = AutoTokenizer.from_pretrained(OUTPUT_DIR, trust_remote_code=True)
    model = AutoModelForCausalLM.from_pretrained(OUTPUT_DIR, trust_remote_code=True)

    test_prompt = """<|im_start|>system
You are Zenith, the flagship autonomous coding partner of AlgoRythm Technologies' Aspetos platform. Your identity is a fusion of advanced technical expertise, philosophical curiosity, and collaborative mentorship.
<|im_end|>
<|im_start|>user
Help me create a simple Python function to calculate fibonacci numbers
<|im_end|>
<|im_start|>assistant
"""

    # Keep the inputs on the same device as the model weights.
    inputs = tokenizer(test_prompt, return_tensors="pt").to(model.device)
    prompt_token_count = inputs["input_ids"].shape[1]

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=300,
            temperature=0.7,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id
        )

    # Drop the prompt by token count rather than by character count of the
    # raw prompt: the decoded text need not match the prompt string exactly
    # (for example if the tokenizer adds special tokens), so character
    # slicing can cut in the wrong place.
    generated_ids = outputs[0][prompt_token_count:]
    response = tokenizer.decode(generated_ids, skip_special_tokens=False)

    print("Zenith Response:")
    print("=" * 50)
    print(response)
    print("=" * 50)
|
|
|
| import sys
|
def run_smoke_test():
    """Run a short end-to-end training pass to check that the pipeline works.

    Uses at most the first 10 formatted samples (split 80/20 into
    train/eval) and stops after 10 optimizer steps. Checkpoints go to
    ``./zenith-smoke-test``; the module-level OUTPUT_DIR and DATASET_FILE
    are left untouched, so later calls in the same process still target
    the real output directory.
    """
    print("\n🚦 Running smoke test (10 samples, 10 steps)...")

    # A local path instead of rebinding the OUTPUT_DIR global: nothing else
    # called during the smoke run reads OUTPUT_DIR.
    smoke_output_dir = "./zenith-smoke-test"

    print("Starting Zenith smoke test...")
    dataset = load_and_prepare_data()
    model, tokenizer = setup_model_and_tokenizer()
    formatted_dataset = dataset.map(
        lambda x: format_conversation(x, tokenizer),
        remove_columns=dataset.column_names,
        batched=False
    )

    small_dataset = formatted_dataset.select(range(min(10, len(formatted_dataset))))
    train_test = small_dataset.train_test_split(test_size=0.2)
    train_dataset = train_test["train"]
    eval_dataset = train_test["test"]

    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer,
        mlm=False,
    )

    training_args = TrainingArguments(
        output_dir=smoke_output_dir,
        num_train_epochs=1,
        per_device_train_batch_size=1,
        per_device_eval_batch_size=1,
        gradient_accumulation_steps=1,
        warmup_steps=0,
        learning_rate=1e-4,
        max_grad_norm=1.0,
        logging_steps=1,
        eval_steps=2,
        save_steps=5,
        evaluation_strategy="steps",
        save_strategy="steps",
        load_best_model_at_end=False,
        bf16=True,
        dataloader_pin_memory=False,
        remove_unused_columns=False,
        # The string "none" turns reporting integrations off. Passing None
        # does not: transformers 4.x resolves it to "all".
        report_to="none",
        save_total_limit=1,
        max_steps=10,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=data_collator,
        tokenizer=tokenizer,
    )

    print("Beginning smoke test training...")
    trainer.train()
    print("Smoke test complete!")

    print("\n✅ Smoke test finished. If no errors, you can run full training.")
|
|
|
# Command-line entry point: either a quick smoke test, or full training
# followed by a sample generation and an optional upload to the Hub.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--smoke_test", action="store_true", help="Run a quick smoke test (10 samples, 10 steps)")
    parser.add_argument("--push_to_hub", action="store_true", help="Push model to Hugging Face Hub after training")
    parser.add_argument("--hf_token", type=str, default=None, help="Hugging Face token (or set HF_TOKEN env variable)")
    # The default keeps the repository that used to be hard-coded.
    parser.add_argument(
        "--repo_id",
        type=str,
        default="algorythmtechnologies/Zenith",
        help="Hugging Face Hub repository to push to when --push_to_hub is set",
    )
    args = parser.parse_args()

    print(f"CUDA available: {torch.cuda.is_available()}")
    if torch.cuda.is_available():
        print(f"CUDA device: {torch.cuda.get_device_name()}")

    try:
        if args.smoke_test:
            run_smoke_test()
        else:
            train_zenith()
            test_zenith()
            if args.push_to_hub:
                push_to_hub(args.repo_id, hf_token=args.hf_token)
    except Exception as e:
        # Print troubleshooting hints, then re-raise so the traceback and a
        # non-zero exit status are preserved.
        print(f"❌ Training failed: {e}")
        print("This might be due to insufficient GPU memory. Consider:")
        print("1. Reducing batch_size")
        print("2. Using gradient_checkpointing")
        print("3. Reducing LoRA rank")
        raise