|
|
| from rdflib import Graph, Namespace, URIRef, Literal |
| from typing import Dict, List, Optional |
| from langgraph.graph import StateGraph |
| from langchain.prompts import ChatPromptTemplate |
| import json |
| from dotenv import load_dotenv |
| import os |
| from dataclasses import dataclass |
| from langchain_community.chat_models import ChatOllama |
| from langchain_groq import ChatGroq |
| import logging |
| |
| from analyzers import DrugInteractionAnalyzer |
|
|
|
|
| |
# Pull settings from a local .env file into the process environment.
load_dotenv()

# Emit INFO-and-above records to both app.log and the default stream handler.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler("app.log"), logging.StreamHandler()],
)

# Fail fast at import time when the Groq credential is missing or empty.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
if not GROQ_API_KEY:
    logging.error("GROQ_API_KEY not found in environment variables. Please add it to your .env file.")
    raise ValueError("GROQ_API_KEY not found in environment variables. Please add it to your .env file.")
|
|
@dataclass
class GraphState:
    """Data passed from node to node while the processing graph runs."""

    # Raw text typed by the user; the only field without a default.
    input: str
    # Filled by the input-processing node with a copy of ``input``.
    query: Optional[str] = None
    # JSON text written by the ontology query node (results or an error).
    ontology_results: Optional[str] = None
    # Text written by the LLM node (its answer or an error message).
    response: Optional[str] = None
|
|
class OntologyAgent:
    """Wrap an rdflib ``Graph`` parsed from an OWL file plus its namespace."""

    def __init__(self, owl_file_path: str):
        """Initialize the OntologyAgent with an OWL file.

        Args:
            owl_file_path: Location of the ontology, parsed with
                ``format="xml"``.

        Raises:
            ValueError: If the ontology cannot be parsed. The underlying
                error is chained as ``__cause__``.
        """
        self.g = Graph()
        try:
            # Only the parse call can realistically fail; keep the try narrow.
            self.g.parse(owl_file_path, format="xml")
        except Exception as e:
            logging.error(f"Failed to load ontology file: {e}")
            # Chain explicitly so the traceback shows the parse failure as the
            # direct cause of the ValueError callers see.
            raise ValueError(f"Failed to load ontology file: {e}") from e
        else:
            self.ns = Namespace("http://www.example.org/DrugInteraction.owl#")
            logging.info(f"Ontology loaded successfully from {owl_file_path}")
|
|
def create_agent_graph(owl_file_path: str) -> StateGraph:
    """Create a processing graph for drug interaction analysis.

    The graph runs three nodes in sequence: ``input_processor`` ->
    ``ontology_query`` -> ``llm_processing``, and finishes after the last one.

    Args:
        owl_file_path: Path of the OWL ontology handed to
            ``DrugInteractionAnalyzer``.

    Returns:
        The value of ``workflow.compile()``; call ``invoke`` on it with a
        ``GraphState``.
        NOTE(review): the annotation is kept as ``StateGraph`` for
        compatibility even though the compiled object is what is returned.
    """
    analyzer = DrugInteractionAnalyzer(owl_file_path)

    def user_input_node(state: GraphState) -> Dict[str, str]:
        """Copy the raw user input into the ``query`` field."""
        logging.info("Processing user input.")
        return {"query": state.input}

    def ontology_query_node(state: GraphState) -> Dict[str, str]:
        """Analyze the comma-separated drug names and return JSON text.

        Failures are reported as ``{"error": ...}`` JSON instead of being
        raised, so the LLM node always receives something to summarize.
        """
        try:
            logging.info("Executing ontology queries.")
            # Drop empty entries so stray or trailing commas
            # ("Aspirin,, Warfarin,") never reach the analyzer as "" names.
            stripped_parts = (part.strip() for part in state.input.split(","))
            drug_names = [name for name in stripped_parts if name]
            if not drug_names:
                # Routed through the handler below like any other failure.
                raise ValueError("No valid drug names provided.")
            results = analyzer.analyze_drugs(drug_names)
            logging.info(f"Ontology query results: {results}")
            return {"ontology_results": json.dumps(results, indent=2)}
        except Exception as e:
            logging.warning(f"Ontology query failed: {e}")
            return {"ontology_results": json.dumps({"error": str(e)})}

    def llm_processing_node(state: GraphState) -> Dict[str, str]:
        """Ask the Groq-hosted LLM to summarize the ontology results.

        Initialization and invocation errors are returned as the ``response``
        text rather than raised.
        """
        template = (
            "\n"
            "Based on the drug interaction analysis results:\n"
            "{ontology_results}\n"
            "\n"
            "Please provide a comprehensive summary of:\n"
            "1. Direct interactions between the drugs\n"
            "2. Potential conflicts\n"
            "3. Similar drug alternatives\n"
            "4. Recommended alternatives if conflicts exist\n"
            "\n"
            "If no results were found, please indicate this clearly.\n"
            "Format the response in a clear, structured manner.\n"
        )

        prompt = ChatPromptTemplate.from_template(template)

        try:
            # NOTE(review): this is a preview model id; confirm it is still
            # served by Groq before relying on it.
            llm = ChatGroq(
                model_name="llama3-groq-70b-8192-tool-use-preview",
                api_key=GROQ_API_KEY,
                temperature=0.7,
            )
            logging.info("LLM initialized successfully.")
        except Exception as e:
            logging.error(f"Error initializing LLM: {e}")
            return {"response": f"Error initializing LLM: {str(e)}"}

        chain = prompt | llm

        try:
            response = chain.invoke({
                "ontology_results": state.ontology_results
            })
            logging.info("LLM processing completed successfully.")
            return {"response": response.content}
        except Exception as e:
            logging.error(f"Error processing results with LLM: {e}")
            return {"response": f"Error processing results: {str(e)}"}

    workflow = StateGraph(GraphState)

    workflow.add_node("input_processor", user_input_node)
    workflow.add_node("ontology_query", ontology_query_node)
    workflow.add_node("llm_processing", llm_processing_node)

    workflow.add_edge("input_processor", "ontology_query")
    workflow.add_edge("ontology_query", "llm_processing")

    workflow.set_entry_point("input_processor")
    # Mark the last node as the finish point explicitly rather than relying on
    # how the installed LangGraph version treats a node with no outgoing edge.
    workflow.set_finish_point("llm_processing")

    logging.info("Agent graph created and configured successfully.")

    return workflow.compile()
|
|
def main():
    """Prompt for drug names, run the analysis graph, and print the outcome.

    Any exception raised along the way is logged and reported on stdout
    instead of propagating to the caller.
    """
    try:
        logging.info("Starting Drug Interaction Analysis System.")

        print("Drug Interaction Analysis System")
        print("Enter drug names separated by commas (e.g., Aspirin, Warfarin):")
        drug_entry = input("Drugs: ").strip()

        # Nothing typed: report it and stop before touching the ontology.
        if drug_entry == "":
            nothing_entered = "No drug names provided. Exiting."
            logging.warning(nothing_entered)
            print(nothing_entered)
            return

        # The ontology is looked up relative to the current working directory.
        ontology_path = os.path.join("ontology", "DrugInteraction.owl")
        if not os.path.exists(ontology_path):
            missing = f"Ontology file not found: {ontology_path}"
            logging.error(missing)
            raise FileNotFoundError(missing)

        pipeline = create_agent_graph(ontology_path)
        final_state = pipeline.invoke(GraphState(input=drug_entry))

        print("\nAnalysis Results:")
        print(final_state["response"])
        logging.info("Analysis completed and results displayed.")
    except Exception as exc:
        # Top-level boundary: report the failure on both channels.
        failure = f"An error occurred: {str(exc)}"
        logging.error(failure)
        print(failure)
        print("Please check your input and try again.")
|
|
# Run the interactive analysis only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|
|