| """test_conversational_neo_manual.py - Manual generation for NeoMini model"""
|
|
|
| from model_neo import NeoMini, NeoMiniConfig
|
| from transformers import AutoTokenizer
|
| import torch
|
| import torch.nn.functional as F
|
| import json
|
|
|
def load_conversational_model(model_path="conversational_neo_extended"):
    """Load the fine-tuned conversational NeoMini model, tokenizer, and device.

    Args:
        model_path: Directory containing ``model_config.json``,
            ``conversational_model.pt`` and the saved tokenizer files.

    Returns:
        Tuple ``(model, tokenizer, device)`` — model in eval mode, moved to
        CUDA when available.
    """
    print("Loading fine-tuned conversational model...")

    # Read the saved context-window size; fall back to 4096 when the config
    # file is missing or malformed. (The original bare `except:` also caught
    # KeyboardInterrupt/SystemExit — narrowed to the realistic failures.)
    try:
        with open(f"{model_path}/model_config.json", 'r') as f:
            model_config = json.load(f)
        max_seq_len = model_config.get('max_seq_len', 4096)
    except (OSError, json.JSONDecodeError):
        max_seq_len = 4096

    config = NeoMiniConfig()
    config.max_seq_len = max_seq_len

    model = NeoMini(config)
    checkpoint = torch.load(f"{model_path}/conversational_model.pt", map_location='cpu')
    model.load_state_dict(checkpoint['model_state_dict'])

    tokenizer = AutoTokenizer.from_pretrained(model_path)

    model.eval()

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    # NOTE(review): the original print string was split across two physical
    # lines by a mangled emoji byte (broken f-string); reconstructed here.
    print(f"✅ Model loaded with {max_seq_len} token context window on {device}")
    return model, tokenizer, device
|
|
|
def generate_response(model, tokenizer, prompt, device, max_new_tokens=200, temperature=0.8, top_k=50, top_p=0.9):
    """Autoregressively sample a completion from a NeoMini model.

    Args:
        model: Model returning logits of shape ``(batch, seq, vocab)``;
            must expose ``model.config.max_seq_len``.
        tokenizer: HF-style tokenizer (``encode``/``decode``/``eos_token_id``).
        prompt: Text to condition on.
        device: Device the input ids are moved to.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Softmax temperature; ``<= 0`` falls back to greedy argmax
            (the original divided by zero in that case).
        top_k: Keep only the k most likely tokens (0 disables the filter).
        top_p: Nucleus-sampling cumulative-probability threshold (1.0 disables).

    Returns:
        The generated continuation (prompt excluded), whitespace-stripped.
    """
    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
    original_length = input_ids.shape[1]

    with torch.no_grad():
        for _ in range(max_new_tokens):
            # Stop BEFORE the forward pass once the context window is full.
            # The original only checked after appending, so a prompt already
            # at the limit was fed to a model that cannot handle it.
            if input_ids.shape[1] >= model.config.max_seq_len:
                break

            logits = model(input_ids)
            next_token_logits = logits[0, -1, :]

            if temperature <= 0:
                # Greedy decoding (generalization; unreachable with defaults).
                next_token = torch.argmax(next_token_logits, dim=-1, keepdim=True)
            else:
                next_token_logits = next_token_logits / temperature

                if top_k > 0:
                    # Clamp k: torch.topk raises if k exceeds the vocab size.
                    k = min(top_k, next_token_logits.size(-1))
                    top_k_logits, top_k_indices = torch.topk(next_token_logits, k)
                    next_token_logits = torch.full_like(next_token_logits, float('-inf'))
                    next_token_logits.scatter_(0, top_k_indices, top_k_logits)

                if top_p < 1.0:
                    sorted_logits, sorted_indices = torch.sort(next_token_logits, descending=True)
                    cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)

                    # Shift the removal mask right so the first token that
                    # crosses the threshold is still kept.
                    sorted_indices_to_remove = cumulative_probs > top_p
                    sorted_indices_to_remove[1:] = sorted_indices_to_remove[:-1].clone()
                    sorted_indices_to_remove[0] = 0

                    indices_to_remove = sorted_indices[sorted_indices_to_remove]
                    next_token_logits[indices_to_remove] = float('-inf')

                probs = F.softmax(next_token_logits, dim=-1)
                next_token = torch.multinomial(probs, num_samples=1)

            input_ids = torch.cat([input_ids, next_token.unsqueeze(0)], dim=1)

            # EOS terminates generation (the EOS token itself is appended,
            # but decode() skips special tokens).
            if next_token.item() == tokenizer.eos_token_id:
                break

    generated_tokens = input_ids[0, original_length:]
    generated_text = tokenizer.decode(generated_tokens, skip_special_tokens=True)

    return generated_text.strip()
|
|
|
def chat_with_model(model, tokenizer, device):
    """Run an interactive chat session with the conversational model.

    Commands: 'quit'/'exit' ends the session, 'clear' wipes the history,
    'test' runs the canned quality prompts. A rolling window of recent
    turns keeps the prompt within the context budget.
    """
    print("\nπ€ MAP-NEO Conversational AI (Fine-Tuned)")
    print("Type 'quit' to exit, 'clear' to clear history, 'test' for quality tests")
    print("="*70)

    conversation_history = []
    system_prompt = "You are MAP-NEO, a helpful, harmless, and honest AI assistant. Engage in natural conversation and provide thoughtful, accurate responses."

    # Actual context limit of the loaded model. The original hard-coded 4096
    # in the usage report, mis-reporting whenever max_seq_len differed.
    context_limit = getattr(model.config, 'max_seq_len', 4096)

    while True:
        user_input = input("\nπ§ You: ").strip()

        if user_input.lower() in ['quit', 'exit']:
            print("π Goodbye!")
            break

        if user_input.lower() == 'clear':
            conversation_history = []
            print("π Conversation cleared!")
            continue

        if user_input.lower() == 'test':
            test_model_quality(model, tokenizer, device)
            continue

        if not user_input:
            continue

        conversation_history.append(f"User: {user_input}")

        # Keep only the most recent turns to bound prompt size.
        recent_context = conversation_history[-10:]
        context = "\n".join(recent_context)

        prompt = f"{system_prompt}\n\n{context}\nAssistant:"

        # If the prompt is still too long, shrink the window further.
        prompt_tokens = tokenizer.encode(prompt)
        if len(prompt_tokens) > 1800:
            recent_context = conversation_history[-6:]
            context = "\n".join(recent_context)
            prompt = f"{system_prompt}\n\n{context}\nAssistant:"

        print("π€ MAP-NEO: ", end="", flush=True)

        try:
            assistant_response = generate_response(
                model, tokenizer, prompt, device,
                max_new_tokens=150,
                temperature=0.8,
                top_k=50,
                top_p=0.9
            )

            # The model sometimes echoes the role prefix; strip it.
            if assistant_response.startswith("Assistant:"):
                assistant_response = assistant_response[len("Assistant:"):].strip()

            print(assistant_response)

            conversation_history.append(f"Assistant: {assistant_response}")

            total_tokens = len(tokenizer.encode(prompt + assistant_response))
            print(f" π Tokens: {total_tokens}/{context_limit} ({total_tokens/context_limit*100:.1f}%)")

        except Exception as e:
            print(f"β Error generating response: {e}")
            print("Try again with a different prompt.")
|
|
|
def test_model_quality(model, tokenizer, device):
    """Smoke-test response quality on a fixed set of sample prompts.

    Runs the first five prompts with a slightly lower temperature (0.7)
    than the interactive chat and prints each response; per-prompt errors
    are reported without aborting the run.
    """
    print("\nπ§ͺ Testing Model Quality...")
    print("="*60)

    test_prompts = [
        "Hello! Can you help me understand machine learning?",
        "What's the difference between AI and machine learning?",
        "I'm feeling stressed about work. Any advice?",
        "Can you write a short story about a robot?",
        "Explain quantum physics in simple terms.",
        "How do I make a good cup of coffee?",
        "What are the benefits of exercise?"
    ]

    system_prompt = "You are MAP-NEO, a helpful, harmless, and honest AI assistant. Engage in natural conversation and provide thoughtful, accurate responses."

    # Only the first five prompts are exercised to keep the check quick.
    for i, user_prompt in enumerate(test_prompts[:5], 1):
        print(f"\n--- Test {i}/5 ---")
        print(f"π§ User: {user_prompt}")

        prompt = f"{system_prompt}\n\nUser: {user_prompt}\nAssistant:"

        try:
            assistant_response = generate_response(
                model, tokenizer, prompt, device,
                max_new_tokens=120,
                temperature=0.7,
                top_k=50,
                top_p=0.9
            )

            print(f"π€ MAP-NEO: {assistant_response}")

        except Exception as e:
            print(f"β Error: {e}")

    # NOTE(review): the original closing print was an f-string split across
    # two physical lines by a mangled emoji byte (syntax-breaking) and had
    # no placeholders; reconstructed as a plain string.
    print("\n✅ Quality tests completed!")
|
|
|
def compare_before_after(model, tokenizer, device):
    """Print side-by-side responses from the base checkpoint and the fine-tuned model.

    Loads the pre-fine-tuning checkpoint from disk, runs one fixed prompt
    through both models, and prints each reply. Any failure (e.g. missing
    checkpoint) is reported instead of raised.
    """
    print("\nπ Before vs After Fine-Tuning Comparison")
    print("="*60)

    try:
        print("Loading original model for comparison...")
        base_config = NeoMiniConfig()
        base_model = NeoMini(base_config)
        base_ckpt = torch.load('checkpoints/checkpoint_step_99999.pt', map_location='cpu')
        base_model.load_state_dict(base_ckpt['model_state_dict'])
        base_model.eval().to(device)

        test_prompt = "Hello! Can you help me learn about artificial intelligence?"
        prompt = f"You are MAP-NEO, a helpful AI assistant.\n\nUser: {test_prompt}\nAssistant:"

        print(f"\nπ§ User: {test_prompt}")

        # Same prompt through both models, original checkpoint first.
        candidates = (
            ("\nπ€ Original Model:", base_model),
            ("\nπ€ Fine-Tuned Model:", model),
        )
        for label, candidate in candidates:
            print(label)
            reply = generate_response(candidate, tokenizer, prompt, device,
                                      max_new_tokens=100, temperature=0.7)
            print(reply)

        print("\nπ The fine-tuned model should be much more conversational and helpful!")

    except Exception as e:
        print(f"Comparison unavailable: {e}")
|
|
|
if __name__ == "__main__":
    print("π MAP-NEO Conversational AI Testing Suite")
    print("="*60)

    # Load once, run the scripted checks, then drop into interactive chat.
    model, tokenizer, device = load_conversational_model()

    test_model_quality(model, tokenizer, device)
    compare_before_after(model, tokenizer, device)

    banner = "="*70
    print("\n" + banner)
    print("π Ready for interactive conversation!")
    print(banner)

    chat_with_model(model, tokenizer, device)
|
|
|