| """ |
| LangChain-Compatible Tools for SPARKNET |
| All tools follow LangChain's tool interface for seamless integration |
| with LangGraph agents and workflows. |
| """ |
|
|
| from typing import Optional, List, Dict, Any |
| from pydantic import BaseModel, Field |
| from langchain_core.tools import StructuredTool, tool |
| from loguru import logger |
| import json |
|
|
| |
# --- Optional dependency guards -------------------------------------------
# Each third-party integration is probed at import time; failures degrade to
# a module-level availability flag (checked inside each tool function)
# instead of breaking the whole module import.

try:
    import PyPDF2
    import fitz  # PyMuPDF
    PDF_AVAILABLE = True
except ImportError:
    PDF_AVAILABLE = False
    logger.warning("PDF libraries not available. Install PyPDF2 and pymupdf.")


try:
    from reportlab.lib.pagesizes import letter
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
    from reportlab.lib.styles import getSampleStyleSheet
    REPORTLAB_AVAILABLE = True
except ImportError:
    REPORTLAB_AVAILABLE = False
    logger.warning("ReportLab not available. Install reportlab for PDF generation.")


try:
    from duckduckgo_search import DDGS
    DDGS_AVAILABLE = True
except ImportError:
    DDGS_AVAILABLE = False
    logger.warning("DuckDuckGo search not available.")


try:
    import wikipedia
    WIKIPEDIA_AVAILABLE = True
except ImportError:
    WIKIPEDIA_AVAILABLE = False
    logger.warning("Wikipedia not available.")


try:
    import arxiv
    ARXIV_AVAILABLE = True
except ImportError:
    ARXIV_AVAILABLE = False
    logger.warning("Arxiv not available.")


# Project-local GPU manager (assumed always importable, unlike the optional
# third-party integrations above).
from ..utils.gpu_manager import get_gpu_manager
|
|
|
|
| |
| |
| |
|
|
class PDFExtractorInput(BaseModel):
    """Input schema for PDF extraction.

    Validated argument model for the ``pdf_extractor`` tool.
    """
    # Path is handed straight to the PDF backend; no existence check here.
    file_path: str = Field(..., description="Path to the PDF file")
    # None or 'all' selects every page; '1-5' selects an inclusive 1-based range.
    page_range: Optional[str] = Field(None, description="Page range (e.g., '1-5', 'all')")
    extract_metadata: bool = Field(True, description="Extract PDF metadata")
|
|
|
|
class PatentParserInput(BaseModel):
    """Input schema for patent parsing.

    Validated argument model for the ``patent_parser`` tool; the boolean
    flags gate which sections the parser accumulates.
    """
    text: str = Field(..., description="Patent text to parse")
    extract_claims: bool = Field(True, description="Extract patent claims")
    extract_abstract: bool = Field(True, description="Extract abstract")
    extract_description: bool = Field(True, description="Extract description")
|
|
|
|
class WebSearchInput(BaseModel):
    """Input schema for web search.

    Validated argument model for the ``web_search`` (DuckDuckGo) tool.
    """
    query: str = Field(..., description="Search query")
    max_results: int = Field(5, description="Maximum number of results")
    # Region code is passed through to DDGS; 'wt-wt' is the worldwide default.
    region: str = Field("wt-wt", description="Search region (e.g., 'us-en', 'wt-wt')")
|
|
|
|
class WikipediaInput(BaseModel):
    """Input schema for Wikipedia lookup.

    Validated argument model for the ``wikipedia`` tool.
    """
    query: str = Field(..., description="Wikipedia search query")
    # Passed to wikipedia.summary() to bound the summary length.
    sentences: int = Field(3, description="Number of sentences to return")
|
|
|
|
class ArxivInput(BaseModel):
    """Input schema for Arxiv search.

    Validated argument model for the ``arxiv_search`` tool.
    """
    query: str = Field(..., description="Search query")
    max_results: int = Field(5, description="Maximum number of results")
    # Must match a key of the sort_map in arxiv_func; unknown values fall
    # back to relevance sorting.
    sort_by: str = Field("relevance", description="Sort by: relevance, lastUpdatedDate, submittedDate")
|
|
|
|
class DocumentGeneratorInput(BaseModel):
    """Input schema for document generation.

    Validated argument model for the ``document_generator`` (ReportLab) tool.
    """
    output_path: str = Field(..., description="Output PDF file path")
    title: str = Field(..., description="Document title")
    # Paragraphs are split on blank lines; '#'-prefixed lines become headings.
    content: str = Field(..., description="Document content (markdown or plain text)")
    author: Optional[str] = Field(None, description="Document author")
|
|
|
|
class GPUMonitorInput(BaseModel):
    """Input schema for GPU monitoring.

    Validated argument model for the ``gpu_monitor`` tool.
    """
    # None triggers the manager's all-GPU report instead of a single-GPU view.
    gpu_id: Optional[int] = Field(None, description="Specific GPU ID or None for all GPUs")
|
|
|
|
| |
| |
| |
|
|
def pdf_extractor_func(file_path: str, page_range: Optional[str] = None,
                       extract_metadata: bool = True) -> str:
    """
    Extract text and metadata from PDF files using PyMuPDF (fitz).

    Args:
        file_path: Path to the PDF file.
        page_range: Page selection, 1-based inclusive: '1-5', a single page
            like '3', or 'all'/None for every page.
        extract_metadata: Whether to include PDF metadata in the output.

    Returns:
        Extracted text and metadata as a formatted string, or an
        "Error ..." message string on failure (never raises).
    """
    if not PDF_AVAILABLE:
        return "Error: PDF libraries not installed. Run: pip install PyPDF2 pymupdf"

    try:
        # Context manager guarantees the document handle is closed even when
        # extraction fails partway through (the previous version leaked the
        # handle on any exception between open() and close()).
        with fitz.open(file_path) as doc:
            total_pages = len(doc)

            # Resolve the requested page selection into 0-based indices.
            if page_range and page_range.lower() != 'all':
                if '-' in page_range:
                    start, end = map(int, page_range.split('-'))
                else:
                    # A bare number selects that single page (e.g. '3').
                    start = end = int(page_range)
                # Clamp: without max(), '0-3' would produce index -1 and
                # silently pull in the LAST page.
                pages = range(max(start - 1, 0), min(end, total_pages))
            else:
                pages = range(total_pages)

            text_parts = []
            for page_num in pages:
                page = doc[page_num]
                text_parts.append(f"--- Page {page_num + 1} ---\n{page.get_text()}")

            extracted_text = "\n\n".join(text_parts)

            # Header summary.
            result = f"PDF: {file_path}\n"
            result += f"Total Pages: {total_pages}\n"
            result += f"Extracted Pages: {len(pages)}\n\n"

            if extract_metadata:
                metadata = doc.metadata
                result += "Metadata:\n"
                for key, value in metadata.items():
                    if value:  # skip empty metadata entries
                        result += f"  {key}: {value}\n"
                result += "\n"

            result += "=" * 80 + "\n"
            result += "EXTRACTED TEXT:\n"
            result += "=" * 80 + "\n"
            result += extracted_text

        logger.info(f"Extracted {len(pages)} pages from {file_path}")
        return result

    except Exception as e:
        logger.error(f"PDF extraction failed: {e}")
        return f"Error extracting PDF: {str(e)}"
|
|
|
|
def patent_parser_func(text: str, extract_claims: bool = True,
                       extract_abstract: bool = True, extract_description: bool = True) -> str:
    """
    Parse patent document structure and extract key sections.

    Heuristic line scanner: short lines containing a section keyword
    ('abstract', 'claim', 'description', 'drawing'/'figure') switch the
    active section; subsequent lines are accumulated into that section.

    Args:
        text: Patent text (from PDF or plain text).
        extract_claims: Extract patent claims.
        extract_abstract: Extract abstract.
        extract_description: Extract detailed description.

    Returns:
        Human-readable summary followed by the structured result as JSON,
        or an "Error ..." message string on failure (never raises).
    """
    try:
        result: Dict[str, Any] = {
            "abstract": "",
            "claims": [],
            "description": "",
            "metadata": {}
        }

        lines = text.split('\n')
        current_section = None

        for line in lines:
            line_lower = line.lower().strip()

            # Section-header detection: headers are assumed to be SHORT lines
            # containing a keyword (length caps filter out body sentences that
            # merely mention the word).
            if 'abstract' in line_lower and len(line_lower) < 50:
                current_section = 'abstract'
                continue
            elif 'claim' in line_lower and len(line_lower) < 50:
                current_section = 'claims'
                continue
            elif 'description' in line_lower:
                # (Also matches 'detailed description' — the previous explicit
                # extra check for that phrase was redundant.)
                if len(line_lower) < 100:
                    current_section = 'description'
                    continue
            elif 'drawing' in line_lower or 'figure' in line_lower:
                if len(line_lower) < 50:
                    current_section = 'drawings'
                    continue

            # Content accumulation for the active section.
            if current_section == 'abstract' and extract_abstract:
                if line.strip():
                    result['abstract'] += line + "\n"
            elif current_section == 'claims' and extract_claims:
                # Claims are numbered ("1. ...") or contain claim language.
                if line.strip() and (line.strip()[0].isdigit() or 'wherein' in line_lower):
                    result['claims'].append(line.strip())
            elif current_section == 'description' and extract_description:
                if line.strip():
                    result['description'] += line + "\n"

        # Patent-number heuristic: first early line mentioning 'patent' that
        # also carries a digit.
        for line in lines[:20]:
            if 'patent' in line.lower() and any(char.isdigit() for char in line):
                result['metadata']['patent_number'] = line.strip()
                break

        # Render the human-readable report (sections truncated for brevity).
        output = "PATENT ANALYSIS\n"
        output += "=" * 80 + "\n\n"

        if result['abstract']:
            output += "ABSTRACT:\n"
            output += result['abstract'].strip()[:500]
            output += "\n\n"

        if result['claims']:
            output += f"CLAIMS ({len(result['claims'])} found):\n"
            for i, claim in enumerate(result['claims'][:10], 1):
                output += f"\n{i}. {claim}\n"
            output += "\n"

        if result['description']:
            output += "DESCRIPTION (excerpt):\n"
            output += result['description'].strip()[:1000]
            output += "\n\n"

        output += "=" * 80 + "\n"
        output += f"JSON OUTPUT:\n{json.dumps(result, indent=2)}"

        logger.info(f"Parsed patent: {len(result['claims'])} claims extracted")
        return output

    except Exception as e:
        logger.error(f"Patent parsing failed: {e}")
        return f"Error parsing patent: {str(e)}"
|
|
|
|
| |
| |
| |
|
|
def web_search_func(query: str, max_results: int = 5, region: str = "wt-wt") -> str:
    """
    Search the web via DuckDuckGo and render the top hits.

    Args:
        query: Search query string.
        max_results: Upper bound on returned hits.
        region: DuckDuckGo region code (e.g. 'us-en', 'wt-wt').

    Returns:
        Numbered results (title, snippet, URL) as one formatted string,
        or an error message string when the backend is missing or fails.
    """
    if not DDGS_AVAILABLE:
        return "Error: DuckDuckGo search not installed. Run: pip install duckduckgo-search"

    try:
        hits = list(DDGS().text(query, region=region, max_results=max_results))

        if not hits:
            return f"No results found for: {query}"

        # Assemble the report from fragments and join once at the end.
        fragments = [f"WEB SEARCH RESULTS: {query}\n", "=" * 80 + "\n\n"]
        for rank, hit in enumerate(hits, 1):
            fragments.append(f"{rank}. {hit.get('title', 'No title')}\n")
            fragments.append(f"   {hit.get('body', 'No description')}\n")
            fragments.append(f"   URL: {hit.get('href', 'No URL')}\n\n")

        logger.info(f"Web search completed: {len(hits)} results for '{query}'")
        return "".join(fragments)

    except Exception as e:
        logger.error(f"Web search failed: {e}")
        return f"Error performing web search: {str(e)}"
|
|
|
|
def wikipedia_func(query: str, sentences: int = 3) -> str:
    """
    Look up *query* on Wikipedia and return a short summary.

    Args:
        query: Free-text search query.
        sentences: Number of summary sentences to request.

    Returns:
        Title, summary, URL and top categories as one formatted string,
        or an error/disambiguation message string.
    """
    if not WIKIPEDIA_AVAILABLE:
        return "Error: Wikipedia not installed. Run: pip install wikipedia"

    try:
        # Resolve the free-text query to a concrete article title first.
        matches = wikipedia.search(query)
        if not matches:
            return f"No Wikipedia page found for: {query}"

        top_title = matches[0]
        # auto_suggest=False keeps the already-resolved title from being
        # rewritten by the library's fuzzy matcher.
        page = wikipedia.page(top_title, auto_suggest=False)
        summary = wikipedia.summary(top_title, sentences=sentences, auto_suggest=False)

        parts = [
            f"WIKIPEDIA: {page.title}\n",
            "=" * 80 + "\n\n",
            summary + "\n\n",
            f"URL: {page.url}\n",
            f"Categories: {', '.join(page.categories[:5])}\n",
        ]

        logger.info(f"Wikipedia lookup completed: {page.title}")
        return "".join(parts)

    except wikipedia.exceptions.DisambiguationError as e:
        options = ', '.join(e.options[:5])
        return f"Disambiguation needed for '{query}'. Options: {options}"
    except wikipedia.exceptions.PageError:
        return f"No Wikipedia page found for: {query}"
    except Exception as e:
        logger.error(f"Wikipedia lookup failed: {e}")
        return f"Error: {str(e)}"
|
|
|
|
def arxiv_func(query: str, max_results: int = 5, sort_by: str = "relevance") -> str:
    """
    Search Arxiv for academic papers.

    Args:
        query: Search query string.
        max_results: Upper bound on returned papers.
        sort_by: One of 'relevance', 'lastUpdatedDate', 'submittedDate';
            unknown values fall back to relevance.

    Returns:
        Numbered paper listings (title, authors, date, summary excerpt,
        PDF link, categories) as one string, or an error message string.
    """
    if not ARXIV_AVAILABLE:
        return "Error: Arxiv not installed. Run: pip install arxiv"

    try:
        # Translate the user-facing sort name into the library's enum,
        # defaulting to relevance for anything unrecognized.
        criterion = {
            "relevance": arxiv.SortCriterion.Relevance,
            "lastUpdatedDate": arxiv.SortCriterion.LastUpdatedDate,
            "submittedDate": arxiv.SortCriterion.SubmittedDate,
        }.get(sort_by, arxiv.SortCriterion.Relevance)

        papers = list(
            arxiv.Search(query=query, max_results=max_results, sort_by=criterion).results()
        )

        if not papers:
            return f"No Arxiv papers found for: {query}"

        chunks = [f"ARXIV SEARCH: {query}\n", "=" * 80 + "\n\n"]
        for rank, paper in enumerate(papers, 1):
            chunks.append(f"{rank}. {paper.title}\n")
            chunks.append(f"   Authors: {', '.join(str(author) for author in paper.authors[:3])}\n")
            chunks.append(f"   Published: {paper.published.strftime('%Y-%m-%d')}\n")
            chunks.append(f"   Summary: {paper.summary[:200]}...\n")
            chunks.append(f"   PDF: {paper.pdf_url}\n")
            chunks.append(f"   Categories: {', '.join(paper.categories)}\n\n")

        logger.info(f"Arxiv search completed: {len(papers)} papers for '{query}'")
        return "".join(chunks)

    except Exception as e:
        logger.error(f"Arxiv search failed: {e}")
        return f"Error searching Arxiv: {str(e)}"
|
|
|
|
| |
| |
| |
|
|
def document_generator_func(output_path: str, title: str, content: str,
                            author: Optional[str] = None) -> str:
    """
    Generate a PDF document from text content using ReportLab.

    Formatting rules: paragraphs are separated by blank lines; a paragraph
    starting with '#' is rendered as a Heading2.

    Args:
        output_path: Output PDF file path.
        title: Document title.
        content: Document content (plain text or simple markdown).
        author: Optional author name for the byline.

    Returns:
        Success message with the file path, or an "Error ..." message
        string on failure (never raises).
    """
    if not REPORTLAB_AVAILABLE:
        return "Error: ReportLab not installed. Run: pip install reportlab"

    try:
        doc = SimpleDocTemplate(output_path, pagesize=letter)
        styles = getSampleStyleSheet()
        story = []

        # Title block.
        story.append(Paragraph(title, styles['Title']))
        story.append(Spacer(1, 12))

        # Optional author byline.
        if author:
            story.append(Paragraph(f"By: {author}", styles['Normal']))
            story.append(Spacer(1, 12))

        # Body paragraphs.
        # NOTE(review): ReportLab's Paragraph parses XML-like markup, so raw
        # '<' or '&' in content can raise — callers should pre-escape
        # untrusted text before passing it in. TODO confirm intended use.
        paragraphs = content.split('\n\n')
        for para in paragraphs:
            text = para.strip()
            if not text:
                continue
            if text.startswith('#'):
                # Heading level is not preserved; every '#' heading maps to
                # Heading2, matching the original behavior.
                story.append(Paragraph(text.lstrip('#').strip(), styles['Heading2']))
            else:
                story.append(Paragraph(text, styles['Normal']))
                story.append(Spacer(1, 6))

        doc.build(story)

        logger.info(f"Generated PDF: {output_path}")
        # Bug fix: the previous message labeled the paragraph count "Pages",
        # which is wrong — ReportLab paginates independently of paragraphs.
        return f"Successfully generated PDF: {output_path}\nTitle: {title}\nParagraphs: {len(paragraphs)}"

    except Exception as e:
        logger.error(f"PDF generation failed: {e}")
        return f"Error generating PDF: {str(e)}"
|
|
|
|
| |
| |
| |
|
|
def gpu_monitor_func(gpu_id: Optional[int] = None) -> str:
    """
    Report GPU status: memory usage, utilization, and temperature.

    Args:
        gpu_id: Specific GPU ID, or None for the manager's all-GPU report.

    Returns:
        Formatted GPU status information, or an error message string.
    """
    try:
        manager = get_gpu_manager()

        # No specific GPU requested: delegate to the manager's full report.
        if gpu_id is None:
            return manager.monitor()

        info = manager.get_gpu_info(gpu_id)
        if "error" in info:
            return f"Error: {info['error']}"

        gib = 1024 ** 3  # bytes per GiB divisor
        return (
            f"GPU {info['gpu_id']}: {info['name']}\n"
            f"Memory: {info['memory_used'] / gib:.2f} GB / {info['memory_total'] / gib:.2f} GB "
            f"({info['memory_percent']:.1f}% used)\n"
            f"Free Memory: {info['memory_free'] / gib:.2f} GB\n"
            f"GPU Utilization: {info['gpu_utilization']}%\n"
            f"Temperature: {info['temperature']}°C\n"
        )

    except Exception as e:
        logger.error(f"GPU monitoring error: {e}")
        return f"Error monitoring GPU: {str(e)}"
|
|
|
|
| |
| |
| |
|
|
| |
# --- LangChain tool instances ---------------------------------------------
# Each StructuredTool pairs one *_func implementation with its pydantic
# args schema; return_direct=False lets the agent keep reasoning over the
# tool output instead of returning it verbatim.

pdf_extractor_tool = StructuredTool.from_function(
    func=pdf_extractor_func,
    name="pdf_extractor",
    description=(
        "Extract text and metadata from PDF files. "
        "Useful for analyzing patent documents, research papers, and legal documents. "
        "Supports page range selection and metadata extraction."
    ),
    args_schema=PDFExtractorInput,
    return_direct=False,
)


patent_parser_tool = StructuredTool.from_function(
    func=patent_parser_func,
    name="patent_parser",
    description=(
        "Parse patent document structure and extract key sections: abstract, claims, description. "
        "Useful for analyzing patent documents and identifying key innovations."
    ),
    args_schema=PatentParserInput,
    return_direct=False,
)


web_search_tool = StructuredTool.from_function(
    func=web_search_func,
    name="web_search",
    description=(
        "Search the web using DuckDuckGo. Returns top results with titles, snippets, and URLs. "
        "Useful for market research, competitor analysis, and finding relevant information."
    ),
    args_schema=WebSearchInput,
    return_direct=False,
)


wikipedia_tool = StructuredTool.from_function(
    func=wikipedia_func,
    name="wikipedia",
    description=(
        "Search Wikipedia and get article summaries. "
        "Useful for background information on technologies, companies, and concepts."
    ),
    args_schema=WikipediaInput,
    return_direct=False,
)


arxiv_tool = StructuredTool.from_function(
    func=arxiv_func,
    name="arxiv_search",
    description=(
        "Search Arxiv for academic papers and preprints. "
        "Useful for finding relevant research, state-of-the-art methods, and technical background."
    ),
    args_schema=ArxivInput,
    return_direct=False,
)


document_generator_tool = StructuredTool.from_function(
    func=document_generator_func,
    name="document_generator",
    description=(
        "Generate PDF documents from text content. "
        "Useful for creating reports, briefs, and documentation."
    ),
    args_schema=DocumentGeneratorInput,
    return_direct=False,
)


gpu_monitor_tool = StructuredTool.from_function(
    func=gpu_monitor_func,
    name="gpu_monitor",
    description=(
        "Monitor GPU status including memory usage, utilization, and temperature. "
        "Useful for checking GPU availability before running models."
    ),
    args_schema=GPUMonitorInput,
    return_direct=False,
)
|
|
|
|
| |
| |
| |
|
|
class VISTAToolRegistry:
    """
    Registry of tools organized by VISTA scenario.

    Enables scenario-specific tool selection so agents only see the tools
    relevant to their task. Accessors return copies of the registered
    lists so callers cannot accidentally mutate the registry in place.
    """

    # Scenario name -> tools available to agents in that scenario.
    # "general" is the superset and the fallback for unknown scenarios.
    SCENARIO_TOOLS = {
        "patent_wakeup": [
            pdf_extractor_tool,
            patent_parser_tool,
            web_search_tool,
            wikipedia_tool,
            arxiv_tool,
            document_generator_tool,
        ],
        "agreement_safety": [
            pdf_extractor_tool,
            web_search_tool,
            document_generator_tool,
        ],
        "partner_matching": [
            web_search_tool,
            wikipedia_tool,
            arxiv_tool,
        ],
        "general": [
            pdf_extractor_tool,
            patent_parser_tool,
            web_search_tool,
            wikipedia_tool,
            arxiv_tool,
            document_generator_tool,
            gpu_monitor_tool,
        ],
    }

    @classmethod
    def get_tools(cls, scenario: str = "general") -> List[StructuredTool]:
        """
        Get tools for a specific VISTA scenario.

        Args:
            scenario: VISTA scenario type; unknown names fall back to "general".

        Returns:
            List of LangChain tools (a copy — safe for callers to modify).
        """
        tools = cls.SCENARIO_TOOLS.get(scenario, cls.SCENARIO_TOOLS["general"])
        logger.info(f"Retrieved {len(tools)} tools for scenario: {scenario}")
        # Defensive copy: previously the shared registry list was returned,
        # so a caller's append/remove silently mutated it for everyone.
        return list(tools)

    @classmethod
    def get_all_tools(cls) -> List[StructuredTool]:
        """Get all available tools (a copy of the "general" list)."""
        return list(cls.SCENARIO_TOOLS["general"])

    @classmethod
    def list_scenarios(cls) -> List[str]:
        """List available scenario names."""
        return list(cls.SCENARIO_TOOLS.keys())
|
|
|
|
| |
| |
| |
|
|
def get_vista_tools(scenario: str = "general") -> List[StructuredTool]:
    """
    Get LangChain tools for a VISTA scenario.

    Module-level convenience wrapper that delegates to
    ``VISTAToolRegistry.get_tools``.

    Args:
        scenario: Scenario name (patent_wakeup, agreement_safety, partner_matching, general)

    Returns:
        List of LangChain StructuredTool instances
    """
    return VISTAToolRegistry.get_tools(scenario)
|
|
|
|
def get_all_tools() -> List[StructuredTool]:
    """Get all available LangChain tools.

    Module-level convenience wrapper delegating to
    ``VISTAToolRegistry.get_all_tools``.
    """
    return VISTAToolRegistry.get_all_tools()
|
|
|
|
| |
# Explicit public API of this module (what `from ... import *` exposes).
__all__ = [
    "pdf_extractor_tool",
    "patent_parser_tool",
    "web_search_tool",
    "wikipedia_tool",
    "arxiv_tool",
    "document_generator_tool",
    "gpu_monitor_tool",
    "VISTAToolRegistry",
    "get_vista_tools",
    "get_all_tools",
]
|
|