Vector-HaSH-agent-trader_v1 / data_fetcher.py
algorembrant's picture
Upload 31 files
134a55d verified
#!/usr/bin/env python3
"""
╔══════════════════════════════════════════════════════════════════════════╗
║ data_fetcher.py — MT5 XAUUSDc M3 Data Fetcher ║
║ Fetches 1-year OHLCV + spread from MetaTrader5 (3-min candles) ║
║ Saves CSV + symbol_info.json. Run locally with MT5 terminal open. ║
╚══════════════════════════════════════════════════════════════════════════╝
"""
import sys, time, json
import numpy as np
import pandas as pd
from datetime import datetime, timedelta, timezone
from pathlib import Path
try:
import MetaTrader5 as mt5
except ImportError:
print("ERROR: MetaTrader5 package not installed. Run: pip install MetaTrader5")
sys.exit(1)
# ══════════════════════════════════════════════════════════════════════════
# CONFIGURATION
# ══════════════════════════════════════════════════════════════════════════
SYMBOL = "XAUUSDc"
TIMEFRAME_M1 = mt5.TIMEFRAME_M1          # Source timeframe: fetch M1, then resample
RESAMPLE_MINS = 3                        # Target candle length in minutes
# Derived from RESAMPLE_MINS so output file names can never disagree with the
# candle length actually produced (was a separate hard-coded "M3").
TF_LABEL = f"M{RESAMPLE_MINS}"
LOOKBACK_DAYS = 365                      # History window: 1 year
OUTPUT_DIR = Path(__file__).resolve().parent   # Outputs live next to this script
OUTPUT_CSV = OUTPUT_DIR / f"{SYMBOL}_{TF_LABEL}_data.csv"
OUTPUT_JSON = OUTPUT_DIR / f"{SYMBOL}_symbol_info.json"
# ══════════════════════════════════════════════════════════════════════════
# MT5 CONNECTION
# ══════════════════════════════════════════════════════════════════════════
def init_mt5(attempts: int = 3, delay: float = 2.0) -> None:
    """Initialize the MT5 terminal connection, retrying on failure.

    Args:
        attempts: Maximum number of ``mt5.initialize()`` calls (default 3).
        delay: Seconds to wait between failed attempts (default 2.0).

    Prints the terminal build and company on success. If every attempt
    fails, prints ``mt5.last_error()`` and exits the process with status 1.
    """
    for attempt in range(1, attempts + 1):
        if mt5.initialize():
            info = mt5.terminal_info()
            print(f"✓ MT5 connected — Build {info.build}, Company: {info.company}")
            return
        # Only announce a retry (and wait) when another attempt will follow;
        # sleeping after the final failure just delays the error.
        if attempt < attempts:
            print(f" Attempt {attempt}/{attempts} failed, retrying in {delay:g}s...")
            time.sleep(delay)
    print(f"✗ MT5 initialization failed: {mt5.last_error()}")
    sys.exit(1)
def validate_symbol(symbol: str) -> dict:
    """Check that *symbol* exists in the terminal and return its properties.

    If the symbol is unknown, prints any gold-like symbols ("XAU"/"GOLD" in
    the name) the terminal lists and exits with status 1. If the symbol exists
    but is not visible, it is selected via ``mt5.symbol_select`` and its info
    is re-read.

    Args:
        symbol: Broker symbol name, e.g. ``"XAUUSDc"``.

    Returns:
        Dict of selected ``mt5.symbol_info`` fields (digits, point, spread,
        volume limits, contract/tick sizes, profit currency, ...).
    """
    info = mt5.symbol_info(symbol)
    if info is None:
        # symbols_get() returns None on failure; treat that as "no symbols"
        # instead of crashing while building the suggestion list.
        available = mt5.symbols_get() or ()
        gold_syms = [s.name for s in available
                     if "XAU" in s.name or "GOLD" in s.name.upper()]
        print(f"✗ Symbol '{symbol}' not found.")
        if gold_syms:
            print(f" Available gold symbols: {gold_syms}")
        else:
            print(" No gold symbols found. Check your broker.")
        sys.exit(1)

    if not info.visible:
        # Add the symbol to Market Watch; presumably needed for the terminal
        # to serve its history/ticks — report (don't hide) a failed select.
        if not mt5.symbol_select(symbol, True):
            print(f" ⚠ symbol_select('{symbol}') failed: {mt5.last_error()}")
        time.sleep(0.5)
        # Refresh the snapshot after selection; keep the earlier one if the
        # re-read fails.
        info = mt5.symbol_info(symbol) or info

    props = {
        "name": info.name,
        "digits": info.digits,
        "point": info.point,
        "spread": info.spread,
        "trade_mode": info.trade_mode,
        "volume_min": info.volume_min,
        "volume_max": info.volume_max,
        "volume_step": info.volume_step,
        "trade_contract_size": info.trade_contract_size,
        "trade_tick_value": info.trade_tick_value,
        "trade_tick_size": info.trade_tick_size,
        "currency_profit": info.currency_profit,
    }
    print(f"✓ Symbol validated: {info.name}")
    print(f" Digits: {info.digits} | Point: {info.point} | "
          f"Spread: {info.spread} | Min Lot: {info.volume_min} | "
          f"Max Lot: {info.volume_max} | Contract: {info.trade_contract_size}")
    return props
# ══════════════════════════════════════════════════════════════════════════
# DATA FETCHING (M1 → resample to M3)
# ══════════════════════════════════════════════════════════════════════════
def _fetch_m1_chunks(symbol: str, date_from: datetime, date_to: datetime,
                     chunk_days: int = 30) -> list[pd.DataFrame]:
    """Download M1 bars for [date_from, date_to] in ``chunk_days``-sized requests.

    Each non-empty chunk becomes a DataFrame with ``time`` converted from
    epoch seconds to tz-aware timestamps. Adjacent requests may overlap at
    their shared boundary, so callers must de-duplicate on ``time``.
    """
    frames = []
    start = date_from
    while start < date_to:
        end = min(start + timedelta(days=chunk_days), date_to)
        rates = mt5.copy_rates_range(symbol, TIMEFRAME_M1, start, end)
        if rates is not None and len(rates) > 0:
            chunk = pd.DataFrame(rates)
            # NOTE(review): labelled UTC as before; MT5 bar times are often the
            # broker's server time — confirm before relying on hour/dayofweek.
            chunk["time"] = pd.to_datetime(chunk["time"], unit="s", utc=True)
            frames.append(chunk)
            print(f" Chunk {start.date()} → {end.date()}: {len(chunk):,} M1 bars")
        else:
            print(f" Chunk {start.date()} → {end.date()}: no data ({mt5.last_error()})")
        start = end
    return frames


def _resample_m1(m1: pd.DataFrame, minutes: int) -> pd.DataFrame:
    """Aggregate open-time-stamped M1 bars into ``minutes``-minute OHLCV candles."""
    spec = {
        "open": "first",
        "high": "max",
        "low": "min",
        "close": "last",
        "tick_volume": "sum",
        "spread": "last",
        "volume": "sum",
    }
    # Aggregate only the columns MT5 actually returned; the original agg()
    # raised KeyError on a missing column, making its fallback loop dead code.
    spec = {col: how for col, how in spec.items() if col in m1.columns}
    # MT5 stamps every bar with its OPEN time, so a candle stamped t must hold
    # the M1 bars opening in [t, t + minutes): closed="left", label="left".
    # The previous closed="right" grouped e.g. the 09:58/09:59/10:00 M1 bars
    # under 10:00, straddling two native candles.
    out = (
        m1.set_index("time")
        .resample(f"{minutes}min", closed="left", label="left")
        .agg(spec)
        .dropna(subset=["open"])          # empty bins (weekends, gaps)
        .reset_index()
    )
    for col in ("tick_volume", "spread"):
        if col not in out.columns:
            out[col] = 0
    return out


def fetch_ohlcv(symbol: str, days: int) -> pd.DataFrame:
    """Fetch ``days`` of M1 bars for *symbol* and resample to RESAMPLE_MINS.

    Bars are downloaded in 30-day chunks, de-duplicated and sorted, then
    aggregated into RESAMPLE_MINS-minute candles stamped with their open time.

    Returns:
        DataFrame with ``time``, ``open``, ``high``, ``low``, ``close``,
        ``tick_volume``, ``spread`` and, when MT5 provides ``real_volume``,
        ``volume``.

    Exits the process with status 1 if no chunk returned any data.
    """
    utc_now = datetime.now(timezone.utc)
    date_from = utc_now - timedelta(days=days)
    print(f"\n→ Fetching M1 bars: {date_from.date()} to {utc_now.date()} …")
    print(f" (Will resample M1 → M{RESAMPLE_MINS} after fetching)")

    frames = _fetch_m1_chunks(symbol, date_from, utc_now)
    if not frames:
        print("✗ No M1 data returned from any chunk")
        sys.exit(1)

    m1 = (
        pd.concat(frames, ignore_index=True)
        .drop_duplicates(subset="time")
        .sort_values("time")
        .reset_index(drop=True)
        .rename(columns={"real_volume": "volume"})
    )
    print(f"✓ Total M1 bars: {len(m1):,}")
    print(f" M1 range: {m1['time'].iloc[0]} → {m1['time'].iloc[-1]}")

    print(f"\n→ Resampling M1 → M{RESAMPLE_MINS} …")
    resampled = _resample_m1(m1, RESAMPLE_MINS)
    print(f"✓ Resampled to {len(resampled):,} M{RESAMPLE_MINS} bars")
    print(f" M{RESAMPLE_MINS} range: {resampled['time'].iloc[0]} → "
          f"{resampled['time'].iloc[-1]}")
    return resampled
def fetch_spread_from_ticks(symbol: str, days: int) -> float | None:
    """Estimate the typical spread of *symbol* from recent tick data.

    Samples at most the last 30 days (``min(days, 30)``) of ticks via
    ``mt5.copy_ticks_range(..., COPY_TICKS_INFO)``. Prints the mean, median
    and 99th-percentile spread, and returns the median.

    Returns:
        Median ask−bid spread in points, or ``None`` when no ticks (or no
        symbol point size) are available — callers then keep bar spreads.
    """
    print("\n→ Computing spread from tick data (sampling last 30 days) …")
    # Look the point size up once and guard it: the original dereferenced
    # mt5.symbol_info(symbol).point inline and crashed if that returned None.
    info = mt5.symbol_info(symbol)
    if info is None or not info.point:
        print(f" ⚠ No symbol info for '{symbol}', using bar spread column")
        return None

    utc_now = datetime.now(timezone.utc)
    tick_start = utc_now - timedelta(days=min(days, 30))
    ticks = mt5.copy_ticks_range(symbol, tick_start, utc_now, mt5.COPY_TICKS_INFO)
    if ticks is None or len(ticks) == 0:
        print(" ⚠ No tick data available, using bar spread column")
        return None

    tick_df = pd.DataFrame(ticks)
    spread_pts = (tick_df["ask"] - tick_df["bid"]) / info.point
    avg_spread = spread_pts.mean()
    median_spread = spread_pts.median()
    p99_spread = spread_pts.quantile(0.99)
    print(f"✓ Processed {len(tick_df):,} ticks")
    print(f" Avg spread: {avg_spread:.1f} pts | "
          f"Median: {median_spread:.1f} pts | "
          f"99th pctl: {p99_spread:.1f} pts")
    return median_spread
# ══════════════════════════════════════════════════════════════════════════
# DATA VALIDATION & CLEANING
# ══════════════════════════════════════════════════════════════════════════
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
    """Validate and clean an OHLCV frame, printing every fix that was applied.

    Steps, in order:
      1. forward-fill NaN prices; drop rows still NaN (nothing to fill from),
      2. swap inverted high/low, then clamp open/close into [low, high],
      3. drop duplicate timestamps, keeping the last occurrence as received,
      4. sort by time and re-index,
      5. report (not fix) gaps longer than 5 days,
      6. drop Saturday/Sunday bars.

    Args:
        df: Frame with ``time`` (datetime64), ``open``, ``high``, ``low``,
            ``close`` and ``spread`` columns. It is not modified.

    Returns:
        A cleaned copy with a fresh 0..n-1 index.
    """
    print("\n→ Validating data quality …")
    df = df.copy()          # never mutate the caller's frame
    issues = []
    price_cols = ["open", "high", "low", "close"]

    # 1. NaN prices
    nan_count = df[price_cols].isnull().sum().sum()
    if nan_count > 0:
        issues.append(f" ⚠ {nan_count} NaN values in OHLCV — forward-filling")
        df[price_cols] = df[price_cols].ffill()
        unfillable = df[price_cols].isnull().any(axis=1)
        if unfillable.any():
            issues.append(f" ⚠ {unfillable.sum()} leading bars still NaN — dropping")
            df = df[~unfillable].copy()

    # 2. OHLC integrity
    inverted = df["high"] < df["low"]
    if inverted.any():
        issues.append(f" ⚠ {inverted.sum()} bars where high < low — swapping")
        df.loc[inverted, ["high", "low"]] = df.loc[inverted, ["low", "high"]].values
    outside = ((df["open"] > df["high"]) | (df["open"] < df["low"]) |
               (df["close"] > df["high"]) | (df["close"] < df["low"]))
    if outside.any():
        issues.append(f" ⚠ {outside.sum()} bars where open/close outside H-L — clamping")
        df["open"] = df["open"].clip(lower=df["low"], upper=df["high"])
        df["close"] = df["close"].clip(lower=df["low"], upper=df["high"])

    # 3. Duplicates — before sorting, so "last" means last as received.
    dups = df["time"].duplicated().sum()
    if dups > 0:
        issues.append(f" ⚠ {dups} duplicate timestamps — keeping last")
        df = df.drop_duplicates(subset="time", keep="last")

    # 4. Sort and re-index BEFORE gap detection: the original looked gaps up by
    # index label via iloc while the index still had holes from step 3, which
    # printed the wrong bars or raised IndexError.
    df = df.sort_values("time").reset_index(drop=True)

    # 5. Large gaps (> 5 days), informational only.
    time_diff = df["time"].diff()
    for idx, gap in time_diff[time_diff > pd.Timedelta(days=5)].items():
        issues.append(f" ⚠ Large gap: {df['time'].loc[idx - 1]} → "
                      f"{df['time'].loc[idx]} ({gap})")

    # 6. Remove weekends
    weekend_mask = df["time"].dt.dayofweek.isin([5, 6])
    weekend_count = weekend_mask.sum()
    if weekend_count > 0:
        issues.append(f" ℹ Removed {weekend_count} weekend bars")
        df = df[~weekend_mask].reset_index(drop=True)

    if issues:
        for issue in issues:
            print(issue)
    else:
        print(" ✓ Data quality: PASS (no issues found)")

    print(f"\n Final dataset: {len(df):,} bars")
    if df.empty:
        # The range/date summary below would raise IndexError on iloc[0].
        print(" ⚠ No bars left after cleaning")
        return df
    print(f" Price range: {df['close'].min():.2f} — {df['close'].max():.2f}")
    print(f" Avg spread: {df['spread'].mean():.1f} pts")
    print(f" Date range: {df['time'].iloc[0].date()} → {df['time'].iloc[-1].date()}")
    return df
# ══════════════════════════════════════════════════════════════════════════
# MAIN
# ══════════════════════════════════════════════════════════════════════════
def main():
    """Run the fetch pipeline end to end.

    Connects to MT5, saves the symbol snapshot to OUTPUT_JSON, fetches and
    resamples LOOKBACK_DAYS of bars, patches zero spreads with the tick-based
    median, cleans the data, adds ``hour``/``dayofweek``/``returns`` columns
    and writes OUTPUT_CSV. Once connected, the MT5 session is shut down in
    ``finally``, including when a step calls ``sys.exit``.
    """
    print("=" * 68)
    print(f" MT5 Data Fetcher — {SYMBOL} {TF_LABEL} (1 Year)")
    print("=" * 68)

    # 1. Connect
    init_mt5()
    try:
        # 2. Validate symbol & save info for Colab / EA consumption
        sym_props = validate_symbol(SYMBOL)
        with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
            json.dump(sym_props, f, indent=2, default=str)
        print(f"\n✓ Symbol info saved: {OUTPUT_JSON}")

        # 3. Fetch OHLCV
        df = fetch_ohlcv(SYMBOL, LOOKBACK_DAYS)

        # 4. Replace zero bar spreads with the tick-based median
        median_spread = fetch_spread_from_ticks(SYMBOL, LOOKBACK_DAYS)
        if median_spread is not None:
            zero_mask = df["spread"] == 0
            n_zero = int(zero_mask.sum())
            if n_zero:
                # Round rather than truncate, so the stored value equals the
                # one reported (the old int() wrote 24 while printing 25).
                fill_value = int(round(median_spread))
                df.loc[zero_mask, "spread"] = fill_value
                print(f" Filled {n_zero} zero-spread bars with median: {fill_value}")

        # 5. Validate
        df = validate_data(df)

        # 6. Add metadata columns
        df["hour"] = df["time"].dt.hour
        df["dayofweek"] = df["time"].dt.dayofweek
        df["returns"] = np.log(df["close"] / df["close"].shift(1))  # first row NaN

        # 7. Save CSV
        output_cols = [
            "time", "open", "high", "low", "close",
            "tick_volume", "spread", "hour", "dayofweek", "returns",
        ]
        if "volume" in df.columns:
            output_cols.insert(5, "volume")  # right after "close"
        df_out = df[[c for c in output_cols if c in df.columns]]
        df_out.to_csv(OUTPUT_CSV, index=False)

        print(f"\n{'=' * 68}")
        print(f" ✓ SAVED: {OUTPUT_CSV}")
        print(f" ✓ Rows: {len(df_out):,} | Columns: {len(df_out.columns)}")
        print(f" ✓ File size: {OUTPUT_CSV.stat().st_size / 1024:.0f} KB")
        print(f"{'=' * 68}")
        print("\nSample (first 3 rows):")
        print(df_out.head(3).to_string(index=False))
        print("\nSample (last 3 rows):")
        print(df_out.tail(3).to_string(index=False))
    finally:
        mt5.shutdown()
        print("\n✓ MT5 connection closed.")


if __name__ == "__main__":
    main()