| |
| """ |
data_fetcher.py — MT5 XAUUSDc M3 Data Fetcher

Fetches 1 year of OHLCV + spread data from MetaTrader5 as M1 bars and
resamples them to 3-minute candles.
Saves a CSV plus a symbol-info JSON file. Run locally with the MT5 terminal open.
| """ |
|
|
| import sys, time, json |
| import numpy as np |
| import pandas as pd |
| from datetime import datetime, timedelta, timezone |
| from pathlib import Path |
|
|
| try: |
| import MetaTrader5 as mt5 |
| except ImportError: |
| print("ERROR: MetaTrader5 package not installed. Run: pip install MetaTrader5") |
| sys.exit(1) |
|
|
| |
| |
| |
# Configuration
SYMBOL = "XAUUSDc"  # instrument to download
TIMEFRAME_M1 = mt5.TIMEFRAME_M1  # bars are requested at M1 and aggregated locally
RESAMPLE_MINS = 3  # width of the output candles, in minutes
# Derived from RESAMPLE_MINS so file names and banners cannot drift from the
# actual candle width.
TF_LABEL = f"M{RESAMPLE_MINS}"
LOOKBACK_DAYS = 365  # how far back from "now" to fetch
OUTPUT_DIR = Path(__file__).resolve().parent  # outputs are written next to this script
OUTPUT_CSV = OUTPUT_DIR / f"{SYMBOL}_{TF_LABEL}_data.csv"
OUTPUT_JSON = OUTPUT_DIR / f"{SYMBOL}_symbol_info.json"
|
|
|
|
| |
| |
| |
def init_mt5() -> None:
    """Connect to the MT5 terminal, retrying a few times.

    Calls ``mt5.initialize()`` up to three times with a 2-second pause
    between attempts and returns as soon as one succeeds.

    Raises:
        SystemExit: with exit code 1 when every attempt fails (after printing
            ``mt5.last_error()``).
    """
    max_attempts = 3
    retry_delay_s = 2

    for attempt in range(1, max_attempts + 1):
        if mt5.initialize():
            info = mt5.terminal_info()
            if info is None:
                # terminal_info() can fail independently of initialize();
                # the connection is still usable, so do not crash on it.
                print("✓ MT5 connected")
            else:
                print(f"✓ MT5 connected — Build {info.build}, Company: {info.company}")
            return
        if attempt < max_attempts:
            # Only announce a retry (and wait) when another attempt follows.
            print(f" Attempt {attempt}/{max_attempts} failed, retrying in {retry_delay_s}s...")
            time.sleep(retry_delay_s)

    print(f"✗ MT5 initialization failed: {mt5.last_error()}")
    sys.exit(1)
|
|
|
|
def validate_symbol(symbol: str) -> dict:
    """Check that ``symbol`` exists in the terminal and return its key properties.

    If the symbol is unknown, the gold-like symbols the terminal does offer
    (names containing "XAU" or "GOLD", case-insensitive) are listed to help
    pick the right one, and the process exits. If the symbol exists but is
    hidden, it is added to Market Watch before its properties are read.

    Args:
        symbol: Symbol name as known to the connected terminal.

    Returns:
        A dict with the symbol's name, digits, point, spread, trade mode,
        volume limits/step, contract size, tick value/size and profit currency.

    Raises:
        SystemExit: with exit code 1 when the symbol is not found.
    """
    info = mt5.symbol_info(symbol)
    if info is None:
        # symbols_get() returns None on error; treat that as "nothing to list".
        symbols = mt5.symbols_get() or ()
        gold_syms = [
            s.name
            for s in symbols
            if "XAU" in s.name.upper() or "GOLD" in s.name.upper()
        ]
        print(f"✗ Symbol '{symbol}' not found.")
        if gold_syms:
            print(f" Available gold symbols: {gold_syms}")
        else:
            print(" No gold symbols found. Check your broker.")
        sys.exit(1)

    if not info.visible:
        if not mt5.symbol_select(symbol, True):
            # Best effort: keep going, but make the failure visible.
            print(f" ⚠ Could not add '{symbol}' to Market Watch: {mt5.last_error()}")
        time.sleep(0.5)
        # Re-read the properties: values captured while the symbol was hidden
        # may not be current (assumption — keep the first snapshot if the
        # second query fails).
        refreshed = mt5.symbol_info(symbol)
        if refreshed is not None:
            info = refreshed

    props = {
        "name": info.name,
        "digits": info.digits,
        "point": info.point,
        "spread": info.spread,
        "trade_mode": info.trade_mode,
        "volume_min": info.volume_min,
        "volume_max": info.volume_max,
        "volume_step": info.volume_step,
        "trade_contract_size": info.trade_contract_size,
        "trade_tick_value": info.trade_tick_value,
        "trade_tick_size": info.trade_tick_size,
        "currency_profit": info.currency_profit,
    }
    print(f"✓ Symbol validated: {info.name}")
    print(
        f" Digits: {info.digits} | Point: {info.point} | "
        f"Spread: {info.spread} | Min Lot: {info.volume_min} | "
        f"Max Lot: {info.volume_max} | Contract: {info.trade_contract_size}"
    )
    return props
|
|
|
|
| |
| |
| |
def _resample_m1(m1: pd.DataFrame, minutes: int) -> pd.DataFrame:
    """Aggregate time-indexed M1 bars into ``minutes``-wide OHLCV candles."""
    how = {
        "open": "first",
        "high": "max",
        "low": "min",
        "close": "last",
        "tick_volume": "sum",
        "spread": "last",
        "volume": "sum",
    }
    # Only aggregate columns that are actually present; asking agg() for a
    # missing column raises KeyError.
    how = {col: fn for col, fn in how.items() if col in m1.columns}

    # MT5 stamps each bar with its period *start* time, so a candle starting
    # at 00:00 must collect the M1 bars 00:00, 00:01 and 00:02 and keep the
    # 00:00 label -> left-closed, left-labelled bins.
    bars = m1.resample(f"{minutes}min", closed="left", label="left").agg(how)

    # Bins with no M1 bars (market closed) have NaN prices; drop them.
    bars = bars.dropna(subset=["open"])

    # Downstream code expects these columns even if the feed lacked them.
    for col in ("tick_volume", "spread"):
        if col not in bars.columns:
            bars[col] = 0

    return bars.reset_index()


def fetch_ohlcv(symbol: str, days: int) -> pd.DataFrame:
    """Fetch M1 bars from MT5 and resample them to ``RESAMPLE_MINS``-minute candles.

    The look-back window ``[now - days, now]`` (UTC) is requested in 30-day
    chunks; chunks that return no data are reported and skipped. Overlapping
    bars at chunk boundaries are de-duplicated by timestamp.

    Args:
        symbol: Symbol to request from the terminal.
        days: Length of the look-back window in days.

    Returns:
        A DataFrame with columns ``time``, ``open``, ``high``, ``low``,
        ``close``, ``tick_volume``, ``spread`` and, when the feed provides
        ``real_volume``, ``volume``. ``time`` is the start of each candle.

    Raises:
        SystemExit: with exit code 1 when no chunk returned any data.
    """
    utc_now = datetime.now(timezone.utc)
    date_from = utc_now - timedelta(days=days)

    print(f"\n→ Fetching M1 bars: {date_from.date()} to {utc_now.date()} …")
    print(f" (Will resample M1 → M{RESAMPLE_MINS} after fetching)")

    # Request the history piecewise rather than in one call (presumably to
    # stay within terminal limits for a single request — TODO confirm).
    chunk_span = timedelta(days=30)
    frames = []
    current_start = date_from

    while current_start < utc_now:
        chunk_end = min(current_start + chunk_span, utc_now)
        rates = mt5.copy_rates_range(symbol, TIMEFRAME_M1, current_start, chunk_end)

        if rates is not None and len(rates) > 0:
            chunk_df = pd.DataFrame(rates)
            chunk_df["time"] = pd.to_datetime(chunk_df["time"], unit="s", utc=True)
            frames.append(chunk_df)
            print(f" Chunk {current_start.date()} → {chunk_end.date()}: {len(chunk_df):,} M1 bars")
        else:
            err = mt5.last_error()
            print(f" Chunk {current_start.date()} → {chunk_end.date()}: no data ({err})")

        current_start = chunk_end

    if not frames:
        print("✗ No M1 data returned from any chunk")
        sys.exit(1)

    df = pd.concat(frames, ignore_index=True)
    # A bar sitting exactly on a chunk boundary can be returned twice.
    df = df.drop_duplicates(subset="time").sort_values("time").reset_index(drop=True)
    df = df.rename(columns={"real_volume": "volume"})

    print(f"✓ Total M1 bars: {len(df):,}")
    print(f" M1 range: {df['time'].iloc[0]} → {df['time'].iloc[-1]}")

    print(f"\n→ Resampling M1 → M{RESAMPLE_MINS} …")
    resampled = _resample_m1(df.set_index("time"), RESAMPLE_MINS)

    print(f"✓ Resampled to {len(resampled):,} M{RESAMPLE_MINS} bars")
    print(f" M{RESAMPLE_MINS} range: {resampled['time'].iloc[0]} → {resampled['time'].iloc[-1]}")
    return resampled
|
|
|
|
|
|
def fetch_spread_from_ticks(symbol: str, days: int) -> float | None:
    """Estimate the typical spread, in points, from recent tick data.

    Ticks are requested for the last ``min(days, 30)`` days. The spread of
    each tick is ``(ask - bid) / point``; the mean, median and 99th percentile
    are printed and the median is returned.

    Args:
        symbol: Symbol to request ticks for.
        days: Look-back window in days (capped at 30).

    Returns:
        The median spread in points, or ``None`` when no ticks are available,
        the symbol's point size cannot be determined, or the median is NaN.
    """
    window_days = min(days, 30)
    print(f"\n→ Computing spread from tick data (sampling last {window_days} days) …")

    utc_now = datetime.now(timezone.utc)
    tick_start = utc_now - timedelta(days=window_days)

    ticks = mt5.copy_ticks_range(symbol, tick_start, utc_now, mt5.COPY_TICKS_INFO)
    if ticks is None or len(ticks) == 0:
        print(" ⚠ No tick data available, using bar spread column")
        return None

    # symbol_info() returns None on failure, and a zero point size would make
    # the division below meaningless.
    sym_info = mt5.symbol_info(symbol)
    point = sym_info.point if sym_info is not None else 0
    if not point:
        print(" ⚠ Symbol point size unavailable, using bar spread column")
        return None

    tick_df = pd.DataFrame(ticks)
    spread_pts = (tick_df["ask"] - tick_df["bid"]) / point

    avg_spread = spread_pts.mean()
    median_spread = spread_pts.median()
    p99_spread = spread_pts.quantile(0.99)

    print(f"✓ Processed {len(tick_df):,} ticks")
    print(
        f" Avg spread: {avg_spread:.1f} pts | "
        f"Median: {median_spread:.1f} pts | "
        f"99th pctl: {p99_spread:.1f} pts"
    )

    if pd.isna(median_spread):
        return None
    return float(median_spread)
|
|
|
|
| |
| |
| |
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
    """Validate and clean an OHLCV frame, printing a report of what was fixed.

    Steps, applied to a time-sorted copy (the caller's frame is not modified):

    1. Forward-fill NaN values in open/high/low/close.
    2. Swap high and low on bars where high < low.
    3. Clamp open and close into the [low, high] range.
    4. Drop duplicate timestamps, keeping the last occurrence.
    5. Report gaps between consecutive bars longer than 5 days.
    6. Remove bars whose timestamp falls on Saturday or Sunday.

    Args:
        df: Frame with at least ``time`` (datetime), ``open``, ``high``,
            ``low`` and ``close`` columns. ``spread`` is summarised if present.

    Returns:
        The cleaned frame, sorted by ``time`` with a fresh 0..n-1 index.
    """
    print("\n→ Validating data quality …")
    issues = []
    price_cols = ["open", "high", "low", "close"]

    # sort_values() returns a new frame, so every fix below leaves the input
    # untouched. A stable sort preserves the relative order of equal
    # timestamps, which keep="last" below depends on.
    df = df.sort_values("time", kind="stable").reset_index(drop=True)

    nan_count = int(df[price_cols].isna().sum().sum())
    if nan_count > 0:
        issues.append(f" ⚠ {nan_count} NaN values in OHLCV → forward-filling")
        df[price_cols] = df[price_cols].ffill()

    inverted = df["high"] < df["low"]
    bad_hl = int(inverted.sum())
    if bad_hl > 0:
        issues.append(f" ⚠ {bad_hl} bars where high < low → swapping")
        # to_numpy() drops the column labels so the values really are swapped
        # instead of being re-aligned back to their original columns.
        df.loc[inverted, ["high", "low"]] = df.loc[inverted, ["low", "high"]].to_numpy()

    out_of_range = (
        (df["open"] > df["high"])
        | (df["open"] < df["low"])
        | (df["close"] > df["high"])
        | (df["close"] < df["low"])
    )
    bad_range = int(out_of_range.sum())
    if bad_range > 0:
        issues.append(f" ⚠ {bad_range} bars where open/close outside H-L → clamping")
        df["open"] = df["open"].clip(lower=df["low"], upper=df["high"])
        df["close"] = df["close"].clip(lower=df["low"], upper=df["high"])

    dups = int(df["time"].duplicated().sum())
    if dups > 0:
        issues.append(f" ⚠ {dups} duplicate timestamps → keeping last")
        df = df.drop_duplicates(subset="time", keep="last")
    # Keep index labels equal to row positions for the gap report below.
    df = df.reset_index(drop=True)

    time_diff = df["time"].diff()
    for pos in time_diff.index[time_diff > pd.Timedelta(days=5)]:
        gap = time_diff.iloc[pos]
        issues.append(
            f" ⚠ Large gap: {df['time'].iloc[pos - 1]} → {df['time'].iloc[pos]} ({gap})"
        )

    if not df.empty:
        weekend_mask = df["time"].dt.dayofweek.isin([5, 6])
        weekend_count = int(weekend_mask.sum())
        if weekend_count > 0:
            issues.append(f" ℹ Removed {weekend_count} weekend bars")
            df = df[~weekend_mask].reset_index(drop=True)

    if issues:
        for issue in issues:
            print(issue)
    else:
        print(" ✓ Data quality: PASS (no issues found)")

    print(f"\n Final dataset: {len(df):,} bars")
    if df.empty:
        # Nothing to summarise; min()/iloc[0] below would fail on no rows.
        return df

    print(f" Price range: {df['close'].min():.2f} → {df['close'].max():.2f}")
    if "spread" in df.columns:
        print(f" Avg spread: {df['spread'].mean():.1f} pts")
    print(f" Date range: {df['time'].iloc[0].date()} → {df['time'].iloc[-1].date()}")
    return df
|
|
|
|
| |
| |
| |
def main():
    """Run the full pipeline: connect, validate symbol, fetch, clean, save.

    Writes the symbol properties to ``OUTPUT_JSON`` and the cleaned candles
    (plus ``hour``, ``dayofweek`` and log ``returns`` columns) to
    ``OUTPUT_CSV``. The MT5 connection is shut down on every exit path once
    it has been established.
    """
    print("=" * 68)
    print(f" MT5 Data Fetcher — {SYMBOL} {TF_LABEL} (1 Year)")
    print("=" * 68)

    init_mt5()

    try:
        sym_props = validate_symbol(SYMBOL)

        with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
            # default=str: stringify any value json cannot encode natively.
            json.dump(sym_props, f, indent=2, default=str)
        print(f"\n✓ Symbol info saved: {OUTPUT_JSON}")

        df = fetch_ohlcv(SYMBOL, LOOKBACK_DAYS)

        # Bars with a zero spread get the tick-derived median instead.
        median_spread = fetch_spread_from_ticks(SYMBOL, LOOKBACK_DAYS)
        if median_spread is not None and not pd.isna(median_spread):
            zero_mask = df["spread"] == 0
            zero_count = int(zero_mask.sum())
            if zero_count > 0:
                # Round (not truncate) so the stored value matches the value
                # reported in the message below.
                fill_value = int(round(median_spread))
                df.loc[zero_mask, "spread"] = fill_value
                print(f" Filled {zero_count} zero-spread bars with median: {fill_value}")

        df = validate_data(df)

        # Derived features; the first row's return is NaN by construction.
        df["hour"] = df["time"].dt.hour
        df["dayofweek"] = df["time"].dt.dayofweek
        df["returns"] = np.log(df["close"] / df["close"].shift(1))

        output_cols = [
            "time", "open", "high", "low", "close",
            "tick_volume", "spread", "hour", "dayofweek", "returns",
        ]
        if "volume" in df.columns:
            # Place real volume right after close, before tick_volume.
            output_cols.insert(5, "volume")

        df_out = df[[c for c in output_cols if c in df.columns]]
        df_out.to_csv(OUTPUT_CSV, index=False)

        print(f"\n{'=' * 68}")
        print(f" ✓ SAVED: {OUTPUT_CSV}")
        print(f" ✓ Rows: {len(df_out):,} | Columns: {len(df_out.columns)}")
        print(f" ✓ File size: {OUTPUT_CSV.stat().st_size / 1024:.0f} KB")
        print(f"{'=' * 68}")

        print("\nSample (first 3 rows):")
        print(df_out.head(3).to_string(index=False))
        print("\nSample (last 3 rows):")
        print(df_out.tail(3).to_string(index=False))

    finally:
        # Runs on normal completion and on sys.exit() raised by the helpers.
        mt5.shutdown()
        print("\n✓ MT5 connection closed.")
|
|
|
|
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|