# ApplyCRs / app.py
# (Hugging Face Space header residue converted to comments — author caption,
#  commit f8638ca: "modify UI, independant downloads away from docfinder,
#  warnings, retry and manual upload" — the raw lines were not valid Python.)
#!/usr/bin/env python3
"""
CR Application Tool β€” Streamlit frontend.
Three-step UI:
1. UPLOAD β€” upload Excel contribution list
2. PREVIEW β€” review accepted CRs
3. RUNNING β€” pipeline subprocess with live log
4. DONE/ERROR β€” download ZIP of results
"""
import io
import json
import os
import subprocess
import sys
import threading
import time
import uuid
import zipfile
from datetime import datetime
from pathlib import Path
import streamlit as st
# ── EOL credential verification ───────────────────────────────────────────────
def verify_eol_credentials(username: str, password: str) -> bool:
    """Check ETSI EOL credentials by performing the portal login handshake.

    Opens a session against the ETSI portal (priming cookies via the login
    redirection page), then POSTs the credentials as JSON to the EOL login
    endpoint.  The endpoint answers with the literal body ``"Failed"`` when
    the credentials are rejected.

    Returns True when the login is accepted; False on rejection or on any
    network/timeout error (a connectivity problem is reported the same as a
    failed login so the UI shows a retry prompt instead of a traceback).
    """
    import json as _json

    import requests as _req
    import urllib3

    # The portal's certificate chain fails default verification, hence
    # verify=False below; silence the resulting warning spam.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    session = _req.Session()
    try:
        # Prime session cookies expected by the login endpoint.
        session.get(
            "https://portal.etsi.org/LoginRedirection.aspx",
            verify=False,
            timeout=10,
        )
        resp = session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=_json.dumps({"username": username, "password": password}),
            headers={"Content-Type": "application/json; charset=UTF-8"},
            verify=False,
            allow_redirects=False,
            timeout=10,
        )
    except _req.RequestException:
        # Network failure / timeout — report "not verified" rather than
        # crashing the Streamlit login form.
        return False
    return resp.text.strip() != "Failed"
# ── Scripts dir (same folder as app.py / scripts/) ───────────────────────────
# Make the bundled pipeline scripts (e.g. fetch_crs.py, orchestrate_cr.py)
# importable directly, as in `from fetch_crs import parse_excel` below.
SCRIPTS_DIR = Path(__file__).parent / "scripts"
sys.path.insert(0, str(SCRIPTS_DIR))
# ── Session persistence ───────────────────────────────────────────────────────
def _get_session_base() -> Path:
"""Use /data/cr_sessions if writable (HF persistent storage), else /tmp."""
candidate = Path("/data/cr_sessions")
try:
candidate.mkdir(parents=True, exist_ok=True)
probe = candidate / ".write_test"
probe.write_text("x")
probe.unlink()
return candidate
except OSError:
fallback = Path("/tmp/cr_sessions")
fallback.mkdir(parents=True, exist_ok=True)
return fallback
# Resolved once at import time; every session's files live under this base.
SESSION_BASE = _get_session_base()
def session_dir(sid: str) -> Path:
    """Return (creating it if needed) the working directory for session *sid*."""
    path = SESSION_BASE / sid
    path.mkdir(parents=True, exist_ok=True)
    return path
def _state_path(sid: str) -> Path:
    """Path of the JSON file holding this session's persisted state."""
    return session_dir(sid) / "state.json"
def load_state(sid: str) -> dict | None:
    """Load the persisted state dict for *sid*, or None if absent/corrupt.

    Returns None both when no state file exists and when the file cannot be
    read or parsed, so callers can treat any failure as "no such session".
    """
    path = _state_path(sid)
    if not path.exists():
        return None
    try:
        return json.loads(path.read_text())
    except (OSError, ValueError):
        # Narrowed from a blanket `except Exception`: only I/O errors and
        # parse errors (json.JSONDecodeError is a ValueError) mean "corrupt
        # session"; anything else is a real bug and should surface.
        return None
def save_state(sid: str, state: dict) -> None:
    """Persist *state* as pretty-printed JSON (non-JSON values stringified)."""
    serialized = json.dumps(state, indent=2, default=str)
    _state_path(sid).write_text(serialized)
def new_state(sid: str) -> dict:
    """Fresh session-state dict: starts at the login step with no pipeline yet."""
    return dict(
        session_id=sid,
        status="login",
        excel_filename=None,
        person_name="Ly Thanh PHAN",
        cr_list=[],
        pid=None,
        output_dir=None,
        log_path=None,
        started_at=None,
        completed_at=None,
        return_code=None,
    )
# ── Helpers ───────────────────────────────────────────────────────────────────
def _rc_path(sid: str) -> Path:
    """File where the pipeline's exit code is written once it terminates."""
    return session_dir(sid) / "returncode"
def _run_and_save_rc(proc: subprocess.Popen, rc_path: Path) -> None:
"""Background thread: wait for process, write return code to disk."""
proc.wait()
rc_path.write_text(str(proc.returncode))
def read_return_code(sid: str) -> int | None:
    """Exit code previously recorded for this session's pipeline, if any."""
    rc_file = _rc_path(sid)
    if not rc_file.exists():
        return None
    try:
        return int(rc_file.read_text().strip())
    except ValueError:
        # Partially-written or garbage file — treat as "not finished yet".
        return None
def is_process_alive(pid: int) -> bool:
    """Return True if a process with *pid* currently exists.

    Uses the classic ``kill(pid, 0)`` probe: signal number 0 performs the
    existence and permission checks without delivering a signal.

    Fix: EPERM (PermissionError) means the process EXISTS but belongs to
    another user; the original code misreported that case as dead, which
    could flip a still-running pipeline to done/error prematurely.
    """
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # Process exists; we just lack permission to signal it.
        return True
    return True
def tail_log(log_path: str, n: int = 100) -> str:
    """Return the last *n* lines of the log file, or a placeholder if missing."""
    log_file = Path(log_path)
    if not log_file.exists():
        return "(log not yet available…)"
    all_lines = log_file.read_text(errors="replace").splitlines()
    return "\n".join(all_lines[-n:])
def parse_log_results(log_path: str) -> list[dict]:
    """Extract per-TS result lines and warning messages from the Final/Retry Report."""
    log_file = Path(log_path)
    if not log_file.exists():
        return []

    tags = ("OK", "WARN", "FAIL", "SKIP")
    results: list[dict] = []
    entry: dict | None = None
    seen_report = False

    for raw in log_file.read_text(errors="replace").splitlines():
        # Ignore everything before the report section header.
        if "Final Report" in raw or "Retry Summary" in raw:
            seen_report = True
            continue
        if not seen_report:
            continue

        # A "[TAG] <ts name>" line opens a new per-TS entry.
        tag = next((t for t in tags if f"[{t}]" in raw), None)
        if tag is not None:
            if entry is not None:
                results.append(entry)
            entry = {
                "Status": tag,
                "TS": raw.split(f"[{tag}]", 1)[-1].strip(),
                "warnings": [],
            }
            continue

        # "! message" lines attach warnings to the current entry.
        if entry is not None:
            text = raw.strip()
            if text.startswith("! "):
                entry["warnings"].append(text[2:])

    # Flush the final entry, if any.
    if entry is not None:
        results.append(entry)
    return results
def peek_submitted_by(excel_path: Path, max_names: int = 20) -> list[str]:
    """Return unique non-empty SubmittedBy values from the Excel (best-effort)."""
    try:
        ext = excel_path.suffix.lower()
        names: set[str] = set()
        if ext == ".xls":
            # Legacy binary workbook — read via xlrd.
            import xlrd
            wb = xlrd.open_workbook(str(excel_path))
            try:
                ws = wb.sheet_by_name("Contributions")
            except xlrd.XLRDError:
                # No "Contributions" sheet — fall back to the first sheet.
                ws = wb.sheet_by_index(0)
            # Row 0 is the header row; locate the SubmittedBy column by name
            # (case-insensitive, with or without a space).
            headers = [str(ws.cell_value(0, c)).strip() for c in range(ws.ncols)]
            by_col = next(
                (i for i, h in enumerate(headers)
                 if h.lower() in ("submittedby", "submitted by")),
                None,
            )
            if by_col is not None:
                for r in range(1, ws.nrows):
                    v = str(ws.cell_value(r, by_col)).strip()
                    if v:
                        names.add(v)
        elif ext == ".xlsx":
            # Modern workbook — stream rows with openpyxl in read-only mode.
            import openpyxl
            wb = openpyxl.load_workbook(str(excel_path), read_only=True, data_only=True)
            ws = wb["Contributions"] if "Contributions" in wb.sheetnames else wb.active
            rows = iter(ws.iter_rows(values_only=True))
            # First yielded row is the header; default to [] for empty sheets.
            headers = [str(c).strip() if c is not None else "" for c in next(rows, [])]
            by_col = next(
                (i for i, h in enumerate(headers)
                 if h.lower() in ("submittedby", "submitted by")),
                None,
            )
            if by_col is not None:
                for row in rows:
                    # Guard against empty cells; stringified non-text cells
                    # can also yield the literal "None".
                    v = str(row[by_col]).strip() if row[by_col] is not None else ""
                    if v and v != "None":
                        names.add(v)
        # Sorted for stable display; capped so the diagnostic list stays short.
        return sorted(names)[:max_names]
    except Exception:
        # Best-effort diagnostic helper: any parsing problem yields an empty
        # list rather than an error in the UI.
        return []
def make_zip(output_dir: Path) -> bytes:
    """Zip the whole output directory into an in-memory byte string."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        # Arcnames are relative to the parent directory, so the archive
        # keeps a single top-level folder named after the output dir.
        for entry in output_dir.rglob("*"):
            if entry.is_file():
                archive.write(entry, entry.relative_to(output_dir.parent))
    return buffer.getvalue()
# ── Page config ───────────────────────────────────────────────────────────────
# Basic page chrome: centered layout with a title and one-line usage summary.
st.set_page_config(
    page_title="CR Application Tool",
    page_icon="📄",
    layout="centered",
)
st.title("📄 CR Application Tool")
st.caption("Upload an ETSI/3GPP Excel contribution list → preview accepted CRs → apply all → download ZIP.")
# ── Session init ──────────────────────────────────────────────────────────────
# Sessions are keyed by a UUID carried in the "?sid=" query parameter so a
# browser refresh (or a shared URL) can resume the same on-disk state.
params = st.query_params
if "sid" not in st.session_state:
    if "sid" in params:
        # URL carries a session id — adopt it only if its state file exists.
        candidate = params["sid"]
        existing = load_state(candidate)
        if existing:
            st.session_state.sid = candidate
            st.session_state.state = existing
        else:
            # Unknown/expired sid in the URL: mint a fresh session instead.
            sid = str(uuid.uuid4())
            st.session_state.sid = sid
            st.session_state.state = new_state(sid)
            st.query_params["sid"] = sid
    else:
        # No sid anywhere yet: brand-new session.
        sid = str(uuid.uuid4())
        st.session_state.sid = sid
        st.session_state.state = new_state(sid)
        st.query_params["sid"] = sid
sid: str = st.session_state.sid
state: dict = st.session_state.state
# Credential guard: if credentials are not in memory (e.g. page refresh after login),
# force re-login regardless of the persisted status.
if state.get("status") not in ("login",) and "eol_user" not in st.session_state:
    state["status"] = "login"
# ── Sidebar ───────────────────────────────────────────────────────────────────
# Shows the (truncated) session id and lets the user resume any previous
# session by pasting its full id; resuming swaps both the in-memory state
# and the URL query parameter, then forces a rerun.
with st.sidebar:
    st.header("Session")
    st.caption(f"ID: `{sid[:8]}…`")
    st.divider()
    st.subheader("Resume a session")
    resume_sid = st.text_input("Paste a session ID")
    if st.button("Resume") and resume_sid.strip():
        existing = load_state(resume_sid.strip())
        if existing:
            st.session_state.sid = resume_sid.strip()
            st.session_state.state = existing
            st.query_params["sid"] = resume_sid.strip()
            st.rerun()
        else:
            st.error("Session not found.")
# ── State machine ─────────────────────────────────────────────────────────────
# ── State machine ─────────────────────────────────────────────────────────────
# The remainder of the script dispatches on the persisted status field:
# login → upload → preview → running → done/error.
status: str = state["status"]
# ════════════════════════════════════════════════════════════════════════════
# LOGIN
# ════════════════════════════════════════════════════════════════════════════
if status == "login":
    st.subheader("Connect with your ETSI EOL account")
    st.info(
        "Your credentials are used only for this session and are never stored on disk.",
        icon="🔒",
    )
    username = st.text_input("EOL Username")
    password = st.text_input("EOL Password", type="password")
    if st.button("Connect", type="primary"):
        if not username or not password:
            st.error("Please enter both username and password.")
        else:
            with st.spinner("Verifying credentials…"):
                ok = verify_eol_credentials(username, password)
            if ok:
                # Credentials live only in st.session_state (RAM) — the
                # state.json written to disk never contains them.
                st.session_state.eol_user = username
                st.session_state.eol_password = password
                state["status"] = "upload"
                save_state(sid, state)
                st.rerun()
            else:
                st.error("Login failed — check your EOL username and password.")
# ════════════════════════════════════════════════════════════════════════════
# UPLOAD
# ════════════════════════════════════════════════════════════════════════════
elif status == "upload":
    st.subheader("Step 1 — Upload contribution list")
    uploaded = st.file_uploader(
        "Excel contribution list (.xlsx or .xls)",
        type=["xlsx", "xls"],
    )
    person_name = st.text_input(
        "Contributor name (must match SubmittedBy column)",
        value=state.get("person_name", "Ly Thanh PHAN"),
    )
    if uploaded and st.button("Parse CR list →", type="primary"):
        # Persist the uploaded workbook into the session directory so the
        # pipeline subprocess (and later retries) can re-read it from disk.
        excel_path = session_dir(sid) / uploaded.name
        excel_path.write_bytes(uploaded.getbuffer())
        with st.spinner("Parsing Excel…"):
            try:
                # fetch_crs lives in scripts/ (added to sys.path at import time).
                from fetch_crs import parse_excel
                cr_list = parse_excel(str(excel_path), person_name)
                state["status"] = "preview"
                state["excel_filename"] = uploaded.name
                state["person_name"] = person_name
                # Normalize rows to lists so the state JSON round-trips cleanly.
                state["cr_list"] = [list(row) for row in cr_list]
                save_state(sid, state)
                st.rerun()
            except Exception as exc:
                st.error(f"Failed to parse Excel: {exc}")
# ════════════════════════════════════════════════════════════════════════════
# PREVIEW
# ════════════════════════════════════════════════════════════════════════════
elif status == "preview":
    cr_list = state["cr_list"]
    st.subheader(f"Step 2 — {len(cr_list)} Accepted CR(s) found")
    if cr_list:
        import pandas as pd
        df = pd.DataFrame(cr_list, columns=["UID", "Title"])
        st.dataframe(df, use_container_width=True)
    else:
        st.warning(
            f"No Accepted CRs found for **{state['person_name']}** in this file."
        )
        # Diagnostic: show what names are in the SubmittedBy column
        excel_path = session_dir(sid) / state["excel_filename"]
        found_names = peek_submitted_by(excel_path)
        if found_names:
            st.info(
                "**Names found in SubmittedBy column** — copy the exact one into the field above and re-upload:\n\n"
                + "\n".join(f"- `{n}`" for n in found_names)
            )
    col1, col2 = st.columns(2)
    with col1:
        if st.button("← Back"):
            state["status"] = "upload"
            state["cr_list"] = []
            save_state(sid, state)
            st.rerun()
    with col2:
        if cr_list and st.button("▶ Start Pipeline", type="primary"):
            # Launch orchestrate_cr.py as a detached subprocess writing to a
            # log file; the UI polls the log rather than blocking on the run.
            excel_path = session_dir(sid) / state["excel_filename"]
            output_dir = session_dir(sid) / "output"
            output_dir.mkdir(parents=True, exist_ok=True)
            log_path = session_dir(sid) / "pipeline.log"
            rc_path = _rc_path(sid)
            cmd = [
                sys.executable,
                str(SCRIPTS_DIR / "orchestrate_cr.py"),
                str(excel_path),
                state["person_name"],
                "--output-dir", str(output_dir),
            ]
            # Credentials go via environment, never on the command line
            # (command lines are visible in process listings).
            env = os.environ.copy()
            env["EOL_USER"] = st.session_state.eol_user
            env["EOL_PASSWORD"] = st.session_state.eol_password
            log_file = open(str(log_path), "w")
            proc = subprocess.Popen(
                cmd,
                stdout=log_file,
                stderr=subprocess.STDOUT,
                env=env,
            )
            # The child holds its own copy of the log fd; parent's can close.
            log_file.close()
            # Background thread writes returncode file when process finishes
            threading.Thread(
                target=_run_and_save_rc,
                args=(proc, rc_path),
                daemon=True,
            ).start()
            st.session_state.proc = proc
            state["status"] = "running"
            state["pid"] = proc.pid
            state["output_dir"] = str(output_dir)
            state["log_path"] = str(log_path)
            state["started_at"] = datetime.now().isoformat()
            save_state(sid, state)
            st.rerun()
# ════════════════════════════════════════════════════════════════════════════
# RUNNING
# ════════════════════════════════════════════════════════════════════════════
elif status == "running":
    pid = state["pid"]
    log_path = state["log_path"]
    # Determine whether process is still alive
    proc = st.session_state.get("proc")
    alive = False
    if proc is not None:
        # Same Streamlit process that launched the child: poll it directly.
        alive = proc.poll() is None
    else:
        # Session reloaded — check returncode file, then PID
        rc = read_return_code(sid)
        if rc is None:
            alive = is_process_alive(pid)
    if alive:
        st.subheader("⏳ Pipeline running…")
        st.info(f"PID {pid} — started {state.get('started_at', '')[:19]}")
        log_text = tail_log(log_path, 100)
        st.text_area("Live log (last 100 lines)", value=log_text, height=400)
        # Poll loop: sleep briefly, then rerun the script to refresh the log.
        time.sleep(2)
        st.rerun()
    else:
        # Process finished — determine return code
        rc = read_return_code(sid)
        if rc is None and proc is not None:
            rc = proc.returncode
        state["return_code"] = rc
        state["completed_at"] = datetime.now().isoformat()
        state["status"] = "done" if rc == 0 else "error"
        save_state(sid, state)
        st.rerun()
# ════════════════════════════════════════════════════════════════════════════
# DONE / ERROR
# ════════════════════════════════════════════════════════════════════════════
elif status in ("done", "error"):
    log_path = state.get("log_path", "")
    output_dir = Path(state.get("output_dir", ""))
    rc = state.get("return_code")
    if status == "done":
        st.success("✅ Pipeline completed successfully!")
    else:
        st.error(f"❌ Pipeline finished with errors (return code: {rc})")
    # Per-TS results table — merge all pipeline logs so retry results don't
    # replace original ones; later logs (pipeline_retry.log) supersede earlier
    # ones (pipeline.log) for the same TS key.
    _merged: dict[str, dict] = {}
    for _lf in sorted(session_dir(sid).glob("pipeline*.log")):
        for _r in parse_log_results(str(_lf)):
            _merged[_r["TS"]] = _r
    results = list(_merged.values())
    if results:
        st.subheader("Results per TS")
        import pandas as pd
        n_warn = sum(1 for r in results if r["warnings"])
        warn_label = f"Warnings ({n_warn})" if n_warn else "Warnings"
        tab_summary, tab_warnings = st.tabs(["Summary", warn_label])
        def _color_status(val):
            # Bootstrap-style background/foreground colour pair per status tag.
            return {
                "OK": "background-color: #d4edda; color: #155724",
                "WARN": "background-color: #fff3cd; color: #856404",
                "FAIL": "background-color: #f8d7da; color: #721c24",
                "SKIP": "background-color: #e2e3e5; color: #383d41",
            }.get(val, "")
        with tab_summary:
            df = pd.DataFrame([{"Status": r["Status"], "TS": r["TS"]} for r in results])
            st.dataframe(
                df.style.map(_color_status, subset=["Status"]),
                use_container_width=True,
            )
        with tab_warnings:
            warned = [r for r in results if r["warnings"]]
            if warned:
                for r in warned:
                    with st.expander(f"⚠️ {r['TS']} — {len(r['warnings'])} warning(s)"):
                        for w in r["warnings"]:
                            st.text(w)
            else:
                st.success("No warnings.")
    # Download ZIP
    if output_dir.exists() and any(output_dir.rglob("*")):
        st.subheader("Download results")
        zip_bytes = make_zip(output_dir)
        st.download_button(
            label="⬇ Download results ZIP",
            data=zip_bytes,
            file_name=f"cr_results_{sid[:8]}.zip",
            mime="application/zip",
            type="primary",
        )
    else:
        st.warning("Output directory is empty — nothing to download.")
    # Full log
    with st.expander("Full pipeline log"):
        if log_path and Path(log_path).exists():
            st.text(Path(log_path).read_text(errors="replace"))
        else:
            st.text("Log not found.")
    # ── TS Recovery ───────────────────────────────────────────────────────────
    # The pipeline records TS documents it failed to download in
    # failed_ts.json; offer a per-TS retry / manual-upload UI, then a
    # "--retry-mode" re-run of the orchestrator over the recovered files.
    failed_ts_path = output_dir / "failed_ts.json"
    if failed_ts_path.exists():
        failed_ts_entries = json.loads(failed_ts_path.read_text())
        if failed_ts_entries:
            st.divider()
            st.subheader("⚠️ Recover failed TS downloads")
            st.info(
                f"{len(failed_ts_entries)} TS(s) could not be downloaded. "
                "Retry or upload each one manually, then apply the CRs."
            )
            for entry in failed_ts_entries:
                spec_key = f"{entry['spec_number']} v{entry['version']}"
                dest_path = Path(entry["spec_dir"]) / entry["expected_filename"]
                # "ready" = the DOCX is now on disk (retried or uploaded).
                ready = dest_path.exists()
                label = f"{'✅' if ready else '❌'} TS {spec_key} — CRs: {', '.join(entry['cr_uids'])}"
                with st.expander(label, expanded=not ready):
                    col1, col2 = st.columns(2)
                    with col1:
                        if st.button("🔄 Retry download",
                                     key=f"retry_{entry['spec_compact']}_{entry['version']}"):
                            from fetch_crs import download_ts as _dl_ts
                            with st.spinner(f"Downloading TS {spec_key}…"):
                                fn, note = _dl_ts(
                                    entry["spec_number"], entry["version"],
                                    Path(entry["spec_dir"]),
                                    st.session_state.eol_user,
                                    st.session_state.eol_password,
                                )
                            if fn:
                                st.success(f"Downloaded: {fn}")
                                st.rerun()
                            else:
                                st.error(f"Failed: {note}")
                    with col2:
                        uploaded_ts = st.file_uploader(
                            f"Or upload `{entry['expected_filename']}`",
                            type=["docx"],
                            key=f"upload_{entry['spec_compact']}_{entry['version']}",
                        )
                        if uploaded_ts is not None:
                            Path(entry["spec_dir"]).mkdir(parents=True, exist_ok=True)
                            dest_path.write_bytes(uploaded_ts.read())
                            st.success("Saved ✓")
                            st.rerun()
            # Global apply button — enabled when ≥1 TS is now on disk
            ready_entries = [
                e for e in failed_ts_entries
                if (Path(e["spec_dir"]) / e["expected_filename"]).exists()
            ]
            remaining = len(failed_ts_entries) - len(ready_entries)
            if ready_entries:
                if remaining:
                    st.warning(f"{len(ready_entries)} ready, {remaining} will be skipped.")
                else:
                    st.success(f"All {len(ready_entries)} TS(s) ready.")
                if st.button("▶ Apply CRs to recovered TSs", type="primary"):
                    # Re-run the orchestrator over the same output dir; results
                    # land in a separate retry log so the merge above can
                    # supersede the per-TS rows from the first run.
                    retry_log = str(session_dir(sid) / "pipeline_retry.log")
                    _rc_path(sid).unlink(missing_ok=True)  # clear old returncode
                    cmd = [
                        sys.executable,
                        str(SCRIPTS_DIR / "orchestrate_cr.py"),
                        "--output-dir", state["output_dir"],
                        "--retry-mode",
                    ]
                    env = os.environ.copy()
                    env["EOL_USER"] = st.session_state.eol_user
                    env["EOL_PASSWORD"] = st.session_state.eol_password
                    log_file = open(retry_log, "w")
                    proc = subprocess.Popen(
                        cmd, stdout=log_file, stderr=subprocess.STDOUT, env=env
                    )
                    log_file.close()
                    threading.Thread(
                        target=_run_and_save_rc,
                        args=(proc, _rc_path(sid)),
                        daemon=True,
                    ).start()
                    st.session_state.proc = proc
                    state["status"] = "running"
                    state["pid"] = proc.pid
                    state["log_path"] = retry_log
                    state["started_at"] = datetime.now().isoformat()
                    save_state(sid, state)
                    st.rerun()
            else:
                st.warning("No TSs available yet — retry download or upload DOCX files above.")
    # Start new session
    st.divider()
    if st.button("Start new session"):
        new_sid = str(uuid.uuid4())
        st.session_state.sid = new_sid
        st.session_state.state = new_state(new_sid)
        if "proc" in st.session_state:
            del st.session_state.proc
        st.query_params["sid"] = new_sid
        save_state(new_sid, st.session_state.state)
        st.rerun()