"""Build the DuckDB tables behind the taxi-trip representation summary.

Running this script:

* opens ``processed_dashboard.db`` and loads the ``httpfs`` and ``spatial``
  extensions,
* exposes the remote taxi-zone parquet file as the ``taxi_zones`` view and
  attaches ``nyc_data.db`` (downloaded on first use) read-only,
* materialises ``taxi_zones_utm``, ``zone_demographics`` and
  ``city_baselines``,
* aggregates pickup / drop-off counts per zone into ``trip_counts_pu`` and
  ``trip_counts_do``,
* leaves the per-service, per-month representative ratios in the
  ``rr_combined`` DataFrame.
"""

import os
import pathlib

import duckdb
import pandas as pd
import leafmap

# DuckDB connection & extensions
con = duckdb.connect("processed_dashboard.db")
for _extension in ("httpfs", "spatial"):
    con.install_extension(_extension)
    con.load_extension(_extension)

# Taxi-zone geometry
TAXI_ZONES_URL = (
    "https://data.source.coop/cholmes/nyc-taxi-zones/taxi_zones.parquet"
)
con.sql(
    f"CREATE OR REPLACE VIEW taxi_zones AS SELECT * FROM '{TAXI_ZONES_URL}'"
)

# NYC census blocks
DB_PATH = pathlib.Path("nyc_data.db")
if not DB_PATH.exists():
    # Only fetch the archive when the database file is not already present
    # in the working directory.
    leafmap.download_file(
        "https://opengeos.org/data/duckdb/nyc_data.db.zip",
        unzip=True,
        overwrite=True,
    )
# Attach the same path that was checked above instead of repeating the
# literal file name.
con.execute(f"ATTACH '{DB_PATH.as_posix()}' AS nyc_data (READ_ONLY)")

# Compute demographics
# Reproject the zones from EPSG:2263 to EPSG:26918 before the spatial join.
# Presumably EPSG:26918 is the CRS of nyc_census_blocks.geom -- TODO confirm.
# NOTE(review): the ST_AsWKB / ST_GeomFromWKB round-trip looks redundant; it
# is kept unchanged in case it works around a geometry-type issue.
con.sql("""
    CREATE OR REPLACE TABLE taxi_zones_utm AS
    SELECT
        * EXCLUDE (geometry),
        ST_GeomFromWKB(
            ST_AsWKB(ST_Transform(geometry, 'EPSG:2263', 'EPSG:26918'))
        ) AS geometry
    FROM taxi_zones
""")

# NOTE(review): a census block that intersects several zones contributes its
# full population to each of them (ST_Intersects join without apportioning).
# NULLIF keeps the percentages NULL for zones whose intersecting blocks have
# no population, instead of relying on the engine's division-by-zero
# behaviour.
con.sql("""
    CREATE OR REPLACE TABLE zone_demographics AS
    SELECT
        tz.LocationID,
        tz.zone AS TaxiZone,
        tz.borough AS Borough,
        SUM(cb.popn_total) AS TotalPop,
        SUM(cb.popn_white) AS WhitePop,
        SUM(cb.popn_black) AS BlackPop,
        100.0 * SUM(cb.popn_white) / NULLIF(SUM(cb.popn_total), 0) AS white_pct,
        100.0 * SUM(cb.popn_black) / NULLIF(SUM(cb.popn_total), 0) AS black_pct
    FROM nyc_data.nyc_census_blocks AS cb
    JOIN taxi_zones_utm AS tz
        ON ST_Intersects(tz.geometry, cb.geom)
    GROUP BY tz.LocationID, tz.zone, tz.borough
""")

# City-wide shares over all census blocks, in percent.
# NOTE(review): the values are rounded to two decimals here and the rounded
# numbers are later used as divisors for the representative ratios.
baseline_df = con.sql("""
    SELECT
        ROUND(100.0 * SUM(popn_white) / SUM(popn_total), 2) AS baseline_white_pct,
        ROUND(100.0 * SUM(popn_black) / SUM(popn_total), 2) AS baseline_black_pct
    FROM nyc_data.nyc_census_blocks
""").df()
# DuckDB resolves ``baseline_df`` to the pandas DataFrame defined above.
con.sql("CREATE OR REPLACE TABLE city_baselines AS SELECT * FROM baseline_df")

# Baselines as fractions (0-1) rather than percentages.
baseline_white: float = float(baseline_df["baseline_white_pct"].iloc[0]) / 100.0
baseline_black: float = float(baseline_df["baseline_black_pct"].iloc[0]) / 100.0

# Trip data ingestion
# The pickup / drop-off column names are spelled differently per service.
_pu_field = {"FHV": "PUlocationID", "Yellow": "PULocationID"}
_do_field = {"FHV": "DOlocationID", "Yellow": "DOLocationID"}

_TRIP_DATA_BASE = "https://d37ci6vzurychx.cloudfront.net/trip-data"
TRIP_URLS = {
    "FHV_Jan2025": f"{_TRIP_DATA_BASE}/fhv_tripdata_2025-01.parquet",
    "FHV_Feb2025": f"{_TRIP_DATA_BASE}/fhv_tripdata_2025-02.parquet",
    "FHV_Mar2025": f"{_TRIP_DATA_BASE}/fhv_tripdata_2025-03.parquet",
    "Yellow_Jan2025": f"{_TRIP_DATA_BASE}/yellow_tripdata_2025-01.parquet",
    "Yellow_Feb2025": f"{_TRIP_DATA_BASE}/yellow_tripdata_2025-02.parquet",
    "Yellow_Mar2025": f"{_TRIP_DATA_BASE}/yellow_tripdata_2025-03.parquet",
}

con.execute(
    "CREATE OR REPLACE TABLE trip_counts_pu "
    "(service VARCHAR, month VARCHAR, LocationID INTEGER, trips_pu BIGINT)"
)
con.execute(
    "CREATE OR REPLACE TABLE trip_counts_do "
    "(service VARCHAR, month VARCHAR, LocationID INTEGER, trips_do BIGINT)"
)

for key, url in TRIP_URLS.items():
    # Keys have the form "<service>_<month>".
    service, month = key.split("_")
    pu, do = _pu_field[service], _do_field[service]
    for _table, _column in (("trip_counts_pu", pu), ("trip_counts_do", do)):
        # Group by the same integer expression that is selected, so each
        # (service, month, LocationID) yields exactly one row even if the
        # source column is not stored as an integer.
        _zone_id = f"CAST({_column} AS INTEGER)"
        # IDs 0, 264 and 265 are skipped; presumably these are placeholder /
        # unknown zone codes -- TODO confirm against the TLC zone lookup.
        con.sql(
            f"INSERT INTO {_table} "
            f"SELECT '{service}', '{month}', {_zone_id}, COUNT(*) "
            f"FROM '{url}' "
            f"WHERE {_column} IS NOT NULL AND {_zone_id} NOT IN (0, 264, 265) "
            f"GROUP BY {_zone_id}"
        )

# Representative ratio summary
# Back to percent so the divisors are on the same scale as white_pct/black_pct.
bw = baseline_white * 100
bb = baseline_black * 100


def _representative_ratios(table: str, trips_col: str, suffix: str) -> pd.DataFrame:
    """Return trip-weighted zone shares divided by the city baselines.

    Args:
        table: Trip-count table to read (``trip_counts_pu`` / ``trip_counts_do``).
        trips_col: Name of the trip-count column in ``table``.
        suffix: Suffix for the output columns ``RR_white_<suffix>`` and
            ``RR_black_<suffix>``.

    Returns:
        One row per (service, month) with the two ratio columns. Zones with
        ``TotalPop`` of zero (or no match in ``zone_demographics``) are left
        out of both the numerator and the denominator.
    """
    return con.sql(f"""
        SELECT
            t.service,
            t.month,
            SUM(t.{trips_col} * zd.white_pct) * 1.0
                / SUM(t.{trips_col}) / {bw} AS RR_white_{suffix},
            SUM(t.{trips_col} * zd.black_pct) * 1.0
                / SUM(t.{trips_col}) / {bb} AS RR_black_{suffix}
        FROM {table} AS t
        JOIN zone_demographics AS zd
            ON t.LocationID = zd.LocationID
        WHERE zd.TotalPop > 0
        GROUP BY t.service, t.month
    """).df()


rr_pu_df = _representative_ratios("trip_counts_pu", "trips_pu", "PU")
rr_do_df = _representative_ratios("trip_counts_do", "trips_do", "DO")

# Outer merge keeps a (service, month) pair even if it only has pickup or
# only has drop-off ratios.
rr_combined: pd.DataFrame = pd.merge(
    rr_pu_df, rr_do_df, on=["service", "month"], how="outer"
)