| """ |
| Агенты для анализа логов на основе smolagents. |
| Используют трансформеры для интеллектуального анализа. |
| """ |
|
|
| import os |
| import json |
| from smolagents import ToolCallingAgent, InferenceClientModel, FinalAnswerTool |
| from schemas.schemas import parser_schema, anomaly_schema, rca_schema |
| from utils.validation import validate_schema |
| from utils.agent_runner import run_agent_safely |
|
|
| |
| |
| |
| hf_token = os.getenv("HF_TOKEN") |
| if not hf_token: |
| raise ValueError( |
| "HF_TOKEN environment variable is not set. " |
| "Please set it in Hugging Face Spaces secrets or as an environment variable." |
| ) |
|
|
| model = InferenceClientModel( |
| model_id="deepseek-ai/DeepSeek-V3.1-Terminus", |
| token=hf_token, |
| max_tokens=4096 |
| ) |
|
|
| final_tool = FinalAnswerTool() |
|
|
| |
| parser_agent = ToolCallingAgent( |
| model=model, |
| tools=[final_tool], |
| instructions=""" |
| Ты эксперт по анализу системных логов. Твоя задача - парсить сырые логи и преобразовывать их в структурированный JSON формат. |
| |
| ВАЖНО: Твой ответ должен быть ПОЛНЫМ и ЗАВЕРШЁННЫМ JSON объектом. Не обрезай ответ! |
| |
| Для каждой строки лога определи: |
| - timestamp: временная метка (если есть) |
| - level: уровень логирования (INFO, WARNING, ERROR, CRITICAL, DEBUG, TRACE) |
| - message: основное сообщение |
| - type: тип события (SYSTEM, HTTP_REQUEST, DATABASE, AUTHENTICATION, CONNECTION, EXCEPTION, GENERAL) |
| |
| Также определи: |
| - errors: список всех событий с уровнем ERROR или CRITICAL (полные объекты событий) |
| - warnings: список всех событий с уровнем WARNING (полные объекты событий) |
| - statistics: статистика с total_lines, parsed_events, errors, warnings, info_messages, event_types, time_range |
| |
| Временной диапазон (time_range) должен содержать start и end - первую и последнюю временную метку. |
| |
| Ответ строго верни в JSON через final_answer в следующем формате (ОБЯЗАТЕЛЬНО ПОЛНЫЙ JSON): |
| { |
| "events": [{"line_number": int, "timestamp": "str|null", "level": "str", "message": "str", "type": "str"}, ...], |
| "errors": [{"line_number": int, "timestamp": "str|null", "level": "str", "message": "str", "type": "str"}, ...], |
| "warnings": [{"line_number": int, "timestamp": "str|null", "level": "str", "message": "str", "type": "str"}, ...], |
| "statistics": { |
| "total_lines": int, |
| "parsed_events": int, |
| "errors": int, |
| "warnings": int, |
| "info_messages": int, |
| "event_types": {"TYPE": count, ...}, |
| "time_range": {"start": "str|null", "end": "str|null"} | null |
| } |
| } |
| """, |
| name="LogParserAgent", |
| max_steps=10, |
| ) |
|
|
| |
| anomaly_agent = ToolCallingAgent( |
| model=model, |
| tools=[final_tool], |
| instructions=""" |
| Ты эксперт по обнаружению аномалий в системных логах. Твоя задача - анализировать структурированные логи и находить проблемные паттерны. |
| |
| Ищи следующие типы аномалий: |
| 1. BURST_ERRORS - всплески ошибок (5+ ошибок за короткий промежуток времени) |
| 2. REPEATED_ERRORS - повторяющиеся ошибки (одна и та же ошибка 3+ раза) |
| 3. ERROR_BEFORE_CRASH - ошибки перед крашем системы (критические ошибки в конце логов) |
| 4. TEMPORAL_SPIKE - временные всплески событий (превышение среднего в 2 раза) |
| 5. REPEATED_STACK_TRACES - повторяющиеся stack traces |
| |
| Для каждой аномалии укажи: |
| - type: тип аномалии |
| - severity: CRITICAL, HIGH, MEDIUM, LOW |
| - description: описание проблемы |
| - count: количество вхождений (если применимо) |
| - error_message: пример сообщения об ошибке (если есть) |
| - metadata: дополнительная информация (affected_lines и т.д.) |
| |
| Также вычисли статистику: |
| - total: общее количество аномалий |
| - by_type: распределение по типам |
| - by_severity: распределение по серьёзности |
| |
| Ответ строго верни в JSON через final_answer в следующем формате: |
| { |
| "anomalies": [{"type": "str", "severity": "str", "description": "str", "count": int|null, "error_message": "str|null", "metadata": object|null}, ...], |
| "statistics": { |
| "total": int, |
| "by_type": {"TYPE": count, ...}, |
| "by_severity": {"SEVERITY": count, ...} |
| }, |
| "severity_summary": {"CRITICAL": int, "HIGH": int, "MEDIUM": int, "LOW": int} |
| } |
| """, |
| name="AnomalyDetectionAgent", |
| max_steps=10, |
| ) |
|
|
| |
| rca_agent = ToolCallingAgent( |
| model=model, |
| tools=[final_tool], |
| instructions=""" |
| Ты эксперт по анализу первопричин и формированию рекомендаций. Твоя задача - интерпретировать обнаруженные аномалии и предлагать конкретные решения. |
| |
| Для каждой группы аномалий определи: |
| - possible_causes: возможные первопричины проблемы |
| - recommendations: рекомендации по устранению с приоритетами (CRITICAL, HIGH, MEDIUM, LOW) |
| - actions: конкретные действия для решения проблемы |
| |
| Также сформулируй общие рекомендации для улучшения системы мониторинга и предотвращения подобных проблем. |
| |
| Ответ строго верни в JSON через final_answer в следующем формате: |
| { |
| "analysis": { |
| "root_causes": ["причина1", "причина2", ...], |
| "details": [{ |
| "anomaly_type": "str", |
| "severity": "str", |
| "description": "str", |
| "possible_causes": ["причина1", ...] |
| }, ...] |
| }, |
| "recommendations": [{ |
| "priority": "str", |
| "text": "str", |
| "actions": ["действие1", "действие2", ...] |
| }, ...], |
| "general_recommendations": ["рекомендация1", "рекомендация2", ...] |
| } |
| """, |
| name="RootCauseAgent", |
| max_steps=10, |
| ) |
|
|
| |
| def run_parser_agent(raw_logs: str): |
| """Парсит сырые логи в структурированный формат.""" |
| if not raw_logs or not raw_logs.strip(): |
| return { |
| "events": [], |
| "errors": [], |
| "warnings": [], |
| "statistics": { |
| "total_lines": 0, |
| "parsed_events": 0, |
| "errors": 0, |
| "warnings": 0, |
| "info_messages": 0, |
| "event_types": {}, |
| "time_range": None |
| } |
| } |
| |
| result = run_agent_safely(parser_agent, task=raw_logs) |
| validate_schema(result, parser_schema) |
| return result |
|
|
| |
| def run_anomaly_agent(structured_data: dict): |
| """Обнаруживает аномалии в структурированных логах.""" |
| input_data = json.dumps(structured_data, ensure_ascii=False, indent=2) |
| result = run_agent_safely(anomaly_agent, task=input_data) |
| validate_schema(result, anomaly_schema) |
| return result |
|
|
| |
| def run_rca_agent(anomaly_report: dict): |
| """Анализирует первопричины и генерирует рекомендации.""" |
| input_data = json.dumps(anomaly_report, ensure_ascii=False, indent=2) |
| result = run_agent_safely(rca_agent, task=input_data) |
| validate_schema(result, rca_schema) |
| return result |
|
|
| |
| gpt_prompt_agent = ToolCallingAgent( |
| model=model, |
| tools=[final_tool], |
| instructions=""" |
| Ты эксперт по созданию детальных промптов для GPT-моделей. Твоя задача - создать готовый промпт для анализа проблем на основе структурированных данных о логах, аномалиях и рекомендациях. |
| |
| Промпт должен быть структурированным и содержать: |
| 1. Контекст проблемы - общее описание ситуации |
| 2. Информация о системе - статистика, временные диапазоны, типы событий |
| 3. Обнаруженные проблемы - детальное описание аномалий с приоритетами |
| 4. Статистика и метрики - количественные показатели |
| 5. Примеры ошибок - ключевые ошибки из логов |
| 6. Предварительный анализ - рекомендации от предыдущих агентов (если есть) |
| 7. Запрос на решение - конкретные вопросы для GPT |
| |
| Промпт должен быть готов к использованию - его можно скопировать и вставить в ChatGPT, Claude или другую GPT-модель. |
| |
| Ответ верни как обычный текст (не JSON), используя final_answer. Это должен быть готовый промпт на русском языке в формате Markdown. |
| """, |
| name="GPTPromptAgent", |
| max_steps=10, |
| ) |
|
|
| |
| def run_gpt_prompt_agent(structured_data: dict, anomaly_report: dict, recommendations: str = None): |
| """Генерирует промпт для GPT на основе всех данных анализа.""" |
| |
| input_data = { |
| "structured_data": structured_data, |
| "anomaly_report": anomaly_report, |
| "recommendations": recommendations |
| } |
| input_json = json.dumps(input_data, ensure_ascii=False, indent=2) |
| |
| result = run_agent_safely(gpt_prompt_agent, task=input_json, return_string=True) |
| |
| |
| if isinstance(result, str): |
| return result |
| elif isinstance(result, dict): |
| |
| if "answer" in result: |
| return result["answer"] |
| elif "prompt" in result: |
| return result["prompt"] |
| else: |
| return json.dumps(result, ensure_ascii=False, indent=2) |
| else: |
| return str(result) |
|
|
| __all__ = [ |
| 'run_parser_agent', |
| 'run_anomaly_agent', |
| 'run_rca_agent', |
| 'run_gpt_prompt_agent', |
| ] |
|
|