| from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Form
|
| from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
| from fastapi.middleware.cors import CORSMiddleware
|
| import easyocr
|
| import numpy as np
|
| from PIL import Image
|
| import io
|
| import asyncio
|
| from typing import Optional, List, Dict
|
| import json
|
| import os
|
| from datetime import datetime
|
|
|
|
|
| from utils.scraper import Scraper
|
| from utils.health_score import HealthScoreCalculator
|
| from models.database import Database
|
|
|
| app = FastAPI(title="IndiScan API")
|
| security = HTTPBasic()
|
| db = Database()
|
| health_calculator = HealthScoreCalculator()
|
|
|
|
|
| app.add_middleware(
|
| CORSMiddleware,
|
| allow_origins=["*"],
|
| allow_credentials=True,
|
| allow_methods=["*"],
|
| allow_headers=["*"],
|
| )
|
|
|
|
|
| reader = easyocr.Reader(['en'])
|
|
|
| def verify_admin(credentials: HTTPBasicCredentials = Depends(security)):
|
| is_admin = db.verify_admin(credentials.username, credentials.password)
|
| if not is_admin:
|
| raise HTTPException(
|
| status_code=401,
|
| detail="Invalid credentials",
|
| headers={"WWW-Authenticate": "Basic"},
|
| )
|
| return credentials.username
|
|
|
| @app.post("/scan/barcode")
|
| async def scan_barcode(barcode: str):
|
| """Scan a product by barcode"""
|
|
|
| product = db.get_product(barcode)
|
| if product:
|
| return product
|
|
|
|
|
| async with Scraper() as scraper:
|
| prices = await scraper.get_all_prices(barcode)
|
| if not prices:
|
| raise HTTPException(status_code=404, detail="Product not found")
|
|
|
|
|
| first_result = prices[0]
|
| product_data = {
|
| 'barcode': barcode,
|
| 'name': first_result['title'],
|
| 'prices': prices,
|
| 'last_updated': datetime.now().isoformat()
|
| }
|
|
|
|
|
| db.add_product(product_data)
|
| return product_data
|
|
|
| @app.post("/scan/image")
|
| async def scan_image(file: UploadFile = File(...)):
|
| """Scan product image for ingredients"""
|
| contents = await file.read()
|
| image = Image.open(io.BytesIO(contents))
|
|
|
|
|
| image_np = np.array(image)
|
|
|
|
|
| results = reader.readtext(image_np)
|
| text = ' '.join([result[1] for result in results])
|
|
|
|
|
| async with Scraper() as scraper:
|
| ingredients = scraper.extract_ingredients(text)
|
| nutrition_info = scraper.extract_nutrition_info(text)
|
|
|
|
|
| health_score = health_calculator.calculate_score(ingredients)
|
| nutrition_analysis = health_calculator.analyze_nutrition(nutrition_info)
|
|
|
| return {
|
| 'ingredients': ingredients,
|
| 'nutrition_info': nutrition_info,
|
| 'health_score': health_score,
|
| 'nutrition_analysis': nutrition_analysis
|
| }
|
|
|
| @app.post("/analyze/text")
|
| async def analyze_text(text: str = Form(...), product_type: str = Form("food")):
|
| """Analyze product from text description"""
|
| async with Scraper() as scraper:
|
| ingredients = scraper.extract_ingredients(text)
|
| nutrition_info = scraper.extract_nutrition_info(text)
|
|
|
| health_score = health_calculator.calculate_score(ingredients, product_type)
|
| nutrition_analysis = health_calculator.analyze_nutrition(nutrition_info)
|
|
|
| return {
|
| 'ingredients': ingredients,
|
| 'nutrition_info': nutrition_info,
|
| 'health_score': health_score,
|
| 'nutrition_analysis': nutrition_analysis
|
| }
|
|
|
| @app.get("/products/{barcode}")
|
| async def get_product(barcode: str):
|
| """Get product information by barcode"""
|
| product = db.get_product(barcode)
|
| if not product:
|
| raise HTTPException(status_code=404, detail="Product not found")
|
| return product
|
|
|
| @app.post("/products/add")
|
| async def add_product(
|
| barcode: str = Form(...),
|
| name: str = Form(...),
|
| ingredients: str = Form(...),
|
| product_type: str = Form("food"),
|
| admin_user: str = Depends(verify_admin)
|
| ):
|
| """Add or update product information (admin only)"""
|
| try:
|
| ingredients_list = json.loads(ingredients)
|
| except json.JSONDecodeError:
|
| ingredients_list = [i.strip() for i in ingredients.split(',')]
|
|
|
| product_data = {
|
| 'barcode': barcode,
|
| 'name': name,
|
| 'ingredients': ingredients_list,
|
| 'product_type': product_type,
|
| 'added_by': admin_user,
|
| 'is_verified': True,
|
| 'last_updated': datetime.now().isoformat()
|
| }
|
|
|
| db.add_product(product_data)
|
| return {"message": "Product added successfully"}
|
|
|
| @app.get("/products/update")
|
| async def update_products(admin_user: str = Depends(verify_admin)):
|
| """Update products that haven't been updated in 60 days (admin only)"""
|
| products_to_update = db.get_products_for_update()
|
|
|
| async with Scraper() as scraper:
|
| for barcode in products_to_update:
|
| try:
|
| prices = await scraper.get_all_prices(barcode)
|
| if prices:
|
| first_result = prices[0]
|
| product_data = {
|
| 'barcode': barcode,
|
| 'name': first_result['title'],
|
| 'prices': prices,
|
| 'last_updated': datetime.now().isoformat()
|
| }
|
| db.add_product(product_data)
|
| except Exception as e:
|
| print(f"Error updating product {barcode}: {str(e)}")
|
| continue
|
|
|
| return {"message": f"Updated {len(products_to_update)} products"}
|
|
|
| @app.get("/export")
|
| async def export_data(admin_user: str = Depends(verify_admin)):
|
| """Export database to CSV (admin only)"""
|
| try:
|
| export_dir = "data/exports"
|
| os.makedirs(export_dir, exist_ok=True)
|
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
| export_path = f"{export_dir}/export_{timestamp}"
|
| db.export_to_csv(export_path)
|
| return {"message": f"Data exported to {export_path}"}
|
| except Exception as e:
|
| raise HTTPException(status_code=500, detail=f"Export failed: {str(e)}")
|
|
|
| @app.post("/import")
|
| async def import_data(file: UploadFile = File(...), admin_user: str = Depends(verify_admin)):
|
| """Import data from CSV (admin only)"""
|
| try:
|
| contents = await file.read()
|
| import_dir = "data/imports"
|
| os.makedirs(import_dir, exist_ok=True)
|
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
| import_path = f"{import_dir}/import_{timestamp}"
|
|
|
| with open(import_path, 'wb') as f:
|
| f.write(contents)
|
|
|
| db.import_from_csv(import_path)
|
| return {"message": "Data imported successfully"}
|
| except Exception as e:
|
| raise HTTPException(status_code=500, detail=f"Import failed: {str(e)}")
|
|
|
| if __name__ == "__main__":
|
| import uvicorn
|
| uvicorn.run(app, host="0.0.0.0", port=8000) |