# sofia-ai-workspace / api_wrapper.py
# GoGma's picture
# Store generated content metadata in sofia-rivera-gallery dataset automatically
# b6991b4 verified
"""
Sofia AI API Wrapper - FastAPI REST API
Full integration for n8n, with generated content stored as Hugging Face metadata
"""
from fastapi import FastAPI, HTTPException, Header, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, List, Dict
import os
import json
from datetime import datetime
from huggingface_hub import HfApi
from generation import generate_image_from_prompt
# Hugging Face settings
# Dataset repository that receives the generated-content metadata.
REPO_ID = "GoGma/sofia-rivera-gallery"
# Access token taken from the environment; None when HF_TOKEN is not set.
HF_TOKEN = os.environ.get("HF_TOKEN")
# Create the FastAPI application instance
app = FastAPI(
    title="Sofia AI API",
    description="API REST con almacenamiento automático en metadatos",
    version="1.1.0",
)

# CORS: any origin, method and header is accepted.
# Credentials (cookies / HTTP auth) are NOT allowed: a wildcard origin must not
# be combined with allow_credentials=True, and this API authenticates callers
# through the X-API-Key request header, which does not need CORS credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Models
class WorkflowRequest(BaseModel):
    """Request body accepted by the complete-workflow endpoint."""

    # Subject of the post; interpolated into the generated text and hashtag.
    topic: str
    # Target platform; lower-cased before it is matched against the templates.
    platform: str
    # Appended to the base image prompt.
    style: str = "lifestyle"
    # Accepted with a default, but not read by the endpoint in this file.
    tone: str = "engaging"
class WorkflowResponse(BaseModel):
    """Response body returned by the complete-workflow endpoint."""

    # Set to True by the endpoint on every non-error return.
    success: bool
    # Generated post text.
    content: str
    # First value returned by the image generator. The explicit None default
    # keeps the field optional under both pydantic v1 and v2 (v2 would
    # otherwise treat a bare Optional[str] as required-but-nullable).
    image_path: Optional[str] = None
    # Human-readable outcome of the metadata upload.
    metadata_status: str
# Authentication
# NOTE(review): the fallback key is hard-coded in source, so any deployment
# that does not set SOFIA_API_KEY accepts this well-known value. Consider
# requiring the environment variable in production.
API_KEY = os.getenv("SOFIA_API_KEY", "sofia_dev_key_2026")


def verify_api_key(x_api_key: str = Header(None)):
    """Validate the API key sent in the request header.

    Args:
        x_api_key: Key supplied by the caller, or None when the header is
            absent.

    Returns:
        The supplied key when it matches ``API_KEY``.

    Raises:
        HTTPException: 401 when the key is missing or does not match.
    """
    import hmac

    # Compare as bytes in constant time: a plain != leaks how many leading
    # characters matched through timing, and compare_digest on str rejects
    # non-ASCII input with a TypeError.
    supplied = (x_api_key or "").encode("utf-8")
    expected = API_KEY.encode("utf-8")
    if x_api_key is None or not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="API Key inválida")
    return x_api_key
def save_to_metadata(data: dict):
    """Upload generated-content metadata as a JSON file to the HF dataset.

    The payload is serialized to JSON and uploaded under ``history/`` in the
    ``REPO_ID`` dataset repository.

    Args:
        data: JSON-serializable mapping to store.

    Returns:
        A human-readable status string. Exceptions raised while serializing
        or uploading are caught and reported in that string instead of
        propagating, so metadata storage stays best-effort.
    """
    if not HF_TOKEN:
        return "⚠️ Metadata no guardada: HF_TOKEN no configurado"
    try:
        api = HfApi(token=HF_TOKEN)
        now = datetime.now()
        timestamp = now.strftime("%Y-%m-%d_%H-%M-%S")
        # The microsecond suffix keeps two uploads issued within the same
        # second from targeting the same path, where the later one would
        # replace the earlier file.
        filename = f"content_{timestamp}_{now:%f}.json"
        # Serialize the payload; non-ASCII characters are kept as-is.
        json_content = json.dumps(data, indent=4, ensure_ascii=False)
        # Upload to the gallery/metadata dataset repository.
        api.upload_file(
            path_or_fileobj=json_content.encode("utf-8"),
            path_in_repo=f"history/{filename}",
            repo_id=REPO_ID,
            repo_type="dataset",
            commit_message=f"Add generated content metadata: {timestamp}",
        )
        return "✅ Metadata guardada en HF Dataset"
    except Exception as e:
        # Deliberately broad: a failed upload must not fail the request.
        return f"❌ Error guardando metadata: {str(e)}"
def _build_platform_content(topic: str, platform: str) -> str:
    """Return the post text for *platform*, or a generic line if unknown."""
    hashtag = topic.replace(" ", "")
    content_templates = {
        "instagram": (
            f"✨ {topic}\n"
            f"Caption: Let's talk about {topic}! 💫\n"
            f"Hashtags: #{hashtag} #SofiaRivera"
        ),
        "twitter": f"💭 {topic}\n#{hashtag} #SofiaAI",
    }
    # Platform lookup is case-insensitive.
    return content_templates.get(platform.lower(), f"Content: {topic}")


@app.post("/api/v1/workflow/complete", response_model=WorkflowResponse)
def complete_workflow(
    request: WorkflowRequest,
    api_key: str = Depends(verify_api_key),
):
    """Generate post text and an image, then store the result as metadata.

    Steps: build the platform-specific text, generate an image from the base
    prompt plus the requested style, and upload a metadata record through
    ``save_to_metadata``.

    Args:
        request: Topic, platform and style of the content to generate.
        api_key: Validated API key injected by ``verify_api_key``; not used
            in the body.

    Returns:
        A ``WorkflowResponse`` carrying the text, the image path and the
        metadata upload status.

    Raises:
        HTTPException: 500 with the error message when any step raises.
    """
    try:
        # 1. Build the text content
        content = _build_platform_content(request.topic, request.platform)

        # 2. Generate the image
        sofia_base = (
            "Professional portrait of Sofia Rivera, a beautiful 25yo "
            "Spanish-Latina woman, hazel eyes, wavy dark chocolate hair, "
            "sun-kissed skin, highly detailed, 8k, instagram style, "
        )
        full_prompt = sofia_base + request.style
        # NOTE(review): success is reported as True whatever img_path holds;
        # confirm generate_image_from_prompt raises on failure.
        img_path, status = generate_image_from_prompt(prompt=full_prompt)

        # 3. Store the record in the Hugging Face metadata dataset
        metadata_payload = {
            "timestamp": datetime.now().isoformat(),
            "topic": request.topic,
            "platform": request.platform,
            "style": request.style,
            "content": content,
            "image_url": img_path,
            "server_status": status,
        }
        meta_status = save_to_metadata(metadata_payload)

        return WorkflowResponse(
            success=True,
            content=content,
            image_path=img_path,
            metadata_status=meta_status,
        )
    except Exception as e:
        # Chain the cause so the original traceback survives in server logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # Imported here because the server is only needed when run as a script.
    from uvicorn import run as serve

    # Listen on every interface, port 7860.
    serve(app, host="0.0.0.0", port=7860)