#!/usr/bin/env python3
"""
modeldna Stage 1 HF Scanner — core logic.

Given a HuggingFace model_id, validates architectural claims against the
ModelAtlas reference database. No weight download needed — uses config.json only.

This is the heart of the modeldna 'test before you download' feature.
"""

from __future__ import annotations

import hashlib
import json
import logging
import re
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

try:
    import requests
except ImportError:
    # Only the two fetch_* helpers need HTTP. Keeping the import soft lets the
    # offline functions (stage1_screen, generate_verdict, ...) be used on a
    # config dict obtained some other way.
    requests = None

logger = logging.getLogger(__name__)

HF_API = "https://huggingface.co"
HF_DATASET = "RadicalNotionAI/modelatlas-reference"
DB = "postgresql:///modelatlas?host=/var/run/postgresql&port=5433&user=tim"

# In-process cache of the reference DataFrame. It is keyed on age only: a
# cached frame is reused until it is older than _REF_TTL seconds.
_REF_DF = None
_REF_LOADED_AT: float = 0.0
_REF_TTL = 3600  # seconds — reload at most once per hour


def _load_reference_df():
    """Load the ModelAtlas reference parquet, using the in-process cache.

    Sources are tried in order: a local snapshot next to the package, then the
    HF dataset via ``huggingface_hub``. Every failure is swallowed (and logged
    at debug level) because the reference data is an optional enrichment.

    Returns:
        The reference DataFrame. If a refresh fails, the previously cached
        frame is returned when there is one; otherwise ``None``.
    """
    global _REF_DF, _REF_LOADED_AT
    now = time.time()
    if _REF_DF is not None and (now - _REF_LOADED_AT) < _REF_TTL:
        return _REF_DF

    try:
        import pandas as pd
    except ImportError:
        logger.debug("pandas unavailable; skipping reference parquet", exc_info=True)
        return _REF_DF

    # 1. Local snapshot (fast, used in dev / on local server)
    local_path = Path(__file__).parent.parent / "snapshots" / "modeldna_reference.parquet"
    if local_path.exists():
        try:
            _REF_DF = pd.read_parquet(local_path)
            _REF_LOADED_AT = now
            return _REF_DF
        except Exception:
            logger.debug("could not read local snapshot %s", local_path, exc_info=True)

    # 2. HF dataset (used on HF Space — downloaded and cached by huggingface_hub)
    try:
        from huggingface_hub import hf_hub_download

        path = hf_hub_download(
            repo_id=HF_DATASET,
            filename="modeldna_reference.parquet",
            repo_type="dataset",
        )
        _REF_DF = pd.read_parquet(path)
        _REF_LOADED_AT = now
        return _REF_DF
    except Exception:
        logger.debug("could not load reference parquet from %s", HF_DATASET, exc_info=True)

    # Refresh failed: a stale frame (if any) is still better than nothing.
    return _REF_DF


# Known base model reference configs (canonical identifiers)
KNOWN_BASES = {
    "qwen3_5_text": {
        "name": "Qwen3.5 (dense)",
        "vocab_size": 248320,
        "model_type_patterns": ["qwen3_5_text", "qwen3_5"],
    },
    "qwen3_5_moe_text": {
        "name": "Qwen3.5 MoE",
        "vocab_size": 248320,
        "model_type_patterns": ["qwen3_5_moe_text", "qwen3_5_moe"],
    },
    "qwen3": {
        "name": "Qwen3",
        "vocab_size": [151936, 152064],
        "model_type_patterns": ["qwen3"],
    },
    "qwen2": {
        "name": "Qwen2.5",
        "vocab_size": [151936, 152064],
        "model_type_patterns": ["qwen2"],
    },
    "llama3": {
        "name": "Llama 3.x",
        "vocab_size": 128256,
        "model_type_patterns": ["llama"],
        "num_key_value_heads_hint": [8, 32],
    },
    "llama2": {
        "name": "Llama 2",
        "vocab_size": 32000,
        "model_type_patterns": ["llama"],
    },
    "mistral": {
        "name": "Mistral 7B family",
        "vocab_size": 32000,
        "model_type_patterns": ["mistral", "mixtral"],
    },
    "deepseek_v3": {
        "name": "DeepSeek V3/R1",
        "vocab_size": 129280,
        "model_type_patterns": ["deepseek_v3", "deepseek_v2"],
        "kv_lora_rank": 512,
    },
    "gemma": {
        "name": "Gemma family",
        "vocab_size": [256000, 262144],
        "model_type_patterns": ["gemma"],
    },
    "nemotron_h": {
        "name": "NemotronH (NVIDIA Mamba+MoE hybrid)",
        "vocab_size": 131072,
        "model_type_patterns": ["nemotron_h", "nemotronh"],
    },
}

# (substring searched in the lower-cased repo name, KNOWN_BASES-style key it implies)
_NAME_TERMS = (
    ("qwen3.5", "qwen3_5"),
    ("qwen3-5", "qwen3_5"),
    ("qwen35", "qwen3_5"),
    ("qwen3", "qwen3"),
    ("qwen2.5", "qwen2"),
    ("qwen2", "qwen2"),
    ("llama-3", "llama3"),
    ("llama3", "llama3"),
    ("llama-2", "llama2"),
    ("mistral", "mistral"),
    ("mixtral", "mistral"),
    ("deepseek", "deepseek_v3"),
    ("gemma", "gemma"),
)

# Names of closed-weight products/vendors that a repo name may allude to.
_SUSPICIOUS_TERMS = ("claude", "gpt", "chatgpt", "openai", "gemini", "anthropic")

# Sub-configs that may hold the LLM settings when the top level does not.
# Handles: text_config (Qwen3.5/3.6, Mistral3, MiMo-V2.5), llm_config (NemotronH Omni)
_NESTED_KEYS = ("text_config", "llm_config")
# Sub-configs that are never copied to the top level when lifting.
_SKIP_KEYS = ("text_config", "llm_config", "vision_config", "audio_config", "sound_config")

_SIMILAR_COLUMNS = [
    "model_id", "org_display", "hf_downloads", "total_params",
    "technique_signature", "num_layers", "hidden_size", "vocab_size",
]

_SIMILAR_SQL = """
    SELECT m.model_id, o.name AS lab, m.hf_downloads, m.release_date,
           a.technique_signature, a.total_params,
           a.num_layers, a.hidden_size, a.vocab_size
    FROM analyses a
    JOIN models m ON m.id=a.model_id
    JOIN organizations o ON m.org_id=o.id
    WHERE a.is_current=true
      AND a.vocab_size=%s AND a.hidden_size=%s
      AND m.model_id NOT ILIKE '%%tiny%%'
      AND m.model_id NOT ILIKE '/%%'
    ORDER BY m.hf_downloads DESC NULLS LAST
    LIMIT 5
"""


def _get_json(url: str, timeout: float):
    """GET *url* and return the decoded JSON body.

    Raises on a missing ``requests`` package, transport errors, non-2xx
    responses and undecodable bodies; callers decide how to degrade.
    """
    if requests is None:
        raise RuntimeError("the 'requests' package is required for HuggingFace lookups")
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.json()


def fetch_config(model_id: str) -> Optional[dict]:
    """Fetch config.json from HuggingFace. Returns None on failure.

    "Failure" covers any error while fetching or decoding, and also a JSON
    body that is not an object.
    """
    url = f"{HF_API}/{model_id}/resolve/main/config.json"
    try:
        payload = _get_json(url, timeout=20)
    except Exception:
        logger.debug("config.json fetch failed for %s", model_id, exc_info=True)
        return None
    return payload if isinstance(payload, dict) else None


def fetch_model_metadata(model_id: str) -> dict:
    """Fetch HF model metadata (downloads, likes, author, tags).

    Returns:
        A dict with the keys ``downloads``, ``likes``, ``author``, ``tags``,
        ``pipeline_tag``, ``base_model``, ``license``, ``created_at`` and
        ``last_modified``; an empty dict if the lookup fails.
    """
    try:
        d = _get_json(f"{HF_API}/api/models/{model_id}", timeout=10)
        # "cardData" can be present but null; treat anything that is not an
        # object as "no card data" instead of discarding all metadata.
        card = d.get("cardData")
        if not isinstance(card, dict):
            card = {}
        return {
            "downloads": d.get("downloads", 0),
            "likes": d.get("likes", 0),
            "author": d.get("author", ""),
            "tags": d.get("tags", []),
            "pipeline_tag": d.get("pipeline_tag", ""),
            "base_model": card.get("base_model", ""),
            "license": card.get("license", ""),
            "created_at": d.get("createdAt", ""),
            "last_modified": d.get("lastModified", ""),
        }
    except Exception:
        logger.debug("metadata fetch failed for %s", model_id, exc_info=True)
        return {}


def detect_claimed_base(model_id: str, config: dict, metadata: dict) -> dict:
    """Detect what base model a model claims to be derived from.

    Args:
        model_id: Repo id; only the part after the last "/" is inspected.
        config: Unused here; kept for a uniform call signature.
        metadata: Result of ``fetch_model_metadata`` (may be empty).

    Returns:
        A dict containing only the keys that apply:
        ``explicit_base`` (the metadata ``base_model`` value),
        ``name_implies`` (de-duplicated base keys suggested by the name, in
        first-match order) and ``suspicious_name_terms``.
    """
    claims = {}
    name = model_id.split("/")[-1].lower()

    # Explicit base_model field
    if metadata.get("base_model"):
        claims["explicit_base"] = metadata["base_model"]

    # Name-based detection. Several terms map to the same key (e.g. "qwen2.5"
    # and "qwen2"), so collapse repeats while keeping first-seen order.
    name_signals = list(dict.fromkeys(
        base_key for term, base_key in _NAME_TERMS if term in name
    ))
    if name_signals:
        claims["name_implies"] = name_signals

    # Suspicious claims in name
    suspicious = [term for term in _SUSPICIOUS_TERMS if term in name]
    if suspicious:
        claims["suspicious_name_terms"] = suspicious

    return claims


def _lift_nested_llm_config(config: dict) -> dict:
    """Return *config* with a nested LLM sub-config merged into the top level.

    Only applies when the top level has no truthy ``vocab_size``. Top-level
    keys win over nested ones; the sub-config containers themselves are
    dropped. The input dict is not modified.
    """
    if config.get("vocab_size"):
        return config
    for nested_key in _NESTED_KEYS:
        nested = config.get(nested_key)
        # A malformed (non-object) sub-config is skipped rather than crashing.
        if isinstance(nested, dict) and nested.get("vocab_size"):
            outer = {k: v for k, v in config.items() if k not in _SKIP_KEYS}
            return {**nested, **outer}
    return config


def _score_known_bases(vocab, model_type: str, kv_lora) -> list:
    """Score the config against every KNOWN_BASES entry.

    Scoring: +3 vocab match; +3 exact model_type or +2 model_type prefix
    (first pattern that hits); +2 for a positive ``kv_lora_rank`` on the
    DeepSeek entry. Entries scoring below 3 are dropped; 5+ is "HIGH".
    """
    matches = []
    for base_key, base_info in KNOWN_BASES.items():
        score = 0
        reasons = []

        # Vocab match
        expected_vocab = base_info.get("vocab_size")
        if isinstance(expected_vocab, list):
            vocab_ok = vocab in expected_vocab
        else:
            vocab_ok = vocab == expected_vocab
        if vocab_ok:
            score += 3
            reasons.append(f"vocab matches ({vocab})")

        # Model type match
        for pat in base_info.get("model_type_patterns", []):
            if model_type == pat:
                score += 3
                reasons.append(f"model_type '{model_type}' exact")
                break
            if model_type.startswith(pat):
                score += 2
                reasons.append(f"model_type '{model_type}' matches {pat}")
                break

        # MLA signal
        if base_key == "deepseek_v3" and kv_lora and kv_lora > 0:
            score += 2
            reasons.append(f"MLA kv_lora_rank={kv_lora}")

        if score >= 3:
            matches.append({
                "base": base_key,
                "name": base_info["name"],
                "confidence": "HIGH" if score >= 5 else "MODERATE",
                "score": score,
                "evidence": reasons,
            })
    return matches


def _similar_from_reference(vocab, hidden) -> list:
    """Top-5 reference-parquet rows with the same vocab and hidden size ([] on any failure)."""
    try:
        ref = _load_reference_df()
        if ref is None:
            return []
        # NOTE(review): this pattern drops every id containing "/" anywhere,
        # whereas the Postgres fallback only drops ids that *start* with "/".
        # Confirm which is intended for the parquet's model_id format.
        hit = ref[
            (ref["vocab_size"] == vocab) &
            (ref["hidden_size"] == hidden) &
            (~ref["model_id"].str.contains("tiny|/", case=False, na=False))
        ].sort_values("hf_downloads", ascending=False).head(5)
        return hit[_SIMILAR_COLUMNS].rename(columns={"org_display": "lab"}).to_dict("records")
    except Exception:
        logger.debug("reference parquet lookup failed", exc_info=True)
        return []


def _similar_from_postgres(vocab, hidden) -> list:
    """Top-5 rows from the local ModelAtlas Postgres DB ([] on any failure)."""
    try:
        import psycopg2
        import psycopg2.extras

        conn = psycopg2.connect(DB)
        # Close explicitly in finally blocks so a failing query cannot leak
        # the cursor or the connection.
        try:
            cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            try:
                cur.execute(_SIMILAR_SQL, (vocab, hidden))
                return [dict(r) for r in cur.fetchall()]
            finally:
                cur.close()
        finally:
            conn.close()
    except Exception:
        logger.debug("local ModelAtlas DB lookup failed", exc_info=True)
        return []


def stage1_screen(model_id: str, config: dict) -> dict:
    """
    Stage 1: Architecture screening against ModelAtlas reference.
    Returns a structured verdict without downloading any weights.
    Handles nested text_config (Qwen3.5/3.6, Mistral3, MiMo-V2.5 pattern).

    Args:
        model_id: Repo id (not used in the computation itself).
        config: Parsed config.json. Not modified.

    Returns:
        A dict with ``arch_signature`` (12 hex chars), ``config_signals``,
        ``base_matches`` (best score first) and ``modelatlas_similar``.
    """
    # Lift nested LLM config into top-level when top-level vocab/hidden is absent.
    config = _lift_nested_llm_config(config)

    vocab = config.get("vocab_size")
    model_type = (config.get("model_type") or "").lower()
    hidden = config.get("hidden_size")
    layers = config.get("num_hidden_layers")
    kv_lora = config.get("kv_lora_rank")  # MLA signal

    # Compute architecture signature
    key_fields = sorted([
        f"vocab={vocab}",
        f"type={model_type}",
        f"hidden={hidden}",
        f"layers={layers}",
        f"kv_lora={kv_lora}",
    ])
    arch_sig = hashlib.md5("|".join(key_fields).encode()).hexdigest()[:12]

    # Match against known bases
    base_matches = _score_known_bases(vocab, model_type, kv_lora)

    # Query ModelAtlas for architecturally similar models: reference parquet
    # first, local DB (dev / local server) as fallback. Both lookups key on
    # vocab + hidden size, so there is nothing to look up without them.
    db_matches = []
    if vocab and hidden:
        db_matches = _similar_from_reference(vocab, hidden)
        if not db_matches:
            db_matches = _similar_from_postgres(vocab, hidden)

    return {
        "arch_signature": arch_sig,
        "config_signals": {
            "model_type": model_type,
            "vocab_size": vocab,
            "hidden_size": hidden,
            "num_layers": layers,
            "has_mla": bool(kv_lora and kv_lora > 0),
            "kv_lora_rank": kv_lora,
        },
        "base_matches": sorted(base_matches, key=lambda x: -x["score"]),
        "modelatlas_similar": db_matches,
    }


def _unverifiable_claim_flags(suspicious: list) -> list:
    """Build one UNVERIFIABLE_CLAIM flag per closed-weight vendor named in *suspicious*."""
    flags = []
    if "claude" in suspicious or "anthropic" in suspicious:
        flags.append({
            "type": "UNVERIFIABLE_CLAIM",
            "term": "claude/anthropic",
            "explanation": (
                "Claude weights are not publicly available — no weight transfer from Claude "
                "is possible. If this model used Claude-generated reasoning traces as training "
                "data (distillation), that is a post-training technique that leaves no "
                "architectural trace and cannot be verified from weights alone. "
                "The base architecture claim can be checked; the Claude claim cannot."
            ),
        })
    if "gpt" in suspicious or "openai" in suspicious or "chatgpt" in suspicious:
        flags.append({
            "type": "UNVERIFIABLE_CLAIM",
            "term": "gpt/openai",
            "explanation": "GPT-4/OpenAI weights are closed. Any weight transfer claim is false. Distillation via outputs is possible but unverifiable from architecture.",
        })
    if "gemini" in suspicious:
        flags.append({
            "type": "UNVERIFIABLE_CLAIM",
            "term": "gemini",
            "explanation": "Gemini weights are closed. Architecture shows no Gemini structure.",
        })
    return flags


def generate_verdict(
    model_id: str,
    config: dict,
    metadata: dict,
    claims: dict,
    stage1: dict,
) -> dict:
    """Synthesize all signals into a human-readable verdict.

    Args:
        model_id: Repo id, echoed into the result.
        config: Unused here; kept for a uniform call signature.
        metadata: Result of ``fetch_model_metadata`` (may be empty).
        claims: Result of ``detect_claimed_base``.
        stage1: Result of ``stage1_screen``.

    Returns:
        A dict with ``model_id``, ``scanned_at`` (UTC ISO timestamp),
        ``verdict``, ``evidence``, ``metadata`` and ``note``.
    """
    now = datetime.now(timezone.utc).isoformat()
    base_matches = stage1["base_matches"]
    suspicious = claims.get("suspicious_name_terms", [])
    top = base_matches[0] if base_matches else None

    # Headline verdict
    if top is None:
        architecture_verdict = "UNRECOGNIZED — architecture does not match any known base model"
    elif top["confidence"] == "HIGH":
        architecture_verdict = f"CONFIRMED — architecture matches {top['name']}"
    else:
        architecture_verdict = f"LIKELY — architecture consistent with {top['name']}"

    # Claim accuracy flags
    flags = _unverifiable_claim_flags(suspicious)

    # Name vs architecture consistency: the name is considered consistent when
    # any implied key and the top base key contain one another.
    name_implied = claims.get("name_implies", [])
    if name_implied and top is not None:
        top_base = top["base"]
        if not any(n in top_base or top_base in n for n in name_implied):
            flags.append({
                "type": "NAME_MISMATCH",
                "explanation": f"Model name implies {name_implied} but architecture suggests {top_base}. Possible mislabeling.",
            })

    return {
        "model_id": model_id,
        "scanned_at": now,
        "verdict": {
            "architecture": architecture_verdict,
            "base_model_confirmed": top["name"] if top is not None else "Unknown",
            "confidence": top["confidence"] if top is not None else "NONE",
            "flags": flags,
            "flag_count": len(flags),
            "stage": "Stage 1 (config-only — no weight download)",
        },
        "evidence": {
            "config_signals": stage1["config_signals"],
            "base_matches": stage1["base_matches"][:3],
            "modelatlas_similar": stage1["modelatlas_similar"][:3],
            "claimed_base": claims.get("explicit_base"),
            "name_implies": name_implied,
        },
        "metadata": {
            "downloads": metadata.get("downloads", 0),
            "likes": metadata.get("likes", 0),
            "license": metadata.get("license", ""),
            "created_at": metadata.get("created_at", ""),
        },
        "note": (
            "Stage 1 validates architecture from config.json only (~2KB). "
            "Stage 2 weight analysis (requires model download) provides stronger confirmation. "
            "Powered by ModelAtlas — modeldna.ai · a RadicalNotion product."
        ),
    }


def scan(model_id: str) -> dict:
    """Full Stage 1 scan. Entry point.

    Returns:
        The ``generate_verdict`` result plus ``elapsed_s``, or a short dict
        with ``model_id``, ``error`` and ``scanned_at`` when the repo id
        contains "gguf" or config.json cannot be fetched.
    """
    t0 = time.time()

    # Detect unsupported formats before attempting config fetch
    name_lower = model_id.lower()
    if "gguf" in name_lower:
        return {
            "model_id": model_id,
            "error": (
                "GGUF models pack weights into a single file and don't have a standard config.json. "
                "Stage 1 scanning works with standard HuggingFace checkpoints (safetensors/PyTorch). "
                "Try the original (non-quantized) model instead — e.g. the unsloth/Qwen3.6-35B-A3B "
                "base would be Qwen/Qwen2.5-... or the upstream source. "
                "GGUF support is on the roadmap."
            ),
            "scanned_at": datetime.now(timezone.utc).isoformat(),
        }

    config = fetch_config(model_id)
    if not config:
        return {
            "model_id": model_id,
            "error": "Could not fetch config.json — model may be private, gated, or not exist on HuggingFace.",
            "scanned_at": datetime.now(timezone.utc).isoformat(),
        }

    metadata = fetch_model_metadata(model_id)
    claims = detect_claimed_base(model_id, config, metadata)
    stage1 = stage1_screen(model_id, config)
    verdict = generate_verdict(model_id, config, metadata, claims, stage1)
    verdict["elapsed_s"] = round(time.time() - t0, 2)
    return verdict


if __name__ == "__main__":
    import sys

    model_id = sys.argv[1] if len(sys.argv) > 1 else "Qwen/Qwen3.5-27B"
    result = scan(model_id)
    print(json.dumps(result, indent=2, default=str))