| """ |
| Verdict Logic Engine |
| Determines green/red verdict based on detected classes |
| """ |
|
|
| import json |
| from typing import List, Dict, Optional |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class VerdictEngine: |
| """ |
| Engine to determine verdict (green/red) based on detected objects |
| """ |
| |
| def __init__( |
| self, |
| green_classes: Optional[List[str]] = None, |
| red_classes: Optional[List[str]] = None, |
| rules_path: Optional[str] = None |
| ): |
| """ |
| Initialize verdict engine |
| |
| Args: |
| green_classes: List of class names that trigger green verdict |
| red_classes: List of class names that trigger red verdict |
| rules_path: Optional path to JSON file with custom rules |
| """ |
| self.green_classes = green_classes or [] |
| self.red_classes = red_classes or [] |
| self.custom_rules = {} |
| |
| |
| if rules_path: |
| try: |
| self.load_rules(rules_path) |
| except FileNotFoundError: |
| logger.warning(f"Rules file not found: {rules_path}") |
| |
| |
| if not self.green_classes and not self.red_classes and not self.custom_rules: |
| logger.info("No verdict rules specified. Using default configuration.") |
| self._setup_default_rules() |
| |
| def _setup_default_rules(self): |
| """ |
| Setup default verdict rules |
| Customize this based on your specific use case |
| """ |
| |
| |
| |
| |
| |
| self.custom_rules = { |
| "all_classes_required": False, |
| "min_detections_for_green": 1, |
| "min_confidence_for_green": 0.5, |
| "priority": "red" |
| } |
| |
| logger.info("Default verdict rules configured") |
| |
| def load_rules(self, rules_path: str): |
| """Load verdict rules from JSON file""" |
| with open(rules_path, 'r') as f: |
| rules = json.load(f) |
| |
| self.green_classes = rules.get("green_classes", self.green_classes) |
| self.red_classes = rules.get("red_classes", self.red_classes) |
| self.custom_rules = rules.get("custom_rules", self.custom_rules) |
| |
| logger.info(f"Loaded rules from {rules_path}") |
| logger.info(f"Green classes: {self.green_classes}") |
| logger.info(f"Red classes: {self.red_classes}") |
| |
| def get_verdict(self, detections: List[Dict]) -> Dict: |
| """ |
| Determine verdict based on detections |
| |
| New Logic: |
| - GREEN: All 3 labels present (jio jersey, jio logo, person) AND exactly 1 person |
| - RED: Any condition not met |
| |
| Args: |
| detections: List of detection dictionaries from YOLO |
| |
| Returns: |
| Dictionary with verdict, confidence, and message |
| """ |
| if len(detections) == 0: |
| return { |
| "verdict": "red", |
| "confidence": 1.0, |
| "message": "Image rejected. No objects detected in the image. Please ensure the image contains: Jio jersey, Jio logo, and exactly one person." |
| } |
| |
| |
| detected_classes = [det["class_name"] for det in detections] |
| confidences = [det["confidence"] for det in detections] |
| avg_confidence = sum(confidences) / len(confidences) |
| |
| |
| unique_classes = set(detected_classes) |
| |
| |
| person_count = detected_classes.count("person") |
| |
| |
| has_jio_jersey = "jio jersey" in unique_classes |
| has_jio_logo = "jio logo" in unique_classes |
| has_person = "person" in unique_classes |
| has_exactly_one_person = person_count == 1 |
| |
| |
| all_labels_present = has_jio_jersey and has_jio_logo and has_person |
| all_conditions_met = all_labels_present and has_exactly_one_person |
| |
| logger.info(f"Detected classes: {detected_classes}") |
| logger.info(f"Unique classes: {unique_classes}") |
| logger.info(f"Person count: {person_count}") |
| logger.info(f"All conditions met: {all_conditions_met}") |
| |
| |
| if all_conditions_met: |
| message = "Image approved! All requirements met: Jio jersey found, Jio logo found, and exactly one person detected." |
| verdict = "green" |
| else: |
| |
| issues = [] |
| |
| if not has_jio_jersey: |
| issues.append("Jio jersey not detected") |
| if not has_jio_logo: |
| issues.append("Jio logo not detected") |
| if not has_person: |
| issues.append("No person detected") |
| elif person_count == 0: |
| issues.append("No person detected") |
| elif person_count > 1: |
| issues.append(f"Multiple people detected ({person_count} found, need exactly 1)") |
| |
| |
| if issues: |
| message = f"Image rejected. {'. '.join(issues)}. Requirements: Jio jersey, Jio logo, and exactly one person must be present." |
| else: |
| message = "Image rejected. Please ensure all requirements are met: Jio jersey, Jio logo, and exactly one person." |
| |
| verdict = "red" |
| |
| return { |
| "verdict": verdict, |
| "confidence": avg_confidence, |
| "message": message |
| } |
| |
| def _apply_verdict_logic( |
| self, |
| detected_classes: List[str], |
| unique_classes: set, |
| num_unique_classes: int, |
| max_confidence: float, |
| avg_confidence: float, |
| num_detections: int |
| ) -> tuple: |
| """ |
| Apply verdict logic based on detected classes |
| |
| Returns: |
| Tuple of (verdict, message) |
| """ |
| |
| if self.green_classes or self.red_classes: |
| return self._apply_class_based_logic( |
| detected_classes, unique_classes, avg_confidence |
| ) |
| |
| |
| if self.custom_rules: |
| return self._apply_custom_rules( |
| num_unique_classes, num_detections, avg_confidence |
| ) |
| |
| |
| return self._apply_default_logic( |
| num_unique_classes, num_detections, avg_confidence |
| ) |
| |
| def _apply_class_based_logic( |
| self, |
| detected_classes: List[str], |
| unique_classes: set, |
| avg_confidence: float |
| ) -> tuple: |
| """Apply logic based on specific green/red class lists""" |
| |
| |
| priority = self.custom_rules.get("priority", "red") |
| |
| if priority == "red" and self.red_classes: |
| red_detected = any(cls in self.red_classes for cls in detected_classes) |
| if red_detected: |
| return ( |
| "red", |
| f"Detected prohibited object(s): {', '.join(unique_classes & set(self.red_classes))}" |
| ) |
| |
| |
| if self.green_classes: |
| green_detected = any(cls in self.green_classes for cls in detected_classes) |
| if green_detected: |
| return ( |
| "green", |
| f"Detected approved object(s): {', '.join(unique_classes & set(self.green_classes))}" |
| ) |
| |
| |
| if priority == "green" and self.red_classes: |
| red_detected = any(cls in self.red_classes for cls in detected_classes) |
| if red_detected: |
| return ( |
| "red", |
| f"Detected prohibited object(s): {', '.join(unique_classes & set(self.red_classes))}" |
| ) |
| |
| |
| return ( |
| "red", |
| f"Detected objects do not match approved criteria: {', '.join(unique_classes)}" |
| ) |
| |
| def _apply_custom_rules( |
| self, |
| num_unique_classes: int, |
| num_detections: int, |
| avg_confidence: float |
| ) -> tuple: |
| """Apply custom rules from configuration""" |
| |
| all_classes_required = self.custom_rules.get("all_classes_required", False) |
| min_detections = self.custom_rules.get("min_detections_for_green", 1) |
| min_confidence = self.custom_rules.get("min_confidence_for_green", 0.5) |
| |
| |
| if all_classes_required: |
| if num_unique_classes == 3: |
| if avg_confidence >= min_confidence: |
| return ( |
| "green", |
| f"All required classes detected with confidence {avg_confidence:.2%}" |
| ) |
| else: |
| return ( |
| "red", |
| f"All classes detected but confidence too low: {avg_confidence:.2%}" |
| ) |
| else: |
| return ( |
| "red", |
| f"Only {num_unique_classes}/3 required classes detected" |
| ) |
| |
| |
| if num_detections >= min_detections and avg_confidence >= min_confidence: |
| return ( |
| "green", |
| f"Detected {num_detections} object(s) with confidence {avg_confidence:.2%}" |
| ) |
| |
| return ( |
| "red", |
| f"Insufficient detections ({num_detections}) or confidence ({avg_confidence:.2%})" |
| ) |
| |
| def _apply_default_logic( |
| self, |
| num_unique_classes: int, |
| num_detections: int, |
| avg_confidence: float |
| ) -> tuple: |
| """ |
| Apply default verdict logic |
| |
| Default strategy: |
| - GREEN: If at least 1 class detected with good confidence |
| - RED: If no detections or low confidence |
| |
| Customize this method based on your specific requirements |
| """ |
| |
| |
| if num_detections >= 1 and avg_confidence >= 0.5: |
| return ( |
| "green", |
| f"Detected {num_detections} object(s) across {num_unique_classes} class(es)" |
| ) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| return ( |
| "red", |
| f"Criteria not met: {num_detections} detection(s), {num_unique_classes} class(es)" |
| ) |
| |
| def set_green_classes(self, classes: List[str]): |
| """Set classes that trigger green verdict""" |
| self.green_classes = classes |
| logger.info(f"Green classes set to: {classes}") |
| |
| def set_red_classes(self, classes: List[str]): |
| """Set classes that trigger red verdict""" |
| self.red_classes = classes |
| logger.info(f"Red classes set to: {classes}") |
| |
| def update_rules(self, rules: Dict): |
| """Update custom rules""" |
| self.custom_rules.update(rules) |
| logger.info(f"Custom rules updated: {self.custom_rules}") |
|
|