""" Benchmark evaluation script for telecom intent-to-config models. Evaluates on a test dataset and computes metrics: - JSON validity rate - Schema compliance (key presence) - Semantic fidelity (embedding similarity) - Per-target-layer breakdown Usage on Kaggle: python benchmark.py \ --adapter_path ./qwen2.5-7b-telecom-intent-lora \ --dataset nraptisss/TMF921-intent-to-config-augmented \ --split test \ --max_samples 100 \ --output benchmark_results.json """ import argparse import json import os import re import sys import torch from datasets import load_dataset from transformers import AutoModelForCausalLM, AutoTokenizer from peft import PeftModel from sentence_transformers import SentenceTransformer import numpy as np # ============================================================================ # CONFIGURATION # ============================================================================ BASE_MODEL = "Qwen/Qwen2.5-7B-Instruct" MAX_NEW_TOKENS = 1024 TEMPERATURE = 0.1 TOP_P = 0.95 def load_model(adapter_path: str, base_model: str): """Load base model + LoRA adapters.""" adapter_path = os.path.abspath(adapter_path) if not os.path.isdir(adapter_path): print(f"ERROR: Adapter path not found: {adapter_path}") print("Run train.py first to generate adapters.") sys.exit(1) print(f"Loading base model: {base_model}") model = AutoModelForCausalLM.from_pretrained( base_model, dtype=torch.float16, device_map="auto", trust_remote_code=True, ) print(f"Loading LoRA adapters: {adapter_path}") model = PeftModel.from_pretrained(model, adapter_path) model.eval() tokenizer = AutoTokenizer.from_pretrained( base_model, trust_remote_code=True, padding_side="left", ) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token return model, tokenizer def generate_config(model, tokenizer, messages: list) -> str: """Generate config from messages list.""" prompt = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True, ) inputs = tokenizer(prompt, return_tensors="pt").to(model.device) with torch.no_grad(): outputs = model.generate( **inputs, max_new_tokens=MAX_NEW_TOKENS, temperature=TEMPERATURE, top_p=TOP_P, do_sample=True, pad_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id, ) generated = tokenizer.decode(outputs[0], skip_special_tokens=True) response = generated[len(prompt):].strip() # Extract JSON from markdown code blocks json_match = re.search(r"```(?:json)?\s*(.*?)\s*```", response, re.DOTALL) if json_match: response = json_match.group(1) return response.strip() def validate_json(text: str) -> tuple[bool, dict | None]: """Try to parse as JSON.""" try: text = text.strip() start = text.find("{") end = text.rfind("}") if start != -1 and end != -1 and end > start: text = text[start:end + 1] parsed = json.loads(text) return True, parsed except json.JSONDecodeError: return False, None def check_schema_compliance(parsed: dict, target_layer: str) -> dict: """Check required keys based on target layer.""" schema_map = { "tmf921": ["intent", "intentId", "name"], "camara": ["networkSliceBooking", "sliceType"], "intent_3gpp": ["ManagedElement", "intent"], "etsi_zsm": ["intent", "serviceProfile"], "a1_policy": ["policy", "policyType"], "o1_nrm": ["ManagedElement", "GNBDUFunction"], } expected = schema_map.get(target_layer.lower(), []) present = [k for k in expected if k in parsed] missing = [k for k in expected if k not in parsed] return { "compliance_score": len(present) / max(len(expected), 1), "present_keys": present, "missing_keys": missing, } def main(): 
    parser = argparse.ArgumentParser(description="Telecom Intent Benchmark")
    parser.add_argument(
        "--adapter_path",
        type=str,
        default="./qwen2.5-7b-telecom-intent-lora",
        help="Path to LoRA adapters",
    )
    parser.add_argument(
        "--base_model",
        type=str,
        default=BASE_MODEL,
        help="Base model name",
    )
    parser.add_argument(
        "--dataset",
        type=str,
        default="nraptisss/TMF921-intent-to-config-augmented",
        help="Dataset to evaluate on",
    )
    parser.add_argument(
        "--dataset_config",
        type=str,
        default="default",
        help="Dataset config name",
    )
    parser.add_argument(
        "--split",
        type=str,
        default="test",
        help="Dataset split to evaluate",
    )
    parser.add_argument(
        "--max_samples",
        type=int,
        default=100,
        help="Max number of samples to evaluate",
    )
    parser.add_argument(
        "--output",
        type=str,
        default="benchmark_results.json",
        help="Output file for results",
    )
    args = parser.parse_args()

    # Load model
    model, tokenizer = load_model(args.adapter_path, args.base_model)

    # Load dataset
    print(f"\nLoading dataset: {args.dataset} ({args.split})")
    ds = load_dataset(args.dataset, args.dataset_config, split=args.split)
    if args.max_samples:
        ds = ds.select(range(min(args.max_samples, len(ds))))
    print(f"Evaluating on {len(ds)} samples")

    # Load embedding model for semantic similarity
    try:
        embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
        use_embedding = True
        print("Loaded embedding model for semantic similarity")
    except Exception as e:
        print(f"Embedding model not available ({e}), skipping semantic similarity")
        use_embedding = False

    # Run evaluation
    results = []
    valid_count = 0
    compliance_scores = []
    layer_stats = {}

    for i, sample in enumerate(ds):
        messages = sample["messages"]
        target_layer = sample.get("target_layer", "unknown")

        # Extract reference (assistant content)
        reference = ""
        for m in messages:
            if m.get("role") == "assistant":
                reference = m.get("content", "")
                break

        # Reconstruct user messages for generation
        gen_messages = [m for m in messages if m.get("role") != "assistant"]

        # Generate
        generated = generate_config(model, tokenizer, gen_messages)
        is_valid, parsed = validate_json(generated)

        if is_valid:
            valid_count += 1
            schema = check_schema_compliance(parsed, target_layer)
            compliance_scores.append(schema["compliance_score"])
        else:
            schema = {"compliance_score": 0.0, "present_keys": [], "missing_keys": []}

        # Semantic similarity
        semantic_sim = None
        if use_embedding and is_valid:
            ref_emb = embed_model.encode(reference, convert_to_tensor=True)
            gen_emb = embed_model.encode(generated, convert_to_tensor=True)
            semantic_sim = float(torch.cosine_similarity(ref_emb, gen_emb, dim=0))

        result = {
            "id": sample.get("id", i),
            "target_layer": target_layer,
            "slice_type": sample.get("slice_type", "unknown"),
            "intent": next((m["content"] for m in messages if m.get("role") == "user"), ""),
            "generated": generated,
            "reference": reference,
            "json_valid": is_valid,
            "schema_compliance": schema,
            "semantic_similarity": semantic_sim,
        }
        results.append(result)

        # Per-layer stats
        if target_layer not in layer_stats:
            layer_stats[target_layer] = {"total": 0, "valid": 0, "compliance": []}
        layer_stats[target_layer]["total"] += 1
        if is_valid:
            layer_stats[target_layer]["valid"] += 1
        layer_stats[target_layer]["compliance"].append(schema["compliance_score"])

        if (i + 1) % 10 == 0:
            print(f"  Processed {i + 1}/{len(ds)} samples")

    # Compute summary statistics
    total = len(results)
    summary = {
        "total_samples": total,
        "json_valid_rate": valid_count / total,
        "avg_schema_compliance": float(np.mean(compliance_scores)) if compliance_scores else 0.0,
"semantic_similarity_avg": float(np.mean([r["semantic_similarity"] for r in results if r["semantic_similarity"] is not None])) if any(r["semantic_similarity"] is not None for r in results) else None, "per_layer": {}, } for layer, stats in layer_stats.items(): summary["per_layer"][layer] = { "total": stats["total"], "valid_rate": stats["valid"] / stats["total"], "avg_compliance": float(np.mean(stats["compliance"])) if stats["compliance"] else 0.0, } # Save results output_data = {"summary": summary, "results": results} with open(args.output, "w") as f: json.dump(output_data, f, indent=2) # Print summary print(f"\n{'=' * 60}") print("BENCHMARK RESULTS") print(f"{'=' * 60}") print(f"Total samples: {summary['total_samples']}") print(f"JSON valid rate: {summary['json_valid_rate']:.1%}") print(f"Schema compliance: {summary['avg_schema_compliance']:.1%}") if summary["semantic_similarity_avg"] is not None: print(f"Semantic similarity: {summary['semantic_similarity_avg']:.3f}") print(f"\nPer-layer breakdown:") for layer, s in summary["per_layer"].items(): print(f" {layer:20s} valid={s['valid_rate']:.1%} compliance={s['avg_compliance']:.1%}") print(f"\nDetailed results saved to: {args.output}") if __name__ == "__main__": main()