| from typing import Optional, Union |
|
|
| from core.app.entities.app_invoke_entities import InvokeFrom |
| from extensions.ext_database import db |
| from libs.infinite_scroll_pagination import InfiniteScrollPagination |
| from models.account import Account |
| from models.model import App, EndUser |
| from models.web import PinnedConversation |
| from services.conversation_service import ConversationService |
|
|
|
|
class WebConversationService:
    """Conversation listing and pin management on behalf of a single user.

    Pins are stored as ``PinnedConversation`` rows keyed by app, conversation,
    and the creating user (role string plus user id).
    """

    @staticmethod
    def _creator_role(user: Union[Account, EndUser]) -> str:
        """Return the ``created_by_role`` value used for ``user``'s pin rows."""
        return "account" if isinstance(user, Account) else "end_user"

    @staticmethod
    def _user_pins_query(app_model: App, user: Union[Account, EndUser]):
        """Return a query over the pins ``user`` created in ``app_model``."""
        return db.session.query(PinnedConversation).filter(
            PinnedConversation.app_id == app_model.id,
            PinnedConversation.created_by_role == WebConversationService._creator_role(user),
            PinnedConversation.created_by == user.id,
        )

    @classmethod
    def pagination_by_last_id(
        cls,
        app_model: App,
        user: Optional[Union[Account, EndUser]],
        last_id: Optional[str],
        limit: int,
        invoke_from: InvokeFrom,
        pinned: Optional[bool] = None,
        sort_by="-updated_at",
    ) -> InfiniteScrollPagination:
        """Page through conversations, optionally filtered by pin state.

        Args:
            app_model: App whose conversations are listed.
            user: User the listing is for; may be ``None``.
            last_id: Forwarded unchanged to ``ConversationService``.
            limit: Forwarded unchanged to ``ConversationService``.
            invoke_from: Forwarded unchanged to ``ConversationService``.
            pinned: ``True`` restricts the listing to the user's pinned
                conversations, ``False`` excludes them, ``None`` applies no
                pin filter.
            sort_by: Forwarded unchanged to ``ConversationService``.

        Returns:
            Whatever ``ConversationService.pagination_by_last_id`` returns for
            the computed ``include_ids`` / ``exclude_ids``.
        """
        include_ids = None
        exclude_ids = None
        if pinned is not None:
            if user is None:
                # Without a user there is nobody who could own a pin. Use an
                # empty id list instead of dereferencing ``user.id``.
                pinned_conversation_ids = []
            else:
                pinned_conversations = (
                    cls._user_pins_query(app_model, user)
                    .order_by(PinnedConversation.created_at.desc())
                    .all()
                )
                pinned_conversation_ids = [pc.conversation_id for pc in pinned_conversations]

            if pinned:
                include_ids = pinned_conversation_ids
            else:
                exclude_ids = pinned_conversation_ids

        return ConversationService.pagination_by_last_id(
            app_model=app_model,
            user=user,
            last_id=last_id,
            limit=limit,
            invoke_from=invoke_from,
            include_ids=include_ids,
            exclude_ids=exclude_ids,
            sort_by=sort_by,
        )

    @classmethod
    def pin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]) -> None:
        """Pin ``conversation_id`` for ``user``.

        Does nothing when ``user`` is ``None`` or when the pin already exists.
        Any exception raised by ``ConversationService.get_conversation``
        propagates to the caller, and no pin row is added in that case.
        """
        if user is None:
            # A pin row needs a creator id, so there is nothing to record.
            return

        existing_pin = (
            cls._user_pins_query(app_model, user)
            .filter(PinnedConversation.conversation_id == conversation_id)
            .first()
        )
        if existing_pin:
            return

        # Look the conversation up through the service before recording the
        # pin; the row stores the id of the conversation it returns.
        conversation = ConversationService.get_conversation(
            app_model=app_model, conversation_id=conversation_id, user=user
        )

        db.session.add(
            PinnedConversation(
                app_id=app_model.id,
                conversation_id=conversation.id,
                created_by_role=cls._creator_role(user),
                created_by=user.id,
            )
        )
        db.session.commit()

    @classmethod
    def unpin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]) -> None:
        """Remove ``user``'s pin on ``conversation_id``.

        Does nothing when ``user`` is ``None`` or when no matching pin exists.
        """
        if user is None:
            # No user means no pin row can belong to the caller.
            return

        existing_pin = (
            cls._user_pins_query(app_model, user)
            .filter(PinnedConversation.conversation_id == conversation_id)
            .first()
        )
        if not existing_pin:
            return

        db.session.delete(existing_pin)
        db.session.commit()
|
|