| import os |
| from typing import List, Dict |
| from dotenv import load_dotenv |
| from pathlib import Path |
|
|
| from evoagentx.core.logging import logger |
| from evoagentx.storages.base import StorageHandler |
| from evoagentx.rag.rag import RAGEngine |
| from evoagentx.storages.storages_config import VectorStoreConfig, DBConfig, StoreConfig |
| from evoagentx.rag.rag_config import RAGConfig, ReaderConfig, IndexConfig, EmbeddingConfig, RetrievalConfig |
| from evoagentx.rag.schema import Query, TextChunk |
| from evoagentx.benchmark.real_mm_rag import RealMMRAG |
| from evoagentx.models.openai_model import OpenAILLM |
| from evoagentx.models.model_configs import OpenAILLMConfig |
|
|
|
|
| |
# Load environment variables (API keys) from a local .env file, if present.
load_dotenv()
|
|
def demonstrate_rag_to_generation_pipeline():
    """Run a minimal multimodal RAG demo: index 20 images, retrieve 5, generate an answer.

    Steps:
        1. Load 20 random (seeded) samples from the RealMMRAG benchmark and
           index their images into a FAISS vector store.
        2. Retrieve the top-5 image chunks for one sample query.
        3. Feed the retrieved images to GPT-4o and print the generated answer
           next to the benchmark's expected answer.

    Requires the OPENAI_API_KEY and VOYAGE_API_KEY environment variables;
    prints a message and returns early if either is missing.
    """
    print("🚀 EvoAgentX Multimodal RAG-to-Generation Pipeline")
    print("=" * 60)

    # Both keys are mandatory: Voyage embeds the images, OpenAI generates the answer.
    openai_key = os.getenv("OPENAI_API_KEY")
    if not openai_key:
        print("❌ OPENAI_API_KEY not found. Please set it to run this demo.")
        return

    voyage_key = os.getenv("VOYAGE_API_KEY")
    if not voyage_key:
        print("❌ VOYAGE_API_KEY not found. Please set it to run this demo.")
        return

    # Fixed seed so the same 20 samples (and hence the same query) are used every run.
    datasets = RealMMRAG("./debug/data/real_mm_rag")
    samples = datasets.get_random_samples(20, seed=42)
    print(f"📊 Dataset loaded with {len(samples)} samples")

    # SQLite for metadata, FAISS (flat L2, 1024-dim to match voyage-multimodal-3)
    # for the vector index; both cached under ./debug/data/real_mm_rag/cache.
    store_config = StoreConfig(
        dbConfig=DBConfig(db_name="sqlite", path="./debug/data/real_mm_rag/cache/demo.sql"),
        vectorConfig=VectorStoreConfig(vector_name="faiss", dimensions=1024, index_type="flat_l2"),
        path="./debug/data/real_mm_rag/cache/indexing"
    )
    storage_handler = StorageHandler(storageConfig=store_config)

    rag_config = RAGConfig(
        modality="multimodal",
        reader=ReaderConfig(recursive=True, exclude_hidden=True, errors="ignore"),
        embedding=EmbeddingConfig(provider="voyage", model_name="voyage-multimodal-3", device="cpu", api_key=voyage_key),
        index=IndexConfig(index_type="vector"),
        # NOTE: "retrivel_type" (sic) matches the upstream RetrievalConfig field name.
        retrieval=RetrievalConfig(retrivel_type="vector", top_k=5, similarity_cutoff=0.3)
    )
    search_engine = RAGEngine(config=rag_config, storage_handler=storage_handler)

    # --- Step 1: index up to 20 image documents ---
    print("\n📄 Step 1: Indexing 20 documents...")
    corpus_id = "demo_corpus"
    valid_paths = [s["image_path"] for s in samples if os.path.exists(s["image_path"])][:20]

    if len(valid_paths) < 20:
        print(f"⚠️ Only found {len(valid_paths)} valid image paths, using those")

    corpus = search_engine.read(file_paths=valid_paths, corpus_id=corpus_id)
    search_engine.add(index_type="vector", nodes=corpus, corpus_id=corpus_id)
    print(f"✅ Indexed {len(corpus.chunks)} image documents")

    # Pick the first sample whose query is non-trivial (>10 chars after stripping).
    query_sample = next((s for s in samples if s["query"] and len(s["query"].strip()) > 10), None)
    if not query_sample:
        print("❌ No suitable query found in samples")
        return

    query_text = query_sample["query"]
    target_image = query_sample["image_filename"]

    print(f"\n🔍 Step 2: Querying with: '{query_text}'")
    print(f"🎯 Target document: {target_image}")

    # --- Step 2: retrieve the top-5 chunks for the chosen query ---
    query = Query(query_str=query_text, top_k=5)
    result = search_engine.query(query, corpus_id=corpus_id)
    retrieved_chunks = result.corpus.chunks

    print(f"\n📋 Retrieved {len(retrieved_chunks)} documents:")
    retrieved_paths = []
    for i, chunk in enumerate(retrieved_chunks):
        filename = Path(chunk.image_path).name if chunk.image_path else "Unknown"
        # similarity_score may be absent on metadata; fall back to 0.0.
        similarity = getattr(chunk.metadata, 'similarity_score', 0.0)
        retrieved_paths.append(filename)
        # BUG FIX: previously printed the literal "(unknown)" instead of the filename.
        print(f"  {i+1}. {filename} (similarity: {similarity:.3f})")

    # --- Step 3: generate an answer from the retrieved images ---
    print("\n🤖 Step 3: Generating answer with GPT-4o...")

    try:
        llm_config = OpenAILLMConfig(
            model="gpt-4o",
            openai_key=openai_key,
            temperature=0.1,  # near-deterministic output for a reproducible demo
            max_tokens=300
        )
        llm = OpenAILLM(config=llm_config)
        print("✅ LLM initialized successfully")

        # Multimodal user message: a text preamble plus the top-3 retrieved image chunks.
        content = [TextChunk(text=f"Query: {query_text}\n\nAnalyze these retrieved images and answer the query:")]
        content.extend(retrieved_chunks[:3])

        response = llm.generate(messages=[
            {"role": "system", "content": "You are an expert image analyst. Answer queries based on provided images."},
            {"role": "user", "content": content}
        ])

        print("✅ Response generated successfully")
        answer = response.content

    except Exception as e:
        # Best-effort demo: surface the full traceback but keep going so the
        # retrieval results are still reported below.
        import traceback
        print("❌ Detailed error:")
        print(traceback.format_exc())
        answer = f"Error in generation: {str(e)}"

    # --- Final report: query, retrieved files, target, generated vs expected answer ---
    print("\n" + "=" * 60)
    print("📊 FINAL RESULTS")
    print("=" * 60)
    print(f"🔍 QUERY: {query_text}")
    print("\n📋 RETRIEVED PATHS:")
    for i, path in enumerate(retrieved_paths):
        print(f"  {i+1}. {path}")
    print(f"\n🎯 TARGET DOCUMENT: {target_image}")
    print("\n🤖 GENERATED ANSWER:")
    print(answer)
    print("EXPECTED ANSWER:")
    print(query_sample["answer"])
    print("=" * 60)

    # Clean up the temporary index so repeated runs start fresh.
    search_engine.clear(corpus_id=corpus_id)
|
|
|
|
# Script entry point: run the end-to-end RAG-to-generation demo.
if __name__ == "__main__":
    demonstrate_rag_to_generation_pipeline()
|
|