| |
| from __future__ import annotations |
| import json, time, uuid, logging |
| from typing import Any, Dict, List, AsyncIterable, Optional |
| from backends_base import ChatBackend, ImagesBackend |
|
|
| from rabbit_repo import RabbitRepo |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| def _now() -> int: return int(time.time()) |
| def _extract_user_id(data: Dict[str, Any]) -> Optional[str]: |
| if not isinstance(data, dict): |
| return None |
| for k in ("userId", "UserID", "user_id"): |
| v = data.get(k) |
| if isinstance(v, str) and v.strip(): |
| return v.strip() |
| ui = data.get("UserInfo") |
| if isinstance(ui, dict): |
| v = ui.get("UserID") or ui.get("userId") or ui.get("user_id") |
| if isinstance(v, str) and v.strip(): |
| return v.strip() |
| return None |
|
|
| def _with_user_id(payload: Dict[str, Any], user_id: Optional[str]) -> Dict[str, Any]: |
| if not user_id: |
| return payload |
| if isinstance(payload, dict): |
| if not payload.get("userId"): |
| payload = {**payload, "userId": user_id} |
| return payload |
|
|
| def _chunk_text(s: str, sz: int = 140) -> List[str]: |
| return [s[i:i+sz] for i in range(0, len(s or ""), sz)] if s else [] |
| def _last_user_text(messages: List[Dict[str, Any]]) -> str: |
| for m in reversed(messages or []): |
| if (m or {}).get("role") == "user": |
| c = m.get("content", "") |
| if isinstance(c, str): |
| return c |
| if isinstance(c, list): |
| texts = [p.get("text","") for p in c if p.get("type") == "text"] |
| return " ".join([t for t in texts if t]) |
| return "" |
|
|
|
|
| |
class OpenAIServers:
    """
    Handlers you can register in RabbitListenerBase:
    - 'oaChatCreate' -> handle_chat_create
    - 'oaImagesGenerate' -> handle_images_generate
    Uses RabbitRepo.publish(...) to emit CloudEvent-wrapped OpenAI JSON.

    Both handlers log failures and return instead of raising. Both backends
    are optional; a handler whose backend was not supplied logs an error
    and returns without publishing.
    """

    def __init__(self, publisher: RabbitRepo,
                 *, chat_backend: Optional[ChatBackend] = None,
                 images_backend: Optional[ImagesBackend] = None):
        """Store the publisher and the optional chat / images backends."""
        self._pub = publisher
        self._chat = chat_backend
        self._img = images_backend

    async def handle_chat_create(self, data: Dict[str, Any]) -> None:
        """
        data: OpenAI chat request + 'reply_key' (string)
        Server publishes to exchange 'oa.chat.reply' with routing_key = reply_key.

        Every chunk yielded by the chat backend's ``stream(data)`` is
        published in order, stamped with the request's user id when one is
        found. After the loop a ``{"object": "stream.end"}`` sentinel is
        published. A failed chunk publish stops the stream early; the
        sentinel is still attempted.
        """
        if not isinstance(data, dict):
            logger.warning("oaChatCreate: data is not a dict")
            return

        reply_key = data.get("reply_key")
        if not reply_key:
            logger.error("oaChatCreate: missing reply_key")
            return

        if self._chat is None:
            # The backend defaults to None; report that plainly instead of
            # surfacing an AttributeError as a streaming failure.
            logger.error("oaChatCreate: no chat backend configured")
            return

        user_id = _extract_user_id(data)
        try:
            async for chunk in self._chat.stream(data):
                try:
                    await self._pub.publish(
                        "oa.chat.reply",
                        _with_user_id(chunk, user_id),
                        routing_key=reply_key,
                    )
                except Exception:
                    logger.exception("oaChatCreate: publish failed")
                    break

            # Sentinel telling the consumer that no more chunks will follow.
            try:
                await self._pub.publish(
                    "oa.chat.reply",
                    _with_user_id({"object": "stream.end"}, user_id),
                    routing_key=reply_key,
                )
            except Exception:
                logger.exception("oaChatCreate: publish sentinel failed")
        except Exception:
            # NOTE(review): when the backend stream itself raises, no
            # "stream.end" sentinel is published - confirm consumers time out.
            logger.exception("oaChatCreate: streaming failed")

    async def handle_images_generate(self, data: Dict[str, Any]) -> None:
        """
        data: OpenAI images.generate request + 'reply_key' (string)

        Awaits the images backend's ``generate_b64(data)`` and publishes
        ``{"created": <unix seconds>, "data": [{"b64_json": <result>}]}`` to
        exchange 'oa.images.reply' with routing_key = reply_key, stamped
        with the request's user id when one is found.
        """
        if not isinstance(data, dict):
            logger.warning("oaImagesGenerate: data is not a dict")
            return

        reply_key = data.get("reply_key")
        if not reply_key:
            logger.error("oaImagesGenerate: missing reply_key")
            return

        if self._img is None:
            # Same reasoning as the chat handler: fail with a clear message.
            logger.error("oaImagesGenerate: no images backend configured")
            return

        user_id = _extract_user_id(data)
        try:
            b64 = await self._img.generate_b64(data)
            resp = {"created": _now(), "data": [{"b64_json": b64}]}
            try:
                await self._pub.publish(
                    "oa.images.reply",
                    _with_user_id(resp, user_id),
                    routing_key=reply_key,
                )
            except Exception:
                logger.exception("oaImagesGenerate: publish failed")
        except Exception:
            # NOTE(review): nothing is published on generation failure, so
            # the requester receives no reply - confirm this is intended.
            logger.exception("oaImagesGenerate: generation failed")
|
|
|
|
| |
| |
# Compatibility aliases: if ``ChatBackend`` / ``ImagesBackend`` are not bound
# at this point, alias them to ``ChatCompletionsBackend`` /
# ``ImageGenerationsBackend`` when those names exist. When neither name is
# available the alias is simply left undefined.
# A bare-name lookup can only fail with NameError, so that is the only
# exception suppressed here.
try:
    ChatBackend
except NameError:
    try:
        ChatBackend = ChatCompletionsBackend
    except NameError:
        pass


try:
    ImagesBackend
except NameError:
    try:
        ImagesBackend = ImageGenerationsBackend
    except NameError:
        pass
|
|