Spaces:
Build error
Build error
File size: 5,442 Bytes
d77a6bf | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | import os
import pathlib
import duckdb
import pandas as pd
import leafmap
import requests
# Open (or create) the output database file and enable the DuckDB extensions
# this script relies on: httpfs (remote Parquet over HTTP) and spatial
# (geometry functions such as ST_Transform / ST_Intersects).
con = duckdb.connect('processed_dashboard.db')
for _extension_name in ('httpfs', 'spatial'):
    con.install_extension(_extension_name)
    con.load_extension(_extension_name)

# Taxi-zone geometry, exposed as a view over the remote Parquet file.
TAXI_ZONES_URL = 'https://data.source.coop/cholmes/nyc-taxi-zones/taxi_zones.parquet'
_taxi_zones_view_sql = f"CREATE OR REPLACE VIEW taxi_zones AS SELECT * FROM '{TAXI_ZONES_URL}'"
con.sql(_taxi_zones_view_sql)
# NYC census blocks: a DuckDB database distributed as a zip archive.
# Download and unpack it only when the database file is not already present,
# then attach it read-only under the alias `nyc_data`.
DB_PATH = pathlib.Path('nyc_data.db')
if not DB_PATH.exists():
    leafmap.download_file(
        'https://opengeos.org/data/duckdb/nyc_data.db.zip',
        unzip=True,
        overwrite=True,
    )
    # The archive is expected to unpack to DB_PATH. Fail with an explicit
    # message rather than an opaque ATTACH error if it did not.
    if not DB_PATH.exists():
        raise FileNotFoundError(
            f'{DB_PATH} not found after downloading and unzipping nyc_data.db.zip'
        )
# Attach through DB_PATH so the existence check and the ATTACH can never
# disagree about which file is used.
con.execute(f"ATTACH '{DB_PATH}' AS nyc_data (READ_ONLY)")
# Reproject taxi zones into the CRS used by the census blocks so the two
# geometry sets can be compared in the spatial join below.
# NOTE(review): assumes taxi_zones geometry is EPSG:2263 and
# nyc_census_blocks.geom is EPSG:26918 (UTM 18N) — confirm against the sources.
con.sql("""
CREATE OR REPLACE TABLE taxi_zones_utm AS
SELECT * EXCLUDE (geometry),
ST_Transform(geometry, 'EPSG:2263', 'EPSG:26918') AS geometry
FROM taxi_zones
""")

# Zone demographics via spatial join.
# Each census block is assigned to a single zone: the one containing a
# representative point of the block (ST_PointOnSurface returns a point that
# lies on the block's surface). Intersecting the full block polygon instead
# would count every block that touches or straddles a zone boundary in each
# adjacent zone, inflating the population sums.
# NULLIF keeps the percentages NULL (rather than failing or producing
# non-finite values) for zones whose matched blocks have zero total population.
con.sql("""
CREATE OR REPLACE TABLE zone_demographics AS
SELECT
tz.LocationID,
tz.zone AS TaxiZone,
tz.borough AS Borough,
SUM(cb.popn_total) AS TotalPop,
SUM(cb.popn_white) AS WhitePop,
SUM(cb.popn_black) AS BlackPop,
100.0 * SUM(cb.popn_white) / NULLIF(SUM(cb.popn_total), 0) AS white_pct,
100.0 * SUM(cb.popn_black) / NULLIF(SUM(cb.popn_total), 0) AS black_pct
FROM nyc_data.nyc_census_blocks AS cb
JOIN taxi_zones_utm AS tz ON ST_Intersects(tz.geometry, ST_PointOnSurface(cb.geom))
GROUP BY tz.LocationID, tz.zone, tz.borough
""")
# Citywide baselines: the share (in percent, rounded to 2 decimals) of the
# total census population that is white / black.
# The values are computed and persisted in a single SQL statement, so the
# city_baselines table (read later by app.py) holds exactly the rounded SQL
# results. They are not floats that were divided by 100 and multiplied back
# in Python, then re-formatted into SQL text, which can produce values like
# 44.120000000000005.
con.sql("""
CREATE OR REPLACE TABLE city_baselines AS
SELECT
ROUND(100.0 * SUM(popn_white) / SUM(popn_total), 2) AS baseline_white_pct,
ROUND(100.0 * SUM(popn_black) / SUM(popn_total), 2) AS baseline_black_pct
FROM nyc_data.nyc_census_blocks
""")
baseline_df = con.sql(
    "SELECT baseline_white_pct, baseline_black_pct FROM city_baselines"
).df()
# Module-level baselines as fractions (0-1); the representative-ratio
# queries below multiply them back to percentages.
baseline_white = float(baseline_df['baseline_white_pct'].iloc[0]) / 100.0
baseline_black = float(baseline_df['baseline_black_pct'].iloc[0]) / 100.0
# Trip data: per-zone pickup and drop-off counts for each service and month.
# The zone-ID column names differ in capitalisation between FHV and Yellow files.
pu_field = {'FHV': 'PUlocationID', 'Yellow': 'PULocationID'}
do_field = {'FHV': 'DOlocationID', 'Yellow': 'DOLocationID'}
trip_urls = {
    'FHV_Jan2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2025-01.parquet',
    'FHV_Feb2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2025-02.parquet',
    'FHV_Mar2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2025-03.parquet',
    'Yellow_Jan2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet',
    'Yellow_Feb2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-02.parquet',
    'Yellow_Mar2025': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-03.parquet',
}
con.execute("CREATE OR REPLACE TABLE trip_counts_pu (service VARCHAR, month VARCHAR, LocationID INTEGER, trips_pu BIGINT)")
con.execute("CREATE OR REPLACE TABLE trip_counts_do (service VARCHAR, month VARCHAR, LocationID INTEGER, trips_do BIGINT)")
DATA_DIR = 'trip_data'
os.makedirs(DATA_DIR, exist_ok=True)


def _download_file(url, dest, chunk_size=1 << 20):
    """Stream *url* to the local path *dest*.

    The body is written in chunks instead of being buffered in memory, and
    it goes to a temporary '<dest>.part' file that is renamed onto *dest*
    only after the full body has been written. An interrupted or failed
    download therefore never leaves a truncated file at *dest* that a later
    run would mistake for a cached copy.

    Raises:
        requests.HTTPError: if the server returns an error status.
    """
    tmp_path = dest + '.part'
    try:
        with requests.get(url, stream=True, timeout=120) as r:
            r.raise_for_status()
            with open(tmp_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    f.write(chunk)
        os.replace(tmp_path, dest)
    finally:
        # Only present if something failed before the rename.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


def _insert_zone_counts(table, column, service, month, parquet_path):
    """Append per-zone trip counts from one Parquet file to *table*.

    Rows are (service, month, LocationID, count), grouped on the same integer
    zone ID that is stored. NULL IDs and IDs 0, 264 and 265 are skipped
    (presumably the TLC unknown / outside-NYC codes — confirm).
    """
    con.sql(f"""
    INSERT INTO {table}
    SELECT '{service}', '{month}', CAST({column} AS INTEGER), COUNT(*)
    FROM '{parquet_path}'
    WHERE {column} IS NOT NULL AND CAST({column} AS INTEGER) NOT IN (0, 264, 265)
    GROUP BY CAST({column} AS INTEGER)
    """)


for key, url in trip_urls.items():
    local_path = os.path.join(DATA_DIR, f'{key}.parquet')
    if not os.path.exists(local_path):
        print(f'Downloading {key}...')
        _download_file(url, local_path)
    else:
        print(f'Already exists: {local_path}')
    # Keys are '<service>_<month>', e.g. 'FHV_Jan2025'.
    service, month = key.split('_')
    pu, do = pu_field[service], do_field[service]
    _insert_zone_counts('trip_counts_pu', pu, service, month, local_path)
    _insert_zone_counts('trip_counts_do', do, service, month, local_path)
# Representative ratios (RR): for each service and month, the trip-weighted
# mean of zone white/black percentages divided by the citywide baseline
# percentage, computed separately for pickups and drop-offs.
# Only zones present in zone_demographics with TotalPop > 0 contribute.


def _representative_ratios(table, count_col, suffix):
    """Return a DataFrame with columns service, month, RR_white_<suffix>, RR_black_<suffix>.

    *table* is a trip-count table with columns service, month, LocationID and
    *count_col*. The baselines come from the module-level baseline_white /
    baseline_black fractions.
    """
    return con.sql(f"""
    SELECT
    t.service, t.month,
    SUM(t.{count_col} * zd.white_pct) * 1.0 / SUM(t.{count_col}) / {baseline_white*100} AS RR_white_{suffix},
    SUM(t.{count_col} * zd.black_pct) * 1.0 / SUM(t.{count_col}) / {baseline_black*100} AS RR_black_{suffix}
    FROM {table} AS t
    JOIN zone_demographics AS zd ON t.LocationID = zd.LocationID
    WHERE zd.TotalPop > 0
    GROUP BY t.service, t.month
    """).df()


rr_pu_df = _representative_ratios('trip_counts_pu', 'trips_pu', 'PU')
rr_do_df = _representative_ratios('trip_counts_do', 'trips_do', 'DO')
rr_combined = pd.merge(rr_pu_df, rr_do_df, on=['service', 'month'], how='outer')

# Materialise the DataFrame through an explicitly registered, distinct name.
# DuckDB resolves catalog tables before pandas replacement scans, so
# `SELECT * FROM rr_combined` would read the rr_combined *table* already
# persisted in processed_dashboard.db on every run after the first. That
# would rebuild the table from its own stale contents.
con.register('rr_combined_df', rr_combined)
con.sql("CREATE OR REPLACE TABLE rr_combined AS SELECT * FROM rr_combined_df")
con.unregister('rr_combined_df')

print('Pipeline complete. processed_dashboard.db is ready.')
con.close()