| from collections.abc import Generator, Mapping |
| from typing import Any, Union |
|
|
| from openai._exceptions import RateLimitError |
|
|
| from configs import dify_config |
| from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator |
| from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator |
| from core.app.apps.chat.app_generator import ChatAppGenerator |
| from core.app.apps.completion.app_generator import CompletionAppGenerator |
| from core.app.apps.workflow.app_generator import WorkflowAppGenerator |
| from core.app.entities.app_invoke_entities import InvokeFrom |
| from core.app.features.rate_limiting import RateLimit |
| from models.model import Account, App, AppMode, EndUser |
| from models.workflow import Workflow |
| from services.errors.llm import InvokeRateLimitError |
| from services.workflow_service import WorkflowService |
|
|
|
|
| class AppGenerateService: |
| @classmethod |
| def generate( |
| cls, |
| app_model: App, |
| user: Union[Account, EndUser], |
| args: Mapping[str, Any], |
| invoke_from: InvokeFrom, |
| streaming: bool = True, |
| ): |
| """ |
| App Content Generate |
| :param app_model: app model |
| :param user: user |
| :param args: args |
| :param invoke_from: invoke from |
| :param streaming: streaming |
| :return: |
| """ |
| max_active_request = AppGenerateService._get_max_active_requests(app_model) |
| rate_limit = RateLimit(app_model.id, max_active_request) |
| request_id = RateLimit.gen_request_key() |
| try: |
| request_id = rate_limit.enter(request_id) |
| if app_model.mode == AppMode.COMPLETION.value: |
| return rate_limit.generate( |
| CompletionAppGenerator().generate( |
| app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming |
| ), |
| request_id, |
| ) |
| elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent: |
| return rate_limit.generate( |
| AgentChatAppGenerator().generate( |
| app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming |
| ), |
| request_id, |
| ) |
| elif app_model.mode == AppMode.CHAT.value: |
| return rate_limit.generate( |
| ChatAppGenerator().generate( |
| app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming |
| ), |
| request_id, |
| ) |
| elif app_model.mode == AppMode.ADVANCED_CHAT.value: |
| workflow = cls._get_workflow(app_model, invoke_from) |
| return rate_limit.generate( |
| AdvancedChatAppGenerator().generate( |
| app_model=app_model, |
| workflow=workflow, |
| user=user, |
| args=args, |
| invoke_from=invoke_from, |
| stream=streaming, |
| ), |
| request_id, |
| ) |
| elif app_model.mode == AppMode.WORKFLOW.value: |
| workflow = cls._get_workflow(app_model, invoke_from) |
| return rate_limit.generate( |
| WorkflowAppGenerator().generate( |
| app_model=app_model, |
| workflow=workflow, |
| user=user, |
| args=args, |
| invoke_from=invoke_from, |
| stream=streaming, |
| ), |
| request_id, |
| ) |
| else: |
| raise ValueError(f"Invalid app mode {app_model.mode}") |
| except RateLimitError as e: |
| raise InvokeRateLimitError(str(e)) |
| finally: |
| if not streaming: |
| rate_limit.exit(request_id) |
|
|
| @staticmethod |
| def _get_max_active_requests(app_model: App) -> int: |
| max_active_requests = app_model.max_active_requests |
| if app_model.max_active_requests is None: |
| max_active_requests = int(dify_config.APP_MAX_ACTIVE_REQUESTS) |
| return max_active_requests |
|
|
| @classmethod |
| def generate_single_iteration(cls, app_model: App, user: Account, node_id: str, args: Any, streaming: bool = True): |
| if app_model.mode == AppMode.ADVANCED_CHAT.value: |
| workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) |
| return AdvancedChatAppGenerator().single_iteration_generate( |
| app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming |
| ) |
| elif app_model.mode == AppMode.WORKFLOW.value: |
| workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) |
| return WorkflowAppGenerator().single_iteration_generate( |
| app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming |
| ) |
| else: |
| raise ValueError(f"Invalid app mode {app_model.mode}") |
|
|
| @classmethod |
| def generate_more_like_this( |
| cls, |
| app_model: App, |
| user: Union[Account, EndUser], |
| message_id: str, |
| invoke_from: InvokeFrom, |
| streaming: bool = True, |
| ) -> Union[dict, Generator]: |
| """ |
| Generate more like this |
| :param app_model: app model |
| :param user: user |
| :param message_id: message id |
| :param invoke_from: invoke from |
| :param streaming: streaming |
| :return: |
| """ |
| return CompletionAppGenerator().generate_more_like_this( |
| app_model=app_model, message_id=message_id, user=user, invoke_from=invoke_from, stream=streaming |
| ) |
|
|
| @classmethod |
| def _get_workflow(cls, app_model: App, invoke_from: InvokeFrom) -> Workflow: |
| """ |
| Get workflow |
| :param app_model: app model |
| :param invoke_from: invoke from |
| :return: |
| """ |
| workflow_service = WorkflowService() |
| if invoke_from == InvokeFrom.DEBUGGER: |
| |
| workflow = workflow_service.get_draft_workflow(app_model=app_model) |
|
|
| if not workflow: |
| raise ValueError("Workflow not initialized") |
| else: |
| |
| workflow = workflow_service.get_published_workflow(app_model=app_model) |
|
|
| if not workflow: |
| raise ValueError("Workflow not published") |
|
|
| return workflow |
|
|