| |
| from __future__ import annotations |
|
|
| import argparse |
| import re |
| import shutil |
| import subprocess |
| import sys |
| import tempfile |
| from collections.abc import Iterable, Sequence |
| from pathlib import Path |
| from urllib.parse import quote |
|
|
# File suffixes (lower-case, leading dot included) treated as uploadable images.
IMAGE_EXTS = set(
    ".png .jpg .jpeg .gif .webp .svg .avif .bmp .ico .tif .tiff".split()
)

# Markdown image syntax ``![alt](destination)``. Group 1 captures everything
# between the parentheses, i.e. the path plus any optional title.
MD_IMAGE_RE = re.compile(r"!\[[^\]]*\]\(([^)]+)\)")

# HTML ``<img>`` tag with a quoted ``src`` attribute (case-insensitive).
# Group 1 is the quote character, group 2 the attribute value.
HTML_IMG_RE = re.compile(
    r"<img\b[^>]*\bsrc\s*=\s*([\"'])([^\"']+)\1[^>]*>",
    flags=re.IGNORECASE,
)
|
|
|
|
def parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the image uploader.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what a plain ``parse_args()``
            call has always done; passing a list allows the parser to be
            driven programmatically.

    Returns:
        Namespace with ``remote``, ``bucket``, ``prefix``, ``docs_root``,
        ``dry_run``, ``list_only``, ``rewrite_markdown``, ``public_base_url``
        and ``backup_ext``.
    """
    parser = argparse.ArgumentParser(
        description="Upload all locally referenced images from Markdown docs to Cloudflare R2 using rclone."
    )

    # Destination on the rclone side.
    parser.add_argument("--remote", required=True, help="rclone remote name, e.g. r2")
    parser.add_argument("--bucket", default="", help="bucket name in remote path")
    parser.add_argument(
        "--prefix",
        default="docs-images",
        help="destination prefix inside bucket/remote (default: docs-images)",
    )

    # What to scan.
    parser.add_argument(
        "--docs-root",
        default=".",
        help="docs root to scan for .md files (default: current directory)",
    )

    # Modes that limit what is actually done.
    parser.add_argument(
        "--dry-run", action="store_true", help="preview uploads without sending files"
    )
    parser.add_argument(
        "--list-only", action="store_true", help="only print matched image files"
    )

    # Optional link rewriting after a successful upload.
    parser.add_argument(
        "--rewrite-markdown",
        action="store_true",
        help="rewrite local image links in markdown/html to public URL after upload",
    )
    parser.add_argument(
        "--public-base-url",
        default="",
        help="public URL base used for replacement, e.g. https://cdn.example.com/docs",
    )
    parser.add_argument(
        "--backup-ext",
        default=".bak",
        help="backup extension used when rewriting markdown (default: .bak)",
    )
    return parser.parse_args(argv)
|
|
|
|
def is_local_ref(ref: str) -> bool:
    """Tell whether *ref* should be treated as a path on the local disk.

    A reference is considered non-local when, compared case-insensitively,
    it begins with ``http://``, ``https://``, ``//`` (protocol-relative),
    ``data:`` or ``mailto:``. Everything else, including the empty string,
    counts as local.
    """
    non_local_prefixes = ("http://", "https://", "//", "data:", "mailto:")
    return not ref.lower().startswith(non_local_prefixes)
|
|
|
|
def parse_md_ref(raw: str) -> str:
    """Extract the bare path from the inside of a Markdown ``(...)`` target.

    The input is whatever stood between the parentheses of ``![alt](...)``.
    If it starts with ``<`` and contains ``>``, the text between them is the
    target (this form may contain spaces); otherwise the target is the text
    up to the first whitespace, which drops an optional title. Any
    ``#fragment`` and ``?query`` are then cut off and the result is trimmed.
    """
    text = raw.strip()
    closing = text.find(">")
    if text.startswith("<") and closing != -1:
        target = text[1:closing]
    else:
        # ``text`` is already stripped, so the leading run of non-whitespace
        # is exactly the part before the first whitespace separator.
        target = re.match(r"\S*", text).group(0)

    without_fragment = target.partition("#")[0]
    without_query = without_fragment.partition("?")[0]
    return without_query.strip()
|
|
|
|
def clean_ref(raw: str) -> str:
    """Normalise the value of an HTML ``src`` attribute to a bare path.

    Surrounding whitespace and any leading/trailing ``<`` or ``>`` characters
    are removed, then everything from the first ``#`` and from the first
    ``?`` onwards is dropped, and the remainder is trimmed again.
    """
    trimmed = raw.strip().strip("<>")
    path_part, _, _ = trimmed.partition("#")
    path_part, _, _ = path_part.partition("?")
    return path_part.strip()
|
|
|
|
def resolve_local_ref(md_file: Path, ref: str, root: Path) -> Path | None:
    """Map an image reference found in *md_file* to a real file under *root*.

    References starting with ``/`` are looked up relative to *root*; all
    others relative to the directory containing *md_file*. The reference is
    tried verbatim first and, if that fails, in percent-decoded form, so a
    link written as ``my%20image.png`` finds ``my image.png`` on disk while
    files whose names really contain ``%`` keep resolving as before.

    Args:
        md_file: Markdown file in which the reference appears.
        ref: Cleaned reference (no title, query or fragment).
        root: Docs root. The containment check compares against this path
            as given, so it should already be resolved.

    Returns:
        The resolved path when it is an existing regular file located inside
        *root* with a suffix listed in ``IMAGE_EXTS``; otherwise ``None``.
        Paths that cannot be examined at all (symlink loop, over-long name,
        embedded NUL character) are reported as ``None`` rather than raising.
    """
    # Local import: the module's import block only brings in ``quote``.
    from urllib.parse import unquote

    if not ref:
        return None

    attempts = [ref]
    decoded = unquote(ref)
    if decoded != ref:
        attempts.append(decoded)

    for attempt in attempts:
        if attempt.startswith("/"):
            candidate = root / attempt.lstrip("/")
        else:
            candidate = md_file.parent / attempt

        try:
            resolved = candidate.resolve()
            if not resolved.is_file():
                continue
            # Raises ValueError when the file lies outside the docs root.
            resolved.relative_to(root)
        except (OSError, RuntimeError, ValueError):
            # Non-strict resolve() does not raise FileNotFoundError; what can
            # actually go wrong are OS-level errors, symlink loops and
            # unrepresentable paths. None of them should abort the scan.
            continue

        if resolved.suffix.lower() in IMAGE_EXTS:
            return resolved

    return None
|
|
|
|
def find_markdown_files(root: Path) -> list[Path]:
    """Return every ``*.md`` file below *root*, sorted, skipping node_modules.

    Only regular files are returned, so a directory that happens to be named
    ``something.md`` is ignored. The ``node_modules`` exclusion is evaluated
    on the path relative to *root*; a docs root that itself lives somewhere
    under a ``node_modules`` directory is therefore still scanned.

    Args:
        root: Directory to search recursively.

    Returns:
        Sorted list of matching paths, each prefixed with *root*.
    """
    return sorted(
        path
        for path in root.rglob("*.md")
        if "node_modules" not in path.relative_to(root).parts and path.is_file()
    )
|
|
|
|
def collect_images(
    root: Path, md_files: Sequence[Path]
) -> tuple[set[Path], list[tuple[Path, str]]]:
    """Scan Markdown files for local image references.

    Each file is read as UTF-8 and searched first for Markdown image syntax
    and then for HTML ``<img>`` tags. Empty and non-local references are
    ignored; the rest are resolved against the file and *root*.

    Returns:
        A pair ``(images, missing)``: the set of resolved image paths, and a
        list of ``(markdown_file, reference)`` entries for local references
        that could not be resolved, in the order they were encountered.
    """
    found: set[Path] = set()
    unresolved: list[tuple[Path, str]] = []

    # (pattern, capture group holding the reference, normaliser for that syntax)
    extractors = (
        (MD_IMAGE_RE, 1, parse_md_ref),
        (HTML_IMG_RE, 2, clean_ref),
    )

    for md_file in md_files:
        content = md_file.read_text(encoding="utf-8")
        for pattern, group, normalise in extractors:
            for match in pattern.finditer(content):
                ref = normalise(match.group(group))
                if ref and is_local_ref(ref):
                    path = resolve_local_ref(md_file, ref, root)
                    if path:
                        found.add(path)
                    else:
                        unresolved.append((md_file, ref))

    return found, unresolved
|
|
|
|
def build_target(remote: str, bucket: str, prefix: str) -> str:
    """Compose the rclone destination string ``remote:[bucket][/prefix]``.

    The bucket is appended verbatim when non-empty. Leading and trailing
    slashes are removed from *prefix*; if anything is left it is joined to
    the destination with a single ``/``.
    """
    base = f"{remote}:{bucket}" if bucket else f"{remote}:"
    trimmed = prefix.strip("/")
    return f"{base}/{trimmed}" if trimmed else base
|
|
|
|
def rel_object_path(root: Path, image_path: Path, prefix: str) -> str:
    """Return the object key for *image_path*: ``[prefix/]path-relative-to-root``.

    The relative path uses forward slashes. *prefix* has its leading and
    trailing slashes removed and is omitted entirely when nothing remains.
    Raises ``ValueError`` (from ``Path.relative_to``) if *image_path* is not
    located under *root*.
    """
    relative = image_path.relative_to(root).as_posix()
    key_prefix = prefix.strip("/")
    if not key_prefix:
        return relative
    return "/".join((key_prefix, relative))
|
|
|
|
def build_public_url(base: str, object_path: str) -> str:
    """Join *base* and the percent-encoded *object_path* into a public URL.

    Trailing slashes on *base* are dropped. The object path is encoded with
    ``urllib.parse.quote`` keeping ``/``, ``-``, ``.``, ``_`` and ``~``
    unescaped, and the two parts are joined with exactly one ``/``.
    """
    return "/".join((base.rstrip("/"), quote(object_path, safe="/-._~")))
|
|
|
|
def run_rclone_upload(
    root: Path, target: str, rel_files: Iterable[str], dry_run: bool
) -> None:
    """Copy the listed files from *root* to *target* with ``rclone copy``.

    The relative paths are written, one per line, to a temporary list file
    that is handed to rclone through ``--files-from``. The list file is
    removed again in every case, including when writing it or running rclone
    fails.

    Args:
        root: Local source directory passed to rclone.
        target: rclone destination, e.g. ``remote:bucket/prefix``.
        rel_files: Paths relative to *root* to upload.
        dry_run: When true, ``--dry-run`` is appended and the command line is
            printed instead of the "Uploading to" banner.

    Raises:
        RuntimeError: ``rclone`` is not available on ``PATH``.
        subprocess.CalledProcessError: rclone exited with a non-zero status.
    """
    # Local import: only needed to render the dry-run command line.
    import shlex

    if shutil.which("rclone") is None:
        raise RuntimeError("rclone not found in PATH")

    # delete=False because rclone must be able to open the file by name after
    # we close it; cleanup is done explicitly in the finally block below.
    tmp = tempfile.NamedTemporaryFile(mode="w", encoding="utf-8", delete=False)
    tmp_path = Path(tmp.name)
    try:
        with tmp:
            tmp.writelines(f"{rel}\n" for rel in rel_files)

        cmd = [
            "rclone",
            "copy",
            str(root),
            target,
            "--files-from",
            str(tmp_path),
            "--create-empty-src-dirs",
        ]
        if dry_run:
            cmd.append("--dry-run")

        print()
        if dry_run:
            # Shell-quoted so the echoed command can be copied and run as-is
            # even when paths contain spaces.
            print("Dry-run:", shlex.join(cmd))
        else:
            print(f"Uploading to: {target}")

        subprocess.run(cmd, check=True)
    finally:
        tmp_path.unlink(missing_ok=True)
|
|
|
|
def rewrite_markdown_files(
    root: Path,
    md_files: Sequence[Path],
    image_set: set[Path],
    prefix: str,
    public_base_url: str,
    backup_ext: str,
) -> int:
    """Replace local image links in Markdown files with their public URLs.

    Both Markdown image syntax and HTML ``<img src=...>`` tags are handled.
    A link is rewritten only when it is local, resolves to a file, and that
    file is a member of *image_set*. Replacement is done on the exact span of
    the captured link, so identical text elsewhere in the match (for example
    an alt text equal to the path) is left alone, and HTML attributes written
    as ``SRC=`` or ``src = "..."`` are rewritten too. For Markdown links the
    whitespace around the destination and any trailing title are preserved.

    Args:
        root: Docs root used for resolving references and building keys.
        md_files: Markdown files to process.
        image_set: Images that were uploaded and may be linked publicly.
        prefix: Object-key prefix used for the upload.
        public_base_url: Base URL under which uploaded objects are served.
        backup_ext: Extension appended to the file name for a backup of the
            original text; an empty string disables backups.

    Returns:
        Number of Markdown files whose content was changed.
    """

    def to_url(md_file: Path, raw_ref: str, is_markdown: bool) -> str | None:
        """Return the public URL for *raw_ref*, or None if it must stay as is."""
        ref = parse_md_ref(raw_ref) if is_markdown else clean_ref(raw_ref)
        if not ref or not is_local_ref(ref):
            return None
        resolved = resolve_local_ref(md_file, ref, root)
        if not resolved or resolved not in image_set:
            return None
        return build_public_url(public_base_url, rel_object_path(root, resolved, prefix))

    def swap_md_destination(raw: str, url: str) -> str:
        """Replace only the destination inside *raw*, keeping spacing and title."""
        lead = raw[: len(raw) - len(raw.lstrip())]
        body = raw[len(lead):]
        # Same tokenisation as parse_md_ref: ``<...>`` form, else first word.
        if body.startswith("<") and ">" in body:
            rest = body[body.find(">") + 1:]
        else:
            rest = body[re.match(r"\S*", body).end():]
        return f"{lead}{url}{rest}"

    def splice(match: re.Match[str], group: int, replacement: str) -> str:
        """Return the matched text with capture *group* swapped for *replacement*."""
        whole = match.group(0)
        offset = match.start(0)
        return (
            whole[: match.start(group) - offset]
            + replacement
            + whole[match.end(group) - offset:]
        )

    def make_replacer(md_file: Path, group: int, is_markdown: bool):
        """Build the ``re.sub`` callback for one file and one link syntax."""

        def repl(match: re.Match[str]) -> str:
            raw = match.group(group)
            url = to_url(md_file, raw, is_markdown)
            if not url:
                return match.group(0)
            replacement = swap_md_destination(raw, url) if is_markdown else url
            return splice(match, group, replacement)

        return repl

    changed_count = 0
    for md_file in md_files:
        text = md_file.read_text(encoding="utf-8")

        updated = MD_IMAGE_RE.sub(make_replacer(md_file, 1, True), text)
        updated = HTML_IMG_RE.sub(make_replacer(md_file, 2, False), updated)
        if updated == text:
            continue

        if backup_ext:
            # Keep the untouched original next to the file, e.g. page.md.bak.
            backup_path = md_file.with_suffix(md_file.suffix + backup_ext)
            backup_path.write_text(text, encoding="utf-8")
        md_file.write_text(updated, encoding="utf-8")
        changed_count += 1

    return changed_count
|
|
|
|
def main() -> int:
    """Run the uploader and return the process exit status.

    Steps: validate the options, find Markdown files under the docs root,
    collect the local images they reference, print the list together with
    any unresolved references, upload through rclone unless ``--list-only``
    is set, and finally rewrite links when ``--rewrite-markdown`` is given
    and the run is not a dry run.

    Returns:
        ``0`` on success (including "nothing to upload"), ``1`` on invalid
        options, a missing docs root, a missing rclone binary, or a failed
        rclone invocation.
    """
    args = parse_args()

    if args.rewrite_markdown and not args.public_base_url:
        print(
            "Error: --public-base-url is required when using --rewrite-markdown",
            file=sys.stderr,
        )
        return 1

    root = Path(args.docs_root).resolve()
    if not root.is_dir():
        print(f"Error: docs root not found: {args.docs_root}", file=sys.stderr)
        return 1

    # Scanning is done in pure Python, so no external search tool is required
    # here; only rclone is needed, and the upload step checks for it itself.
    md_files = find_markdown_files(root)
    images, missing = collect_images(root, md_files)

    rel_files = sorted(p.relative_to(root).as_posix() for p in images)
    if rel_files:
        print(f"Found {len(rel_files)} image files:")
        for rel in rel_files:
            print(rel)

    # Report unresolved references before any early return so they are also
    # visible when not a single image could be resolved.
    if missing:
        print(file=sys.stderr)
        print(
            f"Warning: {len(missing)} referenced files were not found (showing up to 20):",
            file=sys.stderr,
        )
        for md, ref in missing[:20]:
            print(f"{md}\t{ref}", file=sys.stderr)

    if not images:
        print("No local image references found in Markdown docs.")
        return 0

    if args.list_only:
        return 0

    target = build_target(args.remote, args.bucket, args.prefix)
    try:
        run_rclone_upload(root, target, rel_files, dry_run=args.dry_run)
    except RuntimeError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    except subprocess.CalledProcessError as exc:
        print(f"Error: rclone exited with status {exc.returncode}", file=sys.stderr)
        return 1

    if args.rewrite_markdown:
        if args.dry_run:
            # Nothing was uploaded, so the links must keep pointing at the
            # local files.
            print("Skipping markdown rewrite because --dry-run is set.")
        else:
            changed = rewrite_markdown_files(
                root=root,
                md_files=md_files,
                image_set=images,
                prefix=args.prefix,
                public_base_url=args.public_base_url,
                backup_ext=args.backup_ext,
            )
            print(f"Rewrote {changed} markdown files.")

    print("Done.")
    return 0
|
|
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
|
|