Buckets:
| """Synthetic Data Factory — Entrypoint. | |
| Generates synthetic relational datasets (users, products, transactions), | |
| validates them, runs quality checks, and exports in multiple formats | |
| with a visual HTML report. | |
| """ | |
| import time | |
| import pandas as pd | |
| from pydantic import ValidationError | |
| from synthetic_factory.config import OUTPUT_DIR | |
| from synthetic_factory.exporters.formats import export_all | |
| from synthetic_factory.generators.products import generate_products | |
| from synthetic_factory.generators.transactions import generate_transactions | |
| from synthetic_factory.generators.users import generate_users | |
| from synthetic_factory.reporting.report import generate_report | |
| from synthetic_factory.validation.checks import ( | |
| check_null_rates, | |
| check_referential_integrity, | |
| check_uniqueness, | |
| check_value_distribution, | |
| ) | |
| from synthetic_factory.validation.schemas import Product, Transaction, User | |
# Horizontal rule (60 "=" characters) that main() prints around the banner and the final summary.
| SEPARATOR = "=" * 60 | |
def validate_records(records: list[dict], model_cls: type, entity_name: str) -> dict:
    """Count how many records fail validation against a Pydantic model.

    Each dict in ``records`` is handed to ``model_cls.model_validate``; a
    ``ValidationError`` marks that record as invalid (the error details are
    not kept). A one-line PASS/FAIL summary labelled with ``entity_name`` is
    printed to stdout.

    Returns:
        A dict with the number of records checked (``"total"``) and the
        number that failed validation (``"errors"``).
    """

    def _is_invalid(candidate: dict) -> bool:
        # True when the model rejects the record, False when it accepts it.
        try:
            model_cls.model_validate(candidate)
        except ValidationError:
            return True
        return False

    failures = sum(1 for candidate in records if _is_invalid(candidate))
    total = len(records)

    if failures:
        status = f"FAIL ({failures} errors)"
    else:
        status = "PASS"

    print(f" [{entity_name}] {total:,} records — {status}")
    return {"total": total, "errors": failures}
def main() -> None:
    """Run the five-step pipeline: generate, validate, check, export, report.

    Progress messages and per-step durations are printed to stdout. Durations
    are measured with ``time.perf_counter()``, a monotonic clock, so system
    clock adjustments during a run cannot produce wrong or negative elapsed
    times (which ``time.time()`` differences can).

    Validation failures are only counted and forwarded to the report; they
    do not stop the pipeline. Return values of the quality-check functions
    are not used here.
    """
    pipeline_start = time.perf_counter()
    print(SEPARATOR)
    print(" SYNTHETIC DATA FACTORY")
    print(SEPARATOR)
    print(f"Output directory: {OUTPUT_DIR}\n")

    # --- Step 1: Generate ---
    print("[1/5] Generating synthetic data...")
    users = generate_users()
    print(f" Users: {len(users):,}")
    products = generate_products()
    print(f" Products: {len(products):,}")
    # Transactions are generated from the existing user ids and an
    # id -> price mapping of the existing products.
    user_ids = [u["id"] for u in users]
    product_prices = {p["id"]: p["price"] for p in products}
    transactions = generate_transactions(user_ids, product_prices)
    print(f" Transactions: {len(transactions):,}")
    # Step 1 is timed from the pipeline start, so it includes the banner.
    print(f" Done in {time.perf_counter() - pipeline_start:.1f}s\n")

    # --- Step 2: Validate ---
    step_start = time.perf_counter()
    print("[2/5] Validating with Pydantic schemas...")
    validation_results = {
        "Users": validate_records(users, User, "Users"),
        "Products": validate_records(products, Product, "Products"),
        "Transactions": validate_records(transactions, Transaction, "Transactions"),
    }
    print(f" Done in {time.perf_counter() - step_start:.1f}s\n")

    # --- Step 3: Quality checks ---
    step_start = time.perf_counter()
    print("[3/5] Running quality checks...")
    users_df = pd.DataFrame(users)
    products_df = pd.DataFrame(products)
    transactions_df = pd.DataFrame(transactions)
    datasets = (
        (users_df, "Users"),
        (products_df, "Products"),
        (transactions_df, "Transactions"),
    )
    for frame, label in datasets:
        check_null_rates(frame, label)
    for frame, label in datasets:
        check_uniqueness(frame, label, "id")
    check_referential_integrity(transactions_df, "user_id", users_df, "id", "Txn->Users")
    check_referential_integrity(transactions_df, "product_id", products_df, "id", "Txn->Products")
    check_value_distribution(users_df, "Users", "age")
    check_value_distribution(products_df, "Products", "price")
    check_value_distribution(transactions_df, "Transactions", "total_amount")
    print(f" Done in {time.perf_counter() - step_start:.1f}s\n")

    # --- Step 4: Export ---
    step_start = time.perf_counter()
    print("[4/5] Exporting to Parquet, JSONL, and CSV...")
    export_all(users_df, "users", OUTPUT_DIR)
    export_all(products_df, "products", OUTPUT_DIR)
    export_all(transactions_df, "transactions", OUTPUT_DIR)
    print(f" Done in {time.perf_counter() - step_start:.1f}s\n")

    # --- Step 5: Report ---
    step_start = time.perf_counter()
    print("[5/5] Generating HTML report...")
    generate_report(users_df, products_df, transactions_df, validation_results, OUTPUT_DIR)
    print(f" Done in {time.perf_counter() - step_start:.1f}s\n")

    # --- Summary ---
    print(SEPARATOR)
    total_time = time.perf_counter() - pipeline_start
    print(f" Pipeline complete in {total_time:.1f}s")
    print(f" Output: {OUTPUT_DIR}/")
    print(SEPARATOR)
# Script entry point: run the pipeline only when this file is executed directly, not on import.
| if __name__ == "__main__": | |
| main() | |
Xet Storage Details
- Size: 4.27 kB
- Xet hash: 3b00f9254aa7325aa3b83e7acb13656217e7029bf015fb197213f35d59d901b6
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.