| """ | |
| Telegram Multi-Part File Streamer - Main Application | |
| High-performance file upload and streaming service with zero-disk buffering | |
| """ | |
| import asyncio | |
| import logging | |
| from typing import AsyncGenerator, Optional | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, Request, HTTPException, Response | |
| from fastapi.responses import StreamingResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import uvicorn | |
| from session_manager import SessionManager | |
| from database import Database, FileMetadata | |
| from utils import ( | |
| calculate_part_and_offset, | |
| generate_unique_id, | |
| CHUNK_SIZE, | |
| MAX_PART_SIZE | |
| ) | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
# Global instances
session_manager: Optional[SessionManager] = None
database: Optional[Database] = None


@asynccontextmanager  # required: FastAPI's lifespan= expects an async context manager
async def lifespan(app: FastAPI):
    """Application lifespan manager"""
    global session_manager, database
    logger.info("Initializing application...")

    # Initialize database
    database = Database()
    await database.connect()

    # Initialize session manager
    session_manager = SessionManager()
    await session_manager.initialize()

    logger.info("Application initialized successfully")
    yield

    # Cleanup
    logger.info("Shutting down application...")
    await session_manager.cleanup()
    await database.disconnect()
    logger.info("Application shutdown complete")

# Initialize FastAPI app
app = FastAPI(
    title="Telegram Multi-Part File Streamer",
    description="High-performance file upload and streaming service",
    version="1.0.0",
    lifespan=lifespan
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/")
async def root():
    """Health check endpoint"""
    return {
        "status": "online",
        "service": "Telegram Multi-Part File Streamer",
        "version": "1.0.0"
    }

@app.get("/health")  # route path assumed; the decorator was missing from the source
async def health_check():
    """Detailed health check"""
    session_count = len(session_manager.sessions) if session_manager else 0
    db_connected = database.is_connected() if database else False
    return {
        "status": "healthy" if (session_count > 0 and db_connected) else "degraded",
        "sessions": session_count,
        "database": "connected" if db_connected else "disconnected"
    }
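
# Example health probe (a sketch; assumes the default host/port configured at the
# bottom of this file, and the assumed /health path above):
#
#   curl http://localhost:8000/health
#   # -> {"status": "healthy", "sessions": 2, "database": "connected"}
#
# The sample response values are illustrative, not taken from a real deployment.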

@app.post("/upload")  # route path assumed; the decorator was missing from the source
async def upload_file(request: Request, filename: Optional[str] = None):
    """
    High-speed zero-disk file upload endpoint
    Streams data directly from HTTP to Telegram with auto-splitting
    """
    if not session_manager or not database:
        raise HTTPException(status_code=503, detail="Service not initialized")

    logger.info(f"Upload request received: filename={filename}")
    unique_id = generate_unique_id()
    file_parts = []
    total_size = 0
    part_number = 0

    try:
        # Create async generator from request stream
        async def request_stream() -> AsyncGenerator[bytes, None]:
            async for chunk in request.stream():
                yield chunk

        # Buffer for part assembly
        part_buffer = bytearray()

        async for chunk in request_stream():
            part_buffer.extend(chunk)

            # Upload a full part as soon as the buffer reaches MAX_PART_SIZE
            while len(part_buffer) >= MAX_PART_SIZE:
                part_number += 1
                part_data = bytes(part_buffer[:MAX_PART_SIZE])
                part_buffer = part_buffer[MAX_PART_SIZE:]
                logger.info(f"Uploading part {part_number} ({len(part_data)} bytes)")

                # Upload part to Telegram
                file_id = await session_manager.upload_part(
                    part_data,
                    f"{filename or unique_id}_part_{part_number}"
                )
                file_parts.append({
                    "part_number": part_number,
                    "file_id": file_id,
                    "size": len(part_data)
                })
                total_size += len(part_data)
                logger.info(
                    f"Part {part_number} uploaded successfully. "
                    f"Total size: {total_size / (1024**3):.2f} GB"
                )

        # Upload remaining data as final part
        if len(part_buffer) > 0:
            part_number += 1
            part_data = bytes(part_buffer)
            logger.info(f"Uploading final part {part_number} ({len(part_data)} bytes)")
            file_id = await session_manager.upload_part(
                part_data,
                f"{filename or unique_id}_part_{part_number}"
            )
            file_parts.append({
                "part_number": part_number,
                "file_id": file_id,
                "size": len(part_data)
            })
            total_size += len(part_data)

        # Store metadata in database
        metadata = FileMetadata(
            unique_id=unique_id,
            filename=filename or f"file_{unique_id}",
            total_size=total_size,
            parts=file_parts,
            part_count=part_number
        )
        await database.save_file_metadata(metadata)

        logger.info(
            f"Upload completed: unique_id={unique_id}, "
            f"parts={part_number}, total_size={total_size / (1024**3):.2f} GB"
        )
        return {
            "success": True,
            "unique_id": unique_id,
            "filename": metadata.filename,
            "total_size": total_size,
            "parts": part_number,
            "download_url": f"/dl/{unique_id}"
        }
    except Exception as e:
        logger.error(f"Upload failed: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
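
# Example upload (a sketch; assumes a local server and the assumed /upload path above):
#
#   curl -X POST -T movie.mkv "http://localhost:8000/upload?filename=movie.mkv"
#
# `-T` makes curl stream the file from disk instead of buffering it in memory,
# which matches the zero-disk design of this endpoint. The JSON response carries
# "download_url" for later retrieval.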

@app.get("/dl/{unique_id}")  # path confirmed by download_url in the upload response
async def stream_file(unique_id: str, request: Request):
    """
    High-speed streaming endpoint with full range request support
    Supports multi-part concatenation and parallel connections
    """
    if not session_manager or not database:
        raise HTTPException(status_code=503, detail="Service not initialized")

    # Fetch file metadata
    metadata = await database.get_file_metadata(unique_id)
    if not metadata:
        raise HTTPException(status_code=404, detail="File not found")

    # Parse range header
    range_header = request.headers.get("range")
    start = 0
    end = metadata.total_size - 1
    status_code = 200

    if range_header:
        # Parse "bytes=start-end"; either bound may be omitted
        range_str = range_header.replace("bytes=", "")
        range_parts = range_str.split("-")
        if range_parts[0]:
            start = int(range_parts[0])
            if len(range_parts) > 1 and range_parts[1]:
                end = int(range_parts[1])
        elif len(range_parts) > 1 and range_parts[1]:
            # Suffix range "bytes=-N": the last N bytes of the file
            start = max(0, metadata.total_size - int(range_parts[1]))
        status_code = 206  # Partial Content

    # Validate range
    if start < 0 or end >= metadata.total_size or start > end:
        raise HTTPException(status_code=416, detail="Range not satisfiable")

    logger.info(
        f"Streaming request: unique_id={unique_id}, "
        f"range={start}-{end}, size={end - start + 1}"
    )

    # Create streaming response
    content_length = end - start + 1
    headers = {
        "Content-Type": "application/octet-stream",
        "Content-Length": str(content_length),
        "Accept-Ranges": "bytes",
        "Content-Disposition": f'attachment; filename="{metadata.filename}"',
    }
    if status_code == 206:
        headers["Content-Range"] = f"bytes {start}-{end}/{metadata.total_size}"

    async def stream_generator() -> AsyncGenerator[bytes, None]:
        """Generate stream from Telegram parts"""
        bytes_sent = 0
        current_position = 0

        for part in metadata.parts:
            part_start = current_position
            part_end = current_position + part["size"] - 1

            # Skip parts that end before the requested range begins
            if part_end < start:
                current_position += part["size"]
                continue
            # Stop once we are past the requested range
            if part_start > end:
                break

            # Calculate offset within this part
            offset_in_part = max(0, start - part_start)
            bytes_to_read = min(
                part["size"] - offset_in_part,
                content_length - bytes_sent
            )
            logger.debug(
                f"Streaming part {part['part_number']}: "
                f"offset={offset_in_part}, bytes={bytes_to_read}"
            )

            # Stream this part with retry logic; resume from the first unsent
            # byte so a retry never duplicates data already yielded
            retry_count = 0
            max_retries = 3
            part_bytes_sent = 0
            while retry_count < max_retries:
                try:
                    async for chunk in session_manager.stream_part(
                        part["file_id"],
                        offset=offset_in_part + part_bytes_sent,
                        limit=bytes_to_read - part_bytes_sent
                    ):
                        chunk_size = len(chunk)
                        # Ensure we don't send more than requested
                        if bytes_sent + chunk_size > content_length:
                            chunk = chunk[:content_length - bytes_sent]
                            chunk_size = len(chunk)
                        yield chunk
                        bytes_sent += chunk_size
                        part_bytes_sent += chunk_size
                        if bytes_sent >= content_length:
                            return
                    break  # Part streamed successfully
                except Exception as e:
                    retry_count += 1
                    if retry_count >= max_retries:
                        logger.error(
                            f"Failed to stream part {part['part_number']}: {str(e)}"
                        )
                        raise
                    wait_time = 2 ** retry_count
                    logger.warning(
                        f"Retry {retry_count}/{max_retries} for part "
                        f"{part['part_number']} after {wait_time}s"
                    )
                    await asyncio.sleep(wait_time)

            current_position += part["size"]

    return StreamingResponse(
        stream_generator(),
        status_code=status_code,
        headers=headers,
        media_type="application/octet-stream"
    )
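
# Example range request (a sketch; <unique_id> comes from the upload response):
#
#   curl -H "Range: bytes=0-1048575" \
#        "http://localhost:8000/dl/<unique_id>" -o first_mebibyte.bin
#
# The server answers 206 Partial Content with a matching Content-Range header,
# so download managers can fetch a file over several parallel connections.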

@app.get("/info/{unique_id}")  # route path assumed; the decorator was missing from the source
async def get_file_info(unique_id: str):
    """Get file metadata and information"""
    if not database:
        raise HTTPException(status_code=503, detail="Service not initialized")

    metadata = await database.get_file_metadata(unique_id)
    if not metadata:
        raise HTTPException(status_code=404, detail="File not found")

    return {
        "unique_id": metadata.unique_id,
        "filename": metadata.filename,
        "total_size": metadata.total_size,
        "total_size_gb": f"{metadata.total_size / (1024**3):.2f}",
        "parts": metadata.part_count,
        "uploaded_at": metadata.uploaded_at,
        "download_url": f"/dl/{unique_id}"
    }
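
# Example metadata lookup (a sketch; the /info route path above is an assumption):
#
#   curl "http://localhost:8000/info/<unique_id>"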

@app.delete("/delete/{unique_id}")  # route path assumed; the decorator was missing from the source
async def delete_file(unique_id: str):
    """Delete a file and all of its parts"""
    if not session_manager or not database:
        raise HTTPException(status_code=503, detail="Service not initialized")

    # Get metadata
    metadata = await database.get_file_metadata(unique_id)
    if not metadata:
        raise HTTPException(status_code=404, detail="File not found")

    # Delete from Telegram (best effort)
    deleted_parts = 0
    for part in metadata.parts:
        try:
            await session_manager.delete_part(part["file_id"])
            deleted_parts += 1
        except Exception as e:
            logger.warning(f"Failed to delete part {part['part_number']}: {str(e)}")

    # Delete from database
    await database.delete_file_metadata(unique_id)

    logger.info(f"Deleted file: unique_id={unique_id}, parts={deleted_parts}")
    return {
        "success": True,
        "unique_id": unique_id,
        "deleted_parts": deleted_parts,
        "total_parts": metadata.part_count
    }
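
# Example deletion (a sketch; the /delete route path above is an assumption):
#
#   curl -X DELETE "http://localhost:8000/delete/<unique_id>"
#
# Part deletion on Telegram is best effort, so "deleted_parts" in the response
# may be lower than "total_parts" when some parts could not be removed.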

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        workers=1,  # Single worker for shared session state
        log_level="info"
    )