# astrbbbb / astrbot /core /tools /cron_tools.py
# qa1145's picture
# Upload 1245 files
# 8ede856 verified
from datetime import datetime
from typing import Any
from pydantic import Field
from pydantic.dataclasses import dataclass
from astrbot.core.agent.run_context import ContextWrapper
from astrbot.core.agent.tool import FunctionTool, ToolExecResult
from astrbot.core.astr_agent_context import AstrAgentContext
def _extract_job_session(job: Any) -> str | None:
payload = getattr(job, "payload", None)
if not isinstance(payload, dict):
return None
session = payload.get("session")
return str(session) if session is not None else None
@dataclass
class CreateActiveCronTool(FunctionTool[AstrAgentContext]):
    """Agent tool that schedules a recurring or one-time future task.

    The tool validates the arguments it receives, builds a payload carrying
    the current session, sender id and note, and registers the job through
    the cron manager's ``add_active_job``.
    """

    name: str = "create_future_task"
    description: str = (
        "Create a future task for your future. Supports recurring cron expressions or one-time run_at datetime. "
        "Use this when you or the user want scheduled follow-up or proactive actions."
    )
    # JSON-schema style description of the arguments accepted by ``call``.
    parameters: dict = Field(
        default_factory=lambda: {
            "type": "object",
            "properties": {
                "cron_expression": {
                    "type": "string",
                    "description": "Cron expression defining recurring schedule (e.g., '0 8 * * *' or '0 23 * * mon-fri'). Prefer named weekdays like 'mon-fri' or 'sat,sun' instead of numeric day-of-week ranges such as '1-5' to avoid ambiguity across cron implementations.",
                },
                "run_at": {
                    "type": "string",
                    "description": "ISO datetime for one-time execution, e.g., 2026-02-02T08:00:00+08:00. Use with run_once=true.",
                },
                "note": {
                    "type": "string",
                    "description": "Detailed instructions for your future agent to execute when it wakes.",
                },
                "name": {
                    "type": "string",
                    "description": "Optional label to recognize this future task.",
                },
                "run_once": {
                    "type": "boolean",
                    "description": "If true, the task will run only once and then be deleted. Use run_at to specify the time.",
                },
            },
            "required": ["note"],
        }
    )

    async def call(
        self, context: ContextWrapper[AstrAgentContext], **kwargs
    ) -> ToolExecResult:
        """Validate the arguments and register a new future task.

        Args:
            context: Wrapper giving access to the cron manager
                (``context.context.context.cron_manager``) and to the current
                event (``context.context.event``).
            **kwargs: Tool arguments: ``note`` (required), ``cron_expression``,
                ``run_at``, ``run_once`` and ``name``.

        Returns:
            A confirmation string containing the job id and name, or a string
            starting with ``"error:"`` when the cron manager is unavailable or
            an argument is missing or malformed.
        """
        cron_mgr = context.context.context.cron_manager
        if cron_mgr is None:
            return "error: cron manager is not available."

        cron_expression = kwargs.get("cron_expression")
        run_at = kwargs.get("run_at")
        run_once = bool(kwargs.get("run_once", False))
        note = str(kwargs.get("note", "")).strip()
        # Fall back to a generic label when no usable name was supplied.
        name = str(kwargs.get("name") or "").strip() or "active_agent_task"

        if not note:
            return "error: note is required."
        if run_once and not run_at:
            return "error: run_at is required when run_once=true."
        if not run_once and not cron_expression:
            return "error: cron_expression is required when run_once=false."
        if run_once and cron_expression:
            # A one-time task is driven by run_at only; drop the expression.
            cron_expression = None

        run_at_dt = None
        if run_at:
            try:
                run_at_dt = datetime.fromisoformat(str(run_at))
            except ValueError:
                # fromisoformat signals an unparsable string with ValueError;
                # catching only that keeps unrelated failures visible.
                return "error: run_at must be ISO datetime, e.g., 2026-02-02T08:00:00+08:00"

        payload = {
            "session": context.context.event.unified_msg_origin,
            "sender_id": context.context.event.get_sender_id(),
            "note": note,
            "origin": "tool",
        }

        job = await cron_mgr.add_active_job(
            name=name,
            cron_expression=str(cron_expression) if cron_expression else None,
            payload=payload,
            description=note,
            run_once=run_once,
            run_at=run_at_dt,
        )

        # Report the requested time when the job carries no next run time.
        next_run = job.next_run_time or run_at_dt
        suffix = (
            f"one-time at {next_run}"
            if run_once
            else f"expression '{cron_expression}' (next {next_run})"
        )
        return f"Scheduled future task {job.job_id} ({job.name}) {suffix}."
@dataclass
class DeleteCronJobTool(FunctionTool[AstrAgentContext]):
    """Agent tool that removes a future task belonging to the current session."""

    name: str = "delete_future_task"
    description: str = "Delete a future task (cron job) by its job_id."
    # JSON-schema style description of the single ``job_id`` argument.
    parameters: dict = Field(
        default_factory=lambda: {
            "type": "object",
            "properties": {
                "job_id": {
                    "type": "string",
                    "description": "The job_id returned when the job was created.",
                }
            },
            "required": ["job_id"],
        }
    )

    async def call(
        self, context: ContextWrapper[AstrAgentContext], **kwargs
    ) -> ToolExecResult:
        """Delete the job named by ``job_id`` if it belongs to this session.

        Returns a confirmation string on success, or a string starting with
        ``"error:"`` when the cron manager is unavailable, ``job_id`` is
        missing, the job does not exist, or the job's payload session differs
        from the current event's ``unified_msg_origin``.
        """
        manager = context.context.context.cron_manager
        if manager is None:
            return "error: cron manager is not available."

        origin = context.context.event.unified_msg_origin
        job_id = kwargs.get("job_id")
        if not job_id:
            return "error: job_id is required."

        job_key = str(job_id)
        record = await manager.db.get_cron_job(job_key)
        if not record:
            return f"error: cron job {job_id} not found."

        # Only jobs created from the same session may be removed.
        if _extract_job_session(record) != origin:
            return "error: you can only delete future tasks in the current umo."

        await manager.delete_job(job_key)
        return f"Deleted cron job {job_id}."
@dataclass
class ListCronJobsTool(FunctionTool[AstrAgentContext]):
    """Agent tool that lists the future tasks attached to the current session."""

    name: str = "list_future_tasks"
    description: str = "List existing future tasks (cron jobs) for inspection."
    # JSON-schema style description of the optional ``job_type`` filter.
    parameters: dict = Field(
        default_factory=lambda: {
            "type": "object",
            "properties": {
                "job_type": {
                    "type": "string",
                    "description": "Optional filter: basic or active_agent.",
                }
            },
        }
    )

    async def call(
        self, context: ContextWrapper[AstrAgentContext], **kwargs
    ) -> ToolExecResult:
        """Return one summary line per job whose payload session matches.

        Jobs are fetched from the cron manager's ``list_jobs`` using the
        optional ``job_type`` argument, then restricted to those whose payload
        session equals the current event's ``unified_msg_origin``. The result
        is a newline-joined listing, ``"No cron jobs found."`` when nothing
        matches, or an ``"error:"`` string when the cron manager is unavailable.
        """
        manager = context.context.context.cron_manager
        if manager is None:
            return "error: cron manager is not available."

        origin = context.context.event.unified_msg_origin
        wanted_type = kwargs.get("job_type")
        candidates = await manager.list_jobs(wanted_type)

        own_jobs = [
            entry for entry in candidates if _extract_job_session(entry) == origin
        ]
        if not own_jobs:
            return "No cron jobs found."

        return "\n".join(
            f"{entry.job_id} | {entry.name} | {entry.job_type} | run_once={getattr(entry, 'run_once', False)} | enabled={entry.enabled} | next={entry.next_run_time}"
            for entry in own_jobs
        )
# Module-level tool instances, one per tool class defined in this file.
CREATE_CRON_JOB_TOOL = CreateActiveCronTool()
DELETE_CRON_JOB_TOOL = DeleteCronJobTool()
LIST_CRON_JOBS_TOOL = ListCronJobsTool()

# Public names exported by this module: the instances and their classes.
__all__ = [
    "CREATE_CRON_JOB_TOOL",
    "DELETE_CRON_JOB_TOOL",
    "LIST_CRON_JOBS_TOOL",
    "CreateActiveCronTool",
    "DeleteCronJobTool",
    "ListCronJobsTool",
]