| """
|
| GDC Data Portal Client
|
| Download and parse cancer genomics data from GDC
|
| """
|
|
|
| import os
|
| import json
|
| import requests
|
| from typing import Dict, List, Optional, Any
|
| from pathlib import Path
|
| import yaml
|
| from dataclasses import dataclass
|
| import logging
|
|
|
| logging.basicConfig(level=logging.INFO)
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
@dataclass
class GDCFile:
    """
    A single file record returned by the GDC Portal ``/files`` endpoint.

    Instances are built by ``GDCClient.search_files`` directly from API
    hits, so string fields may be ``None`` when the corresponding key is
    absent from the response.
    """
    file_id: str                        # GDC file UUID
    file_name: str
    file_size: int                      # bytes; defaulted to 0 by search_files when missing
    data_type: str
    data_format: str
    experimental_strategy: str          # NOTE(review): may also be None for some files — API-dependent
    # Fix: search_files assigns None to these two whenever the API hit has
    # no 'cases' entry, so the annotations must be Optional[str].
    case_id: Optional[str]
    project_id: Optional[str]
|
|
|
|
|
class GDCClient:
    """
    Client for the GDC (Genomic Data Commons) Data Portal REST API.

    Configuration is read from the ``gdc`` section of a YAML file and must
    provide ``api_url`` and ``download_dir``.  An optional ``timeout`` key
    (seconds per request) overrides ``DEFAULT_TIMEOUT``.

    All search methods follow a best-effort contract: any request failure
    is logged and an empty list (or ``None`` for downloads) is returned.
    """

    # Fix: requests has no built-in timeout, so every call could previously
    # hang forever on a stalled connection.  Seconds per HTTP request.
    DEFAULT_TIMEOUT = 60

    def __init__(self, config_path: str = "config.yml"):
        """
        Args:
            config_path: Path to a YAML file containing a ``gdc`` section.

        Raises:
            OSError: if the config file cannot be read.
            KeyError: if required config keys are missing.
        """
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)['gdc']

        self.api_url = self.config['api_url']
        self.download_dir = Path(self.config['download_dir'])
        self.download_dir.mkdir(parents=True, exist_ok=True)
        # Per-request timeout; configurable, defaults to DEFAULT_TIMEOUT.
        self.timeout = self.config.get('timeout', self.DEFAULT_TIMEOUT)

        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json'
        })

    def search_files(
        self,
        filters: Optional[Dict] = None,
        size: int = 100,
        fields: Optional[List[str]] = None
    ) -> List["GDCFile"]:
        """
        Search for files in GDC.

        Args:
            filters: GDC filter query (serialized to JSON for the API)
            size: Number of results to return
            fields: Fields to include in response; defaults to the fields
                needed to populate a GDCFile record

        Returns:
            List of GDCFile records; empty list on any request error.
        """
        endpoint = f"{self.api_url}/files"

        if fields is None:
            fields = [
                'file_id', 'file_name', 'file_size', 'data_type',
                'data_format', 'experimental_strategy', 'cases.case_id',
                'cases.project.project_id'
            ]

        params = {
            'size': size,
            'fields': ','.join(fields)
        }

        if filters:
            params['filters'] = json.dumps(filters)

        try:
            response = self.session.get(endpoint, params=params,
                                        timeout=self.timeout)
            response.raise_for_status()
            data = response.json()

            files = [self._hit_to_file(hit)
                     for hit in data.get('data', {}).get('hits', [])]

            logger.info(f"Found {len(files)} files")
            return files

        except Exception as e:
            # Best-effort contract: callers always get a list back.
            logger.error(f"Error searching files: {e}")
            return []

    @staticmethod
    def _hit_to_file(hit: Dict) -> "GDCFile":
        """Convert one raw API hit dict into a GDCFile record."""
        # A hit may carry no 'cases' (or an empty list); both map to None
        # for case_id/project_id, matching the GDCFile field contract.
        cases = hit.get('cases') or []
        first_case = cases[0] if cases else {}
        return GDCFile(
            file_id=hit.get('file_id'),
            file_name=hit.get('file_name'),
            file_size=hit.get('file_size', 0),
            data_type=hit.get('data_type'),
            data_format=hit.get('data_format'),
            experimental_strategy=hit.get('experimental_strategy'),
            case_id=first_case.get('case_id'),
            project_id=first_case.get('project', {}).get('project_id')
        )

    def download_file(
        self,
        file_id: str,
        output_dir: Optional[Path] = None
    ) -> Optional[Path]:
        """
        Download a file from GDC.

        Args:
            file_id: GDC file UUID
            output_dir: Directory to save file (defaults to config download_dir)

        Returns:
            Path to downloaded file or None if failed
        """
        if output_dir is None:
            output_dir = self.download_dir

        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        endpoint = f"{self.api_url}/data/{file_id}"

        try:
            logger.info(f"Downloading file {file_id}")
            response = self.session.get(endpoint, stream=True,
                                        timeout=self.timeout)
            response.raise_for_status()

            filename = self._filename_from_disposition(
                response.headers.get('content-disposition', '')
            ) or file_id
            # Security: the name comes from an external server header; keep
            # only the basename so it cannot escape output_dir ("../..").
            filename = Path(filename).name or file_id

            output_path = output_dir / filename

            # Stream to disk in chunks so large files never sit in memory.
            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

            logger.info(f"Downloaded to {output_path}")
            return output_path

        except Exception as e:
            logger.error(f"Error downloading file {file_id}: {e}")
            return None

    @staticmethod
    def _filename_from_disposition(header: str) -> Optional[str]:
        """
        Extract the filename parameter from a Content-Disposition header.

        Fix: the previous ``split('filename=')[1].strip('"')`` kept every
        character after ``filename=``, so a header with trailing parameters
        (``attachment; filename=x.maf; creation-date=...``) yielded a
        corrupt name.  Returns None when no filename parameter is present.
        """
        for part in header.split(';'):
            part = part.strip()
            if part.lower().startswith('filename='):
                name = part[len('filename='):].strip().strip('"')
                return name or None
        return None

    def get_project_files(
        self,
        project_id: str,
        data_type: Optional[str] = None,
        limit: int = 100
    ) -> List["GDCFile"]:
        """
        Get files for a specific project.

        Args:
            project_id: GDC project ID (e.g., TCGA-BRCA)
            data_type: Filter by data type
            limit: Maximum number of files
        """
        # Base query: restrict to the requested project.
        filters = {
            "op": "and",
            "content": [
                {
                    "op": "in",
                    "content": {
                        "field": "cases.project.project_id",
                        "value": [project_id]
                    }
                }
            ]
        }

        # Optionally AND in a data-type restriction.
        if data_type:
            filters["content"].append({
                "op": "in",
                "content": {
                    "field": "data_type",
                    "value": [data_type]
                }
            })

        return self.search_files(filters=filters, size=limit)

    def get_mutation_data(self, project_id: str, limit: int = 100) -> List["GDCFile"]:
        """Get mutation/variant calling files for a project."""
        return self.get_project_files(
            project_id=project_id,
            data_type="Simple Nucleotide Variation",
            limit=limit
        )

    def get_gene_expression_data(self, project_id: str, limit: int = 100) -> List["GDCFile"]:
        """Get gene expression data for a project."""
        return self.get_project_files(
            project_id=project_id,
            data_type="Gene Expression Quantification",
            limit=limit
        )

    def search_cases(
        self,
        project_id: str,
        filters: Optional[Dict] = None,
        size: int = 100
    ) -> List[Dict]:
        """
        Search for cases (patients) in GDC.

        Args:
            project_id: GDC project ID
            filters: Additional filter criteria, ANDed with the project filter
            size: Number of results

        Returns:
            List of raw case dicts from the API; empty list on error.
        """
        endpoint = f"{self.api_url}/cases"

        base_filters = {
            "op": "in",
            "content": {
                "field": "project.project_id",
                "value": [project_id]
            }
        }

        if filters:
            filter_query = {
                "op": "and",
                "content": [base_filters, filters]
            }
        else:
            filter_query = base_filters

        params = {
            'size': size,
            'filters': json.dumps(filter_query),
            'fields': 'case_id,project.project_id,demographic,diagnoses'
        }

        try:
            response = self.session.get(endpoint, params=params,
                                        timeout=self.timeout)
            response.raise_for_status()
            data = response.json()

            cases = data.get('data', {}).get('hits', [])
            logger.info(f"Found {len(cases)} cases")
            return cases

        except Exception as e:
            logger.error(f"Error searching cases: {e}")
            return []
|
|
|
|
|
class GDCDataParser:
    """Parse downloaded GDC data files (MAF, VCF, clinical case dicts)."""

    @staticmethod
    def parse_maf(file_path: Path) -> List[Dict]:
        """
        Parse a MAF (Mutation Annotation Format) file.

        Leading '#' comment lines are skipped; the first non-comment line
        is taken as the tab-separated header.  Rows whose column count
        does not match the header are silently dropped.

        Args:
            file_path: Path to an uncompressed, tab-separated MAF file.

        Returns:
            List of dicts mapping column name -> value, one per mutation;
            empty list on any error or when no header line exists.
        """
        log = logging.getLogger(__name__)
        mutations: List[Dict] = []

        try:
            with open(file_path, 'r') as f:
                # Scan past the comment preamble to the header line.
                header_line = None
                for line in f:
                    if not line.startswith('#'):
                        header_line = line.strip()
                        break

                # Fix: a comments-only file previously raised NameError
                # (header_line unbound), silently swallowed by the broad
                # except; fail explicitly instead.
                if header_line is None:
                    log.error(f"No header line found in {file_path}")
                    return []

                headers = header_line.split('\t')

                # The file iterator resumes after the header line.
                for line in f:
                    if line.startswith('#'):
                        continue

                    values = line.strip().split('\t')
                    if len(values) == len(headers):
                        mutations.append(dict(zip(headers, values)))

            log.info(f"Parsed {len(mutations)} mutations from {file_path}")
            return mutations

        except Exception as e:
            log.error(f"Error parsing MAF file: {e}")
            return []

    @staticmethod
    def parse_vcf(file_path: Path) -> List[Dict]:
        """
        Parse a VCF (Variant Call Format) file.

        '##' meta lines and the '#CHROM' header line are skipped; each
        data row is mapped onto the 8 mandatory VCF columns.  Per-sample
        genotype columns (beyond INFO) are ignored.

        Args:
            file_path: Path to an uncompressed VCF file.

        Returns:
            List of variant dicts (chrom/pos/id/ref/alt/qual/filter/info);
            empty list on any error.
        """
        log = logging.getLogger(__name__)
        variants: List[Dict] = []

        try:
            with open(file_path, 'r') as f:
                for line in f:
                    if line.startswith('##'):
                        continue
                    if line.startswith('#CHROM'):
                        continue

                    values = line.strip().split('\t')
                    # Fix: a blank or truncated line used to raise
                    # IndexError, which aborted the parse and discarded
                    # every previously collected variant.
                    if len(values) < 8:
                        continue

                    variants.append({
                        'chrom': values[0],
                        'pos': values[1],
                        'id': values[2],
                        'ref': values[3],
                        'alt': values[4],
                        'qual': values[5],
                        'filter': values[6],
                        'info': values[7]
                    })

            log.info(f"Parsed {len(variants)} variants from {file_path}")
            return variants

        except Exception as e:
            log.error(f"Error parsing VCF file: {e}")
            return []

    @staticmethod
    def parse_clinical_data(data: Dict) -> Dict:
        """
        Flatten a GDC case dict into a small clinical summary.

        Args:
            data: Raw case dict as returned by GDCClient.search_cases.

        Returns:
            Dict with case_id, project_id, a demographic sub-dict and a
            list of diagnosis sub-dicts (missing values become None).
        """
        clinical = {
            'case_id': data.get('case_id'),
            'project_id': data.get('project', {}).get('project_id'),
            'demographic': {},
            'diagnoses': []
        }

        # Fix: `or {}` / `or []` guard against keys that are present but
        # explicitly None, which previously raised AttributeError.
        demo = data.get('demographic') or {}
        clinical['demographic'] = {
            'age_at_index': demo.get('age_at_index'),
            'gender': demo.get('gender'),
            'race': demo.get('race'),
            'ethnicity': demo.get('ethnicity')
        }

        for diag in data.get('diagnoses') or []:
            clinical['diagnoses'].append({
                'diagnosis_id': diag.get('diagnosis_id'),
                'primary_diagnosis': diag.get('primary_diagnosis'),
                'tumor_stage': diag.get('tumor_stage'),
                'age_at_diagnosis': diag.get('age_at_diagnosis'),
                'vital_status': diag.get('vital_status')
            })

        return clinical
|
|
|