import os import io import zipfile import pathlib import requests import numpy as np import pandas as pd import geopandas as gpd import duckdb # Keys and paths CENSUS_API_KEY = os.getenv("CENSUS_API_KEY", "5f2c79bff648d3c5220c0f519359df561ac2eec6") TES_PATH = pathlib.Path("data/raw/az_tes.shp") DB_PATH = "processed_dashboard.db" # Step 1: Import block group boundaries print("Step 1: Downloading Phoenix block-group boundaries...") tiger_dir = pathlib.Path("data/raw/tiger_maricopa") tiger_dir.mkdir(parents=True, exist_ok=True) tiger_url = "https://www2.census.gov/geo/tiger/TIGER2022/BG/tl_2022_04_bg.zip" print(" Downloading Arizona block groups from Census TIGER...") resp = requests.get(tiger_url, timeout=120) resp.raise_for_status() z = zipfile.ZipFile(io.BytesIO(resp.content)) z.extractall(tiger_dir) shp_files = list(tiger_dir.glob("*.shp")) print(f" Found shapefile: {shp_files[0].name}") gdf = gpd.read_file(shp_files[0]) gdf = gdf[gdf["COUNTYFP"] == "013"].copy() gdf = gdf.reset_index(drop=True) gdf["GEOID"] = gdf["STATEFP"] + gdf["COUNTYFP"] + gdf["TRACTCE"] + gdf["BLKGRPCE"] gdf = gdf.rename(columns={"NAMELSAD": "NAME"}) gdf = gdf[["GEOID", "NAME", "geometry"]].copy() print(f" Got {len(gdf)} Maricopa County block groups") print(f" CRS: {gdf.crs}") print(f" Bounds: {gdf.total_bounds}") print(f" Sample GEOIDs: {gdf['GEOID'].head(3).tolist()}") # Step 2: Import and join ACS demographic data print("Step 2 — Fetching ACS 5-year demographics...") vars_str = "B19013_001E,B03002_001E,B03002_003E,B03002_012E,B03002_004E" acs_url = ( f"https://api.census.gov/data/2022/acs/acs5" f"?get=NAME,{vars_str}" f"&for=block+group:*&in=state:04+county:013" f"&key={CENSUS_API_KEY}" ) resp = requests.get(acs_url, timeout=60) resp.raise_for_status() raw = resp.json() demo = pd.DataFrame(raw[1:], columns=raw[0]) demo["GEOID"] = demo["state"] + demo["county"] + demo["tract"] + demo["block group"] print(f" ACS sample GEOIDs: {demo['GEOID'].head(3).tolist()}") print(f" TIGER sample GEOIDs: {gdf['GEOID'].head(3).tolist()}") for col in ["B19013_001E","B03002_001E","B03002_003E","B03002_012E","B03002_004E"]: demo[col] = pd.to_numeric(demo[col], errors="coerce") demo["B19013_001E"] = demo["B19013_001E"].where(demo["B19013_001E"] > 0) demo = demo.rename(columns={ "B19013_001E": "median_income", "B03002_001E": "total_pop", "B03002_003E": "pop_white", "B03002_012E": "pop_hispanic", "B03002_004E": "pop_black", }) total_pop = np.where(demo["total_pop"] > 0, demo["total_pop"], np.nan) demo["pct_white"] = demo["pop_white"] / total_pop * 100 demo["pct_hispanic"] = demo["pop_hispanic"] / total_pop * 100 demo["pct_black"] = demo["pop_black"] / total_pop * 100 demo["pct_minority"] = 100 - demo["pct_white"] gdf = gdf.merge( demo[["GEOID","median_income","total_pop","pct_white","pct_hispanic","pct_black","pct_minority"]], on="GEOID", how="inner" ) print(f" Merged onto {len(gdf)} block groups — {gdf['median_income'].notna().sum()} matched income values") # Step 3: Import and join NDVI from Google Earth Engine print("Step 3 — Computing NDVI via Google Earth Engine...") try: import ee ee.Initialize(project="gis322final") valid_gdf = gdf[gdf.geometry.notnull() & gdf.is_valid].copy() bounds = valid_gdf.total_bounds print(f" Bounding box: {bounds}") aoi = ee.Geometry.Rectangle([ float(bounds[0]), float(bounds[1]), float(bounds[2]), float(bounds[3]) ]) s2 = ( ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED") .filterBounds(aoi) .filterDate("2023-01-01", "2023-12-31") .filter(ee.Filter.lt("CLOUDY_PIXEL_PERCENTAGE", 1)) .mosaic() .clip(aoi) ) ndvi_composite = s2.normalizedDifference(["B8", "B4"]).rename("NDVI") all_results = [] batch_size = 750 rows = list(valid_gdf.iterrows()) for i in range(0, len(rows), batch_size): batch = rows[i:i + batch_size] print(f" Processing batch {i//batch_size + 1} of {len(rows)//batch_size + 1}...") features = [ ee.Feature(ee.Geometry(row.geometry.__geo_interface__), {"GEOID": row["GEOID"]}) for _, row in batch ] fc = ee.FeatureCollection(features) sampled = ndvi_composite.reduceRegions( collection=fc, reducer=ee.Reducer.mean(), scale=30 ) results = sampled.getInfo()["features"] all_results.extend(results) ndvi_df = pd.DataFrame([ {"GEOID": f["properties"]["GEOID"], "ndvi_mean": f["properties"].get("mean", np.nan)} for f in all_results ]) print(f" NDVI computed for {len(ndvi_df)} block groups") except Exception as e: print(f" ⚠ GEE unavailable ({e})") print(" ℹ Using placeholder NDVI values") ndvi_df = pd.DataFrame({ "GEOID": gdf["GEOID"].values, "ndvi_mean": np.random.uniform(0.05, 0.45, len(gdf)) }) gdf = gdf.merge(ndvi_df, on="GEOID", how="left") # Step 4: Import and join Tree Equity Scores print("Step 4 — Loading Tree Equity Scores...") if TES_PATH.exists(): tes = gpd.read_file(TES_PATH, engine="pyogrio") tes_df = tes[["GEOID","tes"]].copy().rename(columns={"tes": "tree_equity_score"}) tes_df["TRACT_GEOID"] = tes_df["GEOID"].str[:11] gdf["TRACT_GEOID"] = gdf["GEOID"].str[:11] gdf = gdf.merge(tes_df[["TRACT_GEOID","tree_equity_score"]], on="TRACT_GEOID", how="left") gdf = gdf.drop(columns="TRACT_GEOID") gdf = gdf.drop_duplicates(subset="GEOID", keep="first") print(f" Joined TES for {gdf['tree_equity_score'].notna().sum()} block groups") else: print(" ⚠ File not found — using NaN placeholders") gdf["tree_equity_score"] = np.nan # Step 5: Save to DuckDB print("Step 5 — Saving to DuckDB...") gdf_out = gdf.copy() gdf_out["geometry_wkt"] = gdf_out["geometry"].to_wkt() df_out = pd.DataFrame(gdf_out.drop(columns="geometry")) print(f" Columns: {df_out.columns.tolist()}") print(f" Income non-null: {df_out['median_income'].notna().sum()}") print(f" NDVI non-null: {df_out['ndvi_mean'].notna().sum()}") print(f" TES non-null: {df_out['tree_equity_score'].notna().sum()}") con = duckdb.connect(DB_PATH) con.execute("DROP TABLE IF EXISTS block_groups") con.execute("CREATE TABLE block_groups AS SELECT * FROM df_out") con.execute("DROP TABLE IF EXISTS city_baselines") con.execute(""" CREATE TABLE city_baselines AS SELECT ROUND(AVG(pct_minority), 1) AS baseline_minority_pct, ROUND(AVG(ndvi_mean), 4) AS baseline_ndvi, ROUND(AVG(median_income),0) AS baseline_income FROM block_groups WHERE total_pop > 0 """) print(f"\n✅ Done! Saved {len(df_out)} block groups -> {DB_PATH}") print(" Now run: solara run app.py")