"""Task definitions and graders for the Core Identity environment."""

import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional


@dataclass
class CoreIdentityTask:
    """Static description of one Core Identity scenario.

    Attributes:
        task_id: Unique identifier; the lookup key for ``get_task_by_id``.
        name: Short human-readable title.
        description: One-sentence statement of what must be verified.
        task_type: Scenario family. The tasks in this module use
            ``"document_verification"``, ``"credential_authentication"``
            and ``"profile_management"``.
        difficulty: Difficulty label (``"easy"``, ``"medium"`` or ``"hard"``
            for the tasks in this module).
        document: Document payload, or ``None`` for non-document tasks.
        credentials: Credential payload, or ``None`` when not applicable.
        profile: Profile payload, or ``None`` when not applicable.
        expected_verification: The verdict the grader compares the
            response's ``verified`` flag against.
        required_checks: Names of the checks expected for this task; the
            grader uses the length of this list as the completeness
            denominator.
        challenge_data: Extra scenario metadata. It is not read by the
            grader in this module.
        max_steps: Step budget for the task. It is not enforced in this
            module -- presumably consumed by the environment; confirm
            against the caller.
    """

    task_id: str
    name: str
    description: str
    task_type: str
    difficulty: str
    document: Optional[Dict[str, Any]]
    credentials: Optional[Dict[str, Any]]
    profile: Optional[Dict[str, Any]]
    expected_verification: bool
    required_checks: List[str]
    challenge_data: Dict[str, Any]
    max_steps: int


@dataclass
class GradingResult:
    """Outcome of grading a single response.

    Attributes:
        accuracy: ``1.0`` when the verdict matched the expectation,
            otherwise ``0.0``.
        completeness: Share of the required number of checks that were
            performed, in ``[0.0, 1.0]``.
        score: Final score in ``[0.0, 1.0]``.
        feedback: One-line human-readable summary.
    """

    accuracy: float
    completeness: float
    score: float
    feedback: str


class CoreIdentityTaskEvaluator:
    """Evaluate responses for Core Identity tasks."""

    # Score awarded for a correct verdict before the completeness bonus.
    _BASE_SCORE = 0.7
    # Maximum bonus, scaled by completeness, added to a correct verdict.
    _COMPLETENESS_WEIGHT = 0.3
    # Completeness reported when the task declares no required checks.
    _DEFAULT_COMPLETENESS = 0.5

    def __init__(self, task: CoreIdentityTask):
        """Bind the evaluator to the task whose expectations it grades."""
        self.task = task

    @staticmethod
    def _count_performed(checks_performed: Any) -> int:
        """Return how many checks the response reports as performed.

        A mapping is read as ``name -> outcome`` and truthy outcomes are
        counted. A list, tuple or set is read as a collection of performed
        check names and truthy elements are counted. Any other value
        (including ``None``) counts as zero instead of raising.
        """
        if isinstance(checks_performed, dict):
            outcomes = checks_performed.values()
        elif isinstance(checks_performed, (list, tuple, set, frozenset)):
            outcomes = checks_performed
        else:
            outcomes = ()
        return sum(1 for outcome in outcomes if outcome)

    def grade(self, response: Dict[str, Any]) -> GradingResult:
        """Grade a response against the bound task.

        Args:
            response: Agent output. The grader reads
                ``response["verification"]["verified"]`` and
                ``response["verification"]["checks_performed"]``; missing
                or malformed parts are treated as "not verified" and
                "no checks performed".

        Returns:
            A ``GradingResult``. A wrong verdict scores ``0.0``; a correct
            verdict scores ``0.7`` plus up to ``0.3`` scaled by
            completeness, so the score never exceeds ``1.0``.
        """
        verification = response.get("verification") if isinstance(response, dict) else None
        if not isinstance(verification, dict):
            # An explicit ``None`` or any non-mapping payload carries no verdict.
            verification = {}

        verified = verification.get("verified", False)
        performed_count = self._count_performed(verification.get("checks_performed"))

        expected = self.task.expected_verification
        accuracy = 1.0 if verified == expected else 0.0

        required_count = len(self.task.required_checks)
        if required_count > 0:
            # Cap at 1.0: reporting more checks than required must not
            # inflate the score past its maximum.
            # NOTE(review): any truthy entry is counted, not only names
            # listed in ``required_checks`` -- confirm whether matching by
            # name is intended.
            completeness = min(1.0, performed_count / required_count)
        else:
            completeness = self._DEFAULT_COMPLETENESS

        if accuracy == 1.0:
            score = self._BASE_SCORE + completeness * self._COMPLETENESS_WEIGHT
            score = min(1.0, score)
        else:
            # A wrong verdict earns nothing, however many checks were run.
            score = 0.0

        feedback = (
            f"Verification: {verified}, Expected: {expected}, "
            f"Checks: {performed_count}/{required_count}"
        )
        return GradingResult(
            accuracy=accuracy,
            completeness=completeness,
            score=score,
            feedback=feedback,
        )


def create_document_verification_task() -> CoreIdentityTask:
    """Build the passport task, whose expected verdict is ``True``."""
    return CoreIdentityTask(
        task_id="doc_verify_passport",
        name="Passport Verification",
        description="Verify the authenticity of a passport document",
        task_type="document_verification",
        difficulty="medium",
        document={
            "document_type": "passport",
            "document_number": "P12345678",
            "full_name": "John Smith",
            "date_of_birth": "1990-01-15",
            "expiry_date": "2030-01-15",
            "issuing_country": "United States",
        },
        credentials=None,
        profile=None,
        expected_verification=True,
        required_checks=["expiry_check", "format_check", "country_validation"],
        challenge_data={
            "valid_passport": True,
            "expired_document": False,
        },
        max_steps=5,
    )


def create_document_fraud_task() -> CoreIdentityTask:
    """Build the fake-ID task, whose expected verdict is ``False``."""
    return CoreIdentityTask(
        task_id="doc_verify_fake_id",
        name="Fake ID Detection",
        description="Detect a potentially fraudulent ID document",
        task_type="document_verification",
        difficulty="hard",
        document={
            "document_type": "drivers_license",
            "document_number": "DL999999999",
            "full_name": "Jane Doe",
            "date_of_birth": "1985-06-20",
            "expiry_date": "2025-06-20",
            "issuing_country": "California",
        },
        credentials=None,
        profile=None,
        expected_verification=False,
        required_checks=[
            "expiry_check",
            "format_check",
            "country_validation",
            "fraud_indicators",
        ],
        challenge_data={
            "valid_passport": False,
            "expired_document": True,
            "suspicious_number": True,
            "fraud_indicators": ["expired", "invalid_format"],
        },
        max_steps=8,
    )


def create_authentication_task() -> CoreIdentityTask:
    """Build the valid-credentials task, whose expected verdict is ``True``."""
    return CoreIdentityTask(
        task_id="auth_valid_creds",
        name="Credential Authentication",
        description="Verify user credentials for authentication",
        task_type="credential_authentication",
        difficulty="easy",
        document=None,
        credentials={
            "username": "john.smith@email.com",
            "password": "SecureP@ss123",
        },
        profile=None,
        expected_verification=True,
        required_checks=["username_format", "password_strength"],
        challenge_data={
            "valid_credentials": True,
            "account_locked": False,
        },
        max_steps=3,
    )


def create_authentication_fraud_task() -> CoreIdentityTask:
    """Build the brute-force task, whose expected verdict is ``False``."""
    return CoreIdentityTask(
        task_id="auth_brute_force",
        name="Brute Force Detection",
        description="Detect and block brute force authentication attempts",
        task_type="credential_authentication",
        difficulty="hard",
        document=None,
        credentials={
            "username": "admin@company.com",
            "password": "wrong_password",
        },
        profile=None,
        expected_verification=False,
        required_checks=[
            "username_format",
            "password_strength",
            "attempt_tracking",
            "ip_validation",
        ],
        challenge_data={
            "valid_credentials": False,
            "failed_attempts": 5,
            "suspicious_ip": True,
            "time_window": "5 minutes",
        },
        max_steps=6,
    )


def create_profile_task() -> CoreIdentityTask:
    """Build the profile-update task, whose expected verdict is ``True``."""
    return CoreIdentityTask(
        task_id="profile_update",
        name="Profile Update Validation",
        description="Validate a user profile update request",
        task_type="profile_management",
        difficulty="medium",
        document=None,
        credentials=None,
        profile={
            "user_id": "user_12345",
            "full_name": "Alice Johnson",
            "email": "alice.j@newdomain.com",
            "phone": "+1-555-987-6543",
            "date_of_birth": "1992-03-10",
        },
        expected_verification=True,
        required_checks=["email_format", "phone_format", "age_verification"],
        challenge_data={
            "valid_request": True,
            "requested_changes": ["email", "phone"],
        },
        max_steps=5,
    )


def get_all_tasks() -> List[CoreIdentityTask]:
    """Return freshly built instances of every task defined in this module."""
    return [
        create_document_verification_task(),
        create_document_fraud_task(),
        create_authentication_task(),
        create_authentication_fraud_task(),
        create_profile_task(),
    ]


def get_task_by_id(task_id: str) -> CoreIdentityTask:
    """Return the task whose ``task_id`` equals *task_id*.

    Raises:
        ValueError: If no task defined in this module has that identifier.
    """
    for task in get_all_tasks():
        if task.task_id == task_id:
            return task
    raise ValueError(f"Task not found: {task_id}")