ROCmPort-AI / rocmport /ingest.py
Nawangdorjay's picture
Deploy ROCmPort AI — CUDA-to-ROCm migration scanner
786f63c verified
from __future__ import annotations
import shutil
import tempfile
import urllib.parse
import urllib.request
import os
import uuid
import zipfile
from dataclasses import dataclass
from pathlib import Path
IGNORED_DIRS = {
".git",
".hg",
".svn",
"__pycache__",
".pytest_cache",
".mypy_cache",
".venv",
"venv",
"node_modules",
"dist",
"build",
".cache",
}
TEXT_EXTENSIONS = {
"",
".cfg",
".conf",
".dockerfile",
".env",
".ini",
".json",
".md",
".py",
".sh",
".toml",
".txt",
".yaml",
".yml",
}
MAX_FILE_BYTES = 512_000
MAX_ZIP_FILES = 700
MAX_ZIP_BYTES = 60 * 1024 * 1024
@dataclass(frozen=True)
class PreparedRepo:
path: Path
name: str
temp_dir: Path | None = None
def sample_repo_path(project_root: Path) -> Path:
return project_root / "samples" / "cuda_first_repo"
def prepare_uploaded_zip(zip_path: str | Path) -> PreparedRepo:
temp_dir = make_work_dir("rocmport-upload-")
extract_zip(Path(zip_path), temp_dir)
repo_path = _single_child_or_self(temp_dir)
return PreparedRepo(path=repo_path, name=repo_path.name, temp_dir=temp_dir)
def prepare_github_repo(github_url: str, branch: str = "main") -> PreparedRepo:
owner, repo = parse_github_url(github_url)
temp_dir = make_work_dir("rocmport-github-")
zip_path = temp_dir / f"{repo}-{branch}.zip"
url = f"https://codeload.github.com/{owner}/{repo}/zip/refs/heads/{branch}"
urllib.request.urlretrieve(url, zip_path)
extract_zip(zip_path, temp_dir / "src")
repo_path = _single_child_or_self(temp_dir / "src")
return PreparedRepo(path=repo_path, name=repo, temp_dir=temp_dir)
def parse_github_url(github_url: str) -> tuple[str, str]:
parsed = urllib.parse.urlparse(github_url.strip())
if parsed.netloc.lower() not in {"github.com", "www.github.com"}:
raise ValueError("Use a public GitHub repository URL from github.com.")
parts = [part for part in parsed.path.split("/") if part]
if len(parts) < 2:
raise ValueError("GitHub URL must include owner and repository name.")
owner, repo = parts[0], parts[1].removesuffix(".git")
if not owner or not repo:
raise ValueError("GitHub URL must include owner and repository name.")
return owner, repo
def extract_zip(zip_path: Path, destination: Path) -> None:
destination.mkdir(parents=True, exist_ok=True)
total_size = 0
with zipfile.ZipFile(zip_path) as archive:
infos = archive.infolist()
if len(infos) > MAX_ZIP_FILES:
raise ValueError(f"ZIP has too many files ({len(infos)} > {MAX_ZIP_FILES}).")
for info in infos:
total_size += info.file_size
if total_size > MAX_ZIP_BYTES:
raise ValueError("ZIP is too large for the demo scanner.")
target = destination / info.filename
resolved = target.resolve()
if not _is_within(resolved, destination.resolve()):
raise ValueError("ZIP contains an unsafe path.")
if info.is_dir():
resolved.mkdir(parents=True, exist_ok=True)
continue
resolved.parent.mkdir(parents=True, exist_ok=True)
with archive.open(info) as src, resolved.open("wb") as dst:
shutil.copyfileobj(src, dst)
def iter_text_files(root: Path) -> list[tuple[str, str]]:
files: list[tuple[str, str]] = []
for path in sorted(root.rglob("*")):
if not path.is_file():
continue
if any(part in IGNORED_DIRS for part in path.relative_to(root).parts):
continue
if path.stat().st_size > MAX_FILE_BYTES:
continue
if not _is_probable_text_file(path):
continue
relative_path = path.relative_to(root).as_posix()
try:
text = path.read_text(encoding="utf-8")
except UnicodeDecodeError:
try:
text = path.read_text(encoding="latin-1")
except UnicodeDecodeError:
continue
files.append((relative_path, text))
return files
def make_work_dir(prefix: str) -> Path:
configured = os.getenv("ROCMPORT_TMP_DIR", "").strip()
base = Path(configured) if configured else Path(tempfile.gettempdir())
base.mkdir(parents=True, exist_ok=True)
for _ in range(100):
candidate = base / f"{prefix}{uuid.uuid4().hex}"
try:
candidate.mkdir(parents=True, exist_ok=False)
return candidate
except FileExistsError:
continue
raise RuntimeError("Could not create a ROCmPort work directory.")
def _is_probable_text_file(path: Path) -> bool:
if path.name in {"Dockerfile", "Makefile", "requirements.txt"}:
return True
if path.suffix.lower() in TEXT_EXTENSIONS:
return True
return path.name.lower().startswith("dockerfile")
def _is_within(path: Path, directory: Path) -> bool:
try:
path.relative_to(directory)
return True
except ValueError:
return False
def _single_child_or_self(path: Path) -> Path:
children = [child for child in path.iterdir() if child.name != "__MACOSX"]
if len(children) == 1 and children[0].is_dir():
return children[0]
return path