Instant-SWOT-Agent / src /nodes /analyzer.py
vn6295337's picture
Initial commit: Instant SWOT Agent
0c591a7
raw
history blame
11.5 kB
from src.tools import get_strategy_context
from src.llm_client import get_llm_client
from langsmith import traceable
import time
import json
def _add_activity_log(workflow_id, progress_store, step, message):
    """Record an activity-log entry when progress tracking is active.

    The call is a no-op unless both ``workflow_id`` and ``progress_store``
    are truthy. ``progress_store`` is only used as an on/off switch here;
    the entry itself is written through ``add_activity_log``.
    """
    tracking_enabled = workflow_id and progress_store
    if not tracking_enabled:
        return

    # Imported at call time rather than at module import time
    # (presumably to avoid a circular import -- TODO confirm).
    from src.services.workflow_store import add_activity_log as record_entry

    record_entry(workflow_id, step, message)
def _as_dict(value) -> dict:
    """Return ``value`` when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _metric_value(container, key):
    """Read ``container[key]``, unwrapping a ``{"value": ...}`` wrapper if present.

    A bare scalar is returned as-is (mirroring how ``pe_ratio`` is treated),
    and a missing/null entry or a non-dict container yields ``None``.
    """
    entry = _as_dict(container).get(key)
    if isinstance(entry, dict):
        return entry.get("value")
    return entry


def _usable_section(metrics: dict, name: str) -> dict:
    """Return the named metrics section, or ``{}`` if absent, malformed or errored."""
    section = metrics.get(name)
    if not isinstance(section, dict) or "error" in section:
        return {}
    return section


def _extract_key_metrics(raw_data: str) -> dict:
    """Extract and flatten key metrics from the raw JSON payload.

    Args:
        raw_data: JSON text holding ``company_name``, ``ticker``,
            ``aggregated_swot`` and a ``metrics`` object with per-source
            sections (financials, valuation, volatility, macro, news,
            sentiment).

    Returns:
        A dict with ``company``, ``ticker``, ``aggregated_swot`` and one
        sub-dict per section. A section that is missing, malformed or
        carries an ``"error"`` key is returned as ``{}``. If the payload
        cannot be decoded, or is not a JSON object, a dict with only an
        ``"error"`` key is returned instead.
    """
    try:
        data = json.loads(raw_data)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError; TypeError covers input that
        # is not str/bytes at all (e.g. None).
        return {"error": "Could not parse raw data"}
    if not isinstance(data, dict):
        return {"error": "Raw data is not a JSON object"}

    metrics = _as_dict(data.get("metrics"))
    extracted = {
        "company": data.get("company_name", "Unknown"),
        "ticker": data.get("ticker", "N/A"),
        "financials": {},
        "valuation": {},
        "volatility": {},
        "macro": {},
        "news": {},
        "sentiment": {},
        "aggregated_swot": _as_dict(data.get("aggregated_swot")),
    }

    # Extract financials
    fin = _usable_section(metrics, "financials")
    if fin:
        fin_data = _as_dict(fin.get("financials"))
        extracted["financials"] = {
            "revenue": _metric_value(fin_data, "revenue"),
            "revenue_cagr_3yr": fin_data.get("revenue_cagr_3yr"),
            "net_margin": fin_data.get("net_margin"),
            "eps": _metric_value(fin_data, "eps"),
            "debt_to_equity": _as_dict(fin.get("debt")).get("debt_to_equity"),
            "free_cash_flow": _metric_value(fin.get("cash_flow"), "free_cash_flow"),
        }

    # Extract valuation
    val = _usable_section(metrics, "valuation")
    if val:
        val_metrics = _as_dict(val.get("metrics"))
        # pe_ratio may be a {"trailing", "forward"} dict or a single number.
        pe = val_metrics.get("pe_ratio")
        pe_is_dict = isinstance(pe, dict)
        extracted["valuation"] = {
            "pe_trailing": pe.get("trailing") if pe_is_dict else pe,
            "pe_forward": pe.get("forward") if pe_is_dict else None,
            "pb_ratio": val_metrics.get("pb_ratio"),
            "ps_ratio": val_metrics.get("ps_ratio"),
            "ev_ebitda": val_metrics.get("ev_ebitda"),
            "valuation_signal": val.get("overall_signal"),
        }

    # Extract volatility
    vol = _usable_section(metrics, "volatility")
    if vol:
        vol_metrics = vol.get("metrics")
        extracted["volatility"] = {
            "beta": _metric_value(vol_metrics, "beta"),
            "vix": _metric_value(vol_metrics, "vix"),
            "historical_volatility": _metric_value(vol_metrics, "historical_volatility"),
        }

    # Extract macro
    macro = _usable_section(metrics, "macro")
    if macro:
        macro_metrics = macro.get("metrics")
        extracted["macro"] = {
            "gdp_growth": _metric_value(macro_metrics, "gdp_growth"),
            "interest_rate": _metric_value(macro_metrics, "interest_rate"),
            "inflation": _metric_value(macro_metrics, "cpi_inflation"),
            "unemployment": _metric_value(macro_metrics, "unemployment"),
        }

    # Extract news
    news = _usable_section(metrics, "news")
    if news:
        articles = news.get("articles")
        if not isinstance(articles, list):
            articles = []
        headlines = []
        for article in articles[:5]:
            title = article.get("title") if isinstance(article, dict) else None
            # A null/missing title becomes "" so count and bullets stay aligned.
            headlines.append(str(title)[:100] if title else "")
        extracted["news"] = {
            "article_count": len(articles),
            "headlines": headlines,
        }

    # Extract sentiment
    sent = _usable_section(metrics, "sentiment")
    if sent:
        extracted["sentiment"] = {
            "composite_score": sent.get("composite_score"),
            "overall_category": sent.get("overall_swot_category"),
        }

    return extracted
def _format_metrics_for_prompt(extracted: dict) -> str:
"""Format extracted metrics into a clear text for the LLM."""
lines = []
lines.append(f"Company: {extracted['company']} ({extracted['ticker']})")
lines.append("")
# Financials
fin = extracted.get("financials", {})
if fin:
lines.append("=== FINANCIALS (from SEC EDGAR) ===")
if fin.get("revenue"):
lines.append(f"- Revenue: ${fin['revenue']:,.0f}" if isinstance(fin['revenue'], (int, float)) else f"- Revenue: {fin['revenue']}")
if fin.get("revenue_cagr_3yr"):
lines.append(f"- Revenue CAGR (3yr): {fin['revenue_cagr_3yr']:.1f}%")
if fin.get("net_margin"):
lines.append(f"- Net Margin: {fin['net_margin']:.1f}%")
if fin.get("eps"):
lines.append(f"- EPS: ${fin['eps']:.2f}")
if fin.get("debt_to_equity"):
lines.append(f"- Debt/Equity: {fin['debt_to_equity']:.2f}")
if fin.get("free_cash_flow"):
lines.append(f"- Free Cash Flow: ${fin['free_cash_flow']:,.0f}" if isinstance(fin['free_cash_flow'], (int, float)) else f"- Free Cash Flow: {fin['free_cash_flow']}")
lines.append("")
# Valuation
val = extracted.get("valuation", {})
if val:
lines.append("=== VALUATION (from Yahoo Finance) ===")
if val.get("pe_trailing"):
lines.append(f"- P/E Ratio (trailing): {val['pe_trailing']:.1f}")
if val.get("pe_forward"):
lines.append(f"- P/E Ratio (forward): {val['pe_forward']:.1f}")
if val.get("pb_ratio"):
lines.append(f"- P/B Ratio: {val['pb_ratio']:.2f}")
if val.get("ps_ratio"):
lines.append(f"- P/S Ratio: {val['ps_ratio']:.2f}")
if val.get("ev_ebitda"):
lines.append(f"- EV/EBITDA: {val['ev_ebitda']:.1f}")
if val.get("valuation_signal"):
lines.append(f"- Overall Signal: {val['valuation_signal']}")
lines.append("")
# Volatility
vol = extracted.get("volatility", {})
if vol:
lines.append("=== VOLATILITY/RISK ===")
if vol.get("beta"):
lines.append(f"- Beta: {vol['beta']:.2f}")
if vol.get("vix"):
lines.append(f"- VIX (market fear index): {vol['vix']:.1f}")
if vol.get("historical_volatility"):
lines.append(f"- Historical Volatility: {vol['historical_volatility']:.1f}%")
lines.append("")
# Macro
macro = extracted.get("macro", {})
if macro:
lines.append("=== MACROECONOMIC ENVIRONMENT (from FRED) ===")
if macro.get("gdp_growth"):
lines.append(f"- GDP Growth: {macro['gdp_growth']:.1f}%")
if macro.get("interest_rate"):
lines.append(f"- Federal Funds Rate: {macro['interest_rate']:.2f}%")
if macro.get("inflation"):
lines.append(f"- Inflation (CPI): {macro['inflation']:.1f}%")
if macro.get("unemployment"):
lines.append(f"- Unemployment: {macro['unemployment']:.1f}%")
lines.append("")
# News
news = extracted.get("news", {})
if news:
lines.append("=== RECENT NEWS ===")
lines.append(f"- Articles found: {news.get('article_count', 0)}")
for headline in news.get("headlines", []):
lines.append(f" • {headline}")
lines.append("")
# Sentiment
sent = extracted.get("sentiment", {})
if sent:
lines.append("=== MARKET SENTIMENT ===")
if sent.get("composite_score") is not None:
lines.append(f"- Composite Score: {sent['composite_score']:.2f}")
if sent.get("overall_category"):
lines.append(f"- Overall: {sent['overall_category']}")
lines.append("")
# Pre-built SWOT hints from MCP servers
swot = extracted.get("aggregated_swot", {})
if any(swot.get(k) for k in ["strengths", "weaknesses", "opportunities", "threats"]):
lines.append("=== DATA-DRIVEN SWOT SIGNALS (from metrics analysis) ===")
for category in ["strengths", "weaknesses", "opportunities", "threats"]:
items = swot.get(category, [])
if items:
lines.append(f"{category.upper()}:")
for item in items:
lines.append(f" • {item}")
lines.append("")
return "\n".join(lines)
@traceable(name="Analyzer")
def analyzer_node(state, workflow_id=None, progress_store=None):
    """Generate the draft SWOT report for the company described in ``state``.

    Reads ``raw_data``, ``company_name``, ``ticker`` and ``strategy_focus``
    from ``state``, builds a data-grounded prompt and sends it to the LLM
    client with ``temperature=0``.

    On success ``draft_report`` and ``provider_used`` are stored in ``state``.
    On failure ``draft_report`` holds an error message, ``provider_used`` is
    ``None`` and ``error`` is set. Names of failed providers are appended to
    ``state["llm_providers_failed"]`` in both cases.

    Args:
        state: Mutable workflow state mapping; updated in place.
        workflow_id: Optional tracking id; falls back to ``state["workflow_id"]``.
        progress_store: Optional progress mapping keyed by workflow id; falls
            back to ``state["progress_store"]``.

    Returns:
        The same ``state`` object that was passed in.
    """
    # The graph invokes this node with ``state`` only, so the tracking handles
    # may have to be recovered from the state itself.
    workflow_id = state.get("workflow_id") if workflow_id is None else workflow_id
    progress_store = state.get("progress_store") if progress_store is None else progress_store

    # Update progress if tracking is enabled
    if workflow_id and progress_store:
        progress_entry = progress_store[workflow_id]
        progress_entry.update({
            "current_step": "analyzer",
            "revision_count": state.get("revision_count", 0),
            "score": state.get("score", 0),
        })

    client = get_llm_client()
    raw_payload = state["raw_data"]
    strategy_name = state.get("strategy_focus", "Cost Leadership")
    strategy_context = get_strategy_context(strategy_name)
    company = state["company_name"]
    ticker = state.get("ticker", "")

    # Flatten the raw payload into readable text so the LLM sees clean figures.
    formatted_data = _format_metrics_for_prompt(_extract_key_metrics(raw_payload))

    # Log LLM call start
    _add_activity_log(workflow_id, progress_store, "analyzer", "Calling LLM to generate SWOT analysis...")

    prompt = f"""You are a financial analyst creating a SWOT analysis for {company} ({ticker}).
CRITICAL INSTRUCTIONS:
1. ONLY use the data provided below. DO NOT invent or assume any information.
2. Every point MUST cite specific numbers from the data (e.g., "P/E of 21.3", "Beta of 0.88").
3. If data is missing for a category, say "Insufficient data" - do NOT make up information.
4. Focus on what the numbers actually mean for this specific company.
5. This is a {company} - tailor your analysis to their industry (e.g., bank, tech, retail).
Strategic Focus: {strategy_name}
Context: {strategy_context}
=== ACTUAL DATA FROM FINANCIAL SOURCES ===
{formatted_data}
Based ONLY on the data above, provide a SWOT analysis in this format:
Strengths:
- [Cite specific metrics that show strengths]
Weaknesses:
- [Cite specific metrics that show weaknesses]
Opportunities:
- [Cite macro/market conditions that create opportunities]
Threats:
- [Cite risks from volatility, macro conditions, or sentiment]
Remember: Every bullet point must reference actual data provided above. Do not invent any figures or facts."""

    started_at = time.time()
    swot_text, provider_name, llm_error, failed_providers = client.query(prompt, temperature=0)
    duration = time.time() - started_at

    # Log failed providers
    for failure in failed_providers:
        _add_activity_log(
            workflow_id,
            progress_store,
            "analyzer",
            f"LLM {failure['name']} failed: {failure['error']}",
        )

    # Track failed providers in state for frontend
    if "llm_providers_failed" not in state:
        state["llm_providers_failed"] = []
    failed_names = [failure["name"] for failure in failed_providers]
    state["llm_providers_failed"].extend(failed_names)

    if not llm_error:
        state["draft_report"] = swot_text
        state["provider_used"] = provider_name
        _add_activity_log(
            workflow_id,
            progress_store,
            "analyzer",
            f"SWOT generated via {provider_name} ({duration:.1f}s)",
        )
        return state

    state["draft_report"] = f"Error generating analysis: {llm_error}"
    state["provider_used"] = None
    state["error"] = llm_error  # Signal workflow to abort
    _add_activity_log(workflow_id, progress_store, "analyzer", f"LLM error: {llm_error}")
    _add_activity_log(workflow_id, progress_store, "analyzer", "Workflow aborted - all LLM providers unavailable")
    return state