"""
app.py — HuggingFace Spaces entry point.

Architecture:
  Python : Gradio UI, Claude API calls, HF I/O, PDF processing
  Julia  : Indicators, BacktestEngine, WalkForwardOptimizer, SignalCompiler

Python NEVER does numerical computation. It only:
  1. Calls Claude API (extraction + strategy code generation)
  2. Calls Julia via juliacall for all math
  3. Reads/writes HuggingFace datasets
  4. Renders Gradio UI
"""

import io
import json
import shutil
import tempfile
import zipfile
from datetime import datetime
from pathlib import Path

import gradio as gr
from loguru import logger

import utils.config as cfg
import utils.hf_io as hf
from pipeline.pdf_processor import PDFProcessor
from pipeline.extractor import AIExtractor, Deduplicator
from pipeline.julia_bridge import full_backtest_pipeline, julia_available
from pipeline.exporter import (
    slugify, strategy_md, formula_md, backtest_report_md,
    optimal_json, mt5_set, julia_config, index_md,
)

# ── Lazy KB ───────────────────────────────────────────
_kb = None


def get_kb():
    """Return the cached knowledge base, calling ``hf.kb_load()`` on first use."""
    global _kb
    if _kb is None:
        _kb = hf.kb_load()
    return _kb


def reset_kb():
    """Replace the cached knowledge base with a fresh ``hf.kb_load()`` result."""
    global _kb
    _kb = hf.kb_load()


# ═══════════════════════════════════════════════════
# TAB 1 — UPLOAD & EXTRACT
# ═══════════════════════════════════════════════════

# Record kinds and dedup outcomes tracked by the extraction counters.
_KINDS = ("strategies", "formulas", "systems")
_ACTIONS = ("added", "merged", "skipped")


def _new_totals() -> dict:
    """Return a zeroed ``{kind: {action: 0}}`` counter table."""
    return {kind: {act: 0 for act in _ACTIONS} for kind in _KINDS}


def _missing_secret_message() -> str | None:
    """Return an error message if a secret required for extraction is unset, else None."""
    if not cfg.ANTHROPIC_API_KEY:
        return "❌ ANTHROPIC_API_KEY secret not set."
    if not cfg.HF_DATASET_REPO:
        return "❌ HF_DATASET_REPO secret not set."
    return None


def _save_and_resolve_pdfs(pdf_files) -> list:
    """
    Copy uploaded PDFs to a stable local directory and mirror them to HuggingFace.

    Gradio 6 passes uploaded files as plain string paths into a temp dir that
    may be cleaned up before or during processing. This function:
      1. Immediately copies every uploaded file to ``cfg.TMP / "pdfs"``
      2. Uploads each one with ``hf.pdf_upload`` when both ``cfg.HF_TOKEN``
         and ``cfg.HF_DATASET_REPO`` are set
      3. Returns stable local Path objects ready for processing

    Args:
        pdf_files: Iterable of uploads (str paths, or objects with a ``.name``
            path attribute), or None.

    Returns:
        List of ``Path`` objects for every upload that could be copied. A
        failed HuggingFace upload is logged but does not drop the file.
    """
    pdf_dir = cfg.TMP / "pdfs"
    pdf_dir.mkdir(parents=True, exist_ok=True)

    resolved = []
    for upload in pdf_files or []:
        try:
            # Gradio 6: upload is a str path; Gradio 5: upload has .name attribute
            src = Path(upload.name if hasattr(upload, "name") else upload)
            if not src.exists():
                logger.warning(f"Uploaded path does not exist: {src}")
                continue
            dst = pdf_dir / src.name
            # NOTE: an existing file with the same name is reused, not overwritten.
            if not dst.exists():
                shutil.copy2(str(src), str(dst))
        except Exception as e:
            logger.error(f"Failed to resolve upload {upload}: {e}")
            continue

        resolved.append(dst)

        # Persist to HuggingFace (best effort — the local copy is already usable)
        if cfg.HF_TOKEN and cfg.HF_DATASET_REPO:
            try:
                hf.pdf_upload(dst)
            except Exception as e:
                logger.error(f"Failed to persist {dst.name} to HuggingFace: {e}")
    return resolved


def load_pdfs_from_hf() -> list:
    """
    List PDFs previously uploaded to the HuggingFace dataset.

    Returns:
        Sorted repo-relative paths under ``pdfs/`` whose name ends in ``.pdf``
        (case-insensitive), or an empty list if the listing fails.
    """
    try:
        from huggingface_hub import list_repo_files

        files = list_repo_files(
            repo_id=cfg.HF_DATASET_REPO,
            repo_type="dataset",
            token=cfg.HF_TOKEN,
        )
        return sorted(
            f for f in files
            if f.startswith("pdfs/") and f.lower().endswith(".pdf")
        )
    except Exception as e:
        logger.warning(f"Could not list HF PDFs: {e}")
        return []


def download_pdf_from_hf(remote_path: str) -> Path | None:
    """
    Download a PDF from the HuggingFace dataset into ``cfg.TMP / "pdfs"``.

    Args:
        remote_path: Repo-relative file path, as returned by ``load_pdfs_from_hf``.

    Returns:
        Local ``Path`` of the downloaded file, or None if the download failed.
    """
    try:
        from huggingface_hub import hf_hub_download

        pdf_dir = cfg.TMP / "pdfs"
        pdf_dir.mkdir(parents=True, exist_ok=True)
        local = hf_hub_download(
            repo_id=cfg.HF_DATASET_REPO,
            filename=remote_path,
            repo_type="dataset",
            token=cfg.HF_TOKEN,
            local_dir=str(pdf_dir),
            force_download=False,
        )
        return Path(local)
    except Exception as e:
        logger.warning(f"Failed to download {remote_path}: {e}")
        return None


def _extract_paths(paths: list, log: list, totals: dict, progress, kb: dict):
    """
    Core extraction loop — shared by new upload and re-process from HF.

    For each PDF: split it into chunks, run AI extraction on each chunk, and
    feed the output to the deduplicator together with ``kb``. Afterwards the
    KB is saved, one markdown file per strategy/formula is pushed (when
    ``cfg.HF_TOKEN`` is set) and the module-level KB cache is reloaded.

    Args:
        paths: Local PDF paths to process.
        log: List of human-readable log lines; appended to in place.
        totals: ``{kind: {action: count}}`` counters; incremented in place.
        progress: Gradio progress callable ``progress(fraction, desc=...)``.
        kb: Knowledge-base dict passed to the deduplicator and saved at the end.

    Returns:
        ``ai.tokens_used`` from the extractor used for this run.
    """
    proc = PDFProcessor()
    ai = AIExtractor()
    dedup = Deduplicator()
    total = max(len(paths), 1)

    for i, path in enumerate(paths):
        # Fraction of PDFs already finished, so the bar never runs ahead of
        # the work and never moves backwards when the save step reports 0.95.
        progress(i / total, desc=f"{path.name}")
        log.append(f"\n📖 [{i+1}/{len(paths)}] {path.name}")
        try:
            chunks = list(proc.process(path))
            log.append(f" → {len(chunks)} chunks extracted")
        except Exception as e:
            log.append(f" ❌ PDF read error: {e}")
            continue

        for chunk in chunks:
            # A bad chunk must not abort the rest of the document.
            try:
                extracted = ai.extract(chunk)
                stats = dedup.process(extracted, kb)
                for kind in _KINDS:
                    for act in _ACTIONS:
                        totals[kind][act] += stats[kind][act]
            except Exception as e:
                log.append(f" ⚠️ Chunk error: {e}")

        log.append(f" → New: {totals['strategies']['added']} strats, "
                   f"{totals['formulas']['added']} formulas")

    # One markdown export per KB record (whole KB, not only this run's additions).
    hf_files = [
        (f"extracted/strategies/{slugify(rec.get('name',''))}.md",
         strategy_md(rec).encode())
        for rec in kb["strategies"].values()
    ]
    hf_files += [
        (f"extracted/formulas/{slugify(rec.get('name',''))}.md",
         formula_md(rec).encode())
        for rec in kb["formulas"].values()
    ]

    progress(0.95, desc="Saving to HuggingFace…")
    hf.kb_save(kb)
    if hf_files and cfg.HF_TOKEN:
        pushed = hf.push_batch(hf_files, "Update extracted knowledge")
        log.append(f"\n☁️ Pushed {pushed} markdown files to HuggingFace")
    reset_kb()
    return ai.tokens_used


def run_extraction(pdf_files, progress=gr.Progress()):
    """
    Gradio handler: extract knowledge from freshly uploaded PDFs.

    Args:
        pdf_files: Uploads from the Gradio file component.
        progress: Gradio progress tracker.

    Returns:
        ``(summary, log_text)`` — a status/summary string and the last 50 log
        lines joined by newlines (empty string on early exit).
    """
    error = _missing_secret_message()
    if error:
        return error, ""

    # Step 1: resolve uploads → stable local paths + upload to HF
    progress(0.0, desc="Saving uploads to HuggingFace…")
    paths = _save_and_resolve_pdfs(pdf_files)
    if not paths:
        return ("⚠️ No valid PDFs found. Upload files above, "
                "or use 'Re-process from HF' to reprocess previously uploaded PDFs."), ""

    kb = get_kb()
    log = []
    totals = _new_totals()
    tokens = _extract_paths(paths, log, totals, progress, kb)
    counts = {k: len(kb[k]) for k in kb}

    def _kind_line(label: str, kind: str) -> str:
        t = totals[kind]
        return (f"{label} — added: {t['added']} "
                f"merged: {t['merged']} skipped: {t['skipped']}")

    summary = "\n".join([
        "✅ Extraction Complete",
        f"PDFs processed : {len(paths)}",
        _kind_line("Strategies", "strategies"),
        _kind_line("Formulas", "formulas"),
        _kind_line("Systems", "systems"),
        f"KB totals : {counts['strategies']} strategies · "
        f"{counts['formulas']} formulas · {counts['systems']} systems",
        f"Tokens used : {tokens:,}",
        f"PDFs stored : HuggingFace → {cfg.HF_DATASET_REPO}/pdfs/",
    ])
    return summary, "\n".join(log[-50:])


def reprocess_from_hf(selected_pdfs, progress=gr.Progress()):
    """
    Gradio handler: download selected PDFs from HF and re-extract.

    Args:
        selected_pdfs: Repo-relative PDF paths chosen in the UI.
        progress: Gradio progress tracker.

    Returns:
        ``(summary, log_text)`` in the same shape as ``run_extraction``.
    """
    error = _missing_secret_message()
    if error:
        return error, ""
    if not selected_pdfs:
        return "⚠️ No PDFs selected.", ""

    progress(0.0, desc="Downloading from HuggingFace…")
    downloaded = (download_pdf_from_hf(remote) for remote in selected_pdfs)
    paths = [p for p in downloaded if p]
    if not paths:
        return "❌ Could not download any PDFs from HuggingFace.", ""

    kb = get_kb()
    log = [f"Re-processing {len(paths)} PDF(s) from HuggingFace\n"]
    totals = _new_totals()
    tokens = _extract_paths(paths, log, totals, progress, kb)
    counts = {k: len(kb[k]) for k in kb}
    return (f"✅ Re-extraction complete\n"
            f"PDFs: {len(paths)} · "
            f"Strategies: +{totals['strategies']['added']} · "
            f"Formulas: +{totals['formulas']['added']}\n"
            f"KB totals: {counts['strategies']} strategies · "
            f"{counts['formulas']} formulas\n"
            f"Tokens: {tokens:,}"), "\n".join(log[-50:])


def refresh_hf_pdf_list():
    """Return a Gradio update that reloads the HF PDF choices and clears the selection."""
    pdfs = load_pdfs_from_hf()
    return gr.update(choices=pdfs, value=[])


# ═══════════════════════════════════════════════════
# TAB 2 — BROWSE KB
# ═══════════════════════════════════════════════════

def _text(rec: dict, key: str) -> str:
    """Return ``rec[key]`` as a string, treating a missing or None value as ''."""
    return str(rec.get(key) or "")


def search_strategies(query, category):
    """
    Filter KB strategies by category and by substring match on name/description.

    Args:
        query: Case-insensitive search text; falsy means no text filter.
        category: Exact category to keep; falsy or "All" means no category filter.

    Returns:
        ``(rows, label)`` — up to 100 table rows
        ``[name, category, description, sources, layer_count]`` and a
        ``"<n> strategies"`` label counting all matches.
    """
    items = list(get_kb()["strategies"].values())
    if category and category != "All":
        items = [x for x in items if x.get("category") == category]
    if query:
        q = query.lower()
        items = [
            x for x in items
            if q in _text(x, "name").lower() or q in _text(x, "description").lower()
        ]
    rows = [
        [
            _text(x, "name")[:50],
            x.get("category", ""),
            _text(x, "description")[:100],
            ", ".join(x.get("sources") or [])[:40],
            len(x.get("layers") or []),
        ]
        for x in items[:100]
    ]
    return rows, f"{len(items)} strategies"


def search_formulas(query):
    """
    Filter KB formulas by substring match on name/purpose.

    Args:
        query: Case-insensitive search text; falsy means no filter.

    Returns:
        Up to 100 table rows ``[name, category, purpose, has_latex, sources]``.
    """
    items = list(get_kb()["formulas"].values())
    if query:
        q = query.lower()
        items = [
            x for x in items
            if q in _text(x, "name").lower() or q in _text(x, "purpose").lower()
        ]
    return [
        [
            _text(x, "name")[:50],
            x.get("category", ""),
            _text(x, "purpose")[:80],
            "✅" if x.get("latex") else "—",
            ", ".join(x.get("sources") or [])[:40],
        ]
        for x in items[:100]
    ]


def dl_strategy(name):
    """
    Write the markdown export of the strategy called ``name`` to a temp file.

    Args:
        name: Strategy name; matched case-insensitively after stripping
            surrounding whitespace. None is treated as an empty name.

    Returns:
        Path (str) of the created ``.md`` file, or None if no strategy matches.
    """
    wanted = (name or "").strip().lower()
    for rec in get_kb()["strategies"].values():
        if _text(rec, "name").lower() == wanted:
            # NamedTemporaryFile creates the file atomically; tempfile.mktemp
            # only returns a name and is deprecated because of that race.
            with tempfile.NamedTemporaryFile(
                "w", suffix=".md", delete=False, encoding="utf-8"
            ) as fh:
                fh.write(strategy_md(rec))
            return fh.name
    return None


def dl_all_strategies_zip(category):
    """
    Bundle the markdown export of every strategy (optionally one category) in a zip.

    Args:
        category: Exact category to keep; falsy or "All" means every strategy.

    Returns:
        Path (str) of the created ``.zip`` file.
    """
    items = list(get_kb()["strategies"].values())
    if category and category != "All":
        items = [x for x in items if x.get("category") == category]
    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as fh:
        with zipfile.ZipFile(fh, "w", zipfile.ZIP_DEFLATED) as zf:
            for rec in items:
                zf.writestr(f"{slugify(rec.get('name','unknown'))}.md",
                            strategy_md(rec))
    return fh.name


# ═══════════════════════════════════════════════════
# TAB 3 — BACKTEST (Julia Engine)
═══════════════════════════════════════════════════ def load_symbols(): syms = hf.tick_list_symbols() return gr.update(choices=syms, value=syms[:2] if len(syms)>=2 else syms) def run_backtests(selected_symbols, selected_timeframes, strategy_filter, max_strategies, viable_only, progress=gr.Progress()): if not cfg.HF_TICK_REPO: return "❌ HF_TICK_REPO not set.", "" if not cfg.ANTHROPIC_API_KEY: return "❌ ANTHROPIC_API_KEY not set.", "" if not julia_available(): return "❌ Julia runtime not available. Check build logs.", "" ai = AIExtractor() kb = get_kb() strats = list(kb["strategies"].values()) if strategy_filter: strats = [s for s in strats if strategy_filter.lower() in s.get("name","").lower()] if max_strategies > 0: strats = strats[:int(max_strategies)] if not strats: return "⚠️ No strategies. Run extraction first.", "" symbols = selected_symbols or hf.tick_list_symbols()[:2] timeframes = selected_timeframes or ["1h"] log, all_results, viable_count = [], [], 0 for si, rec in enumerate(strats): name = rec.get("name","?") progress(si/len(strats), desc=f"[{si+1}/{len(strats)}] {name[:35]}") # 1. Generate Julia signal code via Claude jl_code = ai.compile_strategy_code(rec) if not jl_code: log.append(f"❌ Code gen failed: {name[:40]}"); continue log.append(f"✅ Julia code generated: {name[:40]}") for sym in symbols: for tf in timeframes: df = hf.tick_load(sym, tf) if df is None or len(df) < 200: log.append(f" ⚠️ {sym} {tf}: no data"); continue # 2. 
Full Julia pipeline (compile → optimize → backtest) result = full_backtest_pipeline( strategy_code = jl_code, strategy_name = name, open_p = df["open"].values, high = df["high"].values, low = df["low"].values, close = df["close"].values, volume = df["volume"].values, timeframe = tf, symbol = sym, n_windows = cfg.WF_WINDOWS, is_ratio = cfg.WF_IS_RATIO, min_trades = cfg.MIN_TRADES, min_sharpe = cfg.MIN_SHARPE, max_combos = cfg.MAX_PARAM_COMBOS, initial_equity = cfg.INITIAL_EQUITY, commission_pct = cfg.COMMISSION_PCT, risk_per_trade = cfg.RISK_PER_TRADE, ) all_results.append(result) # 3. Build + push output files if cfg.HF_TOKEN and cfg.HF_DATASET_REPO: if not viable_only or result.get("is_viable"): hf.push_result( name, sym, tf, backtest_report_md(result, rec), optimal_json(result, rec), mt5_set(result, rec), julia_config(result), ) status = "✅" if result.get("is_viable") else "❌" log.append( f" {status} {sym} {tf}: " f"Sharpe={result.get('oos_sharpe_mean',0):.2f} " f"DD={result.get('oos_max_dd',0):.1f}% " f"Score={result.get('robustness',0):.0f}") if result.get("is_viable"): viable_count += 1 # 4. Push master index if all_results and cfg.HF_TOKEN: hf.push_index(index_md(all_results), { "generated": datetime.now().isoformat(), "engine": "Julia 1.11", "total_strategies": len(all_results), "viable_count": viable_count, "strategies": all_results, }) summary = f"""🏁 Julia Backtest Complete Engine: Julia 1.11 BacktestEngine.jl Strategies compiled: {len(strats)} Combinations tested: {len(all_results)} Viable strategies: {viable_count} Pass rate: {viable_count/max(len(all_results),1)*100:.1f}% Results on HuggingFace: {cfg.HF_DATASET_REPO}/optimal_sets/BACKTEST_INDEX.md""" return summary, "\n".join(log[-60:]) # ═══════════════════════════════════════════════════ # TAB 4 — RESULTS # ═══════════════════════════════════════════════════ def load_results(): data = hf.fetch_index() if not data: return [], "No results yet." 
strats = data.get("strategies",[]) viable = sorted([s for s in strats if s.get("is_viable")], key=lambda x: x.get("oos_sharpe_mean",0), reverse=True) rows = [[s.get("strategy","")[:45], s.get("symbol",""), s.get("timeframe",""), f'{s.get("oos_sharpe_mean",0):.2f}', f'{s.get("oos_max_dd",0):.1f}%', f'{s.get("oos_win_rate",0):.1f}%', f'{s.get("oos_pf_mean",0):.2f}', f'{s.get("robustness",0):.0f}'] for s in viable] count = (f"✅ {len(viable)} viable / {len(strats)} tested | " f"Engine: Julia | {data.get('generated','')[:16]}") return rows, count def dl_result_file(name, symbol, tf, ftype): sl = slugify(name); sym = symbol.upper().strip() pre = f"{sl}_{sym}_{tf}" ext_map = {"MT5 .set file": f"optimal_sets/{pre}.set", "Optimal JSON": f"optimal_sets/{pre}_optimal.json", "Julia config": f"optimal_sets/{pre}_config.jl", "Full report": f"backtests/{sl}/{pre}_report.md"} remote = ext_map.get(ftype,"") if not remote: return None data = hf.fetch_file(remote) if not data: return None tmp = tempfile.mktemp(suffix=Path(remote).suffix) Path(tmp).write_bytes(data) return tmp def dl_all_sets(): data = hf.fetch_index() if not data: return None tmp = tempfile.mktemp(suffix=".zip") with zipfile.ZipFile(tmp,"w",zipfile.ZIP_DEFLATED) as zf: for s in data.get("strategies",[]): if not s.get("is_viable"): continue sl = slugify(s["strategy"]); sym = s["symbol"]; tf = s["timeframe"] content = hf.fetch_file(f"optimal_sets/{sl}_{sym}_{tf}.set") if content: zf.writestr(f"{sl}_{sym}_{tf}.set", content) return tmp # ═══════════════════════════════════════════════════ # TAB 5 — SETUP # ═══════════════════════════════════════════════════ def check_config(): checks = [ ("ANTHROPIC_API_KEY", cfg.ANTHROPIC_API_KEY, "Claude API"), ("HF_TOKEN", cfg.HF_TOKEN, "HF write access"), ("HF_DATASET_REPO", cfg.HF_DATASET_REPO, "Results storage"), ("HF_TICK_REPO", cfg.HF_TICK_REPO, "Tick data source"), ] kb = get_kb() symbols = hf.tick_list_symbols() if cfg.HF_TICK_REPO else [] jl_ok = julia_available() lines = 
["## Configuration Status", ""] for name, val, desc in checks: icon = "✅" if val else "❌" lines.append(f"{icon} `{name}` — {desc}") lines += ["", "## Julia Engine", "", f"{'✅' if jl_ok else '❌'} Julia runtime: {'available' if jl_ok else 'not available (check build logs)'}", "", "## Data Status", "", f"- Tick symbols: **{len(symbols)}** — {', '.join(symbols[:8])}", f"- Strategies in KB: **{len(kb['strategies'])}**", f"- Formulas in KB: **{len(kb['formulas'])}**", "", "## Backtest Settings", "", f"- WF Windows: `{cfg.WF_WINDOWS}` · IS Ratio: `{cfg.WF_IS_RATIO}`", f"- Min Trades: `{cfg.MIN_TRADES}` · Min Sharpe: `{cfg.MIN_SHARPE}`", f"- Commission: `{cfg.COMMISSION_PCT*100:.3f}%` · Risk/trade: `{cfg.RISK_PER_TRADE*100:.1f}%`", f"- Timeframes: `{', '.join(cfg.BACKTEST_TFS)}`"] return "\n".join(lines) # ═══════════════════════════════════════════════════ # BUILD APP # ═══════════════════════════════════════════════════ CATS = ["All"] + cfg.CATEGORIES CSS = ".status-box{font-family:monospace;font-size:.82em}" with gr.Blocks(title="Quant Knowledge Extractor — Julia Engine") as demo: gr.HTML("""
Julia 1.11 Engine · BacktestEngine.jl · WalkForward Optimizer · MT5 .set Output