"""utils/hf_io.py — All HuggingFace Hub read/write.""" import io, json from pathlib import Path from typing import Optional import pandas as pd from huggingface_hub import HfApi, hf_hub_download, list_repo_files, CommitOperationAdd from loguru import logger import utils.config as cfg def _api(): return HfApi(token=cfg.HF_TOKEN) # ── Knowledge base ───────────────────────────────────── def kb_load() -> dict: empty = {"strategies": {}, "formulas": {}, "systems": {}} if not cfg.HF_DATASET_REPO: return empty try: path = hf_hub_download( repo_id=cfg.HF_DATASET_REPO, filename="knowledge_base.jsonl", repo_type="dataset", token=cfg.HF_TOKEN, local_dir=str(cfg.TMP), force_download=True, ) result = {"strategies": {}, "formulas": {}, "systems": {}} with open(path, encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue rec = json.loads(line) kind = rec.get("_type", ""); cid = rec.get("canonical_id", "") if kind in result and cid: result[kind][cid] = rec logger.info(f"KB: {len(result['strategies'])} strats, {len(result['formulas'])} formulas") return result except Exception as e: logger.warning(f"KB load (may not exist yet): {e}") return empty def kb_save(kb: dict) -> bool: if not cfg.HF_DATASET_REPO: return False try: lines = [] for kind in ("strategies","formulas","systems"): for rec in kb[kind].values(): lines.append(json.dumps({**rec, "_type": kind})) _api().upload_file( path_or_fileobj=io.BytesIO("\n".join(lines).encode()), path_in_repo="knowledge_base.jsonl", repo_id=cfg.HF_DATASET_REPO, repo_type="dataset", commit_message="Update knowledge base", ) return True except Exception as e: logger.error(f"KB save: {e}"); return False # ── Tick data ────────────────────────────────────────── def tick_list_symbols() -> list[str]: if not cfg.HF_TICK_REPO: return [] try: files = list(list_repo_files(repo_id=cfg.HF_TICK_REPO, repo_type="dataset", token=cfg.HF_TOKEN)) seen = set(); syms = [] for f in files: parts = f.split("/") if len(parts) >= 2 and parts[0] not in seen: seen.add(parts[0]); syms.append(parts[0]) return sorted(syms) except Exception as e: logger.warning(f"Tick symbols: {e}"); return [] def tick_load(symbol: str, timeframe: str = "1h") -> Optional[pd.DataFrame]: cache = cfg.TMP / "tick_cache" / f"{symbol}_{timeframe}.parquet" if cache.exists(): return pd.read_parquet(cache) if not cfg.HF_TICK_REPO: return None for fname in [f"{timeframe}.parquet", f"{timeframe}.csv", "ticks.parquet", "data.parquet"]: df = _try_dl(symbol, fname) if df is not None: df = _norm_ohlcv(df, timeframe if fname.startswith("tick") or fname=="data.parquet" else None) if df is not None and not df.empty: df.to_parquet(cache); return df return None def _try_dl(sym, fname): try: local = cfg.TMP / "tick_cache" / sym local.mkdir(parents=True, exist_ok=True) path = hf_hub_download(repo_id=cfg.HF_TICK_REPO, filename=f"{sym}/{fname}", repo_type="dataset", token=cfg.HF_TOKEN, local_dir=str(local), force_download=False) return pd.read_parquet(path) if fname.endswith(".parquet") else pd.read_csv(path) except Exception: return None _TF_MAP = {"1m":"1min","5m":"5min","15m":"15min","30m":"30min", "1h":"1h","4h":"4h","1d":"1D","1w":"1W"} def _norm_ohlcv(df: pd.DataFrame, resample_to=None) -> Optional[pd.DataFrame]: import numpy as np df = df.copy() ts = next((c for c in df.columns if "time" in c.lower() or "date" in c.lower()), None) if ts: df.index = pd.to_datetime(df[ts], utc=True); df = df.drop(columns=[ts]) else: try: df.index = pd.to_datetime(df.index, utc=True) except: return None df.index = df.index.tz_convert("UTC") if df.index.tz else df.index.tz_localize("UTC") df = df.sort_index() if resample_to: price_col = next((c for c in df.columns if c.lower() in ("bid","mid","price","close")), None) if price_col is None: return None if "bid" in df.columns and "ask" in df.columns: df["_price"] = (df["bid"] + df["ask"]) / 2 else: df["_price"] = df[price_col] rule = _TF_MAP.get(resample_to, "1h") ohlcv = df["_price"].resample(rule).ohlc() ohlcv.columns = ["open","high","low","close"] vcol = next((c for c in df.columns if "vol" in c.lower()), None) ohlcv["volume"] = df[vcol].resample(rule).sum() if vcol else df["_price"].resample(rule).count() return ohlcv.dropna() renames = {} for c in df.columns: lc = c.lower() if lc in ("o","open"): renames[c]="open" elif lc in ("h","high"): renames[c]="high" elif lc in ("l","low"): renames[c]="low" elif lc in ("c","close"): renames[c]="close" elif lc in ("v","vol","volume","tick_volume"): renames[c]="volume" df = df.rename(columns=renames) for col in ["open","high","low","close"]: if col not in df.columns: return None if "volume" not in df.columns: df["volume"] = 0.0 df = df[["open","high","low","close","volume"]].astype(float).dropna(subset=["open","high","low","close"]) bad = df["high"] < df["low"] if bad.any(): df.loc[bad,["high","low"]] = df.loc[bad,["low","high"]].values return df # ── Batch push ───────────────────────────────────────── def push_batch(files: list[tuple[str, bytes]], msg="Update") -> int: if not cfg.HF_DATASET_REPO or not files: return 0 ops = [CommitOperationAdd(path_in_repo=p, path_or_fileobj=io.BytesIO(c)) for p,c in files] pushed = 0 for i in range(0, len(ops), 100): try: _api().create_commit(repo_id=cfg.HF_DATASET_REPO, repo_type="dataset", operations=ops[i:i+100], commit_message=f"{msg} [{i+1}–{i+len(ops[i:i+100])}]") pushed += len(ops[i:i+100]) except Exception as e: logger.error(f"Batch push: {e}") return pushed def push_result(name, symbol, tf, report, opt_json, mt5_set, julia_cfg) -> bool: from pipeline.exporter import slugify sl = slugify(name); pre = f"{sl}_{symbol}_{tf}" files = [ (f"backtests/{sl}/{pre}_report.md", report.encode()), (f"optimal_sets/{pre}_optimal.json", json.dumps(opt_json,indent=2).encode()), (f"optimal_sets/{pre}.set", mt5_set.encode()), (f"optimal_sets/{pre}_config.jl", julia_cfg.encode()), ] return push_batch(files, f"Backtest: {name} {symbol} {tf}") == 4 def push_index(md: str, data: dict) -> bool: return push_batch([ ("optimal_sets/BACKTEST_INDEX.md", md.encode()), ("optimal_sets/backtest_index.json", json.dumps(data,indent=2).encode()), ], "Update index") == 2 def fetch_index() -> dict: try: path = hf_hub_download(repo_id=cfg.HF_DATASET_REPO, filename="optimal_sets/backtest_index.json", repo_type="dataset", token=cfg.HF_TOKEN, local_dir=str(cfg.TMP), force_download=True) return json.loads(Path(path).read_text()) except: return {} def fetch_file(remote: str) -> Optional[bytes]: try: path = hf_hub_download(repo_id=cfg.HF_DATASET_REPO, filename=remote, repo_type="dataset", token=cfg.HF_TOKEN, local_dir=str(cfg.TMP/"downloads"), force_download=True) return Path(path).read_bytes() except: return None def pdf_upload(pdf_path: Path) -> str: if not cfg.HF_DATASET_REPO: return "" try: return str(_api().upload_file(path_or_fileobj=str(pdf_path), path_in_repo=f"pdfs/{pdf_path.name}", repo_id=cfg.HF_DATASET_REPO, repo_type="dataset", commit_message=f"Add PDF: {pdf_path.name}")) except Exception as e: logger.warning(f"PDF upload: {e}"); return ""