Spaces:
Sleeping
Sleeping
File size: 3,797 Bytes
# app/pipeline/preprocessor.py
# Input preprocessing: normalization, frame extraction, cleaning
import re
from dataclasses import dataclass, field
from PIL import Image
from app.config import get_settings
from app.observability.logging import get_logger
# Module-level logger for this file; the Preprocessor methods call
# logger.debug with an event name followed by keyword fields.
logger = get_logger(__name__)
@dataclass
class ProcessedText:
    """Result of text preprocessing: the raw input next to its cleaned form.

    Attributes:
        original: The text as it was supplied, before any cleaning.
        cleaned: The text after cleaning and normalization.
        word_count: Number of words in the cleaned text.
        char_count: Number of characters in the cleaned text.
        language: Language code for the text; defaults to "en".
    """

    original: str
    cleaned: str
    word_count: int
    char_count: int
    # Placeholder until language detection exists; nothing sets this yet.
    language: str = "en"
@dataclass
class ProcessedImage:
    """A loaded image bundled with its recorded dimensions.

    Attributes:
        image: The PIL image object.
        width: Image width as recorded by whoever built this record.
        height: Image height as recorded by whoever built this record.
        format: Label for the image's pixel format; defaults to "RGB".
            NOTE(review): the default is not derived from ``image`` --
            confirm that upstream loading always yields RGB data.
    """

    image: Image.Image
    width: int
    height: int
    format: str = "RGB"
@dataclass
class ProcessedVideo:
    """Frames pulled from a video, plus summary information about it.

    Every field has a default, so an empty instance can be created with no
    arguments. The mutable defaults use ``default_factory`` so that each
    instance gets its own list and dict.

    Attributes:
        frames: Extracted frames, each wrapped as a ``ProcessedImage``.
        frame_count: Number of extracted frames; 0 by default.
        duration_seconds: Video duration in seconds; 0.0 by default.
        metadata: Free-form video metadata; empty by default.
    """

    frames: list[ProcessedImage] = field(default_factory=list)
    frame_count: int = 0
    duration_seconds: float = 0.0
    metadata: dict = field(default_factory=dict)
class Preprocessor:
    """
    Prepares raw inputs of each supported content type for later stages.

    - Text: zero-width stripping, unicode normalization, whitespace cleanup
    - Image: loading from bytes and dimension capture
    - Video: frame extraction, with every frame wrapped like an image
    """

    def __init__(self):
        # Settings are fetched once here; process_video reads the
        # video_max_frames and video_fps_sample values from them.
        self.settings = get_settings()

    def process_text(self, text: str) -> ProcessedText:
        """
        Produce a cleaned, normalized copy of ``text``.

        Steps, in order:
        - drop zero-width characters (U+200B, U+200C, U+200D, U+FEFF)
        - apply NFKC unicode normalization
        - squeeze each whitespace run to a single space and trim both ends

        Returns a ProcessedText holding the untouched input, the cleaned
        string, and word/character counts taken from the cleaned string.
        """
        import unicodedata

        # Zero-width characters are a common obfuscation trick, so they are
        # removed before anything else looks at the text.
        without_zero_width = re.sub(r"[\u200b\u200c\u200d\ufeff]", "", text)

        # Normalization runs before whitespace collapsing so that any
        # whitespace it produces is folded by the next step as well.
        normalized = unicodedata.normalize("NFKC", without_zero_width)
        tidy = re.sub(r"\s+", " ", normalized).strip()

        words = tidy.split()
        result = ProcessedText(
            original=text,
            cleaned=tidy,
            word_count=len(words),
            char_count=len(tidy),
        )

        logger.debug(
            "text_preprocessed",
            word_count=result.word_count,
            char_count=result.char_count,
        )
        return result

    def process_image(self, image_bytes: bytes) -> ProcessedImage:
        """
        Decode an image from raw bytes and record its dimensions.

        Decoding is delegated to ``load_image_from_bytes``; any conversion
        (the original notes mention RGB) would happen inside that helper,
        which is not visible from this file.
        """
        from app.utils.image_utils import load_image_from_bytes

        loaded = load_image_from_bytes(image_bytes)
        img_width, img_height = loaded.size

        result = ProcessedImage(image=loaded, width=img_width, height=img_height)

        logger.debug("image_preprocessed", width=img_width, height=img_height)
        return result

    def process_video(self, video_bytes: bytes) -> ProcessedVideo:
        """
        Pull frames out of a video and wrap each one as a ProcessedImage.

        Metadata lookup and frame sampling are delegated to
        ``get_video_metadata`` and ``extract_frames``; sampling is bounded by
        the ``video_max_frames`` and ``video_fps_sample`` settings. The
        original notes say the helper uses OpenCV, which is not visible here.
        """
        from app.utils.video_utils import extract_frames, get_video_metadata

        video_info = get_video_metadata(video_bytes)
        sampled_frames = extract_frames(
            video_bytes,
            max_frames=self.settings.video_max_frames,
            fps_sample=self.settings.video_fps_sample,
        )

        # The one-element tuple lets a single read of .size be unpacked into
        # both dimensions inside the comprehension.
        wrapped_frames = [
            ProcessedImage(image=frame, width=frame_w, height=frame_h)
            for frame in sampled_frames
            for frame_w, frame_h in (frame.size,)
        ]

        result = ProcessedVideo(
            frames=wrapped_frames,
            frame_count=len(wrapped_frames),
            # Fall back to 0.0 when the metadata carries no duration.
            duration_seconds=video_info.get("duration_seconds", 0.0),
            metadata=video_info,
        )

        logger.debug(
            "video_preprocessed",
            frames_extracted=result.frame_count,
            duration=result.duration_seconds,
        )
        return result
|