Spaces:
Sleeping
Sleeping
| import os | |
| import pathlib | |
| import duckdb | |
| import pandas as pd | |
| import leafmap | |
# DuckDB connection & extensions
# Open the on-disk database file that the rest of the script writes its
# tables into.
con = duckdb.connect('processed_dashboard.db')

# Each extension is installed and then loaded before moving on to the next.
# The script later queries https:// Parquet URLs and calls ST_* functions.
for _extension in ("httpfs", "spatial"):
    con.install_extension(_extension)
    con.load_extension(_extension)
# Taxi-zone geometry
TAXI_ZONES_URL = "https://data.source.coop/cholmes/nyc-taxi-zones/taxi_zones.parquet"

# Expose the remote Parquet file as a view named taxi_zones; the view refers
# to the file by URL rather than copying its rows into a table.
_taxi_zones_view_sql = (
    "CREATE OR REPLACE VIEW taxi_zones AS "
    f"SELECT * FROM '{TAXI_ZONES_URL}'"
)
con.sql(_taxi_zones_view_sql)
# NYC census blocks
DB_PATH = pathlib.Path("nyc_data.db")
_NYC_DATA_ZIP_URL = "https://opengeos.org/data/duckdb/nyc_data.db.zip"

# Download and unzip the archive only when the database file is not already
# present at DB_PATH (a relative path).
if not DB_PATH.exists():
    leafmap.download_file(_NYC_DATA_ZIP_URL, unzip=True, overwrite=True)

# Attach the census database read-only under the alias nyc_data so later
# queries can reference nyc_data.<table>.
con.execute(f"ATTACH '{DB_PATH}' AS nyc_data (READ_ONLY)")
# Compute demographics
# Materialise taxi_zones as a table whose geometry column is transformed from
# EPSG:2263 to EPSG:26918; every other column is carried over unchanged.
# NOTE(review): assumes the source Parquet geometry is in EPSG:2263 and that
# the census-block geometry it is later joined against is in EPSG:26918 --
# confirm against both datasets.
_taxi_zones_utm_sql = """
    CREATE OR REPLACE TABLE taxi_zones_utm AS
    SELECT * EXCLUDE (geometry),
           ST_GeomFromWKB(ST_AsWKB(ST_Transform(geometry, 'EPSG:2263', 'EPSG:26918'))) AS geometry
    FROM taxi_zones
"""
con.sql(_taxi_zones_utm_sql)
# Build zone_demographics: one row per taxi zone with population totals and
# white/black percentage shares, aggregated over the census blocks whose
# geometry intersects the zone.
#
# NULLIF guards the percentage columns: when the blocks matched to a zone sum
# to zero population, the share is stored as NULL instead of depending on how
# the installed DuckDB version treats division by zero.
#
# NOTE(review): ST_Intersects pairs a census block with every zone it touches,
# so a block lying on a zone boundary contributes its population to more than
# one zone -- confirm this is intended.
con.sql("""
    CREATE OR REPLACE TABLE zone_demographics AS
    SELECT
        tz.LocationID,
        tz.zone AS TaxiZone,
        tz.borough AS Borough,
        SUM(cb.popn_total) AS TotalPop,
        SUM(cb.popn_white) AS WhitePop,
        SUM(cb.popn_black) AS BlackPop,
        100.0 * SUM(cb.popn_white) / NULLIF(SUM(cb.popn_total), 0) AS white_pct,
        100.0 * SUM(cb.popn_black) / NULLIF(SUM(cb.popn_total), 0) AS black_pct
    FROM nyc_data.nyc_census_blocks AS cb
    JOIN taxi_zones_utm AS tz ON ST_Intersects(tz.geometry, cb.geom)
    GROUP BY tz.LocationID, tz.zone, tz.borough
""")
# City-wide baseline: white and black shares of the total population over all
# census blocks, as percentages rounded to two decimals.
_baseline_sql = """
    SELECT
        ROUND(100.0 * SUM(popn_white) / SUM(popn_total), 2) AS baseline_white_pct,
        ROUND(100.0 * SUM(popn_black) / SUM(popn_total), 2) AS baseline_black_pct
    FROM nyc_data.nyc_census_blocks
"""
baseline_df = con.sql(_baseline_sql).df()

# Persist the one-row result; the query refers to the pandas DataFrame above
# by its Python variable name, so that name must stay `baseline_df`.
con.sql("CREATE OR REPLACE TABLE city_baselines AS SELECT * FROM baseline_df")

# Keep the baselines as fractions (percentage / 100) for use in Python.
baseline_white: float = float(baseline_df["baseline_white_pct"].iloc[0]) / 100.0
baseline_black: float = float(baseline_df["baseline_black_pct"].iloc[0]) / 100.0
# Trip data ingestion
# Pickup / drop-off column names per service; the two services spell the
# columns with different letter case.
_pu_field = {"FHV": "PUlocationID", "Yellow": "PULocationID"}
_do_field = {"FHV": "DOlocationID", "Yellow": "DOLocationID"}

_TRIP_DATA_BASE = "https://d37ci6vzurychx.cloudfront.net/trip-data"
# Month label used in the dictionary key -> year-month stamp used in the file name.
_MONTH_STAMPS = {"Jan2025": "2025-01", "Feb2025": "2025-02", "Mar2025": "2025-03"}

# Keys have the form "<service>_<month label>"; entries are ordered FHV first,
# then Yellow, each running January through March.
TRIP_URLS = {
    f"{service}_{label}": f"{_TRIP_DATA_BASE}/{service.lower()}_tripdata_{stamp}.parquet"
    for service in ("FHV", "Yellow")
    for label, stamp in _MONTH_STAMPS.items()
}

# Start from empty count tables: trip_counts_pu (trips_pu) and
# trip_counts_do (trips_do), both keyed by service, month and LocationID.
for _direction in ("pu", "do"):
    con.execute(
        f"CREATE OR REPLACE TABLE trip_counts_{_direction} "
        f"(service VARCHAR, month VARCHAR, LocationID INTEGER, trips_{_direction} BIGINT)"
    )
# Fill trip_counts_pu / trip_counts_do with per-zone trip counts, one remote
# Parquet file at a time (pickup counts first, then drop-off counts).
for key, url in TRIP_URLS.items():
    # Keys have the form "<service>_<month>", e.g. "FHV_Jan2025".
    service, month = key.split("_")
    pu, do = _pu_field[service], _do_field[service]

    for table, column in (("trip_counts_pu", pu), ("trip_counts_do", do)):
        location_id = f"CAST({column} AS INTEGER)"
        # Group by the same integer expression that is selected, so each
        # (service, month, LocationID) appears at most once per file even if
        # the raw column holds distinct values that cast to the same integer.
        # IDs 0, 264 and 265 are excluded -- presumably placeholder /
        # out-of-area zone codes; verify against the zone lookup table.
        con.sql(
            f"INSERT INTO {table} "
            f"SELECT '{service}', '{month}', {location_id}, COUNT(*) "
            f"FROM '{url}' "
            f"WHERE {column} IS NOT NULL AND {location_id} NOT IN (0, 264, 265) "
            f"GROUP BY {location_id}"
        )
# Representative ratio summary
# The baselines are held as fractions; scale them back to percentages so they
# are in the same units as zone_demographics.white_pct / black_pct.
bw, bb = baseline_white * 100, baseline_black * 100

# Pickup side: trip-weighted mean of each zone's percentage, divided by the
# city-wide baseline percentage, per service and month. Zones with no
# population are left out.
_rr_pu_sql = f"""
    SELECT tp.service, tp.month,
           SUM(tp.trips_pu * zd.white_pct) * 1.0 / SUM(tp.trips_pu) / {bw} AS RR_white_PU,
           SUM(tp.trips_pu * zd.black_pct) * 1.0 / SUM(tp.trips_pu) / {bb} AS RR_black_PU
    FROM trip_counts_pu AS tp
    JOIN zone_demographics AS zd ON tp.LocationID = zd.LocationID
    WHERE zd.TotalPop > 0
    GROUP BY tp.service, tp.month
"""
rr_pu_df = con.sql(_rr_pu_sql).df()

# Drop-off side: same computation over trip_counts_do.
_rr_do_sql = f"""
    SELECT td.service, td.month,
           SUM(td.trips_do * zd.white_pct) * 1.0 / SUM(td.trips_do) / {bw} AS RR_white_DO,
           SUM(td.trips_do * zd.black_pct) * 1.0 / SUM(td.trips_do) / {bb} AS RR_black_DO
    FROM trip_counts_do AS td
    JOIN zone_demographics AS zd ON td.LocationID = zd.LocationID
    WHERE zd.TotalPop > 0
    GROUP BY td.service, td.month
"""
rr_do_df = con.sql(_rr_do_sql).df()

# Outer merge keeps a (service, month) pair even when it appears on only one
# of the two sides.
rr_combined: pd.DataFrame = rr_pu_df.merge(
    rr_do_df, on=["service", "month"], how="outer"
)