import json
import os
import sys
import time
import html
from pathlib import Path
from typing import Any, Optional
import streamlit as st
try:
from huggingface_hub import HfApi
except Exception:
HfApi = None
SRC = Path(__file__).resolve().parent
REPO_ROOT = SRC.parent
for extra in (SRC, REPO_ROOT / "src"):
extra_str = str(extra)
if extra_str not in sys.path:
sys.path.insert(0, extra_str)
import runner as runner_module
from runner import PipelineConfig
from common.paper_package import load_paper_package
from step_08_annotation.pipeline import TwoPassAnnotationPipeline
from streamlit_config import EXAMPLES, TAB_NAMES
DEFAULT_SOURCE_ROOT = str(REPO_ROOT / "src" / "processed_papers")
DEFAULT_OUTPUT_ROOT = str(REPO_ROOT / "hf_space" / "runs")
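# Stylesheet injected once in main(); expected to define the card/pill classes
# (e.g. "metric-card", "claim-card") used by the HTML snippets rendered below.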
CUSTOM_CSS = """
"""
def get_secret(name: str, default: str = "") -> str:
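    """Resolve a secret from the environment first, then from st.secrets."""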
value = os.getenv(name)
if value:
return value
try:
return st.secrets[name]
except Exception:
return default
def run_repo_config() -> tuple[str | None, str, str | None]:
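    """Return (repo_id, repo_type, token) for the optional runs dataset repo."""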
repo_id = get_secret("RUNS_REPO_ID", "")
repo_type = get_secret("RUNS_REPO_TYPE", "dataset")
token = get_secret("HF_WRITE_TOKEN", "") or get_secret("HF_TOKEN", "")
return repo_id or None, repo_type, token or None
def remote_run_prefix(job_id: str) -> str:
return f"runs/{job_id}"
def upload_run_artifact(job_dir: Path) -> str:
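    """Mirror a finished run directory to the configured Hugging Face repo.

    Returns a human-readable reference on success, an "upload_failed: ..."
    message on error, or "" when uploads are not configured.
    """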
repo_id, repo_type, token = run_repo_config()
if not repo_id or not token:
return ""
if HfApi is None:
return "upload_failed: huggingface_hub is not installed"
job_id = job_dir.name
remote_prefix = remote_run_prefix(job_id)
uploaded: list[str] = []
try:
api = HfApi(token=token)
for name in ["input_ids.json", "run_config.json", "summary.txt"]:
path = job_dir / name
if path.exists():
api.upload_file(
path_or_fileobj=str(path),
path_in_repo=f"{remote_prefix}/{name}",
repo_id=repo_id,
repo_type=repo_type,
commit_message=f"Upload {name} for {job_id}",
)
uploaded.append(name)
for folder_name in ["logs", "processed_papers", "two_pass_outputs"]:
folder = job_dir / folder_name
if not folder.exists():
continue
files = [path for path in folder.rglob("*") if path.is_file()]
if not files:
continue
api.upload_folder(
folder_path=str(folder),
path_in_repo=f"{remote_prefix}/{folder_name}",
repo_id=repo_id,
repo_type=repo_type,
commit_message=f"Upload {folder_name} for {job_id}",
ignore_patterns=["__pycache__/*", "*.pyc", "*.zip"],
)
uploaded.append(f"{folder_name}[{len(files)} files]")
return f"{repo_type}:{repo_id}/{remote_prefix}/ (uploaded: {', '.join(uploaded) or 'nothing'})"
except Exception as exc:
return f"upload_failed: {exc}"
def _load_json(path: Path) -> Optional[dict]:
if not path.exists():
return None
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None
def _status_from_line(line: str, current: str) -> str:
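    """Derive a coarse run status from one log line, keeping `current` otherwise."""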
text = (line or "").strip()
text = _display_log_line(text)
if text.startswith("Pipeline stopped:"):
return "Stopped"
if text.startswith("Step "):
return text
if "failed" in text.lower():
return f"Failed: {text}"
if "completed successfully" in text.lower():
return "Completed"
return current
def _display_log_line(line: str) -> str:
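    """Map raw pipeline log lines to the shorter labels shown in the UI."""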
text = (line or "").strip()
if text.startswith("Step ") and " failed." in text:
return text.splitlines()[0]
if text == "[annotation] starting cluster-first two-pass annotation":
return "Step 8/8: Annotate target contributions and enabling contributions"
if text.startswith("[annotation] complete:"):
return "Step 8 complete"
if text == "Pipeline completed successfully.":
return text
return text
def _format_step_event(line: str) -> str:
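    """Prefix a display line with a status glyph for the activity feed."""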
    text = _display_log_line(line)
    if not text:
        return ""
    if text.startswith("Step ") and "/" in text and ":" in text:
        return f"🛠️ {text}"
    if text.startswith("Step ") and text.endswith(" complete"):
        return f"✅ {text}"
    if text.lower().startswith("stopped after step"):
        return f"ℹ️ {text}"
    if text.startswith("Pipeline stopped:"):
        return f"ℹ️ {text}"
    if "failed" in text.lower():
        return f"❌ {text}"
    if "completed successfully" in text.lower():
        return f"✅ {text}"
    return f"• {text}"
def _ensure_state():
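    """Seed st.session_state with defaults so reruns can read keys safely."""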
defaults = {
"paper_input": "",
"run_status": "Idle",
"run_logs": [],
"run_events": [],
"artifact_path": None,
"run_dir_path": None,
"paper_dir_path": None,
"annotation_payload_path": None,
"run_summary": None,
"annotation_skipped_reason": None,
"pipeline_failed_reason": None,
"remote_artifact_ref": "",
}
for key, value in defaults.items():
st.session_state.setdefault(key, value)
def _metric_card(label: str, value: Any):
    # The "metric-card" markup and class names are assumptions styled by CUSTOM_CSS.
    st.markdown(
        f"<div class='metric-card'><div class='metric-value'>{_esc(value)}</div>"
        f"<div class='metric-label'>{_esc(label)}</div></div>",
        unsafe_allow_html=True,
    )
def _esc(value: Any) -> str:
return html.escape("" if value is None else str(value))
def _safe_int(value: Any, default: int = 0) -> int:
try:
return int(value)
except (TypeError, ValueError):
return default
def _grounding_html(grounding: Optional[dict], label: str, kind: str) -> str:
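    """Render one grounding as an HTML card; returns "" when no grounding exists."""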
if not grounding:
return ""
title = (
grounding.get("ref_title")
or grounding.get("title")
or grounding.get("paper_id")
or grounding.get("ref_id")
or "__NONE__"
)
meta = []
if grounding.get("paper_id"):
meta.append(f"paper_id: {grounding.get('paper_id')}")
elif grounding.get("ref_id"):
meta.append(f"ref_id: {grounding.get('ref_id')}")
if grounding.get("ref_year"):
meta.append(str(grounding.get("ref_year")))
authors = grounding.get("ref_authors")
if isinstance(authors, list) and authors:
meta.append(", ".join(str(author) for author in authors[:3]))
meta_html = f"{_esc(' ¡ '.join(meta))}
" if meta else ""
extra_class = " additional" if kind == "additional" else ""
return (
f""
)
def _study_key(item: dict) -> str:
for key in ["paper_id", "ref_id", "ref_title", "title"]:
value = item.get(key)
if value:
return str(value).lower()
return ""
def _collect_grounded_studies(discoveries: list[dict], ingredients: list[dict]) -> list[dict]:
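    """Merge primary studies with additional groundings, de-duplicated by study key.

    Additional groundings inherit role/contribution/rationale from the
    ingredient's canonical annotation when they do not carry their own.
    """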
studies: list[dict] = []
seen: set[str] = set()
for item in discoveries:
if not isinstance(item, dict):
continue
copied = dict(item)
copied["_grounding_kind"] = "primary"
copied["_grounding_label"] = "Primary study"
key = _study_key(copied)
if key:
seen.add(key)
studies.append(copied)
for idx, ingredient in enumerate(ingredients, start=1):
if not isinstance(ingredient, dict):
continue
canonical = ingredient.get("canonical_grounding") or {}
canonical_key = _study_key(canonical) if isinstance(canonical, dict) else ""
annotation = ingredient.get("canonical_annotation") or {}
for ref in ingredient.get("additional_groundings") or []:
if not isinstance(ref, dict):
continue
key = _study_key(ref)
if key and (key == canonical_key or key in seen):
continue
copied = dict(ref)
copied["_grounding_kind"] = "additional"
copied["_grounding_label"] = f"Additional study for enabling contribution {idx}"
copied.setdefault("role", annotation.get("role") or ", ".join(annotation.get("roles") or []))
copied.setdefault("contribution", annotation.get("contribution"))
copied.setdefault("rationale", annotation.get("rationale"))
if key:
seen.add(key)
studies.append(copied)
return studies
def _render_reference_list(discoveries: list[dict], ingredients: Optional[list[dict]] = None):
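    """Render the list of grounded and additional studies as HTML cards."""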
studies = _collect_grounded_studies(discoveries, ingredients or [])
if not studies:
st.markdown("No grounded studies listed for this target contribution.
", unsafe_allow_html=True)
return
for item in studies:
title = item.get("ref_title") or item.get("title") or item.get("ref_id") or item.get("paper_id") or "Untitled reference"
is_additional = item.get("_grounding_kind") == "additional"
meta = []
if item.get("_grounding_label"):
meta.append(str(item.get("_grounding_label")))
if item.get("role"):
meta.append(str(item.get("role")))
if item.get("ref_year"):
meta.append(str(item.get("ref_year")))
class_name = "cluster-card additional-study" if is_additional else "cluster-card"
body = [f"{_esc(title)}
"]
if meta:
body.append(f"
{_esc(' ¡ '.join(meta))}
")
if item.get("contribution"):
body.append(f"
Contribution. {_esc(item.get('contribution'))}
")
if item.get("rationale"):
body.append(f"
Rationale. {_esc(item.get('rationale'))}
")
body.append("
")
st.markdown("".join(body), unsafe_allow_html=True)
def _render_claims_tab(payload: Optional[dict]):
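    """Render target-contribution cards with their decompositions and studies."""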
if not payload:
st.markdown("No annotation payload is available yet.
", unsafe_allow_html=True)
return
claims = payload.get("claims") or []
if not claims:
st.markdown("The run completed, but no target contributions were produced.
", unsafe_allow_html=True)
return
for idx, claim in enumerate(claims, start=1):
claim_id = claim.get("claim_id") or f"C{idx}"
claim_text = claim.get("rewritten_claim") or claim.get("text") or "(missing target contribution text)"
ingredients = claim.get("ingredients") or []
discoveries = claim.get("enabling_discoveries") or []
grounded_studies = _collect_grounded_studies(discoveries, ingredients)
meta_pills = []
if claim.get("decision"):
meta_pills.append(str(claim.get("decision")))
if claim.get("cluster_id"):
meta_pills.append(f"cluster {claim.get('cluster_id')}")
meta_pills.append(f"{len(ingredients)} enabling contribution{'s' if len(ingredients) != 1 else ''}")
meta_pills.append(f"{len(grounded_studies)} grounded stud{'ies' if len(grounded_studies) != 1 else 'y'}")
pills_html = "".join(f"{_esc(p)}" for p in meta_pills)
st.markdown(
f"""
Target contribution {idx} ¡ {_esc(claim_id)}
{_esc(claim_text)}
{pills_html}
""",
unsafe_allow_html=True,
)
left, right = st.columns([1.7, 1.0], gap="large")
with left:
st.markdown("Decomposition
", unsafe_allow_html=True)
if not ingredients:
st.markdown("No enabling contributions for this target contribution.
", unsafe_allow_html=True)
for ingredient_idx, ingredient in enumerate(ingredients, start=1):
annotation = ingredient.get("canonical_annotation") or {}
role = annotation.get("role") or ", ".join(annotation.get("roles") or []) or "UNSPECIFIED"
canonical_grounding = ingredient.get("canonical_grounding") or {}
extras = ingredient.get("additional_groundings") or []
grounding_parts = []
if canonical_grounding:
grounding_parts.append(
_grounding_html(canonical_grounding, "Primary grounding", "primary")
)
for ref in extras:
if not isinstance(ref, dict):
continue
if canonical_grounding and (
ref.get("paper_id") == canonical_grounding.get("paper_id")
or ref.get("ref_id") == canonical_grounding.get("ref_id")
):
continue
grounding_parts.append(
_grounding_html(ref, "Additional grounding", "additional")
)
if not grounding_parts:
canonical_ref_id = ingredient.get("canonical_ref_id") or "__NONE__"
                    # Fallback card when no structured grounding is present.
                    grounding_parts.append(
                        "<div class='grounding-card'>"
                        "<div class='grounding-label'>Grounding</div>"
                        f"<div class='grounding-title'>{_esc(canonical_ref_id)}</div>"
                        "</div>"
                    )
                grounding_block = (
                    "<div class='grounding-list'>"
                    f"<div class='grounding-list-title'>Groundings for enabling contribution {ingredient_idx}</div>"
                    + "".join(grounding_parts)
                    + "</div>"
                )
                st.markdown(
                    f"""<div class='ingredient-card'>
                    <div class='ingredient-title'>{ingredient_idx}. {_esc(ingredient.get('ingredient') or '(missing enabling contribution)')}</div>
                    <span class='pill'>{_esc(role)}</span>
                    <div><strong>Contribution.</strong> {_esc(annotation.get('contribution') or '')}</div>
                    <div><strong>Rationale.</strong> {_esc(annotation.get('rationale') or '')}</div>
                    <div><strong>Evidence.</strong> {_esc(annotation.get('evidence_span') or '')}</div>
                    {grounding_block}
                    </div>""",
                    unsafe_allow_html=True,
                )
with right:
st.markdown("Grounded and additional studies
", unsafe_allow_html=True)
_render_reference_list(discoveries, ingredients)
def _render_clusters_tab(discovery: Optional[dict], contributions: list[dict]):
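    """Render refined citation clusters, linked instances, and dropped clusters."""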
if not discovery:
st.markdown("No refined cluster file is available yet.
", unsafe_allow_html=True)
return
clusters = discovery.get("clusters") or []
dropped = discovery.get("dropped_clusters") or []
if not clusters:
st.markdown("No valid downstream usage clusters survived refinement and filtering.
", unsafe_allow_html=True)
if dropped:
with st.expander(f"Dropped clusters ({len(dropped)})", expanded=False):
st.json(dropped)
return
for cluster in clusters:
cluster_id = cluster.get("cluster_id", "")
rep = cluster.get("representative_claim") or cluster.get("cluster_title") or "(missing representative claim)"
count = _safe_int(cluster.get("count"), len(cluster.get("claim_indices") or []))
source_ids = cluster.get("source_cluster_ids") or []
merge_rationale = cluster.get("merge_rationale") or ""
        st.markdown(
            f"""<div class='cluster-card'>
            <div class='cluster-title'>{_esc(rep)}</div>
            <div class='cluster-meta'>Cluster {_esc(cluster_id)} · {count} contribution instance{'s' if count != 1 else ''}</div>
            </div>""",
            unsafe_allow_html=True,
        )
meta_cols = st.columns([1.3, 1.3, 1.4])
with meta_cols[0]:
st.caption("Cluster ID")
st.code(str(cluster_id), language="text")
with meta_cols[1]:
st.caption("Source clusters")
st.code(", ".join(str(x) for x in source_ids) if source_ids else "singleton", language="text")
with meta_cols[2]:
st.caption("Merge rationale")
            st.write(merge_rationale or "—")
claim_indices = cluster.get("claim_indices") or []
if claim_indices:
with st.expander(f"Linked contribution instances ({len(claim_indices)})", expanded=False):
for idx in claim_indices:
try:
j = int(idx)
except Exception:
continue
if 0 <= j < len(contributions):
item = contributions[j] or {}
title = item.get("citing_title") or item.get("citing_paper_id") or "Unknown citing paper"
claim = item.get("paper_claim") or item.get("claim") or "(missing claim)"
rationale = item.get("rationale") or ""
evidence = item.get("evidence_span") or ""
st.markdown(f"**{title}**")
st.write(claim)
if rationale:
st.caption(f"Rationale: {rationale}")
if evidence:
st.caption(f"Evidence: {evidence}")
st.divider()
if dropped:
with st.expander(f"Dropped clusters ({len(dropped)})", expanded=False):
st.json(dropped)
def run_two_pass_annotation(
paper_dir: Path,
annotation_output_root: Path,
llm_provider: str,
llm_model: str,
formatter_model: str,
judge_model: str,
candidate_count: int,
):
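    """Run the cluster-first two-pass annotation pipeline on one paper package."""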
paper = load_paper_package(paper_dir)
pipeline = TwoPassAnnotationPipeline(
provider=llm_provider,
model=llm_model,
formatter_model=formatter_model or None,
judge_model=judge_model or None,
output_root=annotation_output_root,
annotator_id="streamlit_hf_space",
candidate_count=max(1, int(candidate_count)),
formatter_max_attempts=3,
include_reference_examples=True,
prompt_profile="full",
)
result = pipeline.run(paper)
return result.result, result.run_dir
def run_pipeline_stream(
paper_input: str,
source_root: str,
output_root: str,
llm_provider: str,
llm_model: str,
llm_model_step4: str,
formatter_model: str,
judge_model: str,
candidate_count: int,
):
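    """Run the full pipeline for one paper, streaming status into the UI.

    Stages: run the extraction pipeline, run two-pass annotation when refined
    clusters exist, then upload artifacts if configured. Results are persisted
    to st.session_state so the result tabs survive Streamlit reruns.
    """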
gemini_key = get_secret("GEMINI_API_KEY")
if gemini_key:
os.environ["GEMINI_API_KEY"] = gemini_key
cfg = PipelineConfig(
repo_root=REPO_ROOT,
source_root=Path(source_root).expanduser().resolve(),
paper_input=paper_input.strip(),
llm_provider=llm_provider.strip() or "gemini",
llm_model=llm_model.strip() or "gemini-3.1-pro-preview",
llm_model_step4=llm_model_step4.strip() or "gemini-3-flash-preview",
model_path="Deep-Citation/Workspace/acl_scicite_wksp_trl/best_model.pt",
model_data_dir="Deep-Citation/Data",
model_class_def="Deep-Citation/Data/class_def.json",
model_lm="scibert",
device="cpu",
embedding_model="sentence-transformers/all-mpnet-base-v2",
)
status_placeholder = st.empty()
activity_placeholder = st.empty()
status = "Starting"
logs: list[str] = []
events: list[str] = []
seen_events: set[str] = set()
artifact_path = None
annotation_payload_path = None
annotation_skipped_reason = None
run_summary = None
pipeline_stopped_reason = None
pipeline_failed_reason = None
def render_activity(items: list[str]):
if not items:
activity_placeholder.info("Waiting for first step...")
return
activity_placeholder.markdown("### Activity\n" + "\n".join(f"- {item}" for item in items[-20:]))
def append_display_line(line: str):
display_line = _display_log_line(line)
if not display_line:
return
logs.append(display_line)
event = _format_step_event(display_line)
if event and event not in seen_events:
seen_events.add(event)
events.append(event)
render_activity(events)
for line, maybe_artifact in runner_module.run_pipeline(cfg, Path(output_root).expanduser().resolve()):
if line:
if line.strip() == "Pipeline completed successfully.":
if maybe_artifact:
artifact_path = maybe_artifact
continue
display_line = _display_log_line(line)
if display_line:
logs.append(display_line)
status = _status_from_line(display_line, status)
if display_line.startswith("Pipeline stopped:"):
pipeline_stopped_reason = display_line
if "failed" in display_line.lower():
pipeline_failed_reason = display_line
event = _format_step_event(display_line)
if event and event not in seen_events:
seen_events.add(event)
events.append(event)
if maybe_artifact:
artifact_path = maybe_artifact
status_placeholder.info(f"Current status: {status}")
render_activity(events)
run_dir_path = None
paper_dir_path = None
remote_artifact_ref = ""
if artifact_path:
job_dir = Path(str(artifact_path)).with_suffix("")
run_dir_path = str(job_dir)
paper_id = runner_module.parse_arxiv_id(paper_input.strip())
paper_dir = job_dir / "processed_papers" / paper_id
paper_dir_path = str(paper_dir)
if pipeline_failed_reason:
annotation_skipped_reason = f"{pipeline_failed_reason} Annotation was not run."
elif pipeline_stopped_reason:
annotation_skipped_reason = f"{pipeline_stopped_reason} Annotation was not run."
else:
discovery = _load_json(paper_dir / "usage_discovery_from_contributions.json") or {}
refined_clusters = discovery.get("clusters") or []
if not refined_clusters:
annotation_skipped_reason = "No valid downstream usage clusters remained after refinement and filtering. Annotation was skipped."
logs.append("[annotation] skipped: no refined downstream usage clusters")
else:
append_display_line("[annotation] starting cluster-first two-pass annotation")
status_placeholder.info("Current status: Running annotation")
try:
run_output, annotation_run_dir = run_two_pass_annotation(
paper_dir=paper_dir,
annotation_output_root=job_dir / "two_pass_outputs",
llm_provider=llm_provider,
llm_model=llm_model,
formatter_model=formatter_model,
judge_model=judge_model,
candidate_count=candidate_count,
)
payload_path = run_output.get("ui_payload_path") if isinstance(run_output, dict) else None
if payload_path and Path(payload_path).exists():
annotation_payload_path = str(Path(payload_path))
append_display_line(f"[annotation] complete: {annotation_run_dir}")
except Exception as exc:
pipeline_failed_reason = f"Annotation failed: {exc}"
annotation_skipped_reason = pipeline_failed_reason
logs.append(f"[annotation] failed: {exc}")
logs.append("[upload] uploading run artifact to Hugging Face dataset")
status_placeholder.info("Current status: Finalizing run")
remote_artifact_ref = upload_run_artifact(job_dir)
if remote_artifact_ref:
logs.append(f"[upload] {remote_artifact_ref}")
else:
logs.append("[upload] skipped: RUNS_REPO_ID/HF_WRITE_TOKEN not configured")
if not pipeline_stopped_reason and not pipeline_failed_reason:
append_display_line("Pipeline completed successfully.")
if pipeline_failed_reason:
status = "Failed"
elif artifact_path and pipeline_stopped_reason:
status = "Stopped"
else:
status = "Completed" if artifact_path else "Failed"
if status == "Completed":
status_placeholder.success(f"Final status: {status}")
elif status == "Stopped":
status_placeholder.warning(f"Final status: {status}")
else:
status_placeholder.error("Final status: Failed")
st.session_state["run_status"] = status
st.session_state["run_logs"] = logs
st.session_state["run_events"] = events
st.session_state["artifact_path"] = artifact_path
st.session_state["run_dir_path"] = run_dir_path
st.session_state["paper_dir_path"] = paper_dir_path
st.session_state["annotation_payload_path"] = annotation_payload_path
st.session_state["annotation_skipped_reason"] = annotation_skipped_reason
st.session_state["pipeline_stopped_reason"] = pipeline_stopped_reason
st.session_state["pipeline_failed_reason"] = pipeline_failed_reason
st.session_state["run_summary"] = run_summary
st.session_state["remote_artifact_ref"] = remote_artifact_ref
def _load_result_bundle():
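    """Load (paper_dir, discovery, contributions, payload) for the stored run."""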
paper_dir_path = st.session_state.get("paper_dir_path")
annotation_payload_path = st.session_state.get("annotation_payload_path")
paper_dir = Path(paper_dir_path) if paper_dir_path else None
payload = _load_json(Path(annotation_payload_path)) if annotation_payload_path else None
discovery = _load_json(paper_dir / "usage_discovery_from_contributions.json") if paper_dir and paper_dir.exists() else None
contributions_data = _load_json(paper_dir / "usage_contributions.json") if paper_dir and paper_dir.exists() else None
contributions = (contributions_data or {}).get("contributions") or []
return paper_dir, discovery, contributions, payload
def _render_overview(payload: Optional[dict], discovery: Optional[dict]):
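    """Show headline metric cards summarizing the latest run."""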
claims = (payload or {}).get("claims") or []
ingredients = sum(len(claim.get("ingredients") or []) for claim in claims)
studies = sum(
len(_collect_grounded_studies(claim.get("enabling_discoveries") or [], claim.get("ingredients") or []))
for claim in claims
)
clusters = len((discovery or {}).get("clusters") or [])
c1, c2, c3, c4 = st.columns(4)
with c1:
_metric_card("Refined clusters", clusters)
with c2:
_metric_card("Target contributions", len(claims))
with c3:
_metric_card("Enabling contributions", ingredients)
with c4:
_metric_card("Grounded studies", studies)
def _build_public_export(discovery: Optional[dict], payload: Optional[dict]) -> dict:
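    """Assemble the downloadable JSON export of clusters and decompositions."""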
claims = []
for claim in (payload or {}).get("claims") or []:
if not isinstance(claim, dict):
continue
ingredients = []
for ingredient in claim.get("ingredients") or []:
if not isinstance(ingredient, dict):
continue
ingredients.append({
"ingredient_id": ingredient.get("ingredient_id"),
"enabling_contribution": ingredient.get("ingredient"),
"canonical_annotation": ingredient.get("canonical_annotation") or {},
"primary_grounding": ingredient.get("canonical_grounding") or {},
"additional_groundings": ingredient.get("additional_groundings") or [],
})
claims.append({
"claim_id": claim.get("claim_id"),
"target_contribution": claim.get("rewritten_claim") or claim.get("text"),
"cluster_id": claim.get("cluster_id"),
"decision": claim.get("decision"),
"enabling_contributions": ingredients,
"grounded_studies": _collect_grounded_studies(claim.get("enabling_discoveries") or [], claim.get("ingredients") or []),
})
return {
"citation_clusters": (discovery or {}).get("clusters") or [],
"target_contribution_decompositions": claims,
}
def main():
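    """Streamlit entry point: sidebar, run form, and result tabs."""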
llm_provider = os.getenv("LLM_PROVIDER", "gemini")
llm_model = os.getenv("LLM_MODEL", "gemini-3.1-pro-preview")
llm_model_step4 = os.getenv("LLM_MODEL_STEP4", "gemini-3-flash-preview")
formatter_model = os.getenv("ANNOTATION_FORMATTER_MODEL", "gemini/gemini-3.1-pro-preview")
judge_model = os.getenv("ANNOTATION_JUDGE_MODEL", "gemini/gemini-3.1-pro-preview")
candidate_count = int(os.getenv("ANNOTATION_CANDIDATE_COUNT", "3"))
source_root = DEFAULT_SOURCE_ROOT
output_root = DEFAULT_OUTPUT_ROOT
    # page_icon is an assumption; the original glyph was garbled.
    st.set_page_config(page_title="Forecasting Scientific Contribution Pathways", page_icon="🔭", layout="wide")
st.markdown(CUSTOM_CSS, unsafe_allow_html=True)
_ensure_state()
with st.sidebar:
st.markdown("## SciPaths")
st.caption("Enter an arXiv paper and run the target-contribution pathway annotation pipeline.")
st.divider()
st.markdown("### Citation")
st.caption("If you find this useful, please cite our paper as:")
st.code(
"@misc{chamoun2026scipathsforecastingpathwaysscientific,\n"
" title={SciPaths: Forecasting Pathways to Scientific Discovery}, \n"
" author={Eric Chamoun and Yizhou Chi and Yulong Chen and Rui Cao and Zifeng Ding and Michalis Korakakis and Andreas Vlachos},\n"
" year={2026},\n"
" eprint={2605.14600},\n"
" archivePrefix={arXiv},\n"
" primaryClass={cs.CL},\n"
" url={https://arxiv.org/abs/2605.14600}, \n"
"}",
language="bibtex",
)
st.caption("Paper URL: https://arxiv.org/abs/2605.14600")
st.caption("Questions or feedback: ec806@cam.ac.uk")
st.divider()
if st.button("Clear chat / restart", use_container_width=True):
for key in [
"paper_input", "run_status", "run_logs", "run_events", "artifact_path",
"run_dir_path", "paper_dir_path", "annotation_payload_path",
"run_summary", "annotation_skipped_reason", "pipeline_stopped_reason",
"pipeline_failed_reason", "remote_artifact_ref",
]:
if key in st.session_state:
del st.session_state[key]
st.rerun()
if not get_secret("GEMINI_API_KEY"):
st.warning("No GEMINI_API_KEY found in environment or secrets.", icon="đ")
st.markdown("Forecasting Scientific Contribution Pathways
", unsafe_allow_html=True)
st.markdown(
"Run the SciPaths pipeline through refined downstream citation clusters, then derive target contributions from those clusters and decompose each target contribution into enabling contributions and grounded studies.
",
unsafe_allow_html=True,
)
tabs = st.tabs(TAB_NAMES)
with tabs[0]:
with st.expander("Try an example", expanded=True):
cols = st.columns(len(EXAMPLES))
for i, (label, value) in enumerate(EXAMPLES.items()):
with cols[i]:
if st.button(label, key=f"example::{label}", use_container_width=True):
st.session_state["paper_input"] = value
st.rerun()
paper_input = st.text_input(
"Paper input (arXiv URL or ID)",
key="paper_input",
placeholder="https://arxiv.org/abs/2311.14919",
)
if st.button("Run pipeline + annotation", type="primary", use_container_width=True):
if not paper_input.strip():
st.error("Paper input is required.")
else:
run_pipeline_stream(
paper_input=paper_input,
source_root=source_root,
output_root=output_root,
llm_provider=llm_provider,
llm_model=llm_model,
llm_model_step4=llm_model_step4,
formatter_model=formatter_model,
judge_model=judge_model,
candidate_count=candidate_count,
)
st.markdown("### Latest run")
st.info(f"Status: {st.session_state.get('run_status', 'Idle')}")
if st.session_state.get("pipeline_failed_reason"):
st.error(st.session_state["pipeline_failed_reason"])
if st.session_state.get("annotation_skipped_reason"):
st.warning(st.session_state["annotation_skipped_reason"])
paper_dir, discovery, contributions, payload = _load_result_bundle()
public_export = _build_public_export(discovery, payload)
if public_export["citation_clusters"] or public_export["target_contribution_decompositions"]:
st.download_button(
"Download citation clusters and contribution groundings",
data=json.dumps(public_export, indent=2, ensure_ascii=False),
file_name="scipaths_run_results.json",
mime="application/json",
use_container_width=False,
)
_render_overview(payload, discovery)
with tabs[1]:
paper_dir, discovery, contributions, payload = _load_result_bundle()
_render_clusters_tab(discovery, contributions)
with tabs[2]:
paper_dir, discovery, contributions, payload = _load_result_bundle()
_render_claims_tab(payload)
if __name__ == "__main__":
main()