from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime from typing import Any, Optional @dataclass class TableSchema: name: str fields: list[FieldSchema] = field(default_factory=list) def add_field(self, name: str, field_type: str, mode: str = "NULLABLE", description: str = "") -> None: self.fields.append(FieldSchema(name=name, field_type=field_type, mode=mode, description=description)) def to_bq_schema(self) -> list[dict]: return [f.to_dict() for f in self.fields] def field_names(self) -> list[str]: return [f.name for f in self.fields] @dataclass class FieldSchema: name: str field_type: str mode: str = "NULLABLE" description: str = "" def to_dict(self) -> dict: return { "name": self.name, "type": self.field_type, "mode": self.mode, "description": self.description, } @dataclass class JobResult: job_id: str state: str started_at: Optional[datetime] = None ended_at: Optional[datetime] = None rows_processed: int = 0 bytes_processed: int = 0 errors: list[str] = field(default_factory=list) @property def duration_seconds(self) -> Optional[float]: if self.started_at and self.ended_at: return (self.ended_at - self.started_at).total_seconds() return None def succeeded(self) -> bool: return self.state == "DONE" and not self.errors def to_dict(self) -> dict[str, Any]: return { "job_id": self.job_id, "state": self.state, "duration_s": self.duration_seconds, "rows_processed": self.rows_processed, "bytes_processed": self.bytes_processed, "errors": self.errors, }