| """ |
| LightRAG FastAPI Server |
| """ |
|
|
| from fastapi import FastAPI, Depends, HTTPException, Request |
| from fastapi.exceptions import RequestValidationError |
| from fastapi.responses import JSONResponse |
| import os |
| import logging |
| import logging.config |
| import signal |
| import sys |
| import uvicorn |
| import pipmaster as pm |
| import inspect |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.responses import RedirectResponse |
| from pathlib import Path |
| import configparser |
| from ascii_colors import ASCIIColors |
| from fastapi.middleware.cors import CORSMiddleware |
| from contextlib import asynccontextmanager |
| from dotenv import load_dotenv |
| from lightrag.api.utils_api import ( |
| get_combined_auth_dependency, |
| display_splash_screen, |
| check_env_file, |
| ) |
| from .config import ( |
| global_args, |
| update_uvicorn_mode_config, |
| get_default_host, |
| ) |
| from lightrag.utils import get_env_value |
| from lightrag import LightRAG, __version__ as core_version |
| from lightrag.api import __api_version__ |
| from lightrag.types import GPTKeywordExtractionFormat |
| from lightrag.utils import EmbeddingFunc |
| from lightrag.constants import ( |
| DEFAULT_LOG_MAX_BYTES, |
| DEFAULT_LOG_BACKUP_COUNT, |
| DEFAULT_LOG_FILENAME, |
| DEFAULT_LLM_TIMEOUT, |
| DEFAULT_EMBEDDING_TIMEOUT, |
| ) |
| from lightrag.api.routers.document_routes import ( |
| DocumentManager, |
| create_document_routes, |
| ) |
| from lightrag.api.routers.query_routes import create_query_routes |
| from lightrag.api.routers.graph_routes import create_graph_routes |
| from lightrag.api.routers.ollama_api import OllamaAPI |
|
|
| from lightrag.utils import logger, set_verbose_debug |
| from lightrag.kg.shared_storage import ( |
| get_namespace_data, |
| initialize_pipeline_status, |
| cleanup_keyed_lock, |
| finalize_share_data, |
| ) |
| from fastapi.security import OAuth2PasswordRequestForm |
| from lightrag.api.auth import auth_handler |
|
|
| |
| |
| |
# Load environment variables from .env; override=False means variables already
# present in the process environment take precedence over the file's values.
load_dotenv(dotenv_path=".env", override=False)

# Optional Web UI branding, echoed back by /auth-status, /login and /health.
webui_title = os.getenv("WEBUI_TITLE")
webui_description = os.getenv("WEBUI_DESCRIPTION")

# Read INI configuration from the working directory (configparser silently
# ignores a missing config.ini).
config = configparser.ConfigParser()
config.read("config.ini")

# Snapshot whether any username/password accounts are configured; used by the
# /health endpoint to report the auth mode.
auth_configured = bool(auth_handler.accounts)
|
|
|
|
def setup_signal_handlers():
    """Install SIGINT/SIGTERM handlers that release shared state before exit."""

    def _graceful_shutdown(sig, frame):
        # Announce the shutdown and the PID so operators can correlate logs.
        print(f"\n\nReceived signal {sig}, shutting down gracefully...")
        print(f"Process ID: {os.getpid()}")

        # Release cross-process shared storage before terminating.
        finalize_share_data()

        sys.exit(0)

    # Both interactive interrupt and termination requests shut down the same way.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _graceful_shutdown)
|
|
|
|
class LLMConfigCache:
    """Caches binding-specific LLM/embedding option dictionaries.

    Options are parsed from ``args`` once at construction time so request-time
    closures can reuse them instead of re-parsing configuration on every call.
    Attributes remain ``None`` for bindings that are not in use.
    """

    def __init__(self, args):
        self.args = args

        # None signals "binding not active"; populated below when applicable.
        self.openai_llm_options = None
        self.ollama_llm_options = None
        self.ollama_embedding_options = None

        if args.llm_binding in ("openai", "azure_openai"):
            from lightrag.llm.binding_options import OpenAILLMOptions

            self.openai_llm_options = OpenAILLMOptions.options_dict(args)
            logger.info(f"OpenAI LLM Options: {self.openai_llm_options}")

        if args.llm_binding == "ollama":
            try:
                from lightrag.llm.binding_options import OllamaLLMOptions

                self.ollama_llm_options = OllamaLLMOptions.options_dict(args)
                logger.info(f"Ollama LLM Options: {self.ollama_llm_options}")
            except ImportError:
                # Fall back to an empty options dict when the helper is absent.
                logger.warning(
                    "OllamaLLMOptions not available, using default configuration"
                )
                self.ollama_llm_options = {}

        if args.embedding_binding == "ollama":
            try:
                from lightrag.llm.binding_options import OllamaEmbeddingOptions

                self.ollama_embedding_options = OllamaEmbeddingOptions.options_dict(args)
                logger.info(f"Ollama Embedding Options: {self.ollama_embedding_options}")
            except ImportError:
                # Fall back to an empty options dict when the helper is absent.
                logger.warning(
                    "OllamaEmbeddingOptions not available, using default configuration"
                )
                self.ollama_embedding_options = {}
|
|
|
def create_app(args):
    """Build and configure the LightRAG FastAPI application.

    Wires together logging, binding validation, LLM/embedding/rerank
    functions, the LightRAG core instance, API routers, auth endpoints and
    the static Web UI, returning the ready-to-serve FastAPI app.
    """
    # Apply verbosity settings before any other setup so later steps log correctly.
    logger.setLevel(args.log_level)
    set_verbose_debug(args.verbose)

    # Parse binding-specific option dictionaries once; the closures created
    # below reuse them instead of re-parsing per request.
    config_cache = LLMConfigCache(args)

    # Fail fast on unsupported bindings.
    if args.llm_binding not in [
        "lollms",
        "ollama",
        "openai",
        "azure_openai",
        "aws_bedrock",
    ]:
        raise Exception("llm binding not supported")

    if args.embedding_binding not in [
        "lollms",
        "ollama",
        "openai",
        "azure_openai",
        "aws_bedrock",
        "jina",
    ]:
        raise Exception("embedding binding not supported")

    # Fill in per-binding default hosts when none were configured.
    if args.llm_binding_host is None:
        args.llm_binding_host = get_default_host(args.llm_binding)

    if args.embedding_binding_host is None:
        args.embedding_binding_host = get_default_host(args.embedding_binding)

    # Validate SSL configuration up front so the server does not start with
    # missing certificate material.
    if args.ssl:
        if not args.ssl_certfile or not args.ssl_keyfile:
            raise Exception(
                "SSL certificate and key files must be provided when SSL is enabled"
            )
        if not os.path.exists(args.ssl_certfile):
            raise Exception(f"SSL certificate file not found: {args.ssl_certfile}")
        if not os.path.exists(args.ssl_keyfile):
            raise Exception(f"SSL key file not found: {args.ssl_keyfile}")

    # The environment variable takes precedence over the CLI/config value.
    api_key = os.getenv("LIGHTRAG_API_KEY") or args.key

    # Manages the document input directory for the document routes.
    doc_manager = DocumentManager(args.input_dir, workspace=args.workspace)

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Lifespan context manager for startup and shutdown events"""
        # Routers use this set to register background tasks on the app.
        app.state.background_tasks = set()

        try:
            # `rag` is created later in create_app; it is bound via closure
            # by the time the server actually starts.
            await rag.initialize_storages()
            await initialize_pipeline_status()

            await rag.check_and_migrate_data()

            ASCIIColors.green("\nServer is ready to accept connections! 🚀\n")

            yield

        finally:
            # Flush/close storages and release shared (cross-process) state
            # even if startup failed partway through.
            await rag.finalize_storages()

            finalize_share_data()

    app_kwargs = {
        "title": "LightRAG Server API",
        # NOTE(review): due to operator precedence this evaluates as
        # `(base + "(With authentication)") if api_key else ""`, so the
        # description is empty when no API key is set — confirm intent.
        "description": (
            "Providing API for LightRAG core, Web UI and Ollama Model Emulation"
            + "(With authentication)"
            if api_key
            else ""
        ),
        "version": __api_version__,
        "openapi_url": "/openapi.json",
        "docs_url": "/docs",
        "redoc_url": "/redoc",
        "lifespan": lifespan,
    }

    # Keep authorization across Swagger UI reloads and enable "Try it out".
    app_kwargs["swagger_ui_parameters"] = {
        "persistAuthorization": True,
        "tryItOutEnabled": True,
    }

    app = FastAPI(**app_kwargs)

    @app.exception_handler(RequestValidationError)
    async def validation_exception_handler(
        request: Request, exc: RequestValidationError
    ):
        """Return a structured 400 payload for /query/data validation errors;
        fall back to FastAPI's default 422 shape for all other paths."""
        if request.url.path.endswith("/query/data"):
            # Flatten pydantic error locations into "a -> b: message" strings.
            error_details = []
            for error in exc.errors():
                field_path = " -> ".join(str(loc) for loc in error["loc"])
                error_details.append(f"{field_path}: {error['msg']}")

            error_message = "; ".join(error_details)

            return JSONResponse(
                status_code=400,
                content={
                    "status": "failure",
                    "message": f"Validation error: {error_message}",
                    "data": {},
                    "metadata": {},
                },
            )
        else:
            return JSONResponse(status_code=422, content={"detail": exc.errors()})

    def get_cors_origins():
        """Get allowed origins from global_args
        Returns a list of allowed origins, defaults to ["*"] if not set
        """
        origins_str = global_args.cors_origins
        if origins_str == "*":
            return ["*"]
        return [origin.strip() for origin in origins_str.split(",")]

    # CORS middleware must be added before routers so it wraps every route.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=get_cors_origins(),
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Dependency that accepts either the API key or a valid auth token.
    combined_auth = get_combined_auth_dependency(api_key)

    # Ensure the working directory exists before LightRAG touches it.
    Path(args.working_dir).mkdir(parents=True, exist_ok=True)

    def create_optimized_openai_llm_func(
        config_cache: LLMConfigCache, args, llm_timeout: int
    ):
        """Create optimized OpenAI LLM function with pre-processed configuration"""

        async def optimized_openai_alike_model_complete(
            prompt,
            system_prompt=None,
            history_messages=None,
            keyword_extraction=False,
            **kwargs,
        ) -> str:
            # Lazy import keeps startup fast when this binding is unused.
            from lightrag.llm.openai import openai_complete_if_cache

            # The kwarg (when present) overrides the positional parameter.
            keyword_extraction = kwargs.pop("keyword_extraction", None)
            if keyword_extraction:
                kwargs["response_format"] = GPTKeywordExtractionFormat
            if history_messages is None:
                history_messages = []

            # Apply the timeout and the cached, pre-parsed OpenAI options.
            kwargs["timeout"] = llm_timeout
            if config_cache.openai_llm_options:
                kwargs.update(config_cache.openai_llm_options)

            return await openai_complete_if_cache(
                args.llm_model,
                prompt,
                system_prompt=system_prompt,
                history_messages=history_messages,
                base_url=args.llm_binding_host,
                api_key=args.llm_binding_api_key,
                **kwargs,
            )

        return optimized_openai_alike_model_complete

    def create_optimized_azure_openai_llm_func(
        config_cache: LLMConfigCache, args, llm_timeout: int
    ):
        """Create optimized Azure OpenAI LLM function with pre-processed configuration"""

        async def optimized_azure_openai_model_complete(
            prompt,
            system_prompt=None,
            history_messages=None,
            keyword_extraction=False,
            **kwargs,
        ) -> str:
            from lightrag.llm.azure_openai import azure_openai_complete_if_cache

            keyword_extraction = kwargs.pop("keyword_extraction", None)
            if keyword_extraction:
                kwargs["response_format"] = GPTKeywordExtractionFormat
            if history_messages is None:
                history_messages = []

            kwargs["timeout"] = llm_timeout
            if config_cache.openai_llm_options:
                kwargs.update(config_cache.openai_llm_options)

            return await azure_openai_complete_if_cache(
                args.llm_model,
                prompt,
                system_prompt=system_prompt,
                history_messages=history_messages,
                base_url=args.llm_binding_host,
                # Environment values win; CLI values are the fallback.
                api_key=os.getenv("AZURE_OPENAI_API_KEY", args.llm_binding_api_key),
                api_version=os.getenv("AZURE_OPENAI_API_VERSION", "2024-08-01-preview"),
                **kwargs,
            )

        return optimized_azure_openai_model_complete

    def create_llm_model_func(binding: str):
        """
        Create LLM model function based on binding type.
        Uses optimized functions for OpenAI bindings and lazy import for others.
        """
        try:
            if binding == "lollms":
                from lightrag.llm.lollms import lollms_model_complete

                return lollms_model_complete
            elif binding == "ollama":
                from lightrag.llm.ollama import ollama_model_complete

                return ollama_model_complete
            elif binding == "aws_bedrock":
                # Defined later in create_app; resolved via closure at call time.
                return bedrock_model_complete
            elif binding == "azure_openai":
                return create_optimized_azure_openai_llm_func(
                    config_cache, args, llm_timeout
                )
            else:
                # Default branch covers the plain "openai" binding.
                return create_optimized_openai_llm_func(config_cache, args, llm_timeout)
        except ImportError as e:
            raise Exception(f"Failed to import {binding} LLM binding: {e}")

    def create_llm_model_kwargs(binding: str, args, llm_timeout: int) -> dict:
        """
        Create LLM model kwargs based on binding type.
        Uses lazy import for binding-specific options.
        """
        if binding in ["lollms", "ollama"]:
            try:
                # NOTE(review): OllamaLLMOptions is also used for the "lollms"
                # binding here — confirm that is intentional.
                from lightrag.llm.binding_options import OllamaLLMOptions

                return {
                    "host": args.llm_binding_host,
                    "timeout": llm_timeout,
                    "options": OllamaLLMOptions.options_dict(args),
                    "api_key": args.llm_binding_api_key,
                }
            except ImportError as e:
                raise Exception(f"Failed to import {binding} options: {e}")
        return {}

    def create_optimized_embedding_function(
        config_cache: LLMConfigCache, binding, model, host, api_key, dimensions, args
    ):
        """
        Create optimized embedding function with pre-processed configuration for applicable bindings.
        Uses lazy imports for all bindings and avoids repeated configuration parsing.
        """

        async def optimized_embedding_function(texts):
            try:
                if binding == "lollms":
                    from lightrag.llm.lollms import lollms_embed

                    return await lollms_embed(
                        texts, embed_model=model, host=host, api_key=api_key
                    )
                elif binding == "ollama":
                    from lightrag.llm.ollama import ollama_embed

                    # Prefer the options cached at startup; fall back to
                    # parsing them on demand if the cache was not populated.
                    if config_cache.ollama_embedding_options is not None:
                        ollama_options = config_cache.ollama_embedding_options
                    else:
                        from lightrag.llm.binding_options import OllamaEmbeddingOptions

                        ollama_options = OllamaEmbeddingOptions.options_dict(args)

                    return await ollama_embed(
                        texts,
                        embed_model=model,
                        host=host,
                        api_key=api_key,
                        options=ollama_options,
                    )
                elif binding == "azure_openai":
                    from lightrag.llm.azure_openai import azure_openai_embed

                    return await azure_openai_embed(texts, model=model, api_key=api_key)
                elif binding == "aws_bedrock":
                    from lightrag.llm.bedrock import bedrock_embed

                    return await bedrock_embed(texts, model=model)
                elif binding == "jina":
                    from lightrag.llm.jina import jina_embed

                    return await jina_embed(
                        texts, dimensions=dimensions, base_url=host, api_key=api_key
                    )
                else:
                    # Default branch covers the plain "openai" binding.
                    from lightrag.llm.openai import openai_embed

                    return await openai_embed(
                        texts, model=model, base_url=host, api_key=api_key
                    )
            except ImportError as e:
                raise Exception(f"Failed to import {binding} embedding: {e}")

        return optimized_embedding_function

    # Timeouts come from the environment with library defaults as fallback.
    llm_timeout = get_env_value("LLM_TIMEOUT", DEFAULT_LLM_TIMEOUT, int)
    embedding_timeout = get_env_value(
        "EMBEDDING_TIMEOUT", DEFAULT_EMBEDDING_TIMEOUT, int
    )

    async def bedrock_model_complete(
        prompt,
        system_prompt=None,
        history_messages=None,
        keyword_extraction=False,
        **kwargs,
    ) -> str:
        """AWS Bedrock completion wrapper (lazy-imports the bedrock binding)."""
        from lightrag.llm.bedrock import bedrock_complete_if_cache

        keyword_extraction = kwargs.pop("keyword_extraction", None)
        if keyword_extraction:
            kwargs["response_format"] = GPTKeywordExtractionFormat
        if history_messages is None:
            history_messages = []

        # Bedrock temperature is configured via its own environment variable.
        kwargs["temperature"] = get_env_value("BEDROCK_LLM_TEMPERATURE", 1.0, float)

        return await bedrock_complete_if_cache(
            args.llm_model,
            prompt,
            system_prompt=system_prompt,
            history_messages=history_messages,
            **kwargs,
        )

    # Embedding function shared by all vector storages.
    embedding_func = EmbeddingFunc(
        embedding_dim=args.embedding_dim,
        func=create_optimized_embedding_function(
            config_cache=config_cache,
            binding=args.embedding_binding,
            model=args.embedding_model,
            host=args.embedding_binding_host,
            api_key=args.embedding_binding_api_key,
            dimensions=args.embedding_dim,
            args=args,
        ),
    )

    # Optional reranking: the "null" binding disables it entirely.
    rerank_model_func = None
    if args.rerank_binding != "null":
        from lightrag.rerank import cohere_rerank, jina_rerank, ali_rerank

        # Map binding names to their provider-specific rerank functions.
        rerank_functions = {
            "cohere": cohere_rerank,
            "jina": jina_rerank,
            "aliyun": ali_rerank,
        }

        selected_rerank_func = rerank_functions.get(args.rerank_binding)
        if not selected_rerank_func:
            logger.error(f"Unsupported rerank binding: {args.rerank_binding}")
            raise ValueError(f"Unsupported rerank binding: {args.rerank_binding}")

        # Backfill model/base_url from the rerank function's own signature
        # defaults when they were not configured explicitly.
        if args.rerank_model is None or args.rerank_binding_host is None:
            sig = inspect.signature(selected_rerank_func)

            if args.rerank_model is None and "model" in sig.parameters:
                default_model = sig.parameters["model"].default
                if default_model != inspect.Parameter.empty:
                    args.rerank_model = default_model

            if args.rerank_binding_host is None and "base_url" in sig.parameters:
                default_base_url = sig.parameters["base_url"].default
                if default_base_url != inspect.Parameter.empty:
                    args.rerank_binding_host = default_base_url

        async def server_rerank_func(
            query: str, documents: list, top_n: int = None, extra_body: dict = None
        ):
            """Server rerank function with configuration from environment variables"""
            return await selected_rerank_func(
                query=query,
                documents=documents,
                top_n=top_n,
                api_key=args.rerank_binding_api_key,
                model=args.rerank_model,
                base_url=args.rerank_binding_host,
                extra_body=extra_body,
            )

        rerank_model_func = server_rerank_func
        logger.info(
            f"Reranking is enabled: {args.rerank_model or 'default model'} using {args.rerank_binding} provider"
        )
    else:
        logger.info("Reranking is disabled")

    # Identity advertised by the Ollama-compatible emulation endpoints.
    from lightrag.api.config import OllamaServerInfos

    ollama_server_infos = OllamaServerInfos(
        name=args.simulated_model_name, tag=args.simulated_model_tag
    )

    # Assemble the LightRAG core instance from everything configured above.
    try:
        rag = LightRAG(
            working_dir=args.working_dir,
            workspace=args.workspace,
            llm_model_func=create_llm_model_func(args.llm_binding),
            llm_model_name=args.llm_model,
            llm_model_max_async=args.max_async,
            summary_max_tokens=args.summary_max_tokens,
            summary_context_size=args.summary_context_size,
            chunk_token_size=int(args.chunk_size),
            chunk_overlap_token_size=int(args.chunk_overlap_size),
            llm_model_kwargs=create_llm_model_kwargs(
                args.llm_binding, args, llm_timeout
            ),
            embedding_func=embedding_func,
            default_llm_timeout=llm_timeout,
            default_embedding_timeout=embedding_timeout,
            kv_storage=args.kv_storage,
            graph_storage=args.graph_storage,
            vector_storage=args.vector_storage,
            doc_status_storage=args.doc_status_storage,
            vector_db_storage_cls_kwargs={
                "cosine_better_than_threshold": args.cosine_threshold
            },
            enable_llm_cache_for_entity_extract=args.enable_llm_cache_for_extract,
            enable_llm_cache=args.enable_llm_cache,
            rerank_model_func=rerank_model_func,
            max_parallel_insert=args.max_parallel_insert,
            max_graph_nodes=args.max_graph_nodes,
            addon_params={
                "language": args.summary_language,
                "entity_types": args.entity_types,
            },
            ollama_server_infos=ollama_server_infos,
        )
    except Exception as e:
        logger.error(f"Failed to initialize LightRAG: {e}")
        raise

    # Mount the feature routers (documents, query, graph).
    app.include_router(
        create_document_routes(
            rag,
            doc_manager,
            api_key,
        )
    )
    app.include_router(create_query_routes(rag, api_key, args.top_k))
    app.include_router(create_graph_routes(rag, api_key))

    # Ollama-compatible API emulation served under /api.
    ollama_api = OllamaAPI(rag, top_k=args.top_k, api_key=api_key)
    app.include_router(ollama_api.router, prefix="/api")

    @app.get("/")
    async def redirect_to_webui():
        """Redirect root path to /webui"""
        return RedirectResponse(url="/webui")

    @app.get("/auth-status")
    async def get_auth_status():
        """Get authentication status and guest token if auth is not configured"""

        if not auth_handler.accounts:
            # Auth disabled: hand out a guest token so the Web UI can proceed.
            guest_token = auth_handler.create_token(
                username="guest", role="guest", metadata={"auth_mode": "disabled"}
            )
            return {
                "auth_configured": False,
                "access_token": guest_token,
                "token_type": "bearer",
                "auth_mode": "disabled",
                "message": "Authentication is disabled. Using guest access.",
                "core_version": core_version,
                "api_version": __api_version__,
                "webui_title": webui_title,
                "webui_description": webui_description,
            }

        return {
            "auth_configured": True,
            "auth_mode": "enabled",
            "core_version": core_version,
            "api_version": __api_version__,
            "webui_title": webui_title,
            "webui_description": webui_description,
        }

    @app.post("/login")
    async def login(form_data: OAuth2PasswordRequestForm = Depends()):
        """Issue an access token; returns a guest token when auth is disabled."""
        if not auth_handler.accounts:
            guest_token = auth_handler.create_token(
                username="guest", role="guest", metadata={"auth_mode": "disabled"}
            )
            return {
                "access_token": guest_token,
                "token_type": "bearer",
                "auth_mode": "disabled",
                "message": "Authentication is disabled. Using guest access.",
                "core_version": core_version,
                "api_version": __api_version__,
                "webui_title": webui_title,
                "webui_description": webui_description,
            }
        username = form_data.username
        # NOTE(review): passwords are compared in plain text against the
        # configured accounts — confirm hashed storage is not expected here.
        if auth_handler.accounts.get(username) != form_data.password:
            raise HTTPException(status_code=401, detail="Incorrect credentials")

        user_token = auth_handler.create_token(
            username=username, role="user", metadata={"auth_mode": "enabled"}
        )
        return {
            "access_token": user_token,
            "token_type": "bearer",
            "auth_mode": "enabled",
            "core_version": core_version,
            "api_version": __api_version__,
            "webui_title": webui_title,
            "webui_description": webui_description,
        }

    @app.get("/health", dependencies=[Depends(combined_auth)])
    async def get_status():
        """Get current system status"""
        try:
            pipeline_status = await get_namespace_data("pipeline_status")

            if not auth_configured:
                auth_mode = "disabled"
            else:
                auth_mode = "enabled"

            # Also opportunistically cleans up stale keyed locks and reports them.
            keyed_lock_info = cleanup_keyed_lock()

            return {
                "status": "healthy",
                "working_directory": str(args.working_dir),
                "input_directory": str(args.input_dir),
                "configuration": {
                    # LLM configuration
                    "llm_binding": args.llm_binding,
                    "llm_binding_host": args.llm_binding_host,
                    "llm_model": args.llm_model,
                    # Embedding configuration
                    "embedding_binding": args.embedding_binding,
                    "embedding_binding_host": args.embedding_binding_host,
                    "embedding_model": args.embedding_model,
                    "summary_max_tokens": args.summary_max_tokens,
                    "summary_context_size": args.summary_context_size,
                    "kv_storage": args.kv_storage,
                    "doc_status_storage": args.doc_status_storage,
                    "graph_storage": args.graph_storage,
                    "vector_storage": args.vector_storage,
                    "enable_llm_cache_for_extract": args.enable_llm_cache_for_extract,
                    "enable_llm_cache": args.enable_llm_cache,
                    "workspace": args.workspace,
                    "max_graph_nodes": args.max_graph_nodes,
                    # Rerank configuration (model/host reported only when enabled)
                    "enable_rerank": rerank_model_func is not None,
                    "rerank_binding": args.rerank_binding,
                    "rerank_model": args.rerank_model if rerank_model_func else None,
                    "rerank_binding_host": args.rerank_binding_host
                    if rerank_model_func
                    else None,
                    # Processing configuration
                    "summary_language": args.summary_language,
                    "force_llm_summary_on_merge": args.force_llm_summary_on_merge,
                    "max_parallel_insert": args.max_parallel_insert,
                    "cosine_threshold": args.cosine_threshold,
                    "min_rerank_score": args.min_rerank_score,
                    "related_chunk_number": args.related_chunk_number,
                    "max_async": args.max_async,
                    "embedding_func_max_async": args.embedding_func_max_async,
                    "embedding_batch_num": args.embedding_batch_num,
                },
                "auth_mode": auth_mode,
                "pipeline_busy": pipeline_status.get("busy", False),
                "keyed_locks": keyed_lock_info,
                "core_version": core_version,
                "api_version": __api_version__,
                "webui_title": webui_title,
                "webui_description": webui_description,
            }
        except Exception as e:
            logger.error(f"Error getting health status: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e))

    # Static file server with per-file-type cache policy for the bundled Web UI.
    class SmartStaticFiles(StaticFiles):
        async def get_response(self, path: str, scope):
            response = await super().get_response(path, scope)

            if path.endswith(".html"):
                # HTML entry points must always be revalidated so UI updates
                # reach the browser immediately.
                response.headers["Cache-Control"] = (
                    "no-cache, no-store, must-revalidate"
                )
                response.headers["Pragma"] = "no-cache"
                response.headers["Expires"] = "0"
            elif (
                "/assets/" in path
            ):
                # Build assets under /assets/ are cached aggressively.
                response.headers["Cache-Control"] = (
                    "public, max-age=31536000, immutable"
                )

            # Ensure correct MIME types for JS/CSS regardless of platform defaults.
            if path.endswith(".js"):
                response.headers["Content-Type"] = "application/javascript"
            elif path.endswith(".css"):
                response.headers["Content-Type"] = "text/css"

            return response

    # Serve the bundled Web UI from the package directory.
    static_dir = Path(__file__).parent / "webui"
    static_dir.mkdir(exist_ok=True)
    app.mount(
        "/webui",
        SmartStaticFiles(
            directory=static_dir, html=True, check_dir=True
        ),
        name="webui",
    )

    return app
|
|
|
|
def get_application(args=None):
    """Factory function for creating the FastAPI application.

    Falls back to the globally parsed configuration when no explicit argument
    namespace is supplied (e.g. when loaded by an external ASGI server).
    """
    effective_args = global_args if args is None else args
    return create_app(effective_args)
|
|
|
|
def configure_logging():
    """Configure logging for uvicorn startup.

    Resets any handlers left over from a previous configuration, then installs
    a console handler plus a size-rotated file handler for the uvicorn and
    lightrag loggers via ``logging.config.dictConfig``.
    """
    # Drop handlers/filters installed by earlier runs (e.g. on reload) so the
    # dictConfig call below starts from a clean slate. The local is named
    # `existing_logger` to avoid shadowing the module-level `logger`.
    for logger_name in ["uvicorn", "uvicorn.access", "uvicorn.error", "lightrag"]:
        existing_logger = logging.getLogger(logger_name)
        existing_logger.handlers = []
        existing_logger.filters = []

    # Resolve the log file location; LOG_DIR defaults to the current directory.
    log_dir = os.getenv("LOG_DIR", os.getcwd())
    log_file_path = os.path.abspath(os.path.join(log_dir, DEFAULT_LOG_FILENAME))

    print(f"\nLightRAG log file: {log_file_path}\n")
    # Bug fix: previously this was `os.makedirs(os.path.dirname(log_dir), ...)`,
    # which only created the *parent* of LOG_DIR and left the log directory
    # itself missing. Create the directory that actually holds the log file.
    os.makedirs(os.path.dirname(log_file_path), exist_ok=True)

    # Rotation limits are configurable via environment variables.
    log_max_bytes = get_env_value("LOG_MAX_BYTES", DEFAULT_LOG_MAX_BYTES, int)
    log_backup_count = get_env_value("LOG_BACKUP_COUNT", DEFAULT_LOG_BACKUP_COUNT, int)

    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "default": {
                    "format": "%(levelname)s: %(message)s",
                },
                "detailed": {
                    "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                },
            },
            "handlers": {
                "console": {
                    "formatter": "default",
                    "class": "logging.StreamHandler",
                    "stream": "ext://sys.stderr",
                },
                "file": {
                    "formatter": "detailed",
                    "class": "logging.handlers.RotatingFileHandler",
                    "filename": log_file_path,
                    "maxBytes": log_max_bytes,
                    "backupCount": log_backup_count,
                    "encoding": "utf-8",
                },
            },
            "loggers": {
                # Both uvicorn and lightrag loggers write to console and file;
                # access/app logs additionally go through the path filter.
                "uvicorn": {
                    "handlers": ["console", "file"],
                    "level": "INFO",
                    "propagate": False,
                },
                "uvicorn.access": {
                    "handlers": ["console", "file"],
                    "level": "INFO",
                    "propagate": False,
                    "filters": ["path_filter"],
                },
                "uvicorn.error": {
                    "handlers": ["console", "file"],
                    "level": "INFO",
                    "propagate": False,
                },
                "lightrag": {
                    "handlers": ["console", "file"],
                    "level": "INFO",
                    "propagate": False,
                    "filters": ["path_filter"],
                },
            },
            "filters": {
                "path_filter": {
                    "()": "lightrag.utils.LightragPathFilter",
                },
            },
        }
    )
|
|
|
|
def check_and_install_dependencies():
    """Check and install required dependencies"""
    required_packages = (
        "uvicorn",
        "tiktoken",
        "fastapi",
    )

    # Determine what is missing first, then install each absent package.
    missing = [pkg for pkg in required_packages if not pm.is_installed(pkg)]
    for pkg in missing:
        print(f"Installing {pkg}...")
        pm.install(pkg)
        print(f"{pkg} installed successfully")
|
|
|
|
def main():
    """CLI entry point: configure everything and launch the server with uvicorn."""
    # Under Gunicorn the master process manages workers; nothing to do here.
    if "GUNICORN_CMD_ARGS" in os.environ:
        print("Running under Gunicorn - worker management handled by Gunicorn")
        return

    # Abort early if the environment file check fails.
    if not check_env_file():
        sys.exit(1)

    # Make sure core runtime dependencies are importable before continuing.
    check_and_install_dependencies()

    from multiprocessing import freeze_support

    # Required for multiprocessing support in frozen executables (Windows).
    freeze_support()

    # Configure logging before creating the app so startup logs are captured.
    configure_logging()
    update_uvicorn_mode_config()
    display_splash_screen(global_args)

    # Graceful shutdown on SIGINT/SIGTERM.
    setup_signal_handlers()

    app = create_app(global_args)

    # log_config=None keeps our dictConfig instead of uvicorn's defaults.
    uvicorn_config = {
        "app": app,
        "host": global_args.host,
        "port": global_args.port,
        "log_config": None,
    }

    if global_args.ssl:
        uvicorn_config.update(
            {
                "ssl_certfile": global_args.ssl_certfile,
                "ssl_keyfile": global_args.ssl_keyfile,
            }
        )

    print(
        f"Starting Uvicorn server in single-process mode on {global_args.host}:{global_args.port}"
    )
    uvicorn.run(**uvicorn_config)
|
|
|
|
# Allow running this module directly as a script.
if __name__ == "__main__":
    main()
|
|