| import json |
| import os |
| import re |
| import shutil |
| import subprocess |
| import sys |
| import time |
| import uuid |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Generator, List, Optional, Tuple |
|
|
| @dataclass |
| class PipelineConfig: |
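|     """Configuration for a single end-to-end pipeline run.
|
|     ``llm_provider`` and ``llm_model`` are exported to every step subprocess
|     via the environment. Note that ``llm_model_step4``, despite its name, is
|     used to override ``LLM_MODEL`` for the step-5 verification run, while the
|     ``model_*`` fields configure the local citation-function classifier
|     invoked in step 4.
|     """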
|     repo_root: Path
|     source_root: Path
|     paper_input: str
|     llm_provider: str
|     llm_model: str
|     llm_model_step4: str
|     model_path: str
|     model_data_dir: str
|     model_class_def: str
|     model_lm: str
|     device: str
|     embedding_model: str
|
|
| @dataclass |
| class PipelineResult: |
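|     """Locations of the artifacts produced by a run (job dir, paper dir, zip)."""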
|     job_id: str
|     job_dir: Path
|     paper_dir: Path
|     zip_path: Path
|
|
| STEP_LABELS = { |
|     1: "Fetch metadata + LaTeX for input paper",
|     2: "Add citation markers",
|     3: "Build usage contexts",
|     4: "Label citation functions",
|     5: "Verify USES/EXTENDS",
|     6: "Extract arXiv paragraphs",
|     7: "Extract target contributions and refine clusters",
| } |
|
| FULL_STEPS = [1, 2, 3, 4, 5, 6, 7] |
| STOP_PREFIX = "Pipeline stopped:" |
|
|
| def parse_arxiv_id(paper_input: str) -> str: |
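|     """Normalize ``paper_input`` (a bare ID or an arxiv.org URL) to an arXiv ID."""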
|     s = (paper_input or "").strip()
|     if not s:
|         raise ValueError("paper_input is required")
|     if "arxiv.org" in s:
|         # Accept both abs/ and pdf/ URLs, e.g. https://arxiv.org/abs/2301.01234v2
|         m = re.search(r"arxiv\.org/(abs|pdf)/([^/?#]+)", s)
|         if not m:
|             raise ValueError(f"Could not parse arXiv ID from URL: {s}")
|         s = m.group(2)
|     s = s.replace(".pdf", "")
|     # Drop a trailing version suffix such as "v2".
|     s = re.sub(r"v\d+$", "", s)
|     # Only new-style IDs (YYMM.NNNNN) are accepted.
|     if not re.match(r"^[0-9]{4}\.[0-9]{4,5}$", s):
|         raise ValueError(f"Invalid arXiv ID format: {s}")
|     return s
|
|
| def _build_commands( |
|     cfg: PipelineConfig,
|     step: int,
|     job_processed_root: Path,
|     paper_id: str,
|     ids_path: Optional[Path],
| ) -> List[List[str]]: |
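|     """Return the subprocess command line(s) to run for ``step``.
|
|     Script paths are relative and resolve against ``cfg.repo_root``, which is
|     used as the working directory for every subprocess. ``paper_id`` is
|     accepted for signature uniformity but not referenced here.
|     """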
|     py = sys.executable
|     if step == 1:
|         assert ids_path is not None
|         return [[
|             py,
|             "src/step_01_fetch/fetch_metadata.py",
|             "--ids",
|             str(ids_path),
|             "--outdir",
|             str(job_processed_root),
|         ]]
|     if step == 2:
|         return [[py, "src/step_02_mark_citations/replace_citation_markers.py", "--root", str(job_processed_root)]]
|     if step == 3:
|         return [[py, "src/step_03_usage_contexts/build_usage_contexts.py", "--root", str(job_processed_root), "--out-name", "usage_contexts.json"]]
|     if step == 4:
|         return [[
|             py,
|             "src/step_04_label_citations/label_citation_functions.py",
|             "--root",
|             str(job_processed_root),
|             "--model-path",
|             cfg.model_path,
|             "--model-data-dir",
|             cfg.model_data_dir,
|             "--model-class-def",
|             cfg.model_class_def,
|             "--model-lm",
|             cfg.model_lm,
|             "--device",
|             cfg.device,
|         ]]
|     if step == 5:
|         return [[
|             py,
|             "src/step_05_verify_uses_extends/verify_uses_extends.py",
|             "--root",
|             str(job_processed_root),
|             "--k",
|             "0",
|             "--batch-size",
|             "25",
|         ]]
|     if step == 6:
|         return [[py, "src/step_06_extract_paragraphs/extract_arxiv_paragraphs.py", "--root", str(job_processed_root)]]
|     if step == 7:
|         # Step 7 runs two scripts back to back: extraction, then refinement.
|         return [
|             [py, "src/step_07_extract_and_refine/extract_contributions_from_citations.py", "--root", str(job_processed_root)],
|             [py, "src/step_07_extract_and_refine/refine_and_filter_clusters_llm.py", "--root", str(job_processed_root), "--inplace", "--overwrite"],
|         ]
|     raise ValueError(f"Unknown step: {step}")
|
|
| def _write_single_id_file(job_dir: Path, arxiv_id: str) -> Path: |
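|     """Write the one-element ID list that step 1's ``--ids`` flag expects."""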
|     ids_path = job_dir / "input_ids.json"
|     payload = [{"id": arxiv_id, "title": "", "id_type": "ArXiv"}]
|     ids_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
|     return ids_path
|
|
| def _write_run_metadata(cfg: PipelineConfig, job_dir: Path, paper_id: str, arxiv_id: str) -> None: |
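|     """Record the run configuration as ``run_config.json`` in the job dir."""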
|     payload = {
|         "paper_input": cfg.paper_input,
|         "paper_id": paper_id,
|         "arxiv_id": arxiv_id,
|         "source_root": str(cfg.source_root),
|         "steps": FULL_STEPS + ["annotation"],
|         "llm_provider": cfg.llm_provider,
|         "llm_model": cfg.llm_model,
|         "llm_model_step4": cfg.llm_model_step4,
|         "device": cfg.device,
|         "embedding_model": cfg.embedding_model,
|         "timestamp": int(time.time()),
|     }
|     (job_dir / "run_config.json").write_text(json.dumps(payload, indent=2), encoding="utf-8")
|
|
| def _zip_job_dir(job_dir: Path) -> Path: |
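|     """Zip the job directory to ``<job_dir>.zip`` beside it and return the path."""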
|     zip_base = job_dir.parent / job_dir.name
|     archive = shutil.make_archive(str(zip_base), "zip", root_dir=str(job_dir))
|     return Path(archive)
|
|
| def _tail_log(path: Path, max_lines: int = 60) -> str: |
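|     """Return the last ``max_lines`` lines of a log file, or "" if unreadable."""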
|     try:
|         lines = path.read_text(encoding="utf-8", errors="ignore").splitlines()
|     except Exception:
|         return ""
|     if not lines:
|         return ""
|     return "\n".join(lines[-max_lines:])
|
|
| def _load_json(path: Path, default=None): |
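|     """Load JSON from ``path``, returning ``default`` on any read or parse error."""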
|     try:
|         return json.loads(path.read_text(encoding="utf-8"))
|     except Exception:
|         return default
|
|
| def _write_summary_and_zip(job_dir: Path, summary_lines: List[str]) -> Path: |
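|     """Write ``summary.txt`` into the job dir, then zip the whole directory."""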
|     (job_dir / "summary.txt").write_text("\n".join(summary_lines), encoding="utf-8")
|     return _zip_job_dir(job_dir)
|
|
| def _count_verified_uses_extends(payload: dict) -> int: |
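|     """Count records labeled USES/EXTENDS, tolerating several payload layouts."""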
|     # The verifier's records may appear under any of several top-level keys.
|     records = payload.get("confirmed") or payload.get("verified_contexts") or payload.get("contexts") or payload.get("items") or []
|     if not isinstance(records, list):
|         return 0
|     accepted = {"USES", "EXTENDS", "Uses", "Extends"}
|     return sum(1 for item in records if isinstance(item, dict) and item.get("label") in accepted)
|
|
| def _stop_reason_after_step(step: int, paper_dir: Path) -> Optional[str]:
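|     """Return a human-readable reason to stop after ``step``, or None to continue.
|
|     Each check inspects the artifacts the step should have written into
|     ``paper_dir``; an empty or malformed artifact means the remaining steps
|     would have nothing to work on.
|     """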
|     if step == 1:
|         if not paper_dir.exists():
|             return "metadata could not be fetched for this paper"
|         if not (paper_dir / "processed_main.tex").exists():
|             return "arXiv source could not be retrieved or converted for this paper"
|         citations = _load_json(paper_dir / "citations_metadata.json", [])
|         if not isinstance(citations, list) or not citations:
|             return "Semantic Scholar returned no citing papers for this target paper"
|
|     if step == 3:
|         usage = _load_json(paper_dir / "usage_contexts.json", {})
|         if not isinstance(usage, dict):
|             return "citation usage contexts could not be built"
|         if int(usage.get("num_contexts") or 0) == 0:
|             return "no citation usage contexts were found"
|
|     if step == 4:
|         labels = _load_json(paper_dir / "usage_context_labels.json", {})
|         contexts = labels.get("labels") if isinstance(labels, dict) else None
|         if not isinstance(contexts, list) or not contexts:
|             return "citation-function labeling produced no labeled contexts"
|
|     if step == 5:
|         verified = _load_json(paper_dir / "usage_uses_extends_verified.json", {})
|         if not isinstance(verified, dict):
|             return "USES/EXTENDS verification did not produce an output file"
|         if _count_verified_uses_extends(verified) == 0:
|             return "no downstream citations were verified as USES or EXTENDS"
|
|     if step == 6:
|         paragraphs = _load_json(paper_dir / "usage_citing_paragraphs.json", {})
|         citing = paragraphs.get("citing_papers") if isinstance(paragraphs, dict) else None
|         if not isinstance(citing, list) or not citing:
|             return "no citing-paper paragraphs could be extracted from arXiv"
|         usable = [
|             item for item in citing
|             if isinstance(item, dict)
|             and not item.get("error")
|             and (item.get("matched_paragraphs") or item.get("target_citing_paragraphs"))
|         ]
|         if not usable:
|             return "arXiv paragraph extraction returned no usable citing-paper text"
|
|     if step == 7:
|         contributions = _load_json(paper_dir / "usage_contributions.json", {})
|         items = contributions.get("contributions") if isinstance(contributions, dict) else None
|         if not isinstance(items, list) or not items:
|             return "no downstream target-contribution evidence could be extracted"
|         refined = _load_json(paper_dir / "usage_discovery_from_contributions.json", {})
|         clusters = refined.get("clusters") if isinstance(refined, dict) else None
|         if not isinstance(clusters, list) or not clusters:
|             return "no valid downstream usage clusters survived refinement"
|
|     return None
|
|
| def run_pipeline(cfg: PipelineConfig, output_root: Path) -> Generator[Tuple[str, Optional[str]], None, PipelineResult]: |
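|     """Run all pipeline steps for one paper under a fresh job directory.
|
|     Yields ``(status_message, zip_path_or_None)`` tuples as steps progress and
|     returns a :class:`PipelineResult` (available to callers as the generator's
|     ``StopIteration`` value). A zip of the job directory is produced on both
|     success and failure, so partial results can always be downloaded.
|     """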
|     output_root.mkdir(parents=True, exist_ok=True)
|     job_id = f"job_{int(time.time())}_{uuid.uuid4().hex[:8]}"
|     job_dir = output_root / job_id
|     job_processed_root = job_dir / "processed_papers"
|     job_logs = job_dir / "logs"
|
|     job_processed_root.mkdir(parents=True, exist_ok=True)
|     job_logs.mkdir(parents=True, exist_ok=True)
|
|     arxiv_id = parse_arxiv_id(cfg.paper_input)
|     paper_id = arxiv_id
|     ids_path = _write_single_id_file(job_dir, arxiv_id)
|     _write_run_metadata(cfg, job_dir, paper_id, arxiv_id)
|
|     # LLM settings reach the step scripts through the environment.
|     base_env = os.environ.copy()
|     base_env["LLM_PROVIDER"] = cfg.llm_provider
|     base_env["LLM_MODEL"] = cfg.llm_model
|
|     summary_lines: List[str] = []
|     paper_dir = job_processed_root / paper_id
|
|     max_step = 8  # seven pipeline steps plus the trailing "annotation" stage recorded in run_config
|     for step in FULL_STEPS:
|         label = STEP_LABELS[step]
|         log_file = job_logs / f"step_{step:02d}.log"
|         summary_lines.append(f"[{step}] {label}")
|         yield (f"Step {step}/{max_step}: {label}", None)
|
|         env = base_env.copy()
|         # NOTE: despite the field name, this override applies to the step-5
|         # verification LLM; step 4 runs the local classifier instead.
|         if step == 5 and cfg.llm_model_step4:
|             env["LLM_MODEL"] = cfg.llm_model_step4
|
|         with log_file.open("w", encoding="utf-8") as lf:
|             return_code = 0
|             failed_cmd: Optional[List[str]] = None
|             for cmd in _build_commands(cfg, step, job_processed_root, paper_id, ids_path):
|                 lf.write(f"$ {' '.join(cmd)}\n\n")
|                 proc = subprocess.Popen(
|                     cmd,
|                     cwd=str(cfg.repo_root),
|                     stdout=subprocess.PIPE,
|                     stderr=subprocess.STDOUT,
|                     text=True,
|                     encoding="utf-8",
|                     errors="ignore",
|                     env=env,
|                 )
|                 assert proc.stdout is not None
|                 # Stream combined stdout/stderr into the step log as it arrives.
|                 for line in proc.stdout:
|                     lf.write(line)
|                 return_code = proc.wait()
|                 if return_code != 0:
|                     failed_cmd = cmd
|                     break
|
|         if return_code != 0:
|             summary_lines.append(f"FAILED at step {step}")
|             zip_path = _write_summary_and_zip(job_dir, summary_lines)
|             tail = _tail_log(log_file)
|             if tail:
|                 yield (
|                     f"Step {step} failed.\n\nCommand: {' '.join(failed_cmd or [])}\n\nLast log lines:\n{tail}",
|                     str(zip_path),
|                 )
|             else:
|                 yield (f"Step {step} failed. Command: {' '.join(failed_cmd or [])}", str(zip_path))
|             return PipelineResult(job_id=job_id, job_dir=job_dir, paper_dir=paper_dir, zip_path=zip_path)
|         else:
|             yield (f"Step {step} complete", None)
|
|         if step == 1 and not paper_dir.exists():
|             summary_lines.append("FAILED: fetch_metadata did not create paper directory")
|             zip_path = _write_summary_and_zip(job_dir, summary_lines)
|             yield (f"Step 1 finished but paper dir missing: {paper_dir}", str(zip_path))
|             return PipelineResult(job_id=job_id, job_dir=job_dir, paper_dir=paper_dir, zip_path=zip_path)
|
|         # Stop early (with a clean message and zip) when a step succeeded but
|         # produced nothing for the remaining steps to work on.
|         stop_reason = _stop_reason_after_step(step, paper_dir)
|         if stop_reason:
|             message = f"{STOP_PREFIX} {stop_reason}."
|             summary_lines.append(message)
|             zip_path = _write_summary_and_zip(job_dir, summary_lines)
|             yield (message, str(zip_path))
|             return PipelineResult(job_id=job_id, job_dir=job_dir, paper_dir=paper_dir, zip_path=zip_path)
|
|     summary_lines.append("SUCCESS")
|     zip_path = _write_summary_and_zip(job_dir, summary_lines)
|     yield ("Pipeline completed successfully.", str(zip_path))
|     return PipelineResult(job_id=job_id, job_dir=job_dir, paper_dir=paper_dir, zip_path=zip_path)