vn6295337 Claude Opus 4.5 commited on
Commit
176e3c1
·
1 Parent(s): 67b4836

Fix temporal data flow with structured payload pattern

Browse files

- research_gateway.py: Pass dict payload instead of positional args
- researcher.py: Add backwards-compatible callback supporting both formats

Ensures fiscal_year, end_date, form fields are preserved through the
A2A → progress_callback → add_metric pipeline.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

src/nodes/research_gateway.py CHANGED
@@ -146,13 +146,18 @@ async def wait_for_completion(
146
  value = metric.get("value")
147
  if not source or not metric_name:
148
  continue
149
- # Extract temporal fields
150
- end_date = metric.get("end_date")
151
- fiscal_year = metric.get("fiscal_year")
152
- form = metric.get("form")
153
  metric_key = f"{source}:{metric_name}:{value}"
154
  if metric_key not in emitted_metrics:
155
- progress_callback(source, metric_name, value, end_date, fiscal_year, form)
 
 
 
 
 
 
 
 
 
156
  emitted_metrics.add(metric_key)
157
 
158
  if status == "completed":
 
146
  value = metric.get("value")
147
  if not source or not metric_name:
148
  continue
 
 
 
 
149
  metric_key = f"{source}:{metric_name}:{value}"
150
  if metric_key not in emitted_metrics:
151
+ # Pass structured payload dict (matches Researcher-Agent emit_metric)
152
+ payload = {
153
+ "source": source,
154
+ "metric": metric_name,
155
+ "value": value,
156
+ "end_date": metric.get("end_date"),
157
+ "fiscal_year": metric.get("fiscal_year"),
158
+ "form": metric.get("form"),
159
+ }
160
+ progress_callback(payload)
161
  emitted_metrics.add(metric_key)
162
 
163
  if status == "completed":
src/nodes/researcher.py CHANGED
@@ -76,12 +76,29 @@ def researcher_node(state, workflow_id=None, progress_store=None):
76
  add_activity_log(workflow_id, step, message)
77
 
78
  # Create progress callback for granular metric events
79
- def progress_callback(source: str, metric: str, value,
80
- end_date: str = None, fiscal_year: int = None, form: str = None):
81
- if workflow_id and progress_store:
82
- # Import here to avoid circular imports
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
83
  from src.services.workflow_store import add_metric
84
- add_metric(workflow_id, source, metric, value,
85
  end_date=end_date, fiscal_year=fiscal_year, form=form)
86
 
87
  try:
 
76
  add_activity_log(workflow_id, step, message)
77
 
78
  # Create progress callback for granular metric events
79
+ # Supports both dict payload (new) and positional args (legacy)
80
+ def progress_callback(*args, **kwargs):
81
+ if args and isinstance(args[0], dict):
82
+ # New structured payload format
83
+ p = args[0]
84
+ src = p.get("source")
85
+ metric = p.get("metric")
86
+ value = p.get("value")
87
+ end_date = p.get("end_date")
88
+ fiscal_year = p.get("fiscal_year")
89
+ form = p.get("form")
90
+ else:
91
+ # Legacy positional args format
92
+ src = args[0] if len(args) > 0 else kwargs.get("source")
93
+ metric = args[1] if len(args) > 1 else kwargs.get("metric")
94
+ value = args[2] if len(args) > 2 else kwargs.get("value")
95
+ end_date = args[3] if len(args) > 3 else kwargs.get("end_date")
96
+ fiscal_year = args[4] if len(args) > 4 else kwargs.get("fiscal_year")
97
+ form = args[5] if len(args) > 5 else kwargs.get("form")
98
+
99
+ if workflow_id and progress_store and src and metric:
100
  from src.services.workflow_store import add_metric
101
+ add_metric(workflow_id, src, metric, value,
102
  end_date=end_date, fiscal_year=fiscal_year, form=form)
103
 
104
  try: