#!/usr/bin/env python3
"""
╔══════════════════════════════════════════════════════════════════════════╗
║ data_fetcher.py — MT5 XAUUSDc M3 Data Fetcher                            ║
║ Fetches 1-year OHLCV + spread from MetaTrader5 (3-min candles)           ║
║ Saves CSV + symbol_info.json. Run locally with MT5 terminal open.        ║
╚══════════════════════════════════════════════════════════════════════════╝
"""

import json
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

import numpy as np
import pandas as pd

try:
    import MetaTrader5 as mt5
except ImportError:
    print("ERROR: MetaTrader5 package not installed. Run: pip install MetaTrader5")
    sys.exit(1)

# ══════════════════════════════════════════════════════════════════════════
# CONFIGURATION
# ══════════════════════════════════════════════════════════════════════════
SYMBOL = "XAUUSDc"
TIMEFRAME_M1 = mt5.TIMEFRAME_M1   # Fetch M1, resample to M3
TF_LABEL = "M3"
RESAMPLE_MINS = 3                 # 3-minute candles
LOOKBACK_DAYS = 365               # 1 year

OUTPUT_DIR = Path(__file__).resolve().parent
OUTPUT_CSV = OUTPUT_DIR / f"{SYMBOL}_{TF_LABEL}_data.csv"
OUTPUT_JSON = OUTPUT_DIR / f"{SYMBOL}_symbol_info.json"


# ══════════════════════════════════════════════════════════════════════════
# MT5 CONNECTION
# ══════════════════════════════════════════════════════════════════════════
def init_mt5() -> None:
    """Initialize the MT5 connection, retrying up to 3 times.

    Exits the process with status 1 if all attempts fail.
    """
    for attempt in range(3):
        if mt5.initialize():
            info = mt5.terminal_info()
            print(f"✓ MT5 connected — Build {info.build}, Company: {info.company}")
            return
        print(f"  Attempt {attempt+1}/3 failed, retrying in 2s...")
        time.sleep(2)
    print(f"✗ MT5 initialization failed: {mt5.last_error()}")
    sys.exit(1)


def validate_symbol(symbol: str) -> dict:
    """Validate that *symbol* exists in the terminal and return its properties.

    If the symbol is missing, lists any gold-like symbols the broker offers
    and exits with status 1. Makes the symbol visible in Market Watch if it
    is not already.
    """
    info = mt5.symbol_info(symbol)
    if info is None:
        symbols = mt5.symbols_get()
        gold_syms = [s.name for s in symbols
                     if "XAU" in s.name or "GOLD" in s.name.upper()]
        print(f"✗ Symbol '{symbol}' not found.")
        if gold_syms:
            print(f"  Available gold symbols: {gold_syms}")
        else:
            print("  No gold symbols found. Check your broker.")
        sys.exit(1)

    if not info.visible:
        # Symbol exists but is hidden in Market Watch — select it so that
        # rate/tick requests succeed. Short sleep lets the terminal sync.
        mt5.symbol_select(symbol, True)
        time.sleep(0.5)

    props = {
        "name": info.name,
        "digits": info.digits,
        "point": info.point,
        "spread": info.spread,
        "trade_mode": info.trade_mode,
        "volume_min": info.volume_min,
        "volume_max": info.volume_max,
        "volume_step": info.volume_step,
        "trade_contract_size": info.trade_contract_size,
        "trade_tick_value": info.trade_tick_value,
        "trade_tick_size": info.trade_tick_size,
        "currency_profit": info.currency_profit,
    }
    print(f"✓ Symbol validated: {info.name}")
    print(f"  Digits: {info.digits} | Point: {info.point} | "
          f"Spread: {info.spread} | Min Lot: {info.volume_min} | "
          f"Max Lot: {info.volume_max} | Contract: {info.trade_contract_size}")
    return props


# ══════════════════════════════════════════════════════════════════════════
# DATA FETCHING (M1 → resample to M3)
# ══════════════════════════════════════════════════════════════════════════
def fetch_ohlcv(symbol: str, days: int) -> pd.DataFrame:
    """Fetch M1 OHLCV from MT5 and resample to RESAMPLE_MINS-minute candles.

    Fetches in 30-day chunks to stay under MT5 per-request bar limits.
    Chunk boundaries can return the same bar twice; duplicates are dropped
    on the M1 frame before resampling. Exits with status 1 if no chunk
    returned data.

    Returns a DataFrame with at least: time, open, high, low, close,
    tick_volume, spread (plus volume when the broker reports real volume).
    """
    utc_now = datetime.now(timezone.utc)
    date_from = utc_now - timedelta(days=days)
    print(f"\n→ Fetching M1 bars: {date_from.date()} to {utc_now.date()} …")
    print(f"  (Will resample M1 → M{RESAMPLE_MINS} after fetching)")

    # Fetch in chunks to avoid MT5 limits (max ~100k bars per request)
    chunk_days = 30
    all_frames = []
    current_start = date_from
    while current_start < utc_now:
        chunk_end = min(current_start + timedelta(days=chunk_days), utc_now)
        rates = mt5.copy_rates_range(symbol, TIMEFRAME_M1, current_start, chunk_end)
        if rates is not None and len(rates) > 0:
            chunk_df = pd.DataFrame(rates)
            chunk_df["time"] = pd.to_datetime(chunk_df["time"], unit="s", utc=True)
            all_frames.append(chunk_df)
            print(f"  Chunk {current_start.date()} → {chunk_end.date()}: "
                  f"{len(chunk_df):,} M1 bars")
        else:
            err = mt5.last_error()
            print(f"  Chunk {current_start.date()} → {chunk_end.date()}: no data ({err})")
        current_start = chunk_end

    if not all_frames:
        print("✗ No M1 data returned from any chunk")
        sys.exit(1)

    df = pd.concat(all_frames, ignore_index=True)
    df = df.drop_duplicates(subset="time").sort_values("time").reset_index(drop=True)
    df.rename(columns={"real_volume": "volume"}, inplace=True, errors="ignore")
    print(f"✓ Total M1 bars: {len(df):,}")
    print(f"  M1 range: {df['time'].iloc[0]} → {df['time'].iloc[-1]}")

    # ── Resample M1 → M3 ──
    print(f"\n→ Resampling M1 → M{RESAMPLE_MINS} …")
    df.set_index("time", inplace=True)
    resampled = df.resample(f"{RESAMPLE_MINS}min", label="right", closed="right").agg({
        "open": "first",
        "high": "max",
        "low": "min",
        "close": "last",
        "tick_volume": "sum",
        "spread": "last",
    }).dropna(subset=["open"])  # drop empty buckets (no M1 bars in window)

    # Also resample real volume if the broker provides it
    if "volume" in df.columns:
        resampled["volume"] = df["volume"].resample(
            f"{RESAMPLE_MINS}min", label="right", closed="right").sum()

    resampled.reset_index(inplace=True)

    # Ensure required columns exist even if the broker omitted them
    required = ["time", "open", "high", "low", "close", "tick_volume", "spread"]
    for col in required:
        if col not in resampled.columns:
            if col == "spread":
                resampled["spread"] = 0
            elif col == "tick_volume":
                resampled["tick_volume"] = 0

    print(f"✓ Resampled to {len(resampled):,} M{RESAMPLE_MINS} bars")
    print(f"  M3 range: {resampled['time'].iloc[0]} → {resampled['time'].iloc[-1]}")
    return resampled


def fetch_spread_from_ticks(symbol: str, days: int) -> float | None:
    """Fetch recent tick data and compute the median spread.

    Samples at most the last 30 days of ticks. Returns the median spread in
    points, or None when no tick data is available (caller falls back to the
    bar spread column).
    """
    print("\n→ Computing spread from tick data (sampling last 30 days) …")
    utc_now = datetime.now(timezone.utc)
    tick_start = utc_now - timedelta(days=min(days, 30))
    ticks = mt5.copy_ticks_range(symbol, tick_start, utc_now, mt5.COPY_TICKS_INFO)
    if ticks is None or len(ticks) == 0:
        print("  ⚠ No tick data available, using bar spread column")
        return None

    tick_df = pd.DataFrame(ticks)
    tick_df["time"] = pd.to_datetime(tick_df["time"], unit="s", utc=True)
    # Convert price spread (ask - bid) to points using the symbol's point size
    tick_df["spread_pts"] = ((tick_df["ask"] - tick_df["bid"])
                             / mt5.symbol_info(symbol).point)

    avg_spread = tick_df["spread_pts"].mean()
    median_spread = tick_df["spread_pts"].median()
    max_spread = tick_df["spread_pts"].quantile(0.99)
    print(f"✓ Processed {len(tick_df):,} ticks")
    print(f"  Avg spread: {avg_spread:.1f} pts | "
          f"Median: {median_spread:.1f} pts | "
          f"99th pctl: {max_spread:.1f} pts")
    return median_spread


# ══════════════════════════════════════════════════════════════════════════
# DATA VALIDATION & CLEANING
# ══════════════════════════════════════════════════════════════════════════
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
    """Validate and clean OHLCV data.

    Forward-fills NaNs, repairs OHLC integrity (high < low, open/close
    outside the H-L range), drops duplicate timestamps, reports gaps larger
    than 5 days, sorts chronologically and removes weekend bars. Returns
    the cleaned frame with a fresh RangeIndex.
    """
    print("\n→ Validating data quality …")
    issues = []

    # 1. NaN — forward-fill so downstream indicators see a continuous series
    nan_count = df[["open", "high", "low", "close"]].isnull().sum().sum()
    if nan_count > 0:
        issues.append(f"  ⚠ {nan_count} NaN values in OHLCV — forward-filling")
        df[["open", "high", "low", "close"]] = \
            df[["open", "high", "low", "close"]].ffill()

    # 2. OHLC integrity
    bad_hl = (df["high"] < df["low"]).sum()
    if bad_hl > 0:
        issues.append(f"  ⚠ {bad_hl} bars where high < low — swapping")
        mask = df["high"] < df["low"]
        df.loc[mask, ["high", "low"]] = df.loc[mask, ["low", "high"]].values

    bad_range = ((df["open"] > df["high"]) | (df["open"] < df["low"]) |
                 (df["close"] > df["high"]) | (df["close"] < df["low"])).sum()
    if bad_range > 0:
        issues.append(f"  ⚠ {bad_range} bars where open/close outside H-L — clamping")
        df["open"] = df["open"].clip(lower=df["low"], upper=df["high"])
        df["close"] = df["close"].clip(lower=df["low"], upper=df["high"])

    # 3. Duplicates — keep last, then reset the index. BUG FIX: the original
    # kept the non-contiguous index left by drop_duplicates and then mixed
    # label indices with positional .iloc in the gap report below, which
    # pointed at the wrong rows (or raised) whenever duplicates were dropped.
    dups = df["time"].duplicated().sum()
    if dups > 0:
        issues.append(f"  ⚠ {dups} duplicate timestamps — keeping last")
        df = df.drop_duplicates(subset="time", keep="last")

    # 4. Sort chronologically BEFORE gap detection. BUG FIX: the original
    # scanned for gaps on the unsorted frame, making diff() meaningless if
    # rows were out of order.
    df = df.sort_values("time").reset_index(drop=True)

    # 5. Large gaps (> 5 days) — report only, never drop data
    time_diff = df["time"].diff()
    for pos in np.flatnonzero((time_diff > pd.Timedelta(days=5)).to_numpy()):
        gap = time_diff.iloc[pos]
        issues.append(f"  ⚠ Large gap: {df['time'].iloc[pos-1]} → "
                      f"{df['time'].iloc[pos]} ({gap})")

    # 6. Remove weekends (Saturday=5, Sunday=6 in UTC)
    weekend_mask = df["time"].dt.dayofweek.isin([5, 6])
    weekend_count = weekend_mask.sum()
    if weekend_count > 0:
        issues.append(f"  ℹ Removed {weekend_count} weekend bars")
        df = df[~weekend_mask].reset_index(drop=True)

    if issues:
        for issue in issues:
            print(issue)
    else:
        print("  ✓ Data quality: PASS (no issues found)")

    print(f"\n  Final dataset: {len(df):,} bars")
    print(f"  Price range: {df['close'].min():.2f} — {df['close'].max():.2f}")
    print(f"  Avg spread: {df['spread'].mean():.1f} pts")
    print(f"  Date range: {df['time'].iloc[0].date()} → {df['time'].iloc[-1].date()}")
    return df


# ══════════════════════════════════════════════════════════════════════════
# MAIN
# ══════════════════════════════════════════════════════════════════════════
def main():
    """Connect to MT5, fetch + clean 1 year of M3 data, save CSV and JSON."""
    print("=" * 68)
    print(f"  MT5 Data Fetcher — {SYMBOL} {TF_LABEL} (1 Year)")
    print("=" * 68)

    # 1. Connect
    init_mt5()

    try:
        # 2. Validate symbol & save info
        sym_props = validate_symbol(SYMBOL)

        # Save symbol info JSON for Colab / EA consumption
        # (default=str stringifies any non-JSON-native MT5 values)
        with open(OUTPUT_JSON, "w") as f:
            json.dump(sym_props, f, indent=2, default=str)
        print(f"\n✓ Symbol info saved: {OUTPUT_JSON}")

        # 3. Fetch OHLCV
        df = fetch_ohlcv(SYMBOL, LOOKBACK_DAYS)

        # 4. Enhance spread from ticks — fill bars whose spread column is 0
        median_spread = fetch_spread_from_ticks(SYMBOL, LOOKBACK_DAYS)
        if median_spread is not None:
            zero_mask = df["spread"] == 0
            n_zero = int(zero_mask.sum())
            if n_zero > 0:
                df.loc[zero_mask, "spread"] = int(median_spread)
                print(f"  Filled {n_zero} zero-spread bars with median: "
                      f"{median_spread:.0f}")

        # 5. Validate
        df = validate_data(df)

        # 6. Add metadata columns (log returns: ln(close_t / close_{t-1}))
        df["hour"] = df["time"].dt.hour
        df["dayofweek"] = df["time"].dt.dayofweek
        df["returns"] = np.log(df["close"] / df["close"].shift(1))

        # 7. Save CSV
        output_cols = [
            "time", "open", "high", "low", "close",
            "tick_volume", "spread", "hour", "dayofweek", "returns",
        ]
        if "volume" in df.columns and "volume" not in output_cols:
            output_cols.insert(5, "volume")
        df_out = df[[c for c in output_cols if c in df.columns]]
        df_out.to_csv(OUTPUT_CSV, index=False)

        print(f"\n{'=' * 68}")
        print(f"  ✓ SAVED: {OUTPUT_CSV}")
        print(f"  ✓ Rows: {len(df_out):,} | Columns: {len(df_out.columns)}")
        print(f"  ✓ File size: {OUTPUT_CSV.stat().st_size / 1024:.0f} KB")
        print(f"{'=' * 68}")
        print("\nSample (first 3 rows):")
        print(df_out.head(3).to_string(index=False))
        print("\nSample (last 3 rows):")
        print(df_out.tail(3).to_string(index=False))
    finally:
        # Always release the terminal connection, even on errors above
        mt5.shutdown()
        print("\n✓ MT5 connection closed.")


if __name__ == "__main__":
    main()