| from langchain_groq import ChatGroq |
| from langgraph.graph import StateGraph, START, END |
| from IPython.display import Image, display, Markdown |
| from typing_extensions import TypedDict |
| from langgraph.constants import Send |
| from langchain_core.messages import HumanMessage, SystemMessage |
| from langchain_community.tools.tavily_search import TavilySearchResults |
| import os |
| import getpass |
| from typing import Annotated, List, Dict, Any |
| import operator |
| from pydantic import BaseModel, Field |
| from datetime import datetime |
| import requests |
| from bs4 import BeautifulSoup |
| import re |
| import json |
| import gradio as gr |
| from langdetect import detect |
|
|
| |
class NewsItem(BaseModel):
    """A single AI news article extracted from search results."""

    title: str = Field(description="Title of the AI news article")
    url: str = Field(description="URL of the news article")
    source: str = Field(description="Source website of the news")
    description: str = Field(description="Brief description of the news article")
|
|
class NewsResults(BaseModel):
    """Container for the full list of AI news articles found by a search."""

    news_items: List[NewsItem] = Field(description="List of AI news articles found")
|
|
class Subsection(BaseModel):
    """One blog subsection, derived from a single news item."""

    title: str = Field(description="Title of the subsection (based on news item title)")
    source: str = Field(description="Source of the news item")
    url: str = Field(description="URL of the news item")
    content: str = Field(description="Content for this subsection")
|
|
class Section(BaseModel):
    """One top-level blog section, grouping related news-item subsections."""

    name: str = Field(description="Name for this section of the blog")
    description: str = Field(description="Description for this section of the blog")
    information: str = Field(description="Information which should be included in this section of the blog")
    # default_factory=list instead of default=[]: the factory form is the
    # documented pydantic idiom for mutable defaults and gives each instance
    # its own fresh list unambiguously.
    subsections: List[Subsection] = Field(description="Subsections for each news item in this category", default_factory=list)
|
|
class Sections(BaseModel):
    """Container for the planned list of blog sections."""

    sections: List[Section] = Field(description="List of sections for this blog")
|
|
| |
class NewsState(TypedDict):
    """Graph state for the news-search workflow (search -> parse)."""

    query: str  # initial query text (the search node also builds its own query)
    date: str  # target date in "YYYY-MM-DD" form
    search_results: List[Dict[str, Any]]  # raw, filtered Tavily search hits
    news_items: List[Dict[str, Any]]  # structured items parsed out by the LLM
| |
class BlogState(TypedDict):
    """Graph state for the blog-generation workflow."""

    content: str  # "TITLE:/SOURCE:/URL:/DESCRIPTION:/CONTENT:" text blob
    sections: List[Section]  # plan produced by the orchestrator node
    completed_sections: Annotated[List, operator.add]  # worker outputs, merged by list concatenation
    final_report: str  # fully assembled markdown blog
|
|
class WorkerState(TypedDict):
    """Per-worker state: a single section to write."""

    section: Section  # the one section this worker is responsible for
    completed_sections: Annotated[List, operator.add]  # same reducer key as BlogState
|
|
class ArticleScraperState(TypedDict):
    """Graph state for the single-article scraper workflow."""

    url: str  # article URL to fetch
    article_content: str  # scraped text, or an error/skip message
|
|
| |
def is_english(text):
    """Heuristically decide whether *text* is English.

    Very short or empty input is rejected outright. Otherwise langdetect is
    tried first; if it fails for any reason, fall back to counting common
    English function words.

    Args:
        text: Candidate text (may be None or empty).

    Returns:
        True if the text looks like English, False otherwise.
    """
    # Too little text to classify reliably.
    if not text or len(text.strip()) < 50:
        return False

    try:
        return detect(text) == 'en'
    except Exception:
        # Was a bare `except:` — that also swallowed KeyboardInterrupt and
        # SystemExit. `Exception` still covers langdetect failures (and any
        # other runtime error) while letting control-flow exceptions through.
        common_english_words = ['the', 'and', 'in', 'to', 'of', 'is', 'for', 'with', 'on', 'that',
                                'this', 'are', 'was', 'be', 'have', 'it', 'not', 'they', 'by', 'from']
        text_lower = text.lower()

        # Count how many distinct common words appear (space-delimited).
        english_word_count = sum(1 for word in common_english_words if f" {word} " in f" {text_lower} ")

        text_words = len(text_lower.split())
        if text_words == 0:
            return False

        # Ratio is taken against at most 20 words so short texts aren't
        # penalized for having few chances to contain common words.
        english_ratio = english_word_count / min(20, text_words)
        return english_word_count >= 5 or english_ratio > 0.25
|
|
| |
def search_ai_news(state: NewsState):
    """Search for the latest AI news via Tavily, keeping only English,
    non-YouTube results.

    Returns a dict with the filtered hits under "search_results".
    """
    tavily = TavilySearchResults(max_results=10)

    # Fall back to today's date when the caller didn't supply one.
    target_date = state.get("date", datetime.now().strftime("%Y-%m-%d"))

    hits = tavily.invoke({"query": f"latest artificial intelligence news {target_date} english"})

    kept = []
    for hit in hits:
        # Drop video results outright.
        if "youtube.com" in hit.get("url", "").lower():
            continue
        # Language check uses body text plus title for more signal.
        combined_text = hit.get("content", "") + " " + hit.get("title", "")
        if is_english(combined_text):
            kept.append(hit)

    return {"search_results": kept}
|
|
def parse_news_items(state: NewsState):
    """Parse raw search results into structured news items via the LLM.

    Formats the Tavily hits into a text blob, asks the LLM for a JSON list
    of AI-relevant articles, and parses the first JSON array found in the
    reply. Falls back to a single placeholder item when no parseable JSON
    comes back.

    Returns a dict with the item dicts under "news_items".
    """
    search_results = state["search_results"]

    formatted_results = "\n\n".join([
        f"Title: {result.get('title', 'No title')}\n"
        f"URL: {result.get('url', 'No URL')}\n"
        f"Content: {result.get('content', 'No content')}"
        for result in search_results
    ])

    system_prompt = """
    Extract AI news articles from these search results. Filter out any that aren't about artificial intelligence.

    For each relevant AI news article, provide:
    - title: The title of the article
    - url: The URL of the article
    - source: The source website of the news
    - description: A brief description of the article

    Format your response as a JSON list of objects. Only include the relevant fields, nothing else.
    Example format:
    [
      {
        "title": "New AI Development",
        "url": "https://example.com/news/ai-dev",
        "source": "Example News",
        "description": "Description of the AI development"
      }
    ]
    """

    response = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=f"Here are the search results:\n\n{formatted_results}")
    ])

    response_text = response.content

    # Grab the first JSON array in the reply — the model may wrap it in prose.
    json_match = re.search(r'\[\s*\{.*\}\s*\]', response_text, re.DOTALL)

    # Single fallback, defined once (the original duplicated this dict in
    # both the no-match and parse-failure branches).
    fallback_items = [{
        "title": "AI News Roundup",
        "url": "https://example.com/ai-news",
        "source": "Various Sources",
        "description": "Compilation of latest AI news from various sources."
    }]

    news_items = fallback_items
    if json_match:
        try:
            news_items = json.loads(json_match.group(0))
        except json.JSONDecodeError:
            news_items = fallback_items

    return {"news_items": news_items}
|
|
| |
def scrape_article_content(state: ArticleScraperState):
    """Fetch a news article URL and return its visible paragraph text.

    Returns a dict with the scraped text under "article_content"; on any
    failure (network, parsing, non-English page) the value is an
    explanatory message instead.
    """
    target_url = state["url"]

    try:
        # Browser-like UA: some news sites reject default client agents.
        ua_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        resp = requests.get(target_url, headers=ua_headers, timeout=10)
        resp.raise_for_status()

        soup = BeautifulSoup(resp.text, 'html.parser')

        # Prefer paragraphs inside an <article> tag; fall back to every <p>.
        article_tag = soup.find('article')
        paragraphs = article_tag.find_all('p') if article_tag else soup.find_all('p')

        text = "\n\n".join(p.get_text().strip() for p in paragraphs)

        # Collapse all whitespace runs into single spaces.
        text = re.sub(r'\s+', ' ', text).strip()

        # Cap very long articles.
        if len(text) > 10000:
            text = text[:10000] + "..."

        # Language check on a prefix only — enough signal, less work.
        if not is_english(text[:500]):
            return {"article_content": "Content not in English or insufficient text to analyze."}

        return {"article_content": text}

    except Exception as e:
        # Best-effort scraper: report the failure rather than aborting the run.
        return {"article_content": f"Error scraping article: {str(e)}"}
|
|
| |
def orchestrator(state: BlogState):
    """Orchestrator that generates a plan for the blog based on news items.

    Parses the formatted text blob in state['content'] back into item dicts,
    buckets them into tech / business / research categories by keyword
    matching on title+description, and returns Section plans (one Subsection
    per item). On any error, returns a single empty fallback section.
    """
    try:
        # The blob is "\n\n"-separated blocks of
        # "TITLE: ...\nSOURCE: ...\nURL: ...\nDESCRIPTION: ...\nCONTENT: ...".
        content_lines = state['content'].split('\n\n')
        news_items = []
        current_item = {}

        for content_block in content_lines:
            if content_block.startswith('TITLE:'):
                # New item starts: flush the previous one if it was complete.
                if current_item and 'title' in current_item:
                    news_items.append(current_item)
                    current_item = {}

                # Pull each labeled field out of this block, line by line.
                lines = content_block.split('\n')
                for line in lines:
                    if line.startswith('TITLE:'):
                        current_item['title'] = line.replace('TITLE:', '').strip()
                    elif line.startswith('SOURCE:'):
                        current_item['source'] = line.replace('SOURCE:', '').strip()
                    elif line.startswith('URL:'):
                        current_item['url'] = line.replace('URL:', '').strip()
                    elif line.startswith('DESCRIPTION:'):
                        current_item['description'] = line.replace('DESCRIPTION:', '').strip()
                    elif line.startswith('CONTENT:'):
                        current_item['content'] = line.replace('CONTENT:', '').strip()
            elif 'content' in current_item:
                # Block without a TITLE prefix: treated as continuation text
                # for the current item's content.
                current_item['content'] += ' ' + content_block

        # Flush the final item.
        if current_item and 'title' in current_item:
            news_items.append(current_item)

        # Bucket items by simple keyword matching; tech is the default bucket.
        ai_tech_items = []
        ai_business_items = []
        ai_research_items = []

        for item in news_items:
            title = item.get('title', '').lower()
            description = item.get('description', '').lower()

            if any(kw in title + description for kw in ['business', 'market', 'company', 'investment', 'startup']):
                ai_business_items.append(item)
            elif any(kw in title + description for kw in ['research', 'study', 'paper', 'university']):
                ai_research_items.append(item)
            else:
                ai_tech_items.append(item)

        sections = []

        # One Section per non-empty bucket; each item becomes a Subsection
        # seeded with its description plus the first 500 chars of content.
        if ai_tech_items:
            tech_subsections = [
                Subsection(
                    title=item['title'],
                    source=item['source'],
                    url=item['url'],
                    content=f"{item.get('description', '')} {item.get('content', '')[:500]}..."
                ) for item in ai_tech_items
            ]

            sections.append(Section(
                name="AI Technology Developments",
                description="Recent advancements in AI technology and applications",
                information="Cover the latest developments in AI technology.",
                subsections=tech_subsections
            ))

        if ai_business_items:
            business_subsections = [
                Subsection(
                    title=item['title'],
                    source=item['source'],
                    url=item['url'],
                    content=f"{item.get('description', '')} {item.get('content', '')[:500]}..."
                ) for item in ai_business_items
            ]

            sections.append(Section(
                name="AI in Business",
                description="How AI is transforming industries and markets",
                information="Focus on business applications and market trends in AI.",
                subsections=business_subsections
            ))

        if ai_research_items:
            research_subsections = [
                Subsection(
                    title=item['title'],
                    source=item['source'],
                    url=item['url'],
                    content=f"{item.get('description', '')} {item.get('content', '')[:500]}..."
                ) for item in ai_research_items
            ]

            sections.append(Section(
                name="AI Research and Studies",
                description="Latest research findings and academic work in AI",
                information="Cover recent research papers and studies in AI.",
                subsections=research_subsections
            ))

        # If nothing was bucketed (e.g. no items parsed), emit one catch-all
        # section over whatever items exist.
        if not sections:
            general_subsections = [
                Subsection(
                    title=item['title'],
                    source=item['source'],
                    url=item['url'],
                    content=f"{item.get('description', '')} {item.get('content', '')[:500]}..."
                ) for item in news_items
            ]

            sections.append(Section(
                name="Latest AI News",
                description="Roundup of the latest AI news from around the web",
                information="Cover a range of AI news topics.",
                subsections=general_subsections
            ))

        return {"sections": sections}
    except Exception as e:
        # Planning must not kill the whole run: fall back to one generic
        # section with no subsections (the worker then writes it from scratch).
        print(f"Error in orchestrator: {str(e)}")

        fallback_sections = [
            Section(
                name="Latest AI Developments",
                description="Overview of recent AI advancements and research",
                information="Summarize the latest AI developments from the provided content.",
                subsections=[]
            )
        ]
        return {"sections": fallback_sections}
|
|
def llm_call(state: WorkerState):
    """Worker: write one blog section, with a subsection per news item.

    For each Subsection in the plan the LLM writes a markdown body, which is
    prefixed with a heading and a source link. When the section has no
    subsections, the LLM writes the whole section from its name/description/
    information instead.

    Returns the rendered markdown wrapped in a one-element list under
    "completed_sections" so the graph's list-concatenation reducer can merge
    worker outputs.
    """
    section = state['section']

    # Section heading plus its short description.
    section_header = f"## {section.name}\n\n{section.description}\n"

    subsections_content = ""
    if section.subsections:
        # One LLM call per news item. (Fixed: the original used enumerate()
        # here but never used the index.)
        for subsection in section.subsections:
            subsection_prompt = f"""
            Write a detailed subsection about this AI news item:
            Title: {subsection.title}
            Source: {subsection.source}
            URL: {subsection.url}

            Content to summarize and expand on:
            {subsection.content}

            Keep your response focused on the news item and make it engaging. Use markdown formatting.
            """

            subsection_content = llm.invoke([
                SystemMessage(content="You are writing a subsection for an AI news blog. Write in a professional but engaging style. Include key details and insights. Use markdown formatting."),
                HumanMessage(content=subsection_prompt)
            ])

            # Heading, attribution link, then the model's text.
            formatted_subsection = f"### {subsection.title}\n\n"
            formatted_subsection += f"*Source: [{subsection.source}]({subsection.url})*\n\n"
            formatted_subsection += subsection_content.content

            subsections_content += formatted_subsection + "\n\n"
    else:
        # No per-item subsections: have the LLM write the section directly.
        section_content = llm.invoke([
            SystemMessage(content="Write a blog section following the provided name, description, and information. Include no preamble. Use markdown formatting."),
            HumanMessage(content=f"Here is the section name: {section.name}\nDescription: {section.description}\nInformation: {section.information}")
        ])
        subsections_content = section_content.content

    complete_section = section_header + subsections_content

    return {"completed_sections": [complete_section]}
|
|
def synthesizer(state: BlogState):
    """Synthesize the full blog from completed sections.

    Joins the worker-written sections, asks the LLM for a short intro, and
    builds a hierarchical table of contents from the '## ' and '### '
    markdown headings.

    Returns the assembled markdown under "final_report".
    """

    def _anchor(heading):
        # Markdown heading anchor: lowercase, spaces -> dashes, common
        # punctuation stripped. The original applied punctuation stripping
        # only to subsection anchors, so a section name containing ':', '?',
        # '!' or '.' produced a broken TOC link; one helper keeps both
        # levels consistent.
        return re.sub(r'[:?!.]', '', heading.lower().replace(' ', '-'))

    completed_sections = state["completed_sections"]

    completed_report = "\n\n".join(completed_sections)

    today = datetime.now().strftime("%Y-%m-%d")
    blog_title = f"# AI News Roundup - {today}"

    intro = llm.invoke([
        SystemMessage(content="Write a brief introduction for an AI news roundup blog post. Keep it under 100 words. Be engaging and professional."),
        HumanMessage(content=f"Today's date is {today}. Write a brief introduction for an AI news roundup.")
    ])

    table_of_contents = "## Table of Contents\n\n"

    # Top-level sections are the '## ' headings in the joined report.
    section_matches = re.findall(r'## ([^\n]+)', completed_report)

    for i, section_name in enumerate(section_matches, 1):
        table_of_contents += f"{i}. [{section_name}](#{_anchor(section_name)})\n"

        # Slice out this section's text so only its own '### ' subsections
        # are listed beneath it.
        section_start = completed_report.find(f"## {section_name}")
        next_section_match = re.search(r'## ', completed_report[section_start+1:])
        if next_section_match:
            section_end = section_start + 1 + next_section_match.start()
            section_text = completed_report[section_start:section_end]
        else:
            section_text = completed_report[section_start:]

        subsection_matches = re.findall(r'### ([^\n]+)', section_text)

        for j, subsection_name in enumerate(subsection_matches, 1):
            table_of_contents += f" {i}.{j}. [{subsection_name}](#{_anchor(subsection_name)})\n"

    final_report = f"{blog_title}\n\n{intro.content}\n\n{table_of_contents}\n\n---\n\n{completed_report}\n\n---\n\n*This AI News Roundup was automatically generated on {today}.*"

    return {"final_report": final_report}
|
|
| |
def assign_workers(state: BlogState):
    """Fan out: dispatch one llm_call worker for every planned section."""
    dispatches = []
    for planned_section in state["sections"]:
        dispatches.append(Send("llm_call", {"section": planned_section}))
    return dispatches
|
|
| |
def create_news_search_workflow():
    """Build the two-step news pipeline: search Tavily, then parse items."""
    graph = StateGraph(NewsState)

    # Nodes.
    graph.add_node("search_ai_news", search_ai_news)
    graph.add_node("parse_news_items", parse_news_items)

    # Linear wiring: START -> search -> parse -> END.
    graph.add_edge(START, "search_ai_news")
    graph.add_edge("search_ai_news", "parse_news_items")
    graph.add_edge("parse_news_items", END)

    return graph.compile()
|
|
def create_article_scraper_workflow():
    """Build the single-node workflow that scrapes one article URL."""
    graph = StateGraph(ArticleScraperState)

    # One node, wired straight from START to END.
    graph.add_node("scrape_article", scrape_article_content)
    graph.add_edge(START, "scrape_article")
    graph.add_edge("scrape_article", END)

    return graph.compile()
|
|
def create_blog_generator_workflow():
    """Build the orchestrator -> parallel workers -> synthesizer blog graph."""
    graph = StateGraph(BlogState)

    # Nodes.
    graph.add_node("orchestrator", orchestrator)
    graph.add_node("llm_call", llm_call)
    graph.add_node("synthesizer", synthesizer)

    # The orchestrator fans out one llm_call worker per planned section via
    # assign_workers; all workers feed the synthesizer.
    graph.add_edge(START, "orchestrator")
    graph.add_conditional_edges("orchestrator", assign_workers, ["llm_call"])
    graph.add_edge("llm_call", "synthesizer")
    graph.add_edge("synthesizer", END)

    return graph.compile()
|
|
def generate_ai_news_blog(groq_api_key=None, tavily_api_key=None, date=None):
    """Main function to generate the AI news blog.

    Sets API keys into the environment, initializes the module-level LLM,
    searches for AI news, scrapes each article, and runs the blog-generation
    graph.

    Args:
        groq_api_key: Optional Groq key; exported to GROQ_API_KEY when given.
        tavily_api_key: Optional Tavily key; exported to TAVILY_API_KEY.
        date: Optional "YYYY-MM-DD" string; defaults to today.

    Returns:
        The final markdown report, or an explanatory message when no usable
        English items were found.
    """
    if groq_api_key:
        os.environ["GROQ_API_KEY"] = groq_api_key
    if tavily_api_key:
        os.environ["TAVILY_API_KEY"] = tavily_api_key

    # The orchestrator/worker/synthesizer functions read this module-level
    # handle, so it must be set before any graph is invoked.
    global llm
    llm = ChatGroq(model="qwen-2.5-32b")

    today = date if date else datetime.now().strftime("%Y-%m-%d")

    # Step 1: search and parse news items.
    news_search = create_news_search_workflow()
    news_results = news_search.invoke({"query": "latest artificial intelligence news", "date": today})

    print(f"Found {len(news_results['news_items'])} AI news items")

    # Step 2: scrape full article text for each item.
    article_scraper = create_article_scraper_workflow()
    news_contents = []

    for item in news_results["news_items"]:
        # Items come from LLM-generated JSON, so any key may be missing;
        # default defensively instead of raising KeyError (was item['title']
        # etc.), and skip items with no URL to scrape.
        title = item.get('title', 'Untitled')
        source = item.get('source', 'Unknown source')
        url = item.get('url', '')
        if not url:
            print(f"Skipping item with no URL: {title}")
            continue

        print(f"Scraping: {title} from {source}")
        result = article_scraper.invoke({"url": url})

        # The scraper flags non-English pages with this marker message.
        if "not in English" in result["article_content"]:
            print(f"Skipping non-English content: {title}")
            continue

        news_contents.append({
            "title": title,
            "url": url,
            "source": source,
            "description": item.get('description', ''),
            "content": result["article_content"]
        })

    if not news_contents:
        return "No English language AI news items found for the specified date. Please try a different date."

    # Step 3: format items into the labeled text blob the orchestrator parses.
    formatted_content = "\n\n".join([
        f"TITLE: {item['title']}\nSOURCE: {item['source']}\nURL: {item['url']}\nDESCRIPTION: {item['description']}\nCONTENT: {item['content'][:2000]}..."
        for item in news_contents
    ])

    # Step 4: generate the blog.
    blog_generator = create_blog_generator_workflow()
    blog_result = blog_generator.invoke({
        "content": formatted_content,
        "completed_sections": []
    })

    return blog_result["final_report"]
|
|
| |
def create_gradio_interface():
    """Create a Gradio interface for the AI News Blog Generator.

    Builds a Blocks app with API-key inputs, a date field, and
    generate/clear buttons wired to generate_ai_news_blog. Returns the
    (unlaunched) gr.Blocks instance.
    """

    def run_generation(groq_key, tavily_key, selected_date):
        # Button callback: validate keys, then run the full pipeline; any
        # failure is shown in the output panel rather than crashing the UI.
        if not groq_key or not tavily_key:
            return "Please provide both API keys."

        try:
            result = generate_ai_news_blog(groq_key, tavily_key, selected_date)
            return result
        except Exception as e:
            return f"Error generating blog: {str(e)}"

    with gr.Blocks(title="AI News Blog Generator") as demo:
        gr.Markdown("# AI News Blog Generator")
        gr.Markdown("Generate a daily roundup of AI news articles, categorized by topic.")

        with gr.Row():
            # Left column: credentials, date, and action buttons.
            with gr.Column():
                groq_key = gr.Textbox(label="Groq API Key", placeholder="Enter your Groq API key", type="password")
                tavily_key = gr.Textbox(label="Tavily API Key", placeholder="Enter your Tavily API key", type="password")
                date_picker = gr.Textbox(label="Date (YYYY-MM-DD)", placeholder="Leave empty for today's date",
                                         value=datetime.now().strftime("%Y-%m-%d"))
                with gr.Row():
                    generate_button = gr.Button("Generate AI News Blog", variant="primary")
                    clear_button = gr.Button("Clear Output")

            # Right column: status line and the rendered blog.
            with gr.Column():
                status_text = gr.Textbox(label="Status", placeholder="Ready to generate", interactive=False)
                output_md = gr.Markdown("Your AI News Blog will appear here.")

        # Event chain: show "in progress" status (unqueued so it appears
        # immediately), run the generation, then show completion status.
        generate_button.click(
            fn=lambda: "Generating AI News Blog... This may take several minutes.",
            inputs=None,
            outputs=status_text,
            queue=False
        ).then(
            fn=run_generation,
            inputs=[groq_key, tavily_key, date_picker],
            outputs=output_md
        ).then(
            fn=lambda: "Blog generation complete!",
            inputs=None,
            outputs=status_text
        )

        # Reset both the status line and the markdown output.
        clear_button.click(
            fn=lambda: ("Ready to generate", ""),
            inputs=None,
            outputs=[status_text, output_md]
        )

    return demo
|
|
| |
if __name__ == "__main__":
    # Entry point: build and launch the Gradio UI, reporting any startup
    # failure instead of letting the traceback escape.
    try:
        app = create_gradio_interface()
        app.launch()
    except Exception as err:
        print(f"Error running the pipeline: {str(err)}")