| """ |
| VoiceVerse AI β Content Ingestion Module. |
| |
| Handles all input sources beyond file upload: |
| - YouTube links β transcript via youtube-transcript-api |
| - Article / website β readable text via trafilatura + BeautifulSoup fallback |
| - Pasted raw text β light cleaning and validation |
| |
| Returns plain text string that feeds into RAGStore.add_document(). |
| rag.py is completely unchanged. |
| """ |
|
|
| import re |
| import urllib.parse |
| from utils import logger |
|
|
|
|
| |
| |
| |
|
|
| def _is_youtube(url: str) -> bool: |
| parsed = urllib.parse.urlparse(url.strip()) |
| host = parsed.netloc.lower().replace("www.", "") |
| return host in ("youtube.com", "youtu.be") |
|
|
|
|
| def _extract_youtube_id(url: str) -> str | None: |
| patterns = [ |
| r"(?:v=)([a-zA-Z0-9_-]{11})", |
| r"youtu\.be/([a-zA-Z0-9_-]{11})", |
| r"embed/([a-zA-Z0-9_-]{11})", |
| r"shorts/([a-zA-Z0-9_-]{11})", |
| ] |
| for pattern in patterns: |
| match = re.search(pattern, url) |
| if match: |
| return match.group(1) |
| return None |
|
|
|
|
| |
| |
| |
|
|
def extract_youtube(url: str) -> str:
    """
    Fetch and clean the transcript of a YouTube video.

    Preference order: manually created English transcript, then an
    auto-generated English one, then the first transcript in any language.

    Args:
        url: Any YouTube URL form (watch, youtu.be, embed, shorts).

    Returns:
        The transcript as one cleaned plain-text string.

    Raises:
        ImportError: youtube-transcript-api is not installed.
        ValueError: no video ID in the URL, no transcript available,
            or the transcript is too short to process.
    """
    try:
        from youtube_transcript_api import (
            YouTubeTranscriptApi, NoTranscriptFound, TranscriptsDisabled
        )
    except ImportError as exc:
        raise ImportError(
            "youtube-transcript-api is not installed. "
            "Add 'youtube-transcript-api' to requirements.txt and restart the Space."
        ) from exc

    video_id = _extract_youtube_id(url)
    if not video_id:
        raise ValueError(f"Could not extract a YouTube video ID from: {url}")

    logger.info("Fetching YouTube transcript: video_id=%s", video_id)

    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

        # Preference: manual English > auto-generated English > anything.
        try:
            transcript = transcript_list.find_manually_created_transcript(
                ["en", "en-US", "en-GB"]
            )
        except NoTranscriptFound:
            try:
                transcript = transcript_list.find_generated_transcript(
                    ["en", "en-US", "en-GB"]
                )
            except NoTranscriptFound:
                transcript = next(iter(transcript_list))
                logger.info("No English transcript -> using: %s", transcript.language)

        entries = transcript.fetch()
        # youtube-transcript-api >= 1.0 yields snippet objects with a `.text`
        # attribute; older versions yield dicts — support both.
        text = " ".join(
            entry.text if hasattr(entry, "text") else entry["text"]
            for entry in entries
        )

        # Drop bracketed cues like [Music]/[Applause], collapse whitespace.
        text = re.sub(r"\[.*?\]", "", text)
        text = re.sub(r"\s{2,}", " ", text).strip()

        if len(text) < 50:
            raise ValueError("YouTube transcript is too short to process.")

        logger.info("YouTube transcript: %d chars", len(text))
        return text

    except (NoTranscriptFound, TranscriptsDisabled) as exc:
        # User-facing message; chain the library error so logs keep the cause.
        raise ValueError(
            f"No transcript available for this video. "
            f"The video may have captions disabled or be private.\n\n"
            f"Tip: Copy the article/video text manually and use the Paste Text tab instead."
        ) from exc
|
|
|
|
| |
| |
| |
|
|
def extract_url(url: str) -> str:
    """
    Fetch a webpage and extract readable text.

    Tries trafilatura first (best article extractor); if it fails or yields
    too little text, falls back to requests + BeautifulSoup with boilerplate
    elements stripped.

    Args:
        url: The page URL (http/https).

    Returns:
        The extracted readable text.

    Raises:
        ValueError: the page could not be fetched or held too little text.
    """
    url = url.strip()
    logger.info("Fetching URL: %s", url)

    # Browser-like UA: many sites block the default python-requests agent.
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0.0.0 Safari/537.36"
        )
    }

    # --- Attempt 1: trafilatura ------------------------------------------
    try:
        import trafilatura
        downloaded = trafilatura.fetch_url(url)
        if downloaded:
            text = trafilatura.extract(
                downloaded,
                include_comments=False,
                include_tables=True,
                no_fallback=False,
            )
            # Only trust the result when it has a meaningful amount of text.
            if text and len(text.strip()) > 100:
                logger.info("trafilatura extracted %d chars", len(text))
                return text.strip()
    except Exception as e:
        logger.warning("trafilatura failed (%s) -> trying BeautifulSoup", e)

    # --- Attempt 2: requests + BeautifulSoup -----------------------------
    try:
        import requests
        from bs4 import BeautifulSoup

        resp = requests.get(url, headers=headers, timeout=15)
        resp.raise_for_status()

        soup = BeautifulSoup(resp.text, "html.parser")
        # Remove non-content elements before pulling the visible text.
        for tag in soup(["script", "style", "nav", "footer", "header",
                         "aside", "form", "noscript", "iframe"]):
            tag.decompose()

        # Prefer semantic containers when the page provides them.
        article = soup.find("article") or soup.find("main") or soup.find("body")
        text = (
            article.get_text(separator=" ", strip=True)
            if article
            else soup.get_text(separator=" ", strip=True)
        )
        text = re.sub(r"\s{3,}", "\n\n", text)
        text = re.sub(r" {2,}", " ", text).strip()

        if len(text) < 100:
            raise ValueError("Could not extract enough text from this page.")

        logger.info("BeautifulSoup extracted %d chars", len(text))
        return text

    except Exception as e:
        # Chain the underlying error so tracebacks keep the real cause.
        raise ValueError(
            f"Could not fetch content from: {url}\n\n"
            f"Reason: {e}\n\n"
            "The page may require a login or block bots. "
            "Try copying the article text and pasting it in the Paste Text tab."
        ) from e
|
|
|
|
| |
| |
| |
|
|
def extract_pasted_text(text: str) -> str:
    """
    Validate and lightly normalize user-pasted text.

    Normalizes line endings to \\n, caps consecutive blank lines, collapses
    runs of spaces, and rejects empty or too-short input.

    Raises:
        ValueError: nothing was pasted, or the content is under 50 chars.
    """
    if not text or not text.strip():
        raise ValueError("No text was pasted. Please paste some content.")

    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    normalized = re.sub(r"\n{4,}", "\n\n\n", normalized)
    cleaned = re.sub(r" {2,}", " ", normalized).strip()

    if len(cleaned) < 50:
        raise ValueError(
            "Pasted text is too short. Please paste at least a paragraph of content."
        )

    logger.info("Pasted text ingested: %d chars", len(cleaned))
    return cleaned
|
|
|
|
| |
| |
| |
|
|
def ingest_from_url_or_text(raw_input: str) -> tuple[str, str]:
    """
    Auto-detect whether the input is a YouTube URL, an article URL, or
    plain pasted text, and dispatch to the matching extractor.

    Returns:
        (extracted_text, source_label)

    Raises:
        ValueError: the input is empty or extraction fails downstream.
    """
    content = raw_input.strip()
    if not content:
        raise ValueError("Please enter a URL or paste some text.")

    # Anything not starting with an http(s) scheme is treated as pasted text.
    if not content.lower().startswith(("http://", "https://")):
        return extract_pasted_text(content), "Pasted Text"

    if _is_youtube(content):
        return extract_youtube(content), "YouTube"
    return extract_url(content), "Article / Website"
|
|