import sys

sys.path.insert(0, "/app")

from flask import Flask, render_template, jsonify, Response, request
import threading
import json
import os
import time
import requests
import sqlite3
import datetime
from queue import Queue, Empty

from shared.config import Config
from shared.kafka_utils import create_consumer, create_producer

app = Flask(__name__, template_folder="templates")
app.config["TEMPLATES_AUTO_RELOAD"] = True

# Shared state (mutated by the Kafka consumer thread; guard with `lock`)
orders, bbos, trades_cache = [], {}, []
ai_insights_cache = []
lock = threading.Lock()

# SSE: list of queues for connected clients
sse_clients = []
sse_clients_lock = threading.Lock()

# Session state
session_state = {"active": False, "start_time": None, "suspended": False, "mode": "automatic"}
SCHEDULE_FILE = os.getenv("SCHEDULE_FILE", "/app/shared/data/market_schedule.txt")
FRONTEND_URL = os.getenv("FRONTEND_URL", "")

# ── AI Analyst (inline LLM for on-demand generation) ───────────────────────────
HF_TOKEN = os.getenv("HF_TOKEN", "")
HF_MODEL = os.getenv("HF_MODEL", "RayMelius/stockex-analyst")
HF_URL = "https://router.huggingface.co/v1/chat/completions"
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
GROQ_MODEL = os.getenv("GROQ_MODEL", "llama-3.1-8b-instant")
GROQ_URL = "https://api.groq.com/openai/v1/chat/completions"
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3.1:8b")

# Known model lists for the dynamic selector UI
GROQ_MODELS = [
    "llama-3.1-8b-instant",
    "llama-3.3-70b-versatile",
    "llama-3.1-70b-versatile",
    "mixtral-8x7b-32768",
    "gemma2-9b-it",
]
HF_MODELS = [
    "RayMelius/stockex-analyst",
    "Qwen/Qwen2.5-7B-Instruct",
    "meta-llama/Llama-3.1-8B-Instruct",
    "mistralai/Mistral-7B-Instruct-v0.3",
]

# Runtime LLM selection (overrides env var defaults when set via /ai/select)
_active_provider = "auto"  # "auto" | "ollama" | "groq" | "hf"
_active_model = None  # str override or None = use env var default


def _build_market_prompt():
    """Build the LLM prompt from the cached order-book snapshots and trades.

    Takes a snapshot of ``bbos`` and the 30 newest entries of ``trades_cache``
    under ``lock`` and renders them, together with the current wall-clock
    time and session status, into a plain-text prompt.

    Returns:
        The prompt string.
    """
    with lock:
        snaps = dict(bbos)
        recent = list(trades_cache[:30])

    now = datetime.datetime.now().strftime("%H:%M:%S")
    if session_state["active"] and not session_state["suspended"]:
        sess = "ACTIVE"
    elif session_state["suspended"]:
        sess = "SUSPENDED"
    else:
        sess = "IDLE"

    if recent:
        by_sym = {}
        for t in recent:
            by_sym.setdefault(t.get("symbol", "?"), []).append(t)
        trade_lines = []
        for sym, ts in sorted(by_sym.items()):
            # `or 0` also covers an explicit None price, which float() rejects.
            prices = [float(t.get("price") or 0) for t in ts]
            vol = sum(int(t.get("quantity") or t.get("qty") or 0) for t in ts)
            # trades_cache is newest-first (the consumer inserts at index 0),
            # so the most recent trade of each symbol is the FIRST element.
            last = prices[0]
            trade_lines.append(
                f" {sym}: {len(ts)} trade(s), range {min(prices):.2f}–{max(prices):.2f}, "
                f"vol {vol}, last {last:.2f}"
            )
        trades_block = "\n".join(trade_lines)
    else:
        trades_block = " No recent trades"

    if snaps:
        book_lines = []
        for sym, s in sorted(snaps.items()):
            bid, ask = s.get("best_bid"), s.get("best_ask")
            spread = f"{float(ask)-float(bid):.2f}" if bid and ask else "?"
            book_lines.append(f" {sym}: Bid {bid or '-'} / Ask {ask or '-'} (spread {spread})")
        book_block = "\n".join(book_lines)
    else:
        book_block = " No order book data"

    return (f"You are a concise financial market analyst for a simulated stock exchange.\n"
            f"Time: {now} | Session: {sess}\n\n"
            f"Recent trades:\n{trades_block}\n\n"
            f"Order book:\n{book_block}\n\n"
            f"In 3-4 sentences: activity level, notable moves, market sentiment. "
            f"Plain prose, no headers, no bullet points.")


def _call_llm(prompt, force_provider=None, force_model=None):
    """Call LLM. Returns (text, source) or (None, error_msg).

    force_provider: "auto"|"ollama"|"groq"|"hf"|None — selects which provider to use.
    force_model: override the default model name for the chosen provider.
    When force_provider is "auto" or None, falls back through Ollama -> Groq -> HF.
    """
    provider = force_provider or "auto"

    def _try_ollama(model):
        """POST the prompt to the Ollama /api/chat endpoint (non-streaming)."""
        if not OLLAMA_HOST:
            return None, "Ollama not configured (OLLAMA_HOST not set)"
        m = model or OLLAMA_MODEL
        try:
            r = requests.post(
                f"{OLLAMA_HOST}/api/chat",
                json={"model": m,
                      "messages": [{"role": "user", "content": prompt}],
                      "stream": False},
                timeout=90,
            )
            if r.status_code == 200:
                text = r.json().get("message", {}).get("content", "").strip()
                if text:
                    return text, f"Ollama/{m}"
            return None, f"Ollama HTTP {r.status_code}: {r.text[:200]}"
        except Exception as e:
            return None, f"Ollama error: {e}"

    def _try_groq(model):
        """POST the prompt to the Groq chat-completions endpoint."""
        if not GROQ_API_KEY:
            return None, "Groq not configured (GROQ_API_KEY not set)"
        m = model or GROQ_MODEL
        try:
            r = requests.post(
                GROQ_URL,
                headers={"Authorization": f"Bearer {GROQ_API_KEY}",
                         "Content-Type": "application/json"},
                json={"model": m,
                      "messages": [{"role": "user", "content": prompt}],
                      "max_tokens": 300, "temperature": 0.7},
                timeout=30,
            )
            print(f"[Dashboard/LLM] Groq status {r.status_code}")
            if r.status_code == 200:
                text = r.json()["choices"][0]["message"]["content"].strip()
                if text:
                    return text, f"Groq/{m}"
            return None, f"Groq HTTP {r.status_code}: {r.text[:200]}"
        except Exception as e:
            return None, f"Groq error: {e}"

    def _try_hf(model):
        """POST the prompt to the HuggingFace router, retrying up to 3 times.

        A 503 is treated as "model loading": sleep for the reported
        ``estimated_time`` (capped at 30s) and retry. A 200 with an empty
        completion is also retried. Any other status returns immediately.
        """
        if not HF_TOKEN:
            return None, "HuggingFace not configured (HF_TOKEN not set)"
        m = model or HF_MODEL
        print(f"[Dashboard/LLM] Calling HF ({m})...")
        for attempt in range(3):
            try:
                r = requests.post(
                    HF_URL,
                    headers={"Authorization": f"Bearer {HF_TOKEN}",
                             "Content-Type": "application/json"},
                    json={"model": m,
                          "messages": [{"role": "user", "content": prompt}],
                          "max_tokens": 300, "temperature": 0.7},
                    timeout=90,
                )
                print(f"[Dashboard/LLM] HF status {r.status_code} (attempt {attempt+1})")
                if r.status_code == 200:
                    text = r.json()["choices"][0]["message"]["content"].strip()
                    if text:
                        return text, m
                elif r.status_code == 503:
                    body = {}
                    try:
                        body = r.json()
                    except ValueError:
                        # Non-JSON 503 body: fall back to the default wait.
                        pass
                    if not isinstance(body, dict):
                        body = {}
                    wait = min(float(body.get("estimated_time", 20)), 30)
                    print(f"[Dashboard/LLM] Model loading, waiting {wait:.0f}s...")
                    time.sleep(wait)
                else:
                    print(f"[Dashboard/LLM] HF error: {r.text[:400]}")
                    lowered = r.text.lower()
                    if r.status_code == 402 or "credit" in lowered or "depleted" in lowered:
                        return None, "HF credit depleted. Add GROQ_API_KEY (free at console.groq.com)."
                    return None, f"HF HTTP {r.status_code}: {r.text[:120]}"
            except requests.exceptions.Timeout:
                return None, "HF request timed out after 90s"
            except Exception as e:
                return None, f"HF error: {e}"
        return None, "HF: max retries exceeded"

    # Route to chosen provider or auto-fallback chain
    if provider == "ollama":
        return _try_ollama(force_model)
    if provider == "groq":
        return _try_groq(force_model)
    if provider == "hf":
        return _try_hf(force_model)

    # Auto: Ollama -> Groq -> HF
    if OLLAMA_HOST:
        text, src = _try_ollama(force_model)
        if text:
            return text, src
    if GROQ_API_KEY:
        text, src = _try_groq(force_model)
        if text:
            return text, src
    return _try_hf(force_model)


def _generate_and_broadcast():
    """Background thread: call LLM, publish result via SSE + Kafka.

    On success the insight is sent to ``Config.AI_INSIGHTS_TOPIC``; the Kafka
    consumer thread then caches it and pushes it to SSE clients. If the Kafka
    send raises, the insight is cached and broadcast directly so it is not
    lost. LLM/config errors are broadcast to SSE clients only.
    """
    if not HF_TOKEN and not OLLAMA_HOST and not GROQ_API_KEY:
        err = {"text": "⚠️ No LLM configured. Add GROQ_API_KEY secret (free at console.groq.com).",
               "source": "config", "timestamp": time.time()}
        broadcast_event("ai_insight", err)
        return

    prompt = _build_market_prompt()
    text, source = _call_llm(prompt, force_provider=_active_provider, force_model=_active_model)
    if not text:
        err_insight = {"text": f"⚠️ LLM error: {source}", "source": "error",
                       "timestamp": time.time()}
        broadcast_event("ai_insight", err_insight)
        print(f"[Dashboard/LLM] LLM failed: {source}")
        return

    insight = {"text": text, "source": source, "timestamp": time.time()}
    try:
        get_producer().send(Config.AI_INSIGHTS_TOPIC, insight)
    except Exception as e:
        # Kafka is the normal delivery path; without it the consumer thread
        # never sees the insight, so deliver it to the UI directly instead.
        print(f"[Dashboard/LLM] Kafka publish failed ({e}); broadcasting directly")
        with lock:
            ai_insights_cache.insert(0, insight)
            ai_insights_cache[:] = ai_insights_cache[:10]
        broadcast_event("ai_insight", insight)
    else:
        print(f"[Dashboard/LLM] Insight published ({len(text)} chars, src={source})")


# ── OHLCV History ──────────────────────────────────────────────────────────────
HISTORY_DB = os.getenv("HISTORY_DB", "/app/shared/data/dashboard_history.db")
BUCKET_SIZE = 60  # 1-minute candles
PERIOD_SECONDS = {
    "1h": 3600,
    "8h": 28800,
    "1d": 86400,
    "1w": 604800,
    "1m": 2592000,
}


def init_history_db():
    """Create the ``ohlcv`` table if needed and seed it when it is empty.

    The parent directory of ``HISTORY_DB`` is created first. The connection
    is always closed, even when table creation or seeding raises.
    """
    db_dir = os.path.dirname(HISTORY_DB)
    if db_dir:  # os.makedirs("") raises when HISTORY_DB is a bare filename
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(HISTORY_DB)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS ohlcv (
                symbol TEXT,
                bucket INTEGER,
                open REAL,
                high REAL,
                low REAL,
                close REAL,
                volume INTEGER,
                PRIMARY KEY (symbol, bucket)
            )
        """)
        conn.commit()
        # Auto-seed from OHLCV JSON files if DB is empty
        row_count = conn.execute("SELECT COUNT(*) FROM ohlcv").fetchone()[0]
        if row_count == 0:
            _seed_history_from_ohlcv(conn)
    finally:
        conn.close()


def _seed_history_from_ohlcv(conn):
    """Seed dashboard_history.db from shared/data/ohlcv/*.json on first run.

    Each file is named after its symbol and holds a list of bars with
    ``date`` (YYYY-MM-DD), ``open``, ``high``, ``low``, ``close`` and
    ``volume`` keys. Every bar is stored in the bucket for 10:00 local time
    of its date. A file that fails to load or parse is logged and skipped.

    Args:
        conn: Open sqlite3 connection to the history database.
    """
    import glob as globmod

    ohlcv_dir = os.path.join(os.path.dirname(HISTORY_DB), "ohlcv")
    if not os.path.isdir(ohlcv_dir):
        return
    count = 0
    for path in sorted(globmod.glob(os.path.join(ohlcv_dir, "*.json"))):
        sym = os.path.splitext(os.path.basename(path))[0]
        try:
            with open(path) as f:
                bars = json.load(f)
            for bar in bars:
                dt = datetime.datetime.strptime(bar["date"], "%Y-%m-%d").replace(hour=10)
                bucket = int(dt.timestamp() // BUCKET_SIZE) * BUCKET_SIZE
                conn.execute(
                    "INSERT OR IGNORE INTO ohlcv VALUES (?,?,?,?,?,?,?)",
                    (sym, bucket, bar["open"], bar["high"], bar["low"],
                     bar["close"], bar["volume"]),
                )
                count += 1
        except Exception as e:
            print(f"[History] Error seeding {sym}: {e}")
    conn.commit()
    print(f"[History] Auto-seeded {count} candles from OHLCV files")


def record_trade(symbol, price, qty, ts):
    """Fold one trade into its 1-minute OHLCV candle.

    Args:
        symbol: Instrument symbol; empty symbols are ignored.
        price: Trade price; non-positive prices are ignored.
        qty: Traded quantity, added to the candle volume.
        ts: Trade time as a Unix timestamp in seconds.

    Database errors are logged, not raised; the connection is always closed.
    """
    if not symbol or price <= 0:
        return
    bucket = int(ts // BUCKET_SIZE) * BUCKET_SIZE
    try:
        conn = sqlite3.connect(HISTORY_DB)
        try:
            existing = conn.execute(
                "SELECT open FROM ohlcv WHERE symbol=? AND bucket=?", (symbol, bucket)
            ).fetchone()
            if existing:
                conn.execute(
                    "UPDATE ohlcv SET high=MAX(high,?), low=MIN(low,?), close=?, volume=volume+? "
                    "WHERE symbol=? AND bucket=?",
                    (price, price, price, qty, symbol, bucket),
                )
            else:
                conn.execute(
                    "INSERT INTO ohlcv VALUES (?,?,?,?,?,?,?)",
                    (symbol, bucket, price, price, price, price, qty),
                )
            conn.commit()
        finally:
            conn.close()
    except Exception as e:
        print(f"[History] Error recording trade: {e}")


def load_securities_file():
    """Parse ``Config.SECURITIES_FILE`` into ``{symbol: {"start", "current"}}``.

    Blank lines and lines starting with ``#`` are skipped; a data line needs
    at least three whitespace-separated fields (symbol, start, current).
    On a read or parse error the error is logged and whatever was parsed so
    far is returned.
    """
    securities = {}
    try:
        with open(Config.SECURITIES_FILE) as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                parts = line.split()
                if len(parts) >= 3:
                    securities[parts[0]] = {
                        "start": float(parts[1]),
                        "current": float(parts[2]),
                    }
    except Exception as e:
        print(f"[Dashboard] Cannot read securities: {e}")
    return securities


def save_securities_file(securities):
    """Overwrite ``Config.SECURITIES_FILE`` with one tab-separated line per symbol.

    Args:
        securities: Mapping of symbol to a dict with ``start`` and ``current``
            prices; both are written with two decimals.
    """
    with open(Config.SECURITIES_FILE, "w") as f:
        f.write("#SYMBOL\t\t\n")
        for sym, vals in securities.items():
            f.write(f"{sym}\t{vals['start']:.2f}\t{vals['current']:.2f}\n")


# ── SSE broadcast ──────────────────────────────────────────────────────────────
def broadcast_event(event_type, data):
    """Queue one SSE message for every connected client.

    Args:
        event_type: Value of the SSE ``event:`` field.
        data: JSON-serialisable payload for the ``data:`` field.

    A client whose queue rejects the message (e.g. it is full) is removed
    from ``sse_clients``.
    """
    message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
    with sse_clients_lock:
        dead = []
        for q in sse_clients:
            try:
                q.put_nowait(message)
            except Exception:
                dead.append(q)
        for q in dead:
            sse_clients.remove(q)
# ── Kafka consumer thread ──────────────────────────────────────────────────────
def consume_kafka():
    """Consume orders, snapshots, trades and AI insights from Kafka forever.

    Each message updates the matching in-memory cache under ``lock`` and is
    forwarded to SSE clients; trades are also written to the OHLCV history.
    Topics are matched against the same ``Config`` constants used to
    subscribe. A message that fails to process is logged and skipped so one
    malformed payload cannot stop the consumer thread.
    """
    consumer = create_consumer(
        topics=[Config.ORDERS_TOPIC, Config.SNAPSHOTS_TOPIC,
                Config.TRADES_TOPIC, Config.AI_INSIGHTS_TOPIC],
        group_id="dashboard",
        component_name="Dashboard",
    )
    for msg in consumer:
        try:
            with lock:
                if msg.topic == Config.ORDERS_TOPIC:
                    order = msg.value
                    orders.insert(0, order)
                    orders[:] = orders[:50]
                    broadcast_event("order", order)
                elif msg.topic == Config.SNAPSHOTS_TOPIC:
                    snap = msg.value
                    symbol = snap.get("symbol")
                    if not symbol:
                        continue
                    bbo_data = {
                        "best_bid": snap.get("best_bid"),
                        "best_ask": snap.get("best_ask"),
                        "bid_size": snap.get("bid_size"),
                        "ask_size": snap.get("ask_size"),
                        "timestamp": snap.get("timestamp", time.time()),
                        "source": snap.get("source", "unknown"),
                    }
                    bbos[symbol] = bbo_data
                    broadcast_event("snapshot", {"symbol": symbol, **bbo_data})
                elif msg.topic == Config.TRADES_TOPIC:
                    trade = msg.value
                    trades_cache.insert(0, trade)
                    trades_cache[:] = trades_cache[:200]
                    broadcast_event("trade", trade)
                    # Record for OHLCV history
                    sym = trade.get("symbol", "")
                    price = float(trade.get("price") or 0)
                    qty = int(trade.get("quantity") or trade.get("qty") or 0)
                    ts = float(trade.get("timestamp") or time.time())
                    record_trade(sym, price, qty, ts)
                elif msg.topic == Config.AI_INSIGHTS_TOPIC:
                    insight = msg.value
                    ai_insights_cache.insert(0, insight)
                    ai_insights_cache[:] = ai_insights_cache[:10]
                    broadcast_event("ai_insight", insight)
        except Exception as e:
            print(f"[Dashboard] Error handling Kafka message on {msg.topic}: {e}")


# Initialise DB then start consumer thread
init_history_db()
threading.Thread(target=consume_kafka, daemon=True).start()


# ── Flask routes ───────────────────────────────────────────────────────────────
@app.route("/")
def index():
    """Render the dashboard page."""
    return render_template("index.html", frontend_url=FRONTEND_URL)


@app.route("/health")
def health():
    """Report cache sizes and whether the matcher's /health endpoint responds.

    The overall status becomes "degraded" only when the matcher request
    raises; a non-200 reply is reported under "matcher" but leaves the
    status "healthy".
    """
    status = {
        "status": "healthy",
        "service": "dashboard",
        "timestamp": time.time(),
        "stats": {
            "orders_cached": len(orders),
            "trades_cached": len(trades_cache),
            "symbols_tracked": len(bbos),
            "sse_clients": len(sse_clients),
        },
    }
    try:
        r = requests.get(f"{Config.MATCHER_URL}/health", timeout=2)
        status["matcher"] = "connected" if r.status_code == 200 else f"error: {r.status_code}"
    except Exception as e:
        status["matcher"] = f"error: {e}"
        status["status"] = "degraded"
    return jsonify(status)


@app.route("/data")
def data():
    """Return cached orders and BBOs plus trades and the ALPHA order book.

    Trades come from the matcher, falling back to the local trade cache when
    the request fails; the book falls back to an empty one.
    """
    try:
        r = requests.get(f"{Config.MATCHER_URL}/trades", timeout=2)
        resp = r.json()
        trades = resp.get("trades", []) if isinstance(resp, dict) else resp
    except Exception as e:
        print("Cannot fetch trades:", e)
        with lock:
            trades = list(trades_cache)
    try:
        r = requests.get(f"{Config.MATCHER_URL}/orderbook/ALPHA", timeout=2)
        book = r.json()
    except Exception:
        book = {"bids": [], "asks": []}
    with lock:
        return jsonify({"orders": list(orders), "bbos": dict(bbos),
                        "trades": trades, "book": book})


# The URL must declare <symbol>; without it Flask never passes the argument
# the view function requires.
@app.route("/orderbook/<symbol>")
def orderbook(symbol):
    """Proxy the matcher's order book for ``symbol`` (body and status as-is)."""
    try:
        r = requests.get(f"{Config.MATCHER_URL}/orderbook/{symbol}", timeout=2)
        return (r.text, r.status_code, {"Content-Type": "application/json"})
    except Exception as e:
        return jsonify({"error": str(e), "bids": [], "asks": []}), 500


# Kafka producer (lazy init)
_producer = None


def get_producer():
    """Return the shared Kafka producer, creating it on first use."""
    global _producer
    if _producer is None:
        _producer = create_producer(component_name="Dashboard")
    return _producer


@app.route("/order/cancel", methods=["POST"])
def cancel_order():
    """Publish a cancel request for ``orig_cl_ord_id`` to the orders topic.

    Responds 400 when the id is missing (including an absent or non-JSON
    body) and 500 when publishing fails.
    """
    try:
        # silent=True yields None for a missing/invalid body instead of
        # raising, so the request gets the 400 below rather than a 500.
        payload = request.get_json(silent=True) or {}
        orig_cl_ord_id = payload.get("orig_cl_ord_id")
        symbol = payload.get("symbol")
        if not orig_cl_ord_id:
            return jsonify({"status": "error", "error": "Missing orig_cl_ord_id"}), 400
        cancel_msg = {
            "type": "cancel",
            "orig_cl_ord_id": orig_cl_ord_id,
            "symbol": symbol,
            "timestamp": time.time(),
        }
        p = get_producer()
        p.send(Config.ORDERS_TOPIC, cancel_msg)
        p.flush()
        return jsonify({"status": "ok", "message": f"Cancel request sent for {orig_cl_ord_id}"})
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 500


@app.route("/order/amend", methods=["POST"])
def amend_order():
    """Publish an amend request for ``orig_cl_ord_id`` to the orders topic.

    The amend gets a fresh ``cl_ord_id`` derived from the current time in
    milliseconds. Responds 400 when the id is missing (including an absent
    or non-JSON body) and 500 when publishing fails.
    """
    try:
        payload = request.get_json(silent=True) or {}
        orig_cl_ord_id = payload.get("orig_cl_ord_id")
        if not orig_cl_ord_id:
            return jsonify({"status": "error", "error": "Missing orig_cl_ord_id"}), 400
        amend_msg = {
            "type": "amend",
            "orig_cl_ord_id": orig_cl_ord_id,
            "cl_ord_id": f"amend-{int(time.time()*1000)}",
            "symbol": payload.get("symbol"),
            "quantity": payload.get("quantity"),
            "price": payload.get("price"),
            "timestamp": time.time(),
        }
        p = get_producer()
        p.send(Config.ORDERS_TOPIC, amend_msg)
        p.flush()
        return jsonify({"status": "ok", "message": f"Amend request sent for {orig_cl_ord_id}"})
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 500


# ── Market schedule ─────────────────────────────────────────────────────────────
def load_market_schedule():
    """Read ``SCHEDULE_FILE`` into a ``{key: value}`` dict.

    Each non-comment line contributes its first field (lower-cased) as key
    and its second field as value. A read error is logged and the entries
    parsed so far are returned.
    """
    schedule = {}
    try:
        with open(SCHEDULE_FILE) as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                parts = line.split()
                if len(parts) >= 2:
                    schedule[parts[0].lower()] = parts[1]
    except Exception as e:
        print(f"[Scheduler] Cannot read schedule file {SCHEDULE_FILE}: {e}")
    return schedule


def _local_now(tz_name):
    """Return current datetime in the given IANA timezone (e.g. 'Europe/Athens').

    The result is naive (tzinfo stripped). ``zoneinfo`` is tried first, then
    ``pytz``; if neither can resolve the zone, naive UTC is returned.
    """
    try:
        from zoneinfo import ZoneInfo
        return datetime.datetime.now(ZoneInfo(tz_name)).replace(tzinfo=None)
    except Exception:
        pass
    try:
        import pytz
        tz = pytz.timezone(tz_name)
        return datetime.datetime.now(tz).replace(tzinfo=None)
    except Exception:
        pass
    # Same value as the deprecated datetime.utcnow().
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)


def _do_session_start():
    """Start a trading session.

    Resets every security's current price to its start price, publishes a
    "start" action on the control topic, marks the session active and
    notifies SSE clients.
    """
    securities = load_securities_file()
    if securities:
        for sym in securities:
            securities[sym]["current"] = securities[sym]["start"]
        save_securities_file(securities)
    p = get_producer()
    p.send(Config.CONTROL_TOPIC, {"action": "start"})
    p.flush()
    session_state["active"] = True
    session_state["suspended"] = False
    session_state["start_time"] = time.time()
    broadcast_event("session", {"status": "started", "time": session_state["start_time"]})


def _do_session_end():
    """End the trading session and save closing prices.

    For each security with both bids and asks in the matcher's book, the
    current price is set to the mid of best bid and best ask; symbols whose
    book cannot be fetched keep their price. Then publishes "stop" on the
    control topic, marks the session inactive, notifies SSE clients and
    triggers the clearing-house EOD hook (best effort).
    """
    securities = load_securities_file()
    for sym in list(securities.keys()):
        try:
            r = requests.get(f"{Config.MATCHER_URL}/orderbook/{sym}", timeout=2)
            book = r.json()
            bids = book.get("bids", [])
            asks = book.get("asks", [])
            if bids and asks:
                best_bid = max(b["price"] for b in bids)
                best_ask = min(a["price"] for a in asks)
                securities[sym]["current"] = round((best_bid + best_ask) / 2, 2)
        except Exception as e:
            # Best effort: keep the previous price for this symbol.
            print(f"[Dashboard] Cannot fetch closing book for {sym}: {e}")
    if securities:
        save_securities_file(securities)
    p = get_producer()
    p.send(Config.CONTROL_TOPIC, {"action": "stop"})
    p.flush()
    session_state["active"] = False
    session_state["suspended"] = False
    broadcast_event("session", {"status": "ended", "time": time.time()})
    # Notify Clearing House for EOD settlement
    try:
        ch_url = os.getenv("CH_SERVICE_URL", "http://localhost:5004")
        requests.post(f"{ch_url}/ch/eod", timeout=10)
        print("[Dashboard] CH EOD settlement triggered")
    except Exception as e:
        print(f"[Dashboard] CH EOD hook failed (non-critical): {e}")


def schedule_runner():
    """Background thread: auto start/end session based on market_schedule.txt.

    Every 30 seconds, while the mode is "automatic", the ``start``/``end``
    times (HH:MM) and ``timezone`` are re-read from the schedule file. The
    session is ended once local time reaches ``end`` and started when local
    time is inside [start, end) and no session is active.
    """
    while True:
        try:
            if session_state.get("mode") == "automatic":
                sched = load_market_schedule()
                start_str = sched.get("start")
                end_str = sched.get("end")
                tz_name = sched.get("timezone", "UTC")
                if start_str and end_str:
                    now = _local_now(tz_name)
                    start_parts = start_str.split(":")
                    end_parts = end_str.split(":")
                    sh, sm = int(start_parts[0]), int(start_parts[1])
                    eh, em = int(end_parts[0]), int(end_parts[1])
                    start_t = now.replace(hour=sh, minute=sm, second=0, microsecond=0)
                    end_t = now.replace(hour=eh, minute=em, second=0, microsecond=0)
                    print(f"[Scheduler] Local time ({tz_name}): {now.strftime('%H:%M')} window {start_str}-{end_str}")
                    if now >= end_t and session_state["active"]:
                        print("[Scheduler] Auto end of day")
                        _do_session_end()
                    elif start_t <= now < end_t and not session_state["active"]:
                        print("[Scheduler] Auto start of day")
                        _do_session_start()
        except Exception as e:
            print(f"[Scheduler] Error: {e}")
        time.sleep(30)


# ── Session endpoints ──────────────────────────────────────────────────────────
@app.route("/session/start", methods=["POST"])
def session_start():
    """Manually start the trading session."""
    try:
        _do_session_start()
        return jsonify({"status": "ok", "message": "Day started"})
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 500


@app.route("/session/end", methods=["POST"])
def session_end():
    """Manually end the trading session and save closing prices."""
    try:
        _do_session_end()
        return jsonify({"status": "ok", "message": "Day ended, closing prices saved"})
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 500


@app.route("/session/suspend", methods=["POST"])
def session_suspend():
    """Suspend the active session; 400 when no session is active."""
    try:
        if not session_state["active"]:
            return jsonify({"status": "error", "error": "No active session"}), 400
        p = get_producer()
        p.send(Config.CONTROL_TOPIC, {"action": "suspend"})
        p.flush()
        session_state["suspended"] = True
        broadcast_event("session", {"status": "suspended"})
        return jsonify({"status": "ok", "message": "Session suspended"})
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 500


@app.route("/session/resume", methods=["POST"])
def session_resume():
    """Resume the active session; 400 when no session is active."""
    try:
        if not session_state["active"]:
            return jsonify({"status": "error", "error": "No active session"}), 400
        p = get_producer()
        p.send(Config.CONTROL_TOPIC, {"action": "resume"})
        p.flush()
        session_state["suspended"] = False
        broadcast_event("session", {"status": "active"})
        return jsonify({"status": "ok", "message": "Session resumed"})
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 500


@app.route("/session/ai_insight", methods=["POST"])
def trigger_ai_insight():
    """Start insight generation in a daemon thread and return immediately."""
    threading.Thread(target=_generate_and_broadcast, daemon=True).start()
    return jsonify({"status": "ok", "message": "Insight generation started"})


@app.route("/ai/config")
def ai_config():
    """Return available providers/models and the current active selection."""
    return jsonify({
        "active_provider": _active_provider,
        "active_model": _active_model,
        "providers": {
            "auto": {"label": "Auto (fallback chain)", "models": []},
            "groq": {"label": "Groq", "models": GROQ_MODELS,
                     "available": bool(GROQ_API_KEY)},
            "hf": {"label": "HuggingFace", "models": HF_MODELS,
                   "available": bool(HF_TOKEN)},
            "ollama": {"label": "Ollama (local)",
                       "models": [OLLAMA_MODEL] if OLLAMA_HOST else [],
                       "available": bool(OLLAMA_HOST)},
        },
    })


@app.route("/ai/select", methods=["POST"])
def ai_select():
    """Dynamically switch the LLM provider/model used for AI insights.

    Accepts JSON with ``provider`` (default "auto") and optional ``model``.
    An unknown provider gets a 400. The selection is stored in module
    globals, broadcast to SSE clients, and published on the control topic;
    a Kafka failure there is logged but does not fail the request.
    """
    global _active_provider, _active_model
    payload = request.get_json(force=True, silent=True) or {}
    provider = payload.get("provider", "auto")
    model = payload.get("model") or None
    allowed = {"auto", "groq", "hf", "ollama"}
    if provider not in allowed:
        return jsonify({"status": "error", "error": f"Unknown provider '{provider}'"}), 400
    _active_provider = provider
    _active_model = model
    label = f"{provider}/{model}" if model else provider
    print(f"[Dashboard/LLM] Provider switched to: {label}")
    broadcast_event("llm_config", {"provider": _active_provider, "model": _active_model})
    # Propagate selection to ai_analyst service via Kafka control topic
    try:
        p = get_producer()
        p.send(Config.CONTROL_TOPIC, {"action": "set_llm", "provider": provider, "model": model})
        p.flush()
    except Exception as e:
        print(f"[Dashboard/LLM] Could not publish set_llm to Kafka: {e}")
    return jsonify({"status": "ok", "provider": _active_provider, "model": _active_model})


@app.route("/ai/debug")
def ai_debug():
    """Synchronous LLM test — returns raw API result for debugging.

    Tests Groq when ``GROQ_API_KEY`` is set and returns; otherwise tests
    HuggingFace when ``HF_TOKEN`` is set. Ollama is reported but not probed.
    """
    result = {
        "groq_key_set": bool(GROQ_API_KEY),
        "groq_model": GROQ_MODEL,
        "hf_token_set": bool(HF_TOKEN),
        "hf_token_prefix": HF_TOKEN[:8] + "…" if HF_TOKEN else None,
        "hf_model": HF_MODEL,
        "ollama_host": OLLAMA_HOST,
    }
    # Test Groq if configured
    if GROQ_API_KEY:
        try:
            r = requests.post(
                GROQ_URL,
                headers={"Authorization": f"Bearer {GROQ_API_KEY}",
                         "Content-Type": "application/json"},
                json={"model": GROQ_MODEL,
                      "messages": [{"role": "user", "content": "Reply with exactly: OK"}],
                      "max_tokens": 10},
                timeout=15,
            )
            result["groq_status"] = r.status_code
            result["groq_response"] = r.text[:200]
        except Exception as e:
            result["groq_exception"] = str(e)
        return jsonify(result)
    # Fall back to testing HF
    if not HF_TOKEN:
        result["error"] = "No LLM configured. Add GROQ_API_KEY secret (free at console.groq.com)."
        return jsonify(result)
    try:
        r = requests.post(
            HF_URL,
            headers={"Authorization": f"Bearer {HF_TOKEN}",
                     "Content-Type": "application/json"},
            json={"model": HF_MODEL,
                  "messages": [{"role": "user", "content": "Reply with exactly: OK"}],
                  "max_tokens": 10},
            timeout=30,
        )
        result["hf_status"] = r.status_code
        result["hf_response"] = r.text[:400]
        try:
            result["hf_response_json"] = r.json()
        except Exception:
            pass
        # Depends only on status/text, so it must not hinge on valid JSON.
        if r.status_code == 402 or "credit" in r.text.lower():
            result["fix"] = "HF credit depleted. Add GROQ_API_KEY secret (free at console.groq.com)."
    except Exception as e:
        result["hf_exception"] = str(e)
    return jsonify(result)


@app.route("/session/mode", methods=["POST"])
def session_mode():
    """Toggle the session mode between "manual" and "automatic"."""
    try:
        current = session_state.get("mode", "manual")
        new_mode = "automatic" if current == "manual" else "manual"
        session_state["mode"] = new_mode
        broadcast_event("mode", {"mode": new_mode})
        return jsonify({"status": "ok", "mode": new_mode})
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 500


@app.route("/session/status")
def session_status():
    """Return the current session state dict as JSON."""
    return jsonify(session_state)


# ── History endpoint ───────────────────────────────────────────────────────────
@app.route("/history/symbols")
def history_symbols():
    """Return list of symbols that have historical OHLCV data."""
    try:
        conn = sqlite3.connect(HISTORY_DB)
        try:
            rows = conn.execute(
                "SELECT DISTINCT symbol FROM ohlcv ORDER BY symbol"
            ).fetchall()
        finally:
            conn.close()
        return jsonify({"symbols": [r[0] for r in rows]})
    except Exception:
        return jsonify({"symbols": []})


# The URL must declare <symbol>; without it Flask never passes the argument
# the view function requires.
@app.route("/history/<symbol>")
def history(symbol):
    """Return OHLCV candles for ``symbol`` over the requested period.

    Query arg ``period`` is a key of ``PERIOD_SECONDS`` (default "1d";
    unknown values fall back to one day). More than 150 candles are merged
    into at most 150 equal-sized groups.
    """
    period = request.args.get("period", "1d")
    seconds = PERIOD_SECONDS.get(period, 86400)
    since = int(time.time()) - seconds
    try:
        conn = sqlite3.connect(HISTORY_DB)
        try:
            rows = conn.execute(
                "SELECT bucket, open, high, low, close, volume FROM ohlcv "
                "WHERE symbol=? AND bucket>=? ORDER BY bucket ASC",
                (symbol, since),
            ).fetchall()
        finally:
            conn.close()
    except Exception as e:
        return jsonify({"error": str(e), "candles": []}), 500

    candles = [{"t": r[0], "o": r[1], "h": r[2], "l": r[3], "c": r[4], "v": r[5]}
               for r in rows]
    # Aggregate to at most 150 bars for display
    if len(candles) > 150:
        step = len(candles) // 150 + 1
        agg = []
        for i in range(0, len(candles), step):
            chunk = candles[i : i + step]
            agg.append({
                "t": chunk[0]["t"],
                "o": chunk[0]["o"],
                "h": max(c["h"] for c in chunk),
                "l": min(c["l"] for c in chunk),
                "c": chunk[-1]["c"],
                "v": sum(c["v"] for c in chunk),
            })
        candles = agg
    return jsonify({"symbol": symbol, "period": period, "candles": candles})


# ── SSE stream ─────────────────────────────────────────────────────────────────
@app.route("/stream")
def stream():
    """Server-Sent Events endpoint.

    Registers a bounded queue for the client, sends the initial state
    (cached orders/BBOs/trades, AI insights, session status, mode) and then
    relays queued events, emitting a keepalive comment after 30 idle
    seconds. The queue is unregistered when the generator is closed.
    """
    def event_stream():
        q = Queue(maxsize=100)
        with sse_clients_lock:
            sse_clients.append(q)
        try:
            yield f"event: connected\ndata: {json.dumps({'status': 'connected'})}\n\n"
            # Snapshot under the lock, yield outside it: a generator paused
            # at `yield` would otherwise keep `lock` held and stall the
            # Kafka consumer thread.
            with lock:
                init_payload = {"orders": list(orders), "bbos": dict(bbos),
                                "trades": list(trades_cache)}
                init_insights = list(ai_insights_cache)
            yield f"event: init\ndata: {json.dumps(init_payload)}\n\n"
            # Same "nothing configured" test as _generate_and_broadcast: a
            # Groq-only setup is configured and must not show the warning.
            if (not init_insights and not HF_TOKEN and not OLLAMA_HOST
                    and not GROQ_API_KEY):
                init_insights = [{"text": "⚠️ No LLM configured. Go to Space Settings → Secrets and add HF_TOKEN.",
                                  "source": "config", "timestamp": time.time()}]
            yield f"event: ai_insights_init\ndata: {json.dumps(init_insights)}\n\n"
            # Also send current session state
            if not session_state["active"]:
                _sess_status = "ended"
            elif session_state["suspended"]:
                _sess_status = "suspended"
            else:
                _sess_status = "started"
            yield f"event: session\ndata: {json.dumps({'status': _sess_status})}\n\n"
            yield f"event: mode\ndata: {json.dumps({'mode': session_state.get('mode', 'manual')})}\n\n"
            while True:
                try:
                    message = q.get(timeout=30)
                    yield message
                except Empty:
                    yield ": keepalive\n\n"
        finally:
            with sse_clients_lock:
                if q in sse_clients:
                    sse_clients.remove(q)

    return Response(
        event_stream(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )


_scheduler = threading.Thread(target=schedule_runner, daemon=True)
_scheduler.start()

if __name__ == "__main__":
    port = int(os.getenv("PORT", "5000"))
    # NOTE(review): debug=True together with host 0.0.0.0 exposes the Werkzeug
    # debugger on every interface — confirm this entry point is dev-only.
    app.run(host="0.0.0.0", port=port, debug=True, use_reloader=False)