"""ETL pipeline for an SF taxi-trip equity dashboard.

Steps (persisted into the DuckDB file ``sf_dashboard.db``):
  1. Load SF Analysis Neighborhoods polygons (DataSF GeoJSON).
  2. Download 2024 taxi trips month-by-month from the Socrata API
     (paginated), cached locally in ``raw_trips.csv``.
  3. Spatially join pickup and dropoff points to neighborhoods.
  4. Pull ACS 5-year race demographics by block group (Census API +
     TIGER block-group shapes) and aggregate to neighborhoods.
  5. Compute city-wide demographic baselines.
  6. Compute trip-weighted "representative ratios" (RR) per hail type
     and month: trip-weighted demographic share / city baseline.

Running this module executes the whole pipeline (it is a script, not a
library).
"""

import io
import os
import pathlib  # NOTE(review): currently unused here; kept in case other tooling expects it

import duckdb
import geopandas as gpd
import pandas as pd
import requests

# ---------------------------------------------------------------------------
# DuckDB setup: httpfs for remote reads, spatial for geometry types/joins.
# ---------------------------------------------------------------------------
con = duckdb.connect("sf_dashboard.db")
con.install_extension("httpfs")
con.load_extension("httpfs")
con.install_extension("spatial")
con.load_extension("spatial")

NHOOD_URL = "https://data.sfgov.org/resource/j2bu-swwd.geojson"

print("[1/6] Loading SF Analysis Neighborhoods ...")
nhoods = gpd.read_file(NHOOD_URL)
nhoods_df = nhoods[["nhood", "geometry"]].copy()
nhoods_df = nhoods_df.to_crs("EPSG:4326")   # lat/lon, matches trip coordinates
nhoods_utm = nhoods_df.to_crs("EPSG:26910")  # UTM 10N (meters), for area/distance use
print(f"  Loaded {len(nhoods_df)} neighborhoods")

# Register as DuckDB table. Geometry is shipped as WKB bytes and rebuilt
# with ST_GeomFromWKB because DuckDB cannot ingest shapely objects directly.
nhoods_df["geometry"] = nhoods_df["geometry"].apply(lambda g: bytes(g.wkb))
con.sql("""
    CREATE OR REPLACE TABLE neighborhoods AS
    SELECT nhood, ST_GeomFromWKB(geometry)::GEOMETRY AS geometry
    FROM nhoods_df
""")

# Register UTM version.
# NOTE(review): this table is not queried below — presumably consumed by the
# dashboard reading sf_dashboard.db; verify before removing.
nhoods_utm["geometry"] = nhoods_utm["geometry"].apply(lambda g: bytes(g.wkb))
con.sql("""
    CREATE OR REPLACE TABLE neighborhoods_utm AS
    SELECT nhood, ST_GeomFromWKB(geometry)::GEOMETRY AS geometry
    FROM nhoods_utm
""")

SOCRATA_BASE = "https://data.sfgov.org/resource/m8hk-2ipk.csv"

# Per-month [start, end] windows for the SoQL `between` filter (inclusive on
# both ends; second-resolution endpoints so adjacent months do not overlap).
MONTHS = [
    ("Jan2024", "2024-01-01T00:00:00", "2024-01-31T23:59:59"),
    ("Feb2024", "2024-02-01T00:00:00", "2024-02-29T23:59:59"),
    ("Mar2024", "2024-03-01T00:00:00", "2024-03-31T23:59:59"),
    ("Apr2024", "2024-04-01T00:00:00", "2024-04-30T23:59:59"),
    ("May2024", "2024-05-01T00:00:00", "2024-05-31T23:59:59"),
    ("Jun2024", "2024-06-01T00:00:00", "2024-06-30T23:59:59"),
    ("Jul2024", "2024-07-01T00:00:00", "2024-07-31T23:59:59"),
    ("Aug2024", "2024-08-01T00:00:00", "2024-08-31T23:59:59"),
    ("Sep2024", "2024-09-01T00:00:00", "2024-09-30T23:59:59"),
    ("Oct2024", "2024-10-01T00:00:00", "2024-10-31T23:59:59"),
    ("Nov2024", "2024-11-01T00:00:00", "2024-11-30T23:59:59"),
    ("Dec2024", "2024-12-01T00:00:00", "2024-12-31T23:59:59"),
]
LIMIT = 1000  # Socrata page size

print("[2/6] Downloading SF taxi trips ...")
if not os.path.exists("raw_trips.csv"):
    all_trips = []
    for month_label, start, end in MONTHS:
        offset = 0
        month_rows = 0  # accumulate so the progress line reports the month total
        while True:
            params = {
                "$where": f"start_time_local between '{start}' and '{end}'",
                "$limit": LIMIT,
                "$offset": offset,
                # Stable ordering is required for $offset pagination to be
                # consistent across pages.
                "$order": "start_time_local",
            }
            response = requests.get(SOCRATA_BASE, params=params, timeout=30)
            # FIX: fail loudly on HTTP errors instead of silently parsing an
            # error page as CSV trip data.
            response.raise_for_status()
            df = pd.read_csv(io.StringIO(response.text))
            df["month"] = month_label
            all_trips.append(df)
            month_rows += len(df)
            # A short page means we've exhausted this month's rows.
            if len(df) < LIMIT:
                break
            offset += LIMIT
        # FIX: previously printed len(df) — the size of the last page only
        # (<= LIMIT) — rather than the month's total row count.
        print(f"  {month_label}: {month_rows} rows")
    trips_df = pd.concat(all_trips, ignore_index=True)
    trips_df.to_csv("raw_trips.csv", index=False)  # cache for re-runs
else:
    trips_df = pd.read_csv("raw_trips.csv")

# Drop rows with missing or zero coordinates (0,0 is a sentinel for "no GPS").
trips_df = trips_df.dropna(
    subset=[
        "pickup_location_latitude",
        "pickup_location_longitude",
        "dropoff_location_latitude",
        "dropoff_location_longitude",
    ]
)
trips_df = trips_df[
    (trips_df["pickup_location_latitude"] != 0)
    & (trips_df["pickup_location_longitude"] != 0)
    & (trips_df["dropoff_location_latitude"] != 0)
    & (trips_df["dropoff_location_longitude"] != 0)
]


# Normalise hail_type to two categories.
def normalise_hail_type(hail_type):
    """Collapse raw hail_type values into 'Street' or 'App'.

    'street' and 'dispatch' count as 'Street'; everything else —
    including NaN/missing values — falls through to 'App'.
    NOTE(review): the NaN→'App' fall-through is preserved from the
    original; confirm missing hail types should count as app hails.
    """
    if hail_type in ["street", "dispatch"]:
        return "Street"
    else:
        return "App"


trips_df["hail_type"] = trips_df["hail_type"].apply(normalise_hail_type)

# Drop trips carrying any quality-assurance flag from the bad list.
# qa_flags is a '-'-separated list of codes, e.g. "DR-ET".
bad_flags = ["DR", "FTR", "ST", "ET"]
trips_df = trips_df[
    ~trips_df["qa_flags"].fillna("").apply(
        lambda flags: any(f in flags.split("-") for f in bad_flags)
    )
]

con.sql("CREATE OR REPLACE TABLE raw_trips AS SELECT * FROM trips_df")

print("[3/6] Spatial join: pickup points to neighborhoods ...")
con.sql("""
    CREATE OR REPLACE TABLE trip_counts_pu AS
    SELECT
        t.hail_type,
        t.month,
        n.nhood,
        COUNT(*) AS trips_pu
    FROM raw_trips AS t
    JOIN neighborhoods AS n
      ON ST_Intersects(
             n.geometry,
             ST_Point(t.pickup_location_longitude, t.pickup_location_latitude)::GEOMETRY
         )
    GROUP BY t.hail_type, t.month, n.nhood
""")

print("[3/6] Spatial join: dropoff points to neighborhoods ...")
con.sql("""
    CREATE OR REPLACE TABLE trip_counts_do AS
    SELECT
        t.hail_type,
        t.month,
        n.nhood,
        COUNT(*) AS trips_do
    FROM raw_trips AS t
    JOIN neighborhoods AS n
      ON ST_Intersects(
             n.geometry,
             ST_Point(t.dropoff_location_longitude, t.dropoff_location_latitude)::GEOMETRY
         )
    GROUP BY t.hail_type, t.month, n.nhood
""")

top5 = con.sql("""
    SELECT nhood, SUM(trips_pu) AS total
    FROM trip_counts_pu
    GROUP BY nhood
    ORDER BY total DESC
    LIMIT 5
""").df()
print("  Top 5 pickup neighborhoods:")
print(top5.to_string(index=False))

print("[4/6] Computing neighborhood demographics ...")
# ACS 5-Year 2022, block groups in SF County (state=06, county=075).
# B02001_001E = total population, _002E = white, _003E = black, _005E = asian.
response = requests.get(
    "https://api.census.gov/data/2022/acs/acs5",
    params={
        "get": "B02001_001E,B02001_002E,B02001_003E,B02001_005E",
        "ucgid": "pseudo(0500000US06075$1500000)",
    },
    timeout=30,
)
data = response.json()
census_df = pd.DataFrame(data[1:], columns=data[0])

# Convert to numeric (the Census API returns everything as strings).
for col in ["B02001_001E", "B02001_002E", "B02001_003E", "B02001_005E"]:
    census_df[col] = pd.to_numeric(census_df[col], errors="coerce")

census_df = census_df.rename(columns={
    "B02001_001E": "total_pop",
    "B02001_002E": "white_pop",
    "B02001_003E": "black_pop",
    "B02001_005E": "asian_pop",
})
# Block-group GEOID = last 12 chars of ucgid (state2+county3+tract6+bg1).
census_df["GEOID"] = census_df["ucgid"].str[-12:]

BG_URL = "https://www2.census.gov/geo/tiger/TIGER2022/BG/tl_2022_06_bg.zip"
bg_gdf = gpd.read_file(BG_URL)
bg_gdf = bg_gdf[bg_gdf["COUNTYFP"] == "075"]  # SF county only
bg_gdf = bg_gdf[["GEOID", "geometry"]].copy()
bg_gdf = bg_gdf.to_crs("EPSG:4326")

# Merge census data with geometries.
census_gdf = bg_gdf.merge(
    census_df[["GEOID", "total_pop", "white_pop", "black_pop", "asian_pop"]],
    on="GEOID",
    how="inner",
)

census_db = pd.DataFrame(census_gdf)
census_db["geometry"] = census_gdf["geometry"].apply(lambda g: bytes(g.wkb))
con.register("census_raw", census_db)
con.sql("""
    CREATE OR REPLACE TABLE census_blocks AS
    SELECT
        GEOID,
        total_pop,
        white_pop,
        black_pop,
        asian_pop,
        ST_GeomFromWKB(geometry)::GEOMETRY AS geometry
    FROM census_raw
""")

# Aggregate block groups to neighborhoods. NOTE(review): ST_Intersects means
# a block group straddling a boundary is counted fully in every neighborhood
# it touches (population double-counting at edges) — confirm this is intended
# vs. an areal-interpolation approach.
con.sql("""
    CREATE OR REPLACE TABLE nhood_demographics AS
    SELECT
        n.nhood,
        SUM(cb.total_pop) AS total_pop,
        SUM(cb.white_pop) AS white_pop,
        SUM(cb.black_pop) AS black_pop,
        SUM(cb.asian_pop) AS asian_pop,
        CASE WHEN SUM(cb.total_pop) > 0
             THEN 100.0 * SUM(cb.white_pop) / SUM(cb.total_pop)
             ELSE 0 END AS white_pct,
        CASE WHEN SUM(cb.total_pop) > 0
             THEN 100.0 * SUM(cb.black_pop) / SUM(cb.total_pop)
             ELSE 0 END AS black_pct,
        CASE WHEN SUM(cb.total_pop) > 0
             THEN 100.0 * SUM(cb.asian_pop) / SUM(cb.total_pop)
             ELSE 0 END AS asian_pct
    FROM census_blocks AS cb
    JOIN neighborhoods AS n
      ON ST_Intersects(n.geometry, cb.geometry)
    GROUP BY n.nhood
""")
con.sql("SELECT * FROM nhood_demographics ORDER BY total_pop DESC LIMIT 10").df()

print("[5/6] Computing city-wide baselines ...")
baseline_df = con.sql("""
    SELECT
        ROUND(100.0 * SUM(white_pop) / SUM(total_pop), 2) AS baseline_white_pct,
        ROUND(100.0 * SUM(black_pop) / SUM(total_pop), 2) AS baseline_black_pct,
        ROUND(100.0 * SUM(asian_pop) / SUM(total_pop), 2) AS baseline_asian_pct
    FROM nhood_demographics
    WHERE total_pop > 0
""").df()
con.sql("CREATE OR REPLACE TABLE city_baselines AS SELECT * FROM baseline_df")
print(f"  Baselines: {baseline_df.to_dict('records')[0]}")

bw = float(baseline_df["baseline_white_pct"].iloc[0])
bb = float(baseline_df["baseline_black_pct"].iloc[0])
ba = float(baseline_df["baseline_asian_pct"].iloc[0])

print("[6/6] Computing representative ratios ...")
# RR = (trip-weighted mean of neighborhood demographic pct) / city baseline.
# RR > 1 means that demographic's neighborhoods are over-represented in trips.
rr_pu_df = con.sql(f"""
    SELECT
        tp.hail_type,
        tp.month,
        SUM(tp.trips_pu * nd.white_pct) * 1.0 / SUM(tp.trips_pu) / {bw} AS RR_white_PU,
        SUM(tp.trips_pu * nd.asian_pct) * 1.0 / SUM(tp.trips_pu) / {ba} AS RR_asian_PU
    FROM trip_counts_pu AS tp
    JOIN nhood_demographics AS nd ON tp.nhood = nd.nhood
    WHERE nd.total_pop > 0
    GROUP BY tp.hail_type, tp.month
""").df()

rr_do_df = con.sql(f"""
    SELECT
        td.hail_type,
        td.month,
        SUM(td.trips_do * nd.white_pct) * 1.0 / SUM(td.trips_do) / {bw} AS RR_white_DO,
        SUM(td.trips_do * nd.asian_pct) * 1.0 / SUM(td.trips_do) / {ba} AS RR_asian_DO
    FROM trip_counts_do AS td
    JOIN nhood_demographics AS nd ON td.nhood = nd.nhood
    WHERE nd.total_pop > 0
    GROUP BY td.hail_type, td.month
""").df()

rr_combined = pd.merge(
    rr_pu_df, rr_do_df, on=["hail_type", "month"], how="outer"
)
con.sql("CREATE OR REPLACE TABLE representative_ratios AS SELECT * FROM rr_combined")

print("\nPipeline complete. Database: sf_dashboard.db")
print("Representative ratios:")
print(rr_combined.to_string(index=False))

con.close()