"""
ClauseGuard — FastAPI Backend v3.0
══════════════════════════════════
FIXED in v3.0:
  • Imports shared modules (no code duplication)
  • Fixed API schema to accept both {text} and {clauses} from extension
  • Added rate limiting
  • Added max text length validation
  • Fixed CORS (removed wildcard)
  • Added proper error responses
"""

import os
import re
import json
import logging
import sys
import time
from contextlib import asynccontextmanager
from typing import Optional
from collections import defaultdict
from datetime import datetime

import httpx
import numpy as np
from fastapi import FastAPI, HTTPException, Depends, Body, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

from auth import get_current_user, require_auth

logger = logging.getLogger(__name__)

# ── Import shared modules ──
# When deployed, these must be in the same directory or on PYTHONPATH
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

try:
    from app import (
        split_clauses,
        classify_cuad,
        extract_entities,
        detect_contradictions,
        compute_risk_score,
        CUAD_LABELS,
        RISK_MAP,
        DESC_MAP,
        _model_status,
        cuad_model,
        cuad_tokenizer,
    )
    from obligations import extract_obligations
    from compliance import check_compliance
    from compare import compare_contracts

    _SHARED_MODULES = True
except ImportError:
    _SHARED_MODULES = False
    # Bind every shared name so the module stays importable and endpoints can
    # answer with a clean 503 instead of crashing with NameError.
    split_clauses = classify_cuad = extract_entities = None
    detect_contradictions = compute_risk_score = None
    extract_obligations = check_compliance = compare_contracts = None
    CUAD_LABELS = RISK_MAP = _model_status = None
    cuad_model = cuad_tokenizer = None
    DESC_MAP = {}  # only ever read through .get() in this module
    print("[API] WARNING: Could not import shared modules, using inline fallbacks")

# ─── Config ───
SUPABASE_URL = os.environ.get("SUPABASE_URL", "")
SUPABASE_SERVICE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY", "")
HF_API_TOKEN = os.environ.get("HF_API_TOKEN", "")
SAULLM_ENDPOINT = os.environ.get("SAULLM_ENDPOINT", "")
# Compared against len(text), i.e. a character count (100,000 by default).
MAX_TEXT_LENGTH = int(os.environ.get("MAX_TEXT_LENGTH", "100000"))

# ─── Rate Limiting ───
_rate_limits = {}  # ip -> (count, window_start)
RATE_LIMIT_REQUESTS = 30
RATE_LIMIT_WINDOW = 60  # seconds
# Once this many clients are tracked, expired windows are pruned so the
# in-memory table cannot grow without bound.
_RATE_LIMIT_MAX_TRACKED = 10_000


def _check_rate_limit(client_ip: str) -> bool:
    """Record one request for *client_ip* and report whether it is allowed.

    Each client gets a fixed window of RATE_LIMIT_WINDOW seconds in which at
    most RATE_LIMIT_REQUESTS requests are accepted. State lives in this
    process's memory only.

    Returns:
        True if the request is within the limit, False if it must be rejected.
    """
    now = time.time()

    if len(_rate_limits) > _RATE_LIMIT_MAX_TRACKED:
        expired = [
            ip
            for ip, (_, started) in _rate_limits.items()
            if now - started > RATE_LIMIT_WINDOW
        ]
        for ip in expired:
            del _rate_limits[ip]

    count, window_start = _rate_limits.get(client_ip, (0, now))
    if now - window_start > RATE_LIMIT_WINDOW:
        # The previous window has elapsed: start a fresh one.
        count, window_start = 0, now
    if count >= RATE_LIMIT_REQUESTS:
        return False
    _rate_limits[client_ip] = (count + 1, window_start)
    return True


def _client_ip(request: Request) -> str:
    """Return the peer address used as the rate-limit key ("unknown" if absent)."""
    # NOTE(review): behind a reverse proxy this is the proxy's address, so all
    # clients would share one bucket — confirm the deployment topology.
    return request.client.host if request.client else "unknown"


def _require_shared_modules() -> None:
    """Raise HTTP 503 when the shared analysis modules failed to import."""
    if not _SHARED_MODULES:
        raise HTTPException(
            status_code=503,
            detail="Analysis modules are unavailable on this server",
        )


# ─── Supabase helper ───
async def supabase_insert(table: str, data: dict):
    """Insert *data* as one row into the Supabase REST table *table*.

    Best-effort: does nothing when Supabase is not configured, and never
    raises — failures are logged so a storage outage cannot break a request.
    """
    if not SUPABASE_URL or not SUPABASE_SERVICE_KEY:
        return
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{SUPABASE_URL}/rest/v1/{table}",
                json=data,
                headers={
                    "apikey": SUPABASE_SERVICE_KEY,
                    "Authorization": f"Bearer {SUPABASE_SERVICE_KEY}",
                    "Content-Type": "application/json",
                    "Prefer": "return=minimal",
                },
                timeout=10.0,
            )
        if resp.status_code >= 400:
            logger.warning(
                "Supabase insert into %s failed with HTTP %s", table, resp.status_code
            )
    except Exception:
        # Deliberately swallowed (persistence is optional) but no longer silent.
        logger.warning("Supabase insert into %s failed", table, exc_info=True)


async def supabase_query(table: str, params: dict, headers_extra: Optional[dict] = None):
    """Run a GET against the Supabase REST table *table*.

    Args:
        table: Table name appended to ``/rest/v1/``.
        params: Query-string parameters passed through unchanged.
        headers_extra: Optional additional request headers.

    Returns:
        The decoded JSON body on HTTP 200; otherwise (not configured, non-200
        status, or any error) an empty list.
    """
    if not SUPABASE_URL or not SUPABASE_SERVICE_KEY:
        return []
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                f"{SUPABASE_URL}/rest/v1/{table}",
                params=params,
                headers={
                    "apikey": SUPABASE_SERVICE_KEY,
                    "Authorization": f"Bearer {SUPABASE_SERVICE_KEY}",
                    **(headers_extra or {}),
                },
                timeout=10.0,
            )
        return resp.json() if resp.status_code == 200 else []
    except Exception:
        logger.warning("Supabase query on %s failed", table, exc_info=True)
        return []


# ─── Request/Response Models ───
# Body of POST /api/analyze: either raw `text` or a list of `clauses`.
class AnalyzeRequest(BaseModel):
    text: Optional[str] = Field(None, min_length=50)
    clauses: Optional[list] = None  # FIXED: accept clauses array from extension
    source_url: Optional[str] = None


# Response of POST /api/analyze.
class AnalyzeResponse(BaseModel):
    risk_score: int
    grade: str
    total_clauses: int
    flagged_count: int
    results: list[dict]
    entities: list[dict]
    contradictions: list[dict]
    obligations: list[dict]
    compliance: dict
    model: str
    latency_ms: int


# Body of POST /api/compare: the two contract texts to compare.
class CompareRequest(BaseModel):
    text_a: str = Field(..., min_length=50)
    text_b: str = Field(..., min_length=50)


# Body of POST /api/explain: one clause and its detected category.
class ExplainRequest(BaseModel):
    clause: str = Field(..., min_length=10, max_length=2000)
    category: str


# Response of POST /api/explain.
class ExplainResponse(BaseModel):
    clause: str
    category: str
    explanation: str
    legal_basis: str
    recommendation: str


# ─── App ───
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Models are loaded when app.py is imported
    yield


app = FastAPI(title="ClauseGuard API", version="3.0.0", lifespan=lifespan)

# FIXED: No wildcard CORS
ALLOWED_ORIGINS = [
    "https://clauseguardweb.netlify.app",
    "http://localhost:3000",
    "http://localhost:3001",
]

# Allow chrome extensions
# NOTE(review): this regex admits *any* Chrome extension origin, with
# credentials. Consider pinning it to the published extension ID.
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_origin_regex=r"^chrome-extension://.*$",
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/health")
async def health():
    """Report liveness, which classifier backend is active, and the API version."""
    model_status = "ml" if _SHARED_MODULES and cuad_model else "regex"
    return {
        "status": "ok",
        "model": model_status,
        "version": "3.0.0",
        "shared_modules": _SHARED_MODULES,
    }


def _clauses_to_text(clauses: list) -> str:
    """Join a client-supplied clause list into one document string.

    The list is untyped, so items are normalised instead of being handed
    straight to ``str.join`` (which raises TypeError on non-strings): strings
    are used as-is, dicts carrying a string ``"text"`` field contribute that
    field, and anything else is stringified.
    """
    # NOTE(review): the {"text": ...} item shape is an assumption about the
    # extension payload — confirm against the client.
    parts = []
    for item in clauses:
        if isinstance(item, dict) and isinstance(item.get("text"), str):
            item = item["text"]
        parts.append(item if isinstance(item, str) else str(item))
    return "\n\n".join(parts)


@app.post("/api/analyze", response_model=AnalyzeResponse)
async def analyze(
    req: AnalyzeRequest,
    request: Request,
    user: Optional[dict] = Depends(get_current_user),
):
    """Analyze a contract: classify clauses, extract entities, score risk.

    Accepts either ``text`` or a ``clauses`` list. When the caller is
    authenticated the analysis is also stored in the ``analyses`` table.

    Raises:
        HTTPException 429: rate limit exceeded for the client address.
        HTTPException 503: shared analysis modules could not be imported.
        HTTPException 400: text too short, too long, or no clauses detected.
    """
    # Rate limiting
    if not _check_rate_limit(_client_ip(request)):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded. Try again in 60 seconds.",
        )
    _require_shared_modules()

    # FIXED: Accept either text or clauses from extension
    text = req.text
    if not text and req.clauses:
        text = _clauses_to_text(req.clauses)
    if not text or len(text.strip()) < 50:
        raise HTTPException(
            status_code=400, detail="Text too short (minimum 50 characters)"
        )

    # Max length check
    if len(text) > MAX_TEXT_LENGTH:
        raise HTTPException(
            status_code=400,
            detail=f"Text too long (maximum {MAX_TEXT_LENGTH} characters)",
        )

    start = time.time()

    clauses = split_clauses(text)
    if not clauses:
        raise HTTPException(status_code=400, detail="No clauses detected in document")

    # One entry per (clause, predicted label) pair; clauses without
    # predictions contribute nothing.
    clause_results = []
    for clause in clauses:
        for pred in classify_cuad(clause) or []:
            clause_results.append({
                "text": clause,
                "label": pred["label"],
                "confidence": pred["confidence"],
                "risk": pred["risk"],
                "description": pred["description"],
                "source": pred.get("source", "unknown"),
            })

    entities = extract_entities(text)
    contradictions = detect_contradictions(clause_results, text)
    risk, grade, sev_counts = compute_risk_score(clause_results, len(clauses))
    obligations = extract_obligations(text)
    compliance = check_compliance(text)

    latency = int((time.time() - start) * 1000)

    results_for_db = [
        {
            "text": cr["text"],
            "categories": [{
                "name": cr["label"],
                "severity": cr["risk"],
                "confidence": cr["confidence"],
                "description": cr["description"],
            }],
        }
        for cr in clause_results
    ]
    # A clause with several labels is counted once.
    flagged_count = len({cr["text"] for cr in clause_results})

    if user:
        await supabase_insert("analyses", {
            "user_id": user["id"],
            "source_url": req.source_url,
            "total_clauses": len(clauses),
            "flagged_count": flagged_count,
            "risk_score": risk,
            "grade": grade,
            "clauses": results_for_db,
            "entities": entities,
            "contradictions": contradictions,
            "obligations": obligations,
            "compliance": compliance,
        })

    return AnalyzeResponse(
        risk_score=risk,
        grade=grade,
        total_clauses=len(clauses),
        flagged_count=flagged_count,
        results=results_for_db,
        entities=entities,
        contradictions=contradictions,
        obligations=obligations,
        compliance=compliance,
        model="ml" if cuad_model else "regex",
        latency_ms=latency,
    )


@app.post("/api/compare")
async def compare(req: CompareRequest, request: Request):
    """Compare two contract texts and return ``compare_contracts``' result.

    Raises:
        HTTPException 429: rate limit exceeded for the client address.
        HTTPException 503: shared analysis modules could not be imported.
        HTTPException 400: either text exceeds MAX_TEXT_LENGTH.
    """
    if not _check_rate_limit(_client_ip(request)):
        raise HTTPException(status_code=429, detail="Rate limit exceeded.")
    _require_shared_modules()

    # Same size cap as /api/analyze, applied to each document.
    if max(len(req.text_a), len(req.text_b)) > MAX_TEXT_LENGTH:
        raise HTTPException(
            status_code=400,
            detail=f"Text too long (maximum {MAX_TEXT_LENGTH} characters)",
        )

    return compare_contracts(req.text_a, req.text_b)


@app.post("/api/explain", response_model=ExplainResponse)
async def explain(req: ExplainRequest, user: dict = Depends(require_auth)):
    """Explain why a clause may be risky (authenticated callers only).

    Starts from static defaults (the category description from DESC_MAP plus
    generic advice). If SAULLM_ENDPOINT and HF_API_TOKEN are configured, the
    model is asked for a three-part answer which, when usable, replaces the
    defaults. Any model failure is logged and the defaults are returned.
    """
    desc = DESC_MAP.get(req.category, "Unknown category.")
    legal = "Consult local consumer protection laws."
    recommendation = (
        "Review this clause carefully. Consider negotiating or seeking "
        "legal advice before agreeing."
    )

    if SAULLM_ENDPOINT and HF_API_TOKEN:
        try:
            prompt = (
                f"You are a consumer protection legal analyst. Analyze this contract clause "
                f"and explain why it may be unfair or risky.\n\n"
                f"Clause: \"{req.clause}\"\n"
                f"Category: {req.category}\n\n"
                f"Provide:\n"
                f"1. A plain-English explanation of what this clause means\n"
                f"2. The specific legal basis or consumer protection concern\n"
                f"3. A practical recommendation\n\n"
                f"Be concise. 3-4 sentences per section."
            )
            async with httpx.AsyncClient(timeout=30.0) as client:
                resp = await client.post(
                    SAULLM_ENDPOINT,
                    json={
                        "inputs": prompt,
                        "parameters": {"max_new_tokens": 300, "temperature": 0.3},
                    },
                    headers={"Authorization": f"Bearer {HF_API_TOKEN}"},
                )
            if resp.status_code == 200:
                output = resp.json()
                generated = (
                    output[0]["generated_text"]
                    if isinstance(output, list)
                    else output.get("generated_text", "")
                )
                # Text-generation endpoints may echo the prompt in front of
                # the completion; drop it so the prompt's own paragraphs are
                # not mistaken for the answer.
                if generated and generated.startswith(prompt):
                    generated = generated[len(prompt):]
                generated = (generated or "").strip()
                if len(generated) > 50:
                    parts = [p.strip() for p in generated.split("\n\n") if p.strip()]
                    desc = parts[0] if len(parts) > 0 else desc
                    legal = parts[1] if len(parts) > 1 else legal
                    recommendation = parts[2] if len(parts) > 2 else recommendation
        except Exception:
            # Best-effort enrichment: fall back to the static defaults.
            logger.warning("Explanation model call failed", exc_info=True)

    return ExplainResponse(
        clause=req.clause,
        category=req.category,
        explanation=desc,
        legal_basis=legal,
        recommendation=recommendation,
    )


@app.get("/api/history")
async def history(user: dict = Depends(require_auth), limit: int = 20, offset: int = 0):
    """Return the authenticated user's stored analyses, newest first.

    ``limit`` is clamped to 1..100 and ``offset`` to >= 0; the effective
    values are echoed back in the response.
    """
    limit = max(1, min(limit, 100))
    offset = max(0, offset)
    data = await supabase_query(
        "analyses",
        {
            "user_id": f"eq.{user['id']}",
            "select": "*",
            "order": "created_at.desc",
            "limit": str(limit),
            "offset": str(offset),
        },
    )
    return {"analyses": data, "limit": limit, "offset": offset}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)