| import os |
| import shutil |
| import subprocess |
| import uuid |
| from datetime import timedelta |
| from typing import List |
| from fastapi import FastAPI, UploadFile, File |
| from fastapi.responses import FileResponse, JSONResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from faster_whisper import WhisperModel |
| from pydantic import BaseModel |
|
|
# --- Application object and middleware ---------------------------------------
app = FastAPI(title="AI Subtitle Studio Pro")

# CORS is left wide open: any origin, any method and any header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Working directory -------------------------------------------------------
# Uploaded media, generated .ass files and rendered videos are all stored here.
TEMP_DIR = "temp"
os.makedirs(TEMP_DIR, exist_ok=True)

# --- Speech-to-text model ----------------------------------------------------
# The model is created once, at import time, and reused by the request handlers.
print("⚡ [SYSTEM] Initializing AI Neural Network...")
model = WhisperModel("small", device="cpu", compute_type="int8")
print("✅ [SYSTEM] AI Core Ready.")
|
|
| |
class SubtitleSegment(BaseModel):
    """One subtitle line together with its display interval."""

    # Index of the segment within the transcription.
    id: int
    # Start of the display interval, in seconds.
    start: float
    # End of the display interval, in seconds.
    end: float
    # Subtitle text; "\n" inside it is turned into an ASS line break on export.
    text: str
|
|
class StyleConfig(BaseModel):
    """Visual settings used to build the single ``Default`` ASS style."""

    # Font key ("vazir", "lalezar", "roboto", "bangers"); other values fall back to Arial.
    font: str
    # Written to the ASS ``Fontsize`` field.
    fontSize: int
    # Text colour as a "#RRGGBB" hex string.
    primaryColor: str
    # Outline (or background box) colour as a "#RRGGBB" hex string.
    outlineColor: str
    # "solid" or "transparent" select a box behind the text; anything else an outline.
    backType: str
    # Written to the ASS ``Outline`` field.
    outlineWidth: float
    # Written to the ASS ``MarginV`` field.
    marginV: int
    # Written to the ASS ``Alignment`` field.
    alignment: int
|
|
class ProcessRequest(BaseModel):
    """Request body of the render endpoint."""

    # Identifier returned by the analyze endpoint; selects the uploaded media file.
    file_id: str
    # Subtitle lines to burn into the video.
    segments: List[SubtitleSegment]
    # Appearance of the subtitles.
    style: StyleConfig
|
| |
def hex_to_ass(hex_color, alpha="00"):
    """Convert a CSS-style hex colour into an ASS ``&HAABBGGRR`` colour string.

    Args:
        hex_color: Colour written as ``"#RRGGBB"`` or ``"RRGGBB"``. The
            three-digit shorthand ``"#RGB"`` is expanded to ``"#RRGGBB"``.
            Surrounding whitespace is ignored.
        alpha: Two hex digits placed in the alpha position of the result
            (``"00"`` is fully opaque in the ASS format).

    Returns:
        The colour with its channels reordered to blue-green-red, upper-cased.
        Opaque white (``"&H00FFFFFF"``) is returned when *hex_color* is not a
        valid hex colour.
    """
    fallback = "&H00FFFFFF"
    if not isinstance(hex_color, str):
        return fallback

    digits = hex_color.strip().lstrip("#")
    if len(digits) == 3:
        # "#RGB" shorthand: every digit is doubled.
        digits = "".join(ch * 2 for ch in digits)

    # The value ends up inside a comma-separated ASS style line, so anything
    # that is not exactly six hex digits must be rejected, not passed through.
    if len(digits) != 6 or any(ch not in "0123456789abcdefABCDEF" for ch in digits):
        return fallback

    red, green, blue = digits[0:2], digits[2:4], digits[4:6]
    return f"&H{alpha}{blue}{green}{red}".upper()
|
|
def format_time_ass(seconds: float):
    """Format a time offset in seconds as an ASS timestamp ``H:MM:SS.cc``.

    Args:
        seconds: Offset from the start of the media, in seconds. Negative
            values are clamped to zero, because a negative offset would
            otherwise produce a malformed timestamp.

    Returns:
        The timestamp with unpadded hours, two-digit minutes and seconds, and
        centiseconds truncated (not rounded) to two digits.
    """
    td = timedelta(seconds=max(seconds, 0.0))

    # Work on integers only: days/seconds/microseconds are exact, whereas
    # total_seconds() goes through a float.
    whole_seconds = td.days * 86400 + td.seconds
    centisecs = td.microseconds // 10000

    hours, remainder = divmod(whole_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:01d}:{minutes:02d}:{secs:02d}.{centisecs:02d}"
|
|
def generate_ass_file(data: ProcessRequest, output_path: str):
    """Write an ASS subtitle file containing the request's segments and style.

    The file has a fixed 1080x1920 script resolution, a single style named
    ``Default`` built from ``data.style``, and one ``Dialogue`` event per
    segment.

    Args:
        data: Render request providing ``style`` and ``segments``.
        output_path: Destination path; the file is created or overwritten
            and encoded as UTF-8.
    """
    style = data.style

    # Font keys sent by the client are mapped to font family names; unknown
    # keys fall back to Arial.
    font_map = {
        "vazir": "Vazirmatn",
        "lalezar": "Lalezar",
        "roboto": "Roboto",
        "bangers": "Bangers",
    }
    font_name = font_map.get(style.font, "Arial")

    primary = hex_to_ass(style.primaryColor)
    back_color = "&H00000000"

    # BorderStyle 3 draws an opaque box behind the text, BorderStyle 1 an
    # outline. The box takes its colour from OutlineColour.
    if style.backType == "transparent":
        border_style = 3
        outline = "&H80000000"  # half-transparent black box
    elif style.backType == "solid":
        border_style = 3
        outline = hex_to_ass(style.outlineColor)
    else:
        border_style = 1
        outline = hex_to_ass(style.outlineColor)

    header_lines = [
        "[Script Info]",
        "ScriptType: v4.00+",
        "PlayResX: 1080",
        "PlayResY: 1920",
        "WrapStyle: 1",
        "ScaledBorderAndShadow: yes",
        "",
        "[V4+ Styles]",
        "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding",
        f"Style: Default,{font_name},{style.fontSize},{primary},&H000000FF,{outline},{back_color},1,0,0,0,100,100,0,0,{border_style},{style.outlineWidth},0,{style.alignment},10,10,{style.marginV},1",
        "",
        "[Events]",
        "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text",
    ]
    header = "\n".join(header_lines) + "\n"

    with open(output_path, "w", encoding="utf-8") as f:
        f.write(header)
        for seg in data.segments:
            start = format_time_ass(seg.start)
            end = format_time_ass(seg.end)
            # Every event must stay on one physical line: normalise CRLF / CR
            # to LF first so no stray "\r" survives, then use the ASS hard
            # line break "\N".
            clean_text = (
                seg.text.strip()
                .replace("\r\n", "\n")
                .replace("\r", "\n")
                .replace("\n", "\\N")
            )
            f.write(f"Dialogue: 0,{start},{end},Default,,0,0,0,,{clean_text}\n")
|
|
| |
|
|
@app.get("/")
async def home():
    """Serve the front-end page, ``index.html``, from the working directory."""
    landing_page = "index.html"
    return FileResponse(landing_page)
|
|
def _store_and_transcribe(upload_stream, input_path):
    """Save the upload to *input_path* and return its transcription as dicts."""
    with open(input_path, "wb") as out:
        shutil.copyfileobj(upload_stream, out)

    segments_gen, _ = model.transcribe(input_path, language="fa", beam_size=5)

    # The segments come back as a generator, so the list is built here, inside
    # the worker thread, rather than in the async handler.
    return [
        {
            "id": idx,
            "start": seg.start,
            "end": seg.end,
            "text": seg.text.strip(),
        }
        for idx, seg in enumerate(segments_gen)
    ]


@app.post("/api/analyze")
async def analyze_media(file: UploadFile = File(...)):
    """Store an uploaded media file and transcribe it.

    Args:
        file: The uploaded media file.

    Returns:
        On success, ``{"status": "success", "file_id": ..., "segments": [...]}``
        where each segment has ``id``, ``start``, ``end`` and ``text``.
        On any failure, a 500 JSON response ``{"error": <message>}``.
    """
    import asyncio  # local import keeps this block self-contained

    try:
        file_id = str(uuid.uuid4())[:12]

        # The extension comes from the client-supplied name, which may be
        # missing and may contain arbitrary characters. Keep it only if it is
        # plain ASCII alphanumerics, and lower-case it so the render step,
        # which probes lower-case extensions, can find the file again.
        file_ext = os.path.splitext(file.filename or "")[1].lstrip(".").lower()
        if not (file_ext.isascii() and file_ext.isalnum()):
            file_ext = "bin"
        input_path = f"{TEMP_DIR}/{file_id}.{file_ext}"

        # Copying and transcribing are blocking; run them in a worker thread
        # so the event loop keeps serving other requests.
        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(
            None, _store_and_transcribe, file.file, input_path
        )

        return {"status": "success", "file_id": file_id, "segments": results}

    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
|
|
@app.post("/api/render")
async def render_video(data: ProcessRequest):
    """Burn the request's subtitles into the previously uploaded media file.

    Args:
        data: Render request; ``file_id`` selects the upload in ``TEMP_DIR``.

    Returns:
        On success, ``{"status": "done", "url": "/download/<file_id>_final.mp4"}``.
        A 400 JSON response when ``file_id`` contains characters other than
        ASCII letters, digits, ``-`` or ``_``; a 404 JSON response when no
        uploaded file matches; a 500 JSON response ``{"error": <message>}``
        on any other failure (including a failing ffmpeg run).
    """
    import asyncio  # local imports keep this block self-contained
    import functools

    # file_id is client-controlled and is interpolated into paths that are
    # read AND written below; refuse anything that could leave TEMP_DIR.
    file_id = data.file_id
    if not file_id or not all(
        ch.isascii() and (ch.isalnum() or ch in "-_") for ch in file_id
    ):
        return JSONResponse(status_code=400, content={"error": "Invalid file id"})

    try:
        exts = ["mp4", "mov", "avi", "mkv", "mp3"]
        input_path = None
        for ext in exts:
            candidate = f"{TEMP_DIR}/{file_id}.{ext}"
            if os.path.isfile(candidate):
                input_path = candidate
                break

        if not input_path:
            return JSONResponse(status_code=404, content={"error": "Source file lost"})

        ass_path = f"{TEMP_DIR}/{file_id}.ass"
        output_path = f"{TEMP_DIR}/{file_id}_final.mp4"

        generate_ass_file(data, ass_path)

        # NOTE(review): "-vf ass=..." needs a video stream; an audio-only
        # upload (e.g. mp3) presumably makes ffmpeg fail here — confirm.
        cmd = [
            "ffmpeg", "-y",
            "-i", input_path,
            "-vf", f"ass={ass_path}",
            "-c:v", "libx264", "-preset", "ultrafast", "-crf", "26",
            "-c:a", "aac",
            output_path,
        ]
        run_ffmpeg = functools.partial(
            subprocess.run,
            cmd,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        # Encoding is slow and blocking: run it in a worker thread so the
        # event loop is not stalled for the duration of the render.
        await asyncio.get_running_loop().run_in_executor(None, run_ffmpeg)

        return {"status": "done", "url": f"/download/{file_id}_final.mp4"}

    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
|
|
@app.get("/download/{filename}")
async def get_file(filename: str):
    """Return a file stored in ``TEMP_DIR``.

    Args:
        filename: Bare file name (no directory components).

    Returns:
        The file as a ``FileResponse``, or a 404 JSON response
        ``{"error": "File missing"}`` when the name is not a bare file name
        or does not refer to an existing regular file.
    """
    missing = JSONResponse(status_code=404, content={"error": "File missing"})

    # Only a plain file name is acceptable; path separators or "." / ".."
    # would let the lookup escape TEMP_DIR.
    if (
        filename in ("", ".", "..")
        or "\\" in filename
        or os.path.basename(filename) != filename
    ):
        return missing

    path = f"{TEMP_DIR}/{filename}"
    # isfile (not exists): a directory must not be handed to FileResponse.
    if os.path.isfile(path):
        return FileResponse(path)
    return missing