# HuggingFaceNYCTaxi / data_pipeline.py
# cmande62's picture
# Initial dashboard deployment
# d77a6bf
import os
import pathlib
import duckdb
import pandas as pd
import leafmap
import requests
# --- DuckDB connection & extensions ------------------------------------------
# Open the database file that every table/view created below is written into.
con = duckdb.connect('processed_dashboard.db')

# Install, then load, each extension in turn: `httpfs` first, then `spatial`.
for extension in ('httpfs', 'spatial'):
    con.install_extension(extension)
    con.load_extension(extension)

# --- Taxi-zone geometry ------------------------------------------------------
TAXI_ZONES_URL = 'https://data.source.coop/cholmes/nyc-taxi-zones/taxi_zones.parquet'

# Expose the remote Parquet file as a view named `taxi_zones`.
taxi_zones_view_sql = f"CREATE OR REPLACE VIEW taxi_zones AS SELECT * FROM '{TAXI_ZONES_URL}'"
con.sql(taxi_zones_view_sql)
# --- NYC census blocks -------------------------------------------------------
DB_PATH = pathlib.Path('nyc_data.db')

# Only fetch the zipped database when it is not already present on disk.
census_db_present = DB_PATH.exists()
if not census_db_present:
    download_options = {'unzip': True, 'overwrite': True}
    leafmap.download_file(
        'https://opengeos.org/data/duckdb/nyc_data.db.zip',
        **download_options,
    )

# Attach the census database read-only under the `nyc_data` alias so later
# queries can reference `nyc_data.nyc_census_blocks`.
attach_census_sql = "ATTACH 'nyc_data.db' AS nyc_data (READ_ONLY)"
con.execute(attach_census_sql)
# --- Reproject taxi zones to UTM ---------------------------------------------
# Materialise `taxi_zones_utm`: every column of `taxi_zones` except the
# original geometry, plus the geometry transformed from EPSG:2263 to
# EPSG:26918.
# NOTE(review): assumes the source geometry really is EPSG:2263 and that the
# census blocks are stored in EPSG:26918 -- confirm against the datasets.
reproject_sql = """
CREATE OR REPLACE TABLE taxi_zones_utm AS
SELECT * EXCLUDE (geometry),
ST_Transform(geometry, 'EPSG:2263', 'EPSG:26918') AS geometry
FROM taxi_zones
"""
con.sql(reproject_sql)
# --- Zone demographics via spatial join --------------------------------------
# Build one row per taxi zone with summed census-block population counts and
# the white / Black share of the total, expressed in percent.
#
# The percentage denominators are wrapped in NULLIF(..., 0) so that a zone
# whose intersecting blocks sum to zero population gets a NULL percentage
# instead of a version-dependent NaN / NULL / error from dividing by zero.
#
# NOTE(review): the join predicate is ST_Intersects, so a census block that
# touches more than one zone contributes its population to each of them.
con.sql("""
    CREATE OR REPLACE TABLE zone_demographics AS
    SELECT
        tz.LocationID,
        tz.zone AS TaxiZone,
        tz.borough AS Borough,
        SUM(cb.popn_total) AS TotalPop,
        SUM(cb.popn_white) AS WhitePop,
        SUM(cb.popn_black) AS BlackPop,
        100.0 * SUM(cb.popn_white) / NULLIF(SUM(cb.popn_total), 0) AS white_pct,
        100.0 * SUM(cb.popn_black) / NULLIF(SUM(cb.popn_total), 0) AS black_pct
    FROM nyc_data.nyc_census_blocks AS cb
    JOIN taxi_zones_utm AS tz ON ST_Intersects(tz.geometry, cb.geom)
    GROUP BY tz.LocationID, tz.zone, tz.borough
""")
# --- Citywide baselines ------------------------------------------------------
# Compute the citywide white / Black population shares (percent, rounded to
# two decimals) straight into `city_baselines`, the table app.py reads later.
# Building the table in SQL avoids the previous percent -> fraction -> percent
# float round trip, which could store values such as 44.019999999999996
# instead of the rounded 44.02, and avoids interpolating floats into SQL text.
con.sql("""
    CREATE OR REPLACE TABLE city_baselines AS
    SELECT
        ROUND(100.0 * SUM(popn_white) / SUM(popn_total), 2) AS baseline_white_pct,
        ROUND(100.0 * SUM(popn_black) / SUM(popn_total), 2) AS baseline_black_pct
    FROM nyc_data.nyc_census_blocks
""")
baseline_df = con.sql(
    "SELECT baseline_white_pct, baseline_black_pct FROM city_baselines"
).df()

# An empty or all-NULL census table would otherwise surface much later as an
# opaque SQL error when the baselines are used as divisors.
if baseline_df.empty or baseline_df.isna().to_numpy().any():
    raise ValueError(
        'Could not compute citywide baselines from nyc_data.nyc_census_blocks'
    )

# Baselines as fractions (0-1); later code multiplies them back to percent.
baseline_white = float(baseline_df['baseline_white_pct'].iloc[0]) / 100.0
baseline_black = float(baseline_df['baseline_black_pct'].iloc[0]) / 100.0
# --- Trip data ---------------------------------------------------------------
# Pickup / drop-off column names per service, as spelled in each dataset.
pu_field = {'FHV': 'PUlocationID', 'Yellow': 'PULocationID'}
do_field = {'FHV': 'DOlocationID', 'Yellow': 'DOLocationID'}

# Keys are '<service>_<month>'; they are split on '_' below.
trip_urls = {
    'FHV_Jan2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2025-01.parquet',
    'FHV_Feb2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2025-02.parquet',
    'FHV_Mar2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2025-03.parquet',
    'Yellow_Jan2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet',
    'Yellow_Feb2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-02.parquet',
    'Yellow_Mar2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-03.parquet',
}

# Start from empty count tables on every run so reruns never double-insert.
con.execute("CREATE OR REPLACE TABLE trip_counts_pu (service VARCHAR, month VARCHAR, LocationID INTEGER, trips_pu BIGINT)")
con.execute("CREATE OR REPLACE TABLE trip_counts_do (service VARCHAR, month VARCHAR, LocationID INTEGER, trips_do BIGINT)")

DATA_DIR = 'trip_data'
os.makedirs(DATA_DIR, exist_ok=True)


def _download_file(url, dest, chunk_size=1 << 20):
    """Stream *url* to *dest* without leaving a partial file behind.

    The body is written in chunks to ``<dest>.part`` and only renamed onto
    *dest* once the whole response has been written, so an interrupted run
    cannot leave a truncated file that a later run would treat as complete.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    partial = f'{dest}.part'
    try:
        with requests.get(url, stream=True, timeout=120) as response:
            response.raise_for_status()
            with open(partial, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    f.write(chunk)
        os.replace(partial, dest)
    finally:
        # After a successful rename the partial file no longer exists; this
        # only cleans up after a failed or interrupted download.
        if os.path.exists(partial):
            os.remove(partial)


for key, url in trip_urls.items():
    local_path = os.path.join(DATA_DIR, f'{key}.parquet')
    if not os.path.exists(local_path):
        print(f'Downloading {key}...')
        _download_file(url, local_path)
    else:
        print(f'Already exists: {local_path}')

    service, month = key.split('_')
    pu, do = pu_field[service], do_field[service]

    # Same aggregation for pickups and drop-offs: count trips per zone,
    # skipping NULL ids and the ids 0, 264 and 265. Grouping uses the INTEGER
    # cast (the value actually inserted) so each LocationID yields one row.
    for counts_table, location_column in (('trip_counts_pu', pu), ('trip_counts_do', do)):
        con.sql(f"""
            INSERT INTO {counts_table}
            SELECT '{service}', '{month}', CAST({location_column} AS INTEGER), COUNT(*)
            FROM '{local_path}'
            WHERE {location_column} IS NOT NULL
              AND CAST({location_column} AS INTEGER) NOT IN (0, 264, 265)
            GROUP BY CAST({location_column} AS INTEGER)
        """)
# --- Representative ratios ---------------------------------------------------
def _representative_ratios(trips_table, trips_column, suffix):
    """Return per-(service, month) representative ratios as a DataFrame.

    For each service/month, the trip-weighted mean of the zone white / Black
    percentages is divided by the matching citywide baseline percentage.
    Zones with no population (``TotalPop > 0`` is false) are excluded.

    Reads the module-level ``con``, ``baseline_white`` and ``baseline_black``.

    Args:
        trips_table: name of the trip-count table to aggregate.
        trips_column: name of its trip-count column.
        suffix: appended to the output column names (``RR_white_<suffix>``,
            ``RR_black_<suffix>``).
    """
    return con.sql(f"""
        SELECT
            t.service, t.month,
            SUM(t.{trips_column} * zd.white_pct) * 1.0 / SUM(t.{trips_column}) / {baseline_white*100} AS RR_white_{suffix},
            SUM(t.{trips_column} * zd.black_pct) * 1.0 / SUM(t.{trips_column}) / {baseline_black*100} AS RR_black_{suffix}
        FROM {trips_table} AS t
        JOIN zone_demographics AS zd ON t.LocationID = zd.LocationID
        WHERE zd.TotalPop > 0
        GROUP BY t.service, t.month
    """).df()


rr_pu_df = _representative_ratios('trip_counts_pu', 'trips_pu', 'PU')
rr_do_df = _representative_ratios('trip_counts_do', 'trips_do', 'DO')

# Outer merge keeps a service/month that appears on only one side.
rr_combined = pd.merge(rr_pu_df, rr_do_df, on=['service', 'month'], how='outer')

# Register the DataFrame under a name that differs from the target table.
# `SELECT * FROM rr_combined` would resolve to an already existing
# `rr_combined` table (left by a previous run against this persistent
# database) before it ever looks at the Python variable, so the table would
# be rebuilt from its own stale contents.
con.register('rr_combined_df', rr_combined)
try:
    con.sql("CREATE OR REPLACE TABLE rr_combined AS SELECT * FROM rr_combined_df")
finally:
    con.unregister('rr_combined_df')

print('Pipeline complete. processed_dashboard.db is ready.')
con.close()