| from datetime import datetime, timezone |
| from typing import Any, Dict, List, Optional |
|
|
| from fastapi import HTTPException, status |
| from bson import ObjectId |
|
|
| from ..database.connection import get_collection |
| from ..models.monthly_record import ( |
| MonthlyRecord, MonthlyRecordCreate, MonthlyRecordUpdate, |
| ExpenseCategory, month_key_from, normalize_month_name |
| ) |
|
|
| class RecordService: |
| def __init__(self): |
| self.col = get_collection() |
|
|
| async def ensure_indexes(self) -> None: |
| |
| await self.col.create_index("month_key", unique=True) |
|
|
| def _calc_totals(self, salary: float, expenses: List[ExpenseCategory]) -> Dict[str, float]: |
| total_expenses = round(sum(e.amount for e in expenses), 2) |
| remaining = round(salary - total_expenses, 2) |
| return {"total_expenses": total_expenses, "remaining": remaining} |
|
|
| def _serialize(self, doc: Dict[str, Any]) -> Dict[str, Any]: |
| if not doc: |
| return doc |
| doc["_id"] = str(doc["_id"]) |
| return doc |
|
|
| async def get_by_month_key(self, month_key: str) -> Dict[str, Any]: |
| doc = await self.col.find_one({"month_key": month_key}) |
| if not doc: |
| raise HTTPException(status_code=404, detail="No record found for this month") |
| return self._serialize(doc) |
|
|
| async def list(self, limit: int = 12, skip: int = 0) -> List[Dict[str, Any]]: |
| cursor = self.col.find({}).sort("month_key", 1).skip(skip).limit(limit) |
| return [self._serialize(d) async for d in cursor] |
|
|
| async def create(self, payload: MonthlyRecordCreate) -> Dict[str, Any]: |
| month = normalize_month_name(payload.month) |
| month_key = month_key_from(month, payload.year) |
| now = datetime.now(timezone.utc) |
|
|
| totals = self._calc_totals(payload.salary, payload.expenses) |
|
|
| doc = { |
| "month": month, |
| "year": payload.year, |
| "month_key": month_key, |
| "salary": float(payload.salary), |
| "expenses": [e.model_dump() for e in payload.expenses], |
| "total_expenses": totals["total_expenses"], |
| "remaining": totals["remaining"], |
| "created_at": now, |
| "updated_at": now, |
| } |
|
|
| try: |
| result = await self.col.insert_one(doc) |
| except Exception as e: |
| |
| raise HTTPException(status_code=400, detail=str(e)) |
|
|
| created = await self.col.find_one({"_id": result.inserted_id}) |
| return self._serialize(created) |
|
|
| async def update(self, month_key: str, payload: MonthlyRecordUpdate) -> Dict[str, Any]: |
| existing = await self.col.find_one({"month_key": month_key}) |
| if not existing: |
| raise HTTPException(status_code=404, detail="No record found for this month") |
|
|
| salary = payload.salary if payload.salary is not None else existing["salary"] |
| expenses = payload.expenses if payload.expenses is not None else existing["expenses"] |
| |
| expenses_list = [e.model_dump() if hasattr(e, "model_dump") else e for e in expenses] |
|
|
| totals = self._calc_totals(salary, [ExpenseCategory(**e) for e in expenses_list]) |
|
|
| update_doc = { |
| "$set": { |
| "salary": float(salary), |
| "expenses": expenses_list, |
| "total_expenses": totals["total_expenses"], |
| "remaining": totals["remaining"], |
| "updated_at": datetime.now(timezone.utc), |
| } |
| } |
|
|
| await self.col.update_one({"month_key": month_key}, update_doc) |
| updated = await self.col.find_one({"month_key": month_key}) |
| return self._serialize(updated) |
|
|
| async def delete(self, month_key: str) -> None: |
| result = await self.col.delete_one({"month_key": month_key}) |
| if result.deleted_count == 0: |
| raise HTTPException(status_code=404, detail="No record found for this month") |
|
|