| """ |
| Observability module for Langfuse v3 integration with OpenTelemetry support. |
| |
| This module provides: |
| - Single global CallbackHandler for LangChain integration |
| - Root span management for user requests |
| - Session and user tracking |
| - Background flushing for async operations |
| """ |
|
|
| import os |
| import base64 |
| from typing import Optional, Dict, Any |
| from contextlib import contextmanager |
| from dotenv import load_dotenv |
|
|
| |
| from langfuse import get_client |
| from langfuse.langchain import CallbackHandler |
|
|
| |
| from opentelemetry import trace |
| from opentelemetry.sdk.trace import TracerProvider |
| from opentelemetry.sdk.trace.export import SimpleSpanProcessor |
| from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter |
|
|
| |
| load_dotenv("env.local") |
|
|
| |
| _langfuse_handler: Optional[CallbackHandler] = None |
| _tracer_provider: Optional[TracerProvider] = None |
|
|
| def initialize_observability() -> bool: |
| """ |
| Initialize Langfuse observability with OTEL integration. |
| |
| Returns: |
| bool: True if initialization successful, False otherwise |
| """ |
| global _langfuse_handler, _tracer_provider |
| |
| try: |
| |
| required_vars = ["LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST"] |
| missing_vars = [var for var in required_vars if not os.getenv(var)] |
| |
| if missing_vars: |
| print(f"Warning: Missing required environment variables: {missing_vars}") |
| return False |
| |
| |
| langfuse_auth = base64.b64encode( |
| f"{os.getenv('LANGFUSE_PUBLIC_KEY')}:{os.getenv('LANGFUSE_SECRET_KEY')}".encode() |
| ).decode() |
| |
| |
| os.environ['OTEL_EXPORTER_OTLP_ENDPOINT'] = f"{os.getenv('LANGFUSE_HOST')}/api/public/otel" |
| os.environ['OTEL_EXPORTER_OTLP_HEADERS'] = f"Authorization=Basic {langfuse_auth}" |
| |
| |
| _tracer_provider = TracerProvider() |
| _tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter())) |
| trace.set_tracer_provider(_tracer_provider) |
| |
| |
| _langfuse_handler = CallbackHandler() |
| |
| print("✅ Langfuse observability initialized successfully") |
| return True |
| |
| except Exception as e: |
| print(f"❌ Failed to initialize observability: {e}") |
| return False |
|
|
| def get_callback_handler() -> Optional[CallbackHandler]: |
| """ |
| Get the global Langfuse callback handler. |
| |
| Returns: |
| CallbackHandler or None if not initialized |
| """ |
| global _langfuse_handler |
| |
| if _langfuse_handler is None: |
| if initialize_observability(): |
| return _langfuse_handler |
| return None |
| |
| return _langfuse_handler |
|
|
| @contextmanager |
| def start_root_span( |
| name: str, |
| user_id: str, |
| session_id: str, |
| metadata: Optional[Dict[str, Any]] = None |
| ): |
| """ |
| Context manager for creating root spans with user and session tracking. |
| |
| Args: |
| name: Span name (e.g., "user-request") |
| user_id: User identifier for session tracking |
| session_id: Session identifier for conversation continuity |
| metadata: Optional additional metadata |
| |
| Yields: |
| Langfuse span context or None if creation fails |
| """ |
| span = None |
| try: |
| |
| client = get_client() |
| span = client.start_as_current_span(name=name) |
| span_context = span.__enter__() |
| |
| |
| span_context.update_trace( |
| user_id=user_id, |
| session_id=session_id, |
| tags=[ |
| os.getenv("ENV", "dev"), |
| "multi-agent-system" |
| ] |
| ) |
| |
| |
| if metadata: |
| span_context.update_trace(metadata=metadata) |
| |
| yield span_context |
| |
| except Exception as e: |
| print(f"Warning: Failed to create root span: {e}") |
| |
| yield None |
| finally: |
| |
| if span is not None: |
| try: |
| span.__exit__(None, None, None) |
| except Exception as e: |
| print(f"Warning: Error closing span: {e}") |
|
|
| def flush_traces(background: bool = True) -> None: |
| """ |
| Flush pending traces to Langfuse. |
| |
| Args: |
| background: Whether to flush in background (non-blocking) |
| """ |
| try: |
| client = get_client() |
| client.flush() |
| except Exception as e: |
| print(f"Warning: Failed to flush traces: {e}") |
|
|
| def shutdown_observability() -> None: |
| """ |
| Clean shutdown of observability components. |
| """ |
| global _tracer_provider |
| |
| try: |
| |
| flush_traces(background=False) |
| |
| |
| if _tracer_provider: |
| _tracer_provider.shutdown() |
| |
| except Exception as e: |
| print(f"Warning: Error during observability shutdown: {e}") |
|
|
| |
| @contextmanager |
| def agent_span(agent_name: str, metadata: Optional[Dict[str, Any]] = None): |
| """ |
| Context manager for agent-level spans. |
| |
| Args: |
| agent_name: Name of the agent (e.g., "lead", "research", "code") |
| metadata: Optional metadata for the span |
| """ |
| span_name = f"agent/{agent_name}" |
| span = None |
| |
| try: |
| client = get_client() |
| span = client.start_as_current_span(name=span_name) |
| span_context = span.__enter__() |
| |
| if metadata: |
| span_context.update_trace(metadata=metadata) |
| yield span_context |
| except Exception as e: |
| print(f"Warning: Failed to create agent span for {agent_name}: {e}") |
| yield None |
| finally: |
| if span is not None: |
| try: |
| span.__exit__(None, None, None) |
| except Exception as e: |
| print(f"Warning: Error closing agent span: {e}") |
|
|
| @contextmanager |
| def tool_span(tool_name: str, metadata: Optional[Dict[str, Any]] = None): |
| """ |
| Context manager for tool-level spans. |
| |
| Args: |
| tool_name: Name of the tool (e.g., "tavily_search", "calculator") |
| metadata: Optional metadata for the span |
| """ |
| span_name = f"tool/{tool_name}" |
| span = None |
| |
| try: |
| client = get_client() |
| span = client.start_as_current_span(name=span_name) |
| span_context = span.__enter__() |
| |
| if metadata: |
| span_context.update_trace(metadata=metadata) |
| yield span_context |
| except Exception as e: |
| print(f"Warning: Failed to create tool span for {tool_name}: {e}") |
| yield None |
| finally: |
| if span is not None: |
| try: |
| span.__exit__(None, None, None) |
| except Exception as e: |
| print(f"Warning: Error closing tool span: {e}") |