| import os |
| import pathlib |
| import duckdb |
| import pandas as pd |
| import leafmap |
|
|
| |
# Open (or create) the on-disk DuckDB database that stores the processed
# dashboard tables.
con = duckdb.connect('processed_dashboard.db')

# Install and load, in order, the extensions the queries below rely on:
# "httpfs" for reading the remote https:// Parquet files and "spatial" for
# the ST_* geometry functions.
for _extension in ("httpfs", "spatial"):
    con.install_extension(_extension)
    con.load_extension(_extension)
|
|
| |
# Remote Parquet file with the NYC taxi zones (one geometry per zone).
TAXI_ZONES_URL = "https://data.source.coop/cholmes/nyc-taxi-zones/taxi_zones.parquet"

# Expose the remote file under the name "taxi_zones" so later queries can
# refer to it like a table; a view stores only the query, not the rows.
_taxi_zones_view_sql = (
    f"CREATE OR REPLACE VIEW taxi_zones AS SELECT * FROM '{TAXI_ZONES_URL}'"
)
con.sql(_taxi_zones_view_sql)
|
|
| |
# Local copy of the NYC sample database; downloaded only when it is missing.
DB_PATH = pathlib.Path("nyc_data.db")
if not DB_PATH.exists():
    # NOTE(review): assumes the archive unpacks a file named "nyc_data.db"
    # into the current working directory -- confirm.
    leafmap.download_file(
        "https://opengeos.org/data/duckdb/nyc_data.db.zip",
        unzip=True,
        overwrite=True,
    )

# Attach the file that was just checked for (DB_PATH renders as
# "nyc_data.db"), read-only, under the schema name used by the queries below.
con.execute(f"ATTACH '{DB_PATH}' AS nyc_data (READ_ONLY)")
|
|
|
|
| |
# Materialise the taxi zones with their geometry reprojected from EPSG:2263
# to EPSG:26918; every other column is carried over unchanged.
# NOTE(review): presumably EPSG:26918 is the CRS of
# nyc_data.nyc_census_blocks.geom, so the spatial join further down compares
# geometries in the same coordinate system -- confirm.
_taxi_zones_utm_sql = """
    CREATE OR REPLACE TABLE taxi_zones_utm AS
    SELECT * EXCLUDE (geometry),
           ST_GeomFromWKB(ST_AsWKB(ST_Transform(geometry, 'EPSG:2263', 'EPSG:26918'))) AS geometry
    FROM taxi_zones
"""
con.sql(_taxi_zones_utm_sql)
|
|
# Per-taxi-zone population totals and racial composition, built by summing
# the census blocks that spatially intersect each zone.
#
# The percentage columns divide by NULLIF(total, 0): a zone whose blocks sum
# to zero population gets NULL percentages instead of a division-by-zero
# result (NaN or NULL, depending on DuckDB settings).
#
# NOTE(review): the join emits one row per (zone, block) pair that intersects,
# so a census block overlapping several zones is counted in each of them.
# If each block should belong to exactly one zone, join on a representative
# point of the block instead -- confirm the intended methodology.
con.sql("""
    CREATE OR REPLACE TABLE zone_demographics AS
    SELECT
        tz.LocationID,
        tz.zone AS TaxiZone,
        tz.borough AS Borough,
        SUM(cb.popn_total) AS TotalPop,
        SUM(cb.popn_white) AS WhitePop,
        SUM(cb.popn_black) AS BlackPop,
        100.0 * SUM(cb.popn_white) / NULLIF(SUM(cb.popn_total), 0) AS white_pct,
        100.0 * SUM(cb.popn_black) / NULLIF(SUM(cb.popn_total), 0) AS black_pct
    FROM nyc_data.nyc_census_blocks AS cb
    JOIN taxi_zones_utm AS tz ON ST_Intersects(tz.geometry, cb.geom)
    GROUP BY tz.LocationID, tz.zone, tz.borough
""")
|
|
# City-wide racial composition over all census blocks, expressed as
# percentages rounded to two decimals (a single-row result).
_baseline_sql = """
    SELECT
        ROUND(100.0 * SUM(popn_white) / SUM(popn_total), 2) AS baseline_white_pct,
        ROUND(100.0 * SUM(popn_black) / SUM(popn_total), 2) AS baseline_black_pct
    FROM nyc_data.nyc_census_blocks
"""
baseline_df = con.sql(_baseline_sql).df()

# Persist the baselines. The SQL text refers to the pandas DataFrame by its
# Python variable name, so "baseline_df" must not be renamed.
con.sql("CREATE OR REPLACE TABLE city_baselines AS SELECT * FROM baseline_df")

# The same baselines as fractions (percentage / 100) for use from Python.
_baseline_row = baseline_df.iloc[0]
baseline_white: float = float(_baseline_row["baseline_white_pct"]) / 100.0
baseline_black: float = float(_baseline_row["baseline_black_pct"]) / 100.0
|
|
| |
# Pickup / drop-off column names per service; they differ only in the
# capitalisation of "location".
_pu_field = dict(FHV="PUlocationID", Yellow="PULocationID")
_do_field = dict(FHV="DOlocationID", Yellow="DOLocationID")

# One Parquet file per service and month, keyed "<service>_<Mon>2025".
# Generated service-major, then month, i.e. FHV Jan-Mar followed by
# Yellow Jan-Mar.
_TRIP_DATA_ROOT = "https://d37ci6vzurychx.cloudfront.net/trip-data"
_MONTHS_2025 = (("Jan", "01"), ("Feb", "02"), ("Mar", "03"))
TRIP_URLS = {
    f"{_service}_{_label}2025": (
        f"{_TRIP_DATA_ROOT}/{_service.lower()}_tripdata_2025-{_number}.parquet"
    )
    for _service in ("FHV", "Yellow")
    for _label, _number in _MONTHS_2025
}
|
|
# Start from empty per-zone trip-count tables: trip_counts_pu for pickups and
# trip_counts_do for drop-offs. The two definitions differ only in the suffix
# of the table name and of the count column.
for _suffix in ("pu", "do"):
    con.execute(
        f"CREATE OR REPLACE TABLE trip_counts_{_suffix} "
        f"(service VARCHAR, month VARCHAR, LocationID INTEGER, trips_{_suffix} BIGINT)"
    )
|
|
# Fill the count tables: for every monthly trip file, count trips per pickup
# zone (into trip_counts_pu) and per drop-off zone (into trip_counts_do).
for key, url in TRIP_URLS.items():
    # Keys have the form "<service>_<month>", e.g. "FHV_Jan2025".
    service, month = key.split("_")
    pu, do = _pu_field[service], _do_field[service]
    for _table, _column in (("trip_counts_pu", pu), ("trip_counts_do", do)):
        # Filter, store and group on the same INTEGER cast. Grouping on the
        # raw column instead could emit several rows for one LocationID when
        # the raw values are not integers (different raw values casting to
        # the same ID).
        _zone_id = f"CAST({_column} AS INTEGER)"
        # NOTE(review): 0, 264 and 265 are presumably placeholder zone IDs
        # (unknown / outside NYC) with no real taxi zone -- confirm against
        # the TLC zone lookup table.
        con.sql(
            f"INSERT INTO {_table} "
            f"SELECT '{service}', '{month}', {_zone_id}, COUNT(*) "
            f"FROM '{url}' "
            f"WHERE {_column} IS NOT NULL AND {_zone_id} NOT IN (0, 264, 265) "
            f"GROUP BY {_zone_id}"
        )
|
|
| |
# City-wide baselines back on the percentage scale, to match the
# white_pct / black_pct columns of zone_demographics.
bw = baseline_white * 100
bb = baseline_black * 100


def _rr_query(table: str, alias: str, trips: str, suffix: str) -> str:
    """Build the ratio query for one trip-count table.

    For each (service, month) the query computes the trip-weighted mean of
    the zone-level white / black percentage and divides it by the city-wide
    baseline percentage (module-level ``bw`` / ``bb``). Zones whose total
    population is not positive are excluded.

    Args:
        table: Trip-count table to read, e.g. ``"trip_counts_pu"``.
        alias: SQL alias used for that table inside the query.
        trips: Name of the trip-count column in ``table``.
        suffix: Suffix appended to the output column names (``RR_white_<suffix>``
            and ``RR_black_<suffix>``).

    Returns:
        The SQL text, ready to be passed to ``con.sql``.
    """
    weight = f"{alias}.{trips}"
    return f"""
        SELECT {alias}.service, {alias}.month,
               SUM({weight} * zd.white_pct) * 1.0 / SUM({weight}) / {bw} AS RR_white_{suffix},
               SUM({weight} * zd.black_pct) * 1.0 / SUM({weight}) / {bb} AS RR_black_{suffix}
        FROM {table} AS {alias}
        JOIN zone_demographics AS zd ON {alias}.LocationID = zd.LocationID
        WHERE zd.TotalPop > 0
        GROUP BY {alias}.service, {alias}.month
    """


# Ratios weighted by pickups, then by drop-offs.
rr_pu_df = con.sql(_rr_query("trip_counts_pu", "tp", "trips_pu", "PU")).df()
rr_do_df = con.sql(_rr_query("trip_counts_do", "td", "trips_do", "DO")).df()
|
|
# Put the pickup and drop-off ratios side by side. The outer join keeps a
# (service, month) pair that appears on only one side, leaving the other
# side's columns missing.
rr_combined: pd.DataFrame = rr_pu_df.merge(
    rr_do_df,
    how="outer",
    on=["service", "month"],
)