Spaces:
Build error
Build error
| import os | |
| import pathlib | |
| import duckdb | |
| import pandas as pd | |
| import leafmap | |
| import requests | |
# DuckDB connection & extensions
# All derived tables are written to this on-disk database file.
con = duckdb.connect('processed_dashboard.db')
# Install, then load, each extension in turn: httpfs first, then spatial.
for _ext in ('httpfs', 'spatial'):
    con.install_extension(_ext)
    con.load_extension(_ext)
# Taxi-zone geometry
# The remote Parquet file is exposed as a view, so it is read lazily whenever
# `taxi_zones` is queried rather than being copied into the database here.
TAXI_ZONES_URL = 'https://data.source.coop/cholmes/nyc-taxi-zones/taxi_zones.parquet'
con.sql(
    "CREATE OR REPLACE VIEW taxi_zones AS "
    f"SELECT * FROM '{TAXI_ZONES_URL}'"
)
# NYC census blocks
# The census database ships as a zipped DuckDB file; fetch and unzip it only
# when no local copy is present, then attach it read-only as `nyc_data`.
DB_PATH = pathlib.Path('nyc_data.db')
NYC_DATA_ZIP_URL = 'https://opengeos.org/data/duckdb/nyc_data.db.zip'
if not DB_PATH.exists():
    leafmap.download_file(NYC_DATA_ZIP_URL, unzip=True, overwrite=True)
# str(DB_PATH) is 'nyc_data.db', so the statement text matches the path that
# was just checked/downloaded.
con.execute(f"ATTACH '{DB_PATH}' AS nyc_data (READ_ONLY)")
# Reproject taxi zones to UTM
# Materialises `taxi_zones_utm`: every column of `taxi_zones` except the
# original geometry, plus the geometry transformed from EPSG:2263 to EPSG:26918.
# NOTE(review): assumes the source parquet stores geometry in EPSG:2263 and that
# the census-block geometry it is later joined against is in EPSG:26918 -- confirm.
REPROJECT_TAXI_ZONES_SQL = """
    CREATE OR REPLACE TABLE taxi_zones_utm AS
    SELECT * EXCLUDE (geometry),
           ST_Transform(geometry, 'EPSG:2263', 'EPSG:26918') AS geometry
    FROM taxi_zones
"""
con.sql(REPROJECT_TAXI_ZONES_SQL)
# Zone demographics via spatial join
# Builds one row per taxi zone with population totals summed over every census
# block whose geometry intersects the zone, plus white/black percentage shares.
#
# NULLIF guards the percentage columns: a zone whose intersecting blocks sum to
# zero population would otherwise divide by zero, which yields NULL or NaN/inf
# depending on the DuckDB version and settings. With the guard the share is
# always NULL for such zones.
#
# NOTE(review): ST_Intersects matches a block to every zone it touches, so a
# block straddling a zone boundary contributes to more than one zone -- confirm
# that this is the intended apportionment.
con.sql("""
    CREATE OR REPLACE TABLE zone_demographics AS
    SELECT
        tz.LocationID,
        tz.zone AS TaxiZone,
        tz.borough AS Borough,
        SUM(cb.popn_total) AS TotalPop,
        SUM(cb.popn_white) AS WhitePop,
        SUM(cb.popn_black) AS BlackPop,
        100.0 * SUM(cb.popn_white) / NULLIF(SUM(cb.popn_total), 0) AS white_pct,
        100.0 * SUM(cb.popn_black) / NULLIF(SUM(cb.popn_total), 0) AS black_pct
    FROM nyc_data.nyc_census_blocks AS cb
    JOIN taxi_zones_utm AS tz ON ST_Intersects(tz.geometry, cb.geom)
    GROUP BY tz.LocationID, tz.zone, tz.borough
""")
# Citywide baselines
# Persist the citywide white/black population shares (percent, rounded to two
# decimals) for app.py to read later. The table is computed directly in SQL so
# the stored values never pass through a Python float round trip
# (pct -> /100 -> *100 -> text) that could perturb the value or change the
# column type depending on how the float happens to print.
con.sql("""
    CREATE OR REPLACE TABLE city_baselines AS
    SELECT
        ROUND(100.0 * SUM(popn_white) / SUM(popn_total), 2) AS baseline_white_pct,
        ROUND(100.0 * SUM(popn_black) / SUM(popn_total), 2) AS baseline_black_pct
    FROM nyc_data.nyc_census_blocks
""")

# Read the single baseline row back so Python and the database agree exactly.
baseline_df = con.sql("""
    SELECT baseline_white_pct, baseline_black_pct
    FROM city_baselines
""").df()

# Fractions in [0, 1]; later queries multiply by 100 to get percentages again.
baseline_white = float(baseline_df['baseline_white_pct'].iloc[0]) / 100.0
baseline_black = float(baseline_df['baseline_black_pct'].iloc[0]) / 100.0
# Trip data
# The FHV and Yellow files spell the pickup / drop-off zone columns with
# different capitalisation, so the column name is looked up per service.
pu_field = {'FHV': 'PUlocationID', 'Yellow': 'PULocationID'}
do_field = {'FHV': 'DOlocationID', 'Yellow': 'DOLocationID'}

# One parquet file per (service, month); keys are '<Service>_<MonYYYY>' and are
# split on '_' further down to recover the service and month labels.
TRIP_DATA_BASE_URL = 'https://d37ci6vzurychx.cloudfront.net/trip-data'
trip_urls = {
    f'{service}_{label}': f'{TRIP_DATA_BASE_URL}/{service.lower()}_tripdata_{year_month}.parquet'
    for service in ('FHV', 'Yellow')
    for label, year_month in (
        ('Jan2025', '2025-01'),
        ('Feb2025', '2025-02'),
        ('Mar2025', '2025-03'),
    )
}

# Start from empty per-zone count tables on every run (pickups, then drop-offs).
for _suffix in ('pu', 'do'):
    con.execute(f"CREATE OR REPLACE TABLE trip_counts_{_suffix} (service VARCHAR, month VARCHAR, LocationID INTEGER, trips_{_suffix} BIGINT)")

# Local cache directory for the downloaded parquet files.
DATA_DIR = 'trip_data'
os.makedirs(DATA_DIR, exist_ok=True)
def _download_parquet(url, dest, *, timeout=120, chunk_size=1 << 20):
    """Stream *url* to *dest*, publishing the file only once it is complete.

    The body is written to ``<dest>.part`` in chunks and renamed onto *dest*
    after the last chunk, so an interrupted run never leaves a truncated file
    that a later run would mistake for a finished download.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    partial = f'{dest}.part'
    try:
        with requests.get(url, stream=True, timeout=timeout) as resp:
            resp.raise_for_status()
            with open(partial, 'wb') as fh:
                for chunk in resp.iter_content(chunk_size=chunk_size):
                    fh.write(chunk)
        os.replace(partial, dest)
    finally:
        # Only present if the download failed before the rename.
        if os.path.exists(partial):
            os.remove(partial)


# Download each monthly file once (cached under DATA_DIR), then append its
# per-zone pickup and drop-off counts to trip_counts_pu / trip_counts_do.
for key, url in trip_urls.items():
    local_path = os.path.join(DATA_DIR, f'{key}.parquet')
    if os.path.exists(local_path):
        print(f'Already exists: {local_path}')
    else:
        print(f'Downloading {key}...')
        _download_parquet(url, local_path)

    # Keys look like 'Yellow_Jan2025'.
    service, month = key.split('_')
    pu, do = pu_field[service], do_field[service]

    for target_table, column in (('trip_counts_pu', pu), ('trip_counts_do', do)):
        # Group by the same INTEGER expression that is selected, so each
        # (service, month, LocationID) appears exactly once even when the raw
        # column is stored with a non-integer type.
        zone_id = f'CAST({column} AS INTEGER)'
        # NOTE(review): 0, 264 and 265 are excluded as non-zone IDs --
        # presumably TLC placeholder codes; confirm against the data dictionary.
        con.sql(f"""
            INSERT INTO {target_table}
            SELECT '{service}', '{month}', {zone_id}, COUNT(*)
            FROM '{local_path}'
            WHERE {column} IS NOT NULL AND {zone_id} NOT IN (0, 264, 265)
            GROUP BY {zone_id}
        """)
# Representative ratios
# For each (service, month): the trip-weighted mean of the zone-level white /
# black percentage, divided by the citywide baseline percentage. Zones with no
# recorded population are left out of the weighting.
white_baseline_pct = baseline_white*100
black_baseline_pct = baseline_black*100

# Pickup side.
rr_pu_sql = f"""
    SELECT
        tp.service, tp.month,
        SUM(tp.trips_pu * zd.white_pct) * 1.0 / SUM(tp.trips_pu) / {white_baseline_pct} AS RR_white_PU,
        SUM(tp.trips_pu * zd.black_pct) * 1.0 / SUM(tp.trips_pu) / {black_baseline_pct} AS RR_black_PU
    FROM trip_counts_pu AS tp
    JOIN zone_demographics AS zd ON tp.LocationID = zd.LocationID
    WHERE zd.TotalPop > 0
    GROUP BY tp.service, tp.month
"""
rr_pu_df = con.sql(rr_pu_sql).df()

# Drop-off side: same computation over the drop-off counts.
rr_do_sql = f"""
    SELECT
        td.service, td.month,
        SUM(td.trips_do * zd.white_pct) * 1.0 / SUM(td.trips_do) / {white_baseline_pct} AS RR_white_DO,
        SUM(td.trips_do * zd.black_pct) * 1.0 / SUM(td.trips_do) / {black_baseline_pct} AS RR_black_DO
    FROM trip_counts_do AS td
    JOIN zone_demographics AS zd ON td.LocationID = zd.LocationID
    WHERE zd.TotalPop > 0
    GROUP BY td.service, td.month
"""
rr_do_df = con.sql(rr_do_sql).df()
# Combine pickup and drop-off ratios; the outer join keeps a (service, month)
# that appears on only one side.
rr_combined = pd.merge(rr_pu_df, rr_do_df, on=['service', 'month'], how='outer')

# Persist the result. The DataFrame is registered under a name that differs
# from the target table: processed_dashboard.db is persistent, so on any re-run
# a table called `rr_combined` already exists, and a bare
# `SELECT * FROM rr_combined` would resolve to that stale table instead of the
# freshly computed DataFrame.
con.register('rr_combined_df', rr_combined)
con.sql("CREATE OR REPLACE TABLE rr_combined AS SELECT * FROM rr_combined_df")
con.unregister('rr_combined_df')

print('Pipeline complete. processed_dashboard.db is ready.')
con.close()