#!/usr/bin/env python3
"""
Download GNoME dataset during Docker build.

This script downloads the core dataset files needed for the MCP service.
It is run during Docker image build to ensure data is available immediately.
"""

import logging
import os
import sys

import requests

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
DATA_DIR = os.environ.get("GNOME_DATA_DIR", "/app/gnome_data")
PUBLIC_LINK = "https://storage.googleapis.com/"
BUCKET_NAME = "gdm_materials_discovery"

# Files to download
FILES_TO_DOWNLOAD = [
    # (folder, filename, description)
    ("gnome_data", "stable_materials_summary.csv",
     "GNoME stable materials summary (~120MB)"),
    ("external_data", "external_materials_summary.csv",
     "External reference data (~15MB)"),
    ("external_data", "mp_snapshot_summary.csv",
     "Materials Project snapshot (~10MB)"),
    ("gnome_data", "stable_materials_r2scan.csv",
     "r²SCAN validation data (~5MB)"),
]


def ensure_directory(path: str):
    """Ensure directory exists."""
    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)
        logger.info(f"Created directory: {path}")


def download_file(folder: str, filename: str, description: str) -> bool:
    """
    Download a file from Google Cloud Storage.

    Returns:
        True if successful, False otherwise
    """
    url = f"{PUBLIC_LINK}{BUCKET_NAME}/{folder}/{filename}"
    output_path = os.path.join(DATA_DIR, filename)

    # Skip if already exists
    if os.path.exists(output_path):
        size_mb = os.path.getsize(output_path) / (1024 * 1024)
        logger.info(f"[SKIP] {filename} already exists ({size_mb:.1f} MB)")
        return True

    logger.info(f"[DOWNLOAD] {description}")
    logger.info(f"  URL: {url}")

    try:
        response = requests.get(url, stream=True, timeout=300)
        response.raise_for_status()

        total_size = int(response.headers.get('content-length', 0))
        downloaded = 0

        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
                downloaded += len(chunk)
                if total_size > 0:
                    progress = (downloaded / total_size) * 100
                    if downloaded % (1024 * 1024) < 8192:  # Log every ~1MB
                        logger.info(f"  Progress: {progress:.1f}%")

        size_mb = os.path.getsize(output_path) / (1024 * 1024)
        logger.info(f"  [OK] Downloaded {filename} ({size_mb:.1f} MB)")
        return True

    except requests.exceptions.RequestException as e:
        logger.error(f"  [FAILED] Error downloading {filename}: {e}")
        # Remove any partial file so a later run does not skip it as complete
        if os.path.exists(output_path):
            os.remove(output_path)
        return False
    except Exception as e:
        logger.error(f"  [FAILED] Unexpected error: {e}")
        if os.path.exists(output_path):
            os.remove(output_path)
        return False


def main():
    """Main download function."""
    logger.info("=" * 60)
    logger.info("GNoME Dataset Download Script")
    logger.info("=" * 60)
    logger.info(f"Data directory: {DATA_DIR}")

    # Ensure data directory exists
    ensure_directory(DATA_DIR)

    # Download all files
    success_count = 0
    total_count = len(FILES_TO_DOWNLOAD)

    for folder, filename, description in FILES_TO_DOWNLOAD:
        if download_file(folder, filename, description):
            success_count += 1

    # Summary
    logger.info("=" * 60)
    logger.info(f"Download complete: {success_count}/{total_count} files")

    if success_count >= 2:  # At least gnome_summary and external_summary
        logger.info("Core dataset is ready!")

        # List downloaded files
        logger.info("\nDownloaded files:")
        for f in os.listdir(DATA_DIR):
            filepath = os.path.join(DATA_DIR, f)
            if os.path.isfile(filepath):
                size_mb = os.path.getsize(filepath) / (1024 * 1024)
                logger.info(f"  - {f}: {size_mb:.1f} MB")

        return 0
    else:
        logger.error("Failed to download core dataset files!")
        return 1


if __name__ == "__main__":
    sys.exit(main())
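
# Example invocation (a sketch only; the script filename and Dockerfile paths
# below are assumptions, not taken from this repository):
#
#   Dockerfile build step:
#     COPY download_gnome_data.py /app/download_gnome_data.py
#     RUN python3 /app/download_gnome_data.py
#
#   Running locally with a custom data directory:
#     GNOME_DATA_DIR=./gnome_data python3 download_gnome_data.py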