| """ |
| YouTube Video Analysis Tool - Extract transcripts or analyze frames from YouTube videos |
| Author: @mangubee |
| Date: 2026-01-13 |
| |
| Provides two modes for YouTube video analysis: |
| - Transcript Mode: youtube-transcript-api (instant, 1-3 seconds) or Whisper fallback |
| - Frame Mode: Extract video frames and analyze with vision models |
| |
| Transcript Mode Workflow: |
| YouTube URL |
| ├─ Has transcript? ✅ → Use youtube-transcript-api (instant) |
| └─ No transcript? ❌ → Download audio + Whisper (slower, but works) |
| |
| Frame Mode Workflow: |
| YouTube URL |
| ├─ Download video with yt-dlp |
| ├─ Extract N frames at regular intervals |
| └─ Analyze frames with vision models (summarize findings) |
| |
| Requirements: |
| - youtube-transcript-api: pip install youtube-transcript-api |
| - yt-dlp: pip install yt-dlp |
| - openai: pip install openai (via src.tools.audio) |
| - opencv-python: pip install opencv-python (for frame extraction) |
| - PIL: pip install Pillow (for image handling) |
| """ |
|
|
| import logging |
| import os |
| import re |
| import tempfile |
| from typing import Dict, Any, Optional |
| from pathlib import Path |
|
|
| |
| |
| |
| |
# Regular expressions used by extract_video_id() to capture the 11-character
# video ID. The single pattern covers youtube.com/watch?v=, youtu.be/ and
# youtube.com/shorts/ links.
YOUTUBE_PATTERNS = [
    r'(?:youtube\.com\/watch\?v=|youtu\.be\/|youtube\.com\/shorts\/)([a-zA-Z0-9_-]{11})',
]

# Audio extraction settings; passed to yt-dlp's FFmpegExtractAudio
# postprocessor as 'preferredcodec' and 'preferredquality' in download_audio().
AUDIO_FORMAT = "mp3"
AUDIO_QUALITY = "128"

# Frame-mode settings.
# FRAME_COUNT is the default number of frames extracted per video.
FRAME_COUNT = 6
# NOTE(review): FRAME_QUALITY is not referenced by any function visible in this
# file (download_video() hard-codes its own format string) - confirm intent.
FRAME_QUALITY = "worst"

# When True, downloaded audio/video files and extracted frame images are
# deleted once they have been processed.
CLEANUP_TEMP_FILES = True

# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
|
|
|
|
| |
| |
| |
|
|
def save_transcript_to_cache(video_id: str, text: str, source: str) -> None:
    """
    Save transcript to _log/ folder for debugging.

    Writes ``_log/<video_id>_transcript.md`` relative to the current working
    directory, overwriting any existing file for the same video. This is a
    best-effort helper: any failure is logged as a warning and swallowed so
    that a caching problem never breaks transcript retrieval.

    Args:
        video_id: YouTube video ID
        text: Transcript text
        source: "api" or "whisper"
    """
    # Imported locally so the block does not rely on a module-level import.
    from datetime import datetime

    try:
        log_dir = Path("_log")
        log_dir.mkdir(exist_ok=True)

        cache_file = log_dir / f"{video_id}_transcript.md"

        # Build the whole document first, then write it in a single call.
        content = (
            "# YouTube Transcript\n\n"
            f"**Video ID:** {video_id}\n"
            f"**Source:** {source}\n"
            f"**Length:** {len(text)} characters\n"
            f"**Generated:** {datetime.now().isoformat()}\n\n"
            "## Transcript\n\n"
            f"{text}\n"
        )
        with open(cache_file, "w", encoding="utf-8") as f:
            f.write(content)

        logger.info(f"Transcript saved: {cache_file}")
    except Exception as e:
        # Deliberately broad: the cache is a debugging aid only.
        logger.warning(f"Failed to save transcript: {e}")
|
|
|
|
| |
| |
| |
|
|
def extract_video_id(url: str) -> Optional[str]:
    """
    Extract video ID from various YouTube URL formats.

    Supports:
    - youtube.com/watch?v=VIDEO_ID
    - youtube.com/watch?<other params>&v=VIDEO_ID (``v`` in any position)
    - youtu.be/VIDEO_ID
    - youtube.com/shorts/VIDEO_ID

    Args:
        url: YouTube URL

    Returns:
        Video ID (11 characters) or None if not found

    Examples:
        >>> extract_video_id("https://youtube.com/watch?v=dQw4w9WgXcQ")
        "dQw4w9WgXcQ"

        >>> extract_video_id("https://youtu.be/dQw4w9WgXcQ")
        "dQw4w9WgXcQ"

        >>> extract_video_id("https://www.youtube.com/watch?feature=share&v=dQw4w9WgXcQ")
        "dQw4w9WgXcQ"
    """
    if not url:
        return None

    # Fast path: the configured regex patterns.
    for pattern in YOUTUBE_PATTERNS:
        match = re.search(pattern, url)
        if match:
            return match.group(1)

    # Fallback: the patterns only match "watch?v=" when ``v`` is the first
    # query parameter, so parse the query string for watch URLs on a
    # youtube.com host.
    from urllib.parse import parse_qs, urlparse

    candidate = url.strip()
    if "//" not in candidate:
        # Scheme-less input ("youtube.com/watch?..."): make urlparse see a host.
        candidate = f"//{candidate}"

    try:
        parsed = urlparse(candidate)
        host = (parsed.hostname or "").lower()
    except ValueError:
        # urlparse rejects some malformed netlocs (e.g. unbalanced brackets).
        return None

    is_youtube_host = host == "youtube.com" or host.endswith(".youtube.com")
    if is_youtube_host and parsed.path.rstrip("/") == "/watch":
        values = parse_qs(parsed.query).get("v", [])
        if values and re.fullmatch(r"[a-zA-Z0-9_-]{11}", values[0]):
            return values[0]

    return None
|
|
|
|
| |
| |
| |
|
|
def get_youtube_transcript(video_id: str) -> Dict[str, Any]:
    """
    Get transcript using youtube-transcript-api.

    Requests an English transcript ('en', 'en-US', 'en-GB') and joins all
    non-empty snippet texts with single spaces. On success the transcript is
    also written to the debug cache via save_transcript_to_cache().

    This function never raises: every failure (including the library not
    being installed) is reported through the returned dict.

    Args:
        video_id: YouTube video ID (11 characters)

    Returns:
        Dict with structure: {
            "text": str,          # Transcript text
            "video_id": str,      # Video ID
            "source": str,        # always "api" for this function
            "success": bool,      # True if transcription succeeded
            "error": str or None  # Error message if failed
        }
    """
    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        logger.info(f"Fetching transcript for video: {video_id}")

        api = YouTubeTranscriptApi()
        transcript_list = api.fetch(
            video_id,
            languages=['en', 'en-US', 'en-GB']
        )

        text_parts = []
        for entry in transcript_list:
            # The instance-based fetch() API yields snippet objects exposing a
            # ``.text`` attribute rather than dicts, so ``entry.get`` would
            # raise AttributeError. Accept both shapes.
            if isinstance(entry, dict):
                raw_text = entry.get('text', '')
            else:
                raw_text = getattr(entry, 'text', '')

            snippet = (raw_text or '').strip()
            if snippet:
                text_parts.append(snippet)

        text = ' '.join(text_parts)

        logger.info(f"Transcript fetched: {len(text)} characters")

        save_transcript_to_cache(video_id, text, "api")

        return {
            "text": text,
            "video_id": video_id,
            "source": "api",
            "success": True,
            "error": None
        }

    except Exception as e:
        error_msg = str(e)
        logger.error(f"YouTube transcript API failed: {error_msg}")

        # Map the library's "no captions" messages to a friendlier error.
        if "No transcript found" in error_msg or "Could not retrieve a transcript" in error_msg:
            return {
                "text": "",
                "video_id": video_id,
                "source": "api",
                "success": False,
                "error": "No transcript available (video may not have captions)"
            }

        return {
            "text": "",
            "video_id": video_id,
            "source": "api",
            "success": False,
            "error": f"Transcript API error: {error_msg}"
        }
|
|
|
|
| |
| |
| |
|
|
def download_audio(video_url: str) -> Optional[str]:
    """
    Download audio from YouTube using yt-dlp.

    The audio is written to the system temp directory under a name that is
    unique per call (PID plus a random token), so concurrent downloads and
    leftovers from other processes cannot be mistaken for this download.

    Args:
        video_url: Full YouTube URL

    Returns:
        Path to downloaded audio file or None if failed
    """
    try:
        import uuid

        import yt_dlp

        logger.info(f"Downloading audio from: {video_url}")

        temp_dir = tempfile.gettempdir()
        base_name = f"youtube_audio_{os.getpid()}_{uuid.uuid4().hex}"
        # yt-dlp gets the extension-less path; the FFmpegExtractAudio
        # postprocessor is expected to produce "<base>.<AUDIO_FORMAT>".
        base_path = os.path.join(temp_dir, base_name)
        output_path = f"{base_path}.{AUDIO_FORMAT}"

        ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': AUDIO_FORMAT,
                'preferredquality': AUDIO_QUALITY,
            }],
            'outtmpl': base_path,
            'quiet': True,
            'no_warnings': True,
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])

        if os.path.exists(output_path):
            logger.info(f"Audio downloaded: {output_path} ({os.path.getsize(output_path)} bytes)")
            return output_path

        # Fallback: the file may have ended up with a different extension.
        # Match only this call's exact base name (with a "." delimiter) and
        # skip yt-dlp's partial-download artifacts.
        for file_name in sorted(os.listdir(temp_dir)):
            is_ours = file_name == base_name or file_name.startswith(f"{base_name}.")
            if is_ours and not file_name.endswith(('.part', '.ytdl')):
                actual_path = os.path.join(temp_dir, file_name)
                logger.info(f"Audio downloaded: {actual_path}")
                return actual_path

        logger.error("Audio file not found after download")
        return None

    except ImportError:
        logger.error("yt-dlp not installed. Run: pip install yt-dlp")
        return None
    except Exception as e:
        logger.error(f"Audio download failed: {e}")
        return None
|
|
|
|
def transcribe_from_audio(video_url: str) -> Dict[str, Any]:
    """
    Fallback: Download audio and transcribe with Whisper.

    Downloads the audio track with download_audio(), hands it to
    src.tools.audio.transcribe_audio(), and caches a successful transcript
    via save_transcript_to_cache(). The temporary audio file is removed in
    all cases (success, failed transcription, or exception) when
    CLEANUP_TEMP_FILES is enabled.

    Args:
        video_url: Full YouTube URL

    Returns:
        Dict with structure: {
            "text": str,          # Transcript text
            "video_id": str,      # Video ID
            "source": str,        # "whisper"
            "success": bool,      # True if transcription succeeded
            "error": str or None  # Error message if failed
        }
    """
    video_id = extract_video_id(video_url)

    if not video_id:
        return {
            "text": "",
            "video_id": "",
            "source": "whisper",
            "success": False,
            "error": "Invalid YouTube URL"
        }

    audio_file = download_audio(video_url)

    if not audio_file:
        return {
            "text": "",
            "video_id": video_id,
            "source": "whisper",
            "success": False,
            "error": "Failed to download audio"
        }

    try:
        # Project-local import kept inside the try so a missing module is
        # reported through the result dict instead of raising.
        from src.tools.audio import transcribe_audio

        result = transcribe_audio(audio_file)

        if result["success"]:
            save_transcript_to_cache(video_id, result["text"], "whisper")

            return {
                "text": result["text"],
                "video_id": video_id,
                "source": "whisper",
                "success": True,
                "error": None
            }

        return {
            "text": "",
            "video_id": video_id,
            "source": "whisper",
            "success": False,
            "error": result.get("error", "Transcription failed")
        }

    except Exception as e:
        logger.error(f"Whisper transcription failed: {e}")
        return {
            "text": "",
            "video_id": video_id,
            "source": "whisper",
            "success": False,
            "error": f"Whisper transcription failed: {str(e)}"
        }

    finally:
        # Runs on every exit path above so the downloaded audio never leaks.
        if CLEANUP_TEMP_FILES:
            try:
                os.remove(audio_file)
                logger.info(f"Cleaned up temp file: {audio_file}")
            except Exception as e:
                logger.warning(f"Failed to cleanup temp file: {e}")
|
|
|
|
| |
| |
| |
|
|
def download_video(url: str) -> Optional[str]:
    """
    Download video from YouTube using yt-dlp for frame extraction.

    The file is written to the system temp directory under a name that is
    unique per call (PID plus a random token). Only a file carrying exactly
    that base name is returned, so files left by other processes whose PID
    merely shares a prefix are never picked up.

    Args:
        url: Full YouTube URL

    Returns:
        Path to downloaded video file or None if failed
    """
    try:
        import uuid

        import yt_dlp

        logger.info(f"Downloading video from: {url}")

        temp_dir = tempfile.gettempdir()
        base_name = f"youtube_video_{os.getpid()}_{uuid.uuid4().hex}"
        output_path = os.path.join(temp_dir, base_name)

        ydl_opts = {
            'format': 'best[ext=mp4]/best',
            'outtmpl': output_path,
            'quiet': True,
            'no_warnings': True,
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # Locate the result: either the exact base name or base name plus an
        # extension. Partial-download artifacts are ignored. Sorting puts the
        # exact name first and makes the choice deterministic.
        for file_name in sorted(os.listdir(temp_dir)):
            is_ours = file_name == base_name or file_name.startswith(f"{base_name}.")
            if is_ours and not file_name.endswith(('.part', '.ytdl')):
                actual_path = os.path.join(temp_dir, file_name)
                size_mb = os.path.getsize(actual_path) / (1024 * 1024)
                logger.info(f"Video downloaded: {actual_path} ({size_mb:.2f}MB)")
                return actual_path

        logger.error("Video file not found after download")
        return None

    except ImportError:
        logger.error("yt-dlp not installed. Run: pip install yt-dlp")
        return None
    except Exception as e:
        logger.error(f"Video download failed: {e}")
        return None
|
|
|
|
def extract_frames(video_path: str, count: int = FRAME_COUNT) -> list:
    """
    Extract frames from video at regular intervals.

    Frames are written as JPEG files named ``frame_<pid>_<n>.jpg`` in the
    system temp directory. When the video has no more than ``count`` frames,
    every frame is extracted. The capture handle is always released, even
    when extraction fails part-way.

    Args:
        video_path: Path to video file
        count: Number of frames to extract (default: FRAME_COUNT)

    Returns:
        List of (frame_path, timestamp) tuples; an empty list on any failure
        (OpenCV missing, unreadable video, non-positive ``count``).
    """
    try:
        import cv2
    except ImportError:
        logger.error("opencv-python not installed. Run: pip install opencv-python")
        return []

    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            logger.error(f"Frame extraction failed: could not open video {video_path}")
            return []

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        duration = total_frames / fps if fps > 0 else 0

        logger.info(f"Video: {total_frames} frames, {fps:.2f} FPS, {duration:.2f}s duration")

        # Explicit guard instead of relying on a swallowed ZeroDivisionError.
        if count <= 0 or total_frames <= 0:
            logger.error(
                f"Frame extraction failed: nothing to extract "
                f"(count={count}, total_frames={total_frames})"
            )
            return []

        if total_frames <= count:
            frame_indices = list(range(total_frames))
        else:
            interval = total_frames / count
            frame_indices = [int(i * interval) for i in range(count)]

        logger.info(f"Extracting {len(frame_indices)} frames at indices: {frame_indices[:3]}...")

        frames = []
        temp_dir = tempfile.gettempdir()

        for idx, frame_idx in enumerate(frame_indices):
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()

            if not ret:
                logger.warning(f"Failed to extract frame at index {frame_idx}")
                continue

            timestamp = frame_idx / fps if fps > 0 else 0
            frame_path = os.path.join(temp_dir, f"frame_{os.getpid()}_{idx}.jpg")

            # imwrite reports failure via its return value; do not hand back
            # a path that was never written.
            if not cv2.imwrite(frame_path, frame):
                logger.warning(f"Failed to write frame at index {frame_idx} to {frame_path}")
                continue

            frames.append((frame_path, timestamp))
            logger.debug(f"Frame {idx}: {timestamp:.2f}s -> {frame_path}")

        logger.info(f"Extracted {len(frames)} frames")
        return frames

    except Exception as e:
        logger.error(f"Frame extraction failed: {e}")
        return []

    finally:
        # Release the capture on every exit path.
        if cap is not None:
            cap.release()
|
|
|
|
def analyze_frames(frames: list, question: Optional[str] = None) -> Dict[str, Any]:
    """
    Analyze video frames using vision models.

    Each frame is sent to src.tools.vision.analyze_image() with a prompt that
    includes its position and timestamp. A failure on one frame is recorded in
    the output text and does not stop the remaining frames. The overall result
    is marked unsuccessful only when no frame could be analyzed. Frame files
    are deleted afterwards on every exit path when CLEANUP_TEMP_FILES is
    enabled.

    Args:
        frames: List of (frame_path, timestamp) tuples
        question: Optional question to ask about frames; a generic
            "describe this frame" prompt is used when omitted or empty

    Returns:
        Dict with structure: {
            "text": str,          # Per-frame analyses joined by blank lines
            "video_id": str,      # Always "" here (filled in by the caller)
            "source": str,        # "frames"
            "success": bool,      # True if at least one frame was analyzed
            "error": str or None  # Error message if failed
            "frame_count": int,   # Number of frames received
        }
    """
    # Guard first so an empty input needs neither the vision module nor I/O.
    if not frames:
        return {
            "text": "",
            "video_id": "",
            "source": "frames",
            "success": False,
            "error": "No frames to analyze",
            "frame_count": 0,
        }

    if not question:
        question = "Describe what you see in this frame. Include any visible text, objects, people, or actions."

    try:
        # Project-local import inside the try so a missing module is reported
        # through the result dict and the frame files still get cleaned up.
        from src.tools.vision import analyze_image

        logger.info(f"Analyzing {len(frames)} frames with vision model...")

        frame_analyses = []
        analyzed_ok = 0
        last_error = None

        for idx, (frame_path, timestamp) in enumerate(frames):
            logger.info(f"Analyzing frame {idx + 1}/{len(frames)} at {timestamp:.2f}s...")

            frame_question = f"This is frame {idx + 1} of {len(frames)} from a video at timestamp {timestamp:.2f} seconds. {question}"

            try:
                result = analyze_image(frame_path, frame_question)
                answer = result.get("answer", "")

                frame_analyses.append(f"[Frame {idx + 1} @ {timestamp:.2f}s]\n{answer}")
                analyzed_ok += 1

                logger.info(f"Frame {idx + 1} analyzed: {len(answer)} chars")

            except Exception as e:
                # Best effort: keep going with the remaining frames.
                last_error = e
                logger.warning(f"Frame {idx + 1} analysis failed: {e}")
                frame_analyses.append(f"[Frame {idx + 1} @ {timestamp:.2f}s]\nAnalysis failed: {str(e)}")

        combined_text = "\n\n".join(frame_analyses)

        logger.info(f"Frame analysis complete: {len(combined_text)} chars total")

        if analyzed_ok == 0:
            # Every frame failed: do not report this as a success.
            return {
                "text": combined_text,
                "video_id": "",
                "source": "frames",
                "success": False,
                "error": f"All {len(frames)} frame analyses failed (last error: {last_error})",
                "frame_count": len(frames),
            }

        return {
            "text": combined_text,
            "video_id": "",
            "source": "frames",
            "success": True,
            "error": None,
            "frame_count": len(frames),
        }

    except Exception as e:
        logger.error(f"Frame analysis failed: {e}")
        return {
            "text": "",
            "video_id": "",
            "source": "frames",
            "success": False,
            "error": f"Frame analysis failed: {str(e)}",
            "frame_count": len(frames),
        }

    finally:
        # Remove the extracted frame images regardless of how we got here.
        if CLEANUP_TEMP_FILES:
            for frame in frames:
                # Tolerate malformed entries so cleanup itself cannot raise.
                frame_path = frame[0] if isinstance(frame, (tuple, list)) and frame else frame
                try:
                    os.remove(frame_path)
                except Exception as e:
                    logger.warning(f"Failed to cleanup frame {frame_path}: {e}")
|
|
|
|
def process_video_frames(url: str, question: str = None, frame_count: int = FRAME_COUNT) -> Dict[str, Any]:
    """
    Download video, extract frames, and analyze with vision models.

    Pipeline: extract_video_id() -> download_video() -> extract_frames() ->
    analyze_frames(). The downloaded video file is removed on every exit path
    once it exists (including the "no frames extracted" and exception paths)
    when CLEANUP_TEMP_FILES is enabled.

    Args:
        url: Full YouTube URL
        question: Optional question to ask about frames
        frame_count: Number of frames to extract

    Returns:
        Dict with structure: {
            "text": str,          # Combined frame analyses
            "video_id": str,      # Video ID
            "source": str,        # "frames"
            "success": bool,      # True if processing succeeded
            "error": str or None  # Error message if failed
            "frame_count": int    # Number of frames analyzed
        }
    """
    video_id = extract_video_id(url)

    if not video_id:
        return {
            "text": "",
            "video_id": "",
            "source": "frames",
            "success": False,
            "error": "Invalid YouTube URL",
            "frame_count": 0,
        }

    video_file = download_video(url)

    if not video_file:
        return {
            "text": "",
            "video_id": video_id,
            "source": "frames",
            "success": False,
            "error": "Failed to download video",
            "frame_count": 0,
        }

    try:
        frames = extract_frames(video_file, frame_count)

        if not frames:
            return {
                "text": "",
                "video_id": video_id,
                "source": "frames",
                "success": False,
                "error": "Failed to extract frames",
                "frame_count": 0,
            }

        result = analyze_frames(frames, question)

        # analyze_frames() does not know the video ID; fill it in here.
        result["video_id"] = video_id

        return result

    except Exception as e:
        logger.error(f"Video frame processing failed: {e}")
        return {
            "text": "",
            "video_id": video_id,
            "source": "frames",
            "success": False,
            "error": f"Video processing failed: {str(e)}",
            "frame_count": 0,
        }

    finally:
        # Runs for the early "no frames" return and the exception path too,
        # so the (potentially large) video file never leaks.
        if CLEANUP_TEMP_FILES:
            try:
                os.remove(video_file)
                logger.info(f"Cleaned up temp video: {video_file}")
            except Exception as e:
                logger.warning(f"Failed to cleanup temp video: {e}")
|
|
|
|
| |
| |
| |
|
|
def youtube_analyze(url: str, mode: str = "transcript") -> Dict[str, Any]:
    """
    Analyze YouTube video using transcript or frame processing mode.

    Transcript Mode: Extract transcript (youtube-transcript-api or Whisper)
    Frame Mode: Extract frames and analyze with vision models

    This function does not raise for bad input: an invalid URL or an unknown
    mode is reported through the returned dict (``success`` False, ``source``
    "none", and a descriptive ``error``).

    Args:
        url: YouTube video URL (youtube.com, youtu.be, shorts)
        mode: Analysis mode - "transcript" (default) or "frames". Matching is
            case-insensitive and ignores surrounding whitespace; None or an
            empty string falls back to "transcript".

    Returns:
        Dict with structure: {
            "text": str,          # Transcript or frame analyses
            "video_id": str,      # Video ID
            "source": str,        # "api", "whisper", "frames", or "none"
            "success": bool,      # True if analysis succeeded
            "error": str or None  # Error message if failed
            "frame_count": int    # Number of frames (frame mode only)
        }

    Examples:
        >>> youtube_analyze("https://youtube.com/watch?v=dQw4w9WgXcQ", mode="transcript")
        {"text": "Never gonna give you up...", "video_id": "dQw4w9WgXcQ", "source": "api", "success": True, "error": None}

        >>> youtube_analyze("https://youtube.com/watch?v=dQw4w9WgXcQ", mode="frames")
        {"text": "[Frame 1 @ 0.00s]\\nA man...", "video_id": "dQw4w9WgXcQ", "source": "frames", "success": True, "frame_count": 6, "error": None}
    """
    video_id = extract_video_id(url)

    if not video_id:
        logger.error(f"Invalid YouTube URL: {url}")
        return {
            "text": "",
            "video_id": "",
            "source": "none",
            "success": False,
            "error": f"Invalid YouTube URL: {url}"
        }

    # Normalise the mode; tolerate None/empty instead of crashing on .lower().
    mode = str(mode or "transcript").strip().lower()
    if mode not in ("transcript", "frames"):
        logger.error(f"Invalid mode: {mode}")
        return {
            "text": "",
            "video_id": video_id,
            "source": "none",
            "success": False,
            "error": f"Invalid mode: {mode}. Valid: transcript, frames"
        }

    logger.info(f"Processing YouTube video: {video_id} (mode: {mode})")

    if mode == "frames":
        result = process_video_frames(url)
        if result["success"]:
            logger.info(f"Frame analysis complete: {result.get('frame_count', 0)} frames, {len(result['text'])} chars")
        return result

    # Transcript mode: try the fast caption API first.
    result = get_youtube_transcript(video_id)

    if result["success"]:
        logger.info(f"Transcript retrieved via API: {len(result['text'])} characters")
        logger.info(f"Transcript content: {result['text'][:200]}...")
        return result

    # No usable captions: fall back to downloading audio and running Whisper.
    logger.info("Transcript API failed, trying audio transcription...")
    result = transcribe_from_audio(url)

    if result["success"]:
        logger.info(f"Transcript retrieved via Whisper: {len(result['text'])} characters")
        # Log a preview only, matching the API branch; the full transcript is
        # written to the debug cache by transcribe_from_audio().
        logger.info(f"Transcript content: {result['text'][:200]}...")
    else:
        logger.error(f"All transcript methods failed for video: {video_id}")

    return result
|
|
|
|
| |
def youtube_transcript(url: str) -> Dict[str, Any]:
    """
    Wrapper for youtube_analyze that respects YOUTUBE_MODE environment variable.

    This allows the agent to switch between transcript and frame modes
    without changing the function signature used in the graph.

    Mode selection:
    - YOUTUBE_MODE env variable (set by UI): "transcript" or "frames"
    - Default: "transcript" (backward compatible); also used when the
      variable is set but empty or whitespace-only

    Args:
        url: YouTube video URL

    Returns:
        Dict with structure from youtube_analyze()
    """
    # An exported-but-empty variable would otherwise become mode "" and be
    # rejected as invalid instead of falling back to the default.
    raw_mode = os.getenv("YOUTUBE_MODE", "")
    mode = raw_mode.strip().lower() or "transcript"

    logger.info(f"youtube_transcript called with YOUTUBE_MODE={mode}")

    return youtube_analyze(url, mode=mode)
|
|