File size: 4,768 Bytes
44131b7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import os
import pathlib
import duckdb
import pandas as pd
import leafmap

# DuckDB connection & extensions
# One on-disk database holds every table this script builds. Both extensions
# are installed and loaded up front: `httpfs` (remote Parquet URLs are queried
# further down) and `spatial` (the ST_* functions used for the zone join).
con = duckdb.connect("processed_dashboard.db")
for _extension in ("httpfs", "spatial"):
    con.install_extension(_extension)
    con.load_extension(_extension)

# Taxi-zone geometry
# The zone polygons are not copied locally; `taxi_zones` is only a view over
# the remote Parquet file, so it is read whenever the view is queried.
TAXI_ZONES_URL = "https://data.source.coop/cholmes/nyc-taxi-zones/taxi_zones.parquet"
con.sql(
    "CREATE OR REPLACE VIEW taxi_zones AS "
    f"SELECT * FROM '{TAXI_ZONES_URL}'"
)

# NYC census blocks
# The census-block table lives in a separate DuckDB file. It is downloaded
# only when missing and then attached read-only under the `nyc_data` alias.
DB_PATH = pathlib.Path("nyc_data.db")
if not DB_PATH.exists():
    # No output path is passed; presumably the archive is extracted into the
    # current working directory and yields `nyc_data.db` there - TODO confirm.
    leafmap.download_file(
        "https://opengeos.org/data/duckdb/nyc_data.db.zip",
        unzip=True,
        overwrite=True,
    )
# Attach through DB_PATH so the existence check above and the ATTACH below
# can never point at different files if the path is changed later.
con.execute(f"ATTACH '{DB_PATH.as_posix()}' AS nyc_data (READ_ONLY)")


# Compute demographics
# Materialise `taxi_zones_utm`: every column of the `taxi_zones` view except
# `geometry`, plus the geometry re-projected from EPSG:2263 to EPSG:26918.
# NOTE(review): presumably EPSG:26918 is the CRS of nyc_census_blocks.geom,
# which this table is spatially joined against later - confirm.
# NOTE(review): the ST_AsWKB -> ST_GeomFromWKB round trip looks redundant if
# ST_Transform already returns a GEOMETRY - TODO confirm before removing.
con.sql("""
    CREATE OR REPLACE TABLE taxi_zones_utm AS
    SELECT * EXCLUDE (geometry),
        ST_GeomFromWKB(ST_AsWKB(ST_Transform(geometry, 'EPSG:2263', 'EPSG:26918'))) AS geometry
    FROM taxi_zones
""")

# Roll census-block population counts up to taxi zones.
# A block is attributed to every zone whose polygon it intersects, so a block
# that straddles a zone boundary contributes to each zone it touches.
# NOTE(review): if double counting of boundary blocks is not intended, a
# centroid-in-polygon join would assign each block to one zone - confirm.
# NULLIF makes the percentage columns NULL for a zone whose population total
# is zero, instead of relying on the engine's division-by-zero behaviour.
con.sql("""
    CREATE OR REPLACE TABLE zone_demographics AS
    SELECT
        tz.LocationID,
        tz.zone     AS TaxiZone,
        tz.borough  AS Borough,
        SUM(cb.popn_total) AS TotalPop,
        SUM(cb.popn_white) AS WhitePop,
        SUM(cb.popn_black) AS BlackPop,
        100.0 * SUM(cb.popn_white) / NULLIF(SUM(cb.popn_total), 0) AS white_pct,
        100.0 * SUM(cb.popn_black) / NULLIF(SUM(cb.popn_total), 0) AS black_pct
    FROM nyc_data.nyc_census_blocks AS cb
    JOIN taxi_zones_utm AS tz ON ST_Intersects(tz.geometry, cb.geom)
    GROUP BY tz.LocationID, tz.zone, tz.borough
""")

# City-wide baseline: white and black share of the total population over all
# census blocks, in percent rounded to two decimals (a single-row result).
_baseline_relation = con.sql("""
    SELECT
        ROUND(100.0 * SUM(popn_white) / SUM(popn_total), 2) AS baseline_white_pct,
        ROUND(100.0 * SUM(popn_black) / SUM(popn_total), 2) AS baseline_black_pct
    FROM nyc_data.nyc_census_blocks
""")
baseline_df = _baseline_relation.df()

# The query below names the DataFrame variable `baseline_df` directly as its
# source, which persists the baselines as a table in the dashboard database.
con.sql("CREATE OR REPLACE TABLE city_baselines AS SELECT * FROM baseline_df")

# Same baselines as plain Python floats, converted from percent to fractions.
_baseline_row = baseline_df.iloc[0]
baseline_white: float = float(_baseline_row["baseline_white_pct"]) / 100.0
baseline_black: float = float(_baseline_row["baseline_black_pct"]) / 100.0

# Trip data ingestion
# Pick-up / drop-off column names per service; the two services spell them
# with different capitalisation.
_pu_field = {
    "FHV": "PUlocationID",
    "Yellow": "PULocationID",
}
_do_field = {
    "FHV": "DOlocationID",
    "Yellow": "DOLocationID",
}

# Maps "<Service>_<Mon>2025" to the URL of that month's trip Parquet file.
# Keys are generated service by service, months in calendar order.
_TRIP_DATA_ROOT = "https://d37ci6vzurychx.cloudfront.net/trip-data"
_MONTHS_2025 = (("Jan", "01"), ("Feb", "02"), ("Mar", "03"))
TRIP_URLS = {
    f"{_service}_{_label}2025": (
        f"{_TRIP_DATA_ROOT}/{_service.lower()}_tripdata_2025-{_number}.parquet"
    )
    for _service in ("FHV", "Yellow")
    for _label, _number in _MONTHS_2025
}

# Start each run with empty per-zone count tables, one for pick-ups
# (`trip_counts_pu`) and one for drop-offs (`trip_counts_do`). They differ
# only in the table name and the name of the count column.
for _leg in ("pu", "do"):
    con.execute(
        f"CREATE OR REPLACE TABLE trip_counts_{_leg} "
        f"(service VARCHAR, month VARCHAR, LocationID INTEGER, trips_{_leg} BIGINT)"
    )

# For every (service, month) Parquet file, append per-zone trip counts to the
# pick-up table and then to the drop-off table.
for key, url in TRIP_URLS.items():
    service, month = key.split("_")
    pu, do = _pu_field[service], _do_field[service]
    for _table, _column in (("trip_counts_pu", pu), ("trip_counts_do", do)):
        # Rows whose location is NULL, or casts to 0, 264 or 265, are dropped.
        # NOTE(review): 264/265 are presumably the "unknown" / "outside NYC"
        # zone codes - confirm against the zone lookup table.
        con.sql(
            f"INSERT INTO {_table} "
            f"SELECT '{service}', '{month}', CAST({_column} AS INTEGER), COUNT(*) "
            f"FROM '{url}' "
            f"WHERE {_column} IS NOT NULL "
            f"AND CAST({_column} AS INTEGER) NOT IN (0, 264, 265) "
            f"GROUP BY {_column}"
        )

# Representative ratio summary
# City-wide baselines converted back to percent, the unit of the
# zone_demographics white_pct / black_pct columns they are compared with.
bw = baseline_white * 100
bb = baseline_black * 100


def _rr_sql(alias: str, leg: str) -> str:
    """Return the representative-ratio query for one trip leg.

    For each (service, month) the query takes the trip-weighted mean of the
    zones' white_pct and black_pct and divides it by the city-wide baseline
    percentage. Zones with no population are left out.

    Args:
        alias: SQL alias given to the trip-count table ("tp" or "td").
        leg: "pu" for pick-ups or "do" for drop-offs; selects the
            trip_counts_<leg> table and its trips_<leg> column, and
            upper-cased becomes the suffix of the output columns.
    """
    trips = f"{alias}.trips_{leg}"
    tag = leg.upper()
    return f"""
    SELECT {alias}.service, {alias}.month,
        SUM({trips} * zd.white_pct) * 1.0 / SUM({trips}) / {bw} AS RR_white_{tag},
        SUM({trips} * zd.black_pct) * 1.0 / SUM({trips}) / {bb} AS RR_black_{tag}
    FROM trip_counts_{leg} AS {alias}
    JOIN zone_demographics AS zd ON {alias}.LocationID = zd.LocationID
    WHERE zd.TotalPop > 0
    GROUP BY {alias}.service, {alias}.month
"""


rr_pu_df = con.sql(_rr_sql("tp", "pu")).df()
rr_do_df = con.sql(_rr_sql("td", "do")).df()

# One row per (service, month) carrying both the pick-up and drop-off ratios;
# the outer join keeps a pair that appears in only one of the two frames.
rr_combined: pd.DataFrame = rr_pu_df.merge(
    rr_do_df, on=["service", "month"], how="outer"
)