File size: 8,567 Bytes
6bed18e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 | from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session
from src.services.task_service import TaskService
from src.models.task import Task, TaskCreate, TaskUpdate, TaskResponse
from src.core.database import get_session
from src.core.logging import log_operation
from src.auth.deps import get_current_user_id
from src.auth.security import authorize_user_for_task
router = APIRouter()
@router.post("/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
def create_task(
task_create: TaskCreate,
current_user_id: str = Depends(get_current_user_id),
session: Session = Depends(get_session)
) -> TaskResponse:
"""
Create a new task for the authenticated user
"""
try:
from src.utils.validators import validate_task_create
# Override user_id with authenticated user's ID to ensure security
task_create.user_id = current_user_id
# Now validate the task with the proper user_id
validate_task_create(task_create)
# Create the task
db_task = TaskService.create_task(session, task_create)
log_operation("TASK_CREATED_SUCCESSFULLY", user_id=current_user_id, task_id=db_task.id)
return TaskResponse.model_validate(db_task)
except HTTPException:
raise
except Exception as e:
log_operation("CREATE_TASK_ERROR", user_id=current_user_id)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"An error occurred while creating the task: {str(e)}"
)
@router.get("/user/{user_id}", response_model=List[TaskResponse])
def get_tasks_for_user(
user_id: str,
current_user_id: str = Depends(get_current_user_id),
session: Session = Depends(get_session)
) -> List[TaskResponse]:
"""
Get all tasks for the authenticated user
"""
try:
# Verify that the requested user_id matches the authenticated user's ID
if user_id != current_user_id:
log_operation(f"UNAUTHORIZED_ACCESS_ATTEMPT_tasks_for_user_{user_id}", user_id=current_user_id)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You can only access your own tasks"
)
# Validate that user_id is provided
if not user_id or len(user_id.strip()) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="user_id is required"
)
# Get tasks for the user
tasks = TaskService.get_tasks_by_user_id(session, user_id)
log_operation(f"GET_TASKS_SUCCESS ({len(tasks)} tasks)", user_id=user_id)
# Return as response models using SQLModel's serialization
return [TaskResponse.model_validate(task, from_attributes=True) for task in tasks]
except HTTPException:
raise
except Exception as e:
log_operation("GET_TASKS_ERROR", user_id=user_id)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"An error occurred while retrieving tasks: {str(e)}"
)
@router.put("/{task_id}", response_model=TaskResponse)
def update_task(
task_id: int,
task_update: TaskUpdate,
current_user_id: str = Depends(get_current_user_id),
session: Session = Depends(get_session)
) -> TaskResponse:
"""
Update a task by ID (only if the task belongs to the authenticated user)
"""
try:
from src.utils.validators import validate_task_update
validate_task_update(task_update)
# Get the task from the database
existing_task = TaskService.get_task_by_id(session, task_id)
if not existing_task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Task with id {task_id} not found"
)
# Verify that the authenticated user owns this task
if existing_task.user_id != current_user_id:
log_operation("UNAUTHORIZED_ACCESS_ATTEMPT", user_id=current_user_id, task_id=task_id)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You can only update your own tasks"
)
# Update the task
updated_task = TaskService.update_task(session, task_id, task_update)
if not updated_task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Task with id {task_id} not found"
)
log_operation("TASK_UPDATED_SUCCESSFULLY", user_id=current_user_id, task_id=task_id)
return TaskResponse.model_validate(updated_task)
except HTTPException:
raise
except Exception as e:
log_operation("UPDATE_TASK_ERROR", user_id=current_user_id)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"An error occurred while updating the task: {str(e)}"
)
@router.patch("/{task_id}/toggle", response_model=TaskResponse)
def toggle_task_completion(
task_id: int,
current_user_id: str = Depends(get_current_user_id),
session: Session = Depends(get_session)
) -> TaskResponse:
"""
Toggle the completion status of a task (only if the task belongs to the authenticated user)
"""
try:
# Get the task from the database
existing_task = TaskService.get_task_by_id(session, task_id)
if not existing_task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Task with id {task_id} not found"
)
# Verify that the authenticated user owns this task
if existing_task.user_id != current_user_id:
log_operation("UNAUTHORIZED_ACCESS_ATTEMPT", user_id=current_user_id, task_id=task_id)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You can only toggle completion status of your own tasks"
)
# Toggle the task completion status
toggled_task = TaskService.toggle_task_completion(session, task_id)
if not toggled_task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Task with id {task_id} not found"
)
log_operation("TASK_COMPLETION_TOGGLED_SUCCESSFULLY", user_id=current_user_id, task_id=task_id)
return TaskResponse.model_validate(toggled_task)
except HTTPException:
raise
except Exception as e:
log_operation("TOGGLE_TASK_ERROR", user_id=current_user_id)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"An error occurred while toggling the task: {str(e)}"
)
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_task(
task_id: int,
current_user_id: str = Depends(get_current_user_id),
session: Session = Depends(get_session)
):
"""
Delete a task by ID (only if the task belongs to the authenticated user)
"""
try:
# Get the task from the database
existing_task = TaskService.get_task_by_id(session, task_id)
if not existing_task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Task with id {task_id} not found"
)
# Verify that the authenticated user owns this task
if existing_task.user_id != current_user_id:
log_operation("UNAUTHORIZED_ACCESS_ATTEMPT", user_id=current_user_id, task_id=task_id)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You can only delete your own tasks"
)
# Delete the task
success = TaskService.delete_task(session, task_id)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Task with id {task_id} not found"
)
log_operation("TASK_DELETED_SUCCESSFULLY", user_id=current_user_id, task_id=task_id)
except HTTPException:
raise
except Exception as e:
log_operation("DELETE_TASK_ERROR", user_id=current_user_id)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"An error occurred while deleting the task: {str(e)}"
) |