Spaces:
Sleeping
Sleeping
| # app/pipeline/preprocessor.py | |
| # Input preprocessing: normalization, frame extraction, cleaning | |
| import re | |
| from dataclasses import dataclass, field | |
| from PIL import Image | |
| from app.config import get_settings | |
| from app.observability.logging import get_logger | |
| logger = get_logger(__name__) | |
@dataclass
class ProcessedText:
    """Preprocessed text content.

    Declared as a dataclass so it can be built with keyword arguments
    (``ProcessedText(original=..., cleaned=..., ...)``); a plain class with
    only annotations has no generated ``__init__`` and would raise
    ``TypeError`` on construction.
    """

    # The text exactly as it was received.
    original: str
    # The text after cleaning/normalization.
    cleaned: str
    # Number of whitespace-separated tokens in the cleaned text.
    word_count: int
    # Number of characters in the cleaned text.
    char_count: int
    language: str = "en"  # placeholder for language detection
@dataclass
class ProcessedImage:
    """Preprocessed image content.

    Declared as a dataclass so it can be built with keyword arguments
    (``ProcessedImage(image=..., width=..., height=...)``); without the
    decorator the annotated fields generate no ``__init__``.
    """

    # The decoded PIL image.
    image: Image.Image
    # Image width as reported by the image's size.
    width: int
    # Image height as reported by the image's size.
    height: int
    # NOTE(review): defaults to "RGB"; presumably the pixel mode of ``image``
    # rather than a file format — confirm against the image loader.
    format: str = "RGB"
@dataclass
class ProcessedVideo:
    """Preprocessed video — a list of extracted frames.

    Declared as a dataclass: the ``field(default_factory=...)`` defaults below
    only take effect under ``@dataclass``. Without the decorator they would
    remain raw ``Field`` objects shared on the class, and keyword
    construction would raise ``TypeError``.
    """

    # Extracted frames, each wrapped as a ProcessedImage; a fresh list per instance.
    frames: list[ProcessedImage] = field(default_factory=list)
    # Number of frames held in ``frames``.
    frame_count: int = 0
    # Video duration in seconds; 0.0 when unknown.
    duration_seconds: float = 0.0
    # Raw metadata mapping for the video; a fresh dict per instance.
    metadata: dict = field(default_factory=dict)
class Preprocessor:
    """
    Prepares raw input of every supported content type for the pipeline.

    - Text: zero-width stripping, unicode normalization, whitespace collapsing
    - Image: decoding from bytes and dimension capture
    - Video: frame sampling, with each frame wrapped as a ProcessedImage
    """

    def __init__(self):
        # Settings are read once and reused by the video path.
        self.settings = get_settings()

    @staticmethod
    def _wrap_image(pil_image) -> ProcessedImage:
        """Bundle an already-decoded image with its dimensions."""
        img_width, img_height = pil_image.size
        return ProcessedImage(image=pil_image, width=img_width, height=img_height)

    def process_text(self, text: str) -> ProcessedText:
        """
        Return a cleaned, normalized version of ``text`` plus basic counts.

        Steps, in order:
        1. Drop zero-width characters (U+200B, U+200C, U+200D, U+FEFF).
        2. Apply NFKC unicode normalization.
        3. Collapse every whitespace run to one space and trim the ends.

        The untouched input is kept on the result as ``original``.
        """
        import unicodedata

        # Zero-width characters are often used for obfuscation.
        visible_only = re.sub(r"[\u200b\u200c\u200d\ufeff]", "", text)
        normalized = unicodedata.normalize("NFKC", visible_only)
        collapsed = re.sub(r"\s+", " ", normalized).strip()

        processed = ProcessedText(
            original=text,
            cleaned=collapsed,
            word_count=len(collapsed.split()),
            char_count=len(collapsed),
        )
        logger.debug(
            "text_preprocessed",
            word_count=processed.word_count,
            char_count=processed.char_count,
        )
        return processed

    def process_image(self, image_bytes: bytes) -> ProcessedImage:
        """
        Decode ``image_bytes`` and record the image's dimensions.

        Decoding is delegated to ``load_image_from_bytes``; any mode
        conversion (e.g. to RGB) is presumably done there — confirm in
        ``app.utils.image_utils``.
        """
        from app.utils.image_utils import load_image_from_bytes

        processed = self._wrap_image(load_image_from_bytes(image_bytes))
        logger.debug(
            "image_preprocessed",
            width=processed.width,
            height=processed.height,
        )
        return processed

    def process_video(self, video_bytes: bytes) -> ProcessedVideo:
        """
        Sample frames from ``video_bytes`` and wrap them for the pipeline.

        Metadata lookup and frame sampling are delegated to
        ``get_video_metadata`` and ``extract_frames``; sampling is bounded by
        ``settings.video_max_frames`` and ``settings.video_fps_sample``.
        The duration falls back to 0.0 when the metadata has no
        ``duration_seconds`` entry.
        """
        from app.utils.video_utils import extract_frames, get_video_metadata

        video_info = get_video_metadata(video_bytes)
        sampled = extract_frames(
            video_bytes,
            max_frames=self.settings.video_max_frames,
            fps_sample=self.settings.video_fps_sample,
        )
        wrapped = [self._wrap_image(frame) for frame in sampled]

        processed = ProcessedVideo(
            frames=wrapped,
            frame_count=len(wrapped),
            duration_seconds=video_info.get("duration_seconds", 0.0),
            metadata=video_info,
        )
        logger.debug(
            "video_preprocessed",
            frames_extracted=processed.frame_count,
            duration=processed.duration_seconds,
        )
        return processed