Spaces:
Sleeping
Sleeping
"""
VoiceVerse AI — Content Ingestion Module.
Handles all input sources beyond file upload:
- YouTube links → transcript via youtube-transcript-api
- Article / website → readable text via trafilatura + BeautifulSoup fallback
- Pasted raw text → light cleaning and validation
Returns plain text string that feeds into RAGStore.add_document().
rag.py is completely unchanged.
"""
| import re | |
| import urllib.parse | |
| from utils import logger | |
# ──────────────────────────────────────────────────────────────────────────────
# URL type detection
# ──────────────────────────────────────────────────────────────────────────────
| def _is_youtube(url: str) -> bool: | |
| parsed = urllib.parse.urlparse(url.strip()) | |
| host = parsed.netloc.lower().replace("www.", "") | |
| return host in ("youtube.com", "youtu.be") | |
| def _extract_youtube_id(url: str) -> str | None: | |
| patterns = [ | |
| r"(?:v=)([a-zA-Z0-9_-]{11})", | |
| r"youtu\.be/([a-zA-Z0-9_-]{11})", | |
| r"embed/([a-zA-Z0-9_-]{11})", | |
| r"shorts/([a-zA-Z0-9_-]{11})", | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, url) | |
| if match: | |
| return match.group(1) | |
| return None | |
# ──────────────────────────────────────────────────────────────────────────────
# YouTube transcript
# ──────────────────────────────────────────────────────────────────────────────
def extract_youtube(url: str) -> str:
    """Fetch and clean the caption transcript of a YouTube video.

    Prefers English manual captions, then English auto-generated captions,
    then whatever language is available.

    Raises:
        ImportError: youtube-transcript-api is not installed.
        ValueError: no video ID in *url*, no transcript available,
            or the transcript is too short to be useful.
    """
    try:
        from youtube_transcript_api import (
            YouTubeTranscriptApi, NoTranscriptFound, TranscriptsDisabled
        )
    except ImportError:
        raise ImportError(
            "youtube-transcript-api is not installed. "
            "Add 'youtube-transcript-api' to requirements.txt and restart the Space."
        )
    video_id = _extract_youtube_id(url)
    if not video_id:
        raise ValueError(f"Could not extract a YouTube video ID from: {url}")
    logger.info("Fetching YouTube transcript: video_id=%s", video_id)
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        # Prefer English manual captions, then English auto, then anything available
        try:
            transcript = transcript_list.find_manually_created_transcript(
                ["en", "en-US", "en-GB"]
            )
        except NoTranscriptFound:
            try:
                transcript = transcript_list.find_generated_transcript(
                    ["en", "en-US", "en-GB"]
                )
            except NoTranscriptFound:
                transcript = next(iter(transcript_list))
                logger.info("No English transcript — using: %s", transcript.language)
        entries = transcript.fetch()
        # Entries are dicts in youtube-transcript-api < 1.0 and snippet
        # objects (with a .text attribute) in >= 1.0 — support both.
        text = " ".join(
            entry["text"] if isinstance(entry, dict) else entry.text
            for entry in entries
        )
        # Clean YouTube caption artifacts
        text = re.sub(r"\[.*?\]", "", text)  # [Music], [Applause] etc.
        text = re.sub(r"\s{2,}", " ", text).strip()
        if len(text) < 50:
            raise ValueError("YouTube transcript is too short to process.")
        logger.info("YouTube transcript: %d chars", len(text))
        return text
    except (NoTranscriptFound, TranscriptsDisabled) as e:
        # Chain the cause so the original traceback is preserved for debugging.
        raise ValueError(
            "No transcript available for this video. "
            "The video may have captions disabled or be private.\n\n"
            "Tip: Copy the article/video text manually and use the Paste Text tab instead."
        ) from e
# ──────────────────────────────────────────────────────────────────────────────
# Article / website URL
# ──────────────────────────────────────────────────────────────────────────────
def extract_url(url: str) -> str:
    """
    Fetch a webpage and extract readable text.
    Tries trafilatura first (best article extractor), falls back to BeautifulSoup.

    Raises:
        ValueError: the page could not be fetched or yielded too little text.
    """
    url = url.strip()
    logger.info("Fetching URL: %s", url)
    # Browser-like UA for the requests fallback; some sites block default UAs.
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0.0.0 Safari/537.36"
        )
    }
    # ── Attempt 1: trafilatura ────────────────────────────────────────────
    try:
        import trafilatura
        downloaded = trafilatura.fetch_url(url)
        if downloaded:
            text = trafilatura.extract(
                downloaded,
                include_comments=False,
                include_tables=True,
                no_fallback=False,
            )
            # Require a minimum yield before trusting the extraction.
            if text and len(text.strip()) > 100:
                logger.info("trafilatura extracted %d chars", len(text))
                return text.strip()
    except Exception as e:
        logger.warning("trafilatura failed (%s) — trying BeautifulSoup", e)
    # ── Attempt 2: requests + BeautifulSoup ───────────────────────────────
    try:
        import requests
        from bs4 import BeautifulSoup
        resp = requests.get(url, headers=headers, timeout=15)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        # Drop non-content elements before extracting text.
        for tag in soup(["script", "style", "nav", "footer", "header",
                         "aside", "form", "noscript", "iframe"]):
            tag.decompose()
        # Prefer semantic containers; fall back to the whole document.
        article = soup.find("article") or soup.find("main") or soup.find("body")
        text = (
            article.get_text(separator=" ", strip=True)
            if article
            else soup.get_text(separator=" ", strip=True)
        )
        text = re.sub(r"\s{3,}", "\n\n", text)
        text = re.sub(r" {2,}", " ", text).strip()
        if len(text) < 100:
            raise ValueError("Could not extract enough text from this page.")
        logger.info("BeautifulSoup extracted %d chars", len(text))
        return text
    except Exception as e:
        # Chain the cause so the original failure stays in the traceback.
        raise ValueError(
            f"Could not fetch content from: {url}\n\n"
            f"Reason: {e}\n\n"
            "The page may require a login or block bots. "
            "Try copying the article text and pasting it in the Paste Text tab."
        ) from e
# ──────────────────────────────────────────────────────────────────────────────
# Pasted raw text
# ──────────────────────────────────────────────────────────────────────────────
def extract_pasted_text(text: str) -> str:
    """Validate and lightly clean user-pasted text.

    Normalizes line endings to \\n, caps consecutive blank lines, collapses
    runs of spaces, and rejects empty or too-short input.

    Raises:
        ValueError: nothing was pasted, or the text is under 50 characters.
    """
    if not (text and text.strip()):
        raise ValueError("No text was pasted. Please paste some content.")
    # Normalize Windows/old-Mac line endings first, then tame whitespace.
    cleaned = text.replace("\r\n", "\n").replace("\r", "\n")
    cleaned = re.sub(r"\n{4,}", "\n\n\n", cleaned)
    cleaned = re.sub(r" {2,}", " ", cleaned).strip()
    if len(cleaned) < 50:
        raise ValueError(
            "Pasted text is too short. Please paste at least a paragraph of content."
        )
    logger.info("Pasted text ingested: %d chars", len(cleaned))
    return cleaned
# ──────────────────────────────────────────────────────────────────────────────
# Unified entry point
# ──────────────────────────────────────────────────────────────────────────────
def ingest_from_url_or_text(raw_input: str) -> tuple[str, str]:
    """
    Auto-detect whether input is a YouTube URL, article URL, or plain text.
    Returns:
        (extracted_text, source_label)
    """
    candidate = raw_input.strip()
    if not candidate:
        raise ValueError("Please enter a URL or paste some text.")
    # Anything that doesn't start with http(s):// is treated as pasted text.
    if not re.match(r"https?://", candidate, re.IGNORECASE):
        return extract_pasted_text(candidate), "Pasted Text"
    if _is_youtube(candidate):
        return extract_youtube(candidate), "YouTube"
    return extract_url(candidate), "Article / Website"