# customeragent-api/tests/test_ingestion.py
# Clean deploy to Hugging Face (commit ac90985, by anasraza526)
"""Standalone smoke test for the healthcare data ingestion pipeline."""
import asyncio
import os
import sys
import shutil  # NOTE(review): appears unused in this chunk — confirm before removing
# Disable tokenizers parallelism to prevent deadlocks/segfaults on fork
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Add server directory to path so the `app.*` imports below resolve when
# this script is run directly from the tests/ directory.
sys.path.append(os.path.join(os.path.dirname(__file__), '../server'))
from app.services.parsers.medical_parsers import MedQuADParser
from app.services.medical_retriever import get_medical_retriever
async def test_ingestion():
    """End-to-end smoke test of the healthcare data ingestion pipeline.

    Steps:
      0. Remove stale cache/index files that can cause segfaults.
      1. Write a dummy MedQuAD-style JSON file.
      2. Parse it with ``MedQuADParser`` and assert the record shape.
      3. Ingest the records into the medical retriever's vector store,
         with embedding generation monkeypatched (the real model segfaults
         in this standalone script due to library conflicts).
      4. Search for the dummy record and report whether it round-tripped
         with correct metadata.
    """
    print("πŸ₯ Testing Healthcare Data Ingestion Pipeline...\n")

    # Setup paths relative to the repository root (parent of tests/).
    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    raw_dir = os.path.join(base_dir, 'server/datasets/raw/medquad')
    processed_dir = os.path.join(base_dir, 'server/datasets/processed')
    os.makedirs(raw_dir, exist_ok=True)
    os.makedirs(processed_dir, exist_ok=True)

    # 0. CLEANUP: Remove old cache to prevent segfaults from corrupt data.
    cache_file = os.path.join(processed_dir, "medical_hybrid_cache.pkl")
    faiss_file = os.path.join(processed_dir, "medical_faiss_index.bin")
    # "faiss_index.bin" (cwd-relative) is the hybrid retrieval fallback file.
    for stale in (cache_file, faiss_file, "faiss_index.bin"):
        if os.path.exists(stale):
            os.remove(stale)
    print("🧹 Cleaned up old cache files.")

    # 1. Create dummy MedQuAD file.
    dummy_file = os.path.join(raw_dir, 'test_dummy.json')
    with open(dummy_file, 'w') as f:
        f.write('''
[
    {
        "question": "What is the treatment for dummyitis?",
        "answer": "The treatment for dummyitis involves rest and chocolate.",
        "focus": "Dummy Disease",
        "group": "Rare Conditions"
    }
]
''')
    print("βœ… Created dummy MedQuAD file.")

    # 2. Test parser directly.
    parser = MedQuADParser()
    records = parser.parse(dummy_file)
    print(f"βœ… Parser output: {records[0]}")
    assert records[0]['source'] == 'MedQuAD'
    assert 'chocolate' in records[0]['answer']

    # 3. Test ingestion directly into the retriever.
    retriever = get_medical_retriever()
    print("πŸ”„ Ingesting into Vector Store (this simulates the script)...")

    # SYSTEM FIX: Monkeypatch embedding generation to avoid segfaults on
    # M1/M2 during testing. The actual embedding model works in the main
    # app, but crashes in this standalone script due to library conflicts,
    # so BOTH ingestion and the verification query below use the mock.
    from app.services.vector_operations import VectorOperations
    import random

    async def mock_get_embedding(text):
        # 384 dims — presumably matches the real embedding model's output
        # size; TODO confirm against VectorOperations.
        return [random.random() for _ in range(384)]

    original_get_embedding = VectorOperations.get_embedding
    VectorOperations.get_embedding = mock_get_embedding
    try:
        try:
            await retriever.add_medical_qa(records)
            print("βœ… Ingestion method returned.")
        except Exception as e:
            print(f"❌ Ingestion exception: {e}")

        # 4. Verification query (still via the mocked embedding).
        test_q = "treatment for dummyitis"
        print(f"πŸ”Ž Searching for: '{test_q}'")
        print(" Generating embedding...")
        embedding = await VectorOperations.get_embedding(test_q)
        print(" Embedding generated.")
        print(" Detailed search...")
        results = retriever.search_medical(test_q, embedding, top_k=1)
    finally:
        # BUG FIX: the original saved the real method but never restored it;
        # restore here so code running later in this process is unaffected.
        VectorOperations.get_embedding = original_get_embedding

    if results:
        match = results[0]
        print(f"βœ… Match Found:\n Source: {match.get('source')}\n Answer: {match.get('answer')}\n Conf: {match.get('confidence')}")
        # Guard with `or ''` so a missing answer reports a metadata
        # mismatch instead of raising TypeError on `in None`.
        if match.get('source') == 'MedQuAD' and 'chocolate' in (match.get('answer') or ''):
            print("\nπŸŽ‰ SUCCESS: Data ingested and retrieved with correct metadata!")
        else:
            print("\n❌ FAILED: Metadata mismatch.")
    else:
        print("\n❌ FAILED: No results found.")
# Script entry point: drive the async test to completion on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(test_ingestion())