Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
"""
CR Application Tool — Streamlit frontend.

Step-by-step UI (state machine):
1. LOGIN      — verify ETSI EOL credentials
2. UPLOAD     — upload Excel contribution list
3. PREVIEW    — review accepted CRs
4. RUNNING    — pipeline subprocess with live log
5. DONE/ERROR — download ZIP of results
"""
| import io | |
| import json | |
| import os | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| import uuid | |
| import zipfile | |
| from datetime import datetime | |
| from pathlib import Path | |
| import streamlit as st | |
| # ββ EOL credential verification βββββββββββββββββββββββββββββββββββββββββββββββ | |
def verify_eol_credentials(username: str, password: str) -> bool:
    """Check EOL credentials against the ETSI portal.

    Returns True when the portal accepts the login, False when it answers
    with the literal text "Failed" OR when any network/TLS error occurs
    (best-effort: a connectivity problem must not crash the login page).
    """
    import json as _json
    import urllib3
    import requests as _req

    # The portal's TLS chain fails default verification, so verification is
    # disabled and the resulting warning silenced.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    try:
        session = _req.Session()
        # Prime session cookies expected by the login endpoint.
        session.get(
            "https://portal.etsi.org/LoginRedirection.aspx",
            verify=False,
            timeout=10,
        )
        resp = session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=_json.dumps({"username": username, "password": password}),
            headers={"Content-Type": "application/json; charset=UTF-8"},
            verify=False,
            allow_redirects=False,
            timeout=10,
        )
    except _req.RequestException:
        # Network unreachable, DNS failure, timeout, SSL error, …
        return False
    # The endpoint replies with the body "Failed" on bad credentials.
    return resp.text.strip() != "Failed"
# ── Scripts dir (same folder as app.py / scripts/) ────────────────────────────
SCRIPTS_DIR = Path(__file__).parent / "scripts"
# Make the pipeline helper modules (fetch_crs, orchestrate_cr) importable.
sys.path.insert(0, str(SCRIPTS_DIR))
| # ββ Session persistence βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _get_session_base() -> Path: | |
| """Use /data/cr_sessions if writable (HF persistent storage), else /tmp.""" | |
| candidate = Path("/data/cr_sessions") | |
| try: | |
| candidate.mkdir(parents=True, exist_ok=True) | |
| probe = candidate / ".write_test" | |
| probe.write_text("x") | |
| probe.unlink() | |
| return candidate | |
| except OSError: | |
| fallback = Path("/tmp/cr_sessions") | |
| fallback.mkdir(parents=True, exist_ok=True) | |
| return fallback | |
| SESSION_BASE = _get_session_base() | |
def session_dir(sid: str) -> Path:
    """Return this session's working directory, creating it if needed."""
    path = SESSION_BASE / sid
    path.mkdir(parents=True, exist_ok=True)
    return path
def _state_path(sid: str) -> Path:
    """Path of the JSON file that persists this session's state."""
    return session_dir(sid) / "state.json"
def load_state(sid: str) -> dict | None:
    """Load persisted session state; None when absent or unreadable."""
    state_file = _state_path(sid)
    if not state_file.exists():
        return None
    try:
        return json.loads(state_file.read_text())
    except Exception:
        # Corrupt or partially-written state file: treat as no session.
        return None
def save_state(sid: str, state: dict) -> None:
    """Persist session state as pretty-printed JSON (str() fallback for non-JSON types)."""
    payload = json.dumps(state, indent=2, default=str)
    _state_path(sid).write_text(payload)
def new_state(sid: str) -> dict:
    """Build a fresh session-state dict, starting at the LOGIN step."""
    fresh = {
        "session_id": sid,
        "status": "login",
        "excel_filename": None,
        # Default contributor name shown in the UPLOAD form.
        "person_name": "Ly Thanh PHAN",
        "cr_list": [],
        "pid": None,
        "output_dir": None,
        "log_path": None,
        "started_at": None,
        "completed_at": None,
        "return_code": None,
    }
    return fresh
| # ββ Helpers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _rc_path(sid: str) -> Path:
    """Path of the file that stores the finished pipeline's exit code."""
    return session_dir(sid) / "returncode"
| def _run_and_save_rc(proc: subprocess.Popen, rc_path: Path) -> None: | |
| """Background thread: wait for process, write return code to disk.""" | |
| proc.wait() | |
| rc_path.write_text(str(proc.returncode)) | |
def read_return_code(sid: str) -> int | None:
    """Return the pipeline's saved exit code, or None if not written yet."""
    rc_file = _rc_path(sid)
    if not rc_file.exists():
        return None
    try:
        return int(rc_file.read_text().strip())
    except ValueError:
        # File exists but is empty/garbled (e.g. read mid-write).
        return None
def is_process_alive(pid: int) -> bool:
    """Check whether *pid* refers to a live process.

    ``os.kill(pid, 0)`` performs error checking only, sending no signal.
    EPERM (PermissionError) means the process EXISTS but belongs to another
    user, so it must count as alive — the previous code returned False there,
    which could wrongly mark a running pipeline as finished.  Only ESRCH
    (ProcessLookupError) means the process is gone.
    """
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # Alive, just not signalable by us.
        return True
    return True
def tail_log(log_path: str, n: int = 100) -> str:
    """Return the last *n* lines of the log file joined into one string."""
    log_file = Path(log_path)
    if not log_file.exists():
        # Pipeline may not have produced output yet.
        return "(log not yet availableβ¦)"
    all_lines = log_file.read_text(errors="replace").splitlines()
    return "\n".join(all_lines[-n:])
def parse_log_results(log_path: str) -> list[dict]:
    """Extract per-TS result lines and warning messages from the Final/Retry Report.

    Lines before a "Final Report" / "Retry Summary" header are ignored.
    Each ``[OK]/[WARN]/[FAIL]/[SKIP]`` line opens a new entry; subsequent
    lines starting with ``"! "`` are collected as that entry's warnings.
    """
    log_file = Path(log_path)
    if not log_file.exists():
        return []
    entries: list[dict] = []
    entry: dict | None = None
    seen_header = False
    for raw in log_file.read_text(errors="replace").splitlines():
        if "Final Report" in raw or "Retry Summary" in raw:
            seen_header = True
            continue
        if not seen_header:
            continue
        tag = next(
            (t for t in ("OK", "WARN", "FAIL", "SKIP") if f"[{t}]" in raw),
            None,
        )
        if tag is not None:
            # New result line: flush the previous entry first.
            if entry is not None:
                entries.append(entry)
            entry = {
                "Status": tag,
                "TS": raw.split(f"[{tag}]", 1)[-1].strip(),
                "warnings": [],
            }
        elif entry is not None and raw.strip().startswith("! "):
            entry["warnings"].append(raw.strip()[2:])
    if entry is not None:
        entries.append(entry)
    return entries
| def peek_submitted_by(excel_path: Path, max_names: int = 20) -> list[str]: | |
| """Return unique non-empty SubmittedBy values from the Excel (best-effort).""" | |
| try: | |
| ext = excel_path.suffix.lower() | |
| names: set[str] = set() | |
| if ext == ".xls": | |
| import xlrd | |
| wb = xlrd.open_workbook(str(excel_path)) | |
| try: | |
| ws = wb.sheet_by_name("Contributions") | |
| except xlrd.XLRDError: | |
| ws = wb.sheet_by_index(0) | |
| headers = [str(ws.cell_value(0, c)).strip() for c in range(ws.ncols)] | |
| by_col = next( | |
| (i for i, h in enumerate(headers) | |
| if h.lower() in ("submittedby", "submitted by")), | |
| None, | |
| ) | |
| if by_col is not None: | |
| for r in range(1, ws.nrows): | |
| v = str(ws.cell_value(r, by_col)).strip() | |
| if v: | |
| names.add(v) | |
| elif ext == ".xlsx": | |
| import openpyxl | |
| wb = openpyxl.load_workbook(str(excel_path), read_only=True, data_only=True) | |
| ws = wb["Contributions"] if "Contributions" in wb.sheetnames else wb.active | |
| rows = iter(ws.iter_rows(values_only=True)) | |
| headers = [str(c).strip() if c is not None else "" for c in next(rows, [])] | |
| by_col = next( | |
| (i for i, h in enumerate(headers) | |
| if h.lower() in ("submittedby", "submitted by")), | |
| None, | |
| ) | |
| if by_col is not None: | |
| for row in rows: | |
| v = str(row[by_col]).strip() if row[by_col] is not None else "" | |
| if v and v != "None": | |
| names.add(v) | |
| return sorted(names)[:max_names] | |
| except Exception: | |
| return [] | |
def make_zip(output_dir: Path) -> bytes:
    """Zip every file under *output_dir* and return the archive bytes.

    Archive names are relative to the directory's PARENT, so the output
    directory itself appears as the top-level folder in the ZIP.
    """
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        for entry in output_dir.rglob("*"):
            if entry.is_file():
                archive.write(entry, entry.relative_to(output_dir.parent))
    return buffer.getvalue()
# ── Page config ───────────────────────────────────────────────────────────────
# set_page_config must be the first Streamlit call of the script run.
st.set_page_config(
    page_title="CR Application Tool",
    page_icon="π",
    layout="centered",
)
st.title("π CR Application Tool")
st.caption("Upload an ETSI/3GPP Excel contribution list β preview accepted CRs β apply all β download ZIP.")
# ── Session init ──────────────────────────────────────────────────────────────
# A session is identified by a UUID carried in the ?sid= query parameter so
# that a browser refresh (or a pasted URL) can resume persisted state from disk.
params = st.query_params
if "sid" not in st.session_state:
    if "sid" in params:
        candidate = params["sid"]
        existing = load_state(candidate)
        if existing:
            # URL references a known session: resume it.
            st.session_state.sid = candidate
            st.session_state.state = existing
        else:
            # Unknown sid in the URL: start fresh and rewrite the URL.
            sid = str(uuid.uuid4())
            st.session_state.sid = sid
            st.session_state.state = new_state(sid)
            st.query_params["sid"] = sid
    else:
        # No sid at all: brand-new session.
        sid = str(uuid.uuid4())
        st.session_state.sid = sid
        st.session_state.state = new_state(sid)
        st.query_params["sid"] = sid
sid: str = st.session_state.sid
state: dict = st.session_state.state
# Credential guard: if credentials are not in memory (e.g. page refresh after login),
# force re-login regardless of the persisted status.
if state.get("status") not in ("login",) and "eol_user" not in st.session_state:
    state["status"] = "login"
# ── Sidebar ───────────────────────────────────────────────────────────────────
# Shows the current session id and lets the user switch to another session.
with st.sidebar:
    st.header("Session")
    st.caption(f"ID: `{sid[:8]}β¦`")
    st.divider()
    st.subheader("Resume a session")
    resume_sid = st.text_input("Paste a session ID")
    if st.button("Resume") and resume_sid.strip():
        existing = load_state(resume_sid.strip())
        if existing:
            # Switch this browser tab over to the requested session.
            st.session_state.sid = resume_sid.strip()
            st.session_state.state = existing
            st.query_params["sid"] = resume_sid.strip()
            st.rerun()
        else:
            st.error("Session not found.")
# ── State machine ─────────────────────────────────────────────────────────────
status: str = state["status"]
# ────────────────────────────────────────────────────────────────────────────
# LOGIN
# ────────────────────────────────────────────────────────────────────────────
if status == "login":
    st.subheader("Connect with your ETSI EOL account")
    st.info(
        "Your credentials are used only for this session and are never stored on disk.",
        icon="π",
    )
    username = st.text_input("EOL Username")
    password = st.text_input("EOL Password", type="password")
    if st.button("Connect", type="primary"):
        if not username or not password:
            st.error("Please enter both username and password.")
        else:
            with st.spinner("Verifying credentialsβ¦"):
                ok = verify_eol_credentials(username, password)
            if ok:
                # Credentials live only in st.session_state (RAM), never in
                # the persisted state.json (see module docstring / info box).
                st.session_state.eol_user = username
                st.session_state.eol_password = password
                state["status"] = "upload"
                save_state(sid, state)
                st.rerun()
            else:
                st.error("Login failed β check your EOL username and password.")
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # UPLOAD | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| elif status == "upload": | |
| st.subheader("Step 1 β Upload contribution list") | |
| uploaded = st.file_uploader( | |
| "Excel contribution list (.xlsx or .xls)", | |
| type=["xlsx", "xls"], | |
| ) | |
| person_name = st.text_input( | |
| "Contributor name (must match SubmittedBy column)", | |
| value=state.get("person_name", "Ly Thanh PHAN"), | |
| ) | |
| if uploaded and st.button("Parse CR list β", type="primary"): | |
| excel_path = session_dir(sid) / uploaded.name | |
| excel_path.write_bytes(uploaded.getbuffer()) | |
| with st.spinner("Parsing Excelβ¦"): | |
| try: | |
| from fetch_crs import parse_excel | |
| cr_list = parse_excel(str(excel_path), person_name) | |
| state["status"] = "preview" | |
| state["excel_filename"] = uploaded.name | |
| state["person_name"] = person_name | |
| state["cr_list"] = [list(row) for row in cr_list] | |
| save_state(sid, state) | |
| st.rerun() | |
| except Exception as exc: | |
| st.error(f"Failed to parse Excel: {exc}") | |
# ────────────────────────────────────────────────────────────────────────────
# PREVIEW
# ────────────────────────────────────────────────────────────────────────────
elif status == "preview":
    cr_list = state["cr_list"]
    st.subheader(f"Step 2 β {len(cr_list)} Accepted CR(s) found")
    if cr_list:
        import pandas as pd
        df = pd.DataFrame(cr_list, columns=["UID", "Title"])
        st.dataframe(df, use_container_width=True)
    else:
        st.warning(
            f"No Accepted CRs found for **{state['person_name']}** in this file."
        )
        # Diagnostic: show what names are in the SubmittedBy column
        excel_path = session_dir(sid) / state["excel_filename"]
        found_names = peek_submitted_by(excel_path)
        if found_names:
            st.info(
                "**Names found in SubmittedBy column** β copy the exact one into the field above and re-upload:\n\n"
                + "\n".join(f"- `{n}`" for n in found_names)
            )
    col1, col2 = st.columns(2)
    with col1:
        if st.button("β Back"):
            # Drop parsed rows so a re-upload starts clean.
            state["status"] = "upload"
            state["cr_list"] = []
            save_state(sid, state)
            st.rerun()
    with col2:
        if cr_list and st.button("βΆ Start Pipeline", type="primary"):
            excel_path = session_dir(sid) / state["excel_filename"]
            output_dir = session_dir(sid) / "output"
            output_dir.mkdir(parents=True, exist_ok=True)
            log_path = session_dir(sid) / "pipeline.log"
            rc_path = _rc_path(sid)
            cmd = [
                sys.executable,
                str(SCRIPTS_DIR / "orchestrate_cr.py"),
                str(excel_path),
                state["person_name"],
                "--output-dir", str(output_dir),
            ]
            # Credentials travel via environment, never argv (argv shows in `ps`).
            env = os.environ.copy()
            env["EOL_USER"] = st.session_state.eol_user
            env["EOL_PASSWORD"] = st.session_state.eol_password
            log_file = open(str(log_path), "w")
            proc = subprocess.Popen(
                cmd,
                stdout=log_file,
                stderr=subprocess.STDOUT,
                env=env,
            )
            # Parent may close its copy of the fd; the child keeps its own.
            log_file.close()
            # Background thread writes returncode file when process finishes
            threading.Thread(
                target=_run_and_save_rc,
                args=(proc, rc_path),
                daemon=True,
            ).start()
            st.session_state.proc = proc
            state["status"] = "running"
            state["pid"] = proc.pid
            state["output_dir"] = str(output_dir)
            state["log_path"] = str(log_path)
            state["started_at"] = datetime.now().isoformat()
            save_state(sid, state)
            st.rerun()
# ────────────────────────────────────────────────────────────────────────────
# RUNNING
# ────────────────────────────────────────────────────────────────────────────
elif status == "running":
    pid = state["pid"]
    log_path = state["log_path"]
    # Determine whether process is still alive
    proc = st.session_state.get("proc")
    alive = False
    if proc is not None:
        # Same Streamlit session that launched it: poll the Popen directly.
        alive = proc.poll() is None
    else:
        # Session reloaded β check returncode file, then PID
        rc = read_return_code(sid)
        if rc is None:
            alive = is_process_alive(pid)
    if alive:
        st.subheader("β³ Pipeline runningβ¦")
        st.info(f"PID {pid} β started {state.get('started_at', '')[:19]}")
        log_text = tail_log(log_path, 100)
        st.text_area("Live log (last 100 lines)", value=log_text, height=400)
        # Poll every 2 s by re-running the whole script.
        time.sleep(2)
        st.rerun()
    else:
        # Process finished β determine return code
        rc = read_return_code(sid)
        if rc is None and proc is not None:
            # Returncode file not written yet, but we still hold the Popen.
            rc = proc.returncode
        state["return_code"] = rc
        state["completed_at"] = datetime.now().isoformat()
        state["status"] = "done" if rc == 0 else "error"
        save_state(sid, state)
        st.rerun()
# ────────────────────────────────────────────────────────────────────────────
# DONE / ERROR
# ────────────────────────────────────────────────────────────────────────────
elif status in ("done", "error"):
    log_path = state.get("log_path", "")
    output_dir = Path(state.get("output_dir", ""))
    rc = state.get("return_code")
    if status == "done":
        st.success("β Pipeline completed successfully!")
    else:
        st.error(f"β Pipeline finished with errors (return code: {rc})")
    # Per-TS results table β merge all pipeline logs so retry results don't
    # replace original ones; later logs (pipeline_retry.log) supersede earlier
    # ones (pipeline.log) for the same TS key.
    _merged: dict[str, dict] = {}
    for _lf in sorted(session_dir(sid).glob("pipeline*.log")):
        for _r in parse_log_results(str(_lf)):
            _merged[_r["TS"]] = _r
    results = list(_merged.values())
    if results:
        st.subheader("Results per TS")
        import pandas as pd
        n_warn = sum(1 for r in results if r["warnings"])
        warn_label = f"Warnings ({n_warn})" if n_warn else "Warnings"
        tab_summary, tab_warnings = st.tabs(["Summary", warn_label])
        def _color_status(val):
            # Bootstrap-like cell colors keyed by the status tag.
            return {
                "OK": "background-color: #d4edda; color: #155724",
                "WARN": "background-color: #fff3cd; color: #856404",
                "FAIL": "background-color: #f8d7da; color: #721c24",
                "SKIP": "background-color: #e2e3e5; color: #383d41",
            }.get(val, "")
        with tab_summary:
            df = pd.DataFrame([{"Status": r["Status"], "TS": r["TS"]} for r in results])
            st.dataframe(
                df.style.map(_color_status, subset=["Status"]),
                use_container_width=True,
            )
        with tab_warnings:
            warned = [r for r in results if r["warnings"]]
            if warned:
                for r in warned:
                    with st.expander(f"β οΈ {r['TS']} β {len(r['warnings'])} warning(s)"):
                        for w in r["warnings"]:
                            st.text(w)
            else:
                st.success("No warnings.")
    # Download ZIP
    if output_dir.exists() and any(output_dir.rglob("*")):
        st.subheader("Download results")
        zip_bytes = make_zip(output_dir)
        st.download_button(
            label="β¬ Download results ZIP",
            data=zip_bytes,
            file_name=f"cr_results_{sid[:8]}.zip",
            mime="application/zip",
            type="primary",
        )
    else:
        st.warning("Output directory is empty β nothing to download.")
    # Full log
    with st.expander("Full pipeline log"):
        if log_path and Path(log_path).exists():
            st.text(Path(log_path).read_text(errors="replace"))
        else:
            st.text("Log not found.")
    # ── TS Recovery ───────────────────────────────────────────────────────
    # The pipeline records TS specs it could not download in failed_ts.json;
    # each entry can be retried or satisfied by a manual DOCX upload, then
    # the CRs are re-applied in a retry pipeline run.
    failed_ts_path = output_dir / "failed_ts.json"
    if failed_ts_path.exists():
        failed_ts_entries = json.loads(failed_ts_path.read_text())
        if failed_ts_entries:
            st.divider()
            st.subheader("β οΈ Recover failed TS downloads")
            st.info(
                f"{len(failed_ts_entries)} TS(s) could not be downloaded. "
                "Retry or upload each one manually, then apply the CRs."
            )
            for entry in failed_ts_entries:
                spec_key = f"{entry['spec_number']} v{entry['version']}"
                dest_path = Path(entry["spec_dir"]) / entry["expected_filename"]
                # "ready" == the expected DOCX is now present on disk.
                ready = dest_path.exists()
                label = f"{'β ' if ready else 'β'} TS {spec_key} β CRs: {', '.join(entry['cr_uids'])}"
                with st.expander(label, expanded=not ready):
                    col1, col2 = st.columns(2)
                    with col1:
                        if st.button("π Retry download",
                                     key=f"retry_{entry['spec_compact']}_{entry['version']}"):
                            # Local import: fetch_crs lives in SCRIPTS_DIR.
                            from fetch_crs import download_ts as _dl_ts
                            with st.spinner(f"Downloading TS {spec_key}β¦"):
                                fn, note = _dl_ts(
                                    entry["spec_number"], entry["version"],
                                    Path(entry["spec_dir"]),
                                    st.session_state.eol_user,
                                    st.session_state.eol_password,
                                )
                            if fn:
                                st.success(f"Downloaded: {fn}")
                                st.rerun()
                            else:
                                st.error(f"Failed: {note}")
                    with col2:
                        uploaded_ts = st.file_uploader(
                            f"Or upload `{entry['expected_filename']}`",
                            type=["docx"],
                            key=f"upload_{entry['spec_compact']}_{entry['version']}",
                        )
                        if uploaded_ts is not None:
                            Path(entry["spec_dir"]).mkdir(parents=True, exist_ok=True)
                            dest_path.write_bytes(uploaded_ts.read())
                            st.success("Saved β")
                            st.rerun()
            # Global apply button β enabled when β₯1 TS is now on disk
            ready_entries = [
                e for e in failed_ts_entries
                if (Path(e["spec_dir"]) / e["expected_filename"]).exists()
            ]
            remaining = len(failed_ts_entries) - len(ready_entries)
            if ready_entries:
                if remaining:
                    st.warning(f"{len(ready_entries)} ready, {remaining} will be skipped.")
                else:
                    st.success(f"All {len(ready_entries)} TS(s) ready.")
                if st.button("βΆ Apply CRs to recovered TSs", type="primary"):
                    retry_log = str(session_dir(sid) / "pipeline_retry.log")
                    _rc_path(sid).unlink(missing_ok=True)  # clear old returncode
                    cmd = [
                        sys.executable,
                        str(SCRIPTS_DIR / "orchestrate_cr.py"),
                        "--output-dir", state["output_dir"],
                        "--retry-mode",
                    ]
                    # Credentials travel via environment, never argv.
                    env = os.environ.copy()
                    env["EOL_USER"] = st.session_state.eol_user
                    env["EOL_PASSWORD"] = st.session_state.eol_password
                    log_file = open(retry_log, "w")
                    proc = subprocess.Popen(
                        cmd, stdout=log_file, stderr=subprocess.STDOUT, env=env
                    )
                    log_file.close()
                    threading.Thread(
                        target=_run_and_save_rc,
                        args=(proc, _rc_path(sid)),
                        daemon=True,
                    ).start()
                    st.session_state.proc = proc
                    # Re-enter the RUNNING state, now tailing the retry log.
                    state["status"] = "running"
                    state["pid"] = proc.pid
                    state["log_path"] = retry_log
                    state["started_at"] = datetime.now().isoformat()
                    save_state(sid, state)
                    st.rerun()
            else:
                st.warning("No TSs available yet β retry download or upload DOCX files above.")
    # Start new session
    st.divider()
    if st.button("Start new session"):
        new_sid = str(uuid.uuid4())
        st.session_state.sid = new_sid
        st.session_state.state = new_state(new_sid)
        # Drop the stale Popen handle from the previous pipeline, if any.
        if "proc" in st.session_state:
            del st.session_state.proc
        st.query_params["sid"] = new_sid
        save_state(new_sid, st.session_state.state)
        st.rerun()