astrbbbb / docs /scripts /upload_doc_images_to_r2.py
qa1145's picture
Upload 1245 files
8ede856 verified
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import re
import shutil
import subprocess
import sys
import tempfile
from collections.abc import Iterable, Sequence
from pathlib import Path
from urllib.parse import quote
# File suffixes (lowercase, including the leading dot) that are treated as images.
IMAGE_EXTS = {
    ".png", ".jpg", ".jpeg", ".gif",
    ".webp", ".svg", ".avif", ".bmp",
    ".ico", ".tif", ".tiff",
}

# Markdown image syntax ``![alt](target)``.
# Group 1 captures everything between the parentheses (path plus optional title).
MD_IMAGE_RE = re.compile(r"!\[[^\]]*\]\(([^)]+)\)")

# HTML ``<img ... src="...">`` tags, matched case-insensitively.
# Group 1 is the quote character used, group 2 is the ``src`` attribute value.
HTML_IMG_RE = re.compile(
    r"<img\b[^>]*\bsrc\s*=\s*([\"'])([^\"']+)\1[^>]*>",
    flags=re.IGNORECASE,
)
def parse_args() -> argparse.Namespace:
    """Define the command-line options and parse the process arguments.

    Returns:
        Namespace with ``remote``, ``bucket``, ``prefix``, ``docs_root``,
        ``dry_run``, ``list_only``, ``rewrite_markdown``, ``public_base_url``
        and ``backup_ext`` attributes.
    """
    cli = argparse.ArgumentParser(
        description="Upload all locally referenced images from Markdown docs to Cloudflare R2 using rclone."
    )
    add = cli.add_argument

    # Upload destination.
    add("--remote", required=True, help="rclone remote name, e.g. r2")
    add("--bucket", default="", help="bucket name in remote path")
    add(
        "--prefix",
        default="docs-images",
        help="destination prefix inside bucket/remote (default: docs-images)",
    )

    # What to scan.
    add(
        "--docs-root",
        default=".",
        help="docs root to scan for .md files (default: current directory)",
    )

    # Modes that avoid transferring files.
    add("--dry-run", action="store_true", help="preview uploads without sending files")
    add("--list-only", action="store_true", help="only print matched image files")

    # Optional link rewriting after the upload.
    add(
        "--rewrite-markdown",
        action="store_true",
        help="rewrite local image links in markdown/html to public URL after upload",
    )
    add(
        "--public-base-url",
        default="",
        help="public URL base used for replacement, e.g. https://cdn.example.com/docs",
    )
    add(
        "--backup-ext",
        default=".bak",
        help="backup extension used when rewriting markdown (default: .bak)",
    )

    return cli.parse_args()
def is_local_ref(ref: str) -> bool:
    """Return True unless *ref* begins with a remote or inline URL prefix.

    The check is case-insensitive and covers ``http://``, ``https://``,
    protocol-relative ``//``, ``data:`` and ``mailto:``; anything else is
    considered a local reference.
    """
    non_local_prefixes = ("http://", "https://", "//", "data:", "mailto:")
    return not ref.lower().startswith(non_local_prefixes)
def parse_md_ref(raw: str) -> str:
    """Extract the bare path from the parenthesised part of a Markdown image.

    Handles both the ``<path with spaces>`` form and the plain form where the
    path ends at the first whitespace (anything after it, such as a title, is
    ignored). Any ``#fragment`` and ``?query`` suffix is removed.
    """
    text = raw.strip()
    closing = text.find(">")
    if text.startswith("<") and closing != -1:
        # Angle-bracket form: the target is whatever sits between < and >.
        target = text[1:closing]
    else:
        # Plain form: the target is the first whitespace-delimited token.
        target = re.split(r"\s+", text, maxsplit=1)[0]
    without_fragment = target.partition("#")[0]
    return without_fragment.partition("?")[0].strip()
def clean_ref(raw: str) -> str:
    """Normalise an HTML ``src`` value to a bare path.

    Surrounding whitespace and any leading/trailing ``<``/``>`` characters are
    removed, then the ``#fragment`` and ``?query`` parts are cut off.
    """
    bare = raw.strip().strip("<>")
    path_only, _, _ = bare.partition("#")
    path_only, _, _ = path_only.partition("?")
    return path_only.strip()
def resolve_local_ref(md_file: Path, ref: str, root: Path) -> Path | None:
    """Map an image reference found in *md_file* to an existing file under *root*.

    References starting with ``/`` are taken relative to *root*; all others are
    taken relative to the directory containing *md_file*. The reference is
    tried literally first and, if that yields nothing, once more with
    percent-escapes decoded (so ``my%20image.png`` finds ``my image.png``).

    Args:
        md_file: The document the reference was found in.
        ref: The cleaned reference (no title, query or fragment).
        root: Docs root; expected to be an already-resolved path because the
            containment check compares against resolved candidates.

    Returns:
        The resolved path when it is an existing regular file inside *root*
        whose suffix is listed in ``IMAGE_EXTS``; otherwise ``None``.
    """
    from urllib.parse import unquote

    if not ref:
        return None

    def locate(text: str) -> Path | None:
        if text.startswith("/"):
            candidate = root / text.lstrip("/")
        else:
            candidate = md_file.parent / text
        try:
            resolved = candidate.resolve()
            if not resolved.is_file():
                return None
        except (OSError, RuntimeError, ValueError):
            # Symlink loops, over-long names or embedded NULs: treat as missing
            # rather than aborting the whole scan.
            return None
        try:
            # Reject anything that escapes the docs root (e.g. via "..").
            resolved.relative_to(root)
        except ValueError:
            return None
        if resolved.suffix.lower() not in IMAGE_EXTS:
            return None
        return resolved

    found = locate(ref)
    if found is not None:
        return found

    decoded = unquote(ref)
    if decoded != ref:
        return locate(decoded)
    return None
def find_markdown_files(root: Path) -> list[Path]:
    """Return every ``*.md`` file below *root*, sorted, skipping ``node_modules``.

    Only path components *below* ``root`` are inspected for ``node_modules``,
    so a docs root that itself lives inside a ``node_modules`` directory is
    still scanned. Directories whose name happens to end in ``.md`` are
    excluded because callers read each returned path as a text file.
    """
    return sorted(
        path
        for path in root.rglob("*.md")
        if "node_modules" not in path.relative_to(root).parts and path.is_file()
    )
def collect_images(
    root: Path, md_files: Sequence[Path]
) -> tuple[set[Path], list[tuple[Path, str]]]:
    """Scan documents for local image references.

    Each file is read as UTF-8; Markdown image syntax is processed first, then
    HTML ``<img>`` tags. Non-local and empty references are ignored.

    Returns:
        A pair ``(images, missing)``: the set of resolved image paths, and a
        list of ``(document, reference)`` tuples for local references that did
        not resolve to an accepted image file.
    """
    found: set[Path] = set()
    unresolved: list[tuple[Path, str]] = []

    # (pattern, capture group holding the reference, normaliser for that syntax)
    extractors = (
        (MD_IMAGE_RE, 1, parse_md_ref),
        (HTML_IMG_RE, 2, clean_ref),
    )

    for md_file in md_files:
        content = md_file.read_text(encoding="utf-8")
        for pattern, group_index, normalise in extractors:
            for hit in pattern.finditer(content):
                ref = normalise(hit.group(group_index))
                if not (ref and is_local_ref(ref)):
                    continue
                path = resolve_local_ref(md_file, ref, root)
                if path:
                    found.add(path)
                else:
                    unresolved.append((md_file, ref))

    return found, unresolved
def build_target(remote: str, bucket: str, prefix: str) -> str:
    """Compose the rclone destination ``remote:bucket/prefix``.

    An empty *bucket* leaves just ``remote:``; leading and trailing slashes
    are trimmed from *prefix*, and an empty prefix adds nothing.
    """
    # With an empty bucket this is simply "remote:".
    destination = f"{remote}:{bucket}"
    cleaned_prefix = prefix.strip("/")
    if not cleaned_prefix:
        return destination
    return f"{destination}/{cleaned_prefix}"
def rel_object_path(root: Path, image_path: Path, prefix: str) -> str:
    """Return the object key for *image_path*: ``prefix`` + path relative to *root*.

    The relative path uses forward slashes. Slashes around *prefix* are
    trimmed; with an empty prefix the relative path is returned on its own.
    Raises ``ValueError`` (from ``relative_to``) if *image_path* is not under
    *root*.
    """
    relative = image_path.relative_to(root).as_posix()
    cleaned_prefix = prefix.strip("/")
    if not cleaned_prefix:
        return relative
    return "/".join((cleaned_prefix, relative))
def build_public_url(base: str, object_path: str) -> str:
    """Join *base* and *object_path* into a URL, percent-encoding the path.

    Trailing slashes on *base* are dropped. In the path, ``/``, ``-``, ``.``,
    ``_`` and ``~`` are kept as-is; other reserved or non-ASCII characters are
    percent-encoded.
    """
    trimmed_base = base.rstrip("/")
    return trimmed_base + "/" + quote(object_path, safe="/-._~")
def run_rclone_upload(
    root: Path, target: str, rel_files: Iterable[str], dry_run: bool
) -> None:
    """Copy the listed files from *root* to *target* with ``rclone copy``.

    The relative paths are written one per line to a temporary file that is
    passed to rclone via ``--files-from``; the file is removed afterwards
    whether or not the upload succeeded.

    Args:
        root: Local source directory handed to rclone.
        target: rclone destination string.
        rel_files: Paths relative to *root* to transfer.
        dry_run: When true, ``--dry-run`` is appended and the command line is
            printed instead of the "Uploading to" notice.

    Raises:
        RuntimeError: If no ``rclone`` executable is found on ``PATH``.
        subprocess.CalledProcessError: If rclone exits with a non-zero status.
    """
    if shutil.which("rclone") is None:
        raise RuntimeError("rclone not found in PATH")

    tmp_path: Path | None = None
    try:
        # Creation and writing happen inside the try so that a failure while
        # producing the list (e.g. disk full, a failing iterable) still cleans
        # up the delete=False temporary file.
        with tempfile.NamedTemporaryFile(
            mode="w", encoding="utf-8", delete=False
        ) as tmp:
            tmp_path = Path(tmp.name)
            tmp.writelines(f"{rel}\n" for rel in rel_files)

        cmd = [
            "rclone",
            "copy",
            str(root),
            target,
            "--files-from",
            str(tmp_path),
            "--create-empty-src-dirs",
        ]
        if dry_run:
            cmd.append("--dry-run")

        print()
        if dry_run:
            print("Dry-run:", " ".join(cmd))
        else:
            print(f"Uploading to: {target}")
        subprocess.run(cmd, check=True)
    finally:
        if tmp_path is not None:
            tmp_path.unlink(missing_ok=True)
def rewrite_markdown_files(
    root: Path,
    md_files: Sequence[Path],
    image_set: set[Path],
    prefix: str,
    public_base_url: str,
    backup_ext: str,
) -> int:
    """Replace local image links in the given documents with public URLs.

    Only references that resolve to a path contained in *image_set* are
    rewritten; everything else is left untouched. Both Markdown image syntax
    and HTML ``<img src=...>`` are handled. Replacement is done by position
    (the span of the captured group), so identical text elsewhere in the match
    -- for instance alt text equal to the file name -- is never modified, and
    ``src`` attributes are rewritten regardless of case or spacing around
    ``=``. Text following a Markdown path (such as a title) is preserved.

    Args:
        root: Resolved docs root used to compute object paths.
        md_files: Documents to process (read and written as UTF-8).
        image_set: Images that were uploaded and may be linked publicly.
        prefix: Object prefix prepended to each image's root-relative path.
        public_base_url: Base URL the object path is appended to.
        backup_ext: Text appended to the document's file name for the backup
            copy of the original content; empty disables backups.

    Returns:
        The number of documents whose content changed.
    """

    def to_url(md_file: Path, raw_ref: str, is_markdown: bool) -> str | None:
        """Return the public URL for *raw_ref*, or None if it must not change."""
        ref = parse_md_ref(raw_ref) if is_markdown else clean_ref(raw_ref)
        if not ref or not is_local_ref(ref):
            return None
        resolved = resolve_local_ref(md_file, ref, root)
        if not resolved or resolved not in image_set:
            return None
        obj = rel_object_path(root, resolved, prefix)
        return build_public_url(public_base_url, obj)

    def md_target(raw: str, url: str) -> str:
        """Return *url* followed by whatever trailed the path in *raw*."""
        stripped = raw.strip()
        if stripped.startswith("<") and ">" in stripped:
            # The encoded URL has no spaces, so the angle brackets can go.
            return url + stripped[stripped.find(">") + 1 :]
        pieces = re.split(r"\s+", stripped, maxsplit=1)
        return url if len(pieces) == 1 else f"{url} {pieces[1]}"

    def splice(match: re.Match[str], group: int, replacement: str) -> str:
        """Return the matched text with *group*'s span swapped for *replacement*."""
        whole = match.group(0)
        offset = match.start(0)
        head = whole[: match.start(group) - offset]
        tail = whole[match.end(group) - offset :]
        return head + replacement + tail

    changed_count = 0
    for md_file in md_files:
        text = md_file.read_text(encoding="utf-8")

        def md_repl(match: re.Match[str], md_file: Path = md_file) -> str:
            raw = match.group(1)
            url = to_url(md_file, raw, is_markdown=True)
            if not url:
                return match.group(0)
            return splice(match, 1, md_target(raw, url))

        def html_repl(match: re.Match[str], md_file: Path = md_file) -> str:
            url = to_url(md_file, match.group(2), is_markdown=False)
            if not url:
                return match.group(0)
            return splice(match, 2, url)

        updated = MD_IMAGE_RE.sub(md_repl, text)
        updated = HTML_IMG_RE.sub(html_repl, updated)
        if updated == text:
            continue

        if backup_ext:
            # with_name accepts any extension text; with_suffix would raise
            # ValueError for values that do not start with a dot (e.g. "bak").
            backup_path = md_file.with_name(md_file.name + backup_ext)
            backup_path.write_text(text, encoding="utf-8")
        md_file.write_text(updated, encoding="utf-8")
        changed_count += 1

    return changed_count
def main() -> int:
    """Run the CLI: scan docs, list images, upload them, optionally rewrite links.

    Returns:
        Process exit status: 0 on success (including "nothing to do" and
        ``--list-only``), 1 on invalid arguments, a missing docs root, or a
        failed upload.
    """
    args = parse_args()

    if args.rewrite_markdown and not args.public_base_url:
        print(
            "Error: --public-base-url is required when using --rewrite-markdown",
            file=sys.stderr,
        )
        return 1

    root = Path(args.docs_root).resolve()
    if not root.is_dir():
        print(f"Error: docs root not found: {args.docs_root}", file=sys.stderr)
        return 1

    # Scanning is done in pure Python; no external search tool is required.
    md_files = find_markdown_files(root)
    images, missing = collect_images(root, md_files)
    if not images:
        print("No local image references found in Markdown docs.")
        return 0

    rel_files = sorted(p.relative_to(root).as_posix() for p in images)
    print(f"Found {len(rel_files)} image files:")
    for rel in rel_files:
        print(rel)

    if missing:
        print(file=sys.stderr)
        print(
            f"Warning: {len(missing)} referenced files were not found (showing up to 20):",
            file=sys.stderr,
        )
        for md, ref in missing[:20]:
            print(f"{md}\t{ref}", file=sys.stderr)

    if args.list_only:
        return 0

    target = build_target(args.remote, args.bucket, args.prefix)
    try:
        run_rclone_upload(root, target, rel_files, dry_run=args.dry_run)
    except RuntimeError as exc:
        # Raised when the rclone executable cannot be found.
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    except subprocess.CalledProcessError as exc:
        print(f"Error: rclone exited with status {exc.returncode}", file=sys.stderr)
        return 1

    # Links are only rewritten after a real upload, never after a dry run.
    if args.rewrite_markdown and not args.dry_run:
        changed = rewrite_markdown_files(
            root=root,
            md_files=md_files,
            image_set=images,
            prefix=args.prefix,
            public_base_url=args.public_base_url,
            backup_ext=args.backup_ext,
        )
        print(f"Rewrote {changed} markdown files.")

    print("Done.")
    return 0
# Script entry point: the integer returned by main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())