"""Task loader service — reads task definitions from JSON files."""

from __future__ import annotations

import json
import logging
from pathlib import Path

from lexenvs.config import get_settings, project_root
from lexenvs.schemas.task import TaskDefinition
from lexenvs.services.kb_filter_service import filter_allowlist, filter_knowledge_base

logger = logging.getLogger(__name__)


class TaskLoaderService:
    """Loads task definitions from the data/tasks/ directory.

    Resolves ``knowledge_base_ref`` and ``system_prompt_ref`` in each task's
    prompt by reading the referenced files from the data directory (the
    parent of the tasks directory).
    """

    def __init__(self, tasks_dir: str | Path | None = None) -> None:
        """Set up the loader and eagerly load every task file.

        Args:
            tasks_dir: Directory containing the ``*.json`` task files. When
                ``None``, ``project_root() / settings.tasks_data_dir`` is used.
        """
        if tasks_dir is not None:
            self._tasks_dir = Path(tasks_dir)
        else:
            settings = get_settings()
            self._tasks_dir = project_root() / settings.tasks_data_dir

        # Referenced files are resolved relative to the parent of the tasks dir.
        self._data_dir = self._tasks_dir.parent
        self._file_cache: dict[str, str] = {}
        self._tasks: dict[str, TaskDefinition] = {}
        self._load_tasks()

    def _read_data_file(self, ref: str) -> str:
        """Read and cache a file from the data directory.

        Args:
            ref: Path of the file, relative to the data directory.

        Returns:
            The file's text, or ``""`` when the file does not exist. Missing
            files are not cached, so a later call retries the read.

        Raises:
            ValueError: If ``ref`` is not a string or resolves to a location
                outside the data directory.
            OSError: If the file exists but cannot be read.
        """
        # Refs come straight from task JSON; a number, list or object here
        # would otherwise surface as a TypeError that the loader does not catch.
        if not isinstance(ref, str):
            raise ValueError(f"Invalid file ref: {ref!r}")

        if ref in self._file_cache:
            return self._file_cache[ref]

        file_path = (self._data_dir / ref).resolve()
        # Reject refs such as "../../etc/passwd" that leave the data directory.
        if not file_path.is_relative_to(self._data_dir.resolve()):
            logger.error("File ref escapes data directory: %s", ref)
            raise ValueError(f"Invalid file ref: {ref!r}")

        if not file_path.exists():
            logger.warning("Data file not found: %s", file_path)
            return ""

        content = file_path.read_text(encoding="utf-8")
        self._file_cache[ref] = content
        logger.info("Loaded data file: %s (%d chars)", ref, len(content))
        return content

    def _resolve_prompt_refs(self, prompt: dict) -> None:
        """Fill ``system`` / ``context`` in *prompt* from its file references.

        The dict is modified in place. A reference is only resolved when the
        corresponding target key is missing or falsy, so inline values win
        over referenced files.
        """
        # Resolve system_prompt_ref → system
        sys_ref = prompt.get("system_prompt_ref")
        if sys_ref and not prompt.get("system"):
            prompt["system"] = self._read_data_file(sys_ref)

        # Resolve knowledge_base_ref → context
        kb_ref = prompt.get("knowledge_base_ref")
        if kb_ref and not prompt.get("context"):
            full_kb = self._read_data_file(kb_ref)
            kb_filter = prompt.get("kb_filter")
            if kb_filter:
                # With a filter, both the knowledge base and the system prompt
                # are passed through the kb_filter_service helpers.
                prompt["context"] = filter_knowledge_base(full_kb, kb_filter)
                prompt["system"] = filter_allowlist(prompt.get("system", ""), kb_filter)
            else:
                prompt["context"] = full_kb

    def _load_tasks(self) -> None:
        """Load all task JSON files from the tasks directory.

        Files are processed in sorted order. A file that cannot be read,
        parsed or validated is logged and skipped; it never prevents the
        remaining files from loading.
        """
        if not self._tasks_dir.exists():
            logger.warning("Tasks directory not found: %s", self._tasks_dir)
            return

        for task_file in sorted(self._tasks_dir.glob("*.json")):
            try:
                raw = json.loads(task_file.read_text(encoding="utf-8"))
                # Valid JSON is not necessarily an object; calling .get() on a
                # list or scalar would raise an uncaught AttributeError and
                # abort the whole load.
                if not isinstance(raw, dict):
                    raise ValueError(
                        f"expected a JSON object at top level, got {type(raw).__name__}"
                    )

                prompt = raw.get("prompt")
                # A missing or malformed prompt is left for schema validation
                # to report; refs are only resolved on a real mapping.
                if isinstance(prompt, dict):
                    self._resolve_prompt_refs(prompt)

                task = TaskDefinition.model_validate(raw)
                self._tasks[task.task_id] = task
                logger.info("Loaded task: %s", task.task_id)
            except (json.JSONDecodeError, ValueError) as e:
                logger.error("Invalid task file %s: %s", task_file.name, e)
            except OSError as e:
                logger.error("Cannot read task file %s: %s", task_file.name, e)

        logger.info("Loaded %d tasks from %s", len(self._tasks), self._tasks_dir)

    def get_task(self, task_id: str) -> TaskDefinition | None:
        """Get a single task definition by ID, or ``None`` if it is not loaded."""
        return self._tasks.get(task_id)

    def list_tasks(self) -> list[TaskDefinition]:
        """Return all loaded task definitions as a new list."""
        return list(self._tasks.values())

    def reload(self) -> None:
        """Reload tasks from disk, discarding loaded tasks and cached files."""
        self._tasks.clear()
        self._file_cache.clear()
        self._load_tasks()


def create_task_loader_service() -> TaskLoaderService:
    """Factory for svcs registration."""
    return TaskLoaderService()