# Hugging Face Space page residue (not code): reframe/app.py, "Update app.py",
# commit 758422c (verified), uploaded by macayaven.
"""
Re-frame: Cognitive Reframing Assistant
A Gradio-based CBT tool for identifying and reframing cognitive distortions
"""
from __future__ import annotations
import hashlib
import json
import os
import shutil
from datetime import datetime
from typing import Optional
import gradio as gr
from huggingface_hub import whoami as _hf_whoami
# Import our CBT knowledge base
from cbt_knowledge import (
COGNITIVE_DISTORTIONS,
find_similar_situations,
)
# Import UI components
from ui_components.landing import create_landing_tab
from ui_components.learn import create_learn_tab

# Agentic LLM support (Hugging Face Inference API).
# Import is guarded so the UI still loads when the agents package (or one of
# its dependencies) is missing; respond_stream checks AGENT_AVAILABLE.
try:
    from agents import CBTAgent
    AGENT_AVAILABLE = True
except Exception as e:
    print(f"Error importing CBTAgent: {e}")
    CBTAgent = None  # type: ignore
    AGENT_AVAILABLE = False
# Load translations
def load_translations():
    """Load locale dictionaries for internationalization.

    Reads locales/{en,es}.json; when a file is missing OR contains invalid
    JSON, falls back to the embedded dictionaries below. (Previously only
    FileNotFoundError was handled, so a malformed locale file crashed the
    app at startup.)

    Returns:
        dict: mapping language code ('en', 'es') -> translation tree.
    """
    translations = {}
    for lang in ['en', 'es']:
        try:
            with open(f'locales/{lang}.json', encoding='utf-8') as f:
                translations[lang] = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError) as e:
            # Fallback to embedded translations if files don't exist or are broken
            print(f"Error loading translations for {lang}: {e!r}")
    # Fallback translations
    if 'en' not in translations:
        translations['en'] = {
            "app_title": "🧠 re-frame: Cognitive Reframing Assistant",
            "app_description": "Using CBT principles to help you find balanced perspectives",
            "welcome": {
                "title": "Welcome to re-frame",
                "subtitle": "Find a kinder perspective",
                "description": (
                    "Using ideas from Cognitive Behavioral Therapy (CBT), we help you notice "
                    "thinking patterns and explore gentler, more balanced perspectives."
                ),
                "how_it_works": "How it works",
                "step1": "Share your thoughts",
                "step1_desc": "Tell us what's on your mind",
                "step2": "Identify patterns",
                "step2_desc": "We'll help spot thinking traps",
                "step3": "Find balance",
                "step3_desc": "Explore alternative perspectives",
                "start_chat": "Start Chat",
                "disclaimer": "Important: This is a self-help tool, not therapy or medical advice.",
                "privacy": "Privacy: No data is stored beyond your session.",
            },
            "chat": {
                "title": "Chat",
                "placeholder": "Share what's on your mind...",
                "send": "Send",
                "clear": "New Session",
                "thinking": "Thinking...",
                "distortions_found": "Thinking patterns identified:",
                "reframe_suggestion": "Alternative perspective:",
                "similar_situations": "Similar situations:",
                "try_this": "You might try:",
            },
            "learn": {
                "title": "Learn",
                "select_distortion": "Select a thinking pattern to explore",
                "definition": "Definition",
                "examples": "Common Examples",
                "strategies": "Reframing Strategies",
                "actions": "Small Steps to Try",
            },
        }
    if 'es' not in translations:
        translations['es'] = {
            "app_title": "🧠 re-frame: Asistente de Reencuadre Cognitivo",
            "app_description": (
                "Usando principios de TCC para ayudarte a encontrar perspectivas equilibradas"
            ),
            "welcome": {
                "title": "Bienvenido a re-frame",
                "subtitle": "Encuentra una perspectiva más amable",
                "description": (
                    "Usando ideas de la Terapia Cognitivo-Conductual (TCC), te ayudamos a notar "
                    "patrones de pensamiento y explorar perspectivas más gentiles y equilibradas."
                ),
                "how_it_works": "Cómo funciona",
                "step1": "Comparte tus pensamientos",
                "step1_desc": "Cuéntanos qué piensas",
                "step2": "Identifica patrones",
                "step2_desc": "Te ayudamos a detectar trampas mentales",
                "step3": "Encuentra balance",
                "step3_desc": "Explora perspectivas alternativas",
                "start_chat": "Iniciar Chat",
                "disclaimer": (
                    "Importante: Esta es una herramienta de autoayuda, "
                    "no terapia ni consejo médico."
                ),
                "privacy": "Privacidad: No se almacenan datos más allá de tu sesión.",
            },
            "chat": {
                "title": "Chat",
                "placeholder": "Comparte lo que piensas...",
                "send": "Enviar",
                "clear": "Nueva Sesión",
                "thinking": "Pensando...",
                "distortions_found": "Patrones de pensamiento identificados:",
                "reframe_suggestion": "Perspectiva alternativa:",
                "similar_situations": "Situaciones similares:",
                "try_this": "Podrías intentar:",
            },
            "learn": {
                "title": "Aprender",
                "select_distortion": "Selecciona un patrón de pensamiento para explorar",
                "definition": "Definición",
                "examples": "Ejemplos Comunes",
                "strategies": "Estrategias de Reencuadre",
                "actions": "Pequeños Pasos a Intentar",
            },
        }
    return translations
class CBTChatbot:
    """Main chatbot class for handling CBT conversations.

    Holds per-session language/translation state plus formatting helpers for
    the analysis side panels. The LLM agent itself is created by callers and
    passed into process_message.
    """

    def __init__(self, language='en', memory_size: int = 6):
        # language: UI language code; unknown codes fall back to English below.
        self.language = language
        self.translations = load_translations()
        self.t = self.translations.get(language, self.translations['en'])
        # Session scratch state; only reset by clear_session in this class.
        self.conversation_history: list[list[str]] = []
        self.identified_distortions: list[tuple[str, float]] = []
        # Remembered-turn cap; clamped to at least 2.
        self.memory_size = max(2, int(memory_size))

    def _history_to_context(self, history) -> list[dict]:
        """Convert Chatbot history to agent context.
        Supports both legacy [[user, assistant], ...] and new Gradio
        messages format [{role, content}, ...]. Returns a list of
        {user, assistant} dicts capped to memory_size.
        """
        ctx: list[dict] = []
        if not history:
            return ctx
        # New messages format
        if isinstance(history, list) and history and isinstance(history[0], dict):
            pending_user = None
            for msg in history:
                role = str(msg.get("role", ""))
                content = str(msg.get("content", ""))
                if role == "user":
                    # Remember the user turn until its assistant reply arrives.
                    pending_user = content
                elif role == "assistant" and pending_user is not None:
                    ctx.append({"user": pending_user, "assistant": content})
                    pending_user = None
            return ctx[-self.memory_size :]
        # Legacy tuple format
        for turn in history:
            if isinstance(turn, (list, tuple)) and len(turn) == 2:
                # None entries (e.g. pending replies) are normalized to "".
                ctx.append({"user": turn[0] or "", "assistant": turn[1] or ""})
        return ctx[-self.memory_size :]

    def process_message(
        self,
        message: str,
        history: list[list[str]],
        use_agent: bool = False,
        agent: Optional[CBTAgent] = None,
    ) -> tuple[list[list[str]], str, str, str]:
        """
        Process user message and generate response with CBT analysis

        NOTE(review): this non-streaming path appends legacy [user, assistant]
        pairs; the wired UI uses respond_stream instead — confirm callers.

        Returns:
            - Updated chat history
            - Identified distortions display
            - Reframe suggestion
            - Similar situations display
        """
        if not message or message.strip() == "":
            return history or [], "", "", ""
        # Add user message to history
        history = history or []
        # Agentic path only: remove non-LLM fallback
        if use_agent and agent is not None:
            try:
                analysis = agent.analyze_thought(message)
                response = agent.generate_response(
                    message, context=self._history_to_context(history)
                )
                distortions_display = self._format_distortions(analysis.get("distortions", []))
                reframe_display = analysis.get("reframe", "")
                primary = analysis.get("distortions", [])
                # First distortion tuple's code drives the similar-situations panel.
                primary_code = primary[0][0] if primary else None
                situations_display = (
                    self._format_similar_situations(primary_code) if primary_code else ""
                )
            except Exception as e:
                # Do not fallback to local heuristics
                history.append([message, f"Agent error: {e}"])
                return history, "", "", ""
        else:
            # Non-agent mode disabled
            history.append(
                [message, "Agent-only mode: please enable the agent to generate responses."]
            )
            return history, "", "", ""
        # Update history with memory cap
        history.append([message, response])
        if len(history) > self.memory_size:
            history = history[-self.memory_size :]
        return history, distortions_display, reframe_display, situations_display

    def _format_distortions(self, distortions: list[tuple[str, float]]) -> str:
        """Format detected distortions for display (Markdown; top 3 only)."""
        if not distortions:
            return ""
        lines = [f"### {self.t['chat']['distortions_found']}\n"]
        for code, confidence in distortions[:3]:  # Show top 3
            # Linear scan: COGNITIVE_DISTORTIONS is keyed by name, not code.
            for _key, info in COGNITIVE_DISTORTIONS.items():
                if info['code'] == code:
                    lines.append(f"**{info['name']}** ({confidence * 100:.0f}% match)")
                    lines.append(f"*{info['definition']}*\n")
                    break
        return "\n".join(lines)

    def _format_similar_situations(self, distortion_code: str) -> str:
        """Format similar situations for display (two worked examples)."""
        situations = find_similar_situations(distortion_code, num_situations=2)
        if not situations:
            return ""
        lines = [f"### {self.t['chat']['similar_situations']}\n"]
        for i, situation in enumerate(situations, 1):
            lines.append(f"**Example {i}:** {situation['situation']}")
            lines.append(f"*Distorted:* \"{situation['distorted']}\"")
            lines.append(f"*Reframed:* \"{situation['reframed']}\"\n")
        return "\n".join(lines)

    def clear_session(self):
        """Clear the conversation session and return empty UI values."""
        self.conversation_history = []
        self.identified_distortions = []
        return [], "", "", ""
def create_app(language='en'):
    """Create and configure the Gradio application"""
    # Initialize chatbot (supplies translations `t` and formatting helpers)
    chatbot = CBTChatbot(language)
    t = chatbot.t
    # Custom CSS for better styling
    custom_css = """
.gradio-container {
font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
}
.gr-button-primary {
background-color: #2563eb !important;
border-color: #2563eb !important;
}
.gr-button-primary:hover {
background-color: #1e40af !important;
}
.info-box {
background-color: #f0f9ff;
border: 1px solid #3b82f6;
border-radius: 8px;
padding: 12px;
margin: 8px 0;
}
"""
    with gr.Blocks(title=t['app_title'], theme=gr.themes.Soft(), css=custom_css) as app:
        gr.Markdown(f"# {t['app_title']}")
        gr.Markdown(f"*{t['app_description']}*")
        with gr.Tabs():
            # Welcome Tab
            with gr.Tab(t['welcome']['title']):
                create_landing_tab(t['welcome'])
            # Chat Tab
            with gr.Tab(t['chat']['title']):
                # Settings row (agentic only)
                with gr.Row():
                    gr.LoginButton()
                    billing_notice = gr.Markdown("")
                with gr.Row():
                    with gr.Column(scale=2):
                        # messages-format history (dicts with role/content)
                        chatbot_ui = gr.Chatbot(height=400, label="Conversation", type='messages')
                        with gr.Row():
                            msg_input = gr.Textbox(
                                label="", placeholder=t['chat']['placeholder'], scale=4
                            )
                            send_btn = gr.Button(t['chat']['send'], variant="primary", scale=1)
                        clear_btn = gr.Button(t['chat']['clear'], variant="secondary")
                    with gr.Column(scale=1):
                        # Analysis side panels filled by respond_stream
                        gr.Markdown("### Analysis")
                        distortions_output = gr.Markdown(label="Patterns Detected")
                        reframe_output = gr.Markdown(label="Reframe Suggestion")
                        situations_output = gr.Markdown(label="Similar Situations")
                # Owner-only Admin controls (gated at load)
                admin_accordion = gr.Accordion(
                    "Owner Controls", open=False, visible=False
                )
                with admin_accordion:
                    # Locked message for non-owners (kept hidden unless needed)
                    chat_locked_panel = gr.Markdown(
                        "### Owner only\nPlease log in with your Hugging Face account.",
                        visible=False,
                    )
                    chat_admin_panel = gr.Column(visible=False)
                    with chat_admin_panel:
                        gr.Markdown("## Admin Dashboard")
                        admin_summary = gr.Markdown("")
                        admin_limit_info = gr.Markdown("")
                        # Owner-only model selection
                        model_dropdown = gr.Dropdown(
                            label="Model (HF)",
                            choices=[
                                "meta-llama/Llama-3.1-8B-Instruct",
                                "meta-llama/Llama-3.1-70B-Instruct",
                                "Qwen/Qwen2.5-7B-Instruct",
                                "mistralai/Mixtral-8x7B-Instruct-v0.1",
                                "google/gemma-2-9b-it",
                            ],
                            value=os.getenv("MODEL_NAME", "meta-llama/Llama-3.1-8B-Instruct"),
                            allow_custom_value=True,
                            info="Only visible to owner. Requires HF Inference API token.",
                        )
                        with gr.Row():
                            override_tb = gr.Textbox(
                                label="Per-user interaction limit override (blank to clear)"
                            )
                            set_override_btn = gr.Button(
                                "Set Limit Override", variant="secondary"
                            )
                            refresh_btn = gr.Button(
                                "Refresh Metrics", variant="secondary"
                            )
                        gr.Markdown("### Debug")
                        owner_identity_md = gr.Markdown("")
                        with gr.Row():
                            identity_btn = gr.Button(
                                "Refresh Identity", variant="secondary"
                            )
                            storage_btn = gr.Button(
                                "Check /data", variant="secondary"
                            )
                        storage_info_md = gr.Markdown("")
                # Internal state for agent instance, selected model, and agentic enable flag
                agent_state = gr.State(value=None)
                model_state = gr.State(
                    value=os.getenv("MODEL_NAME", "meta-llama/Llama-3.1-8B-Instruct")
                )
                agentic_enabled_state = gr.State(value=True)
                # Admin runtime settings (e.g., per-user limit override)
                admin_state = gr.State(value={"per_user_limit_override": None})
                # Connect chat interface (streaming)
def _ensure_hf_token_env():
# Honor either HF_TOKEN or HUGGINGFACEHUB_API_TOKEN
token = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN")
if token and not os.getenv("HUGGINGFACEHUB_API_TOKEN"):
os.environ["HUGGINGFACEHUB_API_TOKEN"] = token
def _stream_chunks(text: str, chunk_words: int = 12):
words = (text or "").split()
buf = []
for i, w in enumerate(words, 1):
buf.append(w)
if i % chunk_words == 0:
yield " ".join(buf)
buf = []
if buf:
yield " ".join(buf)
# Budget guard helpers
def _get_call_log_path():
return os.getenv("AGENT_CALL_LOG_PATH", "/tmp/agent_calls.json")
# Simple privacy-preserving metrics (no raw PII/content)
def _get_metrics_path():
return os.getenv("APP_METRICS_PATH", "/tmp/app_metrics.json")
def _load_call_log():
try:
with open(_get_call_log_path(), encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def _load_metrics():
try:
with open(_get_metrics_path(), encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def _save_call_log(data):
try:
with open(_get_call_log_path(), "w", encoding="utf-8") as f:
json.dump(data, f)
except Exception:
pass
def _save_metrics(data):
try:
with open(_get_metrics_path(), "w", encoding="utf-8") as f:
json.dump(data, f)
except Exception:
pass
def _today_key():
return datetime.utcnow().strftime("%Y-%m-%d")
# Metrics helpers
def _metrics_today():
m = _load_metrics()
return m.get(_today_key(), {})
def _write_metrics_today(d):
m = _load_metrics()
m[_today_key()] = d
_save_metrics(m)
def _inc_metric(key: str, inc: int = 1):
d = _metrics_today()
d[key] = int(d.get(key, 0)) + inc
_write_metrics_today(d)
def _record_distortion_counts(codes: list[str]):
if not codes:
return
d = _metrics_today()
dist = d.get("distortion_counts", {})
if not isinstance(dist, dict):
dist = {}
for c in codes:
dist[c] = int(dist.get(c, 0)) + 1
d["distortion_counts"] = dist
_write_metrics_today(d)
def _record_response_chars(n: int):
d = _metrics_today()
d["response_chars_total"] = int(d.get("response_chars_total", 0)) + max(
0, int(n)
)
d["response_count"] = int(d.get("response_count", 0)) + 1
_write_metrics_today(d)
def _calls_today():
data = _load_call_log()
day = _today_key()
val = data.get(day, 0)
# Backward compatible: handle both scalar int and per-day dict blob
if isinstance(val, dict):
return int(val.get("calls", 0))
try:
return int(val)
except Exception:
return 0
def _inc_calls_today():
data = _load_call_log()
day = _today_key()
day_blob = data.get(day, {}) if isinstance(data.get(day, {}), dict) else {}
try:
day_blob["calls"] = int(day_blob.get("calls", 0)) + 1
except Exception:
day_blob["calls"] = 1
data[day] = day_blob
_save_call_log(data)
def _agentic_budget_allows():
hard = os.getenv("HF_AGENT_HARD_DISABLE", "").lower() in ("1", "true", "yes")
if hard:
return False
limit = os.getenv("HF_AGENT_MAX_CALLS_PER_DAY")
if not limit:
return True
try:
limit_i = int(limit)
except Exception:
return True
return _calls_today() < max(0, limit_i)
def respond_stream(
    message,
    history,
    model_value,
    agent_obj,
    agentic_ok,
    admin_settings,
    request: "gr.Request",
    profile: "gr.OAuthProfile | None" = None,
):
    """Streaming chat handler (generator).

    Yields frames of (history, distortions_md, reframe_md, situations_md,
    agent_obj, notice, agentic_ok). Enforces a per-user daily interaction
    quota and a global daily call budget before touching the agent; on a
    quota/billing error it flips agentic_ok off for the rest of the session.
    """
    if not message:
        # Empty submit: echo current state, no side effects.
        yield history, "", "", "", agent_obj, "", agentic_ok
        return
    budget_ok = _agentic_budget_allows()
    notice = ""

    # Compute user id (salted hash) for per-user quotas
    def _user_id(req: "gr.Request", prof: "gr.OAuthProfile | None") -> str:
        # Only a salted SHA-256 of the identity material is ever stored.
        try:
            salt = os.getenv("USAGE_SALT", "reframe_salt")
            # Prefer OAuth profile when available
            if prof is not None:
                # Try common fields in OAuth profile
                username = None
                for key in (
                    "preferred_username",
                    "username",
                    "login",
                    "name",
                    "sub",
                    "id",
                ):
                    try:
                        if hasattr(prof, key):
                            username = getattr(prof, key)
                        elif isinstance(prof, dict) and key in prof:
                            username = prof[key]
                        if username:
                            break
                    except Exception as e:
                        print(f"Error getting username from profile: {e}")
                        pass
                raw = f"oauth:{username or 'unknown'}"
            # req is expected to be provided by Gradio
            elif getattr(req, "username", None):
                raw = f"user:{req.username}"
            else:
                # Anonymous fallback: IP + user-agent + Gradio session hash.
                ip = getattr(getattr(req, "client", None), "host", "?")
                ua = (
                    dict(req.headers).get("user-agent", "?")
                    if getattr(req, "headers", None)
                    else "?"
                )
                sess = getattr(req, "session_hash", None) or "?"
                raw = f"ipua:{ip}|{ua}|{sess}"
            return hashlib.sha256(f"{salt}|{raw}".encode()).hexdigest()
        except Exception as e:
            print(f"Error getting user id: {e}")
            return "anon"

    user_id = _user_id(request, profile)

    # Helper functions for tracking per-user interactions
    def _interactions_today(uid: str) -> int:
        # Defensive reads: the log may contain legacy/corrupt shapes.
        data = _load_call_log()
        day = _today_key()
        day_blob = data.get(day, {}) if isinstance(data.get(day, {}), dict) else {}
        inter = (
            day_blob.get("interactions", {})
            if isinstance(day_blob.get("interactions", {}), dict)
            else {}
        )
        val = inter.get(uid, 0)
        if isinstance(val, (str, int, float)):
            try:
                return int(val)
            except Exception as e:
                print(f"Error getting interactions today: {e}")
                return 0
        return 0

    def _inc_interactions_today(uid: str):
        # Read-modify-write of today's per-user counter in the shared log file.
        data = _load_call_log()
        day = _today_key()
        day_blob = data.get(day, {}) if isinstance(data.get(day, {}), dict) else {}
        inter = (
            day_blob.get("interactions", {})
            if isinstance(day_blob.get("interactions", {}), dict)
            else {}
        )
        inter[uid] = int(inter.get(uid, 0)) + 1
        day_blob["interactions"] = inter
        data[day] = day_blob
        _save_call_log(data)

    # Determine how many interactions the user has already had today
    try:
        interactions_before = _interactions_today(user_id)
    except Exception as e:
        print(f"Error getting interactions today: {e}")
        interactions_before = 0
    # Per-user interaction quota (counts 1 per message)
    max_interactions_env = os.getenv("HF_AGENT_MAX_INTERACTIONS_PER_USER")
    try:
        # Default to a generous 12 if not configured
        per_user_limit_env = (
            int(max_interactions_env) if max_interactions_env else 12
        )
    except Exception:
        per_user_limit_env = 12
    per_user_limit = per_user_limit_env
    # Admin override (runtime)
    try:
        override = None
        if isinstance(admin_settings, dict):
            override = admin_settings.get("per_user_limit_override")
        if isinstance(override, int | float) and int(override) > 0:
            per_user_limit = int(override)
    except Exception:
        pass
    # Guard 1: per-user daily quota.
    if per_user_limit is not None and interactions_before >= max(
        0, per_user_limit
    ):
        _inc_metric("blocked_interactions")
        yield (
            history or [],
            "",
            "",
            "",
            agent_obj,
            f"Per-user limit reached ({per_user_limit} interactions).",
            agentic_ok,
        )
        return
    # Guard 2: agent import failed at module load.
    if not AGENT_AVAILABLE:
        yield (
            history or [],
            "",
            "",
            "",
            agent_obj,
            "Agent not available. Check HF token and model name.",
            agentic_ok,
        )
        return
    # Guard 3: agentic mode latched off by an earlier quota/billing error.
    if not agentic_ok:
        yield (
            history or [],
            "",
            "",
            "",
            agent_obj,
            "Agentic mode disabled due to a prior quota/billing error.",
            agentic_ok,
        )
        return
    # Guard 4: global daily call budget.
    if not budget_ok:
        yield (
            history or [],
            "",
            "",
            "",
            agent_obj,
            "Daily budget reached. Set HF_AGENT_MAX_CALLS_PER_DAY or try tomorrow.",
            agentic_ok,
        )
        return
    # Count one interaction for this user upfront
    _inc_interactions_today(user_id)
    interactions_after = interactions_before + 1
    # Lazily initialize agent if requested
    _ensure_hf_token_env()
    if agent_obj is None:
        try:
            agent_obj = CBTAgent(model_name=model_value)
        except Exception as e:
            err = str(e)
            yield (
                history or [],
                "",
                "",
                "",
                agent_obj,
                f"Agent failed to initialize: {err}",
                agentic_ok,
            )
            return
    # Prepare side panels first for a snappy UI
    try:
        analysis = agent_obj.analyze_thought(message)
        distortions_display = chatbot._format_distortions(
            analysis.get("distortions", [])
        )
        reframe_display = analysis.get("reframe", "")
        primary = analysis.get("distortions", [])
        primary_code = primary[0][0] if primary else None
        situations_display = (
            chatbot._format_similar_situations(primary_code) if primary_code else ""
        )
        # Metrics: record this interaction
        _inc_metric("total_interactions")
        _record_distortion_counts([c for c, _ in analysis.get("distortions", [])])
        _inc_calls_today()
    except Exception as e:
        distortions_display = reframe_display = situations_display = ""
        # Detect quota/billing signals and permanently disable agent for this run
        msg = str(e).lower()
        if any(
            k in msg
            for k in [
                "quota",
                "limit",
                "billing",
                "payment",
                "insufficient",
                "402",
                "429",
            ]
        ):
            agentic_ok = False
            notice = "Agentic mode disabled due to quota/billing error."
        else:
            notice = f"Agent analysis failed: {e}"
        _inc_metric("agent_errors")
        yield (
            history or [],
            distortions_display,
            reframe_display,
            situations_display,
            agent_obj,
            notice,
            agentic_ok,
        )
        return
    # Start streaming the assistant reply (messages format)
    history = history or []
    # Append user message then assistant placeholder
    try:
        history.append({"role": "user", "content": message})
    except Exception:
        # e.g. history arrived as a tuple: rebuild as a list.
        history = list(history) + [{"role": "user", "content": message}]
    history.append({"role": "assistant", "content": ""})
    # Optional prune to last N pairs to keep UI light
    try:
        pairs = chatbot._history_to_context(history[:-1])
        pruned: list[dict] = []
        for p in pairs:
            pruned.append({"role": "user", "content": p.get("user", "")})
            pruned.append({"role": "assistant", "content": p.get("assistant", "")})
        pruned.append({"role": "user", "content": message})
        pruned.append({"role": "assistant", "content": ""})
        history = pruned
    except Exception:
        pass
    # Choose response source: true token streaming via HF Inference
    try:
        # NOTE(review): _inc_calls_today() was already called after analysis
        # above, so each message counts twice against the daily budget —
        # confirm whether that is intended.
        _inc_calls_today()
        stream = getattr(agent_obj, "stream_generate_response", None)
        if callable(stream):
            token_iter = stream(
                message, context=chatbot._history_to_context(history[:-1])
            )
        else:
            # Fallback to non-streaming
            def _one_shot():
                yield agent_obj.generate_response(
                    message, context=chatbot._history_to_context(history[:-1])
                )
            token_iter = _one_shot()
    except Exception as e:
        _inc_metric("agent_errors")
        yield (
            history,
            distortions_display,
            reframe_display,
            situations_display,
            agent_obj,
            f"Agent response failed: {e}",
            agentic_ok,
        )
        return
    acc = ""
    for chunk in token_iter:
        if not chunk:
            continue
        acc += str(chunk)
        # Update the assistant placeholder in-place for either history format.
        if isinstance(history[-1], dict):
            history[-1]["content"] = acc
        else:
            try:
                history[-1][1] = acc
            except Exception:
                pass
        # yield streaming frame
        yield (
            history,
            distortions_display,
            reframe_display,
            situations_display,
            agent_obj,
            notice,
            agentic_ok,
        )
    # Final yield ensures the last state is consistent
    _record_response_chars(len(acc))
    # Show remaining interactions
    try:
        remaining = (
            None
            if per_user_limit is None
            else max(0, per_user_limit - interactions_after)
        )
        if remaining is not None:
            notice = (
                notice + f"\nRemaining interactions today: {remaining}"
            ).strip()
    except Exception:
        pass
    yield (
        history,
        distortions_display,
        reframe_display,
        situations_display,
        agent_obj,
        notice,
        agentic_ok,
    )
def clear_input():
    """Return the empty string used to reset the message textbox after a send."""
    cleared = ""
    return cleared
# Enter in the textbox and the Send button both run the same streaming handler,
# then clear the textbox once the stream completes.
msg_input.submit(
    respond_stream,
    inputs=[
        msg_input,
        chatbot_ui,
        model_state,
        agent_state,
        agentic_enabled_state,
        admin_state,
    ],
    outputs=[
        chatbot_ui,
        distortions_output,
        reframe_output,
        situations_output,
        agent_state,
        billing_notice,
        agentic_enabled_state,
    ],
).then(fn=clear_input, outputs=[msg_input])
send_btn.click(
    fn=respond_stream,
    inputs=[
        msg_input,
        chatbot_ui,
        model_state,
        agent_state,
        agentic_enabled_state,
        admin_state,
    ],
    outputs=[
        chatbot_ui,
        distortions_output,
        reframe_output,
        situations_output,
        agent_state,
        billing_notice,
        agentic_enabled_state,
    ],
).then(clear_input, outputs=[msg_input])

def _clear_session_and_notice():
    # Reset chat + analysis panels and blank the billing notice.
    h, d, r, s = chatbot.clear_session()
    return h, d, r, s, ""

clear_btn.click(
    fn=_clear_session_and_notice,
    outputs=[
        chatbot_ui,
        distortions_output,
        reframe_output,
        situations_output,
        billing_notice,
    ],
)
# Learn Tab
with gr.Tab(t['learn']['title']):
    create_learn_tab(t['learn'], COGNITIVE_DISTORTIONS)
def _owner_is(profile: gr.OAuthProfile | None, request: gr.Request | None = None) -> bool:
try:
# Prefer explicit OWNER_USER, fallback to the Space author
# (useful if OWNER_USER not set)
owner = (
os.getenv("OWNER_USER")
or os.getenv("SPACE_AUTHOR_NAME")
or ""
).strip().lower()
if not owner:
return False
# Try common OAuth profile fields
username = None
for key in ("preferred_username", "username", "login", "name", "sub", "id"):
try:
if hasattr(profile, key):
username = getattr(profile, key)
elif isinstance(profile, dict) and key in profile:
username = profile[key]
if username:
break
except Exception as e:
print(f"Error getting username from profile: {e}")
pass
# Fallback to request.username provided by Gradio when OAuth is enabled
if not username and request is not None:
try:
username = getattr(request, "username", None)
except Exception as e:
print(f"Error getting username from request: {e}")
username = None
if not username:
return False
return str(username).lower() == owner
except Exception as e:
print(f"Error checking owner: {e}")
return False
def _metrics_paths():
return (
os.getenv("APP_METRICS_PATH", "/tmp/app_metrics.json"),
os.getenv("AGENT_CALL_LOG_PATH", "/tmp/agent_calls.json"),
)
def _read_json(path: str) -> dict:
try:
with open(path, encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def _summarize_metrics_md() -> str:
mpath, _ = _metrics_paths()
data = _read_json(mpath)
if not data:
return "No metrics recorded yet."
# Summarize last 7 days
days = sorted(data.keys())[-7:]
total = blocked = errors = resp_chars = resp_count = 0
dist_counts: dict[str, int] = {}
for d in days:
day = data.get(d, {}) or {}
total += int(day.get("total_interactions", 0))
blocked += int(day.get("blocked_interactions", 0))
errors += int(day.get("agent_errors", 0))
resp_chars += int(day.get("response_chars_total", 0))
resp_count += int(day.get("response_count", 0))
dist = day.get("distortion_counts", {})
if isinstance(dist, dict):
for k, v in dist.items():
dist_counts[k] = int(dist_counts.get(k, 0)) + int(v)
avg_len = (resp_chars / resp_count) if resp_count else 0
top = sorted(dist_counts.items(), key=lambda x: x[1], reverse=True)[:5]
lines = [
"### Usage (last 7 days)",
f"- Total interactions: {total}",
f"- Blocked interactions: {blocked}",
f"- Agent errors: {errors}",
f"- Avg response length: {avg_len:.0f} chars",
"",
"### Top cognitive patterns",
]
if top:
for k, v in top:
lines.append(f"- {k}: {v}")
else:
lines.append("- None recorded")
return "\n".join(lines)
def _limit_info_md(settings: dict | None) -> str:
env_val = os.getenv("HF_AGENT_MAX_INTERACTIONS_PER_USER")
try:
env_limit = int(env_val) if env_val else 12
except Exception:
env_limit = 12
override = None
if isinstance(settings, dict):
override = settings.get("per_user_limit_override")
effective = (
int(override)
if isinstance(override, int | float) and int(override) > 0
else env_limit
)
return (
f"Per-user daily limit: {effective} (env: {env_limit}, override: "
f"{override if override else 'None'})"
)
def admin_set_limit(override_text: str, settings: dict | None):
# Only update runtime state; does not change env var
try:
if settings is None or not isinstance(settings, dict):
settings = {"per_user_limit_override": None}
override = None
if override_text and override_text.strip():
override = int(override_text.strip())
if override <= 0:
override = None
settings["per_user_limit_override"] = override
except Exception as e:
print(f"Error setting limit override: {e}")
settings = {"per_user_limit_override": None}
return settings, _limit_info_md(settings)
def admin_refresh():
return _summarize_metrics_md()
def _profile_username(profile: gr.OAuthProfile | None, request: gr.Request | None = None) -> str:
try:
for key in ("preferred_username", "username", "login", "name", "sub", "id"):
if hasattr(profile, key):
v = getattr(profile, key)
if v:
return str(v)
elif isinstance(profile, dict) and key in profile and profile[key]:
return str(profile[key])
except Exception as e:
print(f"Error getting username from profile: {e}")
pass
try:
if request is not None and getattr(request, "username", None):
return str(request.username)
except Exception as e:
print(f"Error getting username from request: {e}")
pass
return "unknown"
def identity_refresh(profile: gr.OAuthProfile | None, request: gr.Request | None = None):
    """Build the owner-debug Markdown: viewer identity, owner envs, token whoami."""
    viewer = _profile_username(profile, request)
    is_owner = _owner_is(profile, request)
    token_line = ""
    try:  # local import
        info = _hf_whoami()
        token_user = info.get("name") or info.get("fullname") or "?"
        token_type = info.get("type", "?")
        org_names = ", ".join([o.get("name", "?") for o in info.get("orgs", [])])
        token_line = f"Token user: `{token_user}` (type: {token_type}); orgs: [{org_names}]"
    except Exception as e:
        # whoami needs a valid HF token; report the failure instead of raising.
        print(f"Error getting token info whoami: {e}")
        token_line = f"Token whoami failed: {e}"
    return (
        f"Logged in as (OAuth): `{viewer}`\n\n"
        f"OWNER_USER: `{(os.getenv('OWNER_USER') or '').strip()}`\n"
        f"SPACE_AUTHOR_NAME: `{(os.getenv('SPACE_AUTHOR_NAME') or '').strip()}`\n"
        f"Owner match: {'yes' if is_owner else 'no'}\n\n"
        f"{token_line}"
    )
def storage_check():
    """Report whether /data (persistent Space storage) is mounted, plus disk
    usage, a few directory entries, and the HF_HOME setting."""
    try:
        data_dir = "/data"
        mounted = os.path.exists(data_dir)
        report = [f"Path: `{data_dir}` — {'present' if mounted else 'absent'}"]
        if mounted:
            total, used, free = shutil.disk_usage(data_dir)
            gb = 1024 ** 3
            report.append(
                f"Disk: total {total/gb:.1f} GB, used {used/gb:.1f} GB, free {free/gb:.1f} GB"
            )
            try:
                # Show at most 20 entries; listing may fail on permissions.
                entries = sorted(os.listdir(data_dir))[:20]
                if entries:
                    report.append("Entries: " + ", ".join(entries))
            except Exception:
                pass
        report.append(f"HF_HOME: `{os.getenv('HF_HOME', '(not set)')}`")
        return "\n".join(report)
    except Exception as e:
        return f"/data check failed: {e}"
# Wire admin interactions
model_dropdown.change(fn=lambda v: v, inputs=[model_dropdown], outputs=[model_state])
set_override_btn.click(
    fn=admin_set_limit,
    inputs=[override_tb, admin_state],
    outputs=[admin_state, admin_limit_info],
)
refresh_btn.click(fn=admin_refresh, outputs=[admin_summary])
identity_btn.click(fn=identity_refresh, outputs=[owner_identity_md])
storage_btn.click(fn=storage_check, outputs=[storage_info_md])
# Gate admin panel visibility on load (OAuth)
try:
    # Also populate identity + storage placeholders
    def _load(profile: gr.OAuthProfile | None, request: gr.Request | None = None):
        # Owners see the accordion + admin panel pre-filled; everyone else
        # sees only the locked message.
        visible = _owner_is(profile, request)
        ident = identity_refresh(profile, request) if visible else ""
        return (
            gr.update(visible=visible),
            gr.update(visible=visible),
            gr.update(visible=not visible),
            _summarize_metrics_md() if visible else "",
            _limit_info_md(admin_state.value if hasattr(admin_state, "value") else None)
            if visible
            else "",
            ident,
            "",
        )
    app.load(
        _load,
        outputs=[admin_accordion, chat_admin_panel, chat_locked_panel, admin_summary,
                 admin_limit_info, owner_identity_md, storage_info_md],
    )
except Exception as e:
    print(f"Error loading app: {e}")
    # If OAuth not available, keep admin hidden
    pass
# Enable queue for Spaces compatibility
return app.queue()
# Launch the app
if __name__ == "__main__":
    app = create_app(language='en')
    # share=False: serve locally / inside the Space; show_api=False hides the
    # auto-generated API docs page.
    app.launch(share=False, show_error=True, show_api=False)