| from datetime import datetime |
| from typing import Any |
|
|
| from pydantic import Field |
| from pydantic.dataclasses import dataclass |
|
|
| from astrbot.core.agent.run_context import ContextWrapper |
| from astrbot.core.agent.tool import FunctionTool, ToolExecResult |
| from astrbot.core.astr_agent_context import AstrAgentContext |
|
|
|
|
| def _extract_job_session(job: Any) -> str | None: |
| payload = getattr(job, "payload", None) |
| if not isinstance(payload, dict): |
| return None |
| session = payload.get("session") |
| return str(session) if session is not None else None |
|
|
|
|
| @dataclass |
| class CreateActiveCronTool(FunctionTool[AstrAgentContext]): |
| name: str = "create_future_task" |
| description: str = ( |
| "Create a future task for your future. Supports recurring cron expressions or one-time run_at datetime. " |
| "Use this when you or the user want scheduled follow-up or proactive actions." |
| ) |
| parameters: dict = Field( |
| default_factory=lambda: { |
| "type": "object", |
| "properties": { |
| "cron_expression": { |
| "type": "string", |
| "description": "Cron expression defining recurring schedule (e.g., '0 8 * * *' or '0 23 * * mon-fri'). Prefer named weekdays like 'mon-fri' or 'sat,sun' instead of numeric day-of-week ranges such as '1-5' to avoid ambiguity across cron implementations.", |
| }, |
| "run_at": { |
| "type": "string", |
| "description": "ISO datetime for one-time execution, e.g., 2026-02-02T08:00:00+08:00. Use with run_once=true.", |
| }, |
| "note": { |
| "type": "string", |
| "description": "Detailed instructions for your future agent to execute when it wakes.", |
| }, |
| "name": { |
| "type": "string", |
| "description": "Optional label to recognize this future task.", |
| }, |
| "run_once": { |
| "type": "boolean", |
| "description": "If true, the task will run only once and then be deleted. Use run_at to specify the time.", |
| }, |
| }, |
| "required": ["note"], |
| } |
| ) |
|
|
| async def call( |
| self, context: ContextWrapper[AstrAgentContext], **kwargs |
| ) -> ToolExecResult: |
| cron_mgr = context.context.context.cron_manager |
| if cron_mgr is None: |
| return "error: cron manager is not available." |
|
|
| cron_expression = kwargs.get("cron_expression") |
| run_at = kwargs.get("run_at") |
| run_once = bool(kwargs.get("run_once", False)) |
| note = str(kwargs.get("note", "")).strip() |
| name = str(kwargs.get("name") or "").strip() or "active_agent_task" |
|
|
| if not note: |
| return "error: note is required." |
| if run_once and not run_at: |
| return "error: run_at is required when run_once=true." |
| if (not run_once) and not cron_expression: |
| return "error: cron_expression is required when run_once=false." |
| if run_once and cron_expression: |
| cron_expression = None |
| run_at_dt = None |
| if run_at: |
| try: |
| run_at_dt = datetime.fromisoformat(str(run_at)) |
| except Exception: |
| return "error: run_at must be ISO datetime, e.g., 2026-02-02T08:00:00+08:00" |
|
|
| payload = { |
| "session": context.context.event.unified_msg_origin, |
| "sender_id": context.context.event.get_sender_id(), |
| "note": note, |
| "origin": "tool", |
| } |
|
|
| job = await cron_mgr.add_active_job( |
| name=name, |
| cron_expression=str(cron_expression) if cron_expression else None, |
| payload=payload, |
| description=note, |
| run_once=run_once, |
| run_at=run_at_dt, |
| ) |
| next_run = job.next_run_time or run_at_dt |
| suffix = ( |
| f"one-time at {next_run}" |
| if run_once |
| else f"expression '{cron_expression}' (next {next_run})" |
| ) |
| return f"Scheduled future task {job.job_id} ({job.name}) {suffix}." |
|
|
|
|
| @dataclass |
| class DeleteCronJobTool(FunctionTool[AstrAgentContext]): |
| name: str = "delete_future_task" |
| description: str = "Delete a future task (cron job) by its job_id." |
| parameters: dict = Field( |
| default_factory=lambda: { |
| "type": "object", |
| "properties": { |
| "job_id": { |
| "type": "string", |
| "description": "The job_id returned when the job was created.", |
| } |
| }, |
| "required": ["job_id"], |
| } |
| ) |
|
|
| async def call( |
| self, context: ContextWrapper[AstrAgentContext], **kwargs |
| ) -> ToolExecResult: |
| cron_mgr = context.context.context.cron_manager |
| if cron_mgr is None: |
| return "error: cron manager is not available." |
| current_umo = context.context.event.unified_msg_origin |
| job_id = kwargs.get("job_id") |
| if not job_id: |
| return "error: job_id is required." |
| job = await cron_mgr.db.get_cron_job(str(job_id)) |
| if not job: |
| return f"error: cron job {job_id} not found." |
| if _extract_job_session(job) != current_umo: |
| return "error: you can only delete future tasks in the current umo." |
| await cron_mgr.delete_job(str(job_id)) |
| return f"Deleted cron job {job_id}." |
|
|
|
|
| @dataclass |
| class ListCronJobsTool(FunctionTool[AstrAgentContext]): |
| name: str = "list_future_tasks" |
| description: str = "List existing future tasks (cron jobs) for inspection." |
| parameters: dict = Field( |
| default_factory=lambda: { |
| "type": "object", |
| "properties": { |
| "job_type": { |
| "type": "string", |
| "description": "Optional filter: basic or active_agent.", |
| } |
| }, |
| } |
| ) |
|
|
| async def call( |
| self, context: ContextWrapper[AstrAgentContext], **kwargs |
| ) -> ToolExecResult: |
| cron_mgr = context.context.context.cron_manager |
| if cron_mgr is None: |
| return "error: cron manager is not available." |
| current_umo = context.context.event.unified_msg_origin |
| job_type = kwargs.get("job_type") |
| jobs = [ |
| job |
| for job in await cron_mgr.list_jobs(job_type) |
| if _extract_job_session(job) == current_umo |
| ] |
| if not jobs: |
| return "No cron jobs found." |
| lines = [] |
| for j in jobs: |
| lines.append( |
| f"{j.job_id} | {j.name} | {j.job_type} | run_once={getattr(j, 'run_once', False)} | enabled={j.enabled} | next={j.next_run_time}" |
| ) |
| return "\n".join(lines) |
|
|
|
|
| CREATE_CRON_JOB_TOOL = CreateActiveCronTool() |
| DELETE_CRON_JOB_TOOL = DeleteCronJobTool() |
| LIST_CRON_JOBS_TOOL = ListCronJobsTool() |
|
|
| __all__ = [ |
| "CREATE_CRON_JOB_TOOL", |
| "DELETE_CRON_JOB_TOOL", |
| "LIST_CRON_JOBS_TOOL", |
| "CreateActiveCronTool", |
| "DeleteCronJobTool", |
| "ListCronJobsTool", |
| ] |
|
|