from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
|
|
|
|
@dataclass
class TableSchema:
    """A named table definition holding an ordered list of field schemas."""

    # Name of the table.
    name: str
    # Field definitions in insertion order. ``default_factory`` gives every
    # instance its own list instead of sharing one mutable default.
    fields: list[FieldSchema] = field(default_factory=list)

    def add_field(self, name: str, field_type: str,
                  mode: str = "NULLABLE", description: str = "") -> None:
        """Append a new ``FieldSchema`` built from the arguments.

        Args:
            name: Name of the field.
            field_type: Type string stored on the field as-is.
            mode: Mode string stored on the field; defaults to ``"NULLABLE"``.
            description: Free-text description; defaults to the empty string.
        """
        self.fields.append(FieldSchema(name=name, field_type=field_type,
                                       mode=mode, description=description))

    def to_bq_schema(self) -> list[dict]:
        """Return the ``to_dict()`` form of every field, in field order.

        NOTE(review): the method name suggests the result is meant to be a
        BigQuery schema payload -- confirm against the caller.
        """
        return [f.to_dict() for f in self.fields]

    def field_names(self) -> list[str]:
        """Return the ``name`` of every field, in field order."""
        return [f.name for f in self.fields]
|
|
|
|
@dataclass
class FieldSchema:
    """Description of a single field: name, type, mode and description."""

    # Name of the field.
    name: str
    # Type string; emitted under the "type" key by ``to_dict``.
    field_type: str
    # Mode string; "NULLABLE" unless the caller overrides it.
    mode: str = "NULLABLE"
    # Free-text description; empty by default.
    description: str = ""

    def to_dict(self) -> dict:
        """Return the field as a plain dict.

        The keys are ``"name"``, ``"type"``, ``"mode"`` and ``"description"``;
        note that ``field_type`` is exposed under the key ``"type"``.
        """
        return {
            "name": self.name,
            "type": self.field_type,
            "mode": self.mode,
            "description": self.description,
        }
|
|
|
|
@dataclass
class JobResult:
    """Outcome of a job: identity, state, timing, counters and errors."""

    # Identifier of the job.
    job_id: str
    # State string; ``succeeded`` treats "DONE" as the finished state.
    state: str
    # Start / end timestamps; either may be absent.
    started_at: Optional[datetime] = None
    ended_at: Optional[datetime] = None
    # Counters, zero by default.
    rows_processed: int = 0
    bytes_processed: int = 0
    # Error messages; each instance gets its own list.
    errors: list[str] = field(default_factory=list)

    @property
    def duration_seconds(self) -> Optional[float]:
        """Return ``ended_at - started_at`` in seconds.

        Returns ``None`` when either timestamp is missing.
        """
        # Explicit None checks rather than truthiness: absence is the only
        # condition that should suppress the computation.
        if self.started_at is None or self.ended_at is None:
            return None
        return (self.ended_at - self.started_at).total_seconds()

    def succeeded(self) -> bool:
        """Return True only when ``state`` is ``"DONE"`` and no errors exist."""
        return self.state == "DONE" and not self.errors

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict summary of the result.

        The duration is emitted under ``"duration_s"``; the raw timestamps are
        not included. ``"errors"`` is a copy, so mutating the returned dict
        does not alter this object's own error list.
        """
        return {
            "job_id": self.job_id,
            "state": self.state,
            "duration_s": self.duration_seconds,
            "rows_processed": self.rows_processed,
            "bytes_processed": self.bytes_processed,
            "errors": list(self.errors),
        }
|
|