| """ |
| Benchmark evaluation script for telecom intent-to-config models. |
| Evaluates on a test dataset and computes metrics: |
| - JSON validity rate |
| - Schema compliance (key presence) |
| - Semantic fidelity (embedding similarity) |
| - Per-target-layer breakdown |
| |
| Usage on Kaggle: |
| python benchmark.py \ |
| --adapter_path ./qwen2.5-7b-telecom-intent-lora \ |
| --dataset nraptisss/TMF921-intent-to-config-augmented \ |
| --split test \ |
| --max_samples 100 \ |
| --output benchmark_results.json |
| """ |
|
|
import argparse
import json
import os
import re
import sys

import numpy as np
import torch
from datasets import load_dataset
from peft import PeftModel
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForCausalLM, AutoTokenizer

BASE_MODEL = "Qwen/Qwen2.5-7B-Instruct"
MAX_NEW_TOKENS = 1024
TEMPERATURE = 0.1
TOP_P = 0.95
|
|
|
|
def load_model(adapter_path: str, base_model: str):
    """Load base model + LoRA adapters."""
    adapter_path = os.path.abspath(adapter_path)
    if not os.path.isdir(adapter_path):
        print(f"ERROR: Adapter path not found: {adapter_path}")
        print("Run train.py first to generate adapters.")
        sys.exit(1)

    print(f"Loading base model: {base_model}")
    model = AutoModelForCausalLM.from_pretrained(
        base_model,
        torch_dtype=torch.float16,
        device_map="auto",
        trust_remote_code=True,
    )
    print(f"Loading LoRA adapters: {adapter_path}")
    model = PeftModel.from_pretrained(model, adapter_path)
    model.eval()

    tokenizer = AutoTokenizer.from_pretrained(
        base_model,
        trust_remote_code=True,
        padding_side="left",
    )
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    return model, tokenizer
|
|
|
|
def generate_config(model, tokenizer, messages: list) -> str:
    """Generate a config from a list of chat messages."""
    prompt = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=MAX_NEW_TOKENS,
            temperature=TEMPERATURE,
            top_p=TOP_P,
            do_sample=True,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )

    # Decode only the newly generated tokens; slicing the decoded string by
    # len(prompt) is unreliable once special tokens are stripped.
    prompt_len = inputs["input_ids"].shape[1]
    response = tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True).strip()

    # If the model wrapped the config in a Markdown code fence, extract the body.
    json_match = re.search(r"```(?:json)?\s*(.*?)\s*```", response, re.DOTALL)
    if json_match:
        response = json_match.group(1)

    return response.strip()
|
|
|
|
def validate_json(text: str) -> tuple[bool, dict | None]:
    """Try to parse the model output as a JSON object."""
    try:
        text = text.strip()
        # Trim any leading/trailing chatter around the outermost braces.
        start = text.find("{")
        end = text.rfind("}")
        if start != -1 and end != -1 and end > start:
            text = text[start:end + 1]
        parsed = json.loads(text)
        if not isinstance(parsed, dict):
            return False, None
        return True, parsed
    except json.JSONDecodeError:
        return False, None
|
|
|
def check_schema_compliance(parsed: dict, target_layer: str) -> dict:
    """Check required keys based on target layer."""
    # Minimal expected top-level keys per target layer.
    schema_map = {
        "tmf921": ["intent", "intentId", "name"],
        "camara": ["networkSliceBooking", "sliceType"],
        "intent_3gpp": ["ManagedElement", "intent"],
        "etsi_zsm": ["intent", "serviceProfile"],
        "a1_policy": ["policy", "policyType"],
        "o1_nrm": ["ManagedElement", "GNBDUFunction"],
    }

    expected = schema_map.get(target_layer.lower(), [])
    present = [k for k in expected if k in parsed]
    missing = [k for k in expected if k not in parsed]

    return {
        "compliance_score": len(present) / max(len(expected), 1),
        "present_keys": present,
        "missing_keys": missing,
    }
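# Illustrative example (hypothetical payload): a TMF921 output that contains
# "intent" and "intentId" but omits "name" scores 2/3 compliance:
#   check_schema_compliance({"intent": {}, "intentId": "I-001"}, "tmf921")
#   -> compliance_score ~= 0.67, present ["intent", "intentId"], missing ["name"]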
|
|
|
|
def main():
    parser = argparse.ArgumentParser(description="Telecom Intent Benchmark")
    parser.add_argument(
        "--adapter_path",
        type=str,
        default="./qwen2.5-7b-telecom-intent-lora",
        help="Path to LoRA adapters",
    )
    parser.add_argument(
        "--base_model",
        type=str,
        default=BASE_MODEL,
        help="Base model name",
    )
    parser.add_argument(
        "--dataset",
        type=str,
        default="nraptisss/TMF921-intent-to-config-augmented",
        help="Dataset to evaluate on",
    )
    parser.add_argument(
        "--dataset_config",
        type=str,
        default="default",
        help="Dataset config name",
    )
    parser.add_argument(
        "--split",
        type=str,
        default="test",
        help="Dataset split to evaluate",
    )
    parser.add_argument(
        "--max_samples",
        type=int,
        default=100,
        help="Max number of samples to evaluate",
    )
    parser.add_argument(
        "--output",
        type=str,
        default="benchmark_results.json",
        help="Output file for results",
    )
    args = parser.parse_args()

    # Load the fine-tuned model and tokenizer.
    model, tokenizer = load_model(args.adapter_path, args.base_model)

    # Load the evaluation split.
    print(f"\nLoading dataset: {args.dataset} ({args.split})")
    ds = load_dataset(args.dataset, args.dataset_config, split=args.split)
    if args.max_samples:
        ds = ds.select(range(min(args.max_samples, len(ds))))
    print(f"Evaluating on {len(ds)} samples")

    # Embedding model for semantic similarity (optional).
    try:
        embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
        use_embedding = True
        print("Loaded embedding model for semantic similarity")
    except Exception as e:
        print(f"Embedding model not available ({e}), skipping semantic similarity")
        use_embedding = False

    # Metric accumulators.
    results = []
    valid_count = 0
    compliance_scores = []
    layer_stats = {}

    for i, sample in enumerate(ds):
        messages = sample["messages"]
        target_layer = sample.get("target_layer", "unknown")

        # The reference completion is the assistant turn.
        reference = ""
        for m in messages:
            if m.get("role") == "assistant":
                reference = m.get("content", "")
                break

        # Prompt the model with everything except the assistant turn.
        gen_messages = [m for m in messages if m.get("role") != "assistant"]

        # Generate and validate.
        generated = generate_config(model, tokenizer, gen_messages)
        is_valid, parsed = validate_json(generated)

        if is_valid:
            valid_count += 1
            schema = check_schema_compliance(parsed, target_layer)
            compliance_scores.append(schema["compliance_score"])
        else:
            schema = {"compliance_score": 0.0, "present_keys": [], "missing_keys": []}

        # Semantic similarity between generated and reference configs.
        semantic_sim = None
        if use_embedding and is_valid:
            ref_emb = embed_model.encode(reference, convert_to_tensor=True)
            gen_emb = embed_model.encode(generated, convert_to_tensor=True)
            semantic_sim = float(torch.cosine_similarity(ref_emb, gen_emb, dim=0))

        result = {
            "id": sample.get("id", i),
            "target_layer": target_layer,
            "slice_type": sample.get("slice_type", "unknown"),
            "intent": next((m["content"] for m in messages if m.get("role") == "user"), ""),
            "generated": generated,
            "reference": reference,
            "json_valid": is_valid,
            "schema_compliance": schema,
            "semantic_similarity": semantic_sim,
        }
        results.append(result)

        # Per-layer bookkeeping.
        if target_layer not in layer_stats:
            layer_stats[target_layer] = {"total": 0, "valid": 0, "compliance": []}
        layer_stats[target_layer]["total"] += 1
        if is_valid:
            layer_stats[target_layer]["valid"] += 1
            layer_stats[target_layer]["compliance"].append(schema["compliance_score"])

        if (i + 1) % 10 == 0:
            print(f"  Processed {i + 1}/{len(ds)} samples")
    # Aggregate summary metrics.
    total = len(results)
    sims = [r["semantic_similarity"] for r in results if r["semantic_similarity"] is not None]
    summary = {
        "total_samples": total,
        "json_valid_rate": valid_count / total if total else 0.0,
        "avg_schema_compliance": float(np.mean(compliance_scores)) if compliance_scores else 0.0,
        "semantic_similarity_avg": float(np.mean(sims)) if sims else None,
        "per_layer": {},
    }

    for layer, stats in layer_stats.items():
        summary["per_layer"][layer] = {
            "total": stats["total"],
            "valid_rate": stats["valid"] / stats["total"],
            "avg_compliance": float(np.mean(stats["compliance"])) if stats["compliance"] else 0.0,
        }

    # Save detailed results.
    output_data = {"summary": summary, "results": results}
    with open(args.output, "w") as f:
        json.dump(output_data, f, indent=2)

    # Print summary.
    print(f"\n{'=' * 60}")
    print("BENCHMARK RESULTS")
    print(f"{'=' * 60}")
    print(f"Total samples: {summary['total_samples']}")
    print(f"JSON valid rate: {summary['json_valid_rate']:.1%}")
    print(f"Schema compliance: {summary['avg_schema_compliance']:.1%}")
    if summary["semantic_similarity_avg"] is not None:
        print(f"Semantic similarity: {summary['semantic_similarity_avg']:.3f}")
    print("\nPer-layer breakdown:")
    for layer, s in summary["per_layer"].items():
        print(f"  {layer:20s} valid={s['valid_rate']:.1%} compliance={s['avg_compliance']:.1%}")
    print(f"\nDetailed results saved to: {args.output}")
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|