| |
|
|
| import os |
| import asyncio |
| import numpy as np |
| import nest_asyncio |
| from google import genai |
| from google.genai import types |
| from dotenv import load_dotenv |
| from lightrag.utils import EmbeddingFunc |
| from lightrag import LightRAG, QueryParam |
| from lightrag.kg.shared_storage import initialize_pipeline_status |
| from lightrag.llm.siliconcloud import siliconcloud_embedding |
| from lightrag.utils import setup_logger |
| from lightrag.utils import TokenTracker |
|
|
| setup_logger("lightrag", level="DEBUG") |
|
|
| |
| nest_asyncio.apply() |
|
|
| load_dotenv() |
| gemini_api_key = os.getenv("GEMINI_API_KEY") |
| siliconflow_api_key = os.getenv("SILICONFLOW_API_KEY") |
|
|
| WORKING_DIR = "./dickens" |
|
|
| if not os.path.exists(WORKING_DIR): |
| os.mkdir(WORKING_DIR) |
|
|
| token_tracker = TokenTracker() |
|
|
|
|
| async def llm_model_func( |
| prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs |
| ) -> str: |
| |
| client = genai.Client(api_key=gemini_api_key) |
|
|
| |
| if history_messages is None: |
| history_messages = [] |
|
|
| combined_prompt = "" |
| if system_prompt: |
| combined_prompt += f"{system_prompt}\n" |
|
|
| for msg in history_messages: |
| |
| combined_prompt += f"{msg['role']}: {msg['content']}\n" |
|
|
| |
| combined_prompt += f"user: {prompt}" |
|
|
| |
| response = client.models.generate_content( |
| model="gemini-2.0-flash", |
| contents=[combined_prompt], |
| config=types.GenerateContentConfig( |
| max_output_tokens=5000, temperature=0, top_k=10 |
| ), |
| ) |
|
|
| |
| usage = getattr(response, "usage_metadata", None) |
| prompt_tokens = getattr(usage, "prompt_token_count", 0) or 0 |
| completion_tokens = getattr(usage, "candidates_token_count", 0) or 0 |
| total_tokens = getattr(usage, "total_token_count", 0) or ( |
| prompt_tokens + completion_tokens |
| ) |
|
|
| token_counts = { |
| "prompt_tokens": prompt_tokens, |
| "completion_tokens": completion_tokens, |
| "total_tokens": total_tokens, |
| } |
|
|
| token_tracker.add_usage(token_counts) |
|
|
| |
| return response.text |
|
|
|
|
| async def embedding_func(texts: list[str]) -> np.ndarray: |
| return await siliconcloud_embedding( |
| texts, |
| model="BAAI/bge-m3", |
| api_key=siliconflow_api_key, |
| max_token_size=512, |
| ) |
|
|
|
|
| async def initialize_rag(): |
| rag = LightRAG( |
| working_dir=WORKING_DIR, |
| entity_extract_max_gleaning=1, |
| enable_llm_cache=True, |
| enable_llm_cache_for_entity_extract=True, |
| embedding_cache_config={"enabled": True, "similarity_threshold": 0.90}, |
| llm_model_func=llm_model_func, |
| embedding_func=EmbeddingFunc( |
| embedding_dim=1024, |
| max_token_size=8192, |
| func=embedding_func, |
| ), |
| ) |
|
|
| await rag.initialize_storages() |
| await initialize_pipeline_status() |
|
|
| return rag |
|
|
|
|
| def main(): |
| |
| rag = asyncio.run(initialize_rag()) |
|
|
| with open("./book.txt", "r", encoding="utf-8") as f: |
| rag.insert(f.read()) |
|
|
| |
| with token_tracker: |
| print( |
| rag.query( |
| "What are the top themes in this story?", param=QueryParam(mode="naive") |
| ) |
| ) |
|
|
| print( |
| rag.query( |
| "What are the top themes in this story?", param=QueryParam(mode="local") |
| ) |
| ) |
|
|
| print( |
| rag.query( |
| "What are the top themes in this story?", |
| param=QueryParam(mode="global"), |
| ) |
| ) |
|
|
| print( |
| rag.query( |
| "What are the top themes in this story?", |
| param=QueryParam(mode="hybrid"), |
| ) |
| ) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|