File size: 9,236 Bytes
ebd182e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
"""
VoiceVerse AI β€” Content Ingestion Module.

Handles all input sources beyond file upload:
  - YouTube links      β†’ transcript via youtube-transcript-api
  - Article / website  β†’ readable text via trafilatura + BeautifulSoup fallback
  - Pasted raw text    β†’ light cleaning and validation

Returns plain text string that feeds into RAGStore.add_document().
rag.py is completely unchanged.
"""

import re
import urllib.parse
from utils import logger


# ══════════════════════════════════════════════════════════════════════════════
# URL type detection
# ══════════════════════════════════════════════════════════════════════════════

def _is_youtube(url: str) -> bool:
    parsed = urllib.parse.urlparse(url.strip())
    host = parsed.netloc.lower().replace("www.", "")
    return host in ("youtube.com", "youtu.be")


def _extract_youtube_id(url: str) -> str | None:
    patterns = [
        r"(?:v=)([a-zA-Z0-9_-]{11})",
        r"youtu\.be/([a-zA-Z0-9_-]{11})",
        r"embed/([a-zA-Z0-9_-]{11})",
        r"shorts/([a-zA-Z0-9_-]{11})",
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None


# ══════════════════════════════════════════════════════════════════════════════
# YouTube transcript
# ══════════════════════════════════════════════════════════════════════════════

def extract_youtube(url: str) -> str:
    """Fetch and clean the caption transcript of a YouTube video.

    Preference order: manually-created English captions, then
    auto-generated English, then the first transcript in any language.
    Caption artifacts like "[Music]" and runs of whitespace are stripped.

    Args:
        url: Any YouTube URL shape supported by _extract_youtube_id.

    Returns:
        The transcript as a single plain-text string.

    Raises:
        ImportError: youtube-transcript-api is not installed.
        ValueError: no video ID could be parsed, no transcript is
            available, or the transcript is too short to be useful.
    """
    try:
        from youtube_transcript_api import (
            YouTubeTranscriptApi, NoTranscriptFound, TranscriptsDisabled
        )
    except ImportError as e:
        # Chain the cause so the original import failure stays visible.
        raise ImportError(
            "youtube-transcript-api is not installed. "
            "Add 'youtube-transcript-api' to requirements.txt and restart the Space."
        ) from e

    video_id = _extract_youtube_id(url)
    if not video_id:
        raise ValueError(f"Could not extract a YouTube video ID from: {url}")

    logger.info("Fetching YouTube transcript: video_id=%s", video_id)

    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

        # Prefer English manual captions, then English auto, then anything available
        try:
            transcript = transcript_list.find_manually_created_transcript(
                ["en", "en-US", "en-GB"]
            )
        except NoTranscriptFound:
            try:
                transcript = transcript_list.find_generated_transcript(
                    ["en", "en-US", "en-GB"]
                )
            except NoTranscriptFound:
                try:
                    transcript = next(iter(transcript_list))
                except StopIteration as e:
                    # An empty listing would otherwise surface as a raw
                    # StopIteration; translate it to the user-facing error.
                    raise ValueError(
                        "No transcript available for this video. "
                        "The video may have captions disabled or be private.\n\n"
                        "Tip: Copy the article/video text manually and use the Paste Text tab instead."
                    ) from e
                logger.info("No English transcript β€” using: %s", transcript.language)

        entries = transcript.fetch()
        # NOTE(review): assumes each entry is dict-like with a "text" key;
        # newer youtube-transcript-api versions return snippet objects β€” verify.
        text = " ".join(entry["text"] for entry in entries)

        # Clean YouTube caption artifacts
        text = re.sub(r"\[.*?\]", "", text)        # [Music], [Applause] etc.
        text = re.sub(r"\s{2,}", " ", text).strip()

        if len(text) < 50:
            raise ValueError("YouTube transcript is too short to process.")

        logger.info("YouTube transcript: %d chars", len(text))
        return text

    except (NoTranscriptFound, TranscriptsDisabled) as e:
        # Chain the library exception; the f-prefixes were dropped because
        # the message has no placeholders.
        raise ValueError(
            "No transcript available for this video. "
            "The video may have captions disabled or be private.\n\n"
            "Tip: Copy the article/video text manually and use the Paste Text tab instead."
        ) from e


# ══════════════════════════════════════════════════════════════════════════════
# Article / website URL
# ══════════════════════════════════════════════════════════════════════════════

def extract_url(url: str) -> str:
    """
    Fetch a webpage and extract readable text.
    Tries trafilatura first (best article extractor), falls back to BeautifulSoup.
    """
    url = url.strip()
    logger.info("Fetching URL: %s", url)

    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0.0.0 Safari/537.36"
        )
    }

    # ── Attempt 1: trafilatura ────────────────────────────────────────────────
    try:
        import trafilatura
        downloaded = trafilatura.fetch_url(url)
        if downloaded:
            text = trafilatura.extract(
                downloaded,
                include_comments=False,
                include_tables=True,
                no_fallback=False,
            )
            if text and len(text.strip()) > 100:
                logger.info("trafilatura extracted %d chars", len(text))
                return text.strip()
    except Exception as e:
        logger.warning("trafilatura failed (%s) β€” trying BeautifulSoup", e)

    # ── Attempt 2: requests + BeautifulSoup ──────────────────────────────────
    try:
        import requests
        from bs4 import BeautifulSoup

        resp = requests.get(url, headers=headers, timeout=15)
        resp.raise_for_status()

        soup = BeautifulSoup(resp.text, "html.parser")
        for tag in soup(["script", "style", "nav", "footer", "header",
                          "aside", "form", "noscript", "iframe"]):
            tag.decompose()

        article = soup.find("article") or soup.find("main") or soup.find("body")
        text = (
            article.get_text(separator=" ", strip=True)
            if article
            else soup.get_text(separator=" ", strip=True)
        )
        text = re.sub(r"\s{3,}", "\n\n", text)
        text = re.sub(r" {2,}", " ", text).strip()

        if len(text) < 100:
            raise ValueError("Could not extract enough text from this page.")

        logger.info("BeautifulSoup extracted %d chars", len(text))
        return text

    except Exception as e:
        raise ValueError(
            f"Could not fetch content from: {url}\n\n"
            f"Reason: {e}\n\n"
            "The page may require a login or block bots. "
            "Try copying the article text and pasting it in the Paste Text tab."
        )


# ══════════════════════════════════════════════════════════════════════════════
# Pasted raw text
# ══════════════════════════════════════════════════════════════════════════════

def extract_pasted_text(text: str) -> str:
    """Validate and lightly normalize user-pasted text.

    Normalizes CRLF/CR line endings to LF, caps runs of blank lines,
    collapses repeated spaces, and rejects empty or too-short input.

    Raises:
        ValueError: nothing was pasted, or the result is under 50 chars.
    """
    if not text or not text.strip():
        raise ValueError("No text was pasted. Please paste some content.")

    cleaned = text.replace("\r\n", "\n").replace("\r", "\n")
    cleaned = re.sub(r"\n{4,}", "\n\n\n", cleaned)
    cleaned = re.sub(r" {2,}", " ", cleaned).strip()

    if len(cleaned) < 50:
        raise ValueError(
            "Pasted text is too short. Please paste at least a paragraph of content."
        )

    logger.info("Pasted text ingested: %d chars", len(cleaned))
    return cleaned


# ══════════════════════════════════════════════════════════════════════════════
# Unified entry point
# ══════════════════════════════════════════════════════════════════════════════

def ingest_from_url_or_text(raw_input: str) -> tuple[str, str]:
    """
    Auto-detect whether input is a YouTube URL, article URL, or plain text.

    Anything starting with http:// or https:// is routed to the URL
    extractors; everything else is treated as pasted text.

    Returns:
        (extracted_text, source_label)

    Raises:
        ValueError: the input is empty (or an extractor rejects it).
    """
    candidate = raw_input.strip()
    if not candidate:
        raise ValueError("Please enter a URL or paste some text.")

    # Guard clause: non-URLs go straight to the paste-text path.
    if not re.match(r"https?://", candidate, re.IGNORECASE):
        return extract_pasted_text(candidate), "Pasted Text"

    if _is_youtube(candidate):
        return extract_youtube(candidate), "YouTube"
    return extract_url(candidate), "Article / Website"