SentinelAI / app /pipeline /preprocessor.py
sajith-0701's picture
initial deployment for HF Spaces
71c1ad2
# app/pipeline/preprocessor.py
# Input preprocessing: normalization, frame extraction, cleaning
import re
from dataclasses import dataclass, field
from PIL import Image
from app.config import get_settings
from app.observability.logging import get_logger
logger = get_logger(__name__)
@dataclass
class ProcessedText:
"""Preprocessed text content."""
original: str
cleaned: str
word_count: int
char_count: int
language: str = "en" # placeholder for language detection
@dataclass
class ProcessedImage:
"""Preprocessed image content."""
image: Image.Image
width: int
height: int
format: str = "RGB"
@dataclass
class ProcessedVideo:
"""Preprocessed video — a list of extracted frames."""
frames: list[ProcessedImage] = field(default_factory=list)
frame_count: int = 0
duration_seconds: float = 0.0
metadata: dict = field(default_factory=dict)
class Preprocessor:
"""
Input preprocessing for all content types.
- Text: cleaning, normalization
- Image: resize, format conversion
- Video: frame extraction + per-frame preprocessing
"""
def __init__(self):
self.settings = get_settings()
def process_text(self, text: str) -> ProcessedText:
"""
Clean and normalize input text.
- Strip excessive whitespace
- Remove zero-width characters
- Normalize unicode
"""
import unicodedata
# Remove zero-width characters often used for obfuscation
cleaned = re.sub(r"[\u200b\u200c\u200d\ufeff]", "", text)
# Normalize unicode
cleaned = unicodedata.normalize("NFKC", cleaned)
# Collapse excessive whitespace
cleaned = re.sub(r"\s+", " ", cleaned).strip()
result = ProcessedText(
original=text,
cleaned=cleaned,
word_count=len(cleaned.split()),
char_count=len(cleaned),
)
logger.debug(
"text_preprocessed",
word_count=result.word_count,
char_count=result.char_count,
)
return result
def process_image(self, image_bytes: bytes) -> ProcessedImage:
"""
Load and preprocess image from bytes.
- Convert to RGB
- Record dimensions
"""
from app.utils.image_utils import load_image_from_bytes
image = load_image_from_bytes(image_bytes)
width, height = image.size
result = ProcessedImage(
image=image,
width=width,
height=height,
)
logger.debug("image_preprocessed", width=width, height=height)
return result
def process_video(self, video_bytes: bytes) -> ProcessedVideo:
"""
Extract key frames from video.
Uses OpenCV to sample frames at configured intervals.
"""
from app.utils.video_utils import extract_frames, get_video_metadata
metadata = get_video_metadata(video_bytes)
frames_pil = extract_frames(
video_bytes,
max_frames=self.settings.video_max_frames,
fps_sample=self.settings.video_fps_sample,
)
processed_frames = []
for frame in frames_pil:
w, h = frame.size
processed_frames.append(
ProcessedImage(image=frame, width=w, height=h)
)
result = ProcessedVideo(
frames=processed_frames,
frame_count=len(processed_frames),
duration_seconds=metadata.get("duration_seconds", 0.0),
metadata=metadata,
)
logger.debug(
"video_preprocessed",
frames_extracted=result.frame_count,
duration=result.duration_seconds,
)
return result