| import gradio as gr |
| import tempfile |
| import os |
| import yt_dlp |
| import re |
| from pathlib import Path |
| import google.generativeai as genai |
| from google.generativeai import upload_file, get_file |
| import time |
|
|
| |
| GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") |
| if GOOGLE_API_KEY: |
| genai.configure(api_key=GOOGLE_API_KEY) |
| else: |
| raise ValueError("GOOGLE_API_KEY environment variable not set") |
|
|
| def is_valid_url(url): |
| """Check if URL is from supported platforms""" |
| patterns = [ |
| r'(https?://)?(www\.)?(youtube\.com|youtu\.be)', |
| r'(https?://)?(www\.)?(instagram\.com|instagr\.am)', |
| r'(https?://)?(www\.)?(tiktok\.com)', |
| r'(https?://)?(www\.)?(twitter\.com|x\.com)', |
| ] |
| |
| for pattern in patterns: |
| if re.search(pattern, url, re.IGNORECASE): |
| return True |
| return False |
|
|
| def get_video_info(url): |
| """Get video information without downloading""" |
| try: |
| |
| if 'youtube.com' in url or 'youtu.be' in url: |
| video_id = None |
| if 'youtu.be/' in url: |
| video_id = url.split('youtu.be/')[-1].split('?')[0] |
| elif 'watch?v=' in url: |
| video_id = url.split('watch?v=')[-1].split('&')[0] |
| |
| if video_id: |
| |
| import requests |
| oembed_url = f"https://www.youtube.com/oembed?url=https://www.youtube.com/watch?v={video_id}&format=json" |
| response = requests.get(oembed_url, timeout=10) |
| if response.status_code == 200: |
| data = response.json() |
| return { |
| 'title': data.get('title', 'Unknown Video'), |
| 'author': data.get('author_name', 'Unknown Author'), |
| 'thumbnail': data.get('thumbnail_url', ''), |
| } |
| except Exception as e: |
| print(f"Could not get video info: {e}") |
| |
| return None |
|
|
| def analyze_video_from_url(url, query): |
| """Analyze video based on URL and metadata instead of downloading""" |
| |
| video_info = get_video_info(url) |
| |
| |
| context = f"Video URL: {url}\n" |
| if video_info: |
| context += f"Title: {video_info['title']}\n" |
| context += f"Author: {video_info['author']}\n" |
| |
| |
| try: |
| model = genai.GenerativeModel('gemini-2.0-flash-exp') |
| |
| prompt = f""" |
| I have a video with the following information: |
| {context} |
| |
| User question: {query} |
| |
| Based on the video URL and available metadata, please provide helpful analysis and insights. |
| Since I cannot directly access the video content, please: |
| |
| 1. Analyze what type of content this might be based on the URL and title |
| 2. Provide general guidance about analyzing this type of video |
| 3. Suggest what insights could typically be extracted |
| 4. Give relevant advice based on the platform (YouTube, Instagram, etc.) |
| |
| Be helpful and informative while acknowledging the limitations of not having direct video access. |
| If this appears to be a specific type of content (tutorial, entertainment, news, etc.), |
| provide relevant analysis frameworks and questions that would be useful. |
| |
| Be conversational and engaging. |
| """ |
| |
| response = model.generate_content(prompt) |
| return response.text |
| |
| except Exception as e: |
| raise gr.Error(f"AI analysis failed: {str(e)}") |
|
|
| def download_video(url, progress=gr.Progress()): |
| """Download video from URL using yt-dlp with better error handling""" |
| if not is_valid_url(url): |
| raise gr.Error("Please enter a valid YouTube, Instagram, TikTok, or X video URL") |
| |
| progress(0.1, desc="Starting download...") |
| |
| |
| temp_video = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') |
| temp_video.close() |
|
|
| |
| ydl_opts = { |
| 'outtmpl': temp_video.name, |
| 'format': 'best[filesize<50M]/worst[filesize<50M]/best', |
| 'quiet': True, |
| 'no_warnings': True, |
| 'nooverwrites': False, |
| 'user_agent': 'Mozilla/5.0 (compatible; GradioApp/1.0)', |
| 'retries': 1, |
| 'fragment_retries': 1, |
| 'extractor_retries': 1, |
| 'socket_timeout': 15, |
| 'nocheckcertificate': True, |
| 'prefer_insecure': True, |
| } |
|
|
| progress(0.3, desc="Downloading video...") |
| |
| try: |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
| ydl.download([url]) |
|
|
| |
| if os.path.exists(temp_video.name) and os.path.getsize(temp_video.name) > 0: |
| file_size_mb = os.path.getsize(temp_video.name) / (1024 * 1024) |
| |
| if file_size_mb > 50: |
| os.unlink(temp_video.name) |
| raise gr.Error(f"Video too large ({file_size_mb:.1f}MB). Please use a smaller video.") |
| |
| progress(0.7, desc="Processing video...") |
| return temp_video.name |
| else: |
| raise gr.Error("Downloaded file is empty or doesn't exist") |
|
|
| except Exception as e: |
| |
| if os.path.exists(temp_video.name): |
| try: |
| os.unlink(temp_video.name) |
| except: |
| pass |
| |
| error_msg = str(e) |
| |
| |
| if "Failed to resolve" in error_msg or "No address associated with hostname" in error_msg: |
| raise gr.Error("🌐 Network connectivity issue. Hugging Face Spaces may have restricted access to this platform. Try a different video or platform.") |
| elif "403" in error_msg or "Forbidden" in error_msg: |
| raise gr.Error("🚫 Video download was blocked by the platform. Try a different video.") |
| elif "404" in error_msg or "not found" in error_msg.lower(): |
| raise gr.Error("📹 Video not found. Please check the URL.") |
| elif "timeout" in error_msg.lower(): |
| raise gr.Error("⏱️ Download timed out. Try a shorter video.") |
| else: |
| |
| print(f"Download failed, falling back to URL analysis: {error_msg}") |
| raise Exception("download_failed") |
|
|
| def analyze_video_with_ai(video_path, query, progress=gr.Progress()): |
| """Analyze video using Google Gemini AI""" |
| if not video_path or not os.path.exists(video_path): |
| raise gr.Error("No video file found") |
| |
| progress(0.1, desc="Uploading video to AI...") |
| |
| try: |
| |
| processed_video = upload_file(video_path) |
| |
| progress(0.5, desc="Processing video...") |
| |
| |
| while processed_video.state.name == "PROCESSING": |
| time.sleep(2) |
| processed_video = get_file(processed_video.name) |
| |
| if processed_video.state.name == "FAILED": |
| raise gr.Error("Video processing failed") |
| |
| progress(0.8, desc="Generating response...") |
| |
| |
| model = genai.GenerativeModel('gemini-2.0-flash-exp') |
| |
| prompt = f""" |
| Analyze this video and respond to the user's question: {query} |
| |
| Provide a comprehensive, insightful response that includes: |
| 1. Direct analysis of the video content |
| 2. Key insights and observations |
| 3. Specific details from what you can see/hear |
| 4. Actionable takeaways if relevant |
| |
| Be conversational and engaging while being thorough and accurate. |
| """ |
| |
| response = model.generate_content([processed_video, prompt]) |
| |
| progress(1.0, desc="Complete!") |
| |
| return response.text |
| |
| except Exception as e: |
| raise gr.Error(f"AI analysis failed: {str(e)}") |
| |
| finally: |
| |
| try: |
| if video_path and os.path.exists(video_path): |
| os.unlink(video_path) |
| except: |
| pass |
|
|
| def process_video_and_chat(url, query): |
| """Main function to download video and get AI response with fallback""" |
| if not url.strip(): |
| raise gr.Error("Please enter a video URL") |
| |
| if not query.strip(): |
| raise gr.Error("Please enter a question about the video") |
| |
| progress = gr.Progress() |
| progress(0.0, desc="Starting...") |
| |
| try: |
| |
| video_path = download_video(url, progress) |
| |
| |
| response = analyze_video_with_ai(video_path, query, progress) |
| |
| return response |
| |
| except Exception as e: |
| |
| if str(e) == "download_failed" or "Network connectivity" in str(e) or "Failed to resolve" in str(e): |
| progress(0.5, desc="Switching to URL-based analysis...") |
| |
| |
| fallback_notice = "🔄 **Note**: Direct video download failed due to network restrictions, so I'm providing analysis based on the video URL and metadata.\n\n" |
| |
| try: |
| response = analyze_video_from_url(url, query) |
| return fallback_notice + response |
| except Exception as fallback_error: |
| raise gr.Error(f"Both video download and URL analysis failed: {str(fallback_error)}") |
| else: |
| |
| raise e |
|
|
| |
| def create_interface(): |
| with gr.Blocks( |
| title="The Plug - AI Video Analyzer", |
| theme=gr.themes.Soft(), |
| css=""" |
| .gradio-container { |
| max-width: 800px !important; |
| margin: auto !important; |
| } |
| .header { |
| text-align: center; |
| margin-bottom: 2rem; |
| } |
| .footer { |
| text-align: center; |
| margin-top: 2rem; |
| color: #666; |
| } |
| """ |
| ) as demo: |
| |
| |
| gr.HTML(""" |
| <div class="header"> |
| <h1>🎥 The Plug - AI Video Analyzer</h1> |
| <p>Analyze videos from YouTube, Instagram, TikTok, and X with AI</p> |
| <div style="background: #f0f0f0; padding: 10px; border-radius: 8px; margin: 10px 0;"> |
| <small><strong>🔄 Smart Fallback:</strong> If video download fails due to network restrictions, |
| the app will analyze based on video metadata and URL instead.</small> |
| </div> |
| </div> |
| """) |
| |
| with gr.Row(): |
| with gr.Column(): |
| |
| video_url = gr.Textbox( |
| label="Video URL", |
| placeholder="https://youtube.com/watch?v=... or Instagram/TikTok/X link", |
| lines=1, |
| info="Paste a video URL from YouTube, Instagram, TikTok, or X" |
| ) |
| |
| |
| query = gr.Textbox( |
| label="Your Question", |
| placeholder="What is the main topic? Summarize the key points...", |
| lines=3, |
| info="Ask anything about the video content" |
| ) |
| |
| |
| submit_btn = gr.Button("🚀 Analyze Video", variant="primary", size="lg") |
| |
| with gr.Row(): |
| with gr.Column(): |
| |
| response = gr.Textbox( |
| label="AI Analysis", |
| lines=15, |
| max_lines=25, |
| interactive=False, |
| show_copy_button=True |
| ) |
| |
| |
| gr.Examples( |
| examples=[ |
| ["https://www.youtube.com/watch?v=dQw4w9WgXcQ", "What is this video about?"], |
| ["", "Summarize the key points mentioned"], |
| ["", "What are the main takeaways?"], |
| ["", "Who is the target audience?"], |
| ], |
| inputs=[video_url, query], |
| label="Example Questions" |
| ) |
| |
| |
| gr.HTML(""" |
| <div class="footer"> |
| <p>Built with ❤️ using Gradio and Google Gemini AI</p> |
| <p>Supports YouTube, Instagram, TikTok, and X video URLs</p> |
| </div> |
| """) |
| |
| |
| submit_btn.click( |
| fn=process_video_and_chat, |
| inputs=[video_url, query], |
| outputs=response, |
| show_progress=True |
| ) |
| |
| |
| query.submit( |
| fn=process_video_and_chat, |
| inputs=[video_url, query], |
| outputs=response, |
| show_progress=True |
| ) |
| |
| return demo |
|
|
| |
| if __name__ == "__main__": |
| demo = create_interface() |
| demo.launch( |
| share=True, |
| server_name="0.0.0.0", |
| server_port=7860, |
| show_error=True |
| ) |
|
|