| from fastapi import FastAPI, HTTPException, Query |
| from typing import List, Optional |
| from pydantic import BaseModel |
| from time import sleep |
| from curl_cffi.requests import Session |
| from urllib.parse import urlencode, unquote, urlparse, parse_qs |
| import base64 |
| from typing import Dict, Any |
| from concurrent.futures import ThreadPoolExecutor |
| from webscout.litagent import LitAgent |
| from bs4 import BeautifulSoup |
| import json |
|
|
| app = FastAPI( |
| title="Snapzion Search API", |
| description="A FastAPI wrapper for the Search library with advanced features.", |
| version="1.0.0", |
| ) |
|
|
| |
| |
|
|
| class BingSearchResult(BaseModel): |
| url: str |
| title: str |
| description: str |
| metadata: Dict[str, Any] = {} |
|
|
| class BingImageResult(BaseModel): |
| title: str |
| image: str |
| thumbnail: str |
| url: str |
| source: str |
|
|
| class BingNewsResult(BaseModel): |
| title: str |
| url: str |
| description: str |
| source: str = "" |
|
|
| class BingSearch: |
| """Bing search implementation with configurable parameters and advanced features.""" |
| _executor: ThreadPoolExecutor = ThreadPoolExecutor() |
|
|
| def __init__( |
| self, |
| timeout: int = 10, |
| proxies: Optional[Dict[str, str]] = None, |
| verify: bool = True, |
| lang: str = "en-US", |
| sleep_interval: float = 0.0, |
| impersonate: str = "chrome110" |
| ): |
| self.timeout = timeout |
| self.proxies = proxies if proxies else {} |
| self.verify = verify |
| self.lang = lang |
| self.sleep_interval = sleep_interval |
| self._base_url = "https://www.bing.com" |
| self.session = Session( |
| proxies=self.proxies, |
| verify=self.verify, |
| timeout=self.timeout, |
| impersonate=impersonate |
| ) |
| |
| self.session.headers.update({ |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36" |
| }) |
|
|
| |
| def _selectors(self, element): |
| selectors = { |
| 'links': 'ol#b_results > li', |
| 'next': 'a.sb_pagN' |
| } |
| return selectors[element] |
|
|
| def _first_page(self, query): |
| url = f'{self._base_url}/search?q={query}&search=&form=QBLH' |
| return {'url': url, 'data': None} |
|
|
| def _next_page(self, soup): |
| selector = self._selectors('next') |
| next_page_tag = soup.select_one(selector) |
| url = None |
| if next_page_tag and next_page_tag.get('href'): |
| url = self._base_url + next_page_tag['href'] |
| return {'url': url, 'data': None} |
|
|
| def _get_url(self, tag): |
| url = tag.get('href', '') |
| resp = url |
| try: |
| parsed_url = urlparse(url) |
| query_params = parse_qs(parsed_url.query) |
| if "u" in query_params: |
| encoded_url = query_params["u"][0][2:] |
| try: |
| decoded_bytes = base64.urlsafe_b64decode(encoded_url + '===') |
| except base64.binascii.Error as e: |
| print(f"Error decoding Base64 string: {e}") |
| return url |
| resp = decoded_bytes.decode('utf-8') |
| except Exception as e: |
| print(f"Error decoding Base64 string: {e}") |
| return resp |
|
|
| |
| def text( |
| self, |
| keywords: str, |
| region: str = None, |
| safesearch: str = "moderate", |
| max_results: int = 10, |
| unique: bool = True |
| ) -> List[BingSearchResult]: |
| if not keywords: |
| raise ValueError("Search keywords cannot be empty") |
|
|
| fetched_results = [] |
| fetched_links = set() |
|
|
| def fetch_page(url): |
| try: |
| resp = self.session.get(url) |
| resp.raise_for_status() |
| return resp.text |
| except Exception as e: |
| raise Exception(f"Bing search failed: {str(e)}") |
|
|
| current_url = self._first_page(keywords)['url'] |
| |
| while current_url and len(fetched_results) < max_results: |
| html = fetch_page(current_url) |
| soup = BeautifulSoup(html, "html.parser") |
| |
| |
| result_blocks = soup.select(self._selectors('links')) |
| |
| for result in result_blocks: |
| |
| title_tag = result.find('h2') |
| if not title_tag: |
| continue |
| |
| link_tag = title_tag.find('a') |
| if not link_tag or not link_tag.has_attr('href'): |
| continue |
| |
| url_val = self._get_url(link_tag) |
| title = title_tag.get_text(strip=True) |
|
|
| |
| desc_container = result.find('div', class_='b_caption') |
| description = '' |
| if desc_container: |
| |
| desc_p = desc_container.find('p') |
| if desc_p: |
| description = desc_p.get_text(strip=True) |
| else: |
| description = desc_container.get_text(strip=True) |
| |
| |
| if not description: |
| p_tag = result.find('p') |
| if p_tag: |
| description = p_tag.get_text(strip=True) |
|
|
| if url_val and title: |
| if unique and url_val in fetched_links: |
| continue |
| |
| fetched_results.append(BingSearchResult(url=url_val, title=title, description=description)) |
| fetched_links.add(url_val) |
| |
| if len(fetched_results) >= max_results: |
| break |
| |
| if len(fetched_results) >= max_results: |
| break |
|
|
| |
| next_page_info = self._next_page(soup) |
| current_url = next_page_info['url'] |
| if current_url: |
| sleep(self.sleep_interval) |
|
|
| return fetched_results[:max_results] |
|
|
|
|
| def suggestions(self, query: str, region: str = None) -> List[str]: |
| if not query: |
| raise ValueError("Search query cannot be empty") |
| params = { |
| "query": query, |
| "mkt": region if region else "en-US" |
| } |
| url = f"https://api.bing.com/osjson.aspx?{urlencode(params)}" |
| try: |
| resp = self.session.get(url) |
| resp.raise_for_status() |
| data = resp.json() |
| if isinstance(data, list) and len(data) > 1 and isinstance(data[1], list): |
| return data[1] |
| return [] |
| except Exception as e: |
| if hasattr(e, 'response') and e.response is not None: |
| raise Exception(f"Bing suggestions failed with status {e.response.status_code}: {str(e)}") |
| else: |
| raise Exception(f"Bing suggestions failed: {str(e)}") |
|
|
| def images( |
| self, |
| keywords: str, |
| region: str = None, |
| safesearch: str = "moderate", |
| max_results: int = 10 |
| ) -> List[BingImageResult]: |
| if not keywords: |
| raise ValueError("Search keywords cannot be empty") |
| safe_map = { |
| "on": "Strict", |
| "moderate": "Moderate", |
| "off": "Off" |
| } |
| safe = safe_map.get(safesearch.lower(), "Moderate") |
| params = { |
| "q": keywords, |
| "count": max_results, |
| "setlang": self.lang, |
| "safeSearch": safe, |
| } |
| if region: |
| params["mkt"] = region |
| url = f"{self._base_url}/images/search?{urlencode(params)}" |
| try: |
| resp = self.session.get(url) |
| resp.raise_for_status() |
| html = resp.text |
| except Exception as e: |
| if hasattr(e, 'response') and e.response is not None: |
| raise Exception(f"Bing image search failed with status {e.response.status_code}: {str(e)}") |
| else: |
| raise Exception(f"Bing image search failed: {str(e)}") |
| soup = BeautifulSoup(html, "html.parser") |
| results = [] |
| for item in soup.select("a.iusc"): |
| try: |
| m = item.get("m") |
| meta = json.loads(m) if m else {} |
| image_url = meta.get("murl", "") |
| thumb_url = meta.get("turl", "") |
| title = meta.get("t", "") |
| page_url = meta.get("purl", "") |
| source = meta.get("surl", "") |
| if image_url: |
| results.append(BingImageResult(title=title, image=image_url, thumbnail=thumb_url, url=page_url, source=source)) |
| if len(results) >= max_results: |
| break |
| except Exception: |
| continue |
| return results[:max_results] |
|
|
| def news( |
| self, |
| keywords: str, |
| region: str = None, |
| safesearch: str = "moderate", |
| max_results: int = 10, |
| ) -> List['BingNewsResult']: |
| if not keywords: |
| raise ValueError("Search keywords cannot be empty") |
| safe_map = { |
| "on": "Strict", |
| "moderate": "Moderate", |
| "off": "Off" |
| } |
| safe = safe_map.get(safesearch.lower(), "Moderate") |
| params = { |
| "q": keywords, |
| "form": "QBNH", |
| "safeSearch": safe, |
| } |
| if region: |
| params["mkt"] = region |
| url = f"{self._base_url}/news/search?{urlencode(params)}" |
| try: |
| resp = self.session.get(url) |
| resp.raise_for_status() |
| except Exception as e: |
| if hasattr(e, 'response') and e.response is not None: |
| raise Exception(f"Bing news search failed with status {e.response.status_code}: {str(e)}") |
| else: |
| raise Exception(f"Bing news search failed: {str(e)}") |
| soup = BeautifulSoup(resp.text, "html.parser") |
| results = [] |
| for item in soup.select("div.news-card, div.card, div.newsitem, div.card-content, div.t_s_main"): |
| a_tag = item.find("a") |
| title = a_tag.get_text(strip=True) if a_tag else '' |
| url_val = a_tag['href'] if a_tag and a_tag.has_attr('href') else '' |
| desc_tag = item.find("div", class_="snippet") or item.find("div", class_="news-card-snippet") or item.find("div", class_="snippetText") |
| description = desc_tag.get_text(strip=True) if desc_tag else '' |
| source_tag = item.find("div", class_="source") |
| source = source_tag.get_text(strip=True) if source_tag else '' |
| if url_val and title: |
| results.append(BingNewsResult(title=title, url=url_val, description=description, source=source)) |
| if len(results) >= max_results: |
| break |
| if not results: |
| for item in soup.select("a.title"): |
| title = item.get_text(strip=True) |
| url_val = item['href'] if item.has_attr('href') else '' |
| description = '' |
| source = '' |
| if url_val and title: |
| results.append(BingNewsResult(title=title, url=url_val, description=description, source=source)) |
| if len(results) >= max_results: |
| break |
| return results[:max_results] |
|
|
|
|
| bing = BingSearch() |
|
|
| @app.get("/search", response_model=List[BingSearchResult]) |
| async def text_search( |
| query: str = Query(..., description="The search keywords."), |
| region: Optional[str] = Query(None, description="The region for the search (e.g., 'us-US')."), |
| safesearch: str = Query("moderate", description="Safe search level ('on', 'moderate', 'off')."), |
| max_results: int = Query(10, description="Maximum number of results to return."), |
| ): |
| """ |
| Perform a text search on Bing. |
| """ |
| try: |
| results = bing.text( |
| keywords=query, |
| region=region, |
| safesearch=safesearch, |
| max_results=max_results, |
| ) |
| return results |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.get("/suggestions", response_model=List[str]) |
| async def get_suggestions( |
| query: str = Query(..., description="The search query for which to fetch suggestions."), |
| region: Optional[str] = Query(None, description="The region for the suggestions (e.g., 'en-US')."), |
| ): |
| """ |
| Fetches search suggestions for a given query. |
| """ |
| try: |
| suggestions = bing.suggestions(query=query, region=region) |
| return suggestions |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.get("/images", response_model=List[BingImageResult]) |
| async def image_search( |
| query: str = Query(..., description="The search keywords for images."), |
| region: Optional[str] = Query(None, description="The region for the image search (e.g., 'us-US')."), |
| safesearch: str = Query("moderate", description="Safe search level ('on', 'moderate', 'off')."), |
| max_results: int = Query(10, description="Maximum number of image results to return."), |
| ): |
| """ |
| Perform an image search on Bing. |
| """ |
| try: |
| results = bing.images( |
| keywords=query, |
| region=region, |
| safesearch=safesearch, |
| max_results=max_results, |
| ) |
| return results |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.get("/news", response_model=List[BingNewsResult]) |
| async def news_search( |
| query: str = Query(..., description="The search keywords for news."), |
| region: Optional[str] = Query(None, description="The region for the news search (e.g., 'us-US')."), |
| safesearch: str = Query("moderate", description="Safe search level ('on', 'moderate', 'off')."), |
| max_results: int = Query(10, description="Maximum number of news results to return."), |
| ): |
| """ |
| Perform a news search on Bing. |
| """ |
| try: |
| results = bing.news( |
| keywords=query, |
| region=region, |
| safesearch=safesearch, |
| max_results=max_results, |
| ) |
| return results |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(app, host="0.0.0.0", port=8000) |