"""Build ``processed_dashboard.db`` for the NYC taxi demographics dashboard.

Pipeline steps:

1. Load NYC taxi-zone polygons (remote GeoParquet) and NYC census blocks
   (downloaded ``nyc_data.db``) into DuckDB.
2. Aggregate census-block population per taxi zone -> ``zone_demographics``.
3. Compute citywide white/black population shares -> ``city_baselines``
   (read later by app.py).
4. Count FHV and Yellow-cab pickups/drop-offs per zone for Jan-Mar 2025
   -> ``trip_counts_pu`` / ``trip_counts_do``.
5. Compute trip-weighted representative ratios per service and month
   -> ``rr_combined``.

Re-running the script rebuilds every derived table. Trip parquet files that
already exist under ``trip_data/`` are reused rather than downloaded again.
"""

import os
import pathlib

import duckdb
import leafmap
import pandas as pd
import requests


# --------------------------------------------------------------------------
# Helpers
# --------------------------------------------------------------------------

def _download(url: str, dest: str, timeout: int = 120) -> None:
    """Stream *url* to *dest* atomically.

    Data is written to ``dest + '.part'`` and renamed into place only after
    the whole body has been received. An interrupted download therefore
    never leaves a truncated file at *dest*, where the "already exists"
    check would otherwise trust it. Streaming also avoids holding the
    whole file in memory.

    Raises:
        requests.HTTPError: if the server returns an error status.
    """
    tmp_path = f'{dest}.part'
    try:
        with requests.get(url, stream=True, timeout=timeout) as resp:
            resp.raise_for_status()
            with open(tmp_path, 'wb') as fh:
                for chunk in resp.iter_content(chunk_size=1 << 20):
                    fh.write(chunk)
        os.replace(tmp_path, dest)
    except BaseException:
        # Clean up the partial temp file, then propagate the original error.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def _insert_trip_counts(con, table: str, service: str, month: str,
                        column: str, parquet_path: str) -> None:
    """Append per-zone trip counts from one parquet file to *table*.

    The location column is cast to INTEGER *before* grouping. Raw values
    that cast to the same zone ID therefore collapse into a single row
    instead of producing duplicate LocationIDs. Rows with a NULL location
    or a location ID of 0, 264 or 265 are dropped.
    NOTE(review): 264/265 look like the TLC "unknown"/"outside NYC"
    pseudo-zones; confirm against the TLC zone lookup.
    """
    con.execute(f"""
        INSERT INTO {table}
        SELECT '{service}', '{month}', loc, COUNT(*)
        FROM (
            SELECT CAST({column} AS INTEGER) AS loc
            FROM read_parquet('{parquet_path}')
            WHERE {column} IS NOT NULL
        )
        WHERE loc NOT IN (0, 264, 265)
        GROUP BY loc
    """)


def _representative_ratios(con, table: str, count_col: str,
                           suffix: str) -> pd.DataFrame:
    """Return trip-weighted representative ratios per (service, month).

    For each group g:
        RR_g = (SUM(trips * zone_g_pct) / SUM(trips)) / citywide_g_pct

    The citywide percentages come from the ``city_baselines`` table, so
    the values are not re-embedded in SQL text. Zones with no resident
    population are excluded.

    Output columns: service, month, RR_white_<suffix>, RR_black_<suffix>.
    """
    return con.execute(f"""
        SELECT
            t.service,
            t.month,
            SUM(t.{count_col} * zd.white_pct) / SUM(t.{count_col})
                / b.baseline_white_pct AS RR_white_{suffix},
            SUM(t.{count_col} * zd.black_pct) / SUM(t.{count_col})
                / b.baseline_black_pct AS RR_black_{suffix}
        FROM {table} AS t
        JOIN zone_demographics AS zd ON t.LocationID = zd.LocationID
        CROSS JOIN city_baselines AS b
        WHERE zd.TotalPop > 0
        GROUP BY t.service, t.month, b.baseline_white_pct, b.baseline_black_pct
    """).df()


# --------------------------------------------------------------------------
# DuckDB connection & extensions
# --------------------------------------------------------------------------

con = duckdb.connect('processed_dashboard.db')
con.install_extension('httpfs')
con.load_extension('httpfs')
con.install_extension('spatial')
con.load_extension('spatial')

# --------------------------------------------------------------------------
# Taxi-zone geometry
# --------------------------------------------------------------------------

TAXI_ZONES_URL = 'https://data.source.coop/cholmes/nyc-taxi-zones/taxi_zones.parquet'
# NOTE(review): this view reads a remote URL, so anything querying it later
# (e.g. app.py) also needs httpfs loaded. taxi_zones_utm below is materialised.
con.execute(f"CREATE OR REPLACE VIEW taxi_zones AS SELECT * FROM '{TAXI_ZONES_URL}'")

# --------------------------------------------------------------------------
# NYC census blocks
# --------------------------------------------------------------------------

DB_PATH = pathlib.Path('nyc_data.db')
if not DB_PATH.exists():
    leafmap.download_file(
        'https://opengeos.org/data/duckdb/nyc_data.db.zip',
        unzip=True,
        overwrite=True,
    )
con.execute(f"ATTACH '{DB_PATH}' AS nyc_data (READ_ONLY)")

# Reproject taxi zones to UTM 18N so they share a CRS with the census blocks.
# NOTE(review): assumes the census-block geometries are EPSG:26918 and the
# taxi-zone geometries are EPSG:2263 — confirm against the source data.
con.execute("""
    CREATE OR REPLACE TABLE taxi_zones_utm AS
    SELECT * EXCLUDE (geometry),
           ST_Transform(geometry, 'EPSG:2263', 'EPSG:26918') AS geometry
    FROM taxi_zones
""")

# --------------------------------------------------------------------------
# Zone demographics via spatial join
# --------------------------------------------------------------------------

# Each census block is assigned to the zone containing an interior point of
# the block. Intersecting the full block polygon would count a block in
# every zone it touches or straddles, double-counting population along
# shared boundaries. NULLIF guards against zones whose blocks total zero
# people.
con.execute("""
    CREATE OR REPLACE TABLE zone_demographics AS
    SELECT
        tz.LocationID,
        tz.zone    AS TaxiZone,
        tz.borough AS Borough,
        SUM(cb.popn_total) AS TotalPop,
        SUM(cb.popn_white) AS WhitePop,
        SUM(cb.popn_black) AS BlackPop,
        100.0 * SUM(cb.popn_white) / NULLIF(SUM(cb.popn_total), 0) AS white_pct,
        100.0 * SUM(cb.popn_black) / NULLIF(SUM(cb.popn_total), 0) AS black_pct
    FROM nyc_data.nyc_census_blocks AS cb
    JOIN taxi_zones_utm AS tz
      ON ST_Intersects(tz.geometry, ST_PointOnSurface(cb.geom))
    GROUP BY tz.LocationID, tz.zone, tz.borough
""")

# --------------------------------------------------------------------------
# Citywide baselines (saved for app.py; percentages rounded to 2 decimals)
# --------------------------------------------------------------------------

# Computed directly in SQL rather than round-tripped through Python floats,
# which could store values such as 33.330000000000005.
con.execute("""
    CREATE OR REPLACE TABLE city_baselines AS
    SELECT
        ROUND(100.0 * SUM(popn_white) / SUM(popn_total), 2) AS baseline_white_pct,
        ROUND(100.0 * SUM(popn_black) / SUM(popn_total), 2) AS baseline_black_pct
    FROM nyc_data.nyc_census_blocks
""")
baseline_df = con.sql('SELECT * FROM city_baselines').df()
# Fractions (0-1) kept as module-level names for any downstream use.
baseline_white = float(baseline_df['baseline_white_pct'].iloc[0]) / 100.0
baseline_black = float(baseline_df['baseline_black_pct'].iloc[0]) / 100.0

# --------------------------------------------------------------------------
# Trip data
# --------------------------------------------------------------------------

# The two services spell their location columns differently.
pu_field = {'FHV': 'PUlocationID', 'Yellow': 'PULocationID'}
do_field = {'FHV': 'DOlocationID', 'Yellow': 'DOLocationID'}
# Keys are '<service>_<month>'; they are split on '_' below.
trip_urls = {
    'FHV_Jan2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2025-01.parquet',
    'FHV_Feb2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2025-02.parquet',
    'FHV_Mar2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2025-03.parquet',
    'Yellow_Jan2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet',
    'Yellow_Feb2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-02.parquet',
    'Yellow_Mar2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-03.parquet',
}

con.execute("CREATE OR REPLACE TABLE trip_counts_pu (service VARCHAR, month VARCHAR, LocationID INTEGER, trips_pu BIGINT)")
con.execute("CREATE OR REPLACE TABLE trip_counts_do (service VARCHAR, month VARCHAR, LocationID INTEGER, trips_do BIGINT)")

DATA_DIR = 'trip_data'
os.makedirs(DATA_DIR, exist_ok=True)

for key, url in trip_urls.items():
    local_path = os.path.join(DATA_DIR, f'{key}.parquet')
    if not os.path.exists(local_path):
        print(f'Downloading {key}...')
        _download(url, local_path)
    else:
        print(f'Already exists: {local_path}')

    service, month = key.split('_')
    _insert_trip_counts(con, 'trip_counts_pu', service, month,
                        pu_field[service], local_path)
    _insert_trip_counts(con, 'trip_counts_do', service, month,
                        do_field[service], local_path)

# --------------------------------------------------------------------------
# Representative ratios
# --------------------------------------------------------------------------

rr_pu_df = _representative_ratios(con, 'trip_counts_pu', 'trips_pu', 'PU')
rr_do_df = _representative_ratios(con, 'trip_counts_do', 'trips_do', 'DO')
rr_combined = pd.merge(rr_pu_df, rr_do_df, on=['service', 'month'], how='outer')

# Register the DataFrame under a distinct name. A bare `FROM rr_combined`
# would resolve to the existing *table* on reruns (catalog entries take
# precedence over replacement scans), so new results would never be
# written.
con.register('rr_combined_df', rr_combined)
con.execute("CREATE OR REPLACE TABLE rr_combined AS SELECT * FROM rr_combined_df")
con.unregister('rr_combined_df')

print('Pipeline complete. processed_dashboard.db is ready.')
con.close()