# GIS322FinalProject / data_pipeline.py
# Initial dashboard deployment (cmande62, commit c47a6e2)
import os
import io
import zipfile
import pathlib
import requests
import numpy as np
import pandas as pd
import geopandas as gpd
import duckdb
# ── Keys and paths ──────────────────────────────────────────────
# SECURITY: never commit API keys. The Census key must come from the
# environment; the hard-coded fallback key that used to live here was
# exposed in version control and should be rotated.
CENSUS_API_KEY = os.getenv("CENSUS_API_KEY", "")
if not CENSUS_API_KEY:
    print("WARNING: CENSUS_API_KEY is not set - ACS requests may be rejected or rate-limited.")
# Local Tree Equity Score shapefile (optional input; see Step 4).
TES_PATH = pathlib.Path("data/raw/az_tes.shp")
# Output DuckDB database consumed by the Solara dashboard (app.py).
DB_PATH = "processed_dashboard.db"
# ── Step 1: Import block-group boundaries ───────────────────────
# Download the 2022 TIGER/Line block-group shapefile for Arizona and
# keep only Maricopa County (COUNTYFP 013, which contains Phoenix).
print("Step 1: Downloading Phoenix block-group boundaries...")
tiger_dir = pathlib.Path("data/raw/tiger_maricopa")
tiger_dir.mkdir(parents=True, exist_ok=True)
tiger_url = "https://www2.census.gov/geo/tiger/TIGER2022/BG/tl_2022_04_bg.zip"

# Re-use a previously extracted shapefile so repeated runs do not
# re-download from the Census server; sorted() makes the pick of the
# first .shp deterministic.
shp_files = sorted(tiger_dir.glob("*.shp"))
if not shp_files:
    print(" Downloading Arizona block groups from Census TIGER...")
    resp = requests.get(tiger_url, timeout=120)
    resp.raise_for_status()
    # Context manager so the archive handle is always released.
    with zipfile.ZipFile(io.BytesIO(resp.content)) as z:
        z.extractall(tiger_dir)
    shp_files = sorted(tiger_dir.glob("*.shp"))
if not shp_files:
    raise FileNotFoundError(f"No .shp file found in {tiger_dir} after download")

print(f" Found shapefile: {shp_files[0].name}")
gdf = gpd.read_file(shp_files[0])

# Filter to Maricopa County and build the 12-digit block-group GEOID
# (state + county + tract + block group) used as the join key below.
gdf = gdf[gdf["COUNTYFP"] == "013"].copy()
gdf = gdf.reset_index(drop=True)
gdf["GEOID"] = gdf["STATEFP"] + gdf["COUNTYFP"] + gdf["TRACTCE"] + gdf["BLKGRPCE"]
gdf = gdf.rename(columns={"NAMELSAD": "NAME"})
gdf = gdf[["GEOID", "NAME", "geometry"]].copy()
print(f" Got {len(gdf)} Maricopa County block groups")
print(f" CRS: {gdf.crs}")
print(f" Bounds: {gdf.total_bounds}")
print(f" Sample GEOIDs: {gdf['GEOID'].head(3).tolist()}")
# ── Step 2: Import and join ACS demographic data ────────────────
# Pull median household income and race/ethnicity counts from the
# 2022 ACS 5-year estimates for every Maricopa County block group,
# then merge them onto the TIGER geometries by GEOID.
print("Step 2 β€” Fetching ACS 5-year demographics...")

# ACS variable code -> human-readable column name.
ACS_VARS = {
    "B19013_001E": "median_income",
    "B03002_001E": "total_pop",
    "B03002_003E": "pop_white",
    "B03002_012E": "pop_hispanic",
    "B03002_004E": "pop_black",
}
vars_str = "B19013_001E,B03002_001E,B03002_003E,B03002_012E,B03002_004E"
acs_url = (
    f"https://api.census.gov/data/2022/acs/acs5"
    f"?get=NAME,{vars_str}"
    f"&for=block+group:*&in=state:04+county:013"
    f"&key={CENSUS_API_KEY}"
)
resp = requests.get(acs_url, timeout=60)
resp.raise_for_status()
raw = resp.json()

# The API returns a list of lists; the first row is the header.
demo = pd.DataFrame(raw[1:], columns=raw[0])
demo["GEOID"] = demo["state"] + demo["county"] + demo["tract"] + demo["block group"]
print(f" ACS sample GEOIDs: {demo['GEOID'].head(3).tolist()}")
print(f" TIGER sample GEOIDs: {gdf['GEOID'].head(3).tolist()}")

# Every API value arrives as a string; coerce to numeric (bad cells -> NaN).
for code in ACS_VARS:
    demo[code] = pd.to_numeric(demo[code], errors="coerce")
# Treat non-positive incomes (Census negative suppression sentinels) as missing.
demo["B19013_001E"] = demo["B19013_001E"].where(demo["B19013_001E"] > 0)
demo = demo.rename(columns=ACS_VARS)

# Percentage breakdowns; zero-population groups divide by NaN instead
# of raising / emitting divide-by-zero values.
denom = np.where(demo["total_pop"] > 0, demo["total_pop"], np.nan)
demo["pct_white"] = demo["pop_white"] / denom * 100
demo["pct_hispanic"] = demo["pop_hispanic"] / denom * 100
demo["pct_black"] = demo["pop_black"] / denom * 100
demo["pct_minority"] = 100 - demo["pct_white"]

keep_cols = ["GEOID", "median_income", "total_pop",
             "pct_white", "pct_hispanic", "pct_black", "pct_minority"]
gdf = gdf.merge(demo[keep_cols], on="GEOID", how="inner")
print(f" Merged onto {len(gdf)} block groups β€” {gdf['median_income'].notna().sum()} matched income values")
# ── Step 3: Import and join NDVI from Google Earth Engine ──────
# Compute the 2023 mean Sentinel-2 NDVI per block group.  If Earth
# Engine is unavailable (no credentials / no network), fall back to
# seeded placeholder values so the rest of the pipeline still runs.
print("Step 3 β€” Computing NDVI via Google Earth Engine...")
try:
    import ee
    ee.Initialize(project="gis322final")

    # Only valid, non-null geometries can be sent to EE.
    valid_gdf = gdf[gdf.geometry.notnull() & gdf.is_valid].copy()
    bounds = valid_gdf.total_bounds
    print(f" Bounding box: {bounds}")
    aoi = ee.Geometry.Rectangle([
        float(bounds[0]), float(bounds[1]),
        float(bounds[2]), float(bounds[3])
    ])

    # Near-cloud-free (<1% cloudy) 2023 Sentinel-2 SR mosaic over the AOI.
    s2 = (
        ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED")
        .filterBounds(aoi)
        .filterDate("2023-01-01", "2023-12-31")
        .filter(ee.Filter.lt("CLOUDY_PIXEL_PERCENTAGE", 1))
        .mosaic()
        .clip(aoi)
    )
    # NDVI = (NIR - Red) / (NIR + Red) = (B8 - B4) / (B8 + B4).
    ndvi_composite = s2.normalizedDifference(["B8", "B4"]).rename("NDVI")

    # reduceRegions in batches to stay under EE request-size limits.
    all_results = []
    batch_size = 750
    rows = list(valid_gdf.iterrows())
    # BUG FIX: the old count (len(rows)//batch_size + 1) over-reported
    # the number of batches whenever len(rows) divided evenly; use
    # ceiling division instead.
    n_batches = (len(rows) + batch_size - 1) // batch_size
    for i in range(0, len(rows), batch_size):
        batch = rows[i:i + batch_size]
        print(f" Processing batch {i//batch_size + 1} of {n_batches}...")
        features = [
            ee.Feature(ee.Geometry(row.geometry.__geo_interface__), {"GEOID": row["GEOID"]})
            for _, row in batch
        ]
        fc = ee.FeatureCollection(features)
        sampled = ndvi_composite.reduceRegions(
            collection=fc, reducer=ee.Reducer.mean(), scale=30
        )
        all_results.extend(sampled.getInfo()["features"])

    ndvi_df = pd.DataFrame([
        {"GEOID": f["properties"]["GEOID"],
         "ndvi_mean": f["properties"].get("mean", np.nan)}
        for f in all_results
    ])
    print(f" NDVI computed for {len(ndvi_df)} block groups")
except Exception as e:
    # Deliberate best-effort fallback: keep the pipeline runnable
    # without EE credentials instead of crashing.
    print(f" ⚠ GEE unavailable ({e})")
    print(" β„Ή Using placeholder NDVI values")
    # BUG FIX: seed the generator so placeholder runs are reproducible
    # (previously unseeded np.random gave different values every run).
    rng = np.random.default_rng(322)
    ndvi_df = pd.DataFrame({
        "GEOID": gdf["GEOID"].values,
        "ndvi_mean": rng.uniform(0.05, 0.45, len(gdf))
    })
gdf = gdf.merge(ndvi_df, on="GEOID", how="left")
# ── Step 4: Import and join Tree Equity Scores ─────────────────
# TES polygons may not match block groups one-to-one, so scores are
# aggregated to the census tract (first 11 GEOID digits) and joined
# tract -> block group.
print("Step 4 β€” Loading Tree Equity Scores...")
if TES_PATH.exists():
    tes = gpd.read_file(TES_PATH, engine="pyogrio")
    tes_df = tes[["GEOID", "tes"]].copy().rename(columns={"tes": "tree_equity_score"})
    # Scores may arrive as strings from the shapefile; NaN on bad cells.
    tes_df["tree_equity_score"] = pd.to_numeric(tes_df["tree_equity_score"], errors="coerce")
    tes_df["TRACT_GEOID"] = tes_df["GEOID"].str[:11]

    # BUG FIX: the previous merge on the truncated tract GEOID was
    # many-to-many — when several TES rows shared a tract it multiplied
    # block-group rows, and drop_duplicates(keep="first") then kept an
    # arbitrary score.  Aggregating to one mean score per tract makes
    # the join deterministic and never duplicates rows.
    tract_tes = tes_df.groupby("TRACT_GEOID", as_index=False)["tree_equity_score"].mean()

    gdf["TRACT_GEOID"] = gdf["GEOID"].str[:11]
    gdf = gdf.merge(tract_tes, on="TRACT_GEOID", how="left")
    gdf = gdf.drop(columns="TRACT_GEOID")
    print(f" Joined TES for {gdf['tree_equity_score'].notna().sum()} block groups")
else:
    print(" ⚠ File not found β€” using NaN placeholders")
    gdf["tree_equity_score"] = np.nan
# ── Step 5: Save to DuckDB ─────────────────────────────────────
# Persist the joined dataset plus city-wide baseline averages for the
# dashboard.  Geometry is stored as WKT text so plain DuckDB (without
# the spatial extension) can hold it.
print("Step 5 β€” Saving to DuckDB...")
gdf_out = gdf.copy()
gdf_out["geometry_wkt"] = gdf_out["geometry"].to_wkt()
df_out = pd.DataFrame(gdf_out.drop(columns="geometry"))
print(f" Columns: {df_out.columns.tolist()}")
print(f" Income non-null: {df_out['median_income'].notna().sum()}")
print(f" NDVI non-null: {df_out['ndvi_mean'].notna().sum()}")
print(f" TES non-null: {df_out['tree_equity_score'].notna().sum()}")

con = duckdb.connect(DB_PATH)
try:
    # Rebuild both tables from scratch on every run (idempotent pipeline).
    # DuckDB's replacement scan lets SQL reference the local df_out frame.
    con.execute("DROP TABLE IF EXISTS block_groups")
    con.execute("CREATE TABLE block_groups AS SELECT * FROM df_out")
    con.execute("DROP TABLE IF EXISTS city_baselines")
    con.execute("""
CREATE TABLE city_baselines AS
SELECT
ROUND(AVG(pct_minority), 1) AS baseline_minority_pct,
ROUND(AVG(ndvi_mean), 4) AS baseline_ndvi,
ROUND(AVG(median_income),0) AS baseline_income
FROM block_groups
WHERE total_pop > 0
""")
finally:
    # BUG FIX: the connection was previously never closed, leaving the
    # database file locked and writes potentially unflushed.
    con.close()

print(f"\nβœ… Done! Saved {len(df_out)} block groups -> {DB_PATH}")
print(" Now run: solara run app.py")