| import io |
| import os |
| import pathlib |
| import duckdb |
| import pandas as pd |
| import geopandas as gpd |
| import requests |
|
|
# Open (or create) the dashboard database and enable the two DuckDB
# extensions the pipeline needs: httpfs for remote reads, spatial for
# geometry types and spatial joins.
con = duckdb.connect("sf_dashboard.db")
for _ext in ("httpfs", "spatial"):
    con.install_extension(_ext)
    con.load_extension(_ext)
|
|
# Neighborhood polygons from DataSF's "Analysis Neighborhoods" dataset.
NHOOD_URL = "https://data.sfgov.org/resource/j2bu-swwd.geojson"
print("[1/6] Loading SF Analysis Neighborhoods ...")
_raw_nhoods = gpd.read_file(NHOOD_URL)
# WGS84 copy for lat/lon point-in-polygon joins ...
nhoods_df = _raw_nhoods[["nhood", "geometry"]].copy().to_crs("EPSG:4326")
# ... and a UTM zone 10N (metric) copy for distance/area work.
nhoods_utm = nhoods_df.to_crs("EPSG:26910")
print(f" Loaded {len(nhoods_df)} neighborhoods")
|
|
def _to_wkb(gdf):
    """Serialize a GeoDataFrame's shapely geometries to raw WKB bytes."""
    return gdf["geometry"].apply(lambda g: bytes(g.wkb))


# DuckDB replacement scans read the dataframes by their Python variable
# names, so `nhoods_df` / `nhoods_utm` must keep exactly these names.
nhoods_df["geometry"] = _to_wkb(nhoods_df)
con.sql("""
CREATE OR REPLACE TABLE neighborhoods AS
SELECT nhood, ST_GeomFromWKB(geometry)::GEOMETRY AS geometry
FROM nhoods_df
""")

nhoods_utm["geometry"] = _to_wkb(nhoods_utm)
con.sql("""
CREATE OR REPLACE TABLE neighborhoods_utm AS
SELECT nhood, ST_GeomFromWKB(geometry)::GEOMETRY AS geometry
FROM nhoods_utm
""")
|
|
SOCRATA_BASE = "https://data.sfgov.org/resource/m8hk-2ipk.csv"
# (label, inclusive start, inclusive end) for each month of 2024.
# 2024 is a leap year, so February correctly runs through the 29th.
MONTHS = [
    ("Jan2024", "2024-01-01T00:00:00", "2024-01-31T23:59:59"),
    ("Feb2024", "2024-02-01T00:00:00", "2024-02-29T23:59:59"),
    ("Mar2024", "2024-03-01T00:00:00", "2024-03-31T23:59:59"),
    ("Apr2024", "2024-04-01T00:00:00", "2024-04-30T23:59:59"),
    ("May2024", "2024-05-01T00:00:00", "2024-05-31T23:59:59"),
    ("Jun2024", "2024-06-01T00:00:00", "2024-06-30T23:59:59"),
    ("Jul2024", "2024-07-01T00:00:00", "2024-07-31T23:59:59"),
    ("Aug2024", "2024-08-01T00:00:00", "2024-08-31T23:59:59"),
    ("Sep2024", "2024-09-01T00:00:00", "2024-09-30T23:59:59"),
    ("Oct2024", "2024-10-01T00:00:00", "2024-10-31T23:59:59"),
    ("Nov2024", "2024-11-01T00:00:00", "2024-11-30T23:59:59"),
    ("Dec2024", "2024-12-01T00:00:00", "2024-12-31T23:59:59"),
]
LIMIT = 1000  # Socrata page size; paged via $offset below


def _fetch_month(label, start, end):
    """Download all pages of trips for one month; return one DataFrame.

    A page shorter than LIMIT signals the last page.  `$order` keeps the
    paging stable across requests.
    """
    pages = []
    offset = 0
    while True:
        params = {
            "$where": f"start_time_local between '{start}' and '{end}'",
            "$limit": LIMIT,
            "$offset": offset,
            "$order": "start_time_local",
        }
        response = requests.get(SOCRATA_BASE, params=params, timeout=30)
        # Fail loudly on HTTP errors; the original parsed error pages as CSV.
        response.raise_for_status()
        page = pd.read_csv(io.StringIO(response.text))
        pages.append(page)
        if len(page) < LIMIT:
            break
        offset += LIMIT
    month_df = pd.concat(pages, ignore_index=True)
    month_df["month"] = label
    return month_df


print("[2/6] Downloading SF taxi trips ...")
if not os.path.exists("raw_trips.csv"):
    all_trips = []
    for month_label, start, end in MONTHS:
        month_df = _fetch_month(month_label, start, end)
        all_trips.append(month_df)
        # Report the full month total (the original printed only the
        # final page's row count).
        print(f" {month_label}: {len(month_df)} rows")
    trips_df = pd.concat(all_trips, ignore_index=True)
    trips_df.to_csv("raw_trips.csv", index=False)  # cache for reruns
else:
    trips_df = pd.read_csv("raw_trips.csv")
|
|
| |
# Discard rows whose endpoints are unusable: missing coordinates, or the
# (0, 0) placeholder.  NOTE(review): zero-as-sentinel is inferred from
# this filter — confirm against the dataset documentation.
_coord_cols = [
    "pickup_location_latitude", "pickup_location_longitude",
    "dropoff_location_latitude", "dropoff_location_longitude",
]
trips_df = trips_df.dropna(subset=_coord_cols)
trips_df = trips_df[(trips_df[_coord_cols] != 0).all(axis=1)]
|
|
| |
def normalise_hail_type(hail_type):
    """Collapse a raw hail-type value into one of two buckets.

    Returns "Street" for "street" and "dispatch"; every other value
    (including NaN/None) maps to "App".  NOTE(review): "dispatch" being
    grouped with street hails is taken from the original logic — confirm
    against the dataset's codebook.
    """
    return "Street" if hail_type in ("street", "dispatch") else "App"
# Collapse hail types into the two buckets used throughout the dashboard.
trips_df["hail_type"] = trips_df["hail_type"].map(normalise_hail_type)

# Drop trips carrying any disqualifying QA flag.  `qa_flags` is a
# '-'-separated list of codes; NaN means "no flags".
bad_flags = {"DR", "FTR", "ST", "ET"}


def _is_flagged(flags):
    """True when any disqualifying code appears in the flag string."""
    return not bad_flags.isdisjoint(flags.split("-"))


trips_df = trips_df[~trips_df["qa_flags"].fillna("").apply(_is_flagged)]
|
|
# Materialize the cleaned trips inside DuckDB (replacement scan on the
# Python name `trips_df`).
con.sql("CREATE OR REPLACE TABLE raw_trips AS SELECT * FROM trips_df")

# One template serves both spatial joins: {side} picks the coordinate
# columns, {suffix} names the output table and count column.
_SPATIAL_COUNT_SQL = """
CREATE OR REPLACE TABLE trip_counts_{suffix} AS
SELECT
    t.hail_type,
    t.month,
    n.nhood,
    COUNT(*) AS trips_{suffix}
FROM raw_trips AS t
JOIN neighborhoods AS n
    ON ST_Intersects(
        n.geometry,
        ST_Point(t.{side}_location_longitude, t.{side}_location_latitude)::GEOMETRY
    )
GROUP BY t.hail_type, t.month, n.nhood
"""

print("[3/6] Spatial join: pickup points to neighborhoods ...")
con.sql(_SPATIAL_COUNT_SQL.format(suffix="pu", side="pickup"))

print("[3/6] Spatial join: dropoff points to neighborhoods ...")
con.sql(_SPATIAL_COUNT_SQL.format(suffix="do", side="dropoff"))
|
|
# Quick sanity check: which neighborhoods generate the most pickups?
_top5_sql = """
SELECT nhood, SUM(trips_pu) AS total
FROM trip_counts_pu GROUP BY nhood ORDER BY total DESC LIMIT 5
"""
top5 = con.sql(_top5_sql).df()
print(" Top 5 pickup neighborhoods:")
print(top5.to_string(index=False))
|
|
print("[4/6] Computing neighborhood demographics ...")

# ACS 5-year 2022, table B02001 (race): total population, white alone,
# black alone, asian alone — for every block group in San Francisco
# County (state 06, county 075).
ACS_VARS = ["B02001_001E", "B02001_002E", "B02001_003E", "B02001_005E"]
response = requests.get(
    "https://api.census.gov/data/2022/acs/acs5",
    params={
        "get": ",".join(ACS_VARS),
        "ucgid": "pseudo(0500000US06075$1500000)"
    },
    timeout=30
)
# Surface API/throttling errors here instead of an opaque JSON decode
# failure on an HTML error page.
response.raise_for_status()
data = response.json()

# The first row of a Census API response is the column header.
census_df = pd.DataFrame(data[1:], columns=data[0])

# The Census API returns every value as a string; coerce counts to numbers.
for col in ACS_VARS:
    census_df[col] = pd.to_numeric(census_df[col], errors="coerce")

census_df = census_df.rename(columns={
    "B02001_001E": "total_pop",
    "B02001_002E": "white_pop",
    "B02001_003E": "black_pop",
    "B02001_005E": "asian_pop",
})
|
|
# ucgid values look like "1500000US060750101001"; the trailing 12 chars
# are the block-group GEOID (state+county+tract+bg) that keys the TIGER
# shapefile join below.
census_df["GEOID"] = census_df["ucgid"].str[-12:]

# TIGER 2022 block-group polygons for all of California, narrowed to
# San Francisco County (FIPS 075) and reprojected to WGS84.
BG_URL = "https://www2.census.gov/geo/tiger/TIGER2022/BG/tl_2022_06_bg.zip"
bg_gdf = (
    gpd.read_file(BG_URL)
    .query("COUNTYFP == '075'")[["GEOID", "geometry"]]
    .to_crs("EPSG:4326")
)
|
|
| |
# Attach ACS counts to block-group polygons; the inner join keeps only
# block groups present in both sources.
census_gdf = bg_gdf.merge(
    census_df[["GEOID", "total_pop", "white_pop", "black_pop", "asian_pop"]],
    on="GEOID",
    how="inner",
)

# Plain DataFrame copy with geometry as raw WKB bytes so DuckDB can
# ingest it via the registered view.
census_db = pd.DataFrame(census_gdf)
census_db["geometry"] = [bytes(g.wkb) for g in census_gdf["geometry"]]
con.register("census_raw", census_db)
con.sql("""
CREATE OR REPLACE TABLE census_blocks AS
SELECT GEOID, total_pop, white_pop, black_pop, asian_pop,
ST_GeomFromWKB(geometry)::GEOMETRY AS geometry
FROM census_raw
""")
# Roll block-group populations up to neighborhoods.
# NOTE(review): ST_Intersects matches a block group to EVERY neighborhood
# whose boundary it touches, so border blocks are counted in more than
# one neighborhood — consider centroid containment or areal weighting if
# exact totals matter.
con.sql("""
CREATE OR REPLACE TABLE nhood_demographics AS
SELECT
    n.nhood,
    SUM(cb.total_pop) AS total_pop,
    SUM(cb.white_pop) AS white_pop,
    SUM(cb.black_pop) AS black_pop,
    SUM(cb.asian_pop) AS asian_pop,
    CASE WHEN SUM(cb.total_pop) > 0
        THEN 100.0 * SUM(cb.white_pop) / SUM(cb.total_pop)
        ELSE 0 END AS white_pct,
    CASE WHEN SUM(cb.total_pop) > 0
        THEN 100.0 * SUM(cb.black_pop) / SUM(cb.total_pop)
        ELSE 0 END AS black_pct,
    CASE WHEN SUM(cb.total_pop) > 0
        THEN 100.0 * SUM(cb.asian_pop) / SUM(cb.total_pop)
        ELSE 0 END AS asian_pct
FROM census_blocks AS cb
JOIN neighborhoods AS n
    ON ST_Intersects(n.geometry, cb.geometry)
GROUP BY n.nhood
""")

# Preview the ten most populous neighborhoods.  (The original computed
# this frame and silently discarded the result — a no-op.)
preview = con.sql(
    "SELECT * FROM nhood_demographics ORDER BY total_pop DESC LIMIT 10"
).df()
print(preview.to_string(index=False))
|
|
print("[5/6] Computing city-wide baselines ...")
# Population-weighted citywide race shares — the denominator of the
# representativeness ratios computed below.  Empty neighborhoods are
# excluded so they don't distort the weighted averages.
baseline_df = con.sql("""
SELECT
    ROUND(100.0 * SUM(white_pop) / SUM(total_pop), 2) AS baseline_white_pct,
    ROUND(100.0 * SUM(black_pop) / SUM(total_pop), 2) AS baseline_black_pct,
    ROUND(100.0 * SUM(asian_pop) / SUM(total_pop), 2) AS baseline_asian_pct
FROM nhood_demographics
WHERE total_pop > 0
""").df()
con.sql("CREATE OR REPLACE TABLE city_baselines AS SELECT * FROM baseline_df")
print(f" Baselines: {baseline_df.to_dict('records')[0]}")

# Scalars interpolated into the RR queries below.  The black baseline is
# persisted in city_baselines but never used by any downstream query, so
# no scalar is bound for it (the original bound an unused `bb`).
bw = float(baseline_df["baseline_white_pct"].iloc[0])
ba = float(baseline_df["baseline_asian_pct"].iloc[0])
|
|
print("[6/6] Computing representative ratios ...")

# Trip-weighted average neighborhood demographics divided by the citywide
# baseline: RR > 1 means service skews toward that group's neighborhoods.
# One template serves pickups (pu/PU) and dropoffs (do/DO).
_RR_SQL = """
SELECT tc.hail_type, tc.month,
SUM(tc.trips_{sfx} * nd.white_pct) * 1.0
/ SUM(tc.trips_{sfx}) / {bw} AS RR_white_{tag},
SUM(tc.trips_{sfx} * nd.asian_pct) * 1.0
/ SUM(tc.trips_{sfx}) / {ba} AS RR_asian_{tag}
FROM trip_counts_{sfx} AS tc
JOIN nhood_demographics AS nd ON tc.nhood = nd.nhood
WHERE nd.total_pop > 0
GROUP BY tc.hail_type, tc.month
"""

_rr_frames = []
for _sfx in ("pu", "do"):
    _query = _RR_SQL.format(sfx=_sfx, tag=_sfx.upper(), bw=bw, ba=ba)
    _rr_frames.append(con.sql(_query).df())

# Outer merge keeps a (hail_type, month) cell even when it appears on
# only one side.
rr_combined = pd.merge(
    _rr_frames[0], _rr_frames[1], on=["hail_type", "month"], how="outer"
)
con.sql("CREATE OR REPLACE TABLE representative_ratios AS SELECT * FROM rr_combined")

print("\nPipeline complete. Database: sf_dashboard.db")
print("Representative ratios:")
print(rr_combined.to_string(index=False))

con.close()
|
|