aegis / agents /swarm.py
Benny-Tang's picture
Create agents/swarm.py
a1c1a89 verified
python3 << 'PYEOF'
code = '''
import asyncio, json, re, os, requests
from bs4 import BeautifulSoup
from groq import AsyncGroq
MODEL = "llama-3.3-70b-versatile"
def _client():
return AsyncGroq(api_key=os.environ["GROQ_API_KEY"])
async def _llm(system, user, temperature=0.3):
r = await _client().chat.completions.create(
model=MODEL,
messages=[{"role":"system","content":system},{"role":"user","content":user}],
temperature=temperature, max_tokens=600)
return r.choices[0].message.content.strip()
async def _json(system, user):
raw = await _llm(system+"\\n\\nRespond ONLY with valid JSON. No markdown.", user)
clean = re.sub(r"```json|```","",raw).strip()
try:
return json.loads(clean)
except:
m = re.search(r"\\{.*\\}", clean, re.DOTALL)
if m:
try:
return json.loads(m.group())
except:
pass
return {"raw": raw}
def scrape_marine_traffic():
"""
Scrapes MarineTraffic news for real-time shipping disruption signals.
Returns list of recent news headlines and summaries.
Falls back to simulated data if scraping fails.
"""
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
url = "https://www.marinetraffic.com/en/ais/details/ships/straits"
news_items = []
# Try MarineTraffic blog for shipping news
blog_url = "https://www.marinetraffic.com/blog/"
r = requests.get(blog_url, headers=headers, timeout=8)
if r.status_code == 200:
soup = BeautifulSoup(r.text, "lxml")
articles = soup.find_all("article", limit=5)
for a in articles:
title = a.find("h2") or a.find("h3") or a.find("h1")
if title:
news_items.append({
"source": "MarineTraffic",
"headline": title.get_text(strip=True)[:200],
"type": "shipping_news"
})
# Also scrape Lloyd\'s List for Hormuz news
lloyds_url = "https://lloydslist.maritimeintelligence.informa.com/"
r2 = requests.get(lloyds_url, headers=headers, timeout=8)
if r2.status_code == 200:
soup2 = BeautifulSoup(r2.text, "lxml")
headlines = soup2.find_all(["h2","h3"], limit=5)
for h in headlines:
text = h.get_text(strip=True)
if len(text) > 20:
news_items.append({
"source": "LloydsList",
"headline": text[:200],
"type": "maritime_intelligence"
})
if news_items:
return {
"status": "live",
"source": "MarineTraffic + LloydsListI",
"items": news_items[:6],
"timestamp": __import__("datetime").datetime.utcnow().isoformat()
}
else:
raise Exception("No items scraped")
except Exception as e:
# Fallback: return simulated Hormuz disruption data
return {
"status": "simulated",
"source": "Aegis Internal Feed",
"note": f"Live scrape failed ({str(e)[:50]}), using crisis simulation data",
"items": [
{"source": "MarineTraffic", "headline": "3 tankers rerouted away from Strait of Hormuz amid escalating tensions", "type": "shipping_disruption"},
{"source": "MarineTraffic", "headline": "Iranian Revolutionary Guard patrol boats spotted near major shipping lanes", "type": "security_alert"},
{"source": "LloydsListI", "headline": "War risk insurance premiums surge 40% for Gulf vessels", "type": "financial_impact"},
{"source": "LloydsListI", "headline": "Major shipping lines suspend bookings through Hormuz indefinitely", "type": "operational_disruption"},
],
"timestamp": __import__("datetime").datetime.utcnow().isoformat()
}
async def signal_agent(event):
# Enrich event with live MarineTraffic data
marine_data = scrape_marine_traffic()
event["marine_traffic"] = marine_data
r = await _json("""You are the Signal Agent for Aegis. Classify incoming signals.
You have access to live MarineTraffic shipping news.
Return JSON: severity(LOW|MEDIUM|HIGH|CRITICAL), signal_type, anomalies(list),
confidence(0-100), summary(1 sentence), shipping_alerts(list of strings from marine data).""",
f"Event: {json.dumps(event)}")
r["agent"] = "signal"
r["marine_data"] = marine_data
return r
async def intelligence_agent(signal, event):
r = await _json("""You are the Intelligence Agent for Aegis. Interpret signals for business risk.
Return JSON: root_cause, affected_regions(list), supply_chain_impact(LOW|MEDIUM|HIGH|SEVERE),
escalation_probability(0-100), geopolitical_context(1-2 sentences), time_to_impact_days(int).""",
f"Signal: {json.dumps(signal)}\\nEvent: {json.dumps(event)}")
r["agent"] = "intelligence"
return r
async def forecast_agent(intel, forecast_data):
r = await _json("""You are the Forecast Agent for Aegis. Interpret ML forecasts in business terms.
Return JSON: oil_outlook(string), price_trajectory(RISING|STABLE|FALLING|VOLATILE),
supply_risk_score(0-100), delay_probability(0-100), cost_impact_pct(float),
confidence_level(LOW|MEDIUM|HIGH), key_assumptions(list).""",
f"Intel: {json.dumps(intel)}\\nML summary: {json.dumps(forecast_data.get(\'summary\',{}))}")
r["agent"] = "forecast"
r["ml_summary"] = forecast_data.get("summary",{})
r["ml_forecast"] = forecast_data.get("forecast",[])[:14]
return r
async def simulation_agent(forecast, event):
r = await _json("""You are the Simulation Agent for Aegis. Run 3 supply chain scenarios.
Return JSON: scenarios(list of: name,probability,cost_impact_pct,lead_time_increase_days,description,mitigation_available).""",
f"Forecast: {json.dumps(forecast)}\\nEvent: {json.dumps(event)}")
r["agent"] = "simulation"
if "scenarios" not in r or not isinstance(r["scenarios"], list):
r["scenarios"] = []
return r
async def decision_agent(simulation, forecast, intel):
r = await _json("""You are the Decision Agent for Aegis — the final brain.
Return JSON: threat_level(LOW|MEDIUM|HIGH|CRITICAL),
recommended_actions(list of: priority,action,rationale,estimated_savings_usd,time_to_implement,risk),
executive_summary(2-3 sentences), do_nothing_cost_usd(int).""",
f"Scenarios: {json.dumps(simulation.get(\'scenarios\',[]))}\\nForecast: {json.dumps(forecast.get(\'ml_summary\',{}))}\\nIntel: {json.dumps(intel)}")
r["agent"] = "decision"
return r
async def alert_agent(decision, forecast):
r = await _json("""You are the Alert Agent for Aegis. Generate stakeholder alerts.
Return JSON: slack_message(string,max 200 chars), email_subject, email_body(3-4 sentences),
severity_emoji, notification_channels(list).""",
f"Decision: {json.dumps(decision)}\\nForecast: {json.dumps(forecast.get(\'ml_summary\',{}))}")
r["agent"] = "alert"
return r
async def execution_agent(decision):
r = await _json("""You are the Execution Agent for Aegis. Determine workflows to trigger.
Return JSON: triggered_workflows(list of: system,action,api_endpoint,payload_summary,status,requires_human_approval),
autonomous_actions_count(int), pending_approvals_count(int), execution_summary(1 sentence).""",
f"Actions: {json.dumps(decision.get(\'recommended_actions\',[])[:3])}")
r["agent"] = "execution"
return r
async def run_aegis_pipeline(event, forecast_data):
results = {"event": event, "agents": {}}
sig = await signal_agent(event)
results["agents"]["signal"] = sig
intel = await intelligence_agent(sig, event)
results["agents"]["intelligence"] = intel
fore = await forecast_agent(intel, forecast_data)
results["agents"]["forecast"] = fore
sim = await simulation_agent(fore, event)
results["agents"]["simulation"] = sim
dec = await decision_agent(sim, fore, intel)
results["agents"]["decision"] = dec
alert = await alert_agent(dec, fore)
results["agents"]["alert"] = alert
exe = await execution_agent(dec)
results["agents"]["execution"] = exe
print("Pipeline complete")
return results
'''
with open("/opt/aegis/agents/swarm.py","w") as f:
f.write(code)
print("swarm.py written OK")
PYEOF
✅ Should print: swarm.py written OK