# demo / app.py
# Hasarindu Perera
# chore: update to v0.8.0
# df233ec unverified
"""QuPrep — HuggingFace Spaces demo (v0.8.0)"""
from __future__ import annotations
import io
import json
import os
import tempfile
import traceback
import gradio as gr
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Sample registry {label: loader_fn}; each loader_fn() returns (type, X, columns)
# type: tabular | image | text | graph | timeseries
# ---------------------------------------------------------------------------
def _iris():
    """Load scikit-learn's Iris features as a ``("tabular", X, columns)`` triple.

    X is the feature frame converted to a float array; columns are the
    frame's column labels.
    """
    from sklearn.datasets import load_iris

    frame = load_iris(as_frame=True).data
    features = frame.copy().values.astype(float)
    return "tabular", features, list(frame.columns)
def _heart():
rng = np.random.default_rng(42)
n = 50
cols = ["age", "trestbps", "chol", "thalach", "oldpeak"]
X = np.column_stack([
rng.integers(30, 75, n).astype(float),
rng.integers(90, 180, n).astype(float),
rng.integers(150, 350, n).astype(float),
rng.integers(90, 200, n).astype(float),
rng.uniform(0, 5, n).round(1),
])
return "tabular", X, cols
def _digits():
    """Load the first 8 scikit-learn digit images as ``("image", X, columns)``.

    Each image is flattened to one row and divided by 16.0; columns are
    named ``px0``, ``px1``, ...
    """
    from sklearn.datasets import load_digits

    first_eight = load_digits().images[:8]
    flat = first_eight.reshape(8, -1).astype(float) / 16.0
    pixel_names = [f"px{i}" for i in range(flat.shape[1])]
    return "image", flat, pixel_names
def _timeseries():
rng = np.random.default_rng(42)
t = np.linspace(0, 4 * np.pi, 120)
X = np.column_stack([
np.sin(t) + rng.normal(0, 0.05, 120),
np.cos(t) + rng.normal(0, 0.05, 120),
t / (4 * np.pi) + rng.normal(0, 0.02, 120),
])
return "timeseries", X, ["sine", "cosine", "trend"]
def _graph():
# Petersen-like 6-node molecule graph adjacency
adj = np.array([
[0,1,1,0,0,1],
[1,0,1,1,0,0],
[1,1,0,0,1,0],
[0,1,0,0,1,1],
[0,0,1,1,0,1],
[1,0,0,1,1,0],
], dtype=float)
return "graph", adj, [f"node{i}" for i in range(6)]
def _text():
sentences = [
"Quantum computing processes information using quantum bits.",
"Machine learning models learn patterns from data.",
"Quantum machine learning combines both fields.",
"Data preprocessing is essential before encoding.",
"Entanglement allows quantum correlations between qubits.",
"Classical data must be normalized before amplitude encoding.",
]
return "text", sentences, []
# Maps the label shown in the sample dropdown to a zero-argument loader.
# Each loader returns a (type, X, columns) triple; load_data() looks the
# selected label up here and calls the loader.
SAMPLES = {
"Iris (tabular · 150×4)": _iris,
"Synthetic Heart (tabular · 50×5)": _heart,
"Digits (image · 8 samples, 64 px)": _digits,
"Sine / cosine (time series · 120t)": _timeseries,
"Molecule (graph · 6 nodes)": _graph,
"Quantum sentences (text · 6)": _text,
}
# ---------------------------------------------------------------------------
# Encodings / frameworks
# ---------------------------------------------------------------------------
# Encoding names offered in the Convert tab's "Encoding" dropdown.
ENCODINGS = [
"angle", "entangled_angle", "amplitude", "basis", "iqp",
"reupload", "hamiltonian", "zz_feature_map", "pauli_feature_map",
"random_fourier", "tensor_product", "qaoa_problem",
]
# One-line description per encoding, shown under the encoding dropdown.
ENCODING_DESC = {
"angle": "Ry/Rx/Rz rotation per feature. NISQ-safe, depth O(1).",
"entangled_angle": "Rotation + CNOT entangling layers. NISQ-safe.",
"amplitude": "Feature vector as quantum state amplitudes. Qubit-efficient.",
"basis": "Binary features encoded as X gates. NISQ-safe.",
"iqp": "Havlíček ZZ feature map. Good for kernel methods.",
"reupload": "Data re-uploading (Pérez-Salinas). High expressivity.",
"hamiltonian": "Trotterized Hamiltonian evolution.",
"zz_feature_map": "Qiskit-compatible ZZ feature map.",
"pauli_feature_map": "Generalised Pauli feature map (configurable strings).",
"random_fourier": "RBF kernel approximation via random Fourier features.",
"tensor_product": "Ry+Rz per qubit — full Bloch sphere, qubit-efficient.",
"qaoa_problem": "QAOA-inspired feature map. Features as cost Hamiltonian parameters.",
}
# Export targets offered in the "Framework" dropdown; _get_exporter() maps
# each name to an exporter class.
FRAMEWORKS = ["qasm", "qiskit", "pennylane", "cirq", "tket", "braket", "qsharp", "iqm"]
# Task names — presumably the choices for the task selectors of the
# recommend/compare/suggest tabs (not visible in this part of the file).
TASKS = ["classification", "regression", "kernel", "qaoa", "simulation"]
# Shared empty frame returned for the table outputs when there is nothing to show.
_EMPTY_DF = pd.DataFrame()
# ---------------------------------------------------------------------------
# Data loading — single function, explicit source
# ---------------------------------------------------------------------------
def load_data(source: str, csv_file, sample_name: str, hf_name: str, hf_split: str):
    """Load a dataset from the selected source.

    Args:
        source: One of ``"upload"``, ``"sample"`` or ``"huggingface"``.
        csv_file: Uploaded file (a path string or an object with ``.name``);
            only used when ``source == "upload"``.
        sample_name: Key into ``SAMPLES``; only used for ``"sample"``.
        hf_name: HuggingFace dataset name; only used for ``"huggingface"``.
        hf_split: Dataset split; falls back to ``"train"`` when empty.

    Returns:
        A ``(dtype, X, columns)`` triple.
        dtype: tabular | image | timeseries | graph | text
        X: np.ndarray for numeric types and graph adjacency, list[str] for text
        columns: list of column/feature names (may be empty)

    Raises:
        ValueError: if the required input for the chosen source is missing,
            the upload has no usable numeric data, or ``source`` is unknown.
    """
    if source == "upload":
        if csv_file is None:
            raise ValueError("No file uploaded.")
        path = csv_file if isinstance(csv_file, str) else csv_file.name
        # The upload widget accepts .tsv as well as .csv; a tab-separated
        # file read with the default comma separator collapses into a single
        # text column and is then rejected as "no numeric columns".
        sep = "\t" if str(path).lower().endswith(".tsv") else ","
        # Keep numeric columns only and drop rows with missing values.
        df = pd.read_csv(path, sep=sep).select_dtypes(include="number").dropna()
        if df.empty:
            raise ValueError("No numeric columns found in uploaded file.")
        return "tabular", df.values.astype(float), list(df.columns)
    elif source == "sample":
        if not sample_name or sample_name not in SAMPLES:
            raise ValueError("Select a sample dataset.")
        return SAMPLES[sample_name]()
    elif source == "huggingface":
        if not hf_name or not hf_name.strip():
            raise ValueError("Enter a HuggingFace dataset name.")
        from quprep.ingest.huggingface_ingester import HuggingFaceIngester

        ingester = HuggingFaceIngester(modality="auto", split=hf_split or "train")
        dataset = ingester.load(hf_name.strip())
        X = dataset.data
        if hasattr(X, "values"):
            # DataFrame-like payload: unwrap to the underlying array.
            X = X.values
        X = X.astype(float)
        # map HF modality metadata → internal dtype
        _modality_map = {"image": "image", "text": "text",
                         "time_series": "timeseries", "tabular": "tabular"}
        hf_modality = dataset.metadata.get("modality", "tabular")
        dtype = _modality_map.get(hf_modality, "tabular")
        return dtype, X, [f"f{i}" for i in range(X.shape[1])]
    raise ValueError(f"Unknown source: {source}")
def _write_tmp(X: np.ndarray) -> str:
f = tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False, encoding="utf-8")
pd.DataFrame(X).to_csv(f, index=False)
f.close()
return f.name
def _rm(path):
try:
os.unlink(path)
except Exception:
pass
def _nisq(ok: bool) -> str:
return ('<span style="color:#4ade80;font-weight:600">✓ NISQ-safe</span>'
if ok else '<span style="color:#f87171;font-weight:600">✗ Not NISQ-safe</span>')
# ---------------------------------------------------------------------------
# Convert — handles all data types
# ---------------------------------------------------------------------------
def _get_exporter(framework: str):
    """Return an exporter instance for the given framework name.

    ``"qasm"`` and any unrecognised name yield ``quprep.QASMExporter``. The
    other exporters live in per-framework submodules that are imported only
    when requested, so a missing optional dependency surfaces only for the
    framework that needs it.
    """
    import importlib

    import quprep as qd

    # framework name -> (module path, exporter class name)
    optional_exporters = {
        "qiskit": ("quprep.export.qiskit_export", "QiskitExporter"),
        "pennylane": ("quprep.export.pennylane_export", "PennyLaneExporter"),
        "cirq": ("quprep.export.cirq_export", "CirqExporter"),
        "tket": ("quprep.export.tket_export", "TKETExporter"),
        "braket": ("quprep.export.braket_export", "BraketExporter"),
        "qsharp": ("quprep.export.qsharp_export", "QSharpExporter"),
        "iqm": ("quprep.export.iqm_export", "IQMExporter"),
    }
    target = optional_exporters.get(framework)
    if target is None:
        # "qasm" itself, plus the fallback for unknown framework names.
        return qd.QASMExporter()
    module_path, class_name = target
    exporter_cls = getattr(importlib.import_module(module_path), class_name)
    return exporter_cls()
def _prepare_rff(X_full, X_slice, framework):
    """Fit a RandomFourierEncoder on ``X_full`` and encode/export ``X_slice``.

    Returns a plain object exposing ``circuits``, ``encoded`` and ``cost``,
    the attributes ``run_convert`` reads from a prepare result. The cost
    figures are filled in by hand here: qubits and gate count both equal
    the encoder's ``n_components``, with depth 1 and no two-qubit gates.
    """
    from quprep.encode.random_fourier import RandomFourierEncoder
    from quprep.core.dataset import Dataset

    encoder = RandomFourierEncoder()
    encoder.fit(X_full)

    width = X_slice.shape[1]
    batch = Dataset(
        data=X_slice,
        feature_names=[f"f{i}" for i in range(width)],
        feature_types=["continuous"] * width,
        metadata={},
    )
    encoded_items = encoder.encode_batch(batch)

    exporter = _get_exporter(framework)
    exported = [exporter.export(item) for item in encoded_items]

    from quprep.validation.cost import CostEstimate

    n_components = encoder.n_components
    estimate = CostEstimate(
        encoding="random_fourier",
        n_features=width,
        n_qubits=n_components,
        gate_count=n_components,
        circuit_depth=1,
        two_qubit_gates=0,
        nisq_safe=True,
        warning=None,
    )

    class _FakeResult:
        pass

    result = _FakeResult()
    result.circuits = exported
    result.encoded = encoded_items
    result.cost = estimate
    return result
def _reducer_kwargs(X_slice, X_full, n_qubits):
"""Return (kwargs_dict, clamp_note) — applies PCA reducer if budget < n_features.
Clamps budget to min(n_samples, n_features) when PCA limit is hit."""
if n_qubits <= 0 or X_full.shape[1] <= n_qubits:
return {}, None
effective = min(n_qubits, X_slice.shape[0], X_full.shape[1])
note = (f"⚠️ Qubit budget clamped {n_qubits}{effective} "
f"(PCA limit: min(samples={X_slice.shape[0]}, features={X_full.shape[1]})). "
f"Increase Samples slider for a higher budget.") if effective < n_qubits else None
from quprep.reduce.hardware_aware import HardwareAwareReducer
return {"preprocessor": HardwareAwareReducer(backend=effective)}, note
def _encode_tabular(X, encoding, framework, n_samples, n_qubits):
    """Encode the first ``n_samples`` rows (at least one) of tabular ``X``.

    Returns ``(result, clamp_note)``. ``random_fourier`` goes through
    ``_prepare_rff`` (fitted on all of ``X``) and never has a clamp note;
    every other encoding goes through ``quprep.prepare`` with the optional
    qubit-budget reducer.
    """
    import quprep as qd

    head = X[:max(1, n_samples)]
    if encoding != "random_fourier":
        extra, clamp = _reducer_kwargs(head, X, n_qubits)
        prepared = qd.prepare(head, encoding=encoding, framework=framework, **extra)
        return prepared, clamp
    return _prepare_rff(X, head, framework), None
def _encode_image(X, encoding, framework, n_samples, n_qubits=0):
    """Encode the first ``n_samples`` rows (at least one) of flattened images.

    Returns ``(result, clamp_note)``. ``random_fourier`` is handled by
    ``_prepare_rff`` (fitted on all of ``X``); other encodings use
    ``quprep.prepare`` with the optional qubit-budget reducer.
    """
    import quprep as qd

    rows = X[:max(1, n_samples)]
    use_rff = encoding == "random_fourier"
    if use_rff:
        return _prepare_rff(X, rows, framework), None
    reducer_kw, clamp = _reducer_kwargs(rows, X, n_qubits)
    outcome = qd.prepare(rows, encoding=encoding, framework=framework, **reducer_kw)
    return outcome, clamp
def _encode_timeseries(X, encoding, framework, n_samples, n_qubits=0, window=4):
    """Window a time series, then encode the first ``n_samples`` windows.

    ``X`` is wrapped in a quprep ``Dataset`` and passed through
    ``WindowTransformer(window_size=window)``; the transformed data is then
    encoded like tabular rows. Returns ``(result, clamp_note)``.
    """
    import quprep as qd
    from quprep.preprocess.window import WindowTransformer
    from quprep.core.dataset import Dataset

    n_channels = X.shape[1]
    series = Dataset(
        data=X,
        feature_names=[f"t{i}" for i in range(n_channels)],
        feature_types=["continuous"] * n_channels,
        metadata={},
    )
    windowed = WindowTransformer(window_size=window).transform(series).data
    head = windowed[:max(1, n_samples)]
    if encoding != "random_fourier":
        extra, clamp = _reducer_kwargs(head, windowed, n_qubits)
        prepared = qd.prepare(head, encoding=encoding, framework=framework, **extra)
        return prepared, clamp
    return _prepare_rff(windowed, head, framework), None
def _encode_text(sentences, encoding, framework, n_qubits=0):
    """Vectorise sentences with TF-IDF (max 8 features) and encode every row.

    Returns ``(result, clamp_note)``. Unlike the tabular path there is no
    sample slicing: all vectorised sentences are encoded.
    """
    import quprep as qd
    from quprep.ingest.text_ingester import TextIngester

    vectors = TextIngester(method="tfidf", max_features=8).load(sentences).data
    if encoding != "random_fourier":
        extra, clamp = _reducer_kwargs(vectors, vectors, n_qubits)
        prepared = qd.prepare(vectors, encoding=encoding, framework=framework, **extra)
        return prepared, clamp
    return _prepare_rff(vectors, vectors, framework), None
def _encode_graph(adj):
    """Encode an adjacency matrix as a graph state and return its QASM export.

    Uses ``GraphStateEncoder._from_adj`` (a private quprep method) and
    always exports with ``QASMExporter``.
    """
    import quprep as qd
    from quprep.encode.graph_state import GraphStateEncoder

    graph_state = GraphStateEncoder()._from_adj(adj)
    exporter = qd.QASMExporter()
    return exporter.export(graph_state)
def _circuit_to_text(circuit):
    """Render one exported circuit object as display text."""
    if isinstance(circuit, str):
        return circuit
    type_name = type(circuit).__name__
    if type_name == "QNode":
        import pennylane as qml

        try:
            return qml.draw(circuit)()
        except RecursionError:
            n_w = len(circuit.device.wires)
            return (
                f"# Circuit has {n_w} wires — too large for PennyLane's drawer.\n"
                f"# PennyLane draws circuits recursively and hits Python's recursion\n"
                f"# limit at this scale. Try a smaller dataset or fewer samples,\n"
                f"# or switch to the qasm framework to see the full circuit."
            )
    if type_name == "Circuit" and hasattr(circuit, "num_qubits"):
        # pytket Circuit
        try:
            from pytket.qasm import circuit_to_qasm_str

            return circuit_to_qasm_str(circuit)
        except Exception:
            # Fall back to the object's own string form if QASM export fails.
            return str(circuit)
    return str(circuit)


def _cost_to_html(cost):
    """Format a cost estimate as the HTML summary card ("" when there is none)."""
    if not cost:
        return ""
    warn = f'<p style="color:#fbbf24;margin:8px 0 0">⚠️ {cost.warning}</p>' if cost.warning else ""
    return f"""
<div style="font-family:monospace;font-size:0.9rem;line-height:1.8">
<div style="display:grid;grid-template-columns:1fr 1fr;gap:4px 24px">
<span style="color:#94a3b8">Encoding</span> <span>{cost.encoding}</span>
<span style="color:#94a3b8">Qubits</span> <span>{cost.n_qubits}</span>
<span style="color:#94a3b8">Gates</span> <span>{cost.gate_count}</span>
<span style="color:#94a3b8">Depth</span> <span>{cost.circuit_depth}</span>
<span style="color:#94a3b8">2Q gates</span> <span>{cost.two_qubit_gates}</span>
<span style="color:#94a3b8">NISQ</span> <span>{_nisq(cost.nisq_safe)}</span>
</div>{warn}
</div>"""


def _encoded_preview(encoded):
    """Build a table of the first five encoded parameter vectors."""
    if not encoded:
        return _EMPTY_DF
    try:
        rows = [{"sample": i, **{f"q{j}": round(float(p), 4) for j, p in enumerate(e.parameters)}}
                for i, e in enumerate(encoded[:5])]
        return pd.DataFrame(rows).set_index("sample")
    except Exception:
        # Preview is best-effort: encoded objects without numeric
        # ``parameters`` simply produce no table.
        return _EMPTY_DF


def run_convert(source, csv_file, sample_name, hf_name, hf_split,
                encoding, framework, n_samples, n_qubits):
    """Load the selected data, encode it and export circuits for display.

    Returns a 5-tuple ``(input_preview, encoded_preview, circuit_text,
    cost_html, status)``. Every failure path returns empty frames/strings
    plus a status message instead of raising.
    """
    try:
        import quprep as qd  # noqa: F401 — availability check only
    except ImportError:
        return _EMPTY_DF, _EMPTY_DF, "", "", "❌ quprep not installed."
    try:
        dtype, X, cols = load_data(source, csv_file, sample_name, hf_name, hf_split)
    except Exception as exc:
        return _EMPTY_DF, _EMPTY_DF, "", "", f"❌ {exc}"
    # Early check: PennyLane's circuit drawer is recursive and crashes on large circuits.
    # Estimate qubit count from data shape + encoding before spending time encoding.
    if framework == "pennylane" and dtype not in ("graph", "text"):
        _enc_qubits = {
            "amplitude": int(np.ceil(np.log2(max(X.shape[1], 2)))),
            "random_fourier": 8,  # default n_components
        }
        est_qubits = _enc_qubits.get(encoding, X.shape[1])  # most encoders use n_features qubits
        if est_qubits > 100:
            msg = (f"⚠️ PennyLane's circuit drawer uses recursion and will crash at this scale "
                   f"({est_qubits} qubits estimated). Switch to framework=qasm to see the circuit.")
            return _EMPTY_DF, _EMPTY_DF, "", "", msg
    clamp_note = None
    try:
        if dtype == "graph":
            # Graphs bypass encoding/framework selection and return directly.
            circuit_text = _encode_graph(X)
            n_nodes = X.shape[0]
            edges = int((X != 0).sum() // 2)
            status = f"✓ Graph: {n_nodes} nodes, {edges} edges → GraphState | 1 circuit (qubit budget ignored — graph state requires exactly 1 qubit per node)"
            preview = pd.DataFrame(X, columns=cols, index=cols).round(0).astype(int)
            return preview, _EMPTY_DF, circuit_text, "", status
        elif dtype == "text":
            result, clamp_note = _encode_text(X, encoding, framework, n_qubits)
        elif dtype == "image":
            result, clamp_note = _encode_image(X, encoding, framework, n_samples, n_qubits)
        elif dtype == "timeseries":
            result, clamp_note = _encode_timeseries(X, encoding, framework, n_samples, n_qubits)
        else:  # tabular
            result, clamp_note = _encode_tabular(X, encoding, framework, n_samples, n_qubits)
    except ImportError as exc:
        return _EMPTY_DF, _EMPTY_DF, "", "", f"⚠️ Optional dep missing: {exc}\nTry framework=qasm."
    except ValueError as exc:
        return _EMPTY_DF, _EMPTY_DF, "", "", f"⚠️ {exc}"
    except Exception:
        return _EMPTY_DF, _EMPTY_DF, "", "", f"❌ {traceback.format_exc()}"
    circuits = result.circuits or []
    if not circuits:
        return _EMPTY_DF, _EMPTY_DF, "", "", "⚠️ No circuits produced."
    # Text input is a list of strings; it gets no numeric preview table.
    X_num = X if isinstance(X, np.ndarray) else np.array([])
    if X_num.ndim == 2:
        preview = pd.DataFrame(
            X_num[:5], columns=cols[:X_num.shape[1]] if cols else None
        ).round(4)
    else:
        preview = _EMPTY_DF
    enc_preview = _encoded_preview(result.encoded)
    circuit_text = _circuit_to_text(circuits[0])
    cost_html = _cost_to_html(result.cost)
    n_circ = len(circuits)
    shape = f"{X_num.shape[0]}×{X_num.shape[1]}" if X_num.ndim == 2 else str(len(X))
    status = f"✓ {dtype} · {shape} → {n_circ} circuit(s) | showing sample 0"
    if clamp_note:
        # A budget-clamp warning takes the place of the success line.
        status = clamp_note
    return preview, enc_preview, circuit_text, cost_html, status
# ---------------------------------------------------------------------------
# Recommend
# ---------------------------------------------------------------------------
def run_recommend(source, csv_file, sample_name, hf_name, hf_split, task, n_qubits):
    """Recommend an encoding for the selected data and return it as HTML.

    The data is written to a temporary CSV for ``quprep.recommend`` and the
    file is removed afterwards. Only tabular and image data are supported.
    Any failure, including a missing ``quprep`` install, is returned as an
    error paragraph rather than raised.
    """
    import html

    tmp = None
    try:
        # Imported inside the try so a missing package is reported in the
        # UI instead of crashing the handler.
        import quprep as qd

        dtype, X, _ = load_data(source, csv_file, sample_name, hf_name, hf_split)
        if dtype not in ("tabular", "image"):
            return "<p style='color:#fbbf24'>⚠️ Recommendation works on tabular/image data.</p>"
        tmp = _write_tmp(X)
        # A budget of 0 means "auto": pass None so the library decides.
        rec = qd.recommend(tmp, task=task, qubits=n_qubits if n_qubits > 0 else None)
        alt_rows = "".join(
            f"<tr><td style='padding:6px 12px'>{a.method}</td>"
            f"<td style='padding:6px 12px;text-align:center'>{a.score:.0f}</td>"
            f"<td style='padding:6px 12px;color:#94a3b8'>{a.depth}</td></tr>"
            for a in rec.alternatives
        )
        # The alternatives table is shown only when there are alternatives.
        alt_table = (
            "<div style='margin-top:20px'><table style='width:100%;border-collapse:collapse;font-size:0.85rem'><thead><tr style='border-bottom:1px solid #334155'><th style='padding:6px 12px;text-align:left;color:#64748b;font-weight:500'>Encoding</th><th style='padding:6px 12px;text-align:center;color:#64748b;font-weight:500'>Score</th><th style='padding:6px 12px;color:#64748b;font-weight:500'>Depth</th></tr></thead><tbody>" + alt_rows + "</tbody></table></div>"
            if alt_rows else ""
        )
        return f"""
<div style="font-family:sans-serif;font-size:0.9rem;line-height:1.6">
<div style="display:flex;align-items:baseline;gap:12px;margin-bottom:16px">
<span style="font-size:1.6rem;font-weight:700;color:#e2e8f0">{rec.method}</span>
<span style="color:#a78bfa;font-weight:600;font-size:0.8rem">recommended</span>
</div>
<div style="display:grid;grid-template-columns:auto 1fr;gap:4px 24px;margin-bottom:16px">
<span style="color:#64748b">Qubits</span> <span>{rec.qubits}</span>
<span style="color:#64748b">Depth</span> <span style="font-family:monospace">{rec.depth}</span>
<span style="color:#64748b">NISQ</span> <span>{_nisq(rec.nisq_safe)}</span>
<span style="color:#64748b">Score</span> <span>{rec.score:.0f}</span>
</div>
<div style="padding:12px 16px;background:#1e293b;border-radius:8px;color:#cbd5e1;font-size:0.85rem;line-height:1.6">{rec.reason}</div>
{alt_table}
</div>"""
    except Exception as exc:
        # Escape: exception text can contain "<...>" or user-supplied input.
        return f"<p>❌ {html.escape(str(exc))}</p>"
    finally:
        if tmp:
            _rm(tmp)
# ---------------------------------------------------------------------------
# Compare
# ---------------------------------------------------------------------------
def run_compare(source, csv_file, sample_name, hf_name, hf_split, task, n_qubits):
    """Compare encodings on the selected data and return an HTML table.

    The data is written to a temporary CSV for ``quprep.compare_encodings``
    and the file is removed afterwards. Only tabular and image data are
    supported. The recommended encoding is starred and highlighted. Any
    failure, including a missing ``quprep`` install, is returned as an error
    paragraph rather than raised.
    """
    import html

    tmp = None
    try:
        # Imported inside the try so a missing package is reported in the
        # UI instead of crashing the handler.
        import quprep as qd

        dtype, X, _ = load_data(source, csv_file, sample_name, hf_name, hf_split)
        if dtype not in ("tabular", "image"):
            return "<p style='color:#fbbf24'>⚠️ Comparison works on tabular/image data.</p>"
        tmp = _write_tmp(X)
        # A budget of 0 means "auto": pass None so the library decides.
        result = qd.compare_encodings(tmp, task=task, qubits=n_qubits if n_qubits > 0 else None)
        row_parts = []
        for r in result.rows:
            is_best = r.encoding == result.recommended
            nisq = '<span style="color:#4ade80">Yes</span>' if r.nisq_safe else '<span style="color:#f87171">No</span>'
            name = f"{r.encoding} ★" if is_best else r.encoding
            bg = "background:#1e293b;" if is_best else ""
            weight = "600" if is_best else "400"
            row_parts.append(
                f"<tr style='{bg}'><td style='padding:8px 14px;font-weight:{weight}'>{name}</td><td style='padding:8px 14px;text-align:center'>{r.n_qubits}</td><td style='padding:8px 14px;text-align:center'>{r.gate_count}</td><td style='padding:8px 14px;text-align:center'>{r.circuit_depth}</td><td style='padding:8px 14px;text-align:center'>{r.two_qubit_gates}</td><td style='padding:8px 14px;text-align:center'>{nisq}</td></tr>"
            )
        rows_html = "".join(row_parts)
        warn_html = "".join(f"<p style='color:#fbbf24;font-size:0.78rem'>⚠️ [{r.encoding}] {r.warning}</p>" for r in result.rows if r.warning)
        legend = (
            "<p style='margin:10px 0 0;font-size:0.78rem;color:#475569'>★ recommended for task/budget</p>"
            if result.recommended else ""
        )
        return f"""
<div style="font-family:sans-serif;font-size:0.88rem">
<table style="width:100%;border-collapse:collapse">
<thead><tr style="border-bottom:1px solid #334155">
<th style="padding:8px 14px;text-align:left;color:#64748b;font-weight:500">Encoding</th>
<th style="padding:8px 14px;text-align:center;color:#64748b;font-weight:500">Qubits</th>
<th style="padding:8px 14px;text-align:center;color:#64748b;font-weight:500">Gates</th>
<th style="padding:8px 14px;text-align:center;color:#64748b;font-weight:500">Depth</th>
<th style="padding:8px 14px;text-align:center;color:#64748b;font-weight:500">2Q</th>
<th style="padding:8px 14px;text-align:center;color:#64748b;font-weight:500">NISQ</th>
</tr></thead><tbody>{rows_html}</tbody>
</table>
{legend}
{warn_html}
</div>"""
    except Exception as exc:
        # Escape: exception text can contain "<...>" or user-supplied input.
        return f"<p>❌ {html.escape(str(exc))}</p>"
    finally:
        if tmp:
            _rm(tmp)
# ---------------------------------------------------------------------------
# Inspect
# ---------------------------------------------------------------------------
def run_inspect(source, csv_file, sample_name, hf_name, hf_split):
    """Profile the selected data and return an HTML summary.

    Text data yields a one-line note, graph data a node/edge/density card,
    and numeric data a per-feature statistics table (first 12 features).
    Any failure is returned as an error paragraph rather than raised.
    """
    import html

    try:
        from quprep.ingest.numpy_ingester import NumpyIngester
        from quprep.ingest.profiler import profile

        dtype, X, cols = load_data(source, csv_file, sample_name, hf_name, hf_split)
        if dtype == "text":
            return f"<p style='color:#94a3b8'>{len(X)} sentences loaded. Text data — no numeric profile.</p>"
        if dtype == "graph":
            n = X.shape[0]
            edges = int((X != 0).sum() // 2)
            # A graph with fewer than two nodes has no possible edges;
            # report density 0 instead of dividing by zero.
            max_edges = n * (n - 1) / 2
            density = edges / max_edges if max_edges else 0.0
            return f"""<div style="font-family:monospace;font-size:0.9rem;line-height:1.8">
<div style="display:grid;grid-template-columns:auto 1fr;gap:4px 24px">
<span style="color:#94a3b8">Type</span> <span>Graph</span>
<span style="color:#94a3b8">Nodes</span> <span>{n}</span>
<span style="color:#94a3b8">Edges</span> <span>{edges}</span>
<span style="color:#94a3b8">Density</span><span>{density:.2f}</span>
</div></div>"""
        p = profile(NumpyIngester().load(X))
        missing = int(p.missing_counts.sum())
        sparsity = 100.0 * (X == 0).sum() / X.size
        row_parts = []
        for i in range(min(p.n_features, 12)):
            # Column names come from uploaded files, so escape them before
            # placing them in HTML.
            label = html.escape(str(cols[i] if i < len(cols) else f"f{i}"))
            row_parts.append(
                f"<tr><td style='padding:6px 12px;font-family:monospace'>{label}</td>"
                f"<td style='padding:6px 12px;text-align:right'>{p.mins[i]:.3f}</td>"
                f"<td style='padding:6px 12px;text-align:right'>{p.maxs[i]:.3f}</td>"
                f"<td style='padding:6px 12px;text-align:right'>{p.means[i]:.3f}</td>"
                f"<td style='padding:6px 12px;text-align:right'>{p.stds[i]:.3f}</td>"
                f"<td style='padding:6px 12px;text-align:center'>{int(p.missing_counts[i])}</td></tr>"
            )
        feat_rows = "".join(row_parts)
        more = f"<p style='color:#475569;font-size:0.78rem'>… and {p.n_features-12} more</p>" if p.n_features > 12 else ""
        return f"""
<div style="font-family:sans-serif;font-size:0.9rem;line-height:1.8">
<div style="display:grid;grid-template-columns:auto 1fr;gap:2px 24px;margin-bottom:20px">
<span style="color:#64748b">Type</span> <span>{dtype}</span>
<span style="color:#64748b">Shape</span> <span>{p.n_samples} × {p.n_features}</span>
<span style="color:#64748b">Missing</span> <span>{"none" if not missing else missing}</span>
<span style="color:#64748b">Sparsity</span> <span>{sparsity:.1f}% zeros</span>
</div>
<table style="width:100%;border-collapse:collapse;font-size:0.85rem">
<thead><tr style="border-bottom:1px solid #334155">
<th style="padding:6px 12px;text-align:left;color:#64748b;font-weight:500">Feature</th>
<th style="padding:6px 12px;text-align:right;color:#64748b;font-weight:500">Min</th>
<th style="padding:6px 12px;text-align:right;color:#64748b;font-weight:500">Max</th>
<th style="padding:6px 12px;text-align:right;color:#64748b;font-weight:500">Mean</th>
<th style="padding:6px 12px;text-align:right;color:#64748b;font-weight:500">Std</th>
<th style="padding:6px 12px;text-align:center;color:#64748b;font-weight:500">Missing</th>
</tr></thead><tbody>{feat_rows}</tbody>
</table>{more}
</div>"""
    except Exception as exc:
        # Escape: exception text can contain "<...>" or user-supplied input.
        return f"<p>❌ {html.escape(str(exc))}</p>"
# ---------------------------------------------------------------------------
# Suggest qubits
# ---------------------------------------------------------------------------
def run_suggest(source, csv_file, sample_name, hf_name, hf_split, task, max_qubits):
    """Suggest a qubit count for the selected data and return it as HTML.

    The data is written to a temporary CSV for ``quprep.suggest_qubits`` and
    the file is removed afterwards. Works on tabular, image and time-series
    data. ``max_qubits`` is forwarded only when positive. Any failure is
    returned as an error paragraph rather than raised.
    """
    import html

    tmp = None
    try:
        import quprep as qd

        dtype, X, _ = load_data(source, csv_file, sample_name, hf_name, hf_split)
        if dtype not in ("tabular", "image", "timeseries"):
            return "<p style='color:#fbbf24'>⚠️ Qubit suggestion works on numeric data.</p>"
        tmp = _write_tmp(X)
        kwargs = {"task": task}
        if max_qubits > 0:
            kwargs["max_qubits"] = max_qubits
        suggestion = qd.suggest_qubits(tmp, **kwargs)
        nisq_badge = _nisq(suggestion.nisq_safe)
        warning_html = (
            f"<div style='padding:8px 14px;background:#451a03;border-radius:6px;"
            f"color:#fbbf24;font-size:0.82rem;margin-bottom:12px'>"
            f"⚠️ {suggestion.warning}</div>"
            if suggestion.warning else ""
        )
        return f"""
<div style="font-family:sans-serif;font-size:0.9rem">
<div style="display:flex;align-items:baseline;gap:16px;margin-bottom:20px">
<span style="font-size:2rem;font-weight:700;color:#e2e8f0">{suggestion.n_qubits}</span>
<span style="color:#a78bfa;font-weight:600">suggested qubits</span>
<span style="color:#475569;font-size:0.82rem">· {suggestion.n_features} features · task={task}</span>
<span style="margin-left:8px">{nisq_badge}</span>
</div>
{warning_html}
<div style="padding:10px 16px;background:#1e293b;border-radius:8px;color:#cbd5e1;font-size:0.85rem;margin-bottom:16px;line-height:1.6">{suggestion.reasoning}</div>
<div style="color:#64748b;font-size:0.82rem">Recommended encoding: <span style="color:#a78bfa;font-weight:600">{suggestion.encoding_hint}</span></div>
</div>"""
    except Exception as exc:
        # Escape: exception text can contain "<...>" or user-supplied input.
        return f"<p>❌ {html.escape(str(exc))}</p>"
    finally:
        if tmp:
            _rm(tmp)
# ---------------------------------------------------------------------------
# Fingerprint
# ---------------------------------------------------------------------------
def run_fingerprint(source, csv_file, sample_name, hf_name, hf_split,
                    encoding, framework, reducer_type, n_components, n_qubits,
                    use_scaler, scaler_strategy,
                    use_imputer, imputer_strategy,
                    use_outlier, outlier_method,
                    use_fsel, fsel_method, fsel_max):
    """Build a pipeline from the UI options, fit it and render its fingerprint.

    Returns ``(html, error_html)``: on success the first element holds the
    fingerprint card and the second is empty; on failure, or for
    unsupported data types, the first is empty and the second holds the
    message. Works on tabular, image and time-series data.
    """
    import html

    try:
        import quprep as qd

        dtype, X, _ = load_data(source, csv_file, sample_name, hf_name, hf_split)
        if dtype not in ("tabular", "image", "timeseries"):
            return "", "<p style='color:#fbbf24'>⚠️ Fingerprinting works on numeric data (tabular / image / time series).</p>"
        X_use = X
        _enc_map = {
            "angle": qd.AngleEncoder,
            "amplitude": qd.AmplitudeEncoder,
            "basis": qd.BasisEncoder,
            "iqp": qd.IQPEncoder,
            "entangled_angle": qd.EntangledAngleEncoder,
            "reupload": qd.ReUploadEncoder,
            "hamiltonian": qd.HamiltonianEncoder,
            "zz_feature_map": qd.ZZFeatureMapEncoder,
            "pauli_feature_map": qd.PauliFeatureMapEncoder,
            "random_fourier": qd.RandomFourierEncoder,
            "tensor_product": qd.TensorProductEncoder,
            "qaoa_problem": qd.QAOAProblemEncoder,
        }
        # Unknown encoding names fall back to the angle encoder.
        encoder_cls = _enc_map.get(encoding, qd.AngleEncoder)
        encoder = encoder_cls()
        if hasattr(encoder, "fit"):
            # Only some encoders are fittable (random_fourier is fitted
            # this way elsewhere in the file).
            encoder.fit(X_use)
        # Factories rather than instances: only the chosen reducer is built.
        _red_map = {
            "pca": lambda: qd.PCAReducer(n_components=int(n_components)),
            "lda": lambda: qd.LDAReducer(n_components=int(n_components)),
            "spectral": lambda: qd.SpectralReducer(n_components=int(n_components)),
            "tsne": lambda: qd.TSNEReducer(n_components=int(n_components)),
            "hardware_aware": lambda: qd.HardwareAwareReducer(backend=int(n_qubits)),
        }
        reducer = _red_map[reducer_type]() if reducer_type != "none" else None
        cleaner = qd.Imputer(strategy=imputer_strategy) if use_imputer else None
        # NOTE(review): `outlier` is built but never passed to the Pipeline
        # below — confirm whether the outlier stage should be wired in.
        outlier = qd.OutlierHandler(method=outlier_method) if use_outlier else None
        scaler = qd.Scaler(strategy=scaler_strategy) if use_scaler else None
        selector = qd.FeatureSelector(method=fsel_method, max_features=int(fsel_max)) if use_fsel else None
        exporter = _get_exporter(framework)
        pipeline = qd.Pipeline(
            encoder=encoder,
            exporter=exporter,
            reducer=reducer,
            cleaner=cleaner,
            normalizer=scaler,
            preprocessor=selector,
        )
        pipeline.fit(X_use)
        fp = qd.fingerprint_pipeline(pipeline)
        fp_dict = json.loads(fp.to_json())
        stages_html = "".join(
            f"<tr><td style='padding:6px 12px;font-family:monospace'>{stage}</td>"
            f"<td style='padding:6px 12px;color:#94a3b8'>{info.get('class','')}</td>"
            # Escape the JSON text so values containing "<" or "&" render literally.
            f"<td style='padding:6px 12px;font-size:0.78rem;color:#64748b'>{html.escape(json.dumps(info.get('params',{})), quote=False)}</td></tr>"
            for stage, info in fp_dict.get("stages", {}).items()
        )
        # Exporter framework -> package name to highlight in the dependency list.
        _fw_dep = {
            "qiskit": "qiskit", "pennylane": "pennylane", "cirq": "cirq-core",
            "tket": "pytket", "braket": "amazon-braket-sdk", "qsharp": "qsharp",
            "iqm": "iqm-client",
        }
        active_dep = _fw_dep.get(framework)
        deps_html = "".join(
            f"<span style='background:{'#2d1f63' if k == active_dep else '#1e293b'};"
            f"padding:2px 8px;border-radius:4px;font-family:monospace;font-size:0.8rem;"
            f"margin:2px;color:{'#a78bfa' if k == active_dep else 'inherit'}'>"
            f"{k}=={v}{' ← active exporter' if k == active_dep else ''}</span> "
            for k, v in fp_dict.get("dependencies", {}).items()
        )
        return f"""
<div style="font-family:sans-serif;font-size:0.9rem">
<p style="margin:0 0 8px;font-size:0.75rem;font-weight:600;color:#94a3b8;text-transform:uppercase;letter-spacing:.05em">Pipeline hash</p>
<div style="background:#1e293b;border-radius:8px;padding:12px 16px;font-family:monospace;font-size:0.88rem;color:#a78bfa;word-break:break-all;margin-bottom:20px">sha256:{fp.hash}</div>
<p style="margin:0 0 8px;font-size:0.75rem;font-weight:600;color:#94a3b8;text-transform:uppercase;letter-spacing:.05em">Stages</p>
<table style="width:100%;border-collapse:collapse;font-size:0.85rem;margin-bottom:16px">
<thead><tr style="border-bottom:1px solid #334155">
<th style="padding:6px 12px;text-align:left;color:#64748b;font-weight:500">Stage</th>
<th style="padding:6px 12px;text-align:left;color:#64748b;font-weight:500">Class</th>
<th style="padding:6px 12px;text-align:left;color:#64748b;font-weight:500">Params</th>
</tr></thead><tbody>{stages_html}</tbody>
</table>
<p style="margin:0 0 8px;font-size:0.75rem;font-weight:600;color:#94a3b8;text-transform:uppercase;letter-spacing:.05em">Key dependencies</p>
<div style="line-height:2">{deps_html}</div>
</div>""", ""
    except Exception as exc:
        # Escape: exception text can contain "<...>" or user-supplied input.
        return "", f"<p>❌ {html.escape(str(exc))}</p>"
# ---------------------------------------------------------------------------
# QUBO / QAOA
# ---------------------------------------------------------------------------
def run_qubo(adj_text, p_layers, problem):
    """Parse an adjacency matrix, build a Max-Cut QUBO and return its QAOA circuit.

    Args:
        adj_text: One matrix row per line, values separated by whitespace.
        p_layers: Number of QAOA layers (cast to int).
        problem: Problem name; used only for the label in the summary card.

    Returns:
        ``(qasm, info_html)``. On any failure ``qasm`` is ``""`` and
        ``info_html`` holds an error paragraph.
    """
    import html

    try:
        # Validate input before touching quprep so malformed text gets a
        # clear message.
        rows = [r.strip() for r in (adj_text or "").strip().splitlines() if r.strip()]
        if not rows:
            raise ValueError("Adjacency matrix is empty.")
        parsed = [[float(x) for x in r.split()] for r in rows]
        if any(len(r) != len(parsed) for r in parsed):
            raise ValueError("Adjacency matrix must be square (one row per node).")
        adj = np.array(parsed)
        p = int(p_layers)

        from quprep.qubo import max_cut, qaoa_circuit

        # NOTE(review): `problem` does not select the formulation — max_cut
        # is always used. Confirm whether other problems should be dispatched.
        q = max_cut(adj)
        qasm = qaoa_circuit(q, p=p)
        label = html.escape(problem.replace('_', ' ').title())
        info = f"""
<div style="font-family:monospace;font-size:0.9rem;line-height:1.8">
<div style="display:grid;grid-template-columns:auto 1fr;gap:2px 20px">
<span style="color:#94a3b8">Problem</span> <span>{label}</span>
<span style="color:#94a3b8">Nodes</span> <span>{adj.shape[0]}</span>
<span style="color:#94a3b8">Edges</span> <span>{int((adj != 0).sum() // 2)}</span>
<span style="color:#94a3b8">QAOA p</span> <span>{p}</span>
<span style="color:#94a3b8">Qubits</span> <span>{q.n_original}</span>
</div>
</div>"""
        return qasm, info
    except Exception as exc:
        # Escape: exception text can echo the user's matrix text.
        return "", f"<p>❌ {html.escape(str(exc))}</p>"
# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------
# Gradio theme object. NOTE(review): not referenced in the visible part of
# the Blocks definition — presumably applied where the app is launched; confirm.
THEME = gr.themes.Soft(primary_hue="violet", secondary_hue="blue")
# Custom CSS: borders/padding for the two header columns (elem_id
# "header-left"/"header-right") and a scrollable max height for the circuit
# code viewer (elem_id "circuit-out").
# NOTE(review): like THEME, not passed to gr.Blocks in the visible code — confirm.
CSS = """
#header-left, #header-right {
border:1px solid #334155 !important;
border-radius:12px !important;
padding:20px 24px !important;
}
#circuit-out .codemirror-wrapper,
#circuit-out .cm-editor {
max-height: 420px !important;
overflow-y: auto !important;
}
"""
# Labels for the data-source radio buttons; the nested _src() helper maps
# them to the "sample" / "upload" / "huggingface" keys expected by load_data().
SOURCE_CHOICES = ["📋 Sample dataset", "📁 Upload CSV", "🤗 HuggingFace Hub"]
with gr.Blocks(title="QuPrep — Quantum Data Preparation") as demo:
# ── Header ─────────────────────────────────────────────────────────────
with gr.Row(equal_height=True):
with gr.Column(scale=1, elem_id="header-left"):
gr.HTML("""
<p style="margin:0 0 2px;font-size:1.5rem;font-weight:700;color:#e2e8f0">⚛️ QuPrep</p>
<p style="margin:0 0 14px;font-size:0.9rem;font-weight:500;color:#a78bfa">v0.8.0 · Quantum Data Preparation</p>
<p style="margin:0 0 14px;font-size:0.85rem;color:#94a3b8;line-height:1.6">
The missing preprocessing layer between classical datasets and quantum computing.
</p>
<div style="display:flex;flex-direction:column;gap:8px;font-size:0.85rem">
<div>📦 <code style="background:#1e293b;padding:2px 8px;border-radius:4px">pip install quprep</code></div>
<div>📖 <a href="https://docs.quprep.org" target="_blank" style="color:#818cf8">docs.quprep.org</a></div>
<div>💻 <a href="https://github.com/quprep/quprep" target="_blank" style="color:#818cf8">github.com/quprep/quprep</a></div>
</div>
<p style="margin:14px 0 0;font-size:0.75rem;color:#475569">13 encodings · 8 frameworks · Apache 2.0 · Python ≥ 3.10</p>""")
with gr.Column(scale=1, elem_id="header-right"):
gr.HTML('<p style="margin:0 0 10px;font-size:1.1rem;font-weight:700;color:#e2e8f0">📂 Data source</p>')
source_radio = gr.Radio(
choices=SOURCE_CHOICES,
value="📋 Sample dataset",
label="",
container=False,
)
sample_dd = gr.Dropdown(
choices=list(SAMPLES.keys()),
value="Iris (tabular · 150×4)",
label="Sample dataset",
visible=True,
)
csv_upload = gr.File(
label="Upload CSV / TSV",
file_types=[".csv", ".tsv"],
height=100,
visible=False,
)
with gr.Row(visible=False) as hf_row:
hf_name = gr.Textbox(label="Dataset (owner/name)", placeholder="scikit-learn/iris", scale=3)
hf_split = gr.Textbox(label="Split", value="train", scale=1)
def _toggle_source(s):
    """Build visibility updates for the three data-source widgets.

    Returns one ``gr.update`` per source, in the order sample / upload /
    HuggingFace; only the one whose label equals ``s`` is made visible.
    """
    labels = ("📋 Sample dataset", "📁 Upload CSV", "🤗 HuggingFace Hub")
    return tuple(gr.update(visible=(s == label)) for label in labels)
# Whenever the radio selection changes, recompute which source widget is shown.
# The three updates returned by _toggle_source are applied to these outputs in order.
source_radio.change(
fn=_toggle_source,
inputs=source_radio,
outputs=[sample_dd, csv_upload, hf_row],
)
def _src(s):
return {"📋 Sample dataset": "sample",
"📁 Upload CSV": "upload",
"🤗 HuggingFace Hub": "huggingface"}[s]
def _inputs(*extra):
    """Return the five shared data-source components followed by ``extra``.

    Used to build the ``inputs=`` list of each tab's click handler so every
    handler receives the source selection first, then its tab-specific widgets.
    """
    shared = [source_radio, csv_upload, sample_dd, hf_name, hf_split]
    return [*shared, *extra]
# ── Tabs ───────────────────────────────────────────────────────────────
with gr.Tabs():
# Convert ──────────────────────────────────────────────────────────
with gr.TabItem("Convert"):
with gr.Row():
# Left column: conversion settings and the trigger button.
with gr.Column(scale=1, min_width=210):
enc_dd = gr.Dropdown(choices=ENCODINGS, value="angle", label="Encoding")
# Description of the selected encoding; refreshed by enc_dd.change below.
enc_info = gr.Markdown(f"<small><i>{ENCODING_DESC['angle']}</i></small>")
fw_dd = gr.Dropdown(choices=FRAMEWORKS, value="qasm", label="Framework")
ns_sl = gr.Slider(1, 20, value=5, step=1, label="Samples")
nq_sl = gr.Slider(0, 1121, value=0, step=1, label="Qubit budget (0=auto)")
gr.HTML('<p style="font-size:0.75rem;color:#475569;margin:4px 0">Graph & text data use fixed encoding — encoding/framework dropdowns are ignored.</p>')
conv_btn = gr.Button("Convert →", variant="primary", size="lg")
# Right column: status line, generated circuit, cost panel and data previews.
with gr.Column(scale=4):
conv_status = gr.Textbox(label="", lines=1, max_lines=1, interactive=False,
show_label=False, placeholder="Press Convert →")
with gr.Row(equal_height=True):
with gr.Column(scale=3):
# elem_id "circuit-out" is the hook targeted by the #circuit-out CSS rules.
circuit_out = gr.Code(label="Circuit (sample 0)", language="python", lines=18, max_lines=25, elem_id="circuit-out")
with gr.Column(scale=1):
cost_out = gr.HTML(label="Cost")
with gr.Row():
input_tbl = gr.Dataframe(label="Input data (first 5)", interactive=False)
encoded_tbl = gr.Dataframe(label="Encoded parameters (first 5)", interactive=False)
# Keep the description in sync with the dropdown; unknown encodings fall back to an empty string.
enc_dd.change(fn=lambda e: f"<small><i>{ENCODING_DESC.get(e,'')}</i></small>",
inputs=enc_dd, outputs=enc_info)
def _conv(src, csv, samp, hfn, hfs, enc, fw, ns, nq):
    """Convert-tab click handler.

    Maps the radio label ``src`` to a source key with ``_src`` and forwards
    it, together with the remaining widget values, to ``run_convert``.
    """
    source_key = _src(src)
    return run_convert(source_key, csv, samp, hfn, hfs, enc, fw, ns, nq)
# Wire the Convert button: shared source inputs first, then the Convert settings.
# _conv's return value is applied to these five outputs in the listed order.
conv_btn.click(fn=_conv,
inputs=_inputs(enc_dd, fw_dd, ns_sl, nq_sl),
outputs=[input_tbl, encoded_tbl, circuit_out, cost_out, conv_status])
# Recommend ────────────────────────────────────────────────────────
with gr.TabItem("Recommend"):
with gr.Row():
# Left column: task and qubit budget; right column: HTML result panel.
with gr.Column(scale=1):
rec_task = gr.Dropdown(choices=TASKS, value="classification", label="Task")
rec_qsl = gr.Slider(0, 1121, value=0, step=1, label="Qubit budget (0=auto)")
rec_btn = gr.Button("Recommend →", variant="primary")
with gr.Column(scale=2):
# Placeholder prompt shown until the handler writes a result into rec_out.
rec_out = gr.HTML(value="<p style='color:#475569;padding:24px 0'>Click <strong>Recommend →</strong></p>")
def _rec(src, csv, samp, hfn, hfs, task, nq):
    """Recommend-tab click handler.

    Maps the radio label ``src`` to a source key with ``_src`` and passes it,
    with the task and qubit budget, to ``run_recommend``.
    """
    source_key = _src(src)
    return run_recommend(source_key, csv, samp, hfn, hfs, task, nq)
# Wire the Recommend button: shared source inputs plus task and qubit budget.
rec_btn.click(fn=_rec, inputs=_inputs(rec_task, rec_qsl), outputs=rec_out)
# Compare ──────────────────────────────────────────────────────────
with gr.TabItem("Compare encoders"):
with gr.Row():
# Left column: task and qubit budget; right column: HTML result panel.
with gr.Column(scale=1):
cmp_task = gr.Dropdown(choices=TASKS, value="classification", label="Task")
# Unlike the other tabs' budget sliders, this one ranges 0-20 and defaults to 8.
cmp_qsl = gr.Slider(0, 20, value=8, step=1, label="Qubit budget")
cmp_btn = gr.Button("Compare →", variant="primary")
with gr.Column(scale=2):
# Placeholder prompt shown until the handler writes a result into cmp_out.
cmp_out = gr.HTML(value="<p style='color:#475569;padding:24px 0'>Click <strong>Compare →</strong></p>")
def _cmp(src, csv, samp, hfn, hfs, task, nq):
    """Compare-tab click handler.

    Maps the radio label ``src`` to a source key with ``_src`` and passes it,
    with the task and qubit budget, to ``run_compare``.
    """
    source_key = _src(src)
    return run_compare(source_key, csv, samp, hfn, hfs, task, nq)
# Wire the Compare button: shared source inputs plus task and qubit budget.
cmp_btn.click(fn=_cmp, inputs=_inputs(cmp_task, cmp_qsl), outputs=cmp_out)
# Inspect ──────────────────────────────────────────────────────────
with gr.TabItem("Inspect"):
with gr.Row():
# This tab has no settings of its own: just the button and a short hint.
with gr.Column(scale=1):
ins_btn = gr.Button("Inspect →", variant="primary")
gr.HTML('<p style="font-size:0.82rem;color:#475569;margin:8px 0">Shape, types, missing, sparsity, per-feature stats.</p>')
with gr.Column(scale=3):
# Placeholder prompt shown until the handler writes a result into ins_out.
ins_out = gr.HTML(value="<p style='color:#475569;padding:24px 0'>Click <strong>Inspect →</strong></p>")
def _ins(src, csv, samp, hfn, hfs):
    """Inspect-tab click handler.

    Maps the radio label ``src`` to a source key with ``_src`` and forwards
    the data-source values to ``run_inspect``.
    """
    source_key = _src(src)
    return run_inspect(source_key, csv, samp, hfn, hfs)
# Wire the Inspect button: only the shared source inputs are passed.
ins_btn.click(fn=_ins, inputs=_inputs(), outputs=ins_out)
# Suggest qubits ───────────────────────────────────────────────────
with gr.TabItem("Suggest qubits"):
with gr.Row():
# Left column: task and optional qubit ceiling; right column: HTML result panel.
with gr.Column(scale=1):
sug_task = gr.Dropdown(choices=TASKS, value="classification", label="Task")
sug_max = gr.Slider(0, 30, value=0, step=1, label="Max qubits (0 = no ceiling)")
sug_btn = gr.Button("Suggest →", variant="primary")
gr.HTML('<p style="font-size:0.82rem;color:#475569;margin:8px 0">Returns the minimum qubit count that fits your data and task, with per-encoding breakdown.</p>')
with gr.Column(scale=3):
# Placeholder prompt shown until the handler writes a result into sug_out.
sug_out = gr.HTML(value="<p style='color:#475569;padding:24px 0'>Click <strong>Suggest →</strong></p>")
def _sug(src, csv, samp, hfn, hfs, task, mq):
    """Suggest-qubits click handler.

    Maps the radio label ``src`` to a source key with ``_src`` and passes it,
    with the task and the max-qubit slider value, to ``run_suggest``.
    """
    source_key = _src(src)
    return run_suggest(source_key, csv, samp, hfn, hfs, task, mq)
# Wire the Suggest button: shared source inputs plus task and max-qubit ceiling.
sug_btn.click(fn=_sug, inputs=_inputs(sug_task, sug_max), outputs=sug_out)
# Fingerprint ──────────────────────────────────────────────────────
with gr.TabItem("Fingerprint"):
with gr.Row():
# Left column: pipeline configuration, grouped into collapsed accordions.
# Per-stage option widgets start hidden and are revealed by the .change handlers below.
with gr.Column(scale=1):
fp_enc = gr.Dropdown(choices=ENCODINGS, value="angle", label="Encoder")
fp_fw = gr.Dropdown(choices=FRAMEWORKS, value="qasm", label="Exporter (framework)")
with gr.Accordion("Reducer", open=False):
fp_red = gr.Dropdown(choices=["none","pca","lda","spectral","tsne","hardware_aware"], value="none", label="Type")
fp_nc = gr.Slider(1, 64, value=4, step=1, label="n_components", visible=False)
fp_nq = gr.Slider(1, 1121, value=8, step=1, label="Qubit budget (hardware_aware)", visible=False)
with gr.Accordion("Scaler", open=False):
fp_scl = gr.Checkbox(label="Enable scaler", value=False)
fp_sst = gr.Dropdown(choices=["minmax","minmax_pi","minmax_pm_pi","zscore","l2","binary","pm_one"], value="minmax_pi", label="Strategy", visible=False)
with gr.Accordion("Cleaner", open=False):
fp_imp = gr.Checkbox(label="Enable imputer", value=False)
fp_ist = gr.Dropdown(choices=["mean","median","mode","knn","drop"], value="mean", label="Imputer strategy", visible=False)
fp_out = gr.Checkbox(label="Enable outlier handler", value=False)
fp_ost = gr.Dropdown(choices=["iqr","zscore","isolation_forest"], value="iqr", label="Outlier method", visible=False)
with gr.Accordion("Feature selector", open=False):
fp_fsel = gr.Checkbox(label="Enable feature selector", value=False)
fp_fsm = gr.Dropdown(choices=["correlation","mutual_info","variance"], value="correlation", label="Method", visible=False)
fp_fsmx = gr.Slider(1, 64, value=8, step=1, label="Max features", visible=False)
fp_btn = gr.Button("Fingerprint →", variant="primary")
gr.HTML('<p style="font-size:0.82rem;color:#475569;margin:8px 0">Generates a deterministic SHA-256 hash of the pipeline config — stable across runs for the same setup.</p>')
# Right column: result panel plus a separate HTML slot (fp_errmsg) that is the
# second output of the Fingerprint click handler.
with gr.Column(scale=3):
fp_result = gr.HTML(value="<p style='color:#475569;padding:24px 0'>Click <strong>Fingerprint →</strong></p>")
fp_errmsg = gr.HTML()
# n_components is shown for every reducer except "none" and "hardware_aware";
# the qubit-budget slider is shown only for "hardware_aware".
fp_red.change(fn=lambda v: (gr.update(visible=v not in ("none","hardware_aware")), gr.update(visible=v=="hardware_aware")),
inputs=fp_red, outputs=[fp_nc, fp_nq])
# Each checkbox reveals its option widget(s) when ticked and hides them when cleared.
fp_scl.change(fn=lambda v: gr.update(visible=v), inputs=fp_scl, outputs=fp_sst)
fp_imp.change(fn=lambda v: gr.update(visible=v), inputs=fp_imp, outputs=fp_ist)
fp_out.change(fn=lambda v: gr.update(visible=v), inputs=fp_out, outputs=fp_ost)
fp_fsel.change(fn=lambda v: (gr.update(visible=v), gr.update(visible=v)),
inputs=fp_fsel, outputs=[fp_fsm, fp_fsmx])
def _fp(src, csv, samp, hfn, hfs, enc, fw, red, nc, nq, scl, sst, imp, ist, out_flag, ost, fsel, fsm, fsmx):
    """Fingerprint-tab click handler.

    Maps the radio label ``src`` to a source key with ``_src``, coerces the
    max-features slider value ``fsmx`` to ``int``, and forwards everything
    else unchanged to ``run_fingerprint``.
    """
    source_key = _src(src)
    max_features = int(fsmx)
    return run_fingerprint(
        source_key, csv, samp, hfn, hfs,
        enc, fw, red, nc, nq,
        scl, sst, imp, ist, out_flag, ost,
        fsel, fsm, max_features,
    )
# Wire the Fingerprint button. The extra inputs must stay in the same order as
# _fp's parameters after the five shared source values.
fp_btn.click(fn=_fp,
inputs=_inputs(fp_enc, fp_fw, fp_red, fp_nc, fp_nq,
fp_scl, fp_sst, fp_imp, fp_ist,
fp_out, fp_ost, fp_fsel, fp_fsm, fp_fsmx),
outputs=[fp_result, fp_errmsg])
# QUBO / QAOA ──────────────────────────────────────────────────────
with gr.TabItem("QUBO / QAOA"):
gr.HTML('<p style="font-size:0.82rem;color:#475569;margin:4px 0 12px">Independent of the data selector above — takes a graph adjacency matrix directly.</p>')
with gr.Row():
# Left column: problem type, adjacency matrix text and QAOA depth.
with gr.Column(scale=1):
qb_prob = gr.Dropdown(choices=["max_cut"], value="max_cut", label="Problem")
# Default value is a 3-node graph with every pair of nodes connected.
qb_adj = gr.Textbox(label="Adjacency matrix (space-separated rows)",
value="0 1 1\n1 0 1\n1 1 0", lines=5)
qb_p = gr.Slider(1, 5, value=2, step=1, label="QAOA layers (p)")
qb_btn = gr.Button("Generate QAOA circuit →", variant="primary")
with gr.Column(scale=3):
qb_info = gr.HTML(value="<p style='color:#475569'>Problem stats will appear here.</p>")
qb_out = gr.Code(label="QAOA circuit (OpenQASM 3.0)", language="python", lines=20)
# run_qubo is used directly as the handler (no _src wrapper) because this tab
# does not take the shared data-source inputs. Outputs are the circuit, then the stats.
qb_btn.click(fn=run_qubo, inputs=[qb_adj, qb_p, qb_prob], outputs=[qb_out, qb_info])
# About ────────────────────────────────────────────────────────────
with gr.TabItem("About"):
# Static Markdown only: no inputs, outputs or event handlers on this tab.
gr.Markdown("""
## About QuPrep
The missing preprocessing layer between classical datasets and quantum computing.
### Pipeline
```
Connect → Ingest → Clean → Reduce → Normalise → Encode → Export
```
Supports tabular, image, time series, graph, and text data.
Data connectors: HuggingFace Hub, OpenML, Kaggle, CSV/NumPy upload.
### Supported encodings (13)
| Encoding | Qubits | NISQ-safe |
|---|---|---|
| Angle | d | ✓ |
| Amplitude | ⌈log₂d⌉ | ✗ |
| Basis | d | ✓ |
| IQP | d | conditional |
| Entangled Angle | d | ✓ |
| Re-Upload | d | ✓ |
| Hamiltonian | d | ✗ |
| ZZ Feature Map | d | conditional |
| Pauli Feature Map | d | conditional |
| Random Fourier | n_components | ✓ |
| Tensor Product | ⌈d/2⌉ | ✓ |
| QAOA Problem | d | ✓ |
| Graph State | n_nodes | ✓ |
### Links
- 📦 [pypi.org/project/quprep](https://pypi.org/project/quprep/)
- 📖 [docs.quprep.org](https://docs.quprep.org)
- 🌐 [quprep.org](https://quprep.org)
- 💻 [github.com/quprep/quprep](https://github.com/quprep/quprep)
Apache 2.0 · Python ≥ 3.10
""")
# Start the Gradio server only when this file is executed as a script.
# NOTE(review): theme/css are passed to launch() rather than to gr.Blocks();
# whether launch() accepts them depends on the installed Gradio version — confirm
# against the version pinned for this Space.
if __name__ == "__main__":
demo.launch(theme=THEME, css=CSS)