"""Copy-paste augmentation for Prithvi NYC pluvial dataset. Takes the existing Phase 14 v1 dataset (166 Ida positive chips at polygon centroids + 22 clear-sky negatives from Major-TOM) and generates ~600 synthetic positive chips by pasting real Ida flood polygons onto clear-sky NYC backgrounds at random positions. Per Ghiasi et al. (CVPR 2021) "Simple Copy-Paste". Validated to be the highest-ROI augmentation for sparse-positive segmentation across many benchmarks. We use feathered alpha blending on the polygon mask edge to avoid sharp spectral seams. Usage: python3 copy_paste_aug.py \ --src /root/terramind_nyc/prithvi_nyc/data \ --out /root/terramind_nyc/prithvi_nyc_v2/data \ --multiplier 5 \ --paste-min 1 --paste-max 4 """ from __future__ import annotations import argparse import random import sys from pathlib import Path import numpy as np import rasterio from rasterio.transform import Affine from scipy.ndimage import binary_dilation, gaussian_filter def feather_mask(mask: np.ndarray, sigma: float = 2.0) -> np.ndarray: """Soft 0..1 alpha matte from a binary mask, feathered at edges.""" return gaussian_filter(mask.astype(np.float32), sigma=sigma) def find_polygon_bbox(mask: np.ndarray, pad: int = 4) -> tuple | None: """Tight bounding box around the positive pixels, with padding.""" ys, xs = np.where(mask > 0) if len(ys) == 0: return None H, W = mask.shape y0, y1 = max(0, ys.min() - pad), min(H, ys.max() + pad + 1) x0, x1 = max(0, xs.min() - pad), min(W, xs.max() + pad + 1) return y0, y1, x0, x1 def paste(bg_chip: np.ndarray, bg_mask: np.ndarray, fg_chip: np.ndarray, fg_mask: np.ndarray, rng: random.Random) -> tuple[np.ndarray, np.ndarray]: """Paste a polygon crop from fg onto bg at a random position. bg_chip: [C, H, W] background imagery bg_mask: [H, W] background mask (will be OR-merged) fg_chip: [C, H, W] source imagery containing flood polygon fg_mask: [H, W] source mask Returns (out_chip, out_mask) of same shape as bg_*. """ bbox = find_polygon_bbox(fg_mask, pad=4) if bbox is None: return bg_chip.copy(), bg_mask.copy() y0, y1, x0, x1 = bbox crop_chip = fg_chip[:, y0:y1, x0:x1] crop_mask = fg_mask[y0:y1, x0:x1] # Random rotation by k × 90° + flips for spatial diversity k = rng.randint(0, 3) crop_chip = np.rot90(crop_chip, k=k, axes=(1, 2)).copy() crop_mask = np.rot90(crop_mask, k=k).copy() if rng.random() < 0.5: crop_chip = np.flip(crop_chip, axis=2).copy() crop_mask = np.flip(crop_mask, axis=1).copy() if rng.random() < 0.5: crop_chip = np.flip(crop_chip, axis=1).copy() crop_mask = np.flip(crop_mask, axis=0).copy() ch, H, W = bg_chip.shape fh, fw = crop_mask.shape if fh >= H or fw >= W: # Polygon larger than chip — center-crop the polygon to fit sh = max(0, (fh - H + 1) // 2) sw = max(0, (fw - W + 1) // 2) crop_chip = crop_chip[:, sh:sh + min(fh, H), sw:sw + min(fw, W)] crop_mask = crop_mask[sh:sh + min(fh, H), sw:sw + min(fw, W)] fh, fw = crop_mask.shape # Random paste position py = rng.randint(0, H - fh) px = rng.randint(0, W - fw) out_chip = bg_chip.copy().astype(np.float32) out_mask = bg_mask.copy().astype(np.uint8) alpha = feather_mask(crop_mask, sigma=2.0)[None, :, :] # [1, fh, fw] region = out_chip[:, py:py + fh, px:px + fw] region = region * (1 - alpha) + crop_chip.astype(np.float32) * alpha out_chip[:, py:py + fh, px:px + fw] = region out_mask[py:py + fh, px:px + fw] = np.maximum( out_mask[py:py + fh, px:px + fw], crop_mask.astype(np.uint8)) return out_chip, out_mask def read_chip(path: Path) -> tuple[np.ndarray, Affine, str]: with rasterio.open(path) as src: return src.read().astype(np.float32), src.transform, src.crs def read_mask(path: Path) -> tuple[np.ndarray, Affine, str]: with rasterio.open(path) as src: return src.read(1).astype(np.uint8), src.transform, src.crs def write_chip(path: Path, bands: np.ndarray, transform, crs): path.parent.mkdir(parents=True, exist_ok=True) with rasterio.open(path, "w", driver="GTiff", height=bands.shape[1], width=bands.shape[2], count=bands.shape[0], dtype="float32", transform=transform, crs=crs) as dst: dst.write(bands.astype(np.float32)) def write_mask(path: Path, mask: np.ndarray, transform, crs): path.parent.mkdir(parents=True, exist_ok=True) with rasterio.open(path, "w", driver="GTiff", height=mask.shape[0], width=mask.shape[1], count=1, dtype="uint8", transform=transform, crs=crs) as dst: dst.write(mask.astype(np.uint8), 1) def expand_negatives_from_majortom(parent_root: Path, n_per_parent: int, chip_px: int, bands: list[str], out_chip_dir: Path, out_mask_dir: Path, rng: random.Random) -> list[str]: """Slice each Major-TOM parent into n_per_parent random chip windows. Each parent is ~1000x1000 px; we extract `n_per_parent` random non-overlapping 224x224 windows per parent. Yields more clear-sky NYC backgrounds without needing fresh STAC fetches. """ from rasterio.windows import Window new_neg_ids = [] n_neg = 0 cells = [] for cell_dir in sorted(parent_root.iterdir()): if not cell_dir.is_dir(): continue for sub_dir in sorted(cell_dir.iterdir()): if not sub_dir.is_dir(): continue products = sorted(sub_dir.iterdir()) if products: cells.append(products[0]) print(f"[neg-expand] {len(cells)} parents available", flush=True) for parent_dir in cells: for _ in range(n_per_parent): try: stack = [] transform, crs = None, None # Determine random offset based on first band with rasterio.open(parent_dir / f"{bands[0]}.tif") as src: H, W = src.shape if H < chip_px or W < chip_px: break oy = rng.randint(0, H - chip_px) ox = rng.randint(0, W - chip_px) for band in bands: with rasterio.open(parent_dir / f"{band}.tif") as src: win = Window(ox, oy, chip_px, chip_px) data = src.read(1, window=win, boundless=True, fill_value=0, out_shape=(chip_px, chip_px)) if transform is None: transform = src.window_transform(win) crs = src.crs stack.append(data.astype(np.float32)) chip = np.stack(stack) cid = f"nyc_negx_{n_neg:04d}" write_chip(out_chip_dir / f"{cid}.tif", chip, transform, crs) mask = np.zeros((chip_px, chip_px), dtype=np.uint8) write_mask(out_mask_dir / f"{cid}_annotation_flood.tif", mask, transform, crs) new_neg_ids.append(cid) n_neg += 1 except Exception as e: print(f" ! neg expand {parent_dir.name}: {e}", flush=True) print(f"[neg-expand] {n_neg} new negatives", flush=True) return new_neg_ids def main(): ap = argparse.ArgumentParser() ap.add_argument("--src", type=Path, required=True, help="Phase 14 dataset root with S2L2A_tif/ and MASK/") ap.add_argument("--out", type=Path, required=True) ap.add_argument("--multiplier", type=int, default=2, help="how many synthetic positives per original positive") ap.add_argument("--paste-min", type=int, default=1) ap.add_argument("--paste-max", type=int, default=4) ap.add_argument("--major-tom-root", type=Path, default=None, help="Major-TOM Core-S2L2A root for additional negatives") ap.add_argument("--neg-per-parent", type=int, default=12, help="random sub-chips per Major-TOM parent (for negs)") ap.add_argument("--seed", type=int, default=42) args = ap.parse_args() rng = random.Random(args.seed) chip_dir = args.src / "S2L2A_tif" mask_dir = args.src / "MASK" # Discover positives and negatives. pos_chips, pos_masks = [], [] neg_chips = [] for chip_path in sorted(chip_dir.glob("*.tif")): cid = chip_path.stem mp = mask_dir / f"{cid}_annotation_flood.tif" if not mp.exists(): continue if cid.startswith("ida_pos_"): pos_chips.append(chip_path) pos_masks.append(mp) elif cid.startswith("nyc_neg_"): neg_chips.append(chip_path) print(f"[phase19] {len(pos_chips)} positives, {len(neg_chips)} negatives", flush=True) out_chip_dir = args.out / "S2L2A_tif" out_mask_dir = args.out / "MASK" out_chip_dir.mkdir(parents=True, exist_ok=True) out_mask_dir.mkdir(parents=True, exist_ok=True) new_ids = [] # 1. Carry over original positives + negatives. for src_chip, src_mask in zip(pos_chips, pos_masks): cid = src_chip.stem c, t, crs = read_chip(src_chip) m, _, _ = read_mask(src_mask) write_chip(out_chip_dir / f"{cid}.tif", c, t, crs) write_mask(out_mask_dir / f"{cid}_annotation_flood.tif", m, t, crs) new_ids.append(cid) for src_chip in neg_chips: cid = src_chip.stem c, t, crs = read_chip(src_chip) m, _, _ = read_mask(mask_dir / f"{cid}_annotation_flood.tif") write_chip(out_chip_dir / f"{cid}.tif", c, t, crs) write_mask(out_mask_dir / f"{cid}_annotation_flood.tif", m, t, crs) new_ids.append(cid) # 1b. Optionally expand negatives by slicing Major-TOM parents. if args.major_tom_root and args.major_tom_root.exists(): prithvi_bands = ["B02", "B03", "B04", "B8A", "B11", "B12"] new_negs = expand_negatives_from_majortom( args.major_tom_root, args.neg_per_parent, 224, prithvi_bands, out_chip_dir, out_mask_dir, rng) new_ids.extend(new_negs) # 2. Synthesize copy-paste positives. n_synth = 0 target = args.multiplier * len(pos_chips) pos_pool = list(zip(pos_chips, pos_masks)) while n_synth < target: bg_path = rng.choice(neg_chips) bg_c, bg_t, bg_crs = read_chip(bg_path) bg_m, _, _ = read_mask( mask_dir / f"{bg_path.stem}_annotation_flood.tif") n_paste = rng.randint(args.paste_min, args.paste_max) for _ in range(n_paste): fg_chip_p, fg_mask_p = rng.choice(pos_pool) fg_c, _, _ = read_chip(fg_chip_p) fg_m, _, _ = read_mask(fg_mask_p) bg_c, bg_m = paste(bg_c, bg_m, fg_c, fg_m, rng) cid = f"synth_pos_{n_synth:04d}" write_chip(out_chip_dir / f"{cid}.tif", bg_c, bg_t, bg_crs) write_mask(out_mask_dir / f"{cid}_annotation_flood.tif", bg_m, bg_t, bg_crs) new_ids.append(cid) n_synth += 1 if n_synth % 100 == 0: print(f" synthesized {n_synth}/{target}", flush=True) # 3. Stratified split (positive chips include synth_pos and ida_pos). out_split = args.out / "split" out_split.mkdir(parents=True, exist_ok=True) pos_ids = [c for c in new_ids if c.startswith("ida_pos_") or c.startswith("synth_pos_")] neg_ids = [c for c in new_ids if c.startswith("nyc_neg_") or c.startswith("nyc_negx_")] rng.shuffle(pos_ids); rng.shuffle(neg_ids) def split(lst, tr=0.7, va=0.15): n = len(lst) return lst[:int(tr*n)], lst[int(tr*n):int((tr+va)*n)], lst[int((tr+va)*n):] pt, pv, pe = split(pos_ids); nt, nv, ne = split(neg_ids) splits = {"train": pt + nt, "val": pv + nv, "test": pe + ne} for name, ids in splits.items(): rng.shuffle(ids) (out_split / f"impactmesh_flood_{name}.txt").write_text( "\n".join(ids) + "\n") n_pos = sum(1 for x in ids if not x.startswith("nyc_neg_")) print(f"[phase19] split {name}: {len(ids)} chips ({n_pos} pos)", flush=True) print(f"[phase19] total: {len(new_ids)} chips " f"({len(pos_ids)} pos, {len(neg_ids)} neg)") if __name__ == "__main__": sys.exit(main())