# Author: Debayan Mandal
# Initial Dashboard Upload (commit acda8b7)
import io
import os
import pathlib
import duckdb
import pandas as pd
import geopandas as gpd
import requests
# Open (or create) the DuckDB database file that backs the dashboard, then
# install and load the two extensions the pipeline relies on:
#   httpfs  - reading remote files over HTTP(S)
#   spatial - geometry types and the ST_* functions used in the joins below
con = duckdb.connect("sf_dashboard.db")
for extension in ("httpfs", "spatial"):
    con.install_extension(extension)
    con.load_extension(extension)
NHOOD_URL = "https://data.sfgov.org/resource/j2bu-swwd.geojson"

print("[1/6] Loading SF Analysis Neighborhoods ...")
nhoods = gpd.read_file(NHOOD_URL)

# Keep only the neighborhood name and polygon. Build one WGS84 copy for the
# lon/lat point-in-polygon joins and one UTM zone 10N copy for metric work.
nhoods_df = nhoods[["nhood", "geometry"]].copy().to_crs("EPSG:4326")
nhoods_utm = nhoods_df.to_crs("EPSG:26910")
print(f" Loaded {len(nhoods_df)} neighborhoods")

# DuckDB's replacement scan reads these DataFrames by Python variable name,
# so serialise the geometries to WKB bytes before the CREATE TABLE queries.
nhoods_df["geometry"] = nhoods_df["geometry"].to_wkb()
con.sql("""
CREATE OR REPLACE TABLE neighborhoods AS
SELECT nhood, ST_GeomFromWKB(geometry)::GEOMETRY AS geometry
FROM nhoods_df
""")

nhoods_utm["geometry"] = nhoods_utm["geometry"].to_wkb()
con.sql("""
CREATE OR REPLACE TABLE neighborhoods_utm AS
SELECT nhood, ST_GeomFromWKB(geometry)::GEOMETRY AS geometry
FROM nhoods_utm
""")
SOCRATA_BASE = "https://data.sfgov.org/resource/m8hk-2ipk.csv"

# One (label, window_start, window_end) triple per month of 2024, generated
# from pandas Periods so the month-end day (incl. the Feb-29 leap day) is
# derived rather than hand-typed. Values match the Socrata floating
# timestamp format.
_MONTH_ABBREVS = (
    "Jan", "Feb", "Mar", "Apr", "May", "Jun",
    "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
)
MONTHS = []
for month_num, abbrev in enumerate(_MONTH_ABBREVS, start=1):
    span = pd.Period(year=2024, month=month_num, freq="M")
    window_start = span.start_time.strftime("%Y-%m-%dT%H:%M:%S")
    window_end = span.end_time.floor("s").strftime("%Y-%m-%dT%H:%M:%S")
    MONTHS.append((f"{abbrev}2024", window_start, window_end))

# Socrata page size for the download loop below.
LIMIT = 1000
print("[2/6] Downloading SF taxi trips ...")
# Download one month at a time, paging with $limit/$offset; a stable $order
# makes the pages deterministic. The raw result is cached to raw_trips.csv
# so re-runs skip the download entirely.
if not os.path.exists("raw_trips.csv"):
    all_trips = []
    for month_label, start, end in MONTHS:
        offset = 0
        month_rows = 0
        while True:
            params = {
                "$where": f"start_time_local between '{start}' and '{end}'",
                "$limit": LIMIT,
                "$offset": offset,
                "$order": "start_time_local"
            }
            response = requests.get(SOCRATA_BASE, params=params, timeout=30)
            # Fail fast on HTTP errors; without this, a Socrata error payload
            # (JSON) would be silently fed to the CSV parser.
            response.raise_for_status()
            df = pd.read_csv(io.StringIO(response.text))
            df["month"] = month_label
            all_trips.append(df)
            month_rows += len(df)
            # A short page means we've reached the end of the month's data.
            if len(df) < LIMIT:
                break
            offset += LIMIT
        # Report the month's total row count (not just the final page's size).
        print(f" {month_label}: {month_rows} rows")
    trips_df = pd.concat(all_trips, ignore_index=True)
    trips_df.to_csv("raw_trips.csv", index=False)
else:
    trips_df = pd.read_csv("raw_trips.csv")
# Drop rows with missing coordinates, then rows where any coordinate is
# exactly 0 (treated here as an unknown location).
_COORD_COLS = [
    "pickup_location_latitude", "pickup_location_longitude",
    "dropoff_location_latitude", "dropoff_location_longitude",
]
trips_df = trips_df.dropna(subset=_COORD_COLS)
for _coord_col in _COORD_COLS:
    trips_df = trips_df[trips_df[_coord_col] != 0]
# Normalise hail_type to two categories
def normalise_hail_type(hail_type):
    """Collapse a raw hail_type value into "Street" or "App".

    "street" and "dispatch" map to "Street"; every other value (including
    missing/NaN) maps to "App".
    """
    street_like = {"street", "dispatch"}
    return "Street" if hail_type in street_like else "App"
trips_df["hail_type"] = trips_df["hail_type"].apply(normalise_hail_type)

# Drop trips carrying any disqualifying QA flag. qa_flags is a '-'-separated
# list, so split before testing membership to avoid substring false
# positives (e.g. 'ST' inside another flag name).
bad_flags = {'DR', 'FTR', 'ST', 'ET'}
usable = trips_df['qa_flags'].fillna('').apply(
    lambda flags: bad_flags.isdisjoint(flags.split('-'))
)
trips_df = trips_df[usable]

con.sql("CREATE OR REPLACE TABLE raw_trips AS SELECT * FROM trips_df")
# Count pickups per (hail_type, month, neighborhood) by testing each pickup
# point against the neighborhood polygons. ST_Point takes (x, y) =
# (longitude, latitude) — note the argument order below.
# NOTE(review): the inner join silently drops trips whose pickup falls
# outside every polygon — confirm that is intended.
print("[3/6] Spatial join: pickup points to neighborhoods ...")
con.sql("""
CREATE OR REPLACE TABLE trip_counts_pu AS
SELECT
t.hail_type,
t.month,
n.nhood,
COUNT(*) AS trips_pu
FROM raw_trips AS t
JOIN neighborhoods AS n
ON ST_Intersects(
n.geometry,
ST_Point(t.pickup_location_longitude, t.pickup_location_latitude)::GEOMETRY
)
GROUP BY t.hail_type, t.month, n.nhood
""")
# Same join as above, but on the dropoff coordinates; produces trips_do
# counts per (hail_type, month, neighborhood).
print("[3/6] Spatial join: dropoff points to neighborhoods ...")
con.sql("""
CREATE OR REPLACE TABLE trip_counts_do AS
SELECT
t.hail_type,
t.month,
n.nhood,
COUNT(*) AS trips_do
FROM raw_trips AS t
JOIN neighborhoods AS n
ON ST_Intersects(
n.geometry,
ST_Point(t.dropoff_location_longitude, t.dropoff_location_latitude)::GEOMETRY
)
GROUP BY t.hail_type, t.month, n.nhood
""")
# Sanity check: print the five neighborhoods with the most pickups overall.
top5 = con.sql("""
SELECT nhood, SUM(trips_pu) AS total
FROM trip_counts_pu GROUP BY nhood ORDER BY total DESC LIMIT 5
""").df()
print(" Top 5 pickup neighborhoods:")
print(top5.to_string(index=False))
print("[4/6] Computing neighborhood demographics ...")
# ACS 5-Year 2022, block groups in SF County (state=06, county=075).
# Variables: B02001_001E total, _002E white, _003E black, _005E asian.
response = requests.get(
    "https://api.census.gov/data/2022/acs/acs5",
    params={
        "get": "B02001_001E,B02001_002E,B02001_003E,B02001_005E",
        "ucgid": "pseudo(0500000US06075$1500000)"
    },
    timeout=30
)
# Fail fast on HTTP errors (same policy as the trip download) instead of
# letting .json() raise an opaque decode error on an HTML/plain error body.
response.raise_for_status()
data = response.json()
# The API returns a header row followed by data rows.
census_df = pd.DataFrame(data[1:], columns=data[0])
# Counts come back as strings; coerce to numeric (bad values become NaN).
for col in ["B02001_001E", "B02001_002E", "B02001_003E", "B02001_005E"]:
    census_df[col] = pd.to_numeric(census_df[col], errors="coerce")
census_df = census_df.rename(columns={
    "B02001_001E": "total_pop",
    "B02001_002E": "white_pop",
    "B02001_003E": "black_pop",
    "B02001_005E": "asian_pop",
})
# Block-group GEOIDs are the last 12 characters of the ucgid
# (e.g. "1500000US060750101001" -> "060750101001").
census_df["GEOID"] = census_df["ucgid"].str[-12:]
# TIGER/Line 2022 block-group polygons for California (state FIPS 06);
# geopandas reads the remote zip directly.
BG_URL = "https://www2.census.gov/geo/tiger/TIGER2022/BG/tl_2022_06_bg.zip"
bg_gdf = gpd.read_file(BG_URL)

# Keep San Francisco county only, then match the neighborhoods' CRS.
bg_gdf = bg_gdf.loc[bg_gdf["COUNTYFP"] == "075", ["GEOID", "geometry"]].copy()
bg_gdf = bg_gdf.to_crs("EPSG:4326")

# Attach the ACS counts; the inner merge keeps only block groups present in
# both the shapefile and the ACS response.
census_gdf = bg_gdf.merge(
    census_df[["GEOID", "total_pop", "white_pop", "black_pop", "asian_pop"]],
    on="GEOID",
    how="inner"
)

# Plain DataFrame with WKB geometry bytes, registered so DuckDB can read it.
census_db = pd.DataFrame(census_gdf)
census_db["geometry"] = census_gdf["geometry"].to_wkb()
con.register("census_raw", census_db)
# Persist the block groups with a real GEOMETRY column for the join below.
con.sql("""
CREATE OR REPLACE TABLE census_blocks AS
SELECT GEOID, total_pop, white_pop, black_pop, asian_pop,
ST_GeomFromWKB(geometry)::GEOMETRY AS geometry
FROM census_raw
""")
# Aggregate block-group populations up to neighborhoods and derive
# percentage shares (guarded against division by zero).
# NOTE(review): ST_Intersects assigns a block group to EVERY neighborhood it
# touches, so block groups straddling a boundary are counted in more than
# one neighborhood — confirm whether centroid containment or areal weighting
# was intended.
con.sql("""
CREATE OR REPLACE TABLE nhood_demographics AS
SELECT
n.nhood,
SUM(cb.total_pop) AS total_pop,
SUM(cb.white_pop) AS white_pop,
SUM(cb.black_pop) AS black_pop,
SUM(cb.asian_pop) AS asian_pop,
CASE WHEN SUM(cb.total_pop) > 0
THEN 100.0 * SUM(cb.white_pop) / SUM(cb.total_pop)
ELSE 0 END AS white_pct,
CASE WHEN SUM(cb.total_pop) > 0
THEN 100.0 * SUM(cb.black_pop) / SUM(cb.total_pop)
ELSE 0 END AS black_pct,
CASE WHEN SUM(cb.total_pop) > 0
THEN 100.0 * SUM(cb.asian_pop) / SUM(cb.total_pop)
ELSE 0 END AS asian_pct
FROM census_blocks AS cb
JOIN neighborhoods AS n
ON ST_Intersects(n.geometry, cb.geometry)
GROUP BY n.nhood
""")
# NOTE(review): this preview result is discarded — either print it like the
# top5 block above or remove the call.
con.sql("SELECT * FROM nhood_demographics ORDER BY total_pop DESC LIMIT 10").df()
print("[5/6] Computing city-wide baselines ...")
# City-wide racial shares, used as the denominators of the representative
# ratios computed in the next step.
baseline_df = con.sql("""
SELECT
ROUND(100.0 * SUM(white_pop) / SUM(total_pop), 2) AS baseline_white_pct,
ROUND(100.0 * SUM(black_pop) / SUM(total_pop), 2) AS baseline_black_pct,
ROUND(100.0 * SUM(asian_pop) / SUM(total_pop), 2) AS baseline_asian_pct
FROM nhood_demographics
WHERE total_pop > 0
""").df()
con.sql("CREATE OR REPLACE TABLE city_baselines AS SELECT * FROM baseline_df")
print(f" Baselines: {baseline_df.to_dict('records')[0]}")
# Scalar baselines, interpolated into the RR queries below.
# NOTE(review): bb (black baseline) is extracted but never used — the RR
# queries below cover white and asian only.
bw = float(baseline_df["baseline_white_pct"].iloc[0])
bb = float(baseline_df["baseline_black_pct"].iloc[0])
ba = float(baseline_df["baseline_asian_pct"].iloc[0])
print("[6/6] Computing representative ratios ...")
# Representative ratio (RR) = trip-weighted mean demographic share of the
# neighborhoods served, divided by the city-wide baseline share. The
# baselines are interpolated into the SQL as float literals (bw/ba come
# from our own query above, not user input, so injection is not a concern).
# Pickup-side ratios per (hail_type, month).
rr_pu_df = con.sql(f"""
SELECT tp.hail_type, tp.month,
SUM(tp.trips_pu * nd.white_pct) * 1.0
/ SUM(tp.trips_pu) / {bw} AS RR_white_PU,
SUM(tp.trips_pu * nd.asian_pct) * 1.0
/ SUM(tp.trips_pu) / {ba} AS RR_asian_PU
FROM trip_counts_pu AS tp
JOIN nhood_demographics AS nd ON tp.nhood = nd.nhood
WHERE nd.total_pop > 0
GROUP BY tp.hail_type, tp.month
""").df()
# Dropoff-side ratios, same structure.
rr_do_df = con.sql(f"""
SELECT td.hail_type, td.month,
SUM(td.trips_do * nd.white_pct) * 1.0
/ SUM(td.trips_do) / {bw} AS RR_white_DO,
SUM(td.trips_do * nd.asian_pct) * 1.0
/ SUM(td.trips_do) / {ba} AS RR_asian_DO
FROM trip_counts_do AS td
JOIN nhood_demographics AS nd ON td.nhood = nd.nhood
WHERE nd.total_pop > 0
GROUP BY td.hail_type, td.month
""").df()
# Outer merge keeps (hail_type, month) combinations that appear on only one
# side; the missing side's columns become NaN.
rr_combined = pd.merge(
rr_pu_df, rr_do_df, on=["hail_type", "month"], how="outer"
)
con.sql("CREATE OR REPLACE TABLE representative_ratios AS SELECT * FROM rr_combined")
print("\nPipeline complete. Database: sf_dashboard.db")
print("Representative ratios:")
print(rr_combined.to_string(index=False))
con.close()