Spaces:
Sleeping
Sleeping
| """utils/hf_io.py β All HuggingFace Hub read/write.""" | |
| import io, json | |
| from pathlib import Path | |
| from typing import Optional | |
| import pandas as pd | |
| from huggingface_hub import HfApi, hf_hub_download, list_repo_files, CommitOperationAdd | |
| from loguru import logger | |
| import utils.config as cfg | |
| def _api(): return HfApi(token=cfg.HF_TOKEN) | |
# ── Knowledge base ─────────────────────────────────────
def kb_load() -> dict:
    """Load the knowledge base JSONL from the HF dataset repo.

    Returns a dict with "strategies"/"formulas"/"systems" buckets, each
    mapping canonical_id -> record.  Buckets come back empty when the repo
    is unconfigured or the file cannot be fetched/parsed.
    """
    blank = {"strategies": {}, "formulas": {}, "systems": {}}
    if not cfg.HF_DATASET_REPO:
        return blank
    try:
        local = hf_hub_download(
            repo_id=cfg.HF_DATASET_REPO,
            filename="knowledge_base.jsonl",
            repo_type="dataset",
            token=cfg.HF_TOKEN,
            local_dir=str(cfg.TMP),
            force_download=True,
        )
        kb = {"strategies": {}, "formulas": {}, "systems": {}}
        with open(local, encoding="utf-8") as fh:
            for raw in fh:
                raw = raw.strip()
                if not raw:
                    continue
                rec = json.loads(raw)
                kind = rec.get("_type", "")
                cid = rec.get("canonical_id", "")
                # only records with a known bucket AND an id are kept
                if cid and kind in kb:
                    kb[kind][cid] = rec
        logger.info(f"KB: {len(kb['strategies'])} strats, {len(kb['formulas'])} formulas")
        return kb
    except Exception as e:
        logger.warning(f"KB load (may not exist yet): {e}")
        return blank
def kb_save(kb: dict) -> bool:
    """Serialize the knowledge base to JSONL and upload it to the dataset repo.

    Each record is written as one JSON line tagged with its bucket name in
    "_type".  Missing buckets are tolerated (treated as empty) instead of
    raising KeyError on a partially-built dict.

    Returns True on a successful upload, False when the repo is unconfigured
    or the upload fails (the error is logged, not raised).
    """
    if not cfg.HF_DATASET_REPO:
        return False
    try:
        lines = []
        for kind in ("strategies", "formulas", "systems"):
            # .get() so kb_save(kb={"strategies": {...}}) cannot KeyError
            for rec in kb.get(kind, {}).values():
                lines.append(json.dumps({**rec, "_type": kind}))
        _api().upload_file(
            path_or_fileobj=io.BytesIO("\n".join(lines).encode()),
            path_in_repo="knowledge_base.jsonl",
            repo_id=cfg.HF_DATASET_REPO,
            repo_type="dataset",
            commit_message="Update knowledge base",
        )
        return True
    except Exception as e:
        logger.error(f"KB save: {e}")
        return False
# ── Tick data ──────────────────────────────────────────
def tick_list_symbols() -> list[str]:
    """Return the sorted symbol directory names found in the tick dataset repo."""
    if not cfg.HF_TICK_REPO:
        return []
    try:
        files = list_repo_files(repo_id=cfg.HF_TICK_REPO,
                                repo_type="dataset", token=cfg.HF_TOKEN)
        # a symbol is the top-level directory of any nested file path
        return sorted({f.split("/")[0] for f in files if "/" in f})
    except Exception as e:
        logger.warning(f"Tick symbols: {e}")
        return []
def tick_load(symbol: str, timeframe: str = "1h") -> Optional[pd.DataFrame]:
    """Load OHLCV bars for *symbol* at *timeframe*, with a local parquet cache.

    Tries timeframe-specific files first, then raw tick files (which are
    resampled to *timeframe* by _norm_ohlcv).  Returns None when nothing
    usable is found or the tick repo is unconfigured.
    """
    cache = cfg.TMP / "tick_cache" / f"{symbol}_{timeframe}.parquet"
    if cache.exists():
        return pd.read_parquet(cache)
    if not cfg.HF_TICK_REPO:
        return None
    for fname in (f"{timeframe}.parquet", f"{timeframe}.csv",
                  "ticks.parquet", "data.parquet"):
        raw = _try_dl(symbol, fname)
        if raw is None:
            continue
        # raw tick files need resampling; pre-aggregated bar files do not
        is_ticks = fname.startswith("tick") or fname == "data.parquet"
        df = _norm_ohlcv(raw, timeframe if is_ticks else None)
        if df is not None and not df.empty:
            # create the cache dir explicitly — don't rely on _try_dl's mkdir
            cache.parent.mkdir(parents=True, exist_ok=True)
            df.to_parquet(cache)
            return df
    return None
def _try_dl(sym, fname):
    """Best-effort download of one data file for *sym*; None on any failure."""
    try:
        target = cfg.TMP / "tick_cache" / sym
        target.mkdir(parents=True, exist_ok=True)
        path = hf_hub_download(
            repo_id=cfg.HF_TICK_REPO,
            filename=f"{sym}/{fname}",
            repo_type="dataset",
            token=cfg.HF_TOKEN,
            local_dir=str(target),
            force_download=False,
        )
        if fname.endswith(".parquet"):
            return pd.read_parquet(path)
        return pd.read_csv(path)
    except Exception:
        # missing file / bad parse are expected; caller tries the next name
        return None
| _TF_MAP = {"1m":"1min","5m":"5min","15m":"15min","30m":"30min", | |
| "1h":"1h","4h":"4h","1d":"1D","1w":"1W"} | |
| def _norm_ohlcv(df: pd.DataFrame, resample_to=None) -> Optional[pd.DataFrame]: | |
| import numpy as np | |
| df = df.copy() | |
| ts = next((c for c in df.columns if "time" in c.lower() or "date" in c.lower()), None) | |
| if ts: df.index = pd.to_datetime(df[ts], utc=True); df = df.drop(columns=[ts]) | |
| else: | |
| try: df.index = pd.to_datetime(df.index, utc=True) | |
| except: return None | |
| df.index = df.index.tz_convert("UTC") if df.index.tz else df.index.tz_localize("UTC") | |
| df = df.sort_index() | |
| if resample_to: | |
| price_col = next((c for c in df.columns if c.lower() in ("bid","mid","price","close")), None) | |
| if price_col is None: return None | |
| if "bid" in df.columns and "ask" in df.columns: | |
| df["_price"] = (df["bid"] + df["ask"]) / 2 | |
| else: df["_price"] = df[price_col] | |
| rule = _TF_MAP.get(resample_to, "1h") | |
| ohlcv = df["_price"].resample(rule).ohlc() | |
| ohlcv.columns = ["open","high","low","close"] | |
| vcol = next((c for c in df.columns if "vol" in c.lower()), None) | |
| ohlcv["volume"] = df[vcol].resample(rule).sum() if vcol else df["_price"].resample(rule).count() | |
| return ohlcv.dropna() | |
| renames = {} | |
| for c in df.columns: | |
| lc = c.lower() | |
| if lc in ("o","open"): renames[c]="open" | |
| elif lc in ("h","high"): renames[c]="high" | |
| elif lc in ("l","low"): renames[c]="low" | |
| elif lc in ("c","close"): renames[c]="close" | |
| elif lc in ("v","vol","volume","tick_volume"): renames[c]="volume" | |
| df = df.rename(columns=renames) | |
| for col in ["open","high","low","close"]: | |
| if col not in df.columns: return None | |
| if "volume" not in df.columns: df["volume"] = 0.0 | |
| df = df[["open","high","low","close","volume"]].astype(float).dropna(subset=["open","high","low","close"]) | |
| bad = df["high"] < df["low"] | |
| if bad.any(): df.loc[bad,["high","low"]] = df.loc[bad,["low","high"]].values | |
| return df | |
# ── Batch push ─────────────────────────────────────────
def push_batch(files: list[tuple[str, bytes]], msg="Update") -> int:
    """Commit (path_in_repo, content) pairs to the dataset repo in chunks of 100.

    Returns the number of files successfully pushed; a failed chunk is
    logged and skipped rather than aborting the remaining chunks.
    """
    if not cfg.HF_DATASET_REPO or not files:
        return 0
    ops = [CommitOperationAdd(path_in_repo=p, path_or_fileobj=io.BytesIO(c))
           for p, c in files]
    pushed = 0
    for i in range(0, len(ops), 100):
        chunk = ops[i:i + 100]  # hoisted: was sliced three times per iteration
        try:
            _api().create_commit(
                repo_id=cfg.HF_DATASET_REPO, repo_type="dataset",
                operations=chunk,
                # range separator was a mojibake "β"; restored to a plain hyphen
                commit_message=f"{msg} [{i + 1}-{i + len(chunk)}]",
            )
            pushed += len(chunk)
        except Exception as e:
            logger.error(f"Batch push: {e}")
    return pushed
def push_result(name, symbol, tf, report, opt_json, mt5_set, julia_cfg) -> bool:
    """Push one backtest's artifacts: report, optimal JSON, MT5 .set, Julia cfg.

    Returns True only when every file in the batch was committed.
    """
    from pipeline.exporter import slugify
    sl = slugify(name)
    pre = f"{sl}_{symbol}_{tf}"
    files = [
        (f"backtests/{sl}/{pre}_report.md", report.encode()),
        (f"optimal_sets/{pre}_optimal.json", json.dumps(opt_json, indent=2).encode()),
        (f"optimal_sets/{pre}.set", mt5_set.encode()),
        (f"optimal_sets/{pre}_config.jl", julia_cfg.encode()),
    ]
    # compare against len(files), not a magic 4, so adding an artifact
    # cannot silently make this always return False
    return push_batch(files, f"Backtest: {name} {symbol} {tf}") == len(files)
def push_index(md: str, data: dict) -> bool:
    """Push the backtest index (markdown + JSON); True only if both committed."""
    files = [
        ("optimal_sets/BACKTEST_INDEX.md", md.encode()),
        ("optimal_sets/backtest_index.json", json.dumps(data, indent=2).encode()),
    ]
    # len(files) instead of a magic 2 — consistent with push_result
    return push_batch(files, "Update index") == len(files)
def fetch_index() -> dict:
    """Fetch and parse optimal_sets/backtest_index.json; {} on any failure.

    Adds the cfg.HF_DATASET_REPO guard its sibling functions already have,
    and narrows the bare ``except:`` so KeyboardInterrupt/SystemExit are
    no longer swallowed.
    """
    if not cfg.HF_DATASET_REPO:
        return {}
    try:
        path = hf_hub_download(repo_id=cfg.HF_DATASET_REPO,
                               filename="optimal_sets/backtest_index.json",
                               repo_type="dataset", token=cfg.HF_TOKEN,
                               local_dir=str(cfg.TMP), force_download=True)
        return json.loads(Path(path).read_text())
    except Exception:
        return {}
def fetch_file(remote: str) -> Optional[bytes]:
    """Download *remote* from the dataset repo; None on any failure.

    Bare ``except:`` narrowed to ``Exception`` and the missing repo guard
    added, matching the other fetchers in this module.
    """
    if not cfg.HF_DATASET_REPO:
        return None
    try:
        path = hf_hub_download(repo_id=cfg.HF_DATASET_REPO,
                               filename=remote, repo_type="dataset",
                               token=cfg.HF_TOKEN,
                               local_dir=str(cfg.TMP / "downloads"),
                               force_download=True)
        return Path(path).read_bytes()
    except Exception:
        return None
def pdf_upload(pdf_path: Path) -> str:
    """Upload a local PDF into pdfs/ on the dataset repo.

    Returns the hub URL of the uploaded file, or "" when the repo is
    unconfigured or the upload fails (failure is logged, not raised).
    """
    if not cfg.HF_DATASET_REPO:
        return ""
    try:
        url = _api().upload_file(
            path_or_fileobj=str(pdf_path),
            path_in_repo=f"pdfs/{pdf_path.name}",
            repo_id=cfg.HF_DATASET_REPO,
            repo_type="dataset",
            commit_message=f"Add PDF: {pdf_path.name}",
        )
        return str(url)
    except Exception as e:
        logger.warning(f"PDF upload: {e}")
        return ""