File size: 1,890 Bytes
1e23d14 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
@dataclass
class TableSchema:
    """Named, ordered collection of field definitions.

    Attributes:
        name: Name given to this schema.
        fields: Field definitions in insertion order. Every instance starts
            with its own empty list.
    """

    name: str
    fields: list[FieldSchema] = field(default_factory=list)

    def add_field(
        self,
        name: str,
        field_type: str,
        mode: str = "NULLABLE",
        description: str = "",
    ) -> None:
        """Build a ``FieldSchema`` from the arguments and append it to ``fields``.

        Args:
            name: Field name.
            field_type: Field type string, stored unchanged.
            mode: Field mode string; defaults to ``"NULLABLE"``.
            description: Free-text description; defaults to empty.
        """
        new_field = FieldSchema(
            name=name,
            field_type=field_type,
            mode=mode,
            description=description,
        )
        self.fields.append(new_field)

    def to_bq_schema(self) -> list[dict]:
        """Return the ``to_dict()`` form of every field, in field order.

        NOTE(review): the method name suggests the result is meant for a
        BigQuery schema definition -- confirm against the consumer.
        """
        return [column.to_dict() for column in self.fields]

    def field_names(self) -> list[str]:
        """Return the ``name`` of every field, in field order."""
        return [column.name for column in self.fields]
@dataclass
class FieldSchema:
    """Definition of a single field: name, type, mode and description.

    Attributes:
        name: Field name.
        field_type: Field type string; emitted under the ``"type"`` key by
            ``to_dict``.
        mode: Field mode string; defaults to ``"NULLABLE"``.
        description: Free-text description; defaults to empty.
    """

    name: str
    field_type: str
    mode: str = "NULLABLE"
    description: str = ""

    def to_dict(self) -> dict:
        """Return the field as a dict keyed ``name``/``type``/``mode``/``description``."""
        serialized = dict(
            name=self.name,
            type=self.field_type,
            mode=self.mode,
            description=self.description,
        )
        return serialized
@dataclass
class JobResult:
    """Outcome of one job: identity, state, timing, counters and errors.

    Attributes:
        job_id: Identifier of the job.
        state: Job state string; ``succeeded`` compares it against ``"DONE"``.
        started_at: Start timestamp, or ``None`` if not set.
        ended_at: End timestamp, or ``None`` if not set.
        rows_processed: Row counter; defaults to 0.
        bytes_processed: Byte counter; defaults to 0.
        errors: Error messages recorded for the job. Every instance starts
            with its own empty list.
    """

    job_id: str
    state: str
    started_at: Optional[datetime] = None
    ended_at: Optional[datetime] = None
    rows_processed: int = 0
    bytes_processed: int = 0
    errors: list[str] = field(default_factory=list)

    @property
    def duration_seconds(self) -> Optional[float]:
        """Seconds between ``started_at`` and ``ended_at``.

        Returns:
            ``None`` unless both timestamps are set.
        """
        # datetime instances are always truthy, so this only filters out
        # endpoints that were never set.
        if self.started_at and self.ended_at:
            return (self.ended_at - self.started_at).total_seconds()
        return None

    def succeeded(self) -> bool:
        """Return True when the state is ``"DONE"`` and no errors were recorded."""
        return self.state == "DONE" and not self.errors

    def to_dict(self) -> dict[str, Any]:
        """Return a dict snapshot of the result.

        ``started_at`` and ``ended_at`` are not included; they appear only
        as the derived ``duration_s`` value.
        """
        return {
            "job_id": self.job_id,
            "state": self.state,
            "duration_s": self.duration_seconds,
            "rows_processed": self.rows_processed,
            "bytes_processed": self.bytes_processed,
            # Copy so the snapshot does not alias the live list: errors
            # recorded later must not change an already-produced dict,
            # and edits to the dict must not leak back into the job.
            "errors": list(self.errors),
        }
|