 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
import os
import pathlib
import duckdb
import pandas as pd
import leafmap
import requests

# Open (or create) the output DuckDB database and enable the two extensions
# the pipeline relies on: httpfs (remote parquet over HTTP) and spatial
# (geometry types, ST_* functions).
con = duckdb.connect('processed_dashboard.db')
for extension in ('httpfs', 'spatial'):
    con.install_extension(extension)
    con.load_extension(extension)

# Taxi-zone polygons published as a cloud-hosted parquet file; exposed to
# SQL as a view so later statements can read it like a local table.
TAXI_ZONES_URL = 'https://data.source.coop/cholmes/nyc-taxi-zones/taxi_zones.parquet'
view_sql = "CREATE OR REPLACE VIEW taxi_zones AS SELECT * FROM '" + TAXI_ZONES_URL + "'"
con.sql(view_sql)

# NYC census-block data ships as a ready-made DuckDB database.  Download it
# once (the zip self-extracts to nyc_data.db), then attach it read-only so
# its tables are reachable as nyc_data.*.
DB_PATH = pathlib.Path('nyc_data.db')
if not DB_PATH.exists():
    leafmap.download_file(
        'https://opengeos.org/data/duckdb/nyc_data.db.zip',
        unzip=True, overwrite=True,
    )

con.execute("ATTACH 'nyc_data.db' AS nyc_data (READ_ONLY)")

# Reproject taxi zones to UTM
# The zone parquet is in EPSG:2263 (NY State Plane, US feet); transform to
# EPSG:26918 (UTM 18N) — presumably the CRS of nyc_census_blocks.geom, so
# the ST_Intersects join below compares like with like (TODO confirm the
# census CRS against the nyc_data.db metadata).
con.sql("""
    CREATE OR REPLACE TABLE taxi_zones_utm AS
    SELECT * EXCLUDE (geometry),
        ST_Transform(geometry, 'EPSG:2263', 'EPSG:26918') AS geometry
    FROM taxi_zones
""")

# Zone demographics via spatial join: roll census-block populations up to
# taxi zones.  Note a block intersecting two zones is counted toward both
# (the join is on ST_Intersects, not containment), which slightly inflates
# boundary zones — acceptable for the ratio metrics computed downstream.
con.sql("""
    CREATE OR REPLACE TABLE zone_demographics AS
    SELECT
        tz.LocationID,
        tz.zone AS TaxiZone,
        tz.borough AS Borough,
        SUM(cb.popn_total) AS TotalPop,
        SUM(cb.popn_white) AS WhitePop,
        SUM(cb.popn_black) AS BlackPop,
        -- NULLIF guards zones whose intersecting blocks sum to zero
        -- population: pct columns become NULL explicitly rather than
        -- depending on engine-specific division-by-zero behavior.
        100.0 * SUM(cb.popn_white) / NULLIF(SUM(cb.popn_total), 0) AS white_pct,
        100.0 * SUM(cb.popn_black) / NULLIF(SUM(cb.popn_total), 0) AS black_pct
    FROM nyc_data.nyc_census_blocks AS cb
    JOIN taxi_zones_utm AS tz ON ST_Intersects(tz.geometry, cb.geom)
    GROUP BY tz.LocationID, tz.zone, tz.borough
""")

# Citywide baselines: each group's share of the total NYC population.
# Computed UNROUNDED here — these values are the denominators of the
# representation ratios below, and the original 2-dp ROUND-then-reuse
# injected avoidable error into every RR.  (Also drops the pointless
# divide-by-100 / multiply-by-100 round trip.)
baseline_df = con.sql("""
    SELECT
        100.0 * SUM(popn_white) / SUM(popn_total) AS baseline_white_pct,
        100.0 * SUM(popn_black) / SUM(popn_total) AS baseline_black_pct
    FROM nyc_data.nyc_census_blocks
""").df()

# Fractions in [0, 1], consumed by the RR queries further down.
baseline_white = float(baseline_df['baseline_white_pct'].iloc[0]) / 100.0
baseline_black = float(baseline_df['baseline_black_pct'].iloc[0]) / 100.0

# Persist display-rounded baselines (2 dp, as before) for app.py to read.
con.sql(f"""
    CREATE OR REPLACE TABLE city_baselines AS
    SELECT
        ROUND({baseline_white * 100}, 2) AS baseline_white_pct,
        ROUND({baseline_black * 100}, 2) AS baseline_black_pct
""")

# Trip data: TLC FHV and Yellow-cab records, Jan-Mar 2025.  The pickup /
# dropoff LocationID columns are capitalised differently per service, so
# keep per-service lookup tables for the column names.
pu_field = {'FHV': 'PUlocationID', 'Yellow': 'PULocationID'}
do_field = {'FHV': 'DOlocationID', 'Yellow': 'DOLocationID'}

_MONTHS = {'Jan': '01', 'Feb': '02', 'Mar': '03'}
trip_urls = {
    f'{service}_{label}2025':
        f'https://d37ci6vzurychx.cloudfront.net/trip-data/'
        f'{service.lower()}_tripdata_2025-{num}.parquet'
    for service in ('FHV', 'Yellow')
    for label, num in _MONTHS.items()
}

# Accumulator tables: one row per (service, month, zone) for pickups and
# dropoffs respectively.
con.execute("CREATE OR REPLACE TABLE trip_counts_pu (service VARCHAR, month VARCHAR, LocationID INTEGER, trips_pu BIGINT)")
con.execute("CREATE OR REPLACE TABLE trip_counts_do (service VARCHAR, month VARCHAR, LocationID INTEGER, trips_do BIGINT)")

DATA_DIR = 'trip_data'
os.makedirs(DATA_DIR, exist_ok=True)

# Download each month of trip data (if not cached) and fold per-zone
# pickup/dropoff counts into the accumulator tables.
for key, url in trip_urls.items():
    local_path = os.path.join(DATA_DIR, f'{key}.parquet')
    if not os.path.exists(local_path):
        print(f'Downloading {key}...')
        # Stream to a .part temp file and atomically rename on success:
        # these parquet files run to hundreds of MB, so this avoids holding
        # the whole body in memory (the old r.content did), and a failed
        # download can no longer leave a truncated file that the exists()
        # check above would treat as complete on the next run.
        tmp_path = local_path + '.part'
        try:
            with requests.get(url, timeout=120, stream=True) as r:
                r.raise_for_status()
                with open(tmp_path, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=1 << 20):
                        f.write(chunk)
            os.replace(tmp_path, local_path)
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
    else:
        print(f'Already exists: {local_path}')

    # Keys look like '<Service>_<Month>2025'; pick the right column names.
    service, month = key.split('_')
    pu, do = pu_field[service], do_field[service]

    # Per-zone pickup counts.  IDs 0/264/265 are excluded — presumably the
    # TLC unknown/out-of-area codes; confirm against the TLC data dictionary.
    con.sql(f"""
        INSERT INTO trip_counts_pu
        SELECT '{service}', '{month}', CAST({pu} AS INTEGER), COUNT(*)
        FROM '{local_path}'
        WHERE {pu} IS NOT NULL AND CAST({pu} AS INTEGER) NOT IN (0, 264, 265)
        GROUP BY {pu}
    """)
    # Per-zone dropoff counts, same filtering.
    con.sql(f"""
        INSERT INTO trip_counts_do
        SELECT '{service}', '{month}', CAST({do} AS INTEGER), COUNT(*)
        FROM '{local_path}'
        WHERE {do} IS NOT NULL AND CAST({do} AS INTEGER) NOT IN (0, 264, 265)
        GROUP BY {do}
    """)

# Representative ratios
# RR = (trip-weighted mean demographic pct of the zones trips start/end in)
#      / (citywide baseline pct).  RR == 1 means trips are spread in
# proportion to that group's population share; >1 over-, <1 under-represented.
# Zero-population zones are dropped (their pct columns are not meaningful).
rr_pu_df = con.sql(f"""
    SELECT
        tp.service, tp.month,
        SUM(tp.trips_pu * zd.white_pct) * 1.0 / SUM(tp.trips_pu) / {baseline_white*100} AS RR_white_PU,
        SUM(tp.trips_pu * zd.black_pct) * 1.0 / SUM(tp.trips_pu) / {baseline_black*100} AS RR_black_PU
    FROM trip_counts_pu AS tp
    JOIN zone_demographics AS zd ON tp.LocationID = zd.LocationID
    WHERE zd.TotalPop > 0
    GROUP BY tp.service, tp.month
""").df()

# Same ratios computed over dropoff zones.
rr_do_df = con.sql(f"""
    SELECT
        td.service, td.month,
        SUM(td.trips_do * zd.white_pct) * 1.0 / SUM(td.trips_do) / {baseline_white*100} AS RR_white_DO,
        SUM(td.trips_do * zd.black_pct) * 1.0 / SUM(td.trips_do) / {baseline_black*100} AS RR_black_DO
    FROM trip_counts_do AS td
    JOIN zone_demographics AS zd ON td.LocationID = zd.LocationID
    WHERE zd.TotalPop > 0
    GROUP BY td.service, td.month
""").df()

# Outer merge keeps a (service, month) row even if it only has PU or DO data.
rr_combined = pd.merge(rr_pu_df, rr_do_df, on=['service', 'month'], how='outer')
# DuckDB's replacement scan resolves the bare name `rr_combined` in the SQL
# to the local pandas DataFrame above, persisting it as a real table.
con.sql("CREATE OR REPLACE TABLE rr_combined AS SELECT * FROM rr_combined")

print('Pipeline complete. processed_dashboard.db is ready.')
con.close()