lexenvs-harbor / src /lexenvs /services /task_loader_service.py
endishai's picture
Upload folder using huggingface_hub
2312199 verified
"""Task loader service — reads task definitions from JSON files."""
from __future__ import annotations
import json
import logging
from pathlib import Path
from lexenvs.config import get_settings, project_root
from lexenvs.schemas.task import TaskDefinition
from lexenvs.services.kb_filter_service import filter_allowlist, filter_knowledge_base
logger = logging.getLogger(__name__)
class TaskLoaderService:
"""Loads task definitions from the data/tasks/ directory.
Resolves ``knowledge_base_ref`` and ``system_prompt_ref`` in each task's
prompt by reading the referenced files from the data directory.
"""
def __init__(self, tasks_dir: str | Path | None = None) -> None:
if tasks_dir is not None:
self._tasks_dir = Path(tasks_dir)
else:
settings = get_settings()
self._tasks_dir = project_root() / settings.tasks_data_dir
self._data_dir = self._tasks_dir.parent
self._file_cache: dict[str, str] = {}
self._tasks: dict[str, TaskDefinition] = {}
self._load_tasks()
def _read_data_file(self, ref: str) -> str:
"""Read and cache a file from the data directory."""
if ref in self._file_cache:
return self._file_cache[ref]
file_path = (self._data_dir / ref).resolve()
if not file_path.is_relative_to(self._data_dir.resolve()):
logger.error("File ref escapes data directory: %s", ref)
raise ValueError(f"Invalid file ref: {ref!r}")
if not file_path.exists():
logger.warning("Data file not found: %s", file_path)
return ""
content = file_path.read_text(encoding="utf-8")
self._file_cache[ref] = content
logger.info("Loaded data file: %s (%d chars)", ref, len(content))
return content
def _load_tasks(self) -> None:
"""Load all task JSON files from the tasks directory."""
if not self._tasks_dir.exists():
logger.warning("Tasks directory not found: %s", self._tasks_dir)
return
for task_file in sorted(self._tasks_dir.glob("*.json")):
try:
raw = json.loads(task_file.read_text(encoding="utf-8"))
prompt = raw.get("prompt", {})
# Resolve system_prompt_ref → system
sys_ref = prompt.get("system_prompt_ref")
if sys_ref and not prompt.get("system"):
prompt["system"] = self._read_data_file(sys_ref)
# Resolve knowledge_base_ref → context
kb_ref = prompt.get("knowledge_base_ref")
if kb_ref and not prompt.get("context"):
full_kb = self._read_data_file(kb_ref)
kb_filter = prompt.get("kb_filter")
if kb_filter:
prompt["context"] = filter_knowledge_base(full_kb, kb_filter)
prompt["system"] = filter_allowlist(prompt.get("system", ""), kb_filter)
else:
prompt["context"] = full_kb
task = TaskDefinition.model_validate(raw)
self._tasks[task.task_id] = task
logger.info("Loaded task: %s", task.task_id)
except (json.JSONDecodeError, ValueError) as e:
logger.error("Invalid task file %s: %s", task_file.name, e)
except OSError as e:
logger.error("Cannot read task file %s: %s", task_file.name, e)
logger.info("Loaded %d tasks from %s", len(self._tasks), self._tasks_dir)
def get_task(self, task_id: str) -> TaskDefinition | None:
"""Get a single task definition by ID."""
return self._tasks.get(task_id)
def list_tasks(self) -> list[TaskDefinition]:
"""Return all loaded task definitions."""
return list(self._tasks.values())
def reload(self) -> None:
"""Reload tasks from disk."""
self._tasks.clear()
self._file_cache.clear()
self._load_tasks()
def create_task_loader_service() -> TaskLoaderService:
"""Factory for svcs registration."""
return TaskLoaderService()