| import logging |
| import threading |
| import time |
| from typing import Any, Optional |
|
|
| from flask import Flask, current_app |
| from pydantic import BaseModel, ConfigDict |
|
|
| from configs import dify_config |
| from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom |
| from core.app.entities.queue_entities import QueueMessageReplaceEvent |
| from core.moderation.base import ModerationAction, ModerationOutputsResult |
| from core.moderation.factory import ModerationFactory |
|
|
# Logger named after this module; the moderation helper below reports
# swallowed moderation failures through it.
logger = logging.getLogger(name=__name__)
|
|
|
|
class ModerationRule(BaseModel):
    """Moderation rule selected for an app.

    ``type`` is passed as the ``name`` of the ``ModerationFactory`` and
    ``config`` is forwarded to it unchanged.
    """

    # Identifier of the moderation implementation to instantiate.
    type: str
    # Implementation-specific settings; the schema depends on ``type``.
    config: dict[str, Any]
|
|
|
|
class OutputModeration(BaseModel):
    """Moderate a streamed LLM completion as it is produced.

    Tokens are accumulated in ``buffer`` via :meth:`append_new_token`. The first
    token lazily starts a background worker thread. The worker periodically runs
    the configured moderation rule over the accumulated text and, when the rule
    flags it, publishes a ``QueueMessageReplaceEvent`` through ``queue_manager``.
    :meth:`moderation_completion` performs a final, synchronous moderation of the
    complete text.
    """

    tenant_id: str
    app_id: str

    rule: ModerationRule
    queue_manager: AppQueueManager

    # Background worker thread, created on the first appended token.
    thread: Optional[threading.Thread] = None
    # Cleared by stop_thread(); the worker loop re-checks it on every iteration.
    thread_running: bool = True
    # Text streamed so far (replaced wholesale by moderation_completion()).
    buffer: str = ""
    # Set by moderation_completion() once the complete text is known.
    is_final_chunk: bool = False
    # Preset response recorded by the worker after a DIRECT_OUTPUT verdict.
    final_output: Optional[str] = None
    model_config = ConfigDict(arbitrary_types_allowed=True)

    def should_direct_output(self) -> bool:
        """Return True once the worker has recorded a preset response."""
        return self.final_output is not None

    def get_final_output(self) -> str:
        """Return the preset response recorded by the worker, or "" if none."""
        return self.final_output or ""

    def append_new_token(self, token: str) -> None:
        """Append a streamed token to the buffer, starting the worker on first use."""
        self.buffer += token

        if not self.thread:
            self.thread = self.start_thread()

    def moderation_completion(self, completion: str, public_event: bool = False) -> str:
        """Moderate the complete output synchronously.

        :param completion: the full completion text.
        :param public_event: when True and the text is flagged, also publish a
            ``QueueMessageReplaceEvent`` carrying the replacement text.
        :return: ``completion`` unchanged when moderation failed or did not flag
            it; otherwise the preset response (DIRECT_OUTPUT) or the moderated text.
        """
        # Order matters: the worker reads is_final_chunk before the buffer, so the
        # buffer must already hold the complete text when the flag becomes True.
        self.buffer = completion
        self.is_final_chunk = True

        result = self.moderation(tenant_id=self.tenant_id, app_id=self.app_id, moderation_buffer=completion)

        if not result or not result.flagged:
            return completion

        if result.action == ModerationAction.DIRECT_OUTPUT:
            final_output = result.preset_response
        else:
            final_output = result.text

        if public_event:
            self.queue_manager.publish(QueueMessageReplaceEvent(text=final_output), PublishFrom.TASK_PIPELINE)

        return final_output

    def start_thread(self) -> threading.Thread:
        """Start and return the background moderation worker thread.

        The current Flask app object is captured on the calling thread so the
        worker can push its own application context.
        """
        # A non-positive MODERATION_BUFFER_SIZE would make the worker's
        # "wait for more text" test (0 <= chunk_length < buffer_size) never true,
        # so it would re-moderate the same buffer in a tight loop. Clamping to 1
        # means "moderate whenever any new text has arrived".
        buffer_size = max(dify_config.MODERATION_BUFFER_SIZE, 1)
        thread = threading.Thread(
            target=self.worker,
            kwargs={
                "flask_app": current_app._get_current_object(),
                "buffer_size": buffer_size,
            },
        )

        thread.start()

        return thread

    def stop_thread(self):
        """Ask the worker to stop.

        The worker exits at its next loop check. This call does not wait for
        the thread to finish.
        """
        if self.thread and self.thread.is_alive():
            self.thread_running = False

    def worker(self, flask_app: Flask, buffer_size: int):
        """Background loop that moderates the buffer as it grows.

        It polls once per second until at least ``buffer_size`` new characters
        have accumulated, then moderates the whole buffer. A flagged result
        publishes a replace event. A DIRECT_OUTPUT verdict also records
        ``final_output`` and ends the loop. The loop also ends when
        ``thread_running`` is cleared, or after the final chunk has been
        moderated once.
        """
        with flask_app.app_context():
            current_length = 0
            while self.thread_running:
                # Read the flag before the buffer: moderation_completion() assigns
                # the buffer first, so seeing is_final_chunk=True means the
                # snapshot below is the complete text.
                is_final = self.is_final_chunk
                moderation_buffer = self.buffer
                buffer_length = len(moderation_buffer)
                if not is_final:
                    chunk_length = buffer_length - current_length
                    if 0 <= chunk_length < buffer_size:
                        time.sleep(1)
                        continue

                current_length = buffer_length

                result = self.moderation(
                    tenant_id=self.tenant_id, app_id=self.app_id, moderation_buffer=moderation_buffer
                )

                if result and result.flagged:
                    if result.action == ModerationAction.DIRECT_OUTPUT:
                        final_output = result.preset_response
                        self.final_output = final_output
                    else:
                        # Keep any text that streamed in while moderation was running.
                        final_output = result.text + self.buffer[len(moderation_buffer) :]

                    if self.thread_running:
                        self.queue_manager.publish(
                            QueueMessageReplaceEvent(text=final_output), PublishFrom.TASK_PIPELINE
                        )

                    if result.action == ModerationAction.DIRECT_OUTPUT:
                        break

                # The final chunk will not change any more. Moderating it again
                # would spin without sleeping, repeatedly call the moderation
                # backend, and could republish the same replace event.
                if is_final:
                    break

    def moderation(self, tenant_id: str, app_id: str, moderation_buffer: str) -> Optional[ModerationOutputsResult]:
        """Run the configured moderation rule over ``moderation_buffer``.

        This is best-effort: any exception is logged with its traceback and
        ``None`` is returned. Callers in this class treat ``None`` as
        "not flagged".
        """
        try:
            moderation_factory = ModerationFactory(
                name=self.rule.type, app_id=app_id, tenant_id=tenant_id, config=self.rule.config
            )

            result: ModerationOutputsResult = moderation_factory.moderation_for_outputs(moderation_buffer)
            return result
        except Exception as e:
            # logger.exception also records the traceback, which logger.error dropped.
            logger.exception("Moderation Output error: %s", e)

        return None
|
|