| from typing import List |
| from fastapi import APIRouter, Depends, HTTPException, status |
| from sqlmodel import Session |
| from src.services.task_service import TaskService |
| from src.models.task import Task, TaskCreate, TaskUpdate, TaskResponse |
| from src.core.database import get_session |
| from src.core.logging import log_operation |
| from src.auth.deps import get_current_user_id |
| from src.auth.security import authorize_user_for_task |
|
|
# Router on which the task endpoints declared in this module are registered.
router = APIRouter()
|
|
|
|
@router.post("/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
def create_task(
    task_create: TaskCreate,
    current_user_id: str = Depends(get_current_user_id),
    session: Session = Depends(get_session),
) -> TaskResponse:
    """Create a new task owned by the authenticated user.

    Args:
        task_create: Payload describing the task to create. Its ``user_id``
            is overwritten with the authenticated caller's id before
            validation.
        current_user_id: Id of the caller, resolved by ``get_current_user_id``.
        session: Database session supplied by ``get_session``.

    Returns:
        The created task converted to a ``TaskResponse``.

    Raises:
        HTTPException: Any ``HTTPException`` raised inside the handler is
            re-raised unchanged; every other exception is logged and
            converted into a 500 response chained to the original error.
    """
    try:
        # Function-scope import kept from the original.
        # NOTE(review): presumably avoids a circular import - confirm.
        from src.utils.validators import validate_task_create

        # Ownership always comes from the authenticated identity, never from
        # whatever user_id the request body carried.
        task_create.user_id = current_user_id
        validate_task_create(task_create)

        db_task = TaskService.create_task(session, task_create)

        log_operation("TASK_CREATED_SUCCESSFULLY", user_id=current_user_id, task_id=db_task.id)
        # from_attributes=True matches how the list endpoint converts task
        # objects into TaskResponse.
        return TaskResponse.model_validate(db_task, from_attributes=True)
    except HTTPException:
        # Deliberate HTTP errors must reach the client untouched.
        raise
    except Exception as e:
        log_operation("CREATE_TASK_ERROR", user_id=current_user_id)
        # NOTE(review): the detail echoes the exception text to the client;
        # consider a generic message once the error is logged elsewhere.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"An error occurred while creating the task: {str(e)}"
        ) from e
|
|
|
|
@router.get("/user/{user_id}", response_model=List[TaskResponse])
def get_tasks_for_user(
    user_id: str,
    current_user_id: str = Depends(get_current_user_id),
    session: Session = Depends(get_session),
) -> List[TaskResponse]:
    """Return all tasks that belong to the authenticated user.

    Args:
        user_id: User id taken from the URL path; must equal the caller's id.
        current_user_id: Id of the caller, resolved by ``get_current_user_id``.
        session: Database session supplied by ``get_session``.

    Returns:
        One ``TaskResponse`` per task returned by
        ``TaskService.get_tasks_by_user_id``.

    Raises:
        HTTPException: 400 when ``user_id`` is empty or whitespace only,
            403 when ``user_id`` differs from the caller's id, and 500
            (chained to the original error) for any unexpected failure.
    """
    try:
        # Validate the path value before the ownership check. In the original
        # order this branch could only run when the caller's own id was blank,
        # so blank ids were reported (and logged) as unauthorized access.
        if not user_id or not user_id.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="user_id is required"
            )

        # Callers may only list their own tasks.
        if user_id != current_user_id:
            log_operation(f"UNAUTHORIZED_ACCESS_ATTEMPT_tasks_for_user_{user_id}", user_id=current_user_id)
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You can only access your own tasks"
            )

        tasks = TaskService.get_tasks_by_user_id(session, user_id)

        log_operation(f"GET_TASKS_SUCCESS ({len(tasks)} tasks)", user_id=user_id)

        return [TaskResponse.model_validate(task, from_attributes=True) for task in tasks]
    except HTTPException:
        # Deliberate HTTP errors must reach the client untouched.
        raise
    except Exception as e:
        # Log the authenticated caller, as the other endpoints in this module
        # do, rather than the path value.
        log_operation("GET_TASKS_ERROR", user_id=current_user_id)
        # NOTE(review): the detail echoes the exception text to the client;
        # consider a generic message once the error is logged elsewhere.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"An error occurred while retrieving tasks: {str(e)}"
        ) from e
|
|
|
|
@router.put("/{task_id}", response_model=TaskResponse)
def update_task(
    task_id: int,
    task_update: TaskUpdate,
    current_user_id: str = Depends(get_current_user_id),
    session: Session = Depends(get_session),
) -> TaskResponse:
    """Update a task, provided it belongs to the authenticated user.

    Args:
        task_id: Id of the task to update, taken from the URL path.
        task_update: Payload with the changes; checked by
            ``validate_task_update`` before any lookup.
        current_user_id: Id of the caller, resolved by ``get_current_user_id``.
        session: Database session supplied by ``get_session``.

    Returns:
        The updated task converted to a ``TaskResponse``.

    Raises:
        HTTPException: 404 when the task is not found (on lookup or when the
            update returns nothing), 403 when the task's ``user_id`` differs
            from the caller's id, and 500 (chained to the original error) for
            any unexpected failure.
    """
    try:
        # Function-scope import kept from the original.
        # NOTE(review): presumably avoids a circular import - confirm.
        from src.utils.validators import validate_task_update

        validate_task_update(task_update)

        existing_task = TaskService.get_task_by_id(session, task_id)
        if not existing_task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with id {task_id} not found"
            )

        # Only the owner may modify a task.
        if existing_task.user_id != current_user_id:
            log_operation("UNAUTHORIZED_ACCESS_ATTEMPT", user_id=current_user_id, task_id=task_id)
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You can only update your own tasks"
            )

        updated_task = TaskService.update_task(session, task_id, task_update)
        # The service can still return nothing after the lookup above
        # succeeded; report that as not found as well.
        if not updated_task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with id {task_id} not found"
            )

        log_operation("TASK_UPDATED_SUCCESSFULLY", user_id=current_user_id, task_id=task_id)
        # from_attributes=True matches how the list endpoint converts task
        # objects into TaskResponse.
        return TaskResponse.model_validate(updated_task, from_attributes=True)
    except HTTPException:
        # Deliberate HTTP errors must reach the client untouched.
        raise
    except Exception as e:
        log_operation("UPDATE_TASK_ERROR", user_id=current_user_id)
        # NOTE(review): the detail echoes the exception text to the client;
        # consider a generic message once the error is logged elsewhere.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"An error occurred while updating the task: {str(e)}"
        ) from e
|
|
|
|
@router.patch("/{task_id}/toggle", response_model=TaskResponse)
def toggle_task_completion(
    task_id: int,
    current_user_id: str = Depends(get_current_user_id),
    session: Session = Depends(get_session),
) -> TaskResponse:
    """Toggle a task's completion status, provided the caller owns the task.

    Args:
        task_id: Id of the task to toggle, taken from the URL path.
        current_user_id: Id of the caller, resolved by ``get_current_user_id``.
        session: Database session supplied by ``get_session``.

    Returns:
        The toggled task converted to a ``TaskResponse``.

    Raises:
        HTTPException: 404 when the task is not found (on lookup or when the
            toggle returns nothing), 403 when the task's ``user_id`` differs
            from the caller's id, and 500 (chained to the original error) for
            any unexpected failure.
    """
    try:
        existing_task = TaskService.get_task_by_id(session, task_id)
        if not existing_task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with id {task_id} not found"
            )

        # Only the owner may change a task's completion status.
        if existing_task.user_id != current_user_id:
            log_operation("UNAUTHORIZED_ACCESS_ATTEMPT", user_id=current_user_id, task_id=task_id)
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You can only toggle completion status of your own tasks"
            )

        toggled_task = TaskService.toggle_task_completion(session, task_id)
        # The service can still return nothing after the lookup above
        # succeeded; report that as not found as well.
        if not toggled_task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with id {task_id} not found"
            )

        log_operation("TASK_COMPLETION_TOGGLED_SUCCESSFULLY", user_id=current_user_id, task_id=task_id)
        # from_attributes=True matches how the list endpoint converts task
        # objects into TaskResponse.
        return TaskResponse.model_validate(toggled_task, from_attributes=True)
    except HTTPException:
        # Deliberate HTTP errors must reach the client untouched.
        raise
    except Exception as e:
        log_operation("TOGGLE_TASK_ERROR", user_id=current_user_id)
        # NOTE(review): the detail echoes the exception text to the client;
        # consider a generic message once the error is logged elsewhere.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"An error occurred while toggling the task: {str(e)}"
        ) from e
|
|
|
|
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_task(
    task_id: int,
    current_user_id: str = Depends(get_current_user_id),
    session: Session = Depends(get_session),
):
    """Delete a task, provided it belongs to the authenticated user.

    Returns nothing on success; the route is declared with status 204.

    Args:
        task_id: Id of the task to delete, taken from the URL path.
        current_user_id: Id of the caller, resolved by ``get_current_user_id``.
        session: Database session supplied by ``get_session``.

    Raises:
        HTTPException: 404 when the task is not found (on lookup or when the
            delete reports failure), 403 when the task's ``user_id`` differs
            from the caller's id, and 500 (chained to the original error) for
            any unexpected failure.
    """
    try:
        existing_task = TaskService.get_task_by_id(session, task_id)
        if not existing_task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with id {task_id} not found"
            )

        # Only the owner may delete a task.
        if existing_task.user_id != current_user_id:
            log_operation("UNAUTHORIZED_ACCESS_ATTEMPT", user_id=current_user_id, task_id=task_id)
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You can only delete your own tasks"
            )

        success = TaskService.delete_task(session, task_id)
        # The service can still report failure after the lookup above
        # succeeded; report that as not found as well.
        if not success:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with id {task_id} not found"
            )

        log_operation("TASK_DELETED_SUCCESSFULLY", user_id=current_user_id, task_id=task_id)
    except HTTPException:
        # Deliberate HTTP errors must reach the client untouched.
        raise
    except Exception as e:
        log_operation("DELETE_TASK_ERROR", user_id=current_user_id)
        # NOTE(review): the detail echoes the exception text to the client;
        # consider a generic message once the error is logged elsewhere.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"An error occurred while deleting the task: {str(e)}"
        ) from e