yakilee Claude Opus 4.6 commited on
Commit
70f82ca
·
1 Parent(s): 9edb6c6

refactor(ui): simplify pipeline progress display with st.progress bar

Browse files

- Replace st.status polling with st.progress bar and step text
- Simplify formatting in direct_pipeline.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

app/pages/2_profile_review.py CHANGED
@@ -4,7 +4,6 @@ import logging
4
  import os
5
  import queue
6
  import threading
7
- import time
8
 
9
  import streamlit as st
10
 
@@ -119,74 +118,71 @@ else:
119
  st.session_state["eligibility_ledger"] = MOCK_ELIGIBILITY_LEDGERS
120
  status.update(label="Loaded demo data (API unavailable)", state="complete")
121
  else:
122
- with st.status("Starting pipeline...", expanded=True) as status:
123
- progress_placeholder = st.empty()
124
- detail_placeholder = st.empty()
125
 
126
- try:
127
- from app.services.direct_pipeline import (
128
- PipelineProgress,
129
- get_progress_queue,
130
- run_trial_search_and_evaluate,
131
- )
132
-
133
- progress_q = get_progress_queue()
134
-
135
- # Drain any stale progress messages
136
- while not progress_q.empty():
137
- try:
138
- progress_q.get_nowait()
139
- except queue.Empty:
140
- break
141
-
142
- result_container: dict = {"result": None, "error": None}
143
-
144
- def _run_pipeline():
145
- try:
146
- result_container["result"] = run_trial_search_and_evaluate(
147
- profile, progress_q=progress_q
148
- )
149
- except Exception as e:
150
- result_container["error"] = e
151
-
152
- thread = threading.Thread(target=_run_pipeline, daemon=True)
153
- thread.start()
154
-
155
- # Poll progress queue and update UI
156
- while thread.is_alive():
157
- try:
158
- update = progress_q.get(timeout=0.5)
159
- progress_placeholder.markdown(f"**{update.message}**")
160
- if update.detail:
161
- detail_placeholder.caption(update.detail)
162
- status.update(label=update.message, state="running")
163
- except queue.Empty:
164
- pass
165
-
166
- # Drain remaining progress messages
167
- while not progress_q.empty():
168
- try:
169
- update = progress_q.get_nowait()
170
- progress_placeholder.markdown(f"**{update.message}**")
171
- except queue.Empty:
172
- break
173
-
174
- if result_container["error"]:
175
- raise result_container["error"]
176
-
177
- trials, ledgers = result_container["result"]
178
- if not trials:
179
- raise ValueError("No trials returned from pipeline")
180
- st.session_state["trial_candidates"] = trials
181
- st.session_state["eligibility_ledger"] = ledgers
182
- status.update(label="Trial search complete!", state="complete")
183
- except Exception:
184
- logger.exception("Direct pipeline failed, using mock data")
185
- st.warning("Real API unavailable — loading demo data instead.")
186
- st.session_state["trial_candidates"] = MOCK_TRIAL_CANDIDATES
187
- st.session_state["eligibility_ledger"] = MOCK_ELIGIBILITY_LEDGERS
188
- status.update(
189
- label="Loaded demo data (API unavailable)", state="complete"
190
- )
191
  advance_journey("VALIDATE_TRIALS")
192
  st.rerun()
 
4
  import os
5
  import queue
6
  import threading
 
7
 
8
  import streamlit as st
9
 
 
118
  st.session_state["eligibility_ledger"] = MOCK_ELIGIBILITY_LEDGERS
119
  status.update(label="Loaded demo data (API unavailable)", state="complete")
120
  else:
121
+ progress_bar = st.progress(0, text="Starting pipeline...")
122
+ step_text = st.empty()
 
123
 
124
+ try:
125
+ from app.services.direct_pipeline import (
126
+ get_progress_queue,
127
+ run_trial_search_and_evaluate,
128
+ )
129
+
130
+ progress_q = get_progress_queue()
131
+
132
+ # Drain any stale progress messages
133
+ while not progress_q.empty():
134
+ try:
135
+ progress_q.get_nowait()
136
+ except queue.Empty:
137
+ break
138
+
139
+ result_container: dict = {"result": None, "error": None}
140
+
141
+ def _run_pipeline():
142
+ try:
143
+ result_container["result"] = run_trial_search_and_evaluate(
144
+ profile, progress_q=progress_q
145
+ )
146
+ except Exception as e:
147
+ result_container["error"] = e
148
+
149
+ thread = threading.Thread(target=_run_pipeline, daemon=True)
150
+ thread.start()
151
+
152
+ # Poll progress queue and update progress bar
153
+ while thread.is_alive():
154
+ try:
155
+ update = progress_q.get(timeout=0.5)
156
+ pct = max(0, min(int(update.pct * 100), 100))
157
+ progress_bar.progress(pct, text=update.message)
158
+ if update.detail:
159
+ step_text.caption(update.detail)
160
+ except queue.Empty:
161
+ pass
162
+
163
+ # Drain remaining progress messages
164
+ while not progress_q.empty():
165
+ try:
166
+ update = progress_q.get_nowait()
167
+ pct = max(0, min(int(update.pct * 100), 100))
168
+ progress_bar.progress(pct, text=update.message)
169
+ except queue.Empty:
170
+ break
171
+
172
+ if result_container["error"]:
173
+ raise result_container["error"]
174
+
175
+ trials, ledgers = result_container["result"]
176
+ if not trials:
177
+ raise ValueError("No trials returned from pipeline")
178
+ st.session_state["trial_candidates"] = trials
179
+ st.session_state["eligibility_ledger"] = ledgers
180
+ progress_bar.progress(100, text="Trial search complete!")
181
+ except Exception:
182
+ logger.exception("Direct pipeline failed, using mock data")
183
+ st.warning("Real API unavailable — loading demo data instead.")
184
+ st.session_state["trial_candidates"] = MOCK_TRIAL_CANDIDATES
185
+ st.session_state["eligibility_ledger"] = MOCK_ELIGIBILITY_LEDGERS
186
+ progress_bar.progress(100, text="Loaded demo data")
 
 
187
  advance_journey("VALIDATE_TRIALS")
188
  st.rerun()
app/services/direct_pipeline.py CHANGED
@@ -91,9 +91,7 @@ async def _search_and_evaluate(
91
 
92
  def _report(step: str, message: str, detail: str = "", pct: float = 0.0):
93
  if progress_q:
94
- progress_q.put(
95
- PipelineProgress(step=step, message=message, detail=detail, pct=pct)
96
- )
97
 
98
  planner = GeminiPlanner(api_key=GEMINI_API_KEY)
99
  client = ClinicalTrialsMCPClient()
@@ -153,8 +151,7 @@ async def _search_and_evaluate(
153
  )
154
 
155
  eval_tasks = [
156
- _evaluate_one(planner, profile_dict, profile.patient_id, trial)
157
- for trial in to_evaluate
158
  ]
159
  step_start = time.monotonic()
160
  results = await asyncio.gather(*eval_tasks)
 
91
 
92
  def _report(step: str, message: str, detail: str = "", pct: float = 0.0):
93
  if progress_q:
94
+ progress_q.put(PipelineProgress(step=step, message=message, detail=detail, pct=pct))
 
 
95
 
96
  planner = GeminiPlanner(api_key=GEMINI_API_KEY)
97
  client = ClinicalTrialsMCPClient()
 
151
  )
152
 
153
  eval_tasks = [
154
+ _evaluate_one(planner, profile_dict, profile.patient_id, trial) for trial in to_evaluate
 
155
  ]
156
  step_start = time.monotonic()
157
  results = await asyncio.gather(*eval_tasks)