File size: 5,680 Bytes
48be8c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
"""Concurrency probe: simulate N users hitting the Cornerstone in parallel.

Compares three patterns under thread contention:
  1. baseline (gpd.sjoin) — current production
  2. raster-shared    — single rasterio.DatasetReader shared across threads
                         (UNSAFE; included as a control to show why it's wrong)
  3. raster-tlocal    — threading.local() DatasetReader per worker thread
                         (RECOMMENDED pattern)

Run: uv run python experiments/22_cornerstone_optim/bench_concurrent.py
"""
from __future__ import annotations

import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import geopandas as gpd
import rasterio
from shapely.geometry import Point

REPO = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(REPO))

from app.flood_layers import dep_stormwater, sandy_inundation  # noqa: E402

NYC_CRS = "EPSG:2263"
N_CONCURRENT = 8
N_QUERIES_PER_THREAD = 5

ADDRESSES = [
    (40.6790, -74.0050),
    (40.5867, -73.8062),
    (40.5772, -73.9870),
    (40.6033, -73.7626),
]

BAKED = REPO / "experiments" / "22_cornerstone_optim" / "baked"
RASTER_PATHS = {
    "dep_extreme_2080":     BAKED / "dep_extreme_2080.tif",
    "dep_moderate_2050":    BAKED / "dep_moderate_2050.tif",
    "dep_moderate_current": BAKED / "dep_moderate_current.tif",
    "sandy":                BAKED / "sandy.tif",
}


def to_2263_point(lat, lon):
    return gpd.GeoDataFrame(geometry=[Point(lon, lat)], crs="EPSG:4326").to_crs(NYC_CRS)


# --- pattern A: baseline sjoin ----------------------------------------------

def worker_baseline(thread_id):
    times = []
    for i in range(N_QUERIES_PER_THREAD):
        lat, lon = ADDRESSES[(thread_id + i) % len(ADDRESSES)]
        pt = to_2263_point(lat, lon)
        t0 = time.perf_counter()
        for s in ["dep_extreme_2080", "dep_moderate_2050", "dep_moderate_current"]:
            dep_stormwater.join(pt, s)
        sandy_inundation.join(pt)
        times.append(time.perf_counter() - t0)
    return times


# --- pattern B: shared DatasetReader (UNSAFE control) -----------------------

class SharedRaster:
    def __init__(self):
        self.handles = {k: rasterio.open(str(p)) for k, p in RASTER_PATHS.items()}

    def sample(self, pt_geom, key):
        ds = self.handles[key]
        return int(next(ds.sample([(pt_geom.x, pt_geom.y)]))[0])


def worker_shared(args):
    shared, thread_id = args
    times = []
    errors = 0
    for i in range(N_QUERIES_PER_THREAD):
        lat, lon = ADDRESSES[(thread_id + i) % len(ADDRESSES)]
        pt = to_2263_point(lat, lon).iloc[0].geometry
        t0 = time.perf_counter()
        try:
            for k in RASTER_PATHS:
                shared.sample(pt, k)
        except Exception:
            errors += 1
        times.append(time.perf_counter() - t0)
    return times, errors


# --- pattern C: thread-local DatasetReader (RECOMMENDED) --------------------

_TL = threading.local()


def _tl_handles():
    h = getattr(_TL, "handles", None)
    if h is None:
        h = {k: rasterio.open(str(p)) for k, p in RASTER_PATHS.items()}
        _TL.handles = h
    return h


def worker_tlocal(thread_id):
    times = []
    for i in range(N_QUERIES_PER_THREAD):
        lat, lon = ADDRESSES[(thread_id + i) % len(ADDRESSES)]
        pt = to_2263_point(lat, lon).iloc[0].geometry
        h = _tl_handles()
        t0 = time.perf_counter()
        for k in RASTER_PATHS:
            ds = h[k]
            int(next(ds.sample([(pt.x, pt.y)]))[0])
        times.append(time.perf_counter() - t0)
    return times


# --- harness ----------------------------------------------------------------

def run_pattern(name, worker, *extra):
    print(f"\n[{name}] N={N_CONCURRENT} threads × {N_QUERIES_PER_THREAD} queries")
    t_wall = time.perf_counter()
    all_times = []
    errors = 0
    with ThreadPoolExecutor(max_workers=N_CONCURRENT) as ex:
        futs = [ex.submit(worker, *(extra + (i,))) for i in range(N_CONCURRENT)]
        for f in as_completed(futs):
            r = f.result()
            if isinstance(r, tuple):
                ts, err = r
                errors += err
                all_times.extend(ts)
            else:
                all_times.extend(r)
    wall = time.perf_counter() - t_wall
    n = len(all_times)
    avg_ms = sum(all_times) / n * 1000
    p95_ms = sorted(all_times)[int(0.95 * n) - 1] * 1000
    print(f"  wall {wall:5.2f}s   per-query avg {avg_ms:6.1f} ms   "
          f"p95 {p95_ms:6.1f} ms   errors={errors}")
    return wall, avg_ms, p95_ms, errors


def main():
    # warm caches first so we measure steady-state, not cold-load
    print("warming baseline caches (first DEP load is ~30s)...")
    pt = to_2263_point(*ADDRESSES[0][:2])
    for s in RASTER_PATHS:
        if s != "sandy":
            dep_stormwater.join(pt, s)
    sandy_inundation.join(pt)
    print("warm.")

    base = run_pattern("baseline (gpd.sjoin)", worker_baseline)

    if not BAKED.exists() or not all(p.exists() for p in RASTER_PATHS.values()):
        print("\nbaked rasters missing — run bake_rasters.py first")
        return

    shared = SharedRaster()
    rb = run_pattern("raster-shared (UNSAFE)", worker_shared, shared)

    rt = run_pattern("raster-tlocal (recommended)", worker_tlocal)

    print("\n" + "=" * 72)
    print(f"{'pattern':<32} {'wall(s)':>10} {'avg(ms)':>10} {'p95(ms)':>10} {'err':>5}")
    print("=" * 72)
    for name, r in [("baseline", base), ("raster-shared", rb), ("raster-tlocal", rt)]:
        print(f"{name:<32} {r[0]:>10.2f} {r[1]:>10.1f} {r[2]:>10.1f} {r[3]:>5}")


if __name__ == "__main__":
    main()