# Scrape artifact (Hugging Face upload header): cyberkyne — "Upload 22 files"
# — commit 094a5f6 verified. Commented out so the module parses as Python.
"""utils/hf_io.py β€” All HuggingFace Hub read/write."""
import io, json
from pathlib import Path
from typing import Optional
import pandas as pd
from huggingface_hub import HfApi, hf_hub_download, list_repo_files, CommitOperationAdd
from loguru import logger
import utils.config as cfg
def _api() -> HfApi:
    """Build an HfApi client authenticated with the configured token."""
    return HfApi(token=cfg.HF_TOKEN)
# ── Knowledge base ─────────────────────────────────────
def kb_load() -> dict:
    """Download knowledge_base.jsonl from the dataset repo and bucket records.

    Returns a dict with "strategies"/"formulas"/"systems" sub-dicts keyed by
    each record's canonical_id. Returns empty buckets when the repo is not
    configured or the file cannot be fetched (e.g. it does not exist yet).
    """
    empty = {"strategies": {}, "formulas": {}, "systems": {}}
    if not cfg.HF_DATASET_REPO:
        return empty
    try:
        path = hf_hub_download(
            repo_id=cfg.HF_DATASET_REPO, filename="knowledge_base.jsonl",
            repo_type="dataset", token=cfg.HF_TOKEN,
            local_dir=str(cfg.TMP), force_download=True,
        )
        result = {"strategies": {}, "formulas": {}, "systems": {}}
        with open(path, encoding="utf-8") as f:
            for raw in f:
                stripped = raw.strip()
                if not stripped:
                    continue
                rec = json.loads(stripped)
                kind = rec.get("_type", "")
                cid = rec.get("canonical_id", "")
                # Only bucket records with a known type AND a canonical id.
                if kind in result and cid:
                    result[kind][cid] = rec
        logger.info(f"KB: {len(result['strategies'])} strats, {len(result['formulas'])} formulas")
        return result
    except Exception as e:
        # Best-effort: a fresh repo has no KB file yet, so this is expected.
        logger.warning(f"KB load (may not exist yet): {e}")
        return empty
def kb_save(kb: dict) -> bool:
    """Serialize the KB dict as JSONL and upload it to the dataset repo.

    Each record is tagged with its bucket name under "_type" so kb_load can
    re-bucket it. Returns True on success, False when the repo is unset or
    the upload fails.
    """
    if not cfg.HF_DATASET_REPO:
        return False
    try:
        lines = [
            json.dumps({**rec, "_type": kind})
            for kind in ("strategies", "formulas", "systems")
            for rec in kb[kind].values()
        ]
        _api().upload_file(
            path_or_fileobj=io.BytesIO("\n".join(lines).encode()),
            path_in_repo="knowledge_base.jsonl",
            repo_id=cfg.HF_DATASET_REPO, repo_type="dataset",
            commit_message="Update knowledge base",
        )
        return True
    except Exception as e:
        logger.error(f"KB save: {e}")
        return False
# ── Tick data ──────────────────────────────────────────
def tick_list_symbols() -> list[str]:
    """Return the sorted symbol names (top-level dirs) in the tick-data repo.

    A symbol is the first path component of any file nested at least one
    level deep. Returns [] when the repo is unset or listing fails.
    """
    if not cfg.HF_TICK_REPO:
        return []
    try:
        files = list(list_repo_files(repo_id=cfg.HF_TICK_REPO,
                                     repo_type="dataset", token=cfg.HF_TOKEN))
        # Sorted output makes the original first-seen dedup order irrelevant.
        symbols = {f.split("/")[0] for f in files if len(f.split("/")) >= 2}
        return sorted(symbols)
    except Exception as e:
        logger.warning(f"Tick symbols: {e}")
        return []
def tick_load(symbol: str, timeframe: str = "1h") -> Optional[pd.DataFrame]:
    """Load OHLCV bars for symbol/timeframe, caching locally as parquet.

    Tries pre-aggregated per-timeframe files first, then raw tick files
    (which are resampled to *timeframe*). Returns None when nothing usable
    is found or the tick repo is not configured.
    """
    cache = cfg.TMP / "tick_cache" / f"{symbol}_{timeframe}.parquet"
    if cache.exists():
        return pd.read_parquet(cache)
    if not cfg.HF_TICK_REPO:
        return None
    candidates = [f"{timeframe}.parquet", f"{timeframe}.csv",
                  "ticks.parquet", "data.parquet"]
    for fname in candidates:
        raw = _try_dl(symbol, fname)
        if raw is None:
            continue
        # Raw tick files need resampling; per-timeframe files are already bars.
        is_raw_ticks = fname.startswith("tick") or fname == "data.parquet"
        bars = _norm_ohlcv(raw, timeframe if is_raw_ticks else None)
        if bars is not None and not bars.empty:
            bars.to_parquet(cache)
            return bars
    return None
def _try_dl(sym, fname):
    """Best-effort download of <sym>/<fname> from the tick repo.

    Returns a DataFrame (parquet or CSV, chosen by extension) or None on any
    failure — the caller simply tries the next candidate filename.
    """
    try:
        local = cfg.TMP / "tick_cache" / sym
        local.mkdir(parents=True, exist_ok=True)
        path = hf_hub_download(repo_id=cfg.HF_TICK_REPO,
                               filename=f"{sym}/{fname}", repo_type="dataset",
                               token=cfg.HF_TOKEN, local_dir=str(local),
                               force_download=False)
        reader = pd.read_parquet if fname.endswith(".parquet") else pd.read_csv
        return reader(path)
    except Exception:
        # Missing file, network error, or unreadable data: signal "not found".
        return None
# App timeframe labels -> pandas resample rules; _norm_ohlcv falls back to
# "1h" for labels missing from this map.
_TF_MAP: dict[str, str] = {"1m":"1min","5m":"5min","15m":"15min","30m":"30min",
           "1h":"1h","4h":"4h","1d":"1D","1w":"1W"}
def _norm_ohlcv(df: pd.DataFrame, resample_to=None) -> Optional[pd.DataFrame]:
import numpy as np
df = df.copy()
ts = next((c for c in df.columns if "time" in c.lower() or "date" in c.lower()), None)
if ts: df.index = pd.to_datetime(df[ts], utc=True); df = df.drop(columns=[ts])
else:
try: df.index = pd.to_datetime(df.index, utc=True)
except: return None
df.index = df.index.tz_convert("UTC") if df.index.tz else df.index.tz_localize("UTC")
df = df.sort_index()
if resample_to:
price_col = next((c for c in df.columns if c.lower() in ("bid","mid","price","close")), None)
if price_col is None: return None
if "bid" in df.columns and "ask" in df.columns:
df["_price"] = (df["bid"] + df["ask"]) / 2
else: df["_price"] = df[price_col]
rule = _TF_MAP.get(resample_to, "1h")
ohlcv = df["_price"].resample(rule).ohlc()
ohlcv.columns = ["open","high","low","close"]
vcol = next((c for c in df.columns if "vol" in c.lower()), None)
ohlcv["volume"] = df[vcol].resample(rule).sum() if vcol else df["_price"].resample(rule).count()
return ohlcv.dropna()
renames = {}
for c in df.columns:
lc = c.lower()
if lc in ("o","open"): renames[c]="open"
elif lc in ("h","high"): renames[c]="high"
elif lc in ("l","low"): renames[c]="low"
elif lc in ("c","close"): renames[c]="close"
elif lc in ("v","vol","volume","tick_volume"): renames[c]="volume"
df = df.rename(columns=renames)
for col in ["open","high","low","close"]:
if col not in df.columns: return None
if "volume" not in df.columns: df["volume"] = 0.0
df = df[["open","high","low","close","volume"]].astype(float).dropna(subset=["open","high","low","close"])
bad = df["high"] < df["low"]
if bad.any(): df.loc[bad,["high","low"]] = df.loc[bad,["low","high"]].values
return df
# ── Batch push ─────────────────────────────────────────
def push_batch(files: list[tuple[str, bytes]], msg="Update") -> int:
    """Commit (path_in_repo, content) pairs to the dataset repo in chunks of 100.

    Each chunk is one commit; a failed chunk is logged and skipped. Returns
    the number of operations successfully committed (0 when the repo is
    unset or *files* is empty).
    """
    if not cfg.HF_DATASET_REPO or not files:
        return 0
    ops = [CommitOperationAdd(path_in_repo=p, path_or_fileobj=io.BytesIO(c)) for p,c in files]
    pushed = 0
    for start in range(0, len(ops), 100):
        chunk = ops[start:start+100]
        try:
            _api().create_commit(repo_id=cfg.HF_DATASET_REPO, repo_type="dataset",
                                 operations=chunk,
                                 commit_message=f"{msg} [{start+1}–{start+len(chunk)}]")
            pushed += len(chunk)
        except Exception as e:
            logger.error(f"Batch push: {e}")
    return pushed
def push_result(name, symbol, tf, report, opt_json, mt5_set, julia_cfg) -> bool:
    """Upload one backtest's report and optimal-set artifacts.

    Pushes the markdown report, optimizer JSON, MT5 .set file, and Julia
    config; True only when all four operations land.
    """
    from pipeline.exporter import slugify
    sl = slugify(name)
    pre = f"{sl}_{symbol}_{tf}"
    files = [
        (f"backtests/{sl}/{pre}_report.md", report.encode()),
        (f"optimal_sets/{pre}_optimal.json", json.dumps(opt_json,indent=2).encode()),
        (f"optimal_sets/{pre}.set", mt5_set.encode()),
        (f"optimal_sets/{pre}_config.jl", julia_cfg.encode()),
    ]
    return push_batch(files, f"Backtest: {name} {symbol} {tf}") == len(files)
def push_index(md: str, data: dict) -> bool:
    """Upload the backtest index (markdown + JSON); True only if both land."""
    files = [
        ("optimal_sets/BACKTEST_INDEX.md", md.encode()),
        ("optimal_sets/backtest_index.json", json.dumps(data,indent=2).encode()),
    ]
    return push_batch(files, "Update index") == 2
def fetch_index() -> dict:
    """Download optimal_sets/backtest_index.json from the dataset repo.

    Returns the parsed index dict, or {} when the repo is not configured or
    the file is missing/unreadable.

    Fixes vs. previous revision: the bare `except:` (which also swallowed
    KeyboardInterrupt/SystemExit) is narrowed to `except Exception` and now
    logs, and the missing `cfg.HF_DATASET_REPO` guard every sibling helper
    has is added.
    """
    if not cfg.HF_DATASET_REPO:
        return {}
    try:
        path = hf_hub_download(repo_id=cfg.HF_DATASET_REPO,
                               filename="optimal_sets/backtest_index.json",
                               repo_type="dataset", token=cfg.HF_TOKEN,
                               local_dir=str(cfg.TMP), force_download=True)
        return json.loads(Path(path).read_text())
    except Exception as e:
        # Best-effort: index may simply not exist yet.
        logger.warning(f"Index fetch: {e}")
        return {}
def fetch_file(remote: str) -> Optional[bytes]:
    """Download an arbitrary dataset-repo file and return its raw bytes.

    Returns None when the repo is not configured or the download fails.

    Fixes vs. previous revision: the bare `except:` (which also swallowed
    KeyboardInterrupt/SystemExit) is narrowed to `except Exception` and now
    logs, and the missing `cfg.HF_DATASET_REPO` guard every sibling helper
    has is added.
    """
    if not cfg.HF_DATASET_REPO:
        return None
    try:
        path = hf_hub_download(repo_id=cfg.HF_DATASET_REPO,
                               filename=remote, repo_type="dataset", token=cfg.HF_TOKEN,
                               local_dir=str(cfg.TMP/"downloads"), force_download=True)
        return Path(path).read_bytes()
    except Exception as e:
        logger.warning(f"File fetch {remote}: {e}")
        return None
def pdf_upload(pdf_path: Path) -> str:
    """Upload a PDF under pdfs/ in the dataset repo.

    Returns str() of the upload result (presumably the hub URL — confirm
    against the installed huggingface_hub version), or "" when the repo is
    unset or the upload fails.
    """
    if not cfg.HF_DATASET_REPO:
        return ""
    try:
        result = _api().upload_file(path_or_fileobj=str(pdf_path),
                                    path_in_repo=f"pdfs/{pdf_path.name}",
                                    repo_id=cfg.HF_DATASET_REPO, repo_type="dataset",
                                    commit_message=f"Add PDF: {pdf_path.name}")
        return str(result)
    except Exception as e:
        logger.warning(f"PDF upload: {e}")
        return ""