full-stack-todo-backend / src /services /task_service.py
m-ahmad-official's picture
Initial commit: Full-stack todo backend for Hugging Face Spaces
6bed18e
from typing import List, Optional
from sqlmodel import Session, select
from src.models.task import Task, TaskCreate, TaskUpdate
from src.core.logging import log_operation, log_error, log_authorization_decision
class TaskService:
@staticmethod
def create_task(session: Session, task_create: TaskCreate) -> Task:
"""
Create a new task in the database
"""
try:
log_operation("CREATE_TASK", user_id=str(task_create.user_id))
db_task = Task(**task_create.dict())
session.add(db_task)
session.commit()
session.refresh(db_task)
log_operation("TASK_CREATED", user_id=str(task_create.user_id), task_id=db_task.id)
return db_task
except Exception as e:
log_error(e, "CREATE_TASK")
session.rollback()
raise
@staticmethod
def get_task_by_id(session: Session, task_id: int) -> Optional[Task]:
"""
Retrieve a task by its ID
"""
try:
log_operation("GET_TASK_BY_ID", task_id=task_id)
statement = select(Task).where(Task.id == task_id)
task = session.exec(statement).first()
if task:
log_operation("TASK_FOUND", task_id=task_id, user_id=task.user_id)
else:
log_operation("TASK_NOT_FOUND", task_id=task_id)
return task
except Exception as e:
log_error(e, "GET_TASK_BY_ID")
raise
@staticmethod
def get_tasks_by_user_id(session: Session, user_id: str) -> List[Task]:
"""
Retrieve all tasks for a specific user
"""
try:
log_operation("GET_TASKS_BY_USER", user_id=user_id)
# Using the enhanced model method
tasks = Task.get_by_user_id(session, user_id)
# Ensure we're returning Task objects and not Row objects
# If the result contains Row objects, extract the Task from them
processed_tasks = []
for task in tasks:
if hasattr(task, '__iter__') and not isinstance(task, str) and hasattr(task, '__getitem__'):
# This looks like a Row/tuple object, extract the first element if it's a Task
try:
if len(task) > 0:
item = task[0]
if isinstance(item, Task):
processed_tasks.append(item)
else:
processed_tasks.append(task)
else:
processed_tasks.append(task)
except:
# If there's any issue with unpacking, just add the original
processed_tasks.append(task)
else:
processed_tasks.append(task)
log_operation(f"FOUND_{len(processed_tasks)}_TASKS_FOR_USER", user_id=user_id)
return processed_tasks
except Exception as e:
log_error(e, "GET_TASKS_BY_USER")
raise
@staticmethod
def get_task_by_id_and_user_id(session: Session, task_id: int, user_id: str) -> Optional[Task]:
"""
Retrieve a task by ID for a specific user (enforcing data isolation)
"""
try:
log_operation("GET_TASK_BY_ID_AND_USER", user_id=user_id, task_id=task_id)
# Using the enhanced model method for data isolation
task = Task.get_by_id_and_user_id(session, task_id, user_id)
if task:
log_operation("TASK_FOUND_FOR_USER", user_id=user_id, task_id=task_id)
else:
log_operation("TASK_NOT_FOUND_FOR_USER", user_id=user_id, task_id=task_id)
return task
except Exception as e:
log_error(e, "GET_TASK_BY_ID_AND_USER")
raise
@staticmethod
def update_task(session: Session, task_id: int, task_update: TaskUpdate, current_user_id: str = None) -> Optional[Task]:
"""
Update an existing task, with user ownership validation if current_user_id is provided
"""
try:
# Get the existing task
statement = select(Task).where(Task.id == task_id)
db_task = session.exec(statement).first()
if not db_task:
log_operation("TASK_UPDATE_FAILED_NOT_FOUND", task_id=task_id)
return None
# If current user is provided, validate ownership
if current_user_id and db_task.user_id != current_user_id:
log_authorization_decision("update", current_user_id, f"task-{task_id}", False)
raise PermissionError(f"User {current_user_id} does not own task {task_id}")
# Log successful authorization if user was validated
if current_user_id:
log_authorization_decision("update", current_user_id, f"task-{task_id}", True)
# Apply updates
update_data = task_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_task, field, value)
session.add(db_task)
session.commit()
session.refresh(db_task)
log_operation("TASK_UPDATED", user_id=db_task.user_id, task_id=task_id)
return db_task
except Exception as e:
log_error(e, "UPDATE_TASK")
session.rollback()
raise
@staticmethod
def delete_task(session: Session, task_id: int, current_user_id: str = None) -> bool:
"""
Delete a task by its ID, with user ownership validation if current_user_id is provided
"""
try:
statement = select(Task).where(Task.id == task_id)
db_task = session.exec(statement).first()
if not db_task:
log_operation("TASK_DELETE_FAILED_NOT_FOUND", task_id=task_id)
return False
# If current user is provided, validate ownership
if current_user_id and db_task.user_id != current_user_id:
log_authorization_decision("delete", current_user_id, f"task-{task_id}", False)
raise PermissionError(f"User {current_user_id} does not own task {task_id}")
# Log successful authorization if user was validated
if current_user_id:
log_authorization_decision("delete", current_user_id, f"task-{task_id}", True)
session.delete(db_task)
session.commit()
log_operation("TASK_DELETED", user_id=db_task.user_id, task_id=task_id)
return True
except Exception as e:
log_error(e, "DELETE_TASK")
session.rollback()
raise
@staticmethod
def toggle_task_completion(session: Session, task_id: int, current_user_id: str = None) -> Optional[Task]:
"""
Toggle the completion status of a task, with user ownership validation if current_user_id is provided
"""
try:
statement = select(Task).where(Task.id == task_id)
db_task = session.exec(statement).first()
if not db_task:
log_operation("TASK_TOGGLE_FAILED_NOT_FOUND", task_id=task_id)
return None
# If current user is provided, validate ownership
if current_user_id and db_task.user_id != current_user_id:
log_authorization_decision("toggle", current_user_id, f"task-{task_id}", False)
raise PermissionError(f"User {current_user_id} does not own task {task_id}")
# Log successful authorization if user was validated
if current_user_id:
log_authorization_decision("toggle", current_user_id, f"task-{task_id}", True)
# Toggle completion status
db_task.completed = not db_task.completed
session.add(db_task)
session.commit()
session.refresh(db_task)
log_operation("TASK_COMPLETION_TOGGLED", user_id=db_task.user_id, task_id=task_id)
return db_task
except Exception as e:
log_error(e, "TOGGLE_TASK_COMPLETION")
session.rollback()
raise
@staticmethod
def verify_task_ownership(session: Session, task_id: int, user_id: str) -> bool:
"""
Verify that a specific user owns a specific task
"""
try:
statement = select(Task).where(Task.id == task_id)
task = session.exec(statement).first()
if not task:
return False
return task.user_id == user_id
except Exception as e:
log_error(e, "VERIFY_TASK_OWNERSHIP")
raise