| import sys |
| sys.path.insert(0, "/app") |
|
|
| from flask import Flask, render_template, jsonify, Response, request |
| import threading, json, os, time, requests, sqlite3, datetime |
| from queue import Queue, Empty |
|
|
| from shared.config import Config |
| from shared.kafka_utils import create_consumer, create_producer |
|
|
# Flask app; template auto-reload lets UI tweaks show without a restart.
app = Flask(__name__, template_folder="templates")
app.config["TEMPLATES_AUTO_RELOAD"] = True


# In-memory caches fed by the Kafka consumer thread; all guarded by `lock`.
# `orders` and `trades_cache` are kept newest-first; `bbos` maps symbol -> BBO dict.
orders, bbos, trades_cache = [], {}, []
ai_insights_cache = []  # newest-first, capped at 10 by the consumer loop
lock = threading.Lock()


# Server-sent-events fanout: one Queue per connected browser client.
sse_clients = []
sse_clients_lock = threading.Lock()


# Trading-session state shared by the HTTP routes and the scheduler thread.
session_state = {"active": False, "start_time": None, "suspended": False, "mode": "automatic"}


SCHEDULE_FILE = os.getenv("SCHEDULE_FILE", "/app/shared/data/market_schedule.txt")
FRONTEND_URL = os.getenv("FRONTEND_URL", "")


# LLM provider configuration. All providers are optional; the "auto" provider
# falls back through Ollama -> Groq -> HuggingFace (see _call_llm).
HF_TOKEN = os.getenv("HF_TOKEN", "")
HF_MODEL = os.getenv("HF_MODEL", "RayMelius/stockex-analyst")
HF_URL = "https://router.huggingface.co/v1/chat/completions"
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
GROQ_MODEL = os.getenv("GROQ_MODEL", "llama-3.1-8b-instant")
GROQ_URL = "https://api.groq.com/openai/v1/chat/completions"
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3.1:8b")


# Model choices offered to the UI via /ai/config.
GROQ_MODELS = [
    "llama-3.1-8b-instant",
    "llama-3.3-70b-versatile",
    "llama-3.1-70b-versatile",
    "mixtral-8x7b-32768",
    "gemma2-9b-it",
]
HF_MODELS = [
    "RayMelius/stockex-analyst",
    "Qwen/Qwen2.5-7B-Instruct",
    "meta-llama/Llama-3.1-8B-Instruct",
    "mistralai/Mistral-7B-Instruct-v0.3",
]


# Currently selected provider/model; mutated by /ai/select.
_active_provider = "auto"
_active_model = None
|
|
|
|
def _build_market_prompt():
    """Build the market-summary prompt sent to the LLM.

    Snapshots the shared caches under `lock`, summarises recent trades per
    symbol and the current best bid/ask per symbol, and returns a single
    prompt string asking for a 3-4 sentence plain-prose analysis.

    Returns:
        str: the fully formatted prompt.
    """
    with lock:
        snaps = dict(bbos)
        recent = list(trades_cache[:30])  # trades_cache is kept newest-first
    now = datetime.datetime.now().strftime("%H:%M:%S")
    sess = ("ACTIVE" if session_state["active"] and not session_state["suspended"]
            else "SUSPENDED" if session_state["suspended"] else "IDLE")

    if recent:
        by_sym = {}
        for t in recent:
            by_sym.setdefault(t.get("symbol", "?"), []).append(t)
        trade_lines = []
        for sym, ts in sorted(by_sym.items()):
            prices = [float(t.get("price", 0)) for t in ts]
            vol = sum(int(t.get("quantity") or t.get("qty") or 0) for t in ts)
            # BUG FIX: `ts` preserves trades_cache's newest-first ordering, so
            # the most recent price is prices[0]; the original used prices[-1],
            # which is the OLDEST trade in the window.
            trade_lines.append(f" {sym}: {len(ts)} trade(s), range {min(prices):.2f}β{max(prices):.2f}, vol {vol}, last {prices[0]:.2f}")
        trades_block = "\n".join(trade_lines)
    else:
        trades_block = " No recent trades"

    if snaps:
        book_lines = []
        for sym, s in sorted(snaps.items()):
            bid, ask = s.get("best_bid"), s.get("best_ask")
            # Spread only meaningful when both sides of the book exist.
            spread = f"{float(ask)-float(bid):.2f}" if bid and ask else "?"
            book_lines.append(f" {sym}: Bid {bid or '-'} / Ask {ask or '-'} (spread {spread})")
        book_block = "\n".join(book_lines)
    else:
        book_block = " No order book data"

    return (f"You are a concise financial market analyst for a simulated stock exchange.\n"
            f"Time: {now} | Session: {sess}\n\n"
            f"Recent trades:\n{trades_block}\n\n"
            f"Order book:\n{book_block}\n\n"
            f"In 3-4 sentences: activity level, notable moves, market sentiment. "
            f"Plain prose, no headers, no bullet points.")
|
|
|
|
def _call_llm(prompt, force_provider=None, force_model=None):
    """Call an LLM provider and return (text, source) on success or
    (None, error_message) on failure.

    Args:
        prompt: user message sent to the chat-completions endpoint.
        force_provider: "auto" | "ollama" | "groq" | "hf" | None — selects
            which provider to use. "auto"/None falls back through
            Ollama -> Groq -> HuggingFace.
        force_model: override the default model name for the chosen provider.
    """
    provider = force_provider or "auto"

    def _try_ollama(model):
        # Local Ollama server; generous 90s timeout since local models can be slow.
        if not OLLAMA_HOST:
            return None, "Ollama not configured (OLLAMA_HOST not set)"
        m = model or OLLAMA_MODEL
        try:
            r = requests.post(f"{OLLAMA_HOST}/api/chat",
                              json={"model": m, "messages": [{"role": "user", "content": prompt}],
                                    "stream": False},
                              timeout=90)
            if r.status_code == 200:
                text = r.json().get("message", {}).get("content", "").strip()
                if text:
                    return text, f"Ollama/{m}"
            # Non-200 status, or a 200 that carried an empty completion.
            return None, f"Ollama HTTP {r.status_code}: {r.text[:200]}"
        except Exception as e:
            return None, f"Ollama error: {e}"

    def _try_groq(model):
        # Groq's OpenAI-compatible chat endpoint; fast, so short 30s timeout.
        if not GROQ_API_KEY:
            return None, "Groq not configured (GROQ_API_KEY not set)"
        m = model or GROQ_MODEL
        try:
            r = requests.post(GROQ_URL,
                              headers={"Authorization": f"Bearer {GROQ_API_KEY}",
                                       "Content-Type": "application/json"},
                              json={"model": m, "messages": [{"role": "user", "content": prompt}],
                                    "max_tokens": 300, "temperature": 0.7},
                              timeout=30)
            print(f"[Dashboard/LLM] Groq status {r.status_code}")
            if r.status_code == 200:
                text = r.json()["choices"][0]["message"]["content"].strip()
                if text:
                    return text, f"Groq/{m}"
            return None, f"Groq HTTP {r.status_code}: {r.text[:200]}"
        except Exception as e:
            return None, f"Groq error: {e}"

    def _try_hf(model):
        # HuggingFace router; retries up to 3 times on 503 (model cold-start).
        if not HF_TOKEN:
            return None, "HuggingFace not configured (HF_TOKEN not set)"
        m = model or HF_MODEL
        url = HF_URL
        print(f"[Dashboard/LLM] Calling HF ({m})...")
        for attempt in range(3):
            try:
                r = requests.post(url,
                                  headers={"Authorization": f"Bearer {HF_TOKEN}",
                                           "Content-Type": "application/json"},
                                  json={"model": m,
                                        "messages": [{"role": "user", "content": prompt}],
                                        "max_tokens": 300, "temperature": 0.7},
                                  timeout=90)
                print(f"[Dashboard/LLM] HF status {r.status_code} (attempt {attempt+1})")
                if r.status_code == 200:
                    text = r.json()["choices"][0]["message"]["content"].strip()
                    if text:
                        return text, m
                elif r.status_code == 503:
                    # Model still loading: honour the server's estimate, capped at 30s.
                    body = {}
                    try: body = r.json()
                    except: pass
                    wait = min(float(body.get("estimated_time", 20)), 30)
                    print(f"[Dashboard/LLM] Model loading, waiting {wait:.0f}s...")
                    time.sleep(wait)
                else:
                    print(f"[Dashboard/LLM] HF error: {r.text[:400]}")
                    # 402 / credit messages are terminal — no point retrying.
                    if r.status_code == 402 or "credit" in r.text.lower() or "depleted" in r.text.lower():
                        return None, "HF credit depleted. Add GROQ_API_KEY (free at console.groq.com)."
                    return None, f"HF HTTP {r.status_code}: {r.text[:120]}"
            except requests.exceptions.Timeout:
                return None, "HF request timed out after 90s"
            except Exception as e:
                return None, f"HF error: {e}"
        return None, "HF: max retries exceeded"

    # Explicit provider selection: no fallback, return that provider's result.
    if provider == "ollama":
        return _try_ollama(force_model)
    if provider == "groq":
        return _try_groq(force_model)
    if provider == "hf":
        return _try_hf(force_model)

    # "auto": fall back through configured providers in priority order.
    if OLLAMA_HOST:
        text, src = _try_ollama(force_model)
        if text:
            return text, src
    if GROQ_API_KEY:
        text, src = _try_groq(force_model)
        if text:
            return text, src
    return _try_hf(force_model)
|
|
|
|
def _generate_and_broadcast():
    """Background-thread body: build a prompt, call the LLM, publish the result.

    A successful insight is sent to Kafka (the consumer loop echoes it back
    to browsers over SSE); configuration problems and LLM failures are
    broadcast to SSE clients directly as error insights.
    """
    if not (HF_TOKEN or OLLAMA_HOST or GROQ_API_KEY):
        broadcast_event("ai_insight", {
            "text": "β οΈ No LLM configured. Add GROQ_API_KEY secret (free at console.groq.com).",
            "source": "config",
            "timestamp": time.time(),
        })
        return

    text, source = _call_llm(
        _build_market_prompt(),
        force_provider=_active_provider,
        force_model=_active_model,
    )
    if not text:
        broadcast_event("ai_insight", {
            "text": f"β οΈ LLM error: {source}",
            "source": "error",
            "timestamp": time.time(),
        })
        print(f"[Dashboard/LLM] LLM failed: {source}")
        return

    insight = {"text": text, "source": source, "timestamp": time.time()}
    try:
        get_producer().send(Config.AI_INSIGHTS_TOPIC, insight)
    except Exception:
        pass  # best-effort publish; Kafka may be down
    print(f"[Dashboard/LLM] Insight published ({len(text)} chars, src={source})")
|
|
| |
# On-disk OHLCV candle store (SQLite) and candle bucket width in seconds.
HISTORY_DB = os.getenv("HISTORY_DB", "/app/shared/data/dashboard_history.db")
BUCKET_SIZE = 60


# Chart period selector -> lookback window in seconds.
PERIOD_SECONDS = {
    "1h": 3600,
    "8h": 28800,
    "1d": 86400,
    "1w": 604800,
    "1m": 2592000,  # one month, approximated as 30 days
}
|
|
|
|
def init_history_db():
    """Create the OHLCV candle table if missing and seed it on first run.

    Ensures the database directory exists, creates the `ohlcv` table keyed by
    (symbol, bucket), and — only when the table is empty — seeds it from the
    bundled OHLCV JSON files.

    FIX: the connection is now closed in a finally block; the original leaked
    it if any statement (or the seeding step) raised.
    """
    os.makedirs(os.path.dirname(HISTORY_DB), exist_ok=True)
    conn = sqlite3.connect(HISTORY_DB)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS ohlcv (
                symbol TEXT,
                bucket INTEGER,
                open REAL,
                high REAL,
                low REAL,
                close REAL,
                volume INTEGER,
                PRIMARY KEY (symbol, bucket)
            )
        """)
        conn.commit()

        # Seed from shipped historical data only on a fresh database.
        row_count = conn.execute("SELECT COUNT(*) FROM ohlcv").fetchone()[0]
        if row_count == 0:
            _seed_history_from_ohlcv(conn)
    finally:
        conn.close()
|
|
|
|
def _seed_history_from_ohlcv(conn):
    """Seed dashboard_history.db from shared/data/ohlcv/*.json on first run.

    Each file is named <SYMBOL>.json and holds a list of daily bars with
    date/open/high/low/close/volume keys. Bars are timestamped at 10:00 of
    their date and aligned to the 60-second bucket grid. A bad file is
    logged and skipped so the remaining symbols still seed.

    Args:
        conn: open sqlite3 connection (committed here, not closed).
    """
    import glob as globmod
    ohlcv_dir = os.path.join(os.path.dirname(HISTORY_DB), "ohlcv")
    if not os.path.isdir(ohlcv_dir):
        return
    count = 0
    for path in sorted(globmod.glob(os.path.join(ohlcv_dir, "*.json"))):
        sym = os.path.splitext(os.path.basename(path))[0]
        try:
            with open(path) as f:
                bars = json.load(f)
            for bar in bars:
                # IDIOM FIX: the original re-imported datetime inside this
                # per-bar loop; use the module-level `datetime` import instead.
                dt = datetime.datetime.strptime(bar["date"], "%Y-%m-%d").replace(hour=10)
                bucket = int(dt.timestamp() // 60) * 60
                conn.execute(
                    "INSERT OR IGNORE INTO ohlcv VALUES (?,?,?,?,?,?,?)",
                    (sym, bucket, bar["open"], bar["high"], bar["low"], bar["close"], bar["volume"]),
                )
                count += 1
        except Exception as e:
            print(f"[History] Error seeding {sym}: {e}")
    conn.commit()
    print(f"[History] Auto-seeded {count} candles from OHLCV files")
|
|
|
|
def record_trade(symbol, price, qty, ts):
    """Fold one trade into its minute OHLCV candle.

    Args:
        symbol: instrument symbol; empty symbols are ignored.
        price: trade price; non-positive prices are ignored.
        qty: trade quantity, added to the bucket's volume.
        ts: epoch seconds; selects the BUCKET_SIZE-aligned candle.

    Errors are logged, never raised — the caller is the Kafka consumer loop.
    FIX: the connection is now closed in a finally block; the original only
    closed it on the success path, leaking it whenever an execute raised.
    """
    if not symbol or price <= 0:
        return
    bucket = int(ts // BUCKET_SIZE) * BUCKET_SIZE
    conn = None
    try:
        conn = sqlite3.connect(HISTORY_DB)
        existing = conn.execute(
            "SELECT open FROM ohlcv WHERE symbol=? AND bucket=?", (symbol, bucket)
        ).fetchone()
        if existing:
            # Update high/low/close/volume of the existing candle.
            conn.execute(
                "UPDATE ohlcv SET high=MAX(high,?), low=MIN(low,?), close=?, volume=volume+? "
                "WHERE symbol=? AND bucket=?",
                (price, price, price, qty, symbol, bucket),
            )
        else:
            # First trade in this bucket: open=high=low=close=price.
            conn.execute(
                "INSERT INTO ohlcv VALUES (?,?,?,?,?,?,?)",
                (symbol, bucket, price, price, price, price, qty),
            )
        conn.commit()
    except Exception as e:
        print(f"[History] Error recording trade: {e}")
    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def load_securities_file():
    """Parse Config.SECURITIES_FILE into {symbol: {"start": .., "current": ..}}.

    Blank lines and '#' comments are skipped; lines with fewer than three
    whitespace-separated fields are ignored. Read errors are logged and an
    empty (or partial) mapping is returned.
    """
    securities = {}
    try:
        with open(Config.SECURITIES_FILE) as fh:
            for raw in fh:
                entry = raw.strip()
                if not entry or entry.startswith("#"):
                    continue
                fields = entry.split()
                if len(fields) < 3:
                    continue
                securities[fields[0]] = {
                    "start": float(fields[1]),
                    "current": float(fields[2]),
                }
    except Exception as exc:
        print(f"[Dashboard] Cannot read securities: {exc}")
    return securities
|
|
|
|
def save_securities_file(securities):
    """Write {symbol: {"start", "current"}} back to Config.SECURITIES_FILE."""
    lines = ["#SYMBOL\t<start_price>\t<current_price>\n"]
    lines.extend(
        f"{sym}\t{vals['start']:.2f}\t{vals['current']:.2f}\n"
        for sym, vals in securities.items()
    )
    with open(Config.SECURITIES_FILE, "w") as fh:
        fh.writelines(lines)
|
|
|
|
| |
|
|
def broadcast_event(event_type, data):
    """Push one SSE-formatted message to every connected client queue.

    A queue that rejects the message (full or otherwise broken) is treated
    as a dead client and dropped from the registry.
    """
    payload = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
    with sse_clients_lock:
        alive = []
        for client_q in sse_clients:
            try:
                client_q.put_nowait(payload)
            except Exception:
                continue  # dead/full client: do not keep it
            alive.append(client_q)
        sse_clients[:] = alive
|
|
|
|
| |
|
|
def consume_kafka():
    """Kafka consumer loop: feed the in-memory caches and fan out SSE events.

    Subscribes to the orders / snapshots / trades / AI-insight topics and,
    holding `lock`, updates the matching cache before broadcasting each
    message to SSE clients. Trades are additionally folded into the OHLCV
    history database.
    """
    consumer = create_consumer(
        topics=[Config.ORDERS_TOPIC, Config.SNAPSHOTS_TOPIC, Config.TRADES_TOPIC, Config.AI_INSIGHTS_TOPIC],
        group_id="dashboard",
        component_name="Dashboard",
    )
    for msg in consumer:
        with lock:
            # CONSISTENCY FIX: dispatch on the same Config topic names we
            # subscribed with. The original compared against hard-coded
            # literals ("orders", "snapshots", "trades"), which silently stop
            # matching if the topic names are ever reconfigured.
            if msg.topic == Config.ORDERS_TOPIC:
                order = msg.value
                orders.insert(0, order)
                orders[:] = orders[:50]  # keep only the 50 newest
                broadcast_event("order", order)

            elif msg.topic == Config.SNAPSHOTS_TOPIC:
                snap = msg.value
                symbol = snap.get("symbol")
                if not symbol:
                    continue
                bbo_data = {
                    "best_bid": snap.get("best_bid"),
                    "best_ask": snap.get("best_ask"),
                    "bid_size": snap.get("bid_size"),
                    "ask_size": snap.get("ask_size"),
                    "timestamp": snap.get("timestamp", time.time()),
                    "source": snap.get("source", "unknown"),
                }
                bbos[symbol] = bbo_data
                broadcast_event("snapshot", {"symbol": symbol, **bbo_data})

            elif msg.topic == Config.TRADES_TOPIC:
                trade = msg.value
                trades_cache.insert(0, trade)
                trades_cache[:] = trades_cache[:200]  # keep only the 200 newest
                broadcast_event("trade", trade)

                # Persist into the minute-candle history store.
                sym = trade.get("symbol", "")
                price = float(trade.get("price") or 0)
                qty = int(trade.get("quantity") or trade.get("qty") or 0)
                ts = float(trade.get("timestamp") or time.time())
                record_trade(sym, price, qty, ts)

            elif msg.topic == Config.AI_INSIGHTS_TOPIC:
                insight = msg.value
                ai_insights_cache.insert(0, insight)
                ai_insights_cache[:] = ai_insights_cache[:10]  # keep 10 newest
                broadcast_event("ai_insight", insight)
|
|
|
|
| |
# Start-up: make sure the history DB exists, then begin consuming Kafka in the
# background (daemon thread: it dies with the process).
init_history_db()
threading.Thread(target=consume_kafka, daemon=True).start()
|
|
|
|
| |
|
|
@app.route("/")
def index():
    """Serve the dashboard single-page UI."""
    page = render_template("index.html", frontend_url=FRONTEND_URL)
    return page
|
|
|
|
@app.route("/health")
def health():
    """Liveness endpoint: report cache sizes and matcher reachability."""
    status = {
        "status": "healthy",
        "service": "dashboard",
        "timestamp": time.time(),
        "stats": {
            "orders_cached": len(orders),
            "trades_cached": len(trades_cache),
            "symbols_tracked": len(bbos),
            "sse_clients": len(sse_clients),
        },
    }
    try:
        probe = requests.get(f"{Config.MATCHER_URL}/health", timeout=2)
    except Exception as exc:
        # Matcher unreachable: report but stay up ourselves.
        status["matcher"] = f"error: {exc}"
        status["status"] = "degraded"
    else:
        status["matcher"] = "connected" if probe.status_code == 200 else f"error: {probe.status_code}"
    return jsonify(status)
|
|
|
|
@app.route("/data")
def data():
    """One-shot poll endpoint: orders, BBOs, trades, and the ALPHA book.

    Trades and the book are fetched live from the matcher; on failure the
    trade cache is used and the book falls back to an empty shape.
    """
    try:
        resp = requests.get(f"{Config.MATCHER_URL}/trades", timeout=2).json()
        trades = resp.get("trades", []) if isinstance(resp, dict) else resp
    except Exception as exc:
        print("Cannot fetch trades:", exc)
        with lock:
            trades = list(trades_cache)

    try:
        book = requests.get(f"{Config.MATCHER_URL}/orderbook/ALPHA", timeout=2).json()
    except Exception:
        book = {"bids": [], "asks": []}

    with lock:
        return jsonify({"orders": list(orders), "bbos": dict(bbos), "trades": trades, "book": book})
|
|
|
|
@app.route("/orderbook/<symbol>")
def orderbook(symbol):
    """Proxy the matcher's order book for one symbol, preserving its status."""
    try:
        upstream = requests.get(f"{Config.MATCHER_URL}/orderbook/{symbol}", timeout=2)
    except Exception as exc:
        return jsonify({"error": str(exc), "bids": [], "asks": []}), 500
    return (upstream.text, upstream.status_code, {"Content-Type": "application/json"})
|
|
|
|
| |
# Lazily created process-wide Kafka producer singleton (see get_producer()).
_producer = None
|
|
|
|
def get_producer():
    """Return the shared Kafka producer, creating it lazily on first use."""
    global _producer
    if _producer is not None:
        return _producer
    _producer = create_producer(component_name="Dashboard")
    return _producer
|
|
|
|
@app.route("/order/cancel", methods=["POST"])
def cancel_order():
    """Publish a cancel request for an existing order to the orders topic.

    Body: JSON with `orig_cl_ord_id` (required) and `symbol` (optional).
    Returns 400 on missing/malformed input, 500 on Kafka failure.
    """
    try:
        # ROBUSTNESS FIX: silent=True makes absent/malformed JSON yield None
        # (then a clean 400) instead of raising and surfacing as a 500.
        data = request.get_json(silent=True) or {}
        orig_cl_ord_id = data.get("orig_cl_ord_id")
        symbol = data.get("symbol")
        if not orig_cl_ord_id:
            return jsonify({"status": "error", "error": "Missing orig_cl_ord_id"}), 400
        cancel_msg = {
            "type": "cancel",
            "orig_cl_ord_id": orig_cl_ord_id,
            "symbol": symbol,
            "timestamp": time.time(),
        }
        p = get_producer()
        p.send(Config.ORDERS_TOPIC, cancel_msg)
        p.flush()  # ensure the cancel is on the wire before acking the caller
        return jsonify({"status": "ok", "message": f"Cancel request sent for {orig_cl_ord_id}"})
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 500
|
|
|
|
@app.route("/order/amend", methods=["POST"])
def amend_order():
    """Publish an amend request (new quantity/price) to the orders topic.

    Body: JSON with `orig_cl_ord_id` (required) plus optional symbol,
    quantity, and price. Returns 400 on missing/malformed input, 500 on
    Kafka failure.
    """
    try:
        # ROBUSTNESS FIX (matches cancel_order): silent=True turns malformed
        # JSON into a 400 instead of an exception-driven 500.
        data = request.get_json(silent=True) or {}
        orig_cl_ord_id = data.get("orig_cl_ord_id")
        if not orig_cl_ord_id:
            return jsonify({"status": "error", "error": "Missing orig_cl_ord_id"}), 400
        amend_msg = {
            "type": "amend",
            "orig_cl_ord_id": orig_cl_ord_id,
            # Fresh client order id so the amend is traceable on its own.
            "cl_ord_id": f"amend-{int(time.time()*1000)}",
            "symbol": data.get("symbol"),
            "quantity": data.get("quantity"),
            "price": data.get("price"),
            "timestamp": time.time(),
        }
        p = get_producer()
        p.send(Config.ORDERS_TOPIC, amend_msg)
        p.flush()
        return jsonify({"status": "ok", "message": f"Amend request sent for {orig_cl_ord_id}"})
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 500
|
|
|
|
| |
|
|
def load_market_schedule():
    """Parse SCHEDULE_FILE into a {key: value} mapping.

    Keys (e.g. start/end/timezone) are lower-cased; blank lines and '#'
    comments are skipped. Read errors are logged and an empty dict returned.
    """
    schedule = {}
    try:
        with open(SCHEDULE_FILE) as fh:
            for raw in fh:
                entry = raw.strip()
                if not entry or entry.startswith("#"):
                    continue
                fields = entry.split()
                if len(fields) >= 2:
                    schedule[fields[0].lower()] = fields[1]
    except Exception as exc:
        print(f"[Scheduler] Cannot read schedule file {SCHEDULE_FILE}: {exc}")
    return schedule
|
|
|
|
| def _local_now(tz_name): |
| """Return current datetime in the given IANA timezone (e.g. 'Europe/Athens').""" |
| try: |
| from zoneinfo import ZoneInfo |
| return datetime.datetime.now(ZoneInfo(tz_name)).replace(tzinfo=None) |
| except Exception: |
| pass |
| try: |
| import pytz |
| tz = pytz.timezone(tz_name) |
| return datetime.datetime.now(tz).replace(tzinfo=None) |
| except Exception: |
| pass |
| return datetime.datetime.utcnow() |
|
|
|
|
def _do_session_start():
    """Begin the trading day: reset prices, signal services, notify the UI."""
    securities = load_securities_file()
    if securities:
        # Each symbol's live price is reset to its session-start price.
        for vals in securities.values():
            vals["current"] = vals["start"]
        save_securities_file(securities)
    producer = get_producer()
    producer.send(Config.CONTROL_TOPIC, {"action": "start"})
    producer.flush()
    session_state["active"] = True
    session_state["suspended"] = False
    session_state["start_time"] = time.time()
    broadcast_event("session", {"status": "started", "time": session_state["start_time"]})
|
|
|
|
def _do_session_end():
    """End the trading day: save closing mid prices, signal services, run EOD."""
    securities = load_securities_file()
    for sym in list(securities.keys()):
        try:
            r = requests.get(f"{Config.MATCHER_URL}/orderbook/{sym}", timeout=2)
            book = r.json()
            bids = book.get("bids", [])
            asks = book.get("asks", [])
            if bids and asks:
                best_bid = max(b["price"] for b in bids)
                best_ask = min(a["price"] for a in asks)
                # Closing price = midpoint of the final best bid/ask.
                securities[sym]["current"] = round((best_bid + best_ask) / 2, 2)
        except Exception:
            pass  # matcher unreachable or empty book: keep the previous price
    if securities:
        save_securities_file(securities)
    p = get_producer()
    p.send(Config.CONTROL_TOPIC, {"action": "stop"})
    p.flush()
    session_state["active"] = False
    session_state["suspended"] = False
    broadcast_event("session", {"status": "ended", "time": time.time()})

    # Best-effort clearing-house end-of-day settlement hook; a failure here
    # must not block the session-end flow.
    try:
        ch_url = os.getenv("CH_SERVICE_URL", "http://localhost:5004")
        requests.post(f"{ch_url}/ch/eod", timeout=10)
        print("[Dashboard] CH EOD settlement triggered")
    except Exception as e:
        print(f"[Dashboard] CH EOD hook failed (non-critical): {e}")
|
|
|
|
def schedule_runner():
    """Background thread: auto start/end session based on market_schedule.txt."""
    while True:
        try:
            # Only act in automatic mode; manual mode leaves control to the UI.
            if session_state.get("mode") == "automatic":
                sched = load_market_schedule()
                start_str = sched.get("start")
                end_str = sched.get("end")
                tz_name = sched.get("timezone", "UTC")
                if start_str and end_str:
                    now = _local_now(tz_name)
                    # "HH:MM" strings -> today's start/end datetimes in tz_name.
                    sh, sm = int(start_str.split(":")[0]), int(start_str.split(":")[1])
                    eh, em = int(end_str.split(":")[0]), int(end_str.split(":")[1])
                    start_t = now.replace(hour=sh, minute=sm, second=0, microsecond=0)
                    end_t = now.replace(hour=eh, minute=em, second=0, microsecond=0)
                    print(f"[Scheduler] Local time ({tz_name}): {now.strftime('%H:%M')} window {start_str}-{end_str}")
                    # NOTE(review): assumes start < end on the same calendar
                    # day — overnight windows are not handled; confirm intended.
                    if now >= end_t and session_state["active"]:
                        print("[Scheduler] Auto end of day")
                        _do_session_end()
                    elif start_t <= now < end_t and not session_state["active"]:
                        print("[Scheduler] Auto start of day")
                        _do_session_start()
        except Exception as e:
            print(f"[Scheduler] Error: {e}")
        time.sleep(30)  # poll every 30 seconds
|
|
|
|
| |
|
|
@app.route("/session/start", methods=["POST"])
def session_start():
    """Manually start the trading day."""
    try:
        _do_session_start()
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 500
    return jsonify({"status": "ok", "message": "Day started"})
|
|
|
|
@app.route("/session/end", methods=["POST"])
def session_end():
    """Manually end the trading day and persist closing prices."""
    try:
        _do_session_end()
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 500
    return jsonify({"status": "ok", "message": "Day ended, closing prices saved"})
|
|
|
|
@app.route("/session/suspend", methods=["POST"])
def session_suspend():
    """Pause the active trading session (400 if none is active)."""
    try:
        if not session_state["active"]:
            return jsonify({"status": "error", "error": "No active session"}), 400
        producer = get_producer()
        producer.send(Config.CONTROL_TOPIC, {"action": "suspend"})
        producer.flush()
        session_state["suspended"] = True
        broadcast_event("session", {"status": "suspended"})
        return jsonify({"status": "ok", "message": "Session suspended"})
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 500
|
|
|
|
@app.route("/session/resume", methods=["POST"])
def session_resume():
    """Resume a suspended trading session (400 if none is active)."""
    try:
        if not session_state["active"]:
            return jsonify({"status": "error", "error": "No active session"}), 400
        producer = get_producer()
        producer.send(Config.CONTROL_TOPIC, {"action": "resume"})
        producer.flush()
        session_state["suspended"] = False
        broadcast_event("session", {"status": "active"})
        return jsonify({"status": "ok", "message": "Session resumed"})
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 500
|
|
|
|
@app.route("/session/ai_insight", methods=["POST"])
def trigger_ai_insight():
    """Kick off asynchronous AI insight generation; responds immediately."""
    worker = threading.Thread(target=_generate_and_broadcast, daemon=True)
    worker.start()
    return jsonify({"status": "ok", "message": "Insight generation started"})
|
|
|
|
@app.route("/ai/config")
def ai_config():
    """Return available providers/models and the current active selection."""
    providers = {
        "auto": {"label": "Auto (fallback chain)", "models": []},
        "groq": {"label": "Groq", "models": GROQ_MODELS,
                 "available": bool(GROQ_API_KEY)},
        "hf": {"label": "HuggingFace", "models": HF_MODELS,
               "available": bool(HF_TOKEN)},
        "ollama": {"label": "Ollama (local)",
                   "models": [OLLAMA_MODEL] if OLLAMA_HOST else [],
                   "available": bool(OLLAMA_HOST)},
    }
    return jsonify({
        "active_provider": _active_provider,
        "active_model": _active_model,
        "providers": providers,
    })
|
|
|
|
@app.route("/ai/select", methods=["POST"])
def ai_select():
    """Dynamically switch the LLM provider/model used for AI insights."""
    global _active_provider, _active_model
    payload = request.get_json(force=True, silent=True) or {}
    provider = payload.get("provider", "auto")
    model = payload.get("model") or None

    if provider not in {"auto", "groq", "hf", "ollama"}:
        return jsonify({"status": "error", "error": f"Unknown provider '{provider}'"}), 400

    _active_provider, _active_model = provider, model
    label = f"{provider}/{model}" if model else provider
    print(f"[Dashboard/LLM] Provider switched to: {label}")
    broadcast_event("llm_config", {"provider": _active_provider, "model": _active_model})

    # Best-effort: let other services know about the new LLM selection.
    try:
        producer = get_producer()
        producer.send(Config.CONTROL_TOPIC, {"action": "set_llm", "provider": provider, "model": model})
        producer.flush()
    except Exception as exc:
        print(f"[Dashboard/LLM] Could not publish set_llm to Kafka: {exc}")
    return jsonify({"status": "ok", "provider": _active_provider, "model": _active_model})
|
|
|
|
@app.route("/ai/debug")
def ai_debug():
    """Synchronous LLM test β returns raw API result for debugging."""
    # Static configuration summary (no secrets leaked beyond a token prefix).
    result = {
        "groq_key_set": bool(GROQ_API_KEY),
        "groq_model": GROQ_MODEL,
        "hf_token_set": bool(HF_TOKEN),
        "hf_token_prefix": HF_TOKEN[:8] + "β¦" if HF_TOKEN else None,
        "hf_model": HF_MODEL,
        "ollama_host": OLLAMA_HOST,
    }
    # Probe Groq first; if a key is configured we return its result directly.
    if GROQ_API_KEY:
        try:
            r = requests.post(GROQ_URL,
                              headers={"Authorization": f"Bearer {GROQ_API_KEY}",
                                       "Content-Type": "application/json"},
                              json={"model": GROQ_MODEL,
                                    "messages": [{"role": "user", "content": "Reply with exactly: OK"}],
                                    "max_tokens": 10},
                              timeout=15)
            result["groq_status"] = r.status_code
            result["groq_response"] = r.text[:200]
        except Exception as e:
            result["groq_exception"] = str(e)
        return jsonify(result)
    # No Groq key: fall through to HuggingFace, or report nothing configured.
    if not HF_TOKEN:
        result["error"] = "No LLM configured. Add GROQ_API_KEY secret (free at console.groq.com)."
        return jsonify(result)
    try:
        r = requests.post(HF_URL,
                          headers={"Authorization": f"Bearer {HF_TOKEN}",
                                   "Content-Type": "application/json"},
                          json={"model": HF_MODEL,
                                "messages": [{"role": "user", "content": "Reply with exactly: OK"}],
                                "max_tokens": 10},
                          timeout=30)
        result["hf_status"] = r.status_code
        result["hf_response"] = r.text[:400]
        try:
            rj = r.json()
            result["hf_response_json"] = rj
            # 402 / "credit" responses get an actionable remediation hint.
            if r.status_code == 402 or "credit" in r.text.lower():
                result["fix"] = "HF credit depleted. Add GROQ_API_KEY secret (free at console.groq.com)."
        except Exception:
            pass  # non-JSON body: raw text already captured above
    except Exception as e:
        result["hf_exception"] = str(e)
    return jsonify(result)
|
|
|
|
@app.route("/session/mode", methods=["POST"])
def session_mode():
    """Toggle the scheduler between manual and automatic modes."""
    try:
        current = session_state.get("mode", "manual")
        flipped = "automatic" if current == "manual" else "manual"
        session_state["mode"] = flipped
        broadcast_event("mode", {"mode": flipped})
        return jsonify({"status": "ok", "mode": flipped})
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 500
|
|
|
|
@app.route("/session/status")
def session_status():
    """Return the current session state as JSON."""
    return jsonify(session_state)
|
|
|
|
| |
|
|
@app.route("/history/symbols")
def history_symbols():
    """Return the list of symbols that have historical OHLCV data.

    FIX: the sqlite connection is now closed in a finally block; the
    original leaked it whenever the query raised.
    """
    try:
        conn = sqlite3.connect(HISTORY_DB)
        try:
            rows = conn.execute("SELECT DISTINCT symbol FROM ohlcv ORDER BY symbol").fetchall()
        finally:
            conn.close()
        return jsonify({"symbols": [r[0] for r in rows]})
    except Exception:
        # Missing DB/table: present as simply "no symbols".
        return jsonify({"symbols": []})
|
|
|
|
@app.route("/history/<symbol>")
def history(symbol):
    """Return OHLCV candles for `symbol` over ?period= (default "1d").

    Candles come from the minute-bucket store; when more than 150 fall in
    the window they are merged in fixed-size chunks so charts stay light.

    FIX: the sqlite connection is now closed in a finally block; the
    original leaked it whenever the query raised.
    """
    period = request.args.get("period", "1d")
    seconds = PERIOD_SECONDS.get(period, 86400)
    since = int(time.time()) - seconds
    try:
        conn = sqlite3.connect(HISTORY_DB)
        try:
            rows = conn.execute(
                "SELECT bucket, open, high, low, close, volume FROM ohlcv "
                "WHERE symbol=? AND bucket>=? ORDER BY bucket ASC",
                (symbol, since),
            ).fetchall()
        finally:
            conn.close()
    except Exception as e:
        return jsonify({"error": str(e), "candles": []}), 500

    candles = [{"t": r[0], "o": r[1], "h": r[2], "l": r[3], "c": r[4], "v": r[5]} for r in rows]

    # Downsample to <=150 candles: open/time from the first of each chunk,
    # close from the last, high/low/volume aggregated across the chunk.
    if len(candles) > 150:
        step = len(candles) // 150 + 1
        agg = []
        for i in range(0, len(candles), step):
            chunk = candles[i : i + step]
            agg.append({
                "t": chunk[0]["t"],
                "o": chunk[0]["o"],
                "h": max(c["h"] for c in chunk),
                "l": min(c["l"] for c in chunk),
                "c": chunk[-1]["c"],
                "v": sum(c["v"] for c in chunk),
            })
        candles = agg

    return jsonify({"symbol": symbol, "period": period, "candles": candles})
|
|
|
|
| |
|
|
@app.route("/stream")
def stream():
    """SSE endpoint: registers a per-client queue and streams events from it."""
    def event_stream():
        # Bounded queue: a stalled client drops messages instead of growing RAM.
        q = Queue(maxsize=100)
        with sse_clients_lock:
            sse_clients.append(q)
        try:
            yield f"event: connected\ndata: {json.dumps({'status': 'connected'})}\n\n"
            # Initial state dump so the client can render before live events arrive.
            with lock:
                yield (
                    f"event: init\ndata: "
                    f"{json.dumps({'orders': list(orders), 'bbos': dict(bbos), 'trades': list(trades_cache)})}\n\n"
                )
                init_insights = list(ai_insights_cache)
            if not init_insights and not HF_TOKEN and not OLLAMA_HOST:
                init_insights = [{"text": "β οΈ No LLM configured. Go to Space Settings β Secrets and add HF_TOKEN.", "source": "config", "timestamp": time.time()}]
            yield f"event: ai_insights_init\ndata: {json.dumps(init_insights)}\n\n"
            # Tell the client the current session status and scheduler mode.
            if not session_state["active"]:
                _sess_status = "ended"
            elif session_state["suspended"]:
                _sess_status = "suspended"
            else:
                _sess_status = "started"
            yield f"event: session\ndata: {json.dumps({'status': _sess_status})}\n\n"
            yield f"event: mode\ndata: {json.dumps({'mode': session_state.get('mode', 'manual')})}\n\n"
            # Live loop: relay broadcast messages; emit an SSE comment as a
            # keepalive every 30s of silence so proxies keep the socket open.
            while True:
                try:
                    message = q.get(timeout=30)
                    yield message
                except Empty:
                    yield ": keepalive\n\n"
        finally:
            # Client disconnected: deregister its queue.
            with sse_clients_lock:
                if q in sse_clients:
                    sse_clients.remove(q)

    return Response(
        event_stream(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )
|
|
|
|
# Start the market-schedule watcher in the background (daemon: exits with app).
_scheduler = threading.Thread(target=schedule_runner, daemon=True)
_scheduler.start()


if __name__ == "__main__":
    # Dev-server entry point; PORT is overridable for container deployments.
    port = int(os.getenv("PORT", "5000"))
    app.run(host="0.0.0.0", port=port, debug=True, use_reloader=False)
|
|