Spaces:
Running
Running
| import os | |
| import gradio as gr | |
| import sys | |
| import json | |
| import time | |
| import boto3 | |
| import httpx | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple, Any | |
| from anthropic import Anthropic | |
| import openai | |
| from openai import OpenAI | |
| from google import genai | |
| from google.genai import types | |
| from llama_index.core import Settings | |
| from llama_index.llms.openai import OpenAI as LlamaOpenAI | |
| from llama_index.core.llms import ChatMessage | |
| from llama_index.core.storage.docstore import SimpleDocumentStore | |
| from llama_index.retrievers.bm25 import BM25Retriever | |
| from llama_index.embeddings.openai import OpenAIEmbedding | |
| from llama_index.core.retrievers import QueryFusionRetriever | |
| from llama_index.core.workflow import Event, Context, Workflow, StartEvent, StopEvent, step | |
| from llama_index.core.schema import NodeWithScore | |
| from config import ( | |
| AWS_ACCESS_KEY_ID, | |
| AWS_SECRET_ACCESS_KEY, | |
| ANTHROPIC_API_KEY, | |
| OPENAI_API_KEY, | |
| BUCKET_NAME, | |
| PREFIX_RETRIEVER, | |
| LOCAL_DIR, | |
| SETTINGS, | |
| MAX_TOKENS_CONFIG, | |
| MAX_TOKENS_ANALYSIS, | |
| GENERATION_TEMPERATURE, | |
| LEGAL_POSITION_SCHEMA, | |
| REQUIRED_FILES, | |
| DEBUG_PROMPTS, | |
| ModelProvider, | |
| AnalysisModelName, | |
| DEFAULT_ANALYSIS_MODEL, | |
| DEEPSEEK_API_KEY, | |
| validate_environment | |
| ) | |
| from prompts import SYSTEM_PROMPT, LEGAL_POSITION_PROMPT, PRECEDENT_ANALYSIS_TEMPLATE | |
| from utils import ( | |
| clean_text, | |
| extract_court_decision_text, | |
| get_links_html, | |
| get_links_html_lp, | |
| extract_json_from_text | |
| ) | |
| from embeddings import GeminiEmbedding | |
# ============ Debug Prompt Logging ============
# DEBUG_PROMPTS is loaded from config (YAML env setting + DEBUG_PROMPTS env var override).
_PROMPT_SEP = 80 * "="
def _log_prompt(provider: str, model: str, system: str, user: str) -> None:
    """Dump the complete system and user prompts to stdout.

    No-op unless the DEBUG_PROMPTS flag from config is enabled. The system
    section is printed only when a non-empty system prompt is supplied.
    """
    if not DEBUG_PROMPTS:
        return
    banner = [
        f"\n{_PROMPT_SEP}",
        f"[PROMPT DEBUG] Provider: {provider} | Model: {model}",
        f"{_PROMPT_SEP}",
    ]
    for line in banner:
        print(line)
    if system:
        print("[PROMPT DEBUG] ── SYSTEM PROMPT ──────────────────────────────────")
        print(system)
    print("[PROMPT DEBUG] ── USER PROMPT ────────────────────────────────────")
    print(user)
    print(f"{_PROMPT_SEP}\n")
# ============ End Debug Prompt Logging ============
| # ============ OpenAI Reasoning Helpers ============ | |
| # Valid reasoning_effort values accepted by the OpenAI API | |
| _OPENAI_EFFORT_MAP = { | |
| "none": None, # omit the parameter (model uses its own default) | |
| "low": "low", | |
| "medium": "medium", | |
| "high": "high", | |
| "xhigh": "high", # UI-only level, maps to highest valid API value | |
| } | |
| def _build_openai_reasoning_params( | |
| model_name: str, | |
| thinking_level: str = "medium", | |
| openai_verbosity: str = "medium", | |
| ) -> dict: | |
| """Return extra completion params for OpenAI reasoning-capable models. | |
| - reasoning_effort: mapped from thinking_level with UI→API value normalisation. | |
| - verbosity: only for models that support it (gpt-5.* and o3*). | |
| - store: always False so conversations are not stored in OpenAI history. | |
| """ | |
| params: dict = {} | |
| effort = _OPENAI_EFFORT_MAP.get(thinking_level.lower(), "medium") | |
| if effort is not None: | |
| # gpt-5.3-chat-latest supports only "medium" for reasoning_effort | |
| if "gpt-5.3" in model_name.lower() and effort != "medium": | |
| effort = "medium" | |
| params["reasoning_effort"] = effort | |
| model_lower = model_name.lower() | |
| if "gpt-5" in model_lower or model_lower.startswith("o3"): | |
| params["verbosity"] = openai_verbosity.lower() | |
| params["store"] = False | |
| return params | |
| # ============ End OpenAI Reasoning Helpers ============ | |
| # ============ Prompt Assembly Helpers ============ | |
| _GENERATION_DYNAMIC_PLACEHOLDERS = ("{court_decision_text}", "{comment}") | |
| _ANALYSIS_DYNAMIC_TAGS = ("<new_decision>", "<clarifying_question>", "<legal_positions>") | |
| _DEFAULT_COMMENT_TEXT = "Коментар відсутній" | |
| def _split_prompt_by_placeholders(prompt_text: str, placeholders: Tuple[str, ...]) -> Tuple[str, str]: | |
| """Split prompt into static prefix and dynamic suffix starting from first dynamic placeholder.""" | |
| positions = [prompt_text.find(p) for p in placeholders if p in prompt_text] | |
| if not positions: | |
| return prompt_text.strip(), "" | |
| split_at = min(positions) | |
| static_part = prompt_text[:split_at].strip() | |
| dynamic_part = prompt_text[split_at:].strip() | |
| return static_part, dynamic_part | |
def _compile_prompt_blocks(
    system_prompt: str,
    prompt_template: str,
    format_values: Dict[str, str],
    placeholders: Tuple[str, ...],
) -> Dict[str, str]:
    """Compile provider-ready prompt blocks without changing UI-facing prompt settings.

    The static portion of the template (everything before the first dynamic
    placeholder) is folded into the system prompt; the dynamic remainder is
    formatted with ``format_values`` and becomes the user payload.
    """
    static_part, dynamic_part = _split_prompt_by_placeholders(prompt_template, placeholders)
    pieces = [part for part in (system_prompt.strip(), static_part) if part]
    merged_system_prompt = "\n\n".join(pieces)
    full_user_prompt = prompt_template.format(**format_values)
    if dynamic_part:
        dynamic_payload = dynamic_part.format(**format_values)
    else:
        # No placeholders found: the fully formatted template is the payload.
        dynamic_payload = full_user_prompt
    return {
        "system_prompt": merged_system_prompt,
        "user_prompt": dynamic_payload,
        "full_user_prompt": full_user_prompt,
        "static_prompt": static_part,
        "dynamic_template": dynamic_part,
    }
| def _ensure_generation_placeholders(lp_prompt: str) -> str: | |
| """Ensure dynamic placeholders exist in the legal-position prompt.""" | |
| updated = lp_prompt | |
| if "{court_decision_text}" not in updated: | |
| print("[WARNING] {court_decision_text} placeholder missing in prompt! Appending to the end.") | |
| updated += "\n\n<court_decision>\n{court_decision_text}\n</court_decision>" | |
| if "{comment}" not in updated: | |
| updated += "\n\n<comment>\n{comment}\n</comment>" | |
| return updated | |
def _compile_generation_prompt_blocks(
    system_prompt: str,
    lp_prompt: str,
    court_decision_text: str,
    comment: str,
) -> Dict[str, str]:
    """Compile provider-ready prompt blocks for legal position generation."""
    template = _ensure_generation_placeholders(lp_prompt)
    # An empty comment is replaced by the canonical "no comment" marker text.
    values = {
        "court_decision_text": court_decision_text,
        "comment": comment or _DEFAULT_COMMENT_TEXT,
    }
    return _compile_prompt_blocks(
        system_prompt=system_prompt,
        prompt_template=template,
        format_values=values,
        placeholders=_GENERATION_DYNAMIC_PLACEHOLDERS,
    )
def _compile_analysis_prompt_blocks(
    system_prompt: str,
    full_prompt: str,
) -> Dict[str, str]:
    """Compile provider-ready prompt blocks for precedent analysis.

    ``full_prompt`` is already formatted; the portion before the first
    analysis tag is merged into the system prompt, the rest becomes the
    user payload (or the whole prompt when no tag is present).
    """
    static_part, dynamic_part = _split_prompt_by_placeholders(full_prompt, _ANALYSIS_DYNAMIC_TAGS)
    pieces = [part for part in (system_prompt.strip(), static_part) if part]
    return {
        "system_prompt": "\n\n".join(pieces),
        "user_prompt": dynamic_part or full_prompt,
        "full_user_prompt": full_prompt,
        "static_prompt": static_part,
        "dynamic_template": dynamic_part,
    }
| # ============ End Prompt Assembly Helpers ============ | |
# Initialize embedding model and settings BEFORE importing components
# Priority: OpenAI > Gemini > None
embed_model = None
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if OPENAI_API_KEY:
    embed_model = OpenAIEmbedding(model_name="text-embedding-3-small")
    print("OpenAI embedding model initialized successfully")
elif GEMINI_API_KEY:
    # Fallback embedding backend when no OpenAI key is configured.
    embed_model = GeminiEmbedding(api_key=GEMINI_API_KEY, model_name="gemini-embedding-001")
    print("Gemini embedding model initialized successfully (alternative to OpenAI)")
else:
    # Without any embedding model the retrieval features cannot run.
    print("Warning: No embedding API key found (OpenAI or Gemini). Search functionality will be disabled.")
if embed_model:
    Settings.embed_model = embed_model
# Set basic LlamaIndex Settings before setting LLM
Settings.chunk_size = SETTINGS["chunk_size"]
# NOTE(review): similarity_top_k is not a documented LlamaIndex Settings
# attribute — confirm it is actually read by the retrieval stack.
Settings.similarity_top_k = SETTINGS["similarity_top_k"]
# Set a default LLM to prevent QueryFusionRetriever from trying to load OpenAI
# Use a mock LLM with minimal initialization to avoid validation issues
# We use DeepSeek but with a gpt-4o-mini model name to pass validation
if DEEPSEEK_API_KEY:
    Settings.llm = LlamaOpenAI(
        api_key=DEEPSEEK_API_KEY,
        api_base="https://api.deepseek.com",
        model="gpt-4o-mini"  # Use a known model name for validation
    )
    print("DeepSeek LLM set as default for LlamaIndex (using gpt-4o-mini model name for compatibility)")
elif OPENAI_API_KEY:
    Settings.llm = LlamaOpenAI(api_key=OPENAI_API_KEY, model="gpt-4o-mini")
    print("OpenAI LLM set as default for LlamaIndex")
# Now we can safely set context_window
Settings.context_window = SETTINGS["context_window"]
# Import components AFTER setting all Settings
from components import search_components
# Initialize S3 client (optional, only if AWS credentials are provided)
s3_client = None
if all([AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY]):
    try:
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            region_name="eu-north-1"
        )
        print("AWS S3 client initialized successfully")
    except Exception as e:
        # Best-effort: the app can still run from local index files.
        print(f"Warning: Failed to initialize AWS S3 client: {str(e)}")
        s3_client = None
else:
    print("AWS credentials not provided. Will use local files only.")
def download_s3_file(bucket_name: str, s3_key: str, local_path: str) -> None:
    """Fetch one object from S3 into ``local_path``.

    Raises ValueError when no S3 client was configured at startup; download
    errors are logged to stderr and re-raised.
    """
    if s3_client is None:
        raise ValueError("S3 client not initialized. Please provide AWS credentials or use local files.")
    try:
        s3_client.download_file(bucket_name, s3_key, str(local_path))
        print(f"Downloaded: {s3_key} -> {local_path}")
    except Exception as download_error:
        print(f"Error downloading file {s3_key}: {str(download_error)}", file=sys.stderr)
        raise
def download_s3_folder(bucket_name: str, prefix: str, local_dir: Path) -> None:
    """Download every object under an S3 prefix into ``local_dir``.

    Directory placeholder keys (ending in '/') are skipped; each object is
    written to ``local_dir`` at its path relative to ``prefix``.

    Raises:
        ValueError: if no S3 client is configured, or nothing was downloaded.
        Exception: any boto3 error is logged to stderr and re-raised.
    """
    if not s3_client:
        raise ValueError("S3 client not initialized. Please provide AWS credentials or use local files.")
    try:
        # Use a paginator: a single list_objects_v2 call returns at most
        # 1000 keys, which would silently truncate large index folders.
        paginator = s3_client.get_paginator("list_objects_v2")
        downloaded_any = False
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            for obj in page.get("Contents", []):
                s3_key = obj["Key"]
                if s3_key.endswith("/"):
                    continue  # skip "directory" placeholder keys
                local_file_path = local_dir / Path(s3_key).relative_to(prefix)
                local_file_path.parent.mkdir(parents=True, exist_ok=True)
                s3_client.download_file(bucket_name, s3_key, str(local_file_path))
                downloaded_any = True
                print(f"Downloaded: {s3_key} -> {local_file_path}")
        if not downloaded_any:
            raise ValueError(f"No files found in S3 bucket {bucket_name} with prefix {prefix}")
    except Exception as e:
        print(f"Error downloading folder {prefix}: {str(e)}", file=sys.stderr)
        raise
def initialize_components() -> bool:
    """Initialize all necessary components for the application.

    Ensures the local index directory exists, fetches any missing index
    files (local -> HF Dataset -> S3 fallback chain, then direct S3 as a
    last resort), and initializes the search components when an embedding
    model is available.

    Returns:
        True on success, False on any failure (the error is printed to
        stderr rather than raised, so callers can degrade gracefully).
    """
    # Imported lazily so module import does not pull in the fallback loader.
    from index_loader import load_indexes_with_fallback
    try:
        # Create local directory if it doesn't exist
        LOCAL_DIR.mkdir(parents=True, exist_ok=True)
        # Check if required files are present
        missing_files = [f for f in REQUIRED_FILES if not (LOCAL_DIR / f).exists()]
        if missing_files:
            print(f"Missing index files: {', '.join(missing_files)}")
            print(f"Attempting to load indexes via fallback (local → HF Dataset → S3)...")
            indexes_ok = load_indexes_with_fallback(str(LOCAL_DIR))
            if not indexes_ok:
                # Last resort: try S3 directly if client is available
                if s3_client:
                    print("Fallback failed, trying S3 directly...")
                    download_s3_folder(BUCKET_NAME, PREFIX_RETRIEVER, LOCAL_DIR)
                else:
                    print(f"Warning: No S3 client and fallback failed for: {', '.join(missing_files)}")
        else:
            print(f"All required files found locally in {LOCAL_DIR}")
        # Final check: every required file must now exist on disk.
        missing_files = [f for f in REQUIRED_FILES if not (LOCAL_DIR / f).exists()]
        if missing_files:
            raise FileNotFoundError(f"Missing required files after all attempts: {', '.join(missing_files)}")
        # Initialize search components if any embedding model is available
        if embed_model:
            success = search_components.initialize_components(LOCAL_DIR)
            if not success:
                raise RuntimeError("Failed to initialize search components")
            print("Search components initialized successfully")
        else:
            print("Skipping search components initialization (no embedding API key available)")
        return True
    except Exception as e:
        print(f"Error initializing components: {str(e)}", file=sys.stderr)
        return False
def deduplicate_nodes(nodes: list[NodeWithScore], key="doc_id"):
    """Drop search-result nodes whose metadata value for ``key`` was already seen.

    Order of first occurrences is preserved. Nodes whose metadata value for
    ``key`` is missing or falsy are dropped as well (they cannot be
    distinguished from each other).
    """
    seen_markers = set()
    unique = []
    for candidate in nodes:
        marker = candidate.node.metadata.get(key)
        if marker and marker not in seen_markers:
            seen_markers.add(marker)
            unique.append(candidate)
    return unique
def get_text_length_without_spaces(text: str) -> int:
    """Return the number of characters in ``text``, ignoring all whitespace."""
    # str.split() with no argument drops every whitespace run, so summing
    # the token lengths counts exactly the non-whitespace characters.
    return sum(len(token) for token in text.split())
def get_available_providers() -> Dict[str, bool]:
    """Report which AI providers currently have an API key configured.

    Reads os.environ on every call (rather than at import time) so keys
    set after startup are picked up.
    """
    env_vars = {
        "openai": "OPENAI_API_KEY",
        "anthropic": "ANTHROPIC_API_KEY",
        "gemini": "GEMINI_API_KEY",
        "deepseek": "DEEPSEEK_API_KEY",
    }
    return {name: bool(os.getenv(var, "")) for name, var in env_vars.items()}
def check_provider_available(provider: str) -> Tuple[bool, str]:
    """Check whether ``provider`` can be used right now.

    Returns:
        Tuple of (is_available, error_message); the message is empty on
        success and otherwise explains what is missing, listing any
        providers that ARE configured.
    """
    statuses = get_available_providers()
    key = provider.lower()
    if key not in statuses:
        return False, f"Unknown provider: {provider}"
    if statuses[key]:
        return True, ""
    configured = [name.upper() for name, ok in statuses.items() if ok]
    if not configured:
        return False, "No AI provider API keys configured. Please set at least one API key."
    return False, f"{provider.upper()} API key not configured. Available providers: {', '.join(configured)}"
def normalize_response_keys(response_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Rename known alias keys in an LLM response dict to the canonical names.

    Handles variations like 'text_lp' -> 'text' and 'proceeding_type' ->
    'proceeding'. An alias is only renamed when the canonical key is not
    already present; the input dict is never mutated.
    """
    if not response_dict:
        return response_dict
    aliases = {
        "text_lp": "text",
        "legal_position_text": "text",
        "lp_text": "text",
        "proceeding_type": "proceeding",
        "type_of_proceeding": "proceeding",
    }
    result = dict(response_dict)
    for alias, canonical in aliases.items():
        if alias in result and canonical not in result:
            result[canonical] = result.pop(alias)
    return result
class RetrieverEvent(Event):
    """Workflow event carrying retrieval results between workflow steps."""
    # Retrieved nodes, each paired with its relevance score.
    nodes: list[NodeWithScore]
class LLMAnalyzer:
    """Dispatches precedent-analysis requests to one of the supported LLM
    providers (OpenAI, DeepSeek, Anthropic, Gemini).

    The constructor validates that the API key for the chosen provider is
    configured and builds the provider-specific client; ``analyze()`` then
    routes to the matching private ``_analyze_with_*`` method. All methods
    return a JSON string (re-serialized when a JSON payload can be
    extracted from the raw model output).
    """
    def __init__(self, provider: Any, model_name: Any, temperature: float = GENERATION_TEMPERATURE,
                 max_tokens: Optional[int] = None, thinking_enabled: bool = False,
                 thinking_level: str = "medium", openai_verbosity: str = "medium",
                 thinking_type: str = "Adaptive", thinking_budget: int = 10000):
        # provider is a ModelProvider member; model_name may be an enum
        # (with .value) or a plain string — both are handled downstream.
        self.provider = provider
        self.model_name = model_name
        self.temperature = temperature
        self.max_tokens = max_tokens
        # Reasoning/"thinking" knobs; each provider interprets them
        # differently (see the respective _analyze_with_* method).
        self.thinking_enabled = thinking_enabled
        self.thinking_level = thinking_level
        self.openai_verbosity = openai_verbosity
        self.thinking_type = thinking_type
        self.thinking_budget = thinking_budget
        if provider == ModelProvider.OPENAI:
            if not OPENAI_API_KEY:
                raise ValueError(f"OpenAI API key not configured. Please set OPENAI_API_KEY environment variable to use {provider.value} provider.")
            # Disable HTTP/2 to avoid 421 Misdirected Request on HF Spaces
            self._http_client = httpx.Client(timeout=httpx.Timeout(120.0, connect=30.0), http2=False)
            self.client = openai.OpenAI(api_key=OPENAI_API_KEY, http_client=self._http_client)
        elif provider == ModelProvider.DEEPSEEK:
            if not DEEPSEEK_API_KEY:
                raise ValueError(f"DeepSeek API key not configured. Please set DEEPSEEK_API_KEY environment variable to use {provider.value} provider.")
            # Disable HTTP/2 for consistent connections
            self._http_client = httpx.Client(timeout=httpx.Timeout(120.0, connect=30.0), http2=False)
            # DeepSeek exposes an OpenAI-compatible API, so the OpenAI SDK
            # is reused with a different base_url.
            self.client = openai.OpenAI(api_key=DEEPSEEK_API_KEY, base_url="https://api.deepseek.com", http_client=self._http_client)
        elif provider == ModelProvider.ANTHROPIC:
            if not ANTHROPIC_API_KEY:
                raise ValueError(f"Anthropic API key not configured. Please set ANTHROPIC_API_KEY environment variable to use {provider.value} provider.")
            self.client = Anthropic(api_key=ANTHROPIC_API_KEY)
        elif provider == ModelProvider.GEMINI:
            if not os.environ.get("GEMINI_API_KEY"):
                raise ValueError(f"Gemini API key not configured. Please set GEMINI_API_KEY environment variable to use {provider.value} provider.")
            # Initialize Gemini client with new API
            self.client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
        else:
            raise ValueError(f"Unsupported provider: {provider}")
    async def analyze(self, prompt: str, response_schema: dict) -> str:
        """Analyze text using the selected LLM provider.

        ``response_schema`` is only enforced natively by the OpenAI path;
        the other providers either use plain JSON mode (DeepSeek) or rely
        on prompt instructions (Anthropic, Gemini).
        """
        if self.provider == ModelProvider.OPENAI:
            return await self._analyze_with_openai(prompt, response_schema)
        elif self.provider == ModelProvider.DEEPSEEK:
            return await self._analyze_with_deepseek(prompt)
        elif self.provider == ModelProvider.ANTHROPIC:
            return await self._analyze_with_anthropic(prompt, response_schema)
        else:
            return await self._analyze_with_gemini(prompt, response_schema)
    async def _analyze_with_openai(self, prompt: str, response_schema: dict) -> str:
        """Analyze text using OpenAI with structured (json_schema) output.

        Retries the API call up to 3 times with exponential backoff and
        returns a JSON string (re-serialized when extractable).
        """
        # Determine model name and if it's a reasoning model
        model_val = self.model_name.value if hasattr(self.model_name, "value") else str(self.model_name)
        is_reasoning_model = any(m in model_val.lower() for m in ["gpt-4.1", "gpt-4.5", "gpt-5", "o1", "o3"])
        # Split the formatted prompt so the static part rides in the system
        # message (cache-friendly) and only the dynamic part is the user turn.
        analysis_blocks = _compile_analysis_prompt_blocks(
            system_prompt=SYSTEM_PROMPT,
            full_prompt=prompt,
        )
        compiled_system_prompt = analysis_blocks["system_prompt"]
        final_user_prompt = analysis_blocks["user_prompt"]
        # Use developer role for newer models
        role = "developer" if is_reasoning_model else "system"
        messages = [
            ChatMessage(role=role, content=compiled_system_prompt),
            ChatMessage(role="user", content=final_user_prompt)
        ]
        response_format = {
            "type": "json_schema",
            "json_schema": {
                "name": "relevant_positions_schema",
                "schema": response_schema
            }
        }
        try:
            completion_params = {
                "model": model_val,
                "messages": [{"role": m.role, "content": m.content} for m in messages],
                "response_format": response_format,
            }
            # Reasoning models usually require temperature=1.0 or none
            if not is_reasoning_model:
                completion_params["temperature"] = self.temperature
            # Add reasoning parameters only when thinking is explicitly enabled
            if is_reasoning_model and self.thinking_enabled:
                completion_params.update(
                    _build_openai_reasoning_params(model_val, self.thinking_level, self.openai_verbosity)
                )
            # Log full prompts in debug mode
            _log_prompt("openai-analyzer", model_val, compiled_system_prompt, final_user_prompt)
            # Retry logic for OpenAI analysis (exponential backoff: 1s, 2s)
            max_retries = 3
            last_error = None
            response = None
            for attempt in range(max_retries):
                try:
                    print(f"[DEBUG] OpenAI Analysis call attempt {attempt + 1}/{max_retries}")
                    response = self.client.chat.completions.create(**completion_params)
                    break
                except Exception as api_err:
                    last_error = api_err
                    print(f"[ERROR] OpenAI Analysis attempt {attempt + 1} failed: {type(api_err).__name__}: {str(api_err)}")
                    if attempt < max_retries - 1:
                        time.sleep(2 ** attempt)
                    else:
                        raise last_error
            response_text = response.choices[0].message.content
            # Log cache hit stats (automatic caching, no config needed)
            if hasattr(response, 'usage') and hasattr(response.usage, 'prompt_tokens_details'):
                cached = getattr(response.usage.prompt_tokens_details, 'cached_tokens', 0)
                total = response.usage.prompt_tokens
                print(f"[CACHE] OpenAI analysis: {cached}/{total} input tokens from cache")
            # Verify it's valid JSON
            json_data = extract_json_from_text(response_text)
            return json.dumps(json_data, ensure_ascii=False) if json_data else response_text
        except Exception as e:
            raise RuntimeError(f"Error in OpenAI analysis ({model_val}): {str(e)}")
    async def _analyze_with_deepseek(self, prompt: str) -> str:
        """Analyze text using DeepSeek (OpenAI-compatible API).

        R1 ("reasoner") models get the system prompt folded into the user
        message and skip JSON mode/temperature; chat models use JSON mode.
        Retries up to 3 times with exponential backoff.
        """
        model_val = self.model_name.value if hasattr(self.model_name, "value") else str(self.model_name)
        is_reasoning = "reasoner" in model_val.lower()
        messages = []
        if is_reasoning:
            # DeepSeek R1 does not support system role, combine with user
            messages.append(ChatMessage(role="user", content=f"{SYSTEM_PROMPT}\n\n{prompt}"))
        else:
            messages.append(ChatMessage(role="system", content=SYSTEM_PROMPT))
            messages.append(ChatMessage(role="user", content=prompt))
        try:
            completion_params = {
                "model": model_val,
                "messages": [{"role": m.role, "content": m.content} for m in messages],
            }
            # Use JSON mode and temperature only for non-reasoning models
            if not is_reasoning:
                completion_params["response_format"] = {'type': 'json_object'}
                completion_params["temperature"] = self.temperature
            # Log full prompts in debug mode
            if is_reasoning:
                _log_prompt("deepseek-analyzer", model_val, "", f"{SYSTEM_PROMPT}\n\n{prompt}")
            else:
                _log_prompt("deepseek-analyzer", model_val, SYSTEM_PROMPT, prompt)
            # Retry logic for DeepSeek analysis (exponential backoff: 1s, 2s)
            max_retries = 3
            last_error = None
            response = None
            for attempt in range(max_retries):
                try:
                    print(f"[DEBUG] DeepSeek Analysis call attempt {attempt + 1}/{max_retries}")
                    response = self.client.chat.completions.create(**completion_params)
                    break
                except Exception as api_err:
                    last_error = api_err
                    print(f"[ERROR] DeepSeek Analysis attempt {attempt + 1} failed: {type(api_err).__name__}: {str(api_err)}")
                    if attempt < max_retries - 1:
                        time.sleep(2 ** attempt)
                    else:
                        raise last_error
            response_text = response.choices[0].message.content
            # Verify and clean JSON
            json_data = extract_json_from_text(response_text)
            return json.dumps(json_data, ensure_ascii=False) if json_data else response_text
        except Exception as e:
            raise RuntimeError(f"Error in DeepSeek analysis ({model_val}): {str(e)}")
    async def _analyze_with_anthropic(self, prompt: str, response_schema: dict) -> str:
        """Analyze text using Anthropic.

        The system prompt is sent with ephemeral cache_control so the static
        portion is prompt-cached. ``response_schema`` is currently unused on
        this path — JSON shape is driven by the prompt text itself.
        """
        try:
            analysis_blocks = _compile_analysis_prompt_blocks(
                system_prompt=SYSTEM_PROMPT,
                full_prompt=prompt,
            )
            compiled_system_prompt = analysis_blocks["system_prompt"]
            final_user_prompt = analysis_blocks["user_prompt"]
            _log_prompt("anthropic-analyzer", str(self.model_name), compiled_system_prompt, final_user_prompt)
            # NOTE(review): self.model_name is passed through as-is here
            # (other paths unwrap .value for enums) — confirm callers always
            # hand this path a plain string model name.
            message_params = {
                "model": self.model_name,
                "max_tokens": self.max_tokens or MAX_TOKENS_ANALYSIS,
                "temperature": self.temperature,
                "system": [{"type": "text", "text": compiled_system_prompt, "cache_control": {"type": "ephemeral"}}],
                "messages": [{"role": "user", "content": final_user_prompt}]
            }
            if self.thinking_enabled and "claude" in str(self.model_name).lower():
                if self.thinking_type.lower() == "adaptive" and "-4-6" in str(self.model_name).lower():
                    # Adaptive thinking: effort-based control, temperature pinned to 1.0.
                    message_params["thinking"] = {"type": "adaptive"}
                    message_params["temperature"] = 1.0
                    t_lv = self.thinking_level.lower()
                    if t_lv == "xhigh":
                        effort = "max"
                    elif t_lv in ["low", "medium", "high"]:
                        effort = t_lv
                    else:
                        effort = "medium"
                    message_params["output_config"] = {"effort": effort}
                else:
                    # Budgeted thinking: budget is clamped to >= 1024 tokens and
                    # max_tokens is raised to leave headroom above the budget.
                    budget = max(1024, int(self.thinking_budget))
                    if message_params["max_tokens"] <= budget:
                        message_params["max_tokens"] = budget + 4000
                    message_params["thinking"] = {
                        "type": "enabled",
                        "budget_tokens": budget
                    }
                    message_params["temperature"] = 1.0
            response = self.client.messages.create(**message_params)
            # Concatenate only the text blocks (skips thinking blocks, which
            # have a different type).
            response_text = ""
            for block in response.content:
                if hasattr(block, 'type') and block.type == 'text':
                    response_text += getattr(block, 'text', '')
                elif hasattr(block, 'text'):
                    response_text += block.text
            # Extract JSON from potential markdown blocks
            json_data = extract_json_from_text(response_text)
            if json_data:
                return json.dumps(json_data, ensure_ascii=False)
            return response_text
        except Exception as e:
            raise RuntimeError(f"Error in Anthropic analysis: {str(e)}")
    async def _analyze_with_gemini(self, prompt: str, response_schema: dict) -> str:
        """Analyze text using Gemini with the new google.genai API.

        ``response_schema`` is unused here; the expected JSON shape is
        appended to the prompt as instructions, and the output is coerced
        into the {"relevant_positions": [...]} structure if the model
        returns something else.
        """
        try:
            # Append instructions so the model answers in the expected JSON shape
            # (the instruction text itself is user-facing and stays in Ukrainian).
            json_instruction = """
Твоя відповідь повинна бути в форматі JSON:
{
    "relevant_positions": [
        {
            "lp_id": "ID позиції",
            "source_index": "Порядковий номер позиції у списку",
            "description": "Детальне обґрунтування релевантності"
        }
    ]
}
"""
            formatted_prompt = f"{prompt}\n\n{json_instruction}"
            # Log full prompts in debug mode
            _log_prompt("gemini-analyzer", str(self.model_name), SYSTEM_PROMPT, formatted_prompt)
            # Use new google.genai API
            contents = [
                types.Content(
                    role="user",
                    parts=[
                        types.Part.from_text(text=formatted_prompt),
                    ],
                ),
            ]
            config_params = {
                "temperature": self.temperature,
                "max_output_tokens": self.max_tokens or MAX_TOKENS_ANALYSIS,
                "system_instruction": [
                    types.Part.from_text(text=SYSTEM_PROMPT),
                ],
            }
            # Thinking config is only supported on gemini-3* models here.
            if self.thinking_enabled and str(self.model_name).startswith("gemini-3"):
                config_params["thinking_config"] = types.ThinkingConfig(
                    thinking_level=self.thinking_level.upper()
                )
            generate_content_config = types.GenerateContentConfig(**config_params)
            response = self.client.models.generate_content(
                model=self.model_name,
                contents=contents,
                config=generate_content_config,
            )
            response_text = response.text
            if not response_text:
                raise RuntimeError("Empty response from Gemini")
            # Extract JSON from the response using the shared helper
            json_data = extract_json_from_text(response_text)
            if json_data:
                # Wrap unexpected shapes so callers always see relevant_positions.
                if "relevant_positions" not in json_data:
                    json_data = {
                        "relevant_positions": [
                            {
                                "lp_id": "unknown",
                                "source_index": "1",
                                "description": json.dumps(json_data, ensure_ascii=False)
                            }
                        ]
                    }
                return json.dumps(json_data, ensure_ascii=False)
            else:
                # No JSON found: build a structured JSON payload from the raw text
                return json.dumps({
                    "relevant_positions": [
                        {
                            "lp_id": "unknown",
                            "source_index": "1",
                            "description": response_text
                        }
                    ]
                }, ensure_ascii=False)
        except Exception as e:
            # Try to surface more detailed error information when available
            error_details = str(e)
            if hasattr(e, 'response'):
                error_details += f"\nResponse: {e.response}"
            raise RuntimeError(f"Error in Gemini analysis: {error_details}")
class PrecedentAnalysisWorkflow(Workflow):
    """Workflow for analyzing legal precedents.

    Wraps an LLMAnalyzer and turns a (query, question, nodes) triple into
    a bulleted relevance report, one line per relevant legal position.
    """
    def __init__(self, provider: Any = ModelProvider.OPENAI,
                 model_name: Any = None,
                 temperature: float = GENERATION_TEMPERATURE,
                 max_tokens: Optional[int] = None,
                 thinking_enabled: bool = False,
                 thinking_level: str = "medium",
                 openai_verbosity: str = "medium",
                 thinking_type: str = "Adaptive",
                 thinking_budget: int = 10000):
        super().__init__()
        # Use default analysis model if not specified
        if model_name is None:
            model_name = DEFAULT_ANALYSIS_MODEL or AnalysisModelName.GPT5_2
        self.analyzer = LLMAnalyzer(provider, model_name, temperature, max_tokens,
                                    thinking_enabled, thinking_level, openai_verbosity,
                                    thinking_type, thinking_budget)
    # NOTE(review): llama_index workflow steps are normally decorated with
    # @step (imported at the top of this file but never used) — confirm
    # whether `analyze` is invoked directly or should carry the decorator.
    async def analyze(self, ctx: Context, ev: StartEvent) -> StopEvent:
        """Analyze legal precedents.

        Expects the StartEvent to carry:
            query: the new court decision text (required),
            question: optional clarifying question,
            nodes: retrieved legal-position nodes (required).
        Errors are returned inside the StopEvent result string rather
        than raised.
        """
        try:
            query = ev.get("query", "")
            question = ev.get("question", "")
            nodes = ev.get("nodes", [])
            if not query:
                return StopEvent(result="Error: No text provided (query)")
            if not nodes:
                return StopEvent(result="Error: No legal positions provided for analysis (nodes)")
            # Build a numbered context block, one source per retrieved node.
            context_parts = []
            for i, node in enumerate(nodes, 1):
                # Nodes may arrive as NodeWithScore (with .node) or bare nodes.
                node_text = node.node.text if hasattr(node, 'node') else node.text
                metadata = node.node.metadata if hasattr(node, 'node') else node.metadata
                lp_id = metadata.get('lp_id', f'unknown_{i}')
                context_parts.append(f"Source {i} (ID: {lp_id}):\n{node_text}")
            context_str = "\n\n".join(context_parts)
            # JSON schema the analyzer should return (enforced natively only
            # by the OpenAI provider path).
            response_schema = {
                "type": "object",
                "properties": {
                    "relevant_positions": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "lp_id": {"type": "string"},
                                "source_index": {"type": "string"},
                                "description": {"type": "string"}
                            },
                            "required": ["lp_id", "source_index", "description"]
                        }
                    }
                }
            }
            prompt = PRECEDENT_ANALYSIS_TEMPLATE.format(
                query=query,
                question=question if question else "Загальний аналіз релевантності",
                context_str=context_str
            )
            response_content = await self.analyzer.analyze(prompt, response_schema)
            try:
                # Try to parse the JSON response
                parsed_response = json.loads(response_content)
                if "relevant_positions" in parsed_response:
                    response_lines = []
                    for position in parsed_response["relevant_positions"]:
                        position_text = f"* [{position['source_index']}] {position['description']} "
                        response_lines.append(position_text)
                    if not response_lines:
                        return StopEvent(result="Жодної релевантної правової позиції у базі не знайдено.")
                    response_text = "\n".join(response_lines)
                    return StopEvent(result=response_text)
                else:
                    # No relevant_positions key: return the whole text as one item
                    return StopEvent(result=f"* [1] {response_content}")
            except json.JSONDecodeError as e:
                # JSON parsing failed: return the raw text as-is
                return StopEvent(result=f"* [1] {response_content}")
        except Exception as e:
            return StopEvent(result=f"Error during analysis: {str(e)}")
def generate_legal_position(
    input_text: str,
    input_type: str,
    comment_input: str,
    provider: str,
    model_name: str,
    thinking_enabled: bool = False,
    thinking_type: str = "Adaptive",
    thinking_level: str = "MEDIUM",
    openai_verbosity: str = "medium",
    thinking_budget: int = 10000,
    temperature: float = GENERATION_TEMPERATURE,
    max_tokens: Optional[int] = None,
    custom_system_prompt: Optional[str] = None,
    custom_lp_prompt: Optional[str] = None
) -> Dict:
    """Generate a structured legal position from a court decision.

    Args:
        input_text: Court decision text, or a URL when ``input_type == "url"``.
        input_type: "url" to fetch and extract the decision text; any other
            value treats ``input_text`` as the decision text itself.
        comment_input: Optional user comment folded into the prompt.
        provider: A ``ModelProvider`` value string selecting the backend.
        model_name: Provider-specific model identifier.
        thinking_enabled: Enable provider "thinking"/reasoning features.
        thinking_type: "Adaptive" or "Enabled" (Anthropic-specific).
        thinking_level: Reasoning effort level (mapping is provider-specific).
        openai_verbosity: OpenAI output verbosity for reasoning models.
        thinking_budget: Token budget for Anthropic "enabled" thinking.
        temperature: Sampling temperature (forced/ignored by some reasoning
            model configurations).
        max_tokens: Output token cap; falls back to ``MAX_TOKENS_CONFIG``.
        custom_system_prompt: Optional override of ``SYSTEM_PROMPT``.
        custom_lp_prompt: Optional override of ``LEGAL_POSITION_PROMPT``.

    Returns:
        Dict with keys "title", "text", "proceeding", "category". On any
        failure the same shape is returned with error details in the fields.
    """
    try:
        # Check if provider is available
        is_available, error_msg = check_provider_available(provider)
        if not is_available:
            return {
                "title": "Помилка конфігурації",
                "text": error_msg,
                "proceeding": "N/A",
                "category": "Error"
            }
        # Use custom prompts if provided, otherwise use defaults
        system_prompt = custom_system_prompt if custom_system_prompt else SYSTEM_PROMPT
        lp_prompt = custom_lp_prompt if custom_lp_prompt else LEGAL_POSITION_PROMPT
        print(f"[DEBUG] RAW input_text length: {len(input_text) if input_text else 0}")
        print(f"[DEBUG] RAW input_text preview: {input_text[:300] if input_text else 'Empty'}")
        print(f"[DEBUG] Using custom prompts: system={custom_system_prompt is not None}, lp={custom_lp_prompt is not None}")
        input_text = clean_text(input_text)
        print(f"[DEBUG] AFTER CLEAN input_text length: {len(input_text) if input_text else 0}")
        print(f"[DEBUG] AFTER CLEAN input_text preview: {input_text[:300] if input_text else 'Empty'}")
        comment_input = clean_text(comment_input)
        if input_type == "url":
            try:
                extracted = extract_court_decision_text(input_text)
                print(f"[DEBUG] EXTRACTED text length: {len(extracted) if extracted else 0}")
                print(f"[DEBUG] EXTRACTED text preview: {extracted[:300] if extracted else 'Empty'}")
                court_decision_text = clean_text(extracted)
                print(f"[DEBUG] AFTER CLEAN extracted length: {len(court_decision_text) if court_decision_text else 0}")
                print(f"[DEBUG] AFTER CLEAN extracted preview: {court_decision_text[:300] if court_decision_text else 'Empty'}")
            except Exception as e:
                raise Exception(f"Помилка при отриманні тексту за URL: {str(e)}")
        else:
            court_decision_text = input_text
        # Debug: Check what we have before formatting
        print(f"[DEBUG] FINAL court_decision_text length: {len(court_decision_text)}")
        print(f"[DEBUG] FINAL court_decision_text preview: {court_decision_text[:300]}")
        print(f"[DEBUG] comment_input: {comment_input[:100] if comment_input else 'Empty'}")
        prompt_blocks = _compile_generation_prompt_blocks(
            system_prompt=system_prompt,
            lp_prompt=lp_prompt,
            court_decision_text=court_decision_text,
            comment=comment_input,
        )
        compiled_system_prompt = prompt_blocks["system_prompt"]
        content = prompt_blocks["full_user_prompt"]
        provider_user_content = prompt_blocks["user_prompt"]
        # Debug: Check formatted content
        print(f"[DEBUG] ===== COMPILED PROMPT BLOCKS =====")
        print(f"[DEBUG] Static system length: {len(compiled_system_prompt)}")
        print(f"[DEBUG] Dynamic user length: {len(provider_user_content)}")
        print(f"[DEBUG] Full content length: {len(content)}")
        print(f"[DEBUG] Static system preview (first 500 chars): {compiled_system_prompt[:500]}")
        print(f"[DEBUG] Dynamic user preview (first 500 chars): {provider_user_content[:500]}")
        print(f"[DEBUG] Provider: {provider}, Model: {model_name}")
        print(f"[DEBUG] ==================================")
        # Validation check - ensure court_decision_text is not empty
        if not court_decision_text or len(court_decision_text.strip()) < 50:
            print(f"[WARNING] court_decision_text is too short or empty! Length: {len(court_decision_text) if court_decision_text else 0}")
            raise Exception(f"Текст судового рішення занадто короткий або відсутній (довжина: {len(court_decision_text) if court_decision_text else 0} символів). Будь ласка, перевірте вхідні дані.")
        if provider == ModelProvider.OPENAI.value:
            http_client = None
            response_text = None
            try:
                print(f"[DEBUG] OpenAI pre-flight check...")
                print(f"[DEBUG] openai SDK version: {openai.__version__}")
                print(f"[DEBUG] httpx version: {httpx.__version__}")
                http_client = httpx.Client(
                    timeout=httpx.Timeout(120.0, connect=30.0),
                    http2=False,
                )
                client = OpenAI(
                    api_key=OPENAI_API_KEY,
                    http_client=http_client
                )
                print(f"[DEBUG] OpenAI client base_url: {client.base_url}")
                # Retry logic for connection errors
                max_retries = 3
                last_error = None
                response = None
                print(f"[DEBUG] OpenAI Generation - Model: {model_name}")
                # Check for reasoning models (gpt-4.1, gpt-5.2, o1, etc.)
                is_reasoning_model = any(m in model_name.lower() for m in ["gpt-4.1", "gpt-4.5", "gpt-5", "o1", "o3"])
                # Use developer role for newer models, system for others
                role = "developer" if is_reasoning_model else "system"
                messages = [
                    {"role": role, "content": compiled_system_prompt},
                    {"role": "user", "content": provider_user_content},
                ]
                # Parameters for chat completion
                completion_params = {
                    "model": model_name,
                    "messages": messages,
                }
                # Set tokens based on model capabilities: reasoning models only
                # accept max_completion_tokens, legacy models use max_tokens.
                if is_reasoning_model:
                    completion_params["max_completion_tokens"] = max_tokens or MAX_TOKENS_CONFIG["openai"]
                else:
                    completion_params["max_tokens"] = max_tokens or MAX_TOKENS_CONFIG["openai"]
                completion_params["temperature"] = temperature
                # Add reasoning parameters only when thinking is explicitly enabled by the user
                if is_reasoning_model and thinking_enabled:
                    completion_params.update(
                        _build_openai_reasoning_params(model_name, thinking_level, openai_verbosity)
                    )
                # Log full prompts in debug mode
                _log_prompt("openai", model_name, compiled_system_prompt, provider_user_content)
                # Execute with retries (exponential backoff: 1s, 2s, 4s)
                for attempt in range(max_retries):
                    try:
                        print(f"[DEBUG] OpenAI API call attempt {attempt + 1}/{max_retries}")
                        response = client.chat.completions.create(**completion_params)
                        break
                    except Exception as api_err:
                        import traceback
                        last_error = api_err
                        error_type = type(api_err).__name__
                        error_detail = str(api_err)
                        # Walk the full exception chain for better diagnostics
                        cause = api_err.__cause__
                        depth = 0
                        while cause and depth < 5:
                            error_detail += f"\n  -> Caused by [{depth}]: {type(cause).__name__}: {cause}"
                            cause = getattr(cause, '__cause__', None) or getattr(cause, '__context__', None)
                            depth += 1
                        print(f"[ERROR] OpenAI API attempt {attempt + 1} failed: {error_type}: {error_detail}")
                        print(f"[ERROR] Full traceback:\n{traceback.format_exc()}")
                        if attempt < max_retries - 1:
                            wait_time = 2 ** attempt  # 1, 2, 4 seconds
                            print(f"[DEBUG] Retrying in {wait_time}s...")
                            time.sleep(wait_time)
                        else:
                            raise last_error
                response_text = response.choices[0].message.content
                print(f"[DEBUG] OpenAI response length: {len(response_text) if response_text else 0}")
                # Log cache hit stats (automatic caching, no config needed)
                if hasattr(response, 'usage') and hasattr(response.usage, 'prompt_tokens_details'):
                    cached = getattr(response.usage.prompt_tokens_details, 'cached_tokens', 0)
                    total = response.usage.prompt_tokens
                    print(f"[CACHE] OpenAI generation: {cached}/{total} input tokens from cache")
                json_response = extract_json_from_text(response_text)
                if json_response:
                    json_response = normalize_response_keys(json_response)
                if json_response and all(key in json_response for key in ["title", "text", "proceeding", "category"]):
                    return json_response
                else:
                    print(f"[WARNING] Invalid JSON structure from OpenAI. Text: {response_text[:300] if response_text else 'None'}...")
                    raise ValueError("Invalid JSON structure")
            except Exception as e:
                print(f"[ERROR] OpenAI generation/parsing failed: {e}")
                return {
                    "title": "Автоматично сформований заголовок (OpenAI)",
                    "text": response_text.strip() if response_text else f"Помилка при отриманні відповіді: {str(e)}",
                    "proceeding": "Не визначено",
                    "category": "Помилка парсингу"
                }
            finally:
                if http_client:
                    try:
                        http_client.close()
                    except Exception:
                        pass
        elif provider == ModelProvider.DEEPSEEK.value:
            # NOTE: changed from a bare `if` to `elif` so all provider
            # branches form one dispatch chain (behavior unchanged: the
            # OpenAI branch always returns).
            # Use custom httpx client with HTTP/2 disabled for consistent connections
            http_client = None
            response_text = None
            try:
                http_client = httpx.Client(
                    timeout=httpx.Timeout(120.0, connect=30.0),
                    http2=False,
                )
                client = OpenAI(
                    api_key=DEEPSEEK_API_KEY,
                    base_url="https://api.deepseek.com",
                    http_client=http_client
                )
                # Retry logic for DeepSeek
                max_retries = 3
                last_error = None
                response = None
                print(f"[DEBUG] DeepSeek Generation - Model: {model_name}")
                # Check for reasoning model (DeepSeek R1)
                is_reasoning = "reasoner" in model_name.lower()
                messages = []
                if is_reasoning:
                    # R1 does not support system role, combine with user
                    combined_content = f"{system_prompt}\n\n{content}"
                    messages.append({"role": "user", "content": combined_content})
                else:
                    messages.append({"role": "system", "content": system_prompt})
                    # BUGFIX: the user message was previously never appended,
                    # so non-reasoning DeepSeek requests contained only the
                    # system prompt and no court decision text at all.
                    messages.append({"role": "user", "content": content})
                completion_params = {
                    "model": model_name,
                    "messages": messages,
                    "max_tokens": max_tokens or MAX_TOKENS_CONFIG["deepseek"],
                    "frequency_penalty": 0.0,
                }
                if not is_reasoning:
                    completion_params["temperature"] = temperature
                # Log full prompts in debug mode
                if is_reasoning:
                    _log_prompt("deepseek", model_name, "", combined_content)
                else:
                    _log_prompt("deepseek", model_name, system_prompt, content)
                # Execute with retries
                for attempt in range(max_retries):
                    try:
                        print(f"[DEBUG] DeepSeek API call attempt {attempt + 1}/{max_retries}")
                        response = client.chat.completions.create(**completion_params)
                        break
                    except Exception as api_err:
                        last_error = api_err
                        error_type = type(api_err).__name__
                        print(f"[ERROR] DeepSeek API attempt {attempt + 1} failed: {error_type}: {str(api_err)}")
                        if attempt < max_retries - 1:
                            wait_time = 2 ** attempt
                            print(f"[DEBUG] Retrying in {wait_time}s...")
                            time.sleep(wait_time)
                        else:
                            raise last_error
                response_text = response.choices[0].message.content
                print(f"[DEBUG] DeepSeek response length: {len(response_text) if response_text else 0}")
                json_response = extract_json_from_text(response_text)
                if json_response:
                    json_response = normalize_response_keys(json_response)
                if json_response and all(key in json_response for key in ["title", "text", "proceeding", "category"]):
                    return json_response
                else:
                    print(f"[WARNING] Invalid JSON structure from DeepSeek. Text: {response_text[:300] if response_text else 'None'}...")
                    raise ValueError("Invalid JSON structure")
            except Exception as e:
                print(f"[ERROR] DeepSeek generation/parsing failed: {e}")
                return {
                    "title": "Автоматично сформований заголовок (DeepSeek)",
                    "text": response_text.strip() if response_text else f"Помилка при отриманні відповіді від DeepSeek: {str(e)}",
                    "proceeding": "Не визначено",
                    "category": "Помилка API/Парсингу"
                }
            finally:
                if http_client:
                    try:
                        http_client.close()
                    except Exception:
                        pass
        elif provider == ModelProvider.ANTHROPIC.value:
            client = Anthropic(api_key=ANTHROPIC_API_KEY)
            # Debug: check what we're sending to Anthropic
            print(f"[DEBUG] Sending to Anthropic - system length: {len(compiled_system_prompt)}")
            print(f"[DEBUG] Sending to Anthropic - dynamic user length: {len(provider_user_content)}")
            print(f"[DEBUG] Anthropic dynamic preview: {provider_user_content[:500]}")
            print(f"[DEBUG] Anthropic API key configured: {bool(ANTHROPIC_API_KEY)}")
            messages = [{
                "role": "user",
                "content": provider_user_content
            }]
            # Prepare message creation parameters; the system prompt is marked
            # for ephemeral prompt caching.
            message_params = {
                "model": model_name,
                "max_tokens": max_tokens or MAX_TOKENS_CONFIG["anthropic"],
                "system": [{"type": "text", "text": compiled_system_prompt, "cache_control": {"type": "ephemeral"}}],
                "messages": messages,
                "temperature": temperature
            }
            # Add thinking config if enabled
            if thinking_enabled and "claude" in model_name.lower():
                # For Claude 4.6 models, we can use Adaptive
                if thinking_type.lower() == "adaptive" and "-4-6" in str(model_name).lower():
                    message_params["thinking"] = {"type": "adaptive"}
                    message_params["temperature"] = 1.0
                    # Map thinking_level to valid effort
                    t_lv = thinking_level.lower()
                    if t_lv == "xhigh":
                        effort = "max"
                    elif t_lv in ["low", "medium", "high"]:
                        effort = t_lv
                    else:
                        effort = "medium"
                    message_params["output_config"] = {"effort": effort}
                else:
                    # 'Enabled' type works for both 4.5 and 4.6 models
                    budget = max(1024, int(thinking_budget))
                    # Anthropic REQUIRES max_tokens > budget_tokens.
                    # If the user sets a low max_tokens (e.g. 4000) and high budget (10000), it will fail.
                    if message_params["max_tokens"] <= budget:
                        recommended_max = budget + 4000
                        print(f"[WARNING] max_tokens ({message_params['max_tokens']}) is <= thinking_budget ({budget}). Increasing max_tokens to {recommended_max}.")
                        message_params["max_tokens"] = recommended_max
                    message_params["thinking"] = {
                        "type": "enabled",
                        "budget_tokens": budget
                    }
                    message_params["temperature"] = 1.0
            # Log full prompts in debug mode
            _log_prompt("anthropic", model_name, compiled_system_prompt, provider_user_content)
            # Retry logic for connection errors
            max_retries = 3
            last_error = None
            for attempt in range(max_retries):
                try:
                    print(f"[DEBUG] Anthropic API call attempt {attempt + 1}/{max_retries}")
                    response = client.messages.create(**message_params)
                    break
                except Exception as api_err:
                    last_error = api_err
                    error_type = type(api_err).__name__
                    print(f"[ERROR] Anthropic API attempt {attempt + 1} failed: {error_type}: {str(api_err)}")
                    if attempt < max_retries - 1:
                        wait_time = 2 ** attempt  # 1, 2, 4 seconds
                        print(f"[DEBUG] Retrying in {wait_time}s...")
                        time.sleep(wait_time)
                    else:
                        raise Exception(f"Помилка з'єднання з Anthropic API після {max_retries} спроб: {error_type}: {str(api_err)}")
            try:
                # Extract text from response, handling different content block types
                response_text = ""
                thinking_text = ""
                for block in response.content:
                    if hasattr(block, 'type'):
                        if block.type == 'thinking':
                            # Separate thinking blocks (if any)
                            thinking_text += getattr(block, 'thinking', '')
                        elif block.type == 'text':
                            response_text += getattr(block, 'text', '')
                    elif hasattr(block, 'text'):
                        # Fallback for simpler response format
                        response_text += block.text
                if thinking_text:
                    print(f"[DEBUG] Anthropic thinking block length: {len(thinking_text)}")
                print(f"[DEBUG] Anthropic response text length: {len(response_text)}")
                print(f"[DEBUG] Response preview (first 500 chars): {response_text[:500]}")
                # Try to parse JSON using the universal extractor
                json_response = extract_json_from_text(response_text)
                if json_response:
                    json_response = normalize_response_keys(json_response)
                    # Validate required fields
                    required = ["title", "text", "proceeding", "category"]
                    missing = [f for f in required if f not in json_response]
                    if missing:
                        print(f"[WARNING] Missing fields in Anthropic JSON: {missing}")
                        for field in missing:
                            if field not in json_response:
                                json_response[field] = "Не вказано"
                    return json_response
                else:
                    print(f"[ERROR] Could not extract JSON from Anthropic response")
                    # Fallback: create structured response from raw text
                    return {
                        "title": "Автоматично згенерований заголовок",
                        "text": response_text.strip(),
                        "proceeding": "Не визначено",
                        "category": "Помилка парсингу JSON"
                    }
            except Exception as e:
                # Re-raise with extra detail for upstream diagnostics
                error_details = str(e)
                if hasattr(e, 'response'):
                    error_details += f"\nResponse: {e.response}"
                raise RuntimeError(f"Error in Anthropic analysis: {error_details}")
        elif provider == ModelProvider.GEMINI.value:
            if not os.environ.get("GEMINI_API_KEY"):
                raise ValueError("Gemini API key not found in environment variables")
            try:
                # Debug: Log input parameters
                print(f"[DEBUG] Gemini Generation:")
                print(f"[DEBUG]   Model: {model_name}")
                print(f"[DEBUG]   Input text length: {len(input_text)}")
                print(f"[DEBUG]   Court decision text length: {len(court_decision_text)}")
                # Use new google.genai API
                client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
                contents = [
                    types.Content(
                        role="user",
                        parts=[
                            types.Part.from_text(text=content),
                        ],
                    ),
                ]
                # Build config based on model version
                config_params = {
                    "temperature": temperature,
                    "max_output_tokens": max_tokens or MAX_TOKENS_CONFIG["gemini"],
                    "system_instruction": [
                        types.Part.from_text(text=system_prompt),
                    ],
                }
                # Add thinking config if enabled (only for Gemini 3+ models)
                if thinking_enabled and model_name.startswith("gemini-3"):
                    config_params["thinking_config"] = types.ThinkingConfig(
                        thinking_level=thinking_level.upper()
                    )
                # Only add response_mime_type for models that support it
                # if not model_name.startswith("gemini-3"):
                #     config_params["response_mime_type"] = "application/json"
                generate_content_config = types.GenerateContentConfig(**config_params)
                # Log full prompts in debug mode
                _log_prompt("gemini", model_name, system_prompt, content)
                response = client.models.generate_content(
                    model=model_name,
                    contents=contents,
                    config=generate_content_config,
                )
                response_text = response.text
                # Guard against an empty model reply
                if not response_text:
                    raise Exception("Пуста відповідь від моделі Gemini")
                print(f"[DEBUG] Gemini response length: {len(response_text)}")
                print(f"[DEBUG] Gemini response preview: {response_text[:300]}...")
                # Try to parse JSON using the universal extractor
                json_response = extract_json_from_text(response_text)
                if json_response:
                    json_response = normalize_response_keys(json_response)
                    # Verify that all required fields are present
                    required_fields = ["title", "text", "proceeding", "category"]
                    if all(field in json_response for field in required_fields):
                        return json_response
                    else:
                        missing_fields = [field for field in required_fields if field not in json_response]
                        print(f"[WARNING] Gemini response missing fields: {missing_fields}")
                        # Fallback for missing fields
                        for field in required_fields:
                            if field not in json_response:
                                json_response[field] = "Не визначено"
                        return json_response
                else:
                    print(f"[ERROR] Could not extract JSON from Gemini response: {response_text[:300]}...")
                    return {
                        "title": "Автоматично сформований заголовок",
                        "text": response_text.strip(),
                        "proceeding": "Не визначено",
                        "category": "Помилка парсингу"
                    }
            except Exception as e:
                print(f"Error in Gemini generation: {str(e)}")
                return {
                    "title": "Error in Gemini generation",
                    "text": str(e),
                    "proceeding": "Error",
                    "category": "Error"
                }
        else:
            # BUGFIX: previously an unknown provider fell through and the
            # function implicitly returned None; callers expect a dict.
            return {
                "title": "Помилка конфігурації",
                "text": f"Невідомий провайдер: {provider}",
                "proceeding": "N/A",
                "category": "Error"
            }
    except Exception as e:
        print(f"Error in generate_legal_position: {str(e)}")
        return {
            "title": "Error",
            "text": str(e),
            "proceeding": "Unknown",
            "category": "Error"
        }
async def search_with_ai_action(legal_position_json: Dict) -> Tuple[str, Optional[List[NodeWithScore]]]:
    """Search the legal-position index using a generated position as query.

    Flattens the generated position dict into one query string, retrieves
    candidates, deduplicates them and renders a Markdown results list.
    Returns (formatted results, top nodes) or (error message, None).
    """
    try:
        if not embed_model:
            return "Помилка: пошук недоступний без налаштованого embedding API ключа (OpenAI або Gemini)", None
        retriever = search_components.get_retriever()
        if not retriever:
            return "Помилка: компоненти пошуку не ініціалізовано", None

        # Flatten the generated position into a single query string.
        query_text = ": ".join(
            str(legal_position_json[key])
            for key in ("title", "text", "proceeding", "category")
        )
        hits = await retriever.aretrieve(query_text)
        # Deduplicate, then cap at the configured result count.
        top_hits = deduplicate_nodes(hits)[:Settings.similarity_top_k]

        rendered = ["\n **Результати пошуку (наявні правові позиції Верховного Суду):** \n\n"]
        for rank, hit in enumerate(top_hits, start=1):
            meta = hit.node.metadata
            links = get_links_html(meta.get('doc_id'))
            links_lp = get_links_html_lp(meta.get('lp_id'))
            rendered.append(
                f"\n[{rank}] *{meta.get('title')}* ⚖️ {links_lp} | {links} 👉 Score: {hit.score}\n"
            )
        return "".join(rendered), top_hits
    except Exception as e:
        return f"Помилка при пошуку: {str(e)}", None
async def search_with_raw_text(input_text: str) -> Tuple[str, Optional[List[NodeWithScore]]]:
    """Search the legal-position index using raw input text.

    Short texts (< 1024 non-space characters) go through the fusion
    retriever; longer texts fall back to BM25 only. Returns
    (formatted results, top nodes) or (error message, None).
    """
    try:
        if not input_text:
            return "Помилка: Порожній текст для пошуку", None
        if not embed_model:
            return "Помилка: пошук недоступний без налаштованого embedding API ключа (OpenAI або Gemini)", None
        retriever = search_components.get_retriever()
        if not retriever:
            return "Помилка: компоненти пошуку не ініціалізовано", None

        # Choose retriever by text length (whitespace excluded).
        char_count = get_text_length_without_spaces(input_text)
        try:
            if char_count < 1024:
                hits = await retriever.aretrieve(input_text)
            else:
                # Long texts: keyword-only (BM25) retrieval.
                bm25 = search_components.get_component('bm25_retriever')
                if not bm25:
                    return "Помилка: BM25 ретривер не ініціалізовано", None
                hits = await bm25.aretrieve(input_text)
            if not hits:
                return "Не знайдено відповідних правових позицій", None

            # Deduplicate, then cap at the configured result count.
            top_hits = deduplicate_nodes(hits)[:Settings.similarity_top_k]
            if not top_hits:
                return "Не знайдено унікальних правових позицій після дедуплікації", None

            rendered = ["\n **Результати пошуку (наявні правові позиції Верховного Суду):** \n\n"]
            for rank, hit in enumerate(top_hits, start=1):
                meta = hit.node.metadata
                title = meta.get('title', 'Невідомий заголовок')
                links = get_links_html(meta.get('doc_id', ''))
                links_lp = get_links_html_lp(meta.get('lp_id', ''))
                rendered.append(
                    f"\n[{rank}] *{title}* ⚖️ {links_lp} | {links} 👉 Score: {hit.score}\n"
                )
            return "".join(rendered), top_hits
        except Exception as e:
            return f"Помилка під час виконання пошуку: {str(e)}", None
    except Exception as e:
        return f"Помилка при пошуку: {str(e)}", None
async def analyze_action(
    legal_position_json: Dict,
    question: str,
    nodes: List[NodeWithScore],
    provider: str,
    model_name: str,
    temperature: float = GENERATION_TEMPERATURE,
    max_tokens: Optional[int] = None,
    thinking_enabled: bool = False,
    thinking_type: str = "Adaptive",
    thinking_level: str = "medium",
    openai_verbosity: str = "medium",
    thinking_budget: int = 10000,
    progress=gr.Progress()
) -> str:
    """Run the AI precedent-analysis workflow over retrieved nodes.

    Builds a query from the generated legal position, runs the analysis
    workflow, then expands each '* [N]' bullet in the answer with the
    corresponding source node's title, text and links. The gr.Progress()
    default is Gradio's standard progress-injection pattern.
    """
    try:
        progress(0, desc="Ініціалізація аналізу...")
        workflow = PrecedentAnalysisWorkflow(
            provider=ModelProvider(provider),
            model_name=AnalysisModelName(model_name),
            temperature=temperature,
            max_tokens=max_tokens,
            thinking_enabled=thinking_enabled,
            thinking_type=thinking_type,
            thinking_level=thinking_level,
            openai_verbosity=openai_verbosity,
            thinking_budget=thinking_budget
        )

        progress(0.2, desc="Підготовка даних...")
        query = ": ".join(
            str(legal_position_json[key])
            for key in ("title", "text", "proceeding", "category")
        )

        progress(0.4, desc="Запит до AI (це може зайняти час)...")
        response_text = await workflow.run(
            query=query,
            question=question,
            nodes=nodes
        )

        progress(0.8, desc="Формування висновків...")
        report = f"**Аналіз ШІ (модель: {model_name}):**\n{response_text}\n\n"
        report += "**Наявні в базі правові позицій Верховного Суду:**\n\n"

        answer_lines = response_text.split('\n')
        line_total = len(answer_lines)
        for line_no, line in enumerate(answer_lines):
            if line_no % 2 == 0:
                # Update progress on every other line to limit UI chatter.
                progress(0.8 + (0.2 * line_no / line_total),
                         desc=f"Обробка результатів ({line_no}/{line_total})...")
            if not line.startswith('* ['):
                continue
            # Bullet format is '* [N] ...'; N is 1-based into `nodes`.
            ref = line[3:line.index(']')]
            matched = nodes[int(ref) - 1]
            meta = matched.node.metadata
            title = meta.get('title', 'Невідомий заголовок')
            links = get_links_html(meta.get('doc_id'))
            links_lp = get_links_html_lp(meta.get('lp_id'))
            report += (
                f"[{ref}]: *{clean_text(title)}* | {clean_text(matched.text)}"
                f" | {links_lp} | {links}\n\n"
            )
        return report
    except Exception as e:
        return f"Помилка при аналізі: {str(e)}"
if __name__ == "__main__":
    try:
        # Determine which AI providers have credentials configured.
        available_providers = []
        if OPENAI_API_KEY:
            available_providers.append("OpenAI")
        if ANTHROPIC_API_KEY:
            available_providers.append("Anthropic")
        # Read once; reused below for the embedding-info message.
        gemini_key = os.getenv("GEMINI_API_KEY")
        if gemini_key:
            available_providers.append("Gemini")
        if DEEPSEEK_API_KEY:
            available_providers.append("DeepSeek")
        if not available_providers:
            print("Error: No AI provider API keys configured. Please set at least one of:",
                  file=sys.stderr)
            print("  - OPENAI_API_KEY", file=sys.stderr)
            print("  - ANTHROPIC_API_KEY", file=sys.stderr)
            print("  - GEMINI_API_KEY", file=sys.stderr)
            print("  - DEEPSEEK_API_KEY", file=sys.stderr)
            sys.exit(1)
        print(f"Available AI providers: {', '.join(available_providers)}")
        # Check embedding availability for search
        if not embed_model:
            print("Warning: No embedding model configured. Search functionality will be disabled.")
            print("  To enable search, set either OPENAI_API_KEY or GEMINI_API_KEY")
        elif gemini_key and not OPENAI_API_KEY:
            # BUGFIX: previously referenced the bare name GEMINI_API_KEY,
            # which is not imported from config here (unlike the env lookup
            # used above) and could raise NameError.
            print("Info: Using Gemini embeddings for search (OpenAI not configured)")
        if not all([AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY]):
            print("Warning: AWS credentials not configured. Will use local files only.")
        # Initialize components
        if initialize_components():
            print("Components initialized successfully!")
            # Import create_gradio_interface here to avoid circular import
            from interface import create_gradio_interface
            # Create and launch the interface
            app = create_gradio_interface()
            app.launch(
                server_name="0.0.0.0",
                server_port=7860,
                share=True
            )
        else:
            print("Failed to initialize components. Please check the logs for details.",
                  file=sys.stderr)
            sys.exit(1)
    except ImportError as e:
        print(f"Error importing required modules: {str(e)}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"Error starting application: {str(e)}", file=sys.stderr)
        sys.exit(1)