import os
import secrets
from urllib.parse import urljoin

import httpx
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from langchain_community.document_loaders import YoutubeLoader, UnstructuredPDFLoader, WebBaseLoader
from langchain_community.document_loaders import OnlinePDFLoader
| app = FastAPI() |
|
|
| API_KEY = os.environ["API_KEY"] |
|
|
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") |
|
|
| async def validate_token(token: str = Depends(oauth2_scheme)): |
| if token != API_KEY: |
| raise HTTPException(status_code=401, detail="Invalid API Key") |
|
|
| @app.post("/extract_text", tags=["Text Extraction"], dependencies=[Depends(validate_token)]) |
| def extract_text(url: str, language: str = "ja", length: int = 150000,use_jina:bool = True): |
| try: |
| if "youtube.com" in url or "youtu.be" in url: |
| |
| loader = YoutubeLoader.from_youtube_url( |
| youtube_url=url, |
| add_video_info=True, |
| language=[language], |
| ) |
| docs = loader.load() |
| text_content = str(docs) |
| elif url.endswith(".pdf"): |
| |
| loader = OnlinePDFLoader(url) |
| docs = loader.load() |
| text_content = docs[0].page_content |
| else: |
| |
| |
| |
| |
| if use_jina: |
| response = requests.get("https://r.jina.ai/"+ url) |
| text_content = response.text |
| else: |
| response = requests.get(url,timeout = 10) |
| text_content = str(convert_to_markdown(response.text,url)) |
|
|
| if len(text_content) < length: |
| return {"text_content": text_content} |
| else: |
| return { |
| "text_content": text_content[: int(length / 2)] |
| + text_content[len(text_content) - int(length / 2) :] |
| } |
| except Exception as e: |
| error_msg = str(e) |
| return {"message": error_msg} |
|
|
| @app.post("/httpx_bs", tags=["Text Extraction and beautiful soup"], dependencies=[Depends(validate_token)]) |
| def httpx_bs(url: str, length: int = 150000): |
| try: |
| response = httpx.get(url) |
| text_content = str(convert_to_markdown(response,url)) |
|
|
| if len(text_content) < length: |
| return {"text_content": text_content} |
| else: |
| return { |
| "text_content": text_content[: int(length / 2)] |
| + text_content[len(text_content) - int(length / 2) :] |
| } |
| except Exception as e: |
| error_msg = str(e) |
| return {"message": error_msg} |
|
|
| @app.post("/extract_from_url", tags=["Text Extraction from URL"], dependencies=[Depends(validate_token)]) |
| def extract_from_url(url: str, length: int = 150000, tool: str = "httpx"): |
| try: |
| if tool == "jina": |
| response = requests.get("https://r.jina.ai/" + url) |
| text_content = response.text |
| elif tool == "httpx": |
| response = httpx.get(url) |
| text_content = str(convert_to_markdown(response.text, url)) |
| elif tool == "requests": |
| response = requests.get(url, timeout=10) |
| text_content = str(convert_to_markdown(response.text, url)) |
| elif tool == "webbaseloader": |
| loader = WebBaseLoader(url) |
| docs = loader.load() |
| text_content = docs[0].page_content |
| else: |
| raise ValueError("Invalid tool specified. Choose from 'jina', 'httpx', 'requests', or 'webbaseloader'.") |
|
|
| if len(text_content) < length: |
| return {"text_content": text_content} |
| else: |
| return { |
| "text_content": text_content[: int(length / 2)] |
| + text_content[len(text_content) - int(length / 2) :] |
| } |
| except Exception as e: |
| error_msg = str(e) |
| return {"message": error_msg} |
|
|
|
|
| def convert_to_markdown(response_text,url): |
| |
| |
|
|
| soup = BeautifulSoup(response_text, 'html.parser') |
| markdown = "" |
|
|
| |
| if soup.title: |
| markdown += f"# {soup.title.string.strip()}\n\n" |
|
|
| |
| main_content = soup.body |
| if main_content: |
| for element in main_content.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'a', 'ul', 'ol']): |
| if element.name.startswith('h'): |
| level = int(element.name[1]) |
| markdown += f"{'#' * level} {element.get_text().strip()}\n\n" |
| elif element.name == 'p': |
| markdown += f"{element.get_text().strip()}\n\n" |
| elif element.name == 'a': |
| href = element.get('href') |
| if href: |
| full_url = urljoin(url, href) |
| markdown += f"[{element.get_text().strip()}]({full_url})\n\n" |
| elif element.name in ['ul', 'ol']: |
| for li in element.find_all('li'): |
| markdown += f"- {li.get_text().strip()}\n" |
| markdown += "\n" |
|
|
| return markdown |