Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| modeldna Stage 1 HF Scanner — core logic. | |
| Given a HuggingFace model_id, validates architectural claims against the | |
| ModelAtlas reference database. No weight download needed — uses config.json only. | |
| This is the heart of the modeldna 'test before you download' feature. | |
| """ | |
| from __future__ import annotations | |
| import json, hashlib, re, time | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Optional | |
| import requests | |
# Base URL used to build every HuggingFace request in this module
# (config.json download and the /api/models metadata endpoint).
HF_API = "https://huggingface.co"
# HuggingFace dataset repo from which the reference parquet is downloaded.
HF_DATASET = "RadicalNotionAI/modelatlas-reference"
# Connection string for the optional local ModelAtlas PostgreSQL database.
# stage1_screen only uses it as a fallback when the parquet lookup finds nothing.
DB = "postgresql:///modelatlas?host=/var/run/postgresql&port=5433&user=tim"
# In-process cache — loaded once per worker and reused until the TTL expires.
# NOTE(review): the reload is time-based only; nothing in this module watches
# the parquet file for changes.
_REF_DF = None
_REF_LOADED_AT: float = 0.0
_REF_TTL = 3600  # seconds; reload at most once per hour
def _load_reference_df():
    """Return the ModelAtlas reference DataFrame, or None if it is unavailable.

    The frame is cached in the module globals ``_REF_DF`` / ``_REF_LOADED_AT``
    and reused while it is younger than ``_REF_TTL`` seconds. Once the cache
    has expired, two sources are tried in order:

    1. a local ``snapshots/modeldna_reference.parquet`` file located two
       directories above this module;
    2. the ``modeldna_reference.parquet`` file of the ``HF_DATASET`` dataset
       repo, fetched through ``huggingface_hub.hf_hub_download``.

    Loading is best-effort: a failing source is logged and skipped. If every
    source fails, the previously cached frame (possibly stale) is returned
    instead of discarding it; ``None`` is returned only when nothing has ever
    been loaded.

    Raises:
        ImportError: if pandas is not installed (the import is not guarded).
    """
    global _REF_DF, _REF_LOADED_AT
    now = time.time()
    if _REF_DF is not None and (now - _REF_LOADED_AT) < _REF_TTL:
        return _REF_DF

    # Deferred imports: only needed when the cache is cold or expired.
    import logging
    import pandas as pd

    log = logging.getLogger(__name__)
    fresh = None

    # 1. Local snapshot (fast, used in dev / on local server)
    local_path = Path(__file__).parent.parent / "snapshots" / "modeldna_reference.parquet"
    if local_path.exists():
        try:
            fresh = pd.read_parquet(local_path)
        except Exception:
            # Best-effort: an unreadable snapshot must not break the scan.
            log.warning("Could not read local reference snapshot %s", local_path, exc_info=True)

    # 2. HF dataset (used on HF Space — downloaded and cached by huggingface_hub)
    if fresh is None:
        try:
            from huggingface_hub import hf_hub_download

            path = hf_hub_download(
                repo_id=HF_DATASET,
                filename="modeldna_reference.parquet",
                repo_type="dataset",
            )
            fresh = pd.read_parquet(path)
        except Exception:
            # Covers a missing huggingface_hub package, network errors and bad files.
            log.warning("Could not load reference parquet from dataset %s", HF_DATASET, exc_info=True)

    if fresh is not None:
        _REF_DF = fresh
        _REF_LOADED_AT = now
    # On total failure this is the stale frame if one exists, otherwise None.
    return _REF_DF
# Known base model reference configs (canonical identifiers).
#
# Each entry maps a base key to the fields stage1_screen compares against a
# model's config.json:
#   name                - human-readable family name reported in matches
#   vocab_size          - expected vocab size: a single int, or a list of accepted ints
#   model_type_patterns - model_type values matched exactly, or as a prefix
#
# The extra keys "num_key_value_heads_hint" and "kv_lora_rank" are not read by
# any code visible in this file; they appear to be informational only.
KNOWN_BASES = {
    "qwen3_5_text": {
        "name": "Qwen3.5 (dense)",
        "vocab_size": 248320,
        "model_type_patterns": ["qwen3_5_text", "qwen3_5"],
    },
    "qwen3_5_moe_text": {
        "name": "Qwen3.5 MoE",
        "vocab_size": 248320,
        "model_type_patterns": ["qwen3_5_moe_text", "qwen3_5_moe"],
    },
    "qwen3": {
        "name": "Qwen3",
        "vocab_size": [151936, 152064],
        "model_type_patterns": ["qwen3"],
    },
    "qwen2": {
        "name": "Qwen2.5",
        "vocab_size": [151936, 152064],
        "model_type_patterns": ["qwen2"],
    },
    # llama3 and llama2 share the "llama" model_type pattern, so only the
    # vocab size separates them during matching.
    "llama3": {
        "name": "Llama 3.x",
        "vocab_size": 128256,
        "model_type_patterns": ["llama"],
        "num_key_value_heads_hint": [8, 32],
    },
    "llama2": {
        "name": "Llama 2",
        "vocab_size": 32000,
        "model_type_patterns": ["llama"],
    },
    "mistral": {
        "name": "Mistral 7B family",
        "vocab_size": 32000,
        "model_type_patterns": ["mistral", "mixtral"],
    },
    "deepseek_v3": {
        "name": "DeepSeek V3/R1",
        "vocab_size": 129280,
        "model_type_patterns": ["deepseek_v3", "deepseek_v2"],
        "kv_lora_rank": 512,
    },
    "gemma": {
        "name": "Gemma family",
        "vocab_size": [256000, 262144],
        "model_type_patterns": ["gemma"],
    },
    "nemotron_h": {
        "name": "NemotronH (NVIDIA Mamba+MoE hybrid)",
        "vocab_size": 131072,
        "model_type_patterns": ["nemotron_h", "nemotronh"],
    },
}
def fetch_config(model_id: str) -> Optional[dict]:
    """Fetch a model's config.json from HuggingFace.

    Args:
        model_id: HuggingFace repo id, e.g. ``"org/name"``.

    Returns:
        The parsed config as a dict, or ``None`` when ``model_id`` is empty,
        the request fails (network error, timeout, non-2xx status), the body
        is not valid JSON, or the JSON document is not an object.
    """
    if not model_id:
        # An empty id cannot name a repo; skip the pointless request.
        return None
    url = f"{HF_API}/{model_id}/resolve/main/config.json"
    try:
        r = requests.get(url, timeout=20)
        r.raise_for_status()
        payload = r.json()
    except (requests.RequestException, ValueError):
        # RequestException: transport/HTTP errors; ValueError: undecodable JSON.
        return None
    # Callers use dict methods on the result, so reject lists/scalars here.
    return payload if isinstance(payload, dict) else None
def fetch_model_metadata(model_id: str) -> dict:
    """Fetch HF model metadata (downloads, likes, author, tags).

    Args:
        model_id: HuggingFace repo id, e.g. ``"org/name"``.

    Returns:
        A dict with the keys ``downloads``, ``likes``, ``author``, ``tags``,
        ``pipeline_tag``, ``base_model``, ``license``, ``created_at`` and
        ``last_modified``. An empty dict is returned when ``model_id`` is
        empty, the request fails, or the response is not a JSON object.
    """
    if not model_id:
        return {}
    try:
        r = requests.get(f"{HF_API}/api/models/{model_id}", timeout=10)
        r.raise_for_status()
        d = r.json()
    except (requests.RequestException, ValueError):
        # RequestException: transport/HTTP errors; ValueError: undecodable JSON.
        return {}
    if not isinstance(d, dict):
        return {}

    # "cardData" may be missing or explicitly null; treat both as "no card"
    # instead of losing every other field to an AttributeError.
    card = d.get("cardData")
    if not isinstance(card, dict):
        card = {}

    return {
        "downloads": d.get("downloads", 0),
        "likes": d.get("likes", 0),
        "author": d.get("author", ""),
        "tags": d.get("tags", []),
        "pipeline_tag": d.get("pipeline_tag", ""),
        "base_model": card.get("base_model", ""),
        "license": card.get("license", ""),
        "created_at": d.get("createdAt", ""),
        "last_modified": d.get("lastModified", ""),
    }
def detect_claimed_base(model_id: str, config: dict, metadata: dict) -> dict:
    """Detect what base model a model claims to be derived from.

    Args:
        model_id: HuggingFace repo id; only the part after the last ``/`` is
            inspected, case-insensitively.
        config: Parsed config.json. Currently unused; kept for interface
            compatibility.
        metadata: Result of ``fetch_model_metadata``; its ``base_model`` value
            is reported when truthy.

    Returns:
        A dict holding any of these keys (each omitted when empty):
        ``explicit_base`` (the metadata ``base_model`` value),
        ``name_implies`` (de-duplicated base keys implied by the repo name, in
        detection order) and ``suspicious_name_terms`` (closed-model brand
        names found in the repo name).
    """
    claims = {}
    name = model_id.split("/")[-1].lower()

    # Explicit base_model field
    if metadata.get("base_model"):
        claims["explicit_base"] = metadata["base_model"]

    # Name-based detection. Terms are ordered most-specific first and each
    # matched term is blanked out of the working copy, so "qwen3.5" does not
    # also report "qwen3" and "qwen2.5" does not report "qwen2" twice.
    name_terms = (
        ("qwen3.5", "qwen3_5"), ("qwen3-5", "qwen3_5"), ("qwen35", "qwen3_5"),
        ("qwen3", "qwen3"), ("qwen2.5", "qwen2"), ("qwen2", "qwen2"),
        ("llama-3", "llama3"), ("llama3", "llama3"), ("llama-2", "llama2"),
        ("mistral", "mistral"), ("mixtral", "mistral"),
        ("deepseek", "deepseek_v3"), ("gemma", "gemma"),
    )
    name_signals = []
    remaining = name
    for term, base_key in name_terms:
        if term not in remaining:
            continue
        if base_key not in name_signals:
            name_signals.append(base_key)
        # Replace with a space so neighbouring fragments cannot fuse into a new term.
        remaining = remaining.replace(term, " ")
    if name_signals:
        claims["name_implies"] = name_signals

    # Suspicious claims in name (brands whose weights are not public)
    suspicious = [
        term
        for term in ("claude", "gpt", "chatgpt", "openai", "gemini", "anthropic")
        if term in name
    ]
    if suspicious:
        claims["suspicious_name_terms"] = suspicious
    return claims
def stage1_screen(model_id: str, config: dict) -> dict:
    """Stage 1: screen a config.json against the known-base table and ModelAtlas.

    No weights are downloaded. When the top-level config has no usable
    ``vocab_size``, the first nested ``text_config`` / ``llm_config`` that has
    one is lifted to the top level (Qwen3.5/3.6, Mistral3, MiMo-V2.5,
    NemotronH Omni pattern).

    Args:
        model_id: HuggingFace repo id. Currently unused; kept for interface
            compatibility.
        config: Parsed config.json.

    Returns:
        A dict with:
        ``arch_signature`` - 12-hex-char digest of vocab/type/hidden/layers/kv_lora;
        ``config_signals`` - the raw values the screening looked at;
        ``base_matches``   - KNOWN_BASES entries scoring >= 3, best first;
        ``modelatlas_similar`` - up to 5 reference models with the same vocab
        and hidden size (empty when no reference source is reachable).
    """
    # Lift nested LLM config into top-level when top-level vocab is absent.
    # Handles: text_config (Qwen3.5/3.6, Mistral3, MiMo-V2.5), llm_config (NemotronH Omni)
    _NESTED_KEYS = ("text_config", "llm_config")
    _SKIP_KEYS = ("text_config", "llm_config", "vision_config", "audio_config", "sound_config")
    if not config.get("vocab_size"):
        for nested_key in _NESTED_KEYS:
            nested = config.get(nested_key)
            if isinstance(nested, dict) and nested.get("vocab_size"):
                # Top-level keys win over nested ones, except explicit nulls:
                # a top-level "vocab_size": null must not erase the nested value.
                overrides = {
                    k: v for k, v in config.items()
                    if k not in _SKIP_KEYS and v is not None
                }
                config = {**nested, **overrides}
                break

    vocab = config.get("vocab_size")
    model_type = (config.get("model_type") or "").lower()
    hidden = config.get("hidden_size")
    layers = config.get("num_hidden_layers")
    kv_lora = config.get("kv_lora_rank")  # MLA signal
    # config.json is untrusted input: only a positive number counts as MLA.
    has_mla = isinstance(kv_lora, (int, float)) and kv_lora > 0

    # Compute architecture signature (md5 is used as a fingerprint, not for security)
    key_fields = sorted([
        f"vocab={vocab}", f"type={model_type}", f"hidden={hidden}",
        f"layers={layers}", f"kv_lora={kv_lora}",
    ])
    arch_sig = hashlib.md5("|".join(key_fields).encode()).hexdigest()[:12]

    # Match against known bases: vocab match = 3, model_type exact = 3 /
    # prefix = 2, MLA (deepseek_v3 only) = 2. A base needs >= 3 to be listed.
    base_matches = []
    for base_key, base_info in KNOWN_BASES.items():
        score = 0
        reasons = []

        # Vocab match (expected value is either one int or a list of ints)
        expected_vocab = base_info.get("vocab_size")
        if isinstance(expected_vocab, list):
            vocab_ok = vocab in expected_vocab
        else:
            vocab_ok = vocab == expected_vocab
        if vocab_ok:
            score += 3
            reasons.append(f"vocab matches ({vocab})")

        # Model type match: the first pattern that matches decides the score
        for pat in base_info.get("model_type_patterns", []):
            if model_type == pat:
                score += 3
                reasons.append(f"model_type '{model_type}' exact")
                break
            if model_type.startswith(pat):
                score += 2
                reasons.append(f"model_type '{model_type}' matches {pat}")
                break

        # MLA signal
        if base_key == "deepseek_v3" and has_mla:
            score += 2
            reasons.append(f"MLA kv_lora_rank={kv_lora}")

        if score >= 3:
            base_matches.append({
                "base": base_key,
                "name": base_info["name"],
                "confidence": "HIGH" if score >= 5 else "MODERATE",
                "score": score,
                "evidence": reasons,
            })

    # Query ModelAtlas reference parquet for architecturally similar models.
    # Best-effort: missing pandas, a missing parquet or missing columns all
    # simply leave db_matches empty.
    db_matches = []
    try:
        ref = _load_reference_df()
        if ref is not None and vocab and hidden:
            # NOTE(review): this drops every id containing "/", while the SQL
            # fallback below only drops ids that start with "/" — confirm intent.
            hit = ref[
                (ref["vocab_size"] == vocab) &
                (ref["hidden_size"] == hidden) &
                (~ref["model_id"].str.contains("tiny|/", case=False, na=False))
            ].sort_values("hf_downloads", ascending=False).head(5)
            db_matches = hit[
                ["model_id", "org_display", "hf_downloads", "total_params",
                 "technique_signature", "num_layers", "hidden_size", "vocab_size"]
            ].rename(columns={"org_display": "lab"}).to_dict("records")
    except Exception:
        pass

    # Also try local DB if available (dev / local server). Skipped when vocab
    # or hidden is missing, since the equality filters could not match anyway.
    if not db_matches and vocab and hidden:
        try:
            import psycopg2, psycopg2.extras
            conn = psycopg2.connect(DB)
            try:
                cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
                try:
                    cur.execute("""
                        SELECT m.model_id, o.name AS lab, m.hf_downloads, m.release_date,
                        a.technique_signature, a.total_params, a.num_layers, a.hidden_size, a.vocab_size
                        FROM analyses a JOIN models m ON m.id=a.model_id
                        JOIN organizations o ON m.org_id=o.id
                        WHERE a.is_current=true AND a.vocab_size=%s AND a.hidden_size=%s
                        AND m.model_id NOT ILIKE '%%tiny%%' AND m.model_id NOT ILIKE '/%%'
                        ORDER BY m.hf_downloads DESC NULLS LAST
                        LIMIT 5
                    """, (vocab, hidden))
                    db_matches = [dict(r) for r in cur.fetchall()]
                finally:
                    cur.close()
            finally:
                # Always release the connection, even when the query fails.
                conn.close()
        except Exception:
            # Best-effort: psycopg2 or the database is usually absent outside dev.
            pass

    return {
        "arch_signature": arch_sig,
        "config_signals": {
            "model_type": model_type,
            "vocab_size": vocab,
            "hidden_size": hidden,
            "num_layers": layers,
            "has_mla": has_mla,
            "kv_lora_rank": kv_lora,
        },
        "base_matches": sorted(base_matches, key=lambda x: -x["score"]),
        "modelatlas_similar": db_matches,
    }
def generate_verdict(
    model_id: str,
    config: dict,
    metadata: dict,
    claims: dict,
    stage1: dict,
) -> dict:
    """Synthesize all signals into a human-readable verdict.

    Args:
        model_id: HuggingFace repo id, echoed in the result.
        config: Parsed config.json. Currently unused; kept for interface
            compatibility.
        metadata: Result of ``fetch_model_metadata`` (may be empty).
        claims: Result of ``detect_claimed_base``.
        stage1: Result of ``stage1_screen``; must contain ``base_matches``
            (best first), ``config_signals`` and ``modelatlas_similar``.

    Returns:
        A dict with ``model_id``, ``scanned_at`` (UTC ISO-8601), ``verdict``
        (headline, confirmed base, confidence, flags), ``evidence``,
        ``metadata`` and a fixed explanatory ``note``.
    """
    now = datetime.now(timezone.utc).isoformat()
    base_matches = stage1["base_matches"]
    suspicious = claims.get("suspicious_name_terms", [])
    top = base_matches[0] if base_matches else None

    # Headline verdict
    if top is None:
        architecture_verdict = "UNRECOGNIZED — architecture does not match any known base model"
    elif top["confidence"] == "HIGH":
        architecture_verdict = f"CONFIRMED — architecture matches {top['name']}"
    else:
        architecture_verdict = f"LIKELY — architecture consistent with {top['name']}"

    # Claim accuracy flags: closed-model brand names cannot be verified from a config.
    flags = []
    if "claude" in suspicious or "anthropic" in suspicious:
        flags.append({
            "type": "UNVERIFIABLE_CLAIM",
            "term": "claude/anthropic",
            "explanation": (
                "Claude weights are not publicly available — no weight transfer from Claude "
                "is possible. If this model used Claude-generated reasoning traces as training "
                "data (distillation), that is a post-training technique that leaves no "
                "architectural trace and cannot be verified from weights alone. "
                "The base architecture claim can be checked; the Claude claim cannot."
            ),
        })
    if "gpt" in suspicious or "openai" in suspicious or "chatgpt" in suspicious:
        flags.append({
            "type": "UNVERIFIABLE_CLAIM",
            "term": "gpt/openai",
            "explanation": "GPT-4/OpenAI weights are closed. Any weight transfer claim is false. Distillation via outputs is possible but unverifiable from architecture.",
        })
    if "gemini" in suspicious:
        flags.append({
            "type": "UNVERIFIABLE_CLAIM",
            "term": "gemini",
            "explanation": "Gemini weights are closed. Architecture shows no Gemini structure.",
        })

    # Name vs architecture consistency. Substring comparison in both directions
    # lets a name key such as "qwen3_5" agree with a base key like "qwen3_5_text".
    name_implied = claims.get("name_implies", [])
    if name_implied and top is not None:
        top_base = top["base"]
        if not any(n in top_base or top_base in n for n in name_implied):
            flags.append({
                "type": "NAME_MISMATCH",
                "explanation": f"Model name implies {name_implied} but architecture suggests {top_base}. Possible mislabeling.",
            })

    return {
        "model_id": model_id,
        "scanned_at": now,
        "verdict": {
            "architecture": architecture_verdict,
            "base_model_confirmed": top["name"] if top is not None else "Unknown",
            "confidence": top["confidence"] if top is not None else "NONE",
            "flags": flags,
            "flag_count": len(flags),
            "stage": "Stage 1 (config-only — no weight download)",
        },
        "evidence": {
            "config_signals": stage1["config_signals"],
            "base_matches": stage1["base_matches"][:3],
            "modelatlas_similar": stage1["modelatlas_similar"][:3],
            "claimed_base": claims.get("explicit_base"),
            "name_implies": name_implied,
        },
        "metadata": {
            "downloads": metadata.get("downloads", 0),
            "likes": metadata.get("likes", 0),
            "license": metadata.get("license", ""),
            "created_at": metadata.get("created_at", ""),
        },
        "note": (
            "Stage 1 validates architecture from config.json only (~2KB). "
            "Stage 2 weight analysis (requires model download) provides stronger confirmation. "
            "Powered by ModelAtlas — modeldna.ai · a RadicalNotion product."
        ),
    }
def scan(model_id: str) -> dict:
    """Run the full Stage 1 scan for one model. Entry point.

    Args:
        model_id: HuggingFace repo id, e.g. ``"org/name"``.

    Returns:
        On success, the dict produced by ``generate_verdict`` plus an
        ``elapsed_s`` field (seconds, rounded to 2 decimals). On failure, a
        dict with ``model_id``, ``error`` and ``scanned_at`` only: this
        happens for ids containing "gguf" (rejected before any network call)
        and when config.json cannot be fetched.
    """
    t0 = time.time()

    # Detect unsupported formats before attempting config fetch
    name_lower = model_id.lower()
    if "gguf" in name_lower:
        return {
            "model_id": model_id,
            "error": (
                "GGUF models pack weights into a single file and don't have a standard config.json. "
                "Stage 1 scanning works with standard HuggingFace checkpoints (safetensors/PyTorch). "
                "Try the original (non-quantized) model instead — e.g. the unsloth/Qwen3.6-35B-A3B "
                "base would be Qwen/Qwen2.5-... or the upstream source. "
                "GGUF support is on the roadmap."
            ),
            "scanned_at": datetime.now(timezone.utc).isoformat(),
        }

    config = fetch_config(model_id)
    if not config:
        return {
            "model_id": model_id,
            "error": "Could not fetch config.json — model may be private, gated, or not exist on HuggingFace.",
            "scanned_at": datetime.now(timezone.utc).isoformat(),
        }

    # Metadata is optional: an empty dict still yields a verdict.
    metadata = fetch_model_metadata(model_id)
    claims = detect_claimed_base(model_id, config, metadata)
    stage1 = stage1_screen(model_id, config)
    verdict = generate_verdict(model_id, config, metadata, claims, stage1)
    verdict["elapsed_s"] = round(time.time() - t0, 2)
    return verdict
if __name__ == "__main__":
    # CLI entry: scan the model named by the first argument (or a default
    # model when none is given) and pretty-print the verdict as JSON.
    import sys

    cli_args = sys.argv[1:]
    model_id = cli_args[0] if cli_args else "Qwen/Qwen3.5-27B"
    result = scan(model_id)
    # default=str stringifies any value json cannot serialize natively.
    rendered = json.dumps(result, indent=2, default=str)
    print(rendered)