| """ |
| Stripe webhook handler – updates API key tier on subscription events. |
| """ |
|
|
| import os |
| import stripe |
| from fastapi import APIRouter, Request, HTTPException |
| from app.core.usage_tracker import update_key_tier, Tier |
|
|
# All routes declared in this module are served under the /webhooks prefix.
router = APIRouter(tags=["webhooks"], prefix="/webhooks")

# Stripe credentials are read from the environment once, at import time.
# Either value is None when its variable is unset.
stripe.api_key = os.environ.get("STRIPE_SECRET_KEY")
STRIPE_WEBHOOK_SECRET = os.environ.get("STRIPE_WEBHOOK_SECRET")
|
|
|
|
def _metadata_api_key(stripe_object):
    """Return ``metadata["api_key"]`` from a Stripe object, or None if absent.

    Tolerates ``metadata`` being missing or explicitly null, so a verified
    event without metadata never raises here.
    """
    metadata = stripe_object.get("metadata") or {}
    return metadata.get("api_key")


@router.post("/stripe")
async def stripe_webhook(request: Request):
    """Verify a Stripe webhook delivery and apply the matching tier change.

    ``checkout.session.completed`` sets the referenced API key to
    ``Tier.PRO``; ``customer.subscription.deleted`` sets it to ``Tier.FREE``.
    Every other event type is acknowledged without any update, as is an
    event that carries no API key.

    The API key is taken from ``client_reference_id`` (checkout sessions
    only), falling back to ``metadata["api_key"]``.

    Returns:
        ``{"status": "ok"}`` once the event has been verified and handled.

    Raises:
        HTTPException: 500 when either Stripe secret is not configured;
            400 when the signature header is missing, the payload cannot be
            parsed, or signature verification fails.
    """
    # Fail fast on misconfiguration before doing any work on the request.
    if not STRIPE_WEBHOOK_SECRET or not stripe.api_key:
        raise HTTPException(status_code=500, detail="Stripe not configured")

    # Reject unsigned requests explicitly instead of handing None to the
    # verifier and depending on how the library reacts to it.
    sig_header = request.headers.get("stripe-signature")
    if not sig_header:
        raise HTTPException(status_code=400, detail="Missing signature header")

    # The raw, unparsed body is what gets passed to signature verification.
    payload = await request.body()

    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, STRIPE_WEBHOOK_SECRET
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Invalid payload") from exc
    except stripe.error.SignatureVerificationError as exc:
        raise HTTPException(status_code=400, detail="Invalid signature") from exc

    event_type = event["type"]

    if event_type == "checkout.session.completed":
        session = event["data"]["object"]
        # NOTE(review): Stripe documents that this event can arrive with
        # payment_status "unpaid" for delayed payment methods -- confirm
        # whether the upgrade should wait for
        # checkout.session.async_payment_succeeded in that case.
        api_key = session.get("client_reference_id") or _metadata_api_key(session)
        if api_key:
            update_key_tier(api_key, Tier.PRO)
    elif event_type == "customer.subscription.deleted":
        subscription = event["data"]["object"]
        # NOTE(review): presumably the api_key is copied onto the
        # subscription's metadata when the checkout session is created --
        # verify, otherwise this downgrade silently never fires.
        api_key = _metadata_api_key(subscription)
        if api_key:
            update_key_tier(api_key, Tier.FREE)

    return {"status": "ok"}
|
|