| import os |
| import json |
| import time |
| import asyncio |
| import aiohttp |
| from typing import Dict, List, Set, Optional |
| from urllib.parse import quote, urljoin |
| from datetime import datetime |
| from pathlib import Path |
| from datasets import Dataset, DatasetDict |
| import huggingface_hub |
|
|
| from fastapi import FastAPI, BackgroundTasks, HTTPException, status |
| from fastapi.responses import JSONResponse |
| from pydantic import BaseModel, Field |
| import uvicorn |
|
|
| |
# Local working directory where per-course caption JSON files are persisted.
CAPTIONS_DIR = Path("captions_data")
CAPTIONS_DIR.mkdir(exist_ok=True)


# Hugging Face credentials and target dataset, supplied via environment.
HF_TOKEN = os.getenv("HF_TOKEN")
HF_DATASET_ID = os.getenv("HF_DATASET_ID", "fred808/helium")


# Fail fast at import time: uploads cannot work without a token.
if not HF_TOKEN:
    raise ValueError("HF_TOKEN environment variable is required")
|
|
def get_caption_file_path(course: str) -> Path:
    """Return the JSON file path used to persist captions for *course*.

    The course name is percent-encoded (including '/'), so arbitrary
    course names map to a single flat, filesystem-safe filename.
    """
    encoded = quote(course, safe='')
    return CAPTIONS_DIR / (encoded + "_captions.json")
|
|
def save_captions_to_file(course: str, captions: List[Dict]) -> None:
    """Persist *captions* for *course* as pretty-printed UTF-8 JSON.

    Errors are logged rather than raised so a failed save never aborts
    the processing loop.
    """
    try:
        target = get_caption_file_path(course)
        with open(target, 'w', encoding='utf-8') as fh:
            json.dump(captions, fh, indent=2, ensure_ascii=False)
    except Exception as exc:
        print(f"Error saving captions for {course}: {exc}")
    else:
        print(f"β Saved {len(captions)} captions for {course}")
|
|
def load_captions_from_file(course: str) -> List[Dict]:
    """Load previously saved captions for *course*; return [] when absent.

    Read/parse errors are logged and treated the same as a missing file.
    """
    try:
        file_path = get_caption_file_path(course)
        if file_path.exists():
            with open(file_path, 'r', encoding='utf-8') as fh:
                data = json.load(fh)
                print(f"β Loaded {len(data)} existing captions for {course}")
                return data
    except Exception as exc:
        print(f"Error loading captions for {course}: {exc}")
    return []
|
|
| |
# Server that hosts the course listings and image frames to be captioned.
SOURCE_SERVER = "https://fred808-vssee.hf.space"
# Pool of caption-inference /analyze endpoints the coordinator fans work to.
CAPTION_SERVERS = [
    "https://fred808-pil-4-1.hf.space/analyze",
    "https://fred808-pil-4-2.hf.space/analyze",
    "https://fred808-pil-4-3.hf.space/analyze",
    "https://fred1012-fred1012-gw0j2h.hf.space/analyze",
    "https://fred1012-fred1012-wqs6c2.hf.space/analyze",
    "https://fred1012-fred1012-oncray.hf.space/analyze",
    "https://fred1012-fred1012-4goge7.hf.space/analyze",
    "https://fred1012-fred1012-z0eh7m.hf.space/analyze",
    "https://fred1012-fred1012-u95rte.hf.space/analyze",
    "https://fred1012-fred1012-igje22.hf.space/analyze",
    "https://fred1012-fred1012-ibkuf8.hf.space/analyze",
    "https://fred1012-fred1012-nwqthy.hf.space/analyze",
    "https://fred1012-fred1012-4ldqj4.hf.space/analyze",
    "https://fred1012-fred1012-pivlzg.hf.space/analyze",
    "https://fred1012-fred1012-ptlc5u.hf.space/analyze",
    "https://fred1012-fred1012-u7lh57.hf.space/analyze",
    "https://fred1012-fred1012-q8djv1.hf.space/analyze",
    "https://fredalone-fredalone-ozugrp.hf.space/analyze",
    "https://fredalone-fredalone-9brxj2.hf.space/analyze",
    "https://fredalone-fredalone-p8vq9a.hf.space/analyze",
    "https://fredalone-fredalone-vbli2y.hf.space/analyze",
    "https://fredalone-fredalone-uggger.hf.space/analyze",
    "https://fredalone-fredalone-nmi7e8.hf.space/analyze",
    "https://fredalone-fredalone-d1f26d.hf.space/analyze",
    "https://fredalone-fredalone-461jp2.hf.space/analyze",
    "https://fredalone-fredalone-3enfg4.hf.space/analyze",
    "https://fredalone-fredalone-dqdbpv.hf.space/analyze",
    "https://fredalone-fredalone-ivtjua.hf.space/analyze",
    "https://fredalone-fredalone-6bezt2.hf.space/analyze",
    "https://fredalone-fredalone-e0wfnk.hf.space/analyze",
    "https://fredalone-fredalone-zu2t7j.hf.space/analyze",
    "https://fredalone-fredalone-dqtv1o.hf.space/analyze",
    "https://fredalone-fredalone-wclyog.hf.space/analyze",
    "https://fredalone-fredalone-t27vig.hf.space/analyze",
    "https://fredalone-fredalone-gahbxh.hf.space/analyze",
    "https://fredalone-fredalone-kw2po4.hf.space/analyze",
    "https://fredalone-fredalone-8h285h.hf.space/analyze"
]
# Only caption servers whose /health reports this model are used.
MODEL_TYPE = "Florence-2-large"
|
|
| |
class CourseInfo(BaseModel):
    """One course entry as listed by the source server's /courses endpoint."""
    # Folder name identifying the course on the source server.
    course_folder: str
|
|
class ImageInfo(BaseModel):
    """One image entry as listed by the source server's /images endpoint."""
    # Image file name within the course's frames folder.
    filename: str
|
|
class CaptionRequest(BaseModel):
    """Payload sent to a caption server's /analyze endpoint."""
    # Publicly reachable URL of the image to caption.
    image_url: str
    # Caption model to use; defaults to the coordinator-wide MODEL_TYPE.
    model_choice: str = MODEL_TYPE
|
|
class CaptionResponse(BaseModel):
    """Result returned by a caption server for one image."""
    # True when the server produced a caption.
    success: bool
    # Generated caption text (expected on success).
    caption: Optional[str] = None
    # Error description (expected on failure).
    error: Optional[str] = None
|
|
class ServerStatus(BaseModel):
    """Snapshot of one caption server's runtime statistics (see /servers/status)."""
    # /analyze endpoint URL of the server.
    url: str
    # Model name reported by the server's /health endpoint ("unknown" until probed).
    model: str
    # True while a caption request is in flight on this server.
    busy: bool
    # Number of images successfully captioned by this server.
    total_processed: int
    # Cumulative seconds spent on requests (failures included).
    total_time: float
    # Average successful captions per second (total_processed / total_time).
    fps: float
|
|
class ProcessingStatus(BaseModel):
    """Per-course progress summary."""
    course: str
    total_images: int
    processed_images: int
    progress_percent: float
    # Either "processing" or "completed".
    status: str
|
|
class StartProcessingRequest(BaseModel):
    """Options accepted by POST /processing/start."""
    # Specific courses to process; None means discover all courses.
    courses: Optional[List[str]] = None
    # When True, keep polling for new courses after each pass.
    continuous: bool = True
|
|
| |
app = FastAPI(
    title="Caption Coordinator API",
    description="Distributed caption processing coordinator",
    version="1.0.0"
)


# ---- In-memory coordinator state (per-process; mirrored partly to disk) ----
# course -> filenames that already have a caption.
processed_images: Dict[str, Set[str]] = {}
# course -> caption metadata dicts (also persisted via save_captions_to_file).
course_captions: Dict[str, List[Dict]] = {}
# course -> filenames that exhausted their retries.
failed_images: Dict[str, Set[str]] = {}
# CaptionServer wrappers; populated by initialize_servers() at startup.
servers = []
# Observed by processing_loop(); cleared to request a stop.
is_processing = False
# asyncio.Task running processing_loop(), if any.
current_processing_task = None
# When True, processing starts automatically on app startup.
auto_start_processing = True
|
|
class CaptionServer:
    """Tracks one remote caption endpoint and its running statistics."""

    def __init__(self, url):
        self.url = url              # /analyze endpoint for this server
        self.busy = False           # True while a request is in flight
        self.model = "unknown"      # model name reported by /health
        self.total_processed = 0    # successfully captioned images
        self.total_time = 0         # seconds accumulated across attempts

    @property
    def fps(self):
        """Average successfully-captioned images per second (0 before any work)."""
        if self.total_time > 0:
            return self.total_processed / self.total_time
        return 0
|
|
| |
def initialize_servers():
    """(Re)build the global server pool from the configured endpoint URLs."""
    global servers
    servers = list(map(CaptionServer, CAPTION_SERVERS))
|
|
| |
@app.get("/")
async def root():
    """Service banner plus the current processing flags."""
    info = dict(message="Caption Coordinator API", status="running")
    info["auto_processing"] = auto_start_processing
    info["is_processing"] = is_processing
    return info
|
|
@app.get("/health")
async def health():
    """Health probe: server availability counts and processing flags."""
    idle = sum(1 for s in servers if not s.busy)
    return {
        "status": "healthy",
        "servers_available": idle,
        "total_servers": len(servers),
        "is_processing": is_processing,
        "auto_processing": auto_start_processing,
    }
|
|
@app.get("/courses")
async def get_courses():
    """Fetch available courses from source server"""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{SOURCE_SERVER}/courses") as resp:
                payload = await resp.json()
        # Only dict entries under a "courses" key are considered valid.
        if isinstance(payload, dict) and 'courses' in payload:
            return [entry['course_folder']
                    for entry in payload['courses']
                    if isinstance(entry, dict)]
        return []
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching courses: {e}")
|
|
@app.get("/courses/{course}/images")
async def get_course_images(course: str):
    """Fetch images list for a course"""
    try:
        # The source server exposes frames under "<course>_frames".
        folder = course if course.endswith("_frames") else f"{course}_frames"
        url = f"{SOURCE_SERVER}/images/{quote(folder)}"
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                payload = await resp.json()
        if isinstance(payload, dict) and 'images' in payload:
            return payload['images']
        return []
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching images: {e}")
|
|
@app.get("/servers/status")
async def get_servers_status():
    """Get status of all caption servers"""
    return [
        ServerStatus(
            url=s.url,
            model=s.model,
            busy=s.busy,
            total_processed=s.total_processed,
            total_time=s.total_time,
            fps=s.fps,
        )
        for s in servers
    ]
|
|
@app.get("/processing/status")
async def get_processing_status():
    """Get current processing status"""
    # NOTE(review): "total_images" below is the size of the *processed* set,
    # not the course's full image count (that count only exists inside
    # process_course). As a result progress_percent reads ~100% whenever
    # the caption list and the processed set are in sync — confirm intent.
    status_info = {}
    for course in processed_images:
        total = len(processed_images[course])
        processed = len(course_captions.get(course, []))
        failed = len(failed_images.get(course, set()))
        status_info[course] = {
            "course": course,
            "total_images": total,
            "processed_images": processed,
            "failed_images": failed,
            "progress_percent": (processed / total * 100) if total > 0 else 0,
            "status": "completed" if processed + failed >= total else "processing"
        }
    return status_info
|
|
@app.post("/processing/start")
async def start_processing(request: StartProcessingRequest = StartProcessingRequest()):
    """Start caption processing"""
    global is_processing, current_processing_task

    # Only one processing loop may run at a time.
    if is_processing:
        raise HTTPException(status_code=400, detail="Processing is already running")

    is_processing = True
    loop_coro = processing_loop(request.courses, request.continuous)
    current_processing_task = asyncio.create_task(loop_coro)

    return {
        "message": "Processing started",
        "continuous": request.continuous,
        "specific_courses": request.courses,
    }
|
|
@app.post("/processing/stop")
async def stop_processing():
    """Stop caption processing"""
    global is_processing, current_processing_task

    if not is_processing:
        raise HTTPException(status_code=400, detail="Processing is not running")

    # Clear the flag first so the loop exits cooperatively, then cancel.
    is_processing = False
    task = current_processing_task
    if task:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        current_processing_task = None

    return {"message": "Processing stopped"}
|
|
@app.get("/captions/{course}")
async def get_captions(course: str):
    """Get captions for a specific course"""
    stored = load_captions_from_file(course)
    return {
        "course": course,
        "total_captions": len(stored),
        "captions": stored,
    }
|
|
@app.delete("/captions/{course}")
async def delete_captions(course: str):
    """Delete the on-disk caption file and in-memory state for *course*.

    Returns 404 when no caption file exists, 500 on unexpected I/O errors.
    """
    try:
        file_path = get_caption_file_path(course)
        if file_path.exists():
            file_path.unlink()
            # Drop all in-memory tracking so the course can be reprocessed.
            processed_images.pop(course, None)
            course_captions.pop(course, None)
            failed_images.pop(course, None)
            return {"message": f"Captions for {course} deleted"}
        else:
            raise HTTPException(status_code=404, detail=f"No captions found for {course}")
    except HTTPException:
        # BUG FIX: the 404 above was previously caught by the broad handler
        # below and re-raised as a 500 — let HTTP errors propagate unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error deleting captions: {e}")
|
|
| |
async def fetch_courses() -> List[str]:
    """Fetch available courses from source server"""
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{SOURCE_SERVER}/courses") as resp:
            payload = await resp.json()
    # Only dict entries under a "courses" key are considered valid.
    if isinstance(payload, dict) and 'courses' in payload:
        return [entry['course_folder']
                for entry in payload['courses']
                if isinstance(entry, dict)]
    return []
|
|
async def fetch_course_images(course: str) -> List[Dict]:
    """Fetch images list for a course"""
    # The source server exposes frames under "<course>_frames".
    folder = course if course.endswith("_frames") else f"{course}_frames"
    url = f"{SOURCE_SERVER}/images/{quote(folder)}"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            payload = await resp.json()
    if isinstance(payload, dict) and 'images' in payload:
        return payload['images']
    return []
|
|
async def get_caption(server: str, image_url: str) -> Optional[Dict]:
    """Request a caption for *image_url* from one caption /analyze endpoint.

    Returns the server's decoded JSON payload, or None on timeout, network,
    or decode errors (callers treat None as a retryable failure).

    Note: the return annotation was `Dict` but the error path returns None,
    so it is now `Optional[Dict]` to match actual behavior.
    """
    params = {
        'image_url': image_url,
        'model_choice': MODEL_TYPE
    }
    try:
        async with aiohttp.ClientSession() as session:
            # 30 s total-request timeout; slow servers count as failures.
            async with session.get(server, params=params, timeout=30) as resp:
                return await resp.json()
    except Exception as e:
        print(f"Error from {server}: {e}")
        return None
|
|
async def get_model_info():
    """Query each caption server's /health endpoint for its model name.

    Servers that cannot be reached are logged and omitted from the result.
    """
    discovered = []
    async with aiohttp.ClientSession() as session:
        for endpoint in CAPTION_SERVERS:
            try:
                # /analyze -> /health on the same host.
                health_url = endpoint.rsplit('/analyze', 1)[0] + '/health'
                async with session.get(health_url) as resp:
                    info = await resp.json()
                discovered.append({
                    'url': endpoint,
                    'model': info.get('model_choice', 'unknown')
                })
            except Exception as e:
                print(f"Couldn't get model info from {endpoint}: {e}")
    return discovered
|
|
async def process_image(server: CaptionServer, course: str, image: Dict) -> Optional[Dict]:
    """Run one image through *server*; return caption metadata or None.

    Marks the server busy for the duration of the call and accumulates its
    timing statistics whether or not the attempt succeeds.
    """
    if server.busy:
        return None

    server.busy = True
    start_time = time.time()

    try:
        # Source exposes frames under "<course>_frames"; avoid doubling the suffix.
        course_frames = f"{course}_frames" if not course.endswith("_frames") else course
        image_url = urljoin(SOURCE_SERVER, f"/images/{quote(course_frames)}/{quote(image['filename'])}")
        result = await get_caption(server.url, image_url)

        # Time is accumulated for failures too, so fps reflects real throughput.
        processing_time = time.time() - start_time
        server.total_time += processing_time

        if result and result.get('success') and result.get('caption'):
            server.total_processed += 1
            metadata = {
                "image": image['filename'],
                "caption": result['caption'],
                "server": server.url,
                "processing_time": processing_time,
                "timestamp": datetime.now().isoformat()
            }
            print(f"Server {server.url} processed {image['filename']} in {processing_time:.2f}s ({server.fps:.2f} fps)")
            return metadata
        else:
            # result may be None (network error) or a payload without a caption.
            error_msg = result.get('error', 'Unknown error') if result else 'No response'
            print(f"Server {server.url} failed for {image['filename']}: {error_msg}")
            return None

    except asyncio.TimeoutError:
        # NOTE(review): get_caption already swallows timeouts and returns None,
        # so this branch appears unreachable — confirm before relying on it.
        print(f"Server {server.url} timeout for {image['filename']}")
        return None
    except Exception as e:
        print(f"Error processing {image['filename']} on {server.url}: {e}")
        return None

    finally:
        # Always release the server, success or failure.
        server.busy = False
|
|
async def upload_to_huggingface(course: str, metadata_list: List[Dict]):
    """Upload course captions to the configured Hugging Face dataset.

    Converts the caption metadata into a columnar Dataset and pushes it as
    a per-course config under HF_DATASET_ID. Returns True on success,
    False on any failure (errors are logged, never raised).

    NOTE(review): Dataset.from_dict / login / push_to_hub are blocking calls
    executed directly on the event loop — they stall all other request
    handling while the upload runs. Consider asyncio.to_thread.
    """
    try:
        print(f"π€ Uploading {len(metadata_list)} captions for {course} to Hugging Face...")

        # Column-oriented layout expected by datasets.Dataset.from_dict.
        dataset_data = {
            "course": [],
            "image_filename": [],
            "caption": [],
            "processing_server": [],
            "processing_time": [],
            "timestamp": []
        }

        for metadata in metadata_list:
            dataset_data["course"].append(course)
            dataset_data["image_filename"].append(metadata["image"])
            dataset_data["caption"].append(metadata["caption"])
            dataset_data["processing_server"].append(metadata["server"])
            dataset_data["processing_time"].append(metadata["processing_time"])
            dataset_data["timestamp"].append(metadata["timestamp"])

        dataset = Dataset.from_dict(dataset_data)

        # Authenticate for this process; HF_TOKEN is validated at import time.
        huggingface_hub.login(token=HF_TOKEN)

        # One config per course; "/" and spaces are not valid in config names.
        dataset.push_to_hub(
            HF_DATASET_ID,
            config_name=course.replace("/", "_").replace(" ", "_"),
            split="train",
            commit_message=f"Add captions for course {course} - {len(metadata_list)} images"
        )

        print(f"β Successfully uploaded {len(metadata_list)} captions for {course} to {HF_DATASET_ID}")
        return True

    except Exception as e:
        print(f"β Error uploading to Hugging Face: {e}")
        return False
|
|
async def process_course(course: str, servers: List[CaptionServer]):
    """Caption every image in *course*, fanning work out across *servers*.

    Resumes from previously saved captions, retries each failed image up to
    5 times, persists progress after each successful batch, and uploads the
    complete caption set to Hugging Face once the course finishes.
    """
    # Lazily initialize per-course state, seeding the processed set from any
    # captions already saved to disk so restarts resume where they left off.
    if course not in processed_images:
        processed_images[course] = set()
    if course not in course_captions:
        course_captions[course] = load_captions_from_file(course)
        for cap in course_captions[course]:
            processed_images[course].add(cap['image'])
    if course not in failed_images:
        failed_images[course] = set()

    images = await fetch_course_images(course)
    if not images:
        print(f"No images found for course {course}")
        return

    print(f"\nProcessing {len(images)} images for course {course}")

    # filename -> {'image', 'retries', 'max_retries'} for every image that is
    # neither captioned yet nor permanently failed.
    pending_images = {}
    for img in images:
        filename = img['filename']
        if filename not in processed_images[course] and filename not in failed_images[course]:
            pending_images[filename] = {'image': img, 'retries': 0, 'max_retries': 5}

    if not pending_images:
        print(f"All images already processed or failed for course {course}")
        print(f"- Processed: {len(processed_images[course])}, Failed: {len(failed_images[course])}")

        # Course already complete: make sure results have reached Hugging Face.
        if len(processed_images[course]) + len(failed_images[course]) >= len(images):
            if course_captions[course]:
                print(f"π€ Course {course} completed, uploading to Hugging Face...")
                await upload_to_huggingface(course, course_captions[course])
        return

    print(f"Images to process: {len(pending_images)} (already processed: {len(processed_images[course])}, failed: {len(failed_images[course])})")

    batch_size = len([s for s in servers if not s.busy])  # NOTE(review): never read afterwards
    processed_in_this_run = 0

    while pending_images and is_processing:
        # Assign one pending image to each currently idle server.
        tasks = []
        assigned_images = []

        for server in servers:
            if not server.busy and pending_images:
                # Take an arbitrary pending image (dict insertion order).
                filename, img_data = next(iter(pending_images.items()))
                img = img_data['image']

                tasks.append(process_image(server, course, img))
                assigned_images.append((filename, img, img_data['retries']))
                # Remove now; re-queued below if the attempt fails.
                del pending_images[filename]

        if not tasks:
            # All servers busy — back off briefly and re-check.
            await asyncio.sleep(0.1)
            continue

        results = await asyncio.gather(*tasks)

        # Record successes; re-queue or permanently fail the rest.
        has_new_results = False
        for (filename, img, current_retries), result in zip(assigned_images, results):
            if result:
                processed_images[course].add(filename)
                course_captions[course].append(result)
                has_new_results = True
                processed_in_this_run += 1
                print(f"β Successfully processed (unknown)")
            else:
                if current_retries < 5:
                    pending_images[filename] = {
                        'image': img,
                        'retries': current_retries + 1,
                        'max_retries': 5
                    }
                    print(f"β» Retry {current_retries + 1}/5 for (unknown)")
                else:
                    # Retry budget exhausted — mark permanently failed.
                    failed_images[course].add(filename)
                    print(f"β Failed to process (unknown) after 5 retries")

        # Persist after every batch that produced new captions.
        if has_new_results:
            save_captions_to_file(course, course_captions[course])

        # Progress line (carriage return keeps it on one console line).
        total = len(images)
        done = len(processed_images[course])
        failed_count = len(failed_images[course])
        pending_count = len(pending_images)
        progress_percent = (done / total * 100) if total > 0 else 0

        print(f"\rProgress: {done}/{total} ({progress_percent:.1f}%) - {pending_count} pending, {failed_count} failed, {processed_in_this_run} new", end="", flush=True)

        # Pace the loop between batches.
        await asyncio.sleep(0.5)

    # Final summary and, if the course is complete, the upload.
    total = len(images)
    done = len(processed_images[course])
    failed_count = len(failed_images[course])

    if done + failed_count >= total:
        if failed_count > 0:
            print(f"\nβ Course {course} completed with {failed_count} failed images")
        else:
            print(f"\nβ Course {course} fully completed")

        if course_captions[course]:
            print(f"π€ Uploading {len(course_captions[course])} captions to Hugging Face...")
            success = await upload_to_huggingface(course, course_captions[course])
            if success:
                print(f"β Successfully uploaded {course} to Hugging Face")
            else:
                print(f"β Failed to upload {course} to Hugging Face")
    else:
        print(f"\nβ Course {course} partially completed: {done}/{total} processed, {failed_count} failed")
|
|
async def processing_loop(specific_courses: Optional[List[str]] = None, continuous: bool = True):
    """Top-level driver: discover courses and process them until stopped.

    Runs until the global is_processing flag is cleared (or one pass
    completes when continuous=False). Unexpected errors are logged and the
    loop retries after a delay rather than dying.
    """
    global is_processing

    # Probe every configured server and keep only those running MODEL_TYPE.
    model_info = await get_model_info()
    print("\nCaption Servers:")
    available_servers = []
    for info, server in zip(model_info, servers):
        server.model = info['model']
        if MODEL_TYPE in info.get('model', ''):
            available_servers.append(server)
            print(f"β {server.url} confirmed {MODEL_TYPE}")
        else:
            print(f"β {server.url} using {server.model} - skipping (requires {MODEL_TYPE})")

    if not available_servers:
        print(f"\nError: No servers with {MODEL_TYPE} available!")
        is_processing = False
        return

    processing_servers = available_servers
    print(f"\nUsing {len(processing_servers)} servers with {MODEL_TYPE}")

    # Report any caption files left over from previous runs (informational).
    existing_captions = list(CAPTIONS_DIR.glob("*_captions.json"))
    if existing_captions:
        print("\nFound existing caption files:")
        for cap_file in existing_captions:
            course = cap_file.stem.replace("_captions", "")
            try:
                with open(cap_file, 'r', encoding='utf-8') as f:
                    captions = json.load(f)
                    print(f"- {course}: {len(captions)} captions")
            except Exception as e:
                print(f"- Error reading {cap_file.name}: {e}")
        print()

    start_time = time.time()
    iteration = 0

    while is_processing:
        try:
            iteration += 1
            print(f"\n{'='*50}")
            print(f"Processing Iteration {iteration}")
            print(f"{'='*50}")

            # Either a fixed course list (from the API) or a fresh discovery.
            if specific_courses:
                courses = specific_courses
                print(f"Processing specific courses: {courses}")
            else:
                courses = await fetch_courses()
                print(f"Found {len(courses)} courses")

            if not courses:
                print("No courses found, waiting...")
                if not continuous:
                    break
                await asyncio.sleep(10)
                continue

            for course in courses:
                # Honor a stop request between courses.
                if not is_processing:
                    break

                print(f"\n--- Processing course: {course} ---")
                await process_course(course, processing_servers)

            # Aggregate throughput stats for this iteration.
            print("\nServer Stats:")
            total_processed = sum(s.total_processed for s in processing_servers)
            elapsed = time.time() - start_time
            if elapsed > 0:
                print(f"Total images processed: {total_processed}")
                print(f"Overall speed: {total_processed/elapsed:.2f} fps")
            for s in processing_servers:
                print(f"- {s.url}: {s.total_processed} images, {s.fps:.2f} fps")
            print()

            if not continuous:
                print("One-time processing completed")
                break

            # Idle briefly before re-polling for new courses.
            print("Waiting for new courses...")
            await asyncio.sleep(5)

        except asyncio.CancelledError:
            print("Processing cancelled")
            break
        except Exception as e:
            # Log and retry rather than killing the loop.
            print(f"Error in processing loop: {str(e)}")
            import traceback
            traceback.print_exc()
            await asyncio.sleep(10)

    is_processing = False
    print("Processing loop stopped")
|
|
| |
@app.on_event("startup")
async def startup_event():
    """Build the server pool, print a startup banner, and auto-start processing."""
    global is_processing, current_processing_task

    initialize_servers()
    banner = [
        "Caption Coordinator API started",
        f"Source server: {SOURCE_SERVER}",
        f"Caption servers: {len(CAPTION_SERVERS)}",
        f"Hugging Face dataset: {HF_DATASET_ID}",
        f"HF Token: {'β Set' if HF_TOKEN else 'β Missing'}",
    ]
    for line in banner:
        print(line)

    if auto_start_processing:
        print("Auto-starting processing loop...")
        is_processing = True
        current_processing_task = asyncio.create_task(processing_loop())
|
|
|
|
if __name__ == "__main__":
    # BUG FIX: reload=True requires passing the app as an import string
    # ("module:app"); with an app object uvicorn cannot reload and warns.
    # Run without reload so direct execution behaves predictably.
    uvicorn.run(app, host="0.0.0.0", port=8000)