| from typing import List, Optional |
| from sqlmodel import Session, select |
| from src.models.task import Task, TaskCreate, TaskUpdate |
| from src.core.logging import log_operation, log_error, log_authorization_decision |
|
|
|
|
class TaskService:
    """Task persistence operations on a SQLModel session, with operation logging.

    Every method is static and receives the session explicitly. Methods that
    write (create/update/delete/toggle) commit on success and roll the session
    back before re-raising on any error; read-only methods log the error and
    re-raise without touching the transaction.
    """

    @staticmethod
    def _unwrap_task(row):
        """Return the ``Task`` held in position 0 of a row-like value, else ``row``.

        ``Task.get_by_user_id`` may yield either ``Task`` instances or indexable
        containers wrapping one (presumably query result rows -- not confirmed
        from this file). Anything that is not an indexable, non-string iterable
        whose first element is a ``Task`` is returned unchanged.
        """
        row_like = (
            hasattr(row, "__iter__")
            and not isinstance(row, str)
            and hasattr(row, "__getitem__")
        )
        if not row_like:
            return row
        try:
            if len(row) > 0 and isinstance(row[0], Task):
                return row[0]
        except Exception:
            # Best effort: a value that cannot be measured or indexed is passed
            # through as-is. ``Exception`` (rather than a bare ``except``) lets
            # KeyboardInterrupt/SystemExit propagate.
            return row
        return row

    @staticmethod
    def _enforce_ownership(db_task, task_id, current_user_id, action):
        """Log and enforce that ``current_user_id`` owns ``db_task``.

        Does nothing when ``current_user_id`` is ``None`` (no caller identity
        supplied). Any other value -- including an empty string -- is compared
        against ``db_task.user_id`` so a blank id cannot skip the check.

        Raises:
            PermissionError: If the supplied user id differs from the task's
                ``user_id``. The denial is logged before raising.
        """
        if current_user_id is None:
            return
        resource = f"task-{task_id}"
        if db_task.user_id != current_user_id:
            log_authorization_decision(action, current_user_id, resource, False)
            raise PermissionError(f"User {current_user_id} does not own task {task_id}")
        log_authorization_decision(action, current_user_id, resource, True)

    @staticmethod
    def create_task(session: Session, task_create: TaskCreate) -> Task:
        """Create a new task in the database.

        Args:
            session: Database session; committed on success.
            task_create: Payload whose fields are passed to the ``Task``
                constructor.

        Returns:
            The stored task, refreshed from the database after the commit.

        Raises:
            Exception: Any failure is logged, the session is rolled back and
                the original exception is re-raised.
        """
        try:
            log_operation("CREATE_TASK", user_id=str(task_create.user_id))

            db_task = Task(**task_create.dict())
            session.add(db_task)
            session.commit()
            session.refresh(db_task)

            log_operation(
                "TASK_CREATED",
                user_id=str(task_create.user_id),
                task_id=db_task.id,
            )
            return db_task
        except Exception as e:
            log_error(e, "CREATE_TASK")
            session.rollback()
            raise

    @staticmethod
    def get_task_by_id(session: Session, task_id: int) -> Optional[Task]:
        """Retrieve a task by its ID, regardless of owner.

        Args:
            session: Database session used for the query.
            task_id: Primary key of the task.

        Returns:
            The matching task, or ``None`` when no row has that ID.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            log_operation("GET_TASK_BY_ID", task_id=task_id)

            task = session.exec(select(Task).where(Task.id == task_id)).first()

            if task is not None:
                log_operation("TASK_FOUND", task_id=task_id, user_id=task.user_id)
            else:
                log_operation("TASK_NOT_FOUND", task_id=task_id)

            return task
        except Exception as e:
            log_error(e, "GET_TASK_BY_ID")
            raise

    @staticmethod
    def get_tasks_by_user_id(session: Session, user_id: str) -> List[Task]:
        """Retrieve all tasks for a specific user.

        Delegates the query to ``Task.get_by_user_id`` and unwraps any
        row-like results so the returned list holds the ``Task`` objects
        themselves where possible.

        Args:
            session: Database session used for the query.
            user_id: Owner whose tasks are requested.

        Returns:
            A list with one entry per result, in the order they were yielded.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            log_operation("GET_TASKS_BY_USER", user_id=user_id)

            rows = Task.get_by_user_id(session, user_id)
            processed_tasks = [TaskService._unwrap_task(row) for row in rows]

            log_operation(f"FOUND_{len(processed_tasks)}_TASKS_FOR_USER", user_id=user_id)
            return processed_tasks
        except Exception as e:
            log_error(e, "GET_TASKS_BY_USER")
            raise

    @staticmethod
    def get_task_by_id_and_user_id(session: Session, task_id: int, user_id: str) -> Optional[Task]:
        """Retrieve a task by ID for a specific user (enforcing data isolation).

        Delegates the lookup to ``Task.get_by_id_and_user_id``.

        Args:
            session: Database session used for the query.
            task_id: Primary key of the task.
            user_id: User the task must belong to.

        Returns:
            Whatever the model lookup returns; a falsy result is logged as
            "not found for user".

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            log_operation("GET_TASK_BY_ID_AND_USER", user_id=user_id, task_id=task_id)

            task = Task.get_by_id_and_user_id(session, task_id, user_id)

            outcome = "TASK_FOUND_FOR_USER" if task else "TASK_NOT_FOUND_FOR_USER"
            log_operation(outcome, user_id=user_id, task_id=task_id)

            return task
        except Exception as e:
            log_error(e, "GET_TASK_BY_ID_AND_USER")
            raise

    @staticmethod
    def update_task(
        session: Session,
        task_id: int,
        task_update: TaskUpdate,
        current_user_id: Optional[str] = None,
    ) -> Optional[Task]:
        """Update an existing task, validating ownership when a user id is given.

        Only the fields explicitly set on ``task_update`` are written.

        Args:
            session: Database session; committed on success.
            task_id: Primary key of the task to update.
            task_update: Partial payload; unset fields are left untouched.
            current_user_id: When not ``None``, must equal the task's
                ``user_id`` or the update is refused.

        Returns:
            The refreshed task, or ``None`` when no task has that ID.

        Raises:
            PermissionError: If ``current_user_id`` is given and does not own
                the task.
            Exception: Any failure (including the permission error) is logged,
                the session is rolled back and the exception is re-raised.
        """
        try:
            db_task = session.exec(select(Task).where(Task.id == task_id)).first()

            if db_task is None:
                log_operation("TASK_UPDATE_FAILED_NOT_FOUND", task_id=task_id)
                return None

            TaskService._enforce_ownership(db_task, task_id, current_user_id, "update")

            for field, value in task_update.dict(exclude_unset=True).items():
                setattr(db_task, field, value)

            session.add(db_task)
            session.commit()
            session.refresh(db_task)

            log_operation("TASK_UPDATED", user_id=db_task.user_id, task_id=task_id)
            return db_task
        except Exception as e:
            log_error(e, "UPDATE_TASK")
            session.rollback()
            raise

    @staticmethod
    def delete_task(session: Session, task_id: int, current_user_id: Optional[str] = None) -> bool:
        """Delete a task by its ID, validating ownership when a user id is given.

        Args:
            session: Database session; committed on success.
            task_id: Primary key of the task to delete.
            current_user_id: When not ``None``, must equal the task's
                ``user_id`` or the deletion is refused.

        Returns:
            ``True`` when the task was deleted, ``False`` when no task has
            that ID.

        Raises:
            PermissionError: If ``current_user_id`` is given and does not own
                the task.
            Exception: Any failure (including the permission error) is logged,
                the session is rolled back and the exception is re-raised.
        """
        try:
            db_task = session.exec(select(Task).where(Task.id == task_id)).first()

            if db_task is None:
                log_operation("TASK_DELETE_FAILED_NOT_FOUND", task_id=task_id)
                return False

            TaskService._enforce_ownership(db_task, task_id, current_user_id, "delete")

            # Capture the owner before deleting so the success log does not
            # read attributes from an instance that is no longer persistent.
            owner_id = db_task.user_id

            session.delete(db_task)
            session.commit()

            log_operation("TASK_DELETED", user_id=owner_id, task_id=task_id)
            return True
        except Exception as e:
            log_error(e, "DELETE_TASK")
            session.rollback()
            raise

    @staticmethod
    def toggle_task_completion(
        session: Session,
        task_id: int,
        current_user_id: Optional[str] = None,
    ) -> Optional[Task]:
        """Flip a task's ``completed`` flag, validating ownership when a user id is given.

        Args:
            session: Database session; committed on success.
            task_id: Primary key of the task to toggle.
            current_user_id: When not ``None``, must equal the task's
                ``user_id`` or the change is refused.

        Returns:
            The refreshed task, or ``None`` when no task has that ID.

        Raises:
            PermissionError: If ``current_user_id`` is given and does not own
                the task.
            Exception: Any failure (including the permission error) is logged,
                the session is rolled back and the exception is re-raised.
        """
        try:
            db_task = session.exec(select(Task).where(Task.id == task_id)).first()

            if db_task is None:
                log_operation("TASK_TOGGLE_FAILED_NOT_FOUND", task_id=task_id)
                return None

            TaskService._enforce_ownership(db_task, task_id, current_user_id, "toggle")

            db_task.completed = not db_task.completed

            session.add(db_task)
            session.commit()
            session.refresh(db_task)

            log_operation("TASK_COMPLETION_TOGGLED", user_id=db_task.user_id, task_id=task_id)
            return db_task
        except Exception as e:
            log_error(e, "TOGGLE_TASK_COMPLETION")
            session.rollback()
            raise

    @staticmethod
    def verify_task_ownership(session: Session, task_id: int, user_id: str) -> bool:
        """Verify that a specific user owns a specific task.

        Args:
            session: Database session used for the query.
            task_id: Primary key of the task.
            user_id: User id to compare with the task's ``user_id``.

        Returns:
            ``True`` only when the task exists and its ``user_id`` equals
            ``user_id``; ``False`` when the task is missing or owned by
            someone else.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            task = session.exec(select(Task).where(Task.id == task_id)).first()

            if task is None:
                return False

            return task.user_id == user_id
        except Exception as e:
            log_error(e, "VERIFY_TASK_OWNERSHIP")
            raise