| """ |
| REST API server for BackgroundFX Pro. |
| Provides HTTP endpoints for all processing functionality. |
| """ |
|
|
| from fastapi import FastAPI, File, UploadFile, Form, HTTPException, BackgroundTasks, Depends, status |
| from fastapi.responses import FileResponse, StreamingResponse, JSONResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| from fastapi.staticfiles import StaticFiles |
| from pydantic import BaseModel, Field, validator |
| from typing import Dict, List, Optional, Union, Any |
| from enum import Enum |
| import asyncio |
| import aiofiles |
| from pathlib import Path |
| import tempfile |
| import shutil |
| import uuid |
| import time |
| from datetime import datetime, timedelta |
| import jwt |
| import cv2 |
| import numpy as np |
| import io |
| import base64 |
| from concurrent.futures import ThreadPoolExecutor |
| import redis |
| from contextlib import asynccontextmanager |
|
|
| from ..utils.logger import setup_logger |
| from .pipeline import ProcessingPipeline, PipelineConfig, ProcessingMode |
| from .video_processor import VideoProcessorAPI, StreamConfig, VideoStreamMode |
| from .batch_processor import BatchProcessor, BatchConfig, BatchItem, BatchPriority |
|
|
| logger = setup_logger(__name__) |
|
|
|
|
| |
| |
| |
|
|
class ServerConfig:
    """Static settings for the API server, exposed as class attributes."""

    # --- Network binding and working directories ---------------------------
    HOST: str = "0.0.0.0"
    PORT: int = 8000
    UPLOAD_DIR: str = "uploads"
    OUTPUT_DIR: str = "outputs"
    TEMP_DIR: str = "temp"
    # Upload ceiling in bytes (500 MiB).
    MAX_UPLOAD_SIZE: int = 500 * 1024 ** 2
    # File suffixes accepted by the image upload endpoint.
    ALLOWED_EXTENSIONS: List[str] = [".jpg", ".jpeg", ".png", ".mp4", ".avi", ".mov"]

    # --- JWT signing ---------------------------------------------------------
    # NOTE(review): placeholder secret; must be overridden before deployment.
    SECRET_KEY: str = "your-secret-key-change-in-production"
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

    # --- Redis job mirror ----------------------------------------------------
    REDIS_URL: str = "redis://localhost:6379"
    # Lifetime in seconds of a job record stored in Redis.
    CACHE_TTL: int = 3600

    # --- Rate limiting -------------------------------------------------------
    # NOTE(review): not read anywhere in the code visible here; presumably
    # requests per window (seconds) -- confirm against the consumer.
    RATE_LIMIT_REQUESTS: int = 100
    RATE_LIMIT_WINDOW: int = 60

    # --- Execution -----------------------------------------------------------
    MAX_WORKERS: int = 4
    ENABLE_GPU: bool = True


# Module-wide settings instance used by the endpoints and helpers below.
config = ServerConfig()
|
|
|
|
| |
| |
| |
|
|
class BackgroundType(str, Enum):
    """Closed set of background choices a request may select.

    Members are also plain strings, so they compare equal to their wire value.
    """

    BLUR = "blur"
    OFFICE = "office"
    GRADIENT = "gradient"
    NATURE = "nature"
    # Selected together with ``background_url`` on the request models.
    CUSTOM = "custom"
    # The processing tasks pass ``None`` as background for this choice.
    NONE = "none"
|
|
|
|
class QualityPreset(str, Enum):
    """Named quality levels accepted by the request models.

    Members are also plain strings, so they compare equal to their wire value.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    ULTRA = "ultra"
|
|
|
|
class ProcessingRequest(BaseModel):
    """Options shared by the image and video processing requests."""
    # Background to apply; the processing tasks pass None to the processor
    # when this is BackgroundType.NONE.
    background: BackgroundType = BackgroundType.BLUR
    # Only consulted when background is CUSTOM; the image task currently
    # does nothing with it (the branch is a placeholder).
    background_url: Optional[str] = None
    quality: QualityPreset = QualityPreset.HIGH
    # NOTE(review): not read by the tasks visible in this file -- confirm use.
    preserve_original: bool = False

    class Config:
        # Example payload attached to the generated schema.
        schema_extra = {
            "example": {
                "background": "office",
                "quality": "high",
                "preserve_original": False
            }
        }
|
|
|
|
class ImageProcessingRequest(ProcessingRequest):
    """Processing options for a single uploaded image."""
    # NOTE(review): presumably (width, height); not read by the image task
    # visible in this file -- confirm.
    resize: Optional[tuple[int, int]] = None
    # Passed as ``apply_effects`` when the image task builds a PipelineConfig.
    apply_effects: List[str] = Field(default_factory=list)
    # Used verbatim as the output file extension by the image task.
    output_format: str = "png"
|
|
|
|
class VideoProcessingRequest(ProcessingRequest):
    """Processing options for an uploaded video."""
    # NOTE(review): none of the fields below are read by the video task
    # visible in this file (it only uses ``background``); units and
    # semantics are presumed from the names -- confirm.
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    fps: Optional[float] = None
    resolution: Optional[tuple[int, int]] = None
    codec: str = "h264"
|
|
|
|
class BatchProcessingRequest(BaseModel):
    """Request body for the batch endpoint."""
    # One dict per file. The batch task requires 'input_path' and
    # 'output_path' and optionally reads 'id', 'file_type' and 'background'.
    items: List[Dict[str, Any]]
    # When False the batch task runs with a single worker.
    parallel: bool = True
    # Upper-cased and used as a BatchPriority member name by the batch task.
    priority: str = "normal"
    # Checked by the batch task after completion, but no notification is
    # sent yet (the branch is a placeholder).
    callback_url: Optional[str] = None
|
|
|
|
class StreamingRequest(BaseModel):
    """Request body for starting a streaming session."""
    # Forwarded unchanged as StreamConfig.source.
    source: str
    # Upper-cased and used as a VideoStreamMode member name.
    stream_type: str = "webcam"
    # Forwarded unchanged as StreamConfig.output_format.
    output_format: str = "hls"
    # NOTE(review): not read by start_stream in this file -- confirm use.
    quality: QualityPreset = QualityPreset.MEDIUM
|
|
|
|
class ProcessingResponse(BaseModel):
    """Job record returned by the processing endpoints and stored by JobManager."""
    job_id: str
    # Values written in this file: "pending", "processing", "completed",
    # "failed", "cancelled".
    status: str
    # The tasks in this file write 0.1 at start (image) and 1.0 on completion.
    progress: float = 0.0
    message: Optional[str] = None
    # Download path of the result, set by the image and video tasks.
    result_url: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)
    # Naive local time captured when the record is instantiated.
    created_at: datetime = Field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None
|
|
|
|
class JobStatus(BaseModel):
    """Detailed job status payload.

    NOTE(review): not referenced by any endpoint visible in this file.
    """
    job_id: str
    status: str
    progress: float
    current_stage: Optional[str] = None
    # NOTE(review): presumably seconds -- confirm against the producer.
    time_elapsed: float
    time_remaining: Optional[float] = None
    errors: List[str] = Field(default_factory=list)
|
|
|
|
| |
| |
| |
|
|
class JobManager:
    """Track processing jobs in memory and mirror them to Redis when reachable.

    ``jobs`` is the authoritative in-process store. When a Redis server is
    reachable at ``config.REDIS_URL``, every update is also written there
    with a TTL of ``config.CACHE_TTL`` seconds, and lookups fall back to it
    for job IDs that are not in memory.
    """

    def __init__(self):
        """Create the in-memory store, the worker pool and the optional Redis client."""
        self.jobs: Dict[str, ProcessingResponse] = {}
        # NOTE(review): the executor is created here but not used by this class.
        self.executor = ThreadPoolExecutor(max_workers=config.MAX_WORKERS)
        self.redis_client = None
        try:
            client = redis.from_url(config.REDIS_URL)
            # from_url() only builds a lazy connection pool; ping() forces a
            # round trip so an unreachable server is detected here rather
            # than silently on every later call.
            client.ping()
        except Exception:
            # Best-effort by design: the server must start without Redis.
            logger.warning("Redis not available, using in-memory storage")
        else:
            self.redis_client = client

    def create_job(self) -> str:
        """Register a new job in ``pending`` state and return its ID."""
        job_id = str(uuid.uuid4())
        self.jobs[job_id] = ProcessingResponse(
            job_id=job_id,
            status="pending"
        )
        return job_id

    def update_job(self, job_id: str, **kwargs):
        """Set fields on an existing job and mirror the record to Redis.

        Unknown job IDs and keyword names that are not attributes of the job
        record are ignored.
        """
        job = self.jobs.get(job_id)
        if job is None:
            return

        for key, value in kwargs.items():
            if hasattr(job, key):
                setattr(job, key, value)

        if self.redis_client is None:
            return
        try:
            self.redis_client.setex(
                f"job:{job_id}",
                config.CACHE_TTL,
                job.json()
            )
        except Exception as exc:
            # The in-memory record is already updated; a failed mirror write
            # must not fail the job, but should be visible in the logs.
            logger.warning(f"Could not store job {job_id} in Redis: {exc}")

    def get_job(self, job_id: str) -> Optional[ProcessingResponse]:
        """Return the job record, checking memory first and then Redis.

        Returns ``None`` when the job is unknown or the Redis lookup fails.
        """
        job = self.jobs.get(job_id)
        if job is not None:
            return job

        if self.redis_client is not None:
            try:
                data = self.redis_client.get(f"job:{job_id}")
                if data:
                    return ProcessingResponse.parse_raw(data)
            except Exception as exc:
                logger.warning(f"Could not load job {job_id} from Redis: {exc}")

        return None
|
|
|
|
| |
| |
| |
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Build shared services on startup and release them on shutdown.

    Before yielding, the working directories are created and the pipeline,
    video processor, batch processor and job manager are attached to
    ``app.state``. After the yield, the three processors are shut down.
    """
    logger.info("Starting BackgroundFX Pro API Server")

    # Make sure every working directory exists before any request arrives.
    for working_dir in (config.UPLOAD_DIR, config.OUTPUT_DIR, config.TEMP_DIR):
        Path(working_dir).mkdir(parents=True, exist_ok=True)

    state = app.state
    pipeline_settings = PipelineConfig(use_gpu=config.ENABLE_GPU)
    state.pipeline = ProcessingPipeline(pipeline_settings)
    state.video_processor = VideoProcessorAPI()
    state.batch_processor = BatchProcessor()
    state.job_manager = JobManager()

    yield

    logger.info("Shutting down BackgroundFX Pro API Server")
    state = app.state
    state.pipeline.shutdown()
    state.video_processor.cleanup()
    state.batch_processor.cleanup()
|
|
|
|
# ASGI application; startup and shutdown work is delegated to ``lifespan``.
app = FastAPI(
    title="BackgroundFX Pro API",
    description="Professional background removal and replacement API",
    version="1.0.0",
    lifespan=lifespan
)


# NOTE(review): wildcard origins together with allow_credentials=True is
# maximally permissive; restrict allow_origins to known front-ends before
# production use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Bearer-token extractor used as the dependency of ``verify_token``.
security = HTTPBearer()
|
|
|
|
def create_access_token(data: dict) -> str:
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is copied, not mutated.

    Returns:
        The token encoded with ``config.SECRET_KEY`` and ``config.ALGORITHM``,
        expiring ``config.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes from now.
    """
    # Local import: the module header only imports ``datetime`` and ``timedelta``.
    from datetime import timezone

    # Use an aware UTC timestamp instead of the deprecated naive
    # datetime.utcnow(); the resulting ``exp`` epoch value is the same.
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, config.SECRET_KEY, algorithm=config.ALGORITHM)
|
|
|
|
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """Validate the bearer JWT and return the value of its ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded or carries no
            ``sub`` claim.
    """

    def _unauthorized() -> HTTPException:
        # Both failure modes answer with the same 401 response.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
        )

    try:
        claims = jwt.decode(
            credentials.credentials,
            config.SECRET_KEY,
            algorithms=[config.ALGORITHM],
        )
    except jwt.PyJWTError:
        raise _unauthorized()

    subject: str = claims.get("sub")
    if subject is None:
        raise _unauthorized()
    return subject
|
|
|
|
| |
| |
| |
|
|
@app.get("/")
async def root():
    """Describe the service and list its main endpoints."""
    endpoint_index = {
        "health": "/health",
        "docs": "/docs",
        "process_image": "/api/v1/process/image",
        "process_video": "/api/v1/process/video",
        "batch": "/api/v1/batch",
        "stream": "/api/v1/stream",
    }
    return {
        "name": "BackgroundFX Pro API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": endpoint_index,
    }
|
|
|
|
@app.get("/health")
async def health_check():
    """Report service health.

    The processor entries are static strings; only the Redis entry reflects
    state, namely whether the job manager holds a Redis client.
    """
    checked_at = datetime.now().isoformat()

    redis_state = "disconnected"
    if app.state.job_manager.redis_client:
        redis_state = "connected"

    service_states = {
        "pipeline": "ready",
        "video_processor": "ready",
        "batch_processor": "ready",
        "redis": redis_state,
    }
    return {
        "status": "healthy",
        "timestamp": checked_at,
        "services": service_states,
    }
|
|
|
|
@app.get("/api/v1/stats")
async def get_statistics(current_user: str = Depends(verify_token)):
    """Collect statistics from the pipeline, video and batch processors."""
    services = app.state
    pipeline_stats = services.pipeline.get_statistics()
    video_stats = services.video_processor.get_stats()
    batch_status = services.batch_processor.get_status()
    return {
        "pipeline": pipeline_stats,
        "video": video_stats,
        "batch": batch_status,
    }
|
|
|
|
| |
| |
| |
|
|
@app.post("/api/v1/process/image", response_model=ProcessingResponse)
async def process_image(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    request: ImageProcessingRequest = Depends(),
    current_user: str = Depends(verify_token)
):
    """Accept an image upload and queue it for background processing.

    Args:
        background_tasks: FastAPI task queue used to schedule the work.
        file: Uploaded file; its name must end with one of
            ``config.ALLOWED_EXTENSIONS``.
        request: Processing options.
        current_user: Subject of the verified bearer token.

    Returns:
        A ``ProcessingResponse`` in ``processing`` state carrying the job ID.

    Raises:
        HTTPException: 400 for a missing or disallowed file name, 413 when
            the upload exceeds ``config.MAX_UPLOAD_SIZE``.
    """
    # The client controls the file name: keep only its last path component
    # so it cannot point outside the upload directory, and tolerate a
    # missing name.
    safe_name = Path((file.filename or "").replace("\\", "/")).name
    if not safe_name.lower().endswith(tuple(config.ALLOWED_EXTENSIONS)):
        raise HTTPException(400, "Invalid file format")

    # ``size`` can be None when the length is not known up front.
    if file.size is not None and file.size > config.MAX_UPLOAD_SIZE:
        raise HTTPException(413, "File too large")

    job_id = app.state.job_manager.create_job()
    upload_path = Path(config.UPLOAD_DIR) / f"{job_id}_{safe_name}"

    # Stream to disk in chunks so a large upload is never held in memory,
    # and enforce the limit even when the declared size was unavailable.
    bytes_written = 0
    async with aiofiles.open(upload_path, 'wb') as f:
        while chunk := await file.read(1024 * 1024):
            bytes_written += len(chunk)
            if bytes_written > config.MAX_UPLOAD_SIZE:
                break
            await f.write(chunk)

    if bytes_written > config.MAX_UPLOAD_SIZE:
        upload_path.unlink(missing_ok=True)
        app.state.job_manager.update_job(
            job_id,
            status="failed",
            message="File too large"
        )
        raise HTTPException(413, "File too large")

    background_tasks.add_task(
        process_image_task,
        app.state,
        job_id,
        str(upload_path),
        request
    )

    return ProcessingResponse(
        job_id=job_id,
        status="processing",
        message="Image processing started"
    )
|
|
|
|
async def process_image_task(app_state, job_id: str, input_path: str, request: ImageProcessingRequest):
    """Run the image pipeline for one job and record the outcome.

    Args:
        app_state: Application state holding ``pipeline`` and ``job_manager``.
        job_id: ID of the job to update.
        input_path: Path of the uploaded image on disk.
        request: Processing options supplied by the client.

    Every failure is caught, logged and stored on the job as ``failed``;
    nothing is raised to the caller.
    """
    try:
        app_state.job_manager.update_job(job_id, status="processing", progress=0.1)

        image = cv2.imread(input_path)
        if image is None:
            # cv2.imread signals an unreadable file by returning None.
            raise ValueError(f"Could not read image: {Path(input_path).name}")

        background = None
        if request.background == BackgroundType.CUSTOM and request.background_url:
            # TODO: fetching a custom background is not implemented yet, so
            # the image is processed with no background.
            pass
        elif request.background != BackgroundType.NONE:
            background = request.background.value

        # This local must not be called ``config``: that name would shadow
        # the module-level server settings read below for OUTPUT_DIR.
        # NOTE(review): the object is built but never handed to the
        # pipeline -- confirm how per-request settings should be applied.
        pipeline_config = PipelineConfig(
            quality_preset=request.quality.value,
            apply_effects=request.apply_effects
        )

        result = app_state.pipeline.process_image(image, background)

        if result.success:
            output_filename = f"{job_id}_output.{request.output_format}"
            output_path = Path(config.OUTPUT_DIR) / output_filename
            if not cv2.imwrite(str(output_path), result.output_image):
                raise OSError(f"Could not write output image: {output_filename}")

            app_state.job_manager.update_job(
                job_id,
                status="completed",
                progress=1.0,
                result_url=f"/api/v1/download/{output_filename}",
                completed_at=datetime.now(),
                metadata={
                    "quality_score": result.quality_score,
                    "processing_time": result.processing_time
                }
            )
        else:
            app_state.job_manager.update_job(
                job_id,
                status="failed",
                message="Processing failed"
            )

    except Exception as e:
        logger.error(f"Image processing failed for job {job_id}: {e}")
        app_state.job_manager.update_job(
            job_id,
            status="failed",
            message=str(e)
        )
|
|
|
|
| |
| |
| |
|
|
@app.post("/api/v1/process/video", response_model=ProcessingResponse)
async def process_video(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    request: VideoProcessingRequest = Depends(),
    current_user: str = Depends(verify_token)
):
    """Accept a video upload and queue it for background processing.

    Args:
        background_tasks: FastAPI task queue used to schedule the work.
        file: Uploaded video; its name must end with .mp4, .avi, .mov or .mkv.
        request: Processing options.
        current_user: Subject of the verified bearer token.

    Returns:
        A ``ProcessingResponse`` in ``processing`` state carrying the job ID.

    Raises:
        HTTPException: 400 for a missing or unsupported file name.
    """
    # The client controls the file name: keep only its last path component
    # so it cannot point outside the upload directory, and tolerate a
    # missing name.
    safe_name = Path((file.filename or "").replace("\\", "/")).name
    if not safe_name.lower().endswith(('.mp4', '.avi', '.mov', '.mkv')):
        raise HTTPException(400, "Invalid video format")

    # The job manager lives on ``app.state``; there is no ``app_state``
    # name in this scope.
    job_id = app.state.job_manager.create_job()

    upload_path = Path(config.UPLOAD_DIR) / f"{job_id}_{safe_name}"
    # Copy in chunks so a large video is never held in memory at once.
    async with aiofiles.open(upload_path, 'wb') as f:
        while chunk := await file.read(1024 * 1024):
            await f.write(chunk)

    background_tasks.add_task(
        process_video_task,
        app.state,
        job_id,
        str(upload_path),
        request
    )

    return ProcessingResponse(
        job_id=job_id,
        status="processing",
        message="Video processing started"
    )
|
|
|
|
async def process_video_task(app_state, job_id: str, input_path: str, request: VideoProcessingRequest):
    """Run the video processor for one job and record the outcome.

    Args:
        app_state: Application state holding ``video_processor`` and
            ``job_manager``.
        job_id: ID of the job to update.
        input_path: Path of the uploaded video on disk.
        request: Processing options; only ``background`` is used here.

    Every failure is caught, logged and stored on the job as ``failed``;
    nothing is raised to the caller.
    """
    try:
        # Move the stored record out of "pending" so polling clients see the
        # same state the endpoint already reported, as the image task does.
        app_state.job_manager.update_job(job_id, status="processing")

        def progress_callback(progress: float, info: Dict):
            # Forward processor progress straight onto the job record.
            app_state.job_manager.update_job(
                job_id,
                progress=progress,
                metadata=info
            )

        output_path = Path(config.OUTPUT_DIR) / f"{job_id}_output.mp4"

        background = None
        if request.background != BackgroundType.NONE:
            background = request.background.value

        stats = await app_state.video_processor.process_video_async(
            input_path,
            str(output_path),
            background=background,
            progress_callback=progress_callback
        )

        app_state.job_manager.update_job(
            job_id,
            status="completed",
            progress=1.0,
            result_url=f"/api/v1/download/{output_path.name}",
            completed_at=datetime.now(),
            metadata={
                "frames_processed": stats.frames_processed,
                "processing_fps": stats.processing_fps,
                "avg_quality": stats.avg_quality_score
            }
        )

    except Exception as e:
        logger.error(f"Video processing failed for job {job_id}: {e}")
        app_state.job_manager.update_job(
            job_id,
            status="failed",
            message=str(e)
        )
|
|
|
|
| |
| |
| |
|
|
@app.post("/api/v1/batch", response_model=ProcessingResponse)
async def process_batch(
    background_tasks: BackgroundTasks,
    request: BatchProcessingRequest,
    current_user: str = Depends(verify_token)
):
    """Queue a batch of files for background processing and return its job."""
    services = app.state
    job_id = services.job_manager.create_job()

    # The work itself runs after the response has been sent.
    background_tasks.add_task(process_batch_task, services, job_id, request)

    item_count = len(request.items)
    return ProcessingResponse(
        job_id=job_id,
        status="processing",
        message=f"Batch processing started for {item_count} items",
    )
|
|
|
|
async def process_batch_task(app_state, job_id: str, request: BatchProcessingRequest):
    """Run a batch job and record the outcome.

    Args:
        app_state: Application state holding ``job_manager``.
        job_id: ID of the job to update.
        request: Batch description; each item must provide ``input_path``
            and ``output_path``.

    Every failure (including an unknown priority or a malformed item) is
    caught, logged and stored on the job as ``failed``; nothing is raised
    to the caller.
    """
    try:
        # Move the stored record out of "pending" so polling clients see the
        # same state the endpoint already reported.
        app_state.job_manager.update_job(job_id, status="processing")

        # Resolve the priority once for the whole batch and fail with a
        # readable message instead of a bare KeyError.
        try:
            priority = BatchPriority[request.priority.upper()]
        except KeyError:
            raise ValueError(f"Unknown batch priority: {request.priority!r}") from None

        batch_items = [
            BatchItem(
                id=item_data.get('id', str(uuid.uuid4())),
                input_path=item_data['input_path'],
                output_path=item_data['output_path'],
                file_type=item_data.get('file_type', 'image'),
                priority=priority,
                background=item_data.get('background')
            )
            for item_data in request.items
        ]

        def progress_callback(progress: float, info: Dict):
            # Forward processor progress straight onto the job record.
            app_state.job_manager.update_job(
                job_id,
                progress=progress,
                metadata=info
            )

        batch_config = BatchConfig(
            progress_callback=progress_callback,
            max_workers=config.MAX_WORKERS if request.parallel else 1
        )

        # NOTE(review): process_batch is called synchronously inside this
        # coroutine; if it blocks for long it will stall the event loop --
        # confirm whether it should run in an executor.
        processor = BatchProcessor(batch_config)
        report = processor.process_batch(batch_items)

        app_state.job_manager.update_job(
            job_id,
            status="completed",
            progress=1.0,
            completed_at=datetime.now(),
            metadata={
                "total_items": report.total_items,
                "successful_items": report.successful_items,
                "failed_items": report.failed_items,
                "avg_quality": report.quality_metrics.get('avg_quality', 0)
            }
        )

        if request.callback_url:
            # TODO: completion callback is not implemented yet.
            pass

    except Exception as e:
        logger.error(f"Batch processing failed for job {job_id}: {e}")
        app_state.job_manager.update_job(
            job_id,
            status="failed",
            message=str(e)
        )
|
|
|
|
| |
| |
| |
|
|
@app.post("/api/v1/stream/start")
async def start_stream(
    request: StreamingRequest,
    current_user: str = Depends(verify_token)
):
    """Start a streaming session on the shared video processor.

    Returns:
        A dict with the streaming status, the stream URL and a message.

    Raises:
        HTTPException: 400 when ``stream_type`` does not name a
            ``VideoStreamMode`` member, 500 when the processor reports that
            streaming could not be started.
    """
    # ``stream_type`` is client input: an unknown value is a bad request,
    # not a server error.
    try:
        stream_mode = VideoStreamMode[request.stream_type.upper()]
    except KeyError:
        raise HTTPException(
            400, f"Unsupported stream type: {request.stream_type}"
        ) from None

    stream_config = StreamConfig(
        source=request.source,
        stream_mode=stream_mode,
        output_format=request.output_format,
        output_path=f"{config.OUTPUT_DIR}/stream_{uuid.uuid4()}"
    )

    success = app.state.video_processor.start_stream_processing(
        stream_config,
        background=None
    )

    if not success:
        raise HTTPException(500, "Failed to start streaming")

    return {
        "status": "streaming",
        "stream_url": f"/api/v1/stream/live/{stream_config.output_path}",
        "message": "Streaming started"
    }
|
|
|
|
@app.get("/api/v1/stream/stop")
async def stop_stream(current_user: str = Depends(verify_token)):
    """Ask the shared video processor to stop streaming and confirm."""
    video_processor = app.state.video_processor
    video_processor.stop_stream_processing()

    confirmation = {"status": "stopped", "message": "Streaming stopped"}
    return confirmation
|
|
|
|
@app.get("/api/v1/stream/preview")
async def get_stream_preview(current_user: str = Depends(verify_token)):
    """Return the current stream preview frame as a JPEG image.

    Raises:
        HTTPException: 404 when the processor has no preview frame, 500 when
            the frame cannot be encoded.
    """
    frame = app.state.video_processor.get_preview_frame()
    if frame is None:
        raise HTTPException(404, "No preview available")

    # imencode reports failure through its first return value; do not send
    # an unusable buffer to the client.
    encoded, buffer = cv2.imencode('.jpg', frame)
    if not encoded:
        raise HTTPException(500, "Failed to encode preview frame")

    return StreamingResponse(
        io.BytesIO(buffer.tobytes()),
        media_type="image/jpeg"
    )
|
|
|
|
| |
| |
| |
|
|
@app.get("/api/v1/job/{job_id}", response_model=ProcessingResponse)
async def get_job_status(
    job_id: str,
    current_user: str = Depends(verify_token)
):
    """Return the stored record for ``job_id``, or 404 when there is none."""
    record = app.state.job_manager.get_job(job_id)
    if not record:
        raise HTTPException(404, "Job not found")
    return record
|
|
|
|
@app.get("/api/v1/jobs")
async def list_jobs(
    current_user: str = Depends(verify_token),
    limit: int = 10,
    offset: int = 0
):
    """Return one page of the in-memory jobs plus the total job count."""
    all_jobs = [*app.state.job_manager.jobs.values()]
    page = slice(offset, offset + limit)
    return {
        "total": len(all_jobs),
        "jobs": all_jobs[page],
    }
|
|
|
|
@app.delete("/api/v1/job/{job_id}")
async def cancel_job(
    job_id: str,
    current_user: str = Depends(verify_token)
):
    """Mark a job as cancelled.

    Raises:
        HTTPException: 404 when the job is unknown, matching the job status
            endpoint, instead of confirming a cancellation that never
            happened.
    """
    job_manager = app.state.job_manager
    if job_manager.get_job(job_id) is None:
        raise HTTPException(404, "Job not found")

    # NOTE(review): this only flips the stored status; nothing visible in
    # this file interrupts a task that is already running.
    job_manager.update_job(job_id, status="cancelled")
    return {"message": "Job cancelled"}
|
|
|
|
| |
| |
| |
|
|
@app.get("/api/v1/download/{filename}")
async def download_file(
    filename: str,
    current_user: str = Depends(verify_token)
):
    """Send a processed file from the output directory.

    Raises:
        HTTPException: 404 when the name does not resolve to a regular file
            inside ``config.OUTPUT_DIR``.
    """
    output_dir = Path(config.OUTPUT_DIR).resolve()
    file_path = (output_dir / filename).resolve()

    # ``filename`` is client input: reject anything that resolves outside
    # the output directory (e.g. ".."), and anything that is not a regular
    # file (exists() alone would also accept directories).
    if output_dir not in file_path.parents or not file_path.is_file():
        raise HTTPException(404, "File not found")

    return FileResponse(
        path=file_path,
        filename=file_path.name,
        media_type='application/octet-stream'
    )
|
|
|
|
| |
| |
| |
|
|
| from fastapi import WebSocket, WebSocketDisconnect |
|
|
@app.websocket("/ws/job/{job_id}")
async def websocket_job_updates(websocket: WebSocket, job_id: str):
    """Push the job record over a WebSocket once per second.

    The loop ends when the job reaches ``completed``, ``failed`` or
    ``cancelled``, after which the socket is closed. While no record exists
    for ``job_id`` nothing is sent and polling continues.
    """
    await websocket.accept()

    try:
        while True:
            job = app.state.job_manager.get_job(job_id)

            if job:
                # The record contains datetime fields, which the plain JSON
                # encoder behind send_json() cannot serialise; send the
                # model's own JSON text instead.
                await websocket.send_text(job.json())

                if job.status in ("completed", "failed", "cancelled"):
                    break

            # NOTE(review): an unknown job ID is polled indefinitely and this
            # endpoint has no authentication -- confirm whether a limit or a
            # token check is wanted.
            await asyncio.sleep(1)

        await websocket.close()

    except WebSocketDisconnect:
        logger.info(f"WebSocket disconnected for job {job_id}")
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn

    server_options = {
        "host": config.HOST,
        "port": config.PORT,
        "log_level": "info",
    }
    uvicorn.run(app, **server_options)