# nycmobility / data_pipeline.py
# arjavrawal
# uploading files
# 44131b7
import os
import pathlib
import duckdb
import pandas as pd
import leafmap
# DuckDB connection & extensions
# Connect to the database file that the rest of this script writes its tables to.
con = duckdb.connect('processed_dashboard.db')
# Install, then load, each extension in turn (httpfs first, spatial second).
for _extension in ("httpfs", "spatial"):
    con.install_extension(_extension)
    con.load_extension(_extension)
# Taxi-zone geometry
# Location of the remote Parquet file holding the taxi-zone geometry.
TAXI_ZONES_URL = "https://data.source.coop/cholmes/nyc-taxi-zones/taxi_zones.parquet"
# Expose that file as a view named taxi_zones instead of copying it locally.
_taxi_zones_view_sql = (
    "CREATE OR REPLACE VIEW taxi_zones AS "
    f"SELECT * FROM '{TAXI_ZONES_URL}'"
)
con.sql(_taxi_zones_view_sql)
# NYC census blocks
DB_PATH = pathlib.Path("nyc_data.db")
_NYC_DATA_ARCHIVE_URL = "https://opengeos.org/data/duckdb/nyc_data.db.zip"
# Download and unzip the archive only when the database file is not on disk yet.
if not DB_PATH.exists():
    leafmap.download_file(_NYC_DATA_ARCHIVE_URL, unzip=True, overwrite=True)
# Attach the file read-only under the alias nyc_data for the queries below.
con.execute("ATTACH 'nyc_data.db' AS nyc_data (READ_ONLY)")
# Compute demographics
# Step 1: materialise the taxi zones with their geometry transformed from
# EPSG:2263 to EPSG:26918 so it can be compared with the census-block geometry.
# NOTE(review): assumes the source geometry really is EPSG:2263 and that
# nyc_census_blocks.geom is EPSG:26918 — confirm against both datasets.
con.sql("""
CREATE OR REPLACE TABLE taxi_zones_utm AS
SELECT * EXCLUDE (geometry),
ST_GeomFromWKB(ST_AsWKB(ST_Transform(geometry, 'EPSG:2263', 'EPSG:26918'))) AS geometry
FROM taxi_zones
""")
# Step 2: aggregate census-block population per taxi zone.
# Each block is assigned to the single zone that contains its centroid.
# A plain ST_Intersects join also matches blocks that merely touch or slightly
# overlap a neighbouring zone's boundary, which adds the same block's
# population to several zones and inflates TotalPop.
# NULLIF keeps the percentage NULL (not NaN / an error) for zones whose
# matched blocks have zero total population.
con.sql("""
CREATE OR REPLACE TABLE zone_demographics AS
SELECT
tz.LocationID,
tz.zone AS TaxiZone,
tz.borough AS Borough,
SUM(cb.popn_total) AS TotalPop,
SUM(cb.popn_white) AS WhitePop,
SUM(cb.popn_black) AS BlackPop,
100.0 * SUM(cb.popn_white) / NULLIF(SUM(cb.popn_total), 0) AS white_pct,
100.0 * SUM(cb.popn_black) / NULLIF(SUM(cb.popn_total), 0) AS black_pct
FROM nyc_data.nyc_census_blocks AS cb
JOIN taxi_zones_utm AS tz ON ST_Contains(tz.geometry, ST_Centroid(cb.geom))
GROUP BY tz.LocationID, tz.zone, tz.borough
""")
# City-wide white / Black population shares, in percent rounded to 2 decimals,
# computed over every census block (a single-row result).
_baseline_sql = """
SELECT
ROUND(100.0 * SUM(popn_white) / SUM(popn_total), 2) AS baseline_white_pct,
ROUND(100.0 * SUM(popn_black) / SUM(popn_total), 2) AS baseline_black_pct
FROM nyc_data.nyc_census_blocks
"""
baseline_df = con.sql(_baseline_sql).df()
# Persist the result; the statement refers to the baseline_df variable by name.
con.sql("CREATE OR REPLACE TABLE city_baselines AS SELECT * FROM baseline_df")
# Keep the same two values in Python as fractions (percent / 100).
_baseline_row = baseline_df.iloc[0]
baseline_white: float = float(_baseline_row["baseline_white_pct"]) / 100.0
baseline_black: float = float(_baseline_row["baseline_black_pct"]) / 100.0
# Trip data ingestion
# Pickup / drop-off column names per service (they differ only in letter case).
_pu_field = {"FHV": "PUlocationID", "Yellow": "PULocationID"}
_do_field = {"FHV": "DOlocationID", "Yellow": "DOLocationID"}
# Keys have the form "<service>_<month>"; the loop below splits them on "_".
TRIP_URLS = {
    "FHV_Jan2025": "https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2025-01.parquet",
    "FHV_Feb2025": "https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2025-02.parquet",
    "FHV_Mar2025": "https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2025-03.parquet",
    "Yellow_Jan2025": "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet",
    "Yellow_Feb2025": "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-02.parquet",
    "Yellow_Mar2025": "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-03.parquet",
}
# Location IDs left out of the counts.
# NOTE(review): presumably placeholder / out-of-area zone codes — confirm
# against the TLC zone lookup table.
_EXCLUDED_LOCATION_IDS = (0, 264, 265)
_excluded_ids_sql = ", ".join(str(zone_id) for zone_id in _EXCLUDED_LOCATION_IDS)

# Recreate both count tables empty so a re-run does not append to old rows.
con.execute(
    "CREATE OR REPLACE TABLE trip_counts_pu "
    "(service VARCHAR, month VARCHAR, LocationID INTEGER, trips_pu BIGINT)"
)
con.execute(
    "CREATE OR REPLACE TABLE trip_counts_do "
    "(service VARCHAR, month VARCHAR, LocationID INTEGER, trips_do BIGINT)"
)

# For every monthly file, count trips per pickup zone and per drop-off zone.
for key, url in TRIP_URLS.items():
    service, month = key.split("_")
    pu, do = _pu_field[service], _do_field[service]
    for table, column in (("trip_counts_pu", pu), ("trip_counts_do", do)):
        # Group by the same INTEGER cast that is selected. Grouping by the raw
        # column could emit several rows for one LocationID when distinct raw
        # values cast to the same integer.
        location_id = f"CAST({column} AS INTEGER)"
        con.sql(
            f"INSERT INTO {table} "
            f"SELECT '{service}', '{month}', {location_id}, COUNT(*) "
            f"FROM '{url}' "
            f"WHERE {column} IS NOT NULL "
            f"AND {location_id} NOT IN ({_excluded_ids_sql}) "
            f"GROUP BY {location_id}"
        )
# Representative ratio summary
# Baselines converted back to percentages so they match the *_pct columns.
bw, bb = baseline_white * 100, baseline_black * 100

# Per (service, month): trip-weighted mean of each zone's white / Black
# percentage, divided by the matching city-wide baseline. Zones with no
# population are excluded. One query for pickups, one for drop-offs.
_rr_pu_sql = f"""
SELECT tp.service, tp.month,
SUM(tp.trips_pu * zd.white_pct) * 1.0 / SUM(tp.trips_pu) / {bw} AS RR_white_PU,
SUM(tp.trips_pu * zd.black_pct) * 1.0 / SUM(tp.trips_pu) / {bb} AS RR_black_PU
FROM trip_counts_pu AS tp
JOIN zone_demographics AS zd ON tp.LocationID = zd.LocationID
WHERE zd.TotalPop > 0
GROUP BY tp.service, tp.month
"""
_rr_do_sql = f"""
SELECT td.service, td.month,
SUM(td.trips_do * zd.white_pct) * 1.0 / SUM(td.trips_do) / {bw} AS RR_white_DO,
SUM(td.trips_do * zd.black_pct) * 1.0 / SUM(td.trips_do) / {bb} AS RR_black_DO
FROM trip_counts_do AS td
JOIN zone_demographics AS zd ON td.LocationID = zd.LocationID
WHERE zd.TotalPop > 0
GROUP BY td.service, td.month
"""
rr_pu_df = con.sql(_rr_pu_sql).df()
rr_do_df = con.sql(_rr_do_sql).df()

# Outer merge keeps a (service, month) pair even if only one side has it.
rr_combined: pd.DataFrame = rr_pu_df.merge(
    rr_do_df, on=["service", "month"], how="outer"
)