| from fastapi import FastAPI,HTTPException, Request, Query |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse |
| from typing import Optional, List |
| from LoadBalancer import LoadBalancer |
| import logging |
| import os |
| import urllib.parse |
| from utils import read_json_file, is_valid_url |
| from tvdb import recent_list, genre_list |
|
|
| CACHE_DIR = os.getenv("CACHE_DIR") |
| TOKEN = os.getenv("TOKEN") |
| REPO = os.getenv("REPO") |
|
|
| app = FastAPI() |
|
|
| origins = ["*"] |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=origins, |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| @app.on_event("startup") |
| async def startup_event(): |
| global load_balancer |
| |
| load_balancer = LoadBalancer(cache_dir=CACHE_DIR, token=TOKEN, repo=REPO) |
|
|
| @app.get("/") |
| def greet_json(): |
| return {"Version": load_balancer.version} |
|
|
| @app.post("/api/post/register") |
| async def register_instance(request: Request): |
| try: |
| data = await request.json() |
| if not data or "url" not in data: |
| return JSONResponse(content={"error": "No URL provided"}, status_code=400) |
| |
| url = data["url"] |
| if not is_valid_url(url): |
| return JSONResponse(content={"error": "Invalid URL"}, status_code=400) |
|
|
| |
| load_balancer.register_instance(url) |
| logging.info(f"Instance registered: {url}") |
|
|
| return JSONResponse(content={"message": f"Instance {url} registered successfully"}, status_code=200) |
|
|
| except Exception as e: |
| logging.error(f"Error registering instance: {e}") |
| return JSONResponse(content={"error": "Failed to register instance"}, status_code=500) |
|
|
| @app.get("/api/get/file_structure") |
| async def get_file_structure(): |
| return load_balancer.file_structure |
|
|
| @app.get("/api/get/movie/store") |
| async def get_movie_store(): |
| return load_balancer.FILM_STORE |
|
|
| @app.get("/api/get/series/store") |
| async def get_series_store(): |
| return load_balancer.TV_STORE |
|
|
| @app.get("/api/get/movie/all") |
| async def get_all_movies_api(): |
| return load_balancer.get_all_films() |
|
|
| @app.get("/api/get/series/all") |
| async def get_all_tvshows_api(): |
| return load_balancer.get_all_tv_shows() |
|
|
| @app.get("/api/get/recent") |
| async def get_recent_items(limit: int = 5): |
| |
| recent_films = recent_list.get_sorted_entries('film') |
| recent_series = recent_list.get_sorted_entries('series') |
|
|
| |
| limited_films = recent_films[:limit] |
| limited_series = recent_series[:limit] |
|
|
| |
| return JSONResponse(content={ |
| 'movies': limited_films, |
| 'series': limited_series |
| }) |
|
|
| @app.get("/api/get/genre_categories") |
| async def get_genre_categories(media_type: Optional[str] = Query(None, description="Filter by media type: 'movie' or 'series'")): |
| """ |
| Retrieve all available genre categories along with their density (number of media items). |
| |
| Query Parameters: |
| media_type: Optional. Filter by media type ('movie' or 'series'). If not provided, returns the total count. |
| |
| Returns: |
| A JSON response containing a list of genre objects, for example: |
| [{'name': 'Comedy', 'density': 12}, {'name': 'Drama', 'density': 8}, ...] |
| """ |
| try: |
| categories = [ |
| { |
| "name": genre, |
| "density": sum( |
| 1 for entry in data["entries"].values() |
| if media_type is None or entry[3] == media_type |
| ) |
| } |
| for genre, data in sorted(genre_list.genres.items()) |
| ] |
| return JSONResponse(content={"genres": categories}) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Error retrieving genre categories: {str(e)}") |
|
|
| @app.get("/api/get/genre") |
| async def get_genre_items( |
| genre: List[str] = Query(...), |
| media_type: Optional[str] = None, |
| limit: int = 5, |
| page: int = 1 |
| ): |
| """ |
| Get recent items from specified genres with an optional media type filter, a limit on the number of results, and pagination. |
| |
| :param genre: The genres to filter by (e.g., 'Comedy'). |
| :param media_type: Optional. Filter by media type ('movie' or 'series'). |
| :param limit: The maximum number of items to return for each media type. |
| :param page: The page number for pagination. |
| :return: A JSON response containing the filtered items. |
| """ |
| |
| entries = genre_list.get_entries_by_multiple_genres(genre, media_type=media_type) |
| |
| |
| movies = [{'title': entry[0]} for entry in entries if entry[4] == 'movie'] |
| series = [{'title': entry[0]} for entry in entries if entry[4] == 'series'] |
|
|
| |
| start = (page - 1) * limit |
| end = start + limit |
|
|
| |
| limited_movies = movies[start:end] |
| limited_series = series[start:end] |
|
|
| |
| results = { |
| 'movies': limited_movies, |
| 'series': limited_series, |
| 'page': page, |
| 'limit': limit, |
| 'total_movies': len(movies), |
| 'total_series': len(series) |
| } |
|
|
| |
| return JSONResponse(content=results) |
|
|
|
|
| @app.get("/api/get/movie/metadata/{title}") |
| async def get_movie_metadata_api(title: str): |
| """Endpoint to get the movie metadata by title.""" |
| if not title: |
| raise HTTPException(status_code=400, detail="No title provided") |
| |
| full_dir_path = os.path.join(CACHE_DIR, 'movie') |
| json_cache_path = os.path.join(full_dir_path,f"{urllib.parse.quote(title)}.json") |
| |
| if os.path.exists(json_cache_path): |
| data = await read_json_file(json_cache_path) |
| return JSONResponse(content=data) |
| |
| raise HTTPException(status_code=404, detail="Metadata not found") |
|
|
| @app.get("/api/get/movie/card/{title}") |
| async def get_movie_card_api(title: str): |
| """Endpoint to get the movie metadata by title.""" |
| if not title: |
| raise HTTPException(status_code=400, detail="No title provided") |
| |
| full_dir_path = os.path.join(CACHE_DIR, 'movie') |
| json_cache_path = os.path.join(full_dir_path,f"{urllib.parse.quote(title)}.json") |
| |
| if os.path.exists(json_cache_path): |
| data = await read_json_file(json_cache_path) |
| image = data['data']['image'] |
| trailers = data['data']['trailers'] or [] |
| eng_title = None |
| banner = [] |
| portrait =[] |
| overview = None |
|
|
| if data['data'].get('translations') and data['data']['translations'].get('nameTranslations'): |
| for name in data['data']['translations']['nameTranslations']: |
| if name['language'] == 'eng': |
| eng_title = name.get('name') |
| break |
|
|
| if data['data'].get('translations') and data['data']['translations'].get('overviewTranslations'): |
| overviews = data['data']['translations']['overviewTranslations'] |
| |
| |
| for o in overviews: |
| if o['language'] == 'eng': |
| overview = o.get('overview') |
| break |
|
|
| |
| if not overview and len(overviews) == 1: |
| overview = overviews[0].get('overview') |
|
|
|
|
| if data['data'].get('artworks'): |
| for artwork in data['data']['artworks']: |
| if artwork['type'] == 15: |
| banner.append(artwork) |
|
|
| if data['data'].get('artworks'): |
| for artwork in data['data']['artworks']: |
| if artwork['type'] == 14: |
| portrait.append(artwork) |
|
|
|
|
| year = data['data']['year'] |
| return JSONResponse(content={'title':eng_title or title, 'year': year, 'image': image,'portrait':portrait, 'banner':banner, 'overview':overview, 'trailers': trailers}) |
| |
| raise HTTPException(status_code=404, detail="Card not found") |
|
|
| @app.get("/api/get/series/metadata/{title}") |
| async def get_series_metadata_api(title: str): |
| """Endpoint to get the TV show metadata by title.""" |
| if not title: |
| raise HTTPException(status_code=400, detail="No title provided") |
| full_dir_path = os.path.join(CACHE_DIR, 'series') |
| json_cache_path = os.path.join(full_dir_path,f"{urllib.parse.quote(title)}.json") |
| |
| if os.path.exists(json_cache_path): |
| data = await read_json_file(json_cache_path) |
| |
| tv_structure_data = load_balancer.get_tv_structure(title) |
| if tv_structure_data: |
| data['file_structure'] = tv_structure_data |
| |
| return JSONResponse(content=data) |
| |
| raise HTTPException(status_code=404, detail="Metadata not found") |
|
|
| @app.get("/api/get/series/card/{title}") |
| async def get_series_card_api(title: str): |
| """Endpoint to get the TV show metadata by title.""" |
| if not title: |
| raise HTTPException(status_code=400, detail="No title provided") |
| full_dir_path = os.path.join(CACHE_DIR, 'series') |
| json_cache_path = os.path.join(full_dir_path,f"{urllib.parse.quote(title)}.json") |
| |
| if os.path.exists(json_cache_path): |
| data = await read_json_file(json_cache_path) |
| image = data['data']['image'] |
| trailers = data['data']['trailers'] or [] |
| eng_title = None |
| overview = None |
| portrait = [] |
| banner = [] |
| if data['data'].get('translations') and data['data']['translations'].get('nameTranslations'): |
| for name in data['data']['translations']['nameTranslations']: |
| if name['language'] == 'eng': |
| eng_title = name.get('name') |
| break |
| year = data['data']['year'] |
|
|
| if data['data'].get('translations') and data['data']['translations'].get('overviewTranslations'): |
| overviews = data['data']['translations']['overviewTranslations'] |
| |
| |
| for o in overviews: |
| if o['language'] == 'eng': |
| overview = o.get('overview') |
| break |
|
|
| |
| if not overview and len(overviews) == 1: |
| overview = overviews[0].get('overview') |
|
|
| if data['data'].get('artworks'): |
| for artwork in data['data']['artworks']: |
| if artwork['type'] == 3: |
| banner.append(artwork) |
|
|
| if data['data'].get('artworks'): |
| for artwork in data['data']['artworks']: |
| if artwork['type'] == 2: |
| portrait.append(artwork) |
|
|
|
|
| return JSONResponse(content={'title':eng_title or title, 'year': year, 'image': image, 'portrait':portrait,'banner': banner, 'overview':overview, 'trailers': trailers}) |
| |
| raise HTTPException(status_code=404, detail="Card not found") |
|
|
|
|
| @app.get("/api/get/series/metadata/{title}/{season}") |
| async def get_season_metadata_api(title: str, season: str): |
| """Endpoint to get the TV show season metadata by title and season.""" |
| if not season: |
| raise HTTPException(status_code=400, detail="Season must be provided and cannot be empty") |
| |
| |
| json_cache_path = os.path.join(CACHE_DIR, "metadata", title, f"{season}.json") |
| print(json_cache_path) |
| |
| if os.path.exists(json_cache_path): |
| data = await read_json_file(json_cache_path) |
| return JSONResponse(content=data) |
| |
| raise HTTPException(status_code=404, detail="Metadata not found") |
|
|
| @app.get('/api/get/instances') |
| async def get_instances(): |
| return load_balancer.instances |
|
|
| @app.get('/api/get/instances/health') |
| async def get_instances_health(): |
| return load_balancer.instances_health |
|
|
| @app.get("/api/get/movie/{title}") |
| async def get_movie_api(title: str): |
| """Endpoint to get the movie by title.""" |
| if not title: |
| raise HTTPException(status_code=400, detail="Title parameter is required") |
| |
| |
| if title in load_balancer.FILM_STORE: |
| url = load_balancer.FILM_STORE[title] |
| return JSONResponse(content={"url": url}) |
|
|
| movie_path = load_balancer.find_movie_path(title) |
| |
| if not movie_path: |
| raise HTTPException(status_code=404, detail="Movie not found") |
| |
| |
| response = load_balancer.download_film_to_best_instance(title=title) |
| if response: |
| return JSONResponse(content=response) |
|
|
| @app.get("/api/get/series/{title}/{season}/{episode}") |
| async def get_tv_show_api(title: str, season: str, episode: str): |
| """Endpoint to get the TV show by title, season, and episode.""" |
| if not title or not season or not episode: |
| raise HTTPException(status_code=400, detail="Title, season, and episode parameters are required") |
|
|
| |
| if title in load_balancer.TV_STORE and season in load_balancer.TV_STORE[title]: |
| for ep in load_balancer.TV_STORE[title][season]: |
| if episode in ep: |
| url = load_balancer.TV_STORE[title][season][ep] |
| return JSONResponse(content={"url": url}) |
|
|
| tv_path = load_balancer.find_tv_path(title) |
| |
| if not tv_path: |
| raise HTTPException(status_code=404, detail="TV show not found") |
|
|
| episode_path = None |
| for directory in load_balancer.file_structure: |
| if directory['type'] == 'directory' and directory['path'] == 'tv': |
| for sub_directory in directory['contents']: |
| if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower(): |
| for season_dir in sub_directory['contents']: |
| if season_dir['type'] == 'directory' and season in season_dir['path']: |
| for episode_file in season_dir['contents']: |
| if episode_file['type'] == 'file' and episode in episode_file['path']: |
| episode_path = episode_file['path'] |
| break |
|
|
| if not episode_path: |
| raise HTTPException(status_code=404, detail="Episode not found") |
| |
| |
| response = load_balancer.download_episode_to_best_instance(title=title, season=season, episode=episode) |
| if response: |
| return JSONResponse(content=response) |