modeldna / scan.py
trohrbaugh's picture
Wire modelatlas_similar to HF parquet dataset (2,435 models)
4b77797 verified
#!/usr/bin/env python3
"""
modeldna Stage 1 HF Scanner β€” core logic.
Given a HuggingFace model_id, validates architectural claims against the
ModelAtlas reference database. No weight download needed β€” uses config.json only.
This is the heart of the modeldna 'test before you download' feature.
"""
from __future__ import annotations
import json, hashlib, re, time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import requests
HF_API = "https://huggingface.co"
HF_DATASET = "RadicalNotionAI/modelatlas-reference"
DB = "postgresql:///modelatlas?host=/var/run/postgresql&port=5433&user=tim"
# In-process cache β€” loaded once per worker, refreshes when the file changes
_REF_DF = None
_REF_LOADED_AT: float = 0.0
_REF_TTL = 3600 # reload at most once per hour
def _load_reference_df():
"""Load ModelAtlas reference parquet. Tries local snapshot first, then HF dataset."""
global _REF_DF, _REF_LOADED_AT
now = time.time()
if _REF_DF is not None and (now - _REF_LOADED_AT) < _REF_TTL:
return _REF_DF
import pandas as pd
# 1. Local snapshot (fast, used in dev / on local server)
local_path = Path(__file__).parent.parent / "snapshots" / "modeldna_reference.parquet"
if local_path.exists():
try:
_REF_DF = pd.read_parquet(local_path)
_REF_LOADED_AT = now
return _REF_DF
except Exception:
pass
# 2. HF dataset (used on HF Space β€” downloaded and cached by huggingface_hub)
try:
from huggingface_hub import hf_hub_download
path = hf_hub_download(
repo_id=HF_DATASET,
filename="modeldna_reference.parquet",
repo_type="dataset",
)
_REF_DF = pd.read_parquet(path)
_REF_LOADED_AT = now
return _REF_DF
except Exception:
pass
return None
# Known base model reference configs (canonical identifiers)
KNOWN_BASES = {
"qwen3_5_text": {
"name": "Qwen3.5 (dense)",
"vocab_size": 248320,
"model_type_patterns": ["qwen3_5_text", "qwen3_5"],
},
"qwen3_5_moe_text": {
"name": "Qwen3.5 MoE",
"vocab_size": 248320,
"model_type_patterns": ["qwen3_5_moe_text", "qwen3_5_moe"],
},
"qwen3": {
"name": "Qwen3",
"vocab_size": [151936, 152064],
"model_type_patterns": ["qwen3"],
},
"qwen2": {
"name": "Qwen2.5",
"vocab_size": [151936, 152064],
"model_type_patterns": ["qwen2"],
},
"llama3": {
"name": "Llama 3.x",
"vocab_size": 128256,
"model_type_patterns": ["llama"],
"num_key_value_heads_hint": [8, 32],
},
"llama2": {
"name": "Llama 2",
"vocab_size": 32000,
"model_type_patterns": ["llama"],
},
"mistral": {
"name": "Mistral 7B family",
"vocab_size": 32000,
"model_type_patterns": ["mistral", "mixtral"],
},
"deepseek_v3": {
"name": "DeepSeek V3/R1",
"vocab_size": 129280,
"model_type_patterns": ["deepseek_v3", "deepseek_v2"],
"kv_lora_rank": 512,
},
"gemma": {
"name": "Gemma family",
"vocab_size": [256000, 262144],
"model_type_patterns": ["gemma"],
},
"nemotron_h": {
"name": "NemotronH (NVIDIA Mamba+MoE hybrid)",
"vocab_size": 131072,
"model_type_patterns": ["nemotron_h", "nemotronh"],
},
}
def fetch_config(model_id: str) -> Optional[dict]:
"""Fetch config.json from HuggingFace. Returns None on failure."""
url = f"{HF_API}/{model_id}/resolve/main/config.json"
try:
r = requests.get(url, timeout=20)
r.raise_for_status()
return r.json()
except Exception as e:
return None
def fetch_model_metadata(model_id: str) -> dict:
"""Fetch HF model metadata (downloads, likes, author, tags)."""
try:
r = requests.get(f"{HF_API}/api/models/{model_id}", timeout=10)
r.raise_for_status()
d = r.json()
return {
"downloads": d.get("downloads", 0),
"likes": d.get("likes", 0),
"author": d.get("author", ""),
"tags": d.get("tags", []),
"pipeline_tag": d.get("pipeline_tag", ""),
"base_model": d.get("cardData", {}).get("base_model", ""),
"license": d.get("cardData", {}).get("license", ""),
"created_at": d.get("createdAt", ""),
"last_modified": d.get("lastModified", ""),
}
except Exception:
return {}
def detect_claimed_base(model_id: str, config: dict, metadata: dict) -> dict:
"""Detect what base model a model claims to be derived from."""
claims = {}
name = model_id.split("/")[-1].lower()
# Explicit base_model field
if metadata.get("base_model"):
claims["explicit_base"] = metadata["base_model"]
# Name-based detection
name_signals = []
for term, base_key in [
("qwen3.5", "qwen3_5"), ("qwen3-5", "qwen3_5"), ("qwen35", "qwen3_5"),
("qwen3", "qwen3"), ("qwen2.5", "qwen2"), ("qwen2", "qwen2"),
("llama-3", "llama3"), ("llama3", "llama3"), ("llama-2", "llama2"),
("mistral", "mistral"), ("mixtral", "mistral"),
("deepseek", "deepseek_v3"), ("gemma", "gemma"),
]:
if term in name:
name_signals.append(base_key)
if name_signals:
claims["name_implies"] = name_signals
# Suspicious claims in name
suspicious = []
for term in ["claude", "gpt", "chatgpt", "openai", "gemini", "anthropic"]:
if term in name:
suspicious.append(term)
if suspicious:
claims["suspicious_name_terms"] = suspicious
return claims
def stage1_screen(model_id: str, config: dict) -> dict:
"""
Stage 1: Architecture screening against ModelAtlas reference.
Returns a structured verdict without downloading any weights.
Handles nested text_config (Qwen3.5/3.6, Mistral3, MiMo-V2.5 pattern).
"""
# Lift nested LLM config into top-level when top-level vocab/hidden is absent.
# Handles: text_config (Qwen3.5/3.6, Mistral3, MiMo-V2.5), llm_config (NemotronH Omni)
_NESTED_KEYS = ("text_config", "llm_config")
_SKIP_KEYS = ("text_config", "llm_config", "vision_config", "audio_config", "sound_config")
if not config.get("vocab_size"):
for nested_key in _NESTED_KEYS:
if config.get(nested_key) and config[nested_key].get("vocab_size"):
tc = config[nested_key]
config = {**tc, **{k: v for k, v in config.items() if k not in _SKIP_KEYS}}
break
vocab = config.get("vocab_size")
model_type = (config.get("model_type") or "").lower()
hidden = config.get("hidden_size")
layers = config.get("num_hidden_layers")
kv_lora = config.get("kv_lora_rank") # MLA signal
base_model_field = config.get("base_model") or config.get("_name_or_path", "")
# Compute architecture signature
key_fields = sorted([
f"vocab={vocab}", f"type={model_type}", f"hidden={hidden}",
f"layers={layers}", f"kv_lora={kv_lora}",
])
arch_sig = hashlib.md5("|".join(str(f) for f in key_fields).encode()).hexdigest()[:12]
# Match against known bases
base_matches = []
for base_key, base_info in KNOWN_BASES.items():
score = 0
reasons = []
# Vocab match
expected_vocab = base_info.get("vocab_size")
if isinstance(expected_vocab, list):
if vocab in expected_vocab: score += 3; reasons.append(f"vocab matches ({vocab})")
elif vocab == expected_vocab:
score += 3; reasons.append(f"vocab matches ({vocab})")
# Model type match
for pat in base_info.get("model_type_patterns", []):
if model_type == pat:
score += 3; reasons.append(f"model_type '{model_type}' exact"); break
elif model_type.startswith(pat):
score += 2; reasons.append(f"model_type '{model_type}' matches {pat}"); break
# MLA signal
if base_key == "deepseek_v3" and kv_lora and kv_lora > 0:
score += 2; reasons.append(f"MLA kv_lora_rank={kv_lora}")
if score >= 3:
base_matches.append({
"base": base_key,
"name": base_info["name"],
"confidence": "HIGH" if score >= 5 else "MODERATE",
"score": score,
"evidence": reasons,
})
# Query ModelAtlas reference parquet for architecturally similar models
db_matches = []
try:
ref = _load_reference_df()
if ref is not None and vocab and hidden:
hit = ref[
(ref["vocab_size"] == vocab) &
(ref["hidden_size"] == hidden) &
(~ref["model_id"].str.contains("tiny|/", case=False, na=False))
].sort_values("hf_downloads", ascending=False).head(5)
db_matches = hit[
["model_id", "org_display", "hf_downloads", "total_params",
"technique_signature", "num_layers", "hidden_size", "vocab_size"]
].rename(columns={"org_display": "lab"}).to_dict("records")
except Exception:
pass
# Also try local DB if available (dev / local server)
if not db_matches:
try:
import psycopg2, psycopg2.extras
conn = psycopg2.connect(DB)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cur.execute("""
SELECT m.model_id, o.name AS lab, m.hf_downloads, m.release_date,
a.technique_signature, a.total_params, a.num_layers, a.hidden_size, a.vocab_size
FROM analyses a JOIN models m ON m.id=a.model_id
JOIN organizations o ON m.org_id=o.id
WHERE a.is_current=true AND a.vocab_size=%s AND a.hidden_size=%s
AND m.model_id NOT ILIKE '%%tiny%%' AND m.model_id NOT ILIKE '/%%'
ORDER BY m.hf_downloads DESC NULLS LAST
LIMIT 5
""", (vocab, hidden))
db_matches = [dict(r) for r in cur.fetchall()]
cur.close(); conn.close()
except Exception:
pass
return {
"arch_signature": arch_sig,
"config_signals": {
"model_type": model_type,
"vocab_size": vocab,
"hidden_size": hidden,
"num_layers": layers,
"has_mla": bool(kv_lora and kv_lora > 0),
"kv_lora_rank": kv_lora,
},
"base_matches": sorted(base_matches, key=lambda x: -x["score"]),
"modelatlas_similar": db_matches,
}
def generate_verdict(
model_id: str,
config: dict,
metadata: dict,
claims: dict,
stage1: dict,
) -> dict:
"""Synthesize all signals into a human-readable verdict."""
now = datetime.now(timezone.utc).isoformat()
base_matches = stage1["base_matches"]
suspicious = claims.get("suspicious_name_terms", [])
# Headline verdict
if base_matches:
top = base_matches[0]
if top["confidence"] == "HIGH":
architecture_verdict = f"CONFIRMED β€” architecture matches {top['name']}"
else:
architecture_verdict = f"LIKELY β€” architecture consistent with {top['name']}"
else:
architecture_verdict = "UNRECOGNIZED β€” architecture does not match any known base model"
# Claim accuracy flags
flags = []
if "claude" in suspicious or "anthropic" in suspicious:
flags.append({
"type": "UNVERIFIABLE_CLAIM",
"term": "claude/anthropic",
"explanation": (
"Claude weights are not publicly available β€” no weight transfer from Claude "
"is possible. If this model used Claude-generated reasoning traces as training "
"data (distillation), that is a post-training technique that leaves no "
"architectural trace and cannot be verified from weights alone. "
"The base architecture claim can be checked; the Claude claim cannot."
),
})
if "gpt" in suspicious or "openai" in suspicious or "chatgpt" in suspicious:
flags.append({
"type": "UNVERIFIABLE_CLAIM",
"term": "gpt/openai",
"explanation": "GPT-4/OpenAI weights are closed. Any weight transfer claim is false. Distillation via outputs is possible but unverifiable from architecture.",
})
if "gemini" in suspicious:
flags.append({
"type": "UNVERIFIABLE_CLAIM",
"term": "gemini",
"explanation": "Gemini weights are closed. Architecture shows no Gemini structure.",
})
# Name vs architecture consistency
name_implied = claims.get("name_implies", [])
if name_implied and base_matches:
top_base = base_matches[0]["base"]
if not any(n in top_base or top_base in n for n in name_implied):
flags.append({
"type": "NAME_MISMATCH",
"explanation": f"Model name implies {name_implied} but architecture suggests {top_base}. Possible mislabeling.",
})
return {
"model_id": model_id,
"scanned_at": now,
"verdict": {
"architecture": architecture_verdict,
"base_model_confirmed": base_matches[0]["name"] if base_matches else "Unknown",
"confidence": base_matches[0]["confidence"] if base_matches else "NONE",
"flags": flags,
"flag_count": len(flags),
"stage": "Stage 1 (config-only β€” no weight download)",
},
"evidence": {
"config_signals": stage1["config_signals"],
"base_matches": stage1["base_matches"][:3],
"modelatlas_similar": stage1["modelatlas_similar"][:3],
"claimed_base": claims.get("explicit_base"),
"name_implies": name_implied,
},
"metadata": {
"downloads": metadata.get("downloads", 0),
"likes": metadata.get("likes", 0),
"license": metadata.get("license", ""),
"created_at": metadata.get("created_at", ""),
},
"note": (
"Stage 1 validates architecture from config.json only (~2KB). "
"Stage 2 weight analysis (requires model download) provides stronger confirmation. "
"Powered by ModelAtlas β€” modeldna.ai Β· a RadicalNotion product."
),
}
def scan(model_id: str) -> dict:
"""Full Stage 1 scan. Entry point."""
t0 = time.time()
# Detect unsupported formats before attempting config fetch
name_lower = model_id.lower()
if "gguf" in name_lower:
return {
"model_id": model_id,
"error": (
"GGUF models pack weights into a single file and don't have a standard config.json. "
"Stage 1 scanning works with standard HuggingFace checkpoints (safetensors/PyTorch). "
"Try the original (non-quantized) model instead β€” e.g. the unsloth/Qwen3.6-35B-A3B "
"base would be Qwen/Qwen2.5-... or the upstream source. "
"GGUF support is on the roadmap."
),
"scanned_at": datetime.now(timezone.utc).isoformat(),
}
config = fetch_config(model_id)
if not config:
return {
"model_id": model_id,
"error": "Could not fetch config.json β€” model may be private, gated, or not exist on HuggingFace.",
"scanned_at": datetime.now(timezone.utc).isoformat(),
}
metadata = fetch_model_metadata(model_id)
claims = detect_claimed_base(model_id, config, metadata)
stage1 = stage1_screen(model_id, config)
verdict = generate_verdict(model_id, config, metadata, claims, stage1)
verdict["elapsed_s"] = round(time.time() - t0, 2)
return verdict
if __name__ == "__main__":
import sys
model_id = sys.argv[1] if len(sys.argv) > 1 else "Qwen/Qwen3.5-27B"
result = scan(model_id)
print(json.dumps(result, indent=2, default=str))