# leaderboard/backend/submission_handler.py
# Commit e357bf2 (Basma Boussaha): add a new status submitted
import json
import logging
from datetime import datetime, timedelta
from typing import Tuple, Optional, Any, Dict, List
import requests
import pandas as pd
from huggingface_hub import ModelCard, HfApi
from transformers import AutoConfig, AutoTokenizer
# Import local modules
from backend.config import API, REQUESTS_REPO_ID, hf_api_token, SLACK_WEBHOOK_URL
from backend.data_loader import load_requests
from backend.helpers import unify_precision, get_model_size, parse_datetime
# Configure logger
# NOTE: basicConfig is executed when this module is imported and requests
# INFO-level output on the root logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Maps a lower-cased, user-supplied model type to the label stored with the
# submission. submit_model looks the type up here and falls back to the
# stripped input when there is no entry. The commented-out entries are
# aliases that are currently disabled.
MODEL_TYPE_NORMALIZATION = {
    # "pt": "pre-trained",
    "base": "base",
    # "pre-trained": "pre-trained",
    # "fine-tuned": "finetuned",
    # "finetuned": "finetuned",
    "instruct": "instruct",
    # "chat": "finetuned",
}
class SlackNotifier:
    """
    Handles all Slack notifications for the Arabic leaderboard system.

    Every ``notify_*`` method builds a Slack Block Kit payload and posts it to
    the incoming webhook given at construction time. Delivery is best-effort:
    failures are logged and reported through a ``False`` return value instead
    of being raised.
    """

    # Badges for leaderboard ranks 1-5. Rankings are built by zipping against
    # this tuple, which also caps any ranking at five entries.
    _RANK_BADGES = ("🥇", "🥈", "🥉", "4️⃣", "5️⃣")

    def __init__(self, webhook_url: str):
        """
        Initialize with Slack webhook URL.

        Args:
            webhook_url: Slack incoming webhook URL
        """
        self.webhook_url = webhook_url

    # ------------------------------------------------------------------
    # Small payload helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _header_block(title: str) -> Dict:
        """Return a Block Kit header block showing *title*."""
        return {
            "type": "header",
            "text": {"type": "plain_text", "text": title, "emoji": True},
        }

    @staticmethod
    def _mrkdwn(text: str) -> Dict:
        """Return a Block Kit ``mrkdwn`` text object."""
        return {"type": "mrkdwn", "text": text}

    @classmethod
    def _section_block(cls, text: str) -> Dict:
        """Return a section block with a single mrkdwn text."""
        return {"type": "section", "text": cls._mrkdwn(text)}

    @classmethod
    def _context_block(cls, text: str) -> Dict:
        """Return a context block with a single mrkdwn element."""
        return {"type": "context", "elements": [cls._mrkdwn(text)]}

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix."""
        return f"{datetime.utcnow().isoformat()}Z"

    @staticmethod
    def _format_score(model: Dict) -> str:
        """
        Format a model's score with two decimals.

        Reads ``average_score`` and falls back to ``score`` (default 0).
        Returns ``"N/A"`` when the value cannot be interpreted as a number,
        instead of raising from the format call.
        """
        score = model.get("average_score", model.get("score", 0))
        try:
            return f"{float(score):.2f}"
        except (TypeError, ValueError):
            return "N/A"

    def _ranked_entries(self, models: List[Dict]) -> List[Tuple[str, str, str]]:
        """Return (badge, model name, formatted score) for at most the first five models."""
        return [
            (badge, model.get("model", "Unknown"), self._format_score(model))
            for badge, model in zip(self._RANK_BADGES, models)
        ]

    # ------------------------------------------------------------------
    # Transport
    # ------------------------------------------------------------------
    def _send_message(self, blocks: List[Dict], text: str = "") -> bool:
        """
        Send a message to Slack using Block Kit.

        Args:
            blocks: List of Slack block elements
            text: Fallback plain text

        Returns:
            True if successful, False otherwise
        """
        if not self.webhook_url:
            # Without a URL the request can only fail; skip it explicitly.
            logger.warning("Slack webhook URL is not configured; notification skipped.")
            return False

        payload = {
            "blocks": blocks,
            "text": text,  # Fallback for notifications
        }
        try:
            response = requests.post(
                self.webhook_url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=10,
            )
            if response.status_code != 200:
                logger.error(f"Slack API error: {response.status_code} - {response.text}")
                return False
            return True
        except Exception as e:
            # Deliberately broad: a notification failure must never propagate
            # into the caller's workflow.
            logger.error(f"Failed to send Slack message: {e}")
            return False

    # ------------------------------------------------------------------
    # Notifications
    # ------------------------------------------------------------------
    def notify_new_submission(self, submission_data: Dict) -> bool:
        """
        Notify when a new model is submitted for evaluation.

        Args:
            submission_data: Dictionary containing submission details. The
                keys read here are ``model``, ``params`` and ``submitted_time``.

        Returns:
            True if the message was delivered, False otherwise
        """
        # ``or`` also covers an explicit None value, which would otherwise
        # break the "/" membership test below.
        model_name = submission_data.get("model") or "Unknown"
        org = model_name.split("/")[0] if "/" in model_name else "Unknown"
        params = submission_data.get("params", "Unknown")
        # NOTE: precision / weight-type fields are currently not reported.

        blocks = [
            self._header_block("🆕 New Model Submission"),
            {
                "type": "section",
                "fields": [
                    self._mrkdwn(f"*Model:*\n{model_name}"),
                    self._mrkdwn(f"*Organization:*\n{org}"),
                    self._mrkdwn(f"*Parameters:*\n{params}"),
                    self._mrkdwn("*Status:*\n⏳ PENDING"),
                ],
            },
            self._context_block(
                f"Submitted at: {submission_data.get('submitted_time', 'Unknown')}"
            ),
        ]
        return self._send_message(
            blocks=blocks,
            text=f"New submission: {model_name}",
        )

    def notify_evaluation_failed(self, model_name: str, error_message: str,
                                 submission_data: Optional[Dict] = None) -> bool:
        """
        Notify when model evaluation fails.

        Args:
            model_name: Name of the model
            error_message: Description of the failure
            submission_data: Optional submission details (accepted for
                interface compatibility; currently not included in the message)

        Returns:
            True if the message was delivered, False otherwise
        """
        blocks = [
            self._header_block("❌ Evaluation Failed"),
            self._section_block(f"*Model:* {model_name}\n*Error:* {error_message}"),
            self._context_block(f"Failed at: {self._utc_timestamp()}"),
        ]
        return self._send_message(
            blocks=blocks,
            text=f"Evaluation failed: {model_name}",
        )

    def notify_evaluation_success(self, model_name: str, results: Dict) -> bool:
        """
        Notify when model evaluation succeeds and is added to leaderboard.

        Args:
            model_name: Name of the model
            results: Dictionary containing evaluation results/metrics. Only
                int/float values are shown; floats use four decimals.

        Returns:
            True if the message was delivered, False otherwise
        """
        blocks = [
            self._header_block("✅ Evaluation Completed"),
            self._section_block(
                f"*Model:* {model_name}\n*Status:* Successfully added to leaderboard! 🎉"
            ),
        ]

        # Add metrics if available
        metric_fields = []
        for key, value in (results or {}).items():
            if not isinstance(value, (int, float)):
                continue
            shown = f"{value:.4f}" if isinstance(value, float) else f"{value}"
            metric_fields.append(self._mrkdwn(f"*{key}:*\n{shown}"))
        if metric_fields:
            blocks.append({
                "type": "section",
                "fields": metric_fields[:10],  # Limit to 10 fields
            })

        blocks.append(self._context_block(f"Completed at: {self._utc_timestamp()}"))
        return self._send_message(
            blocks=blocks,
            text=f"Evaluation success: {model_name}",
        )

    def notify_top5_update(self, top5_models: List[Dict], changed: bool = True) -> bool:
        """
        Notify about new top 5 models with LinkedIn post suggestion.

        Args:
            top5_models: List of top 5 model dictionaries with scores (only
                the first five entries are used)
            changed: Whether the top 5 has changed

        Returns:
            True if nothing had to be sent or the message was delivered,
            False if delivery failed or there were no models to report
        """
        if not changed:
            return True  # Don't send if nothing changed

        entries = self._ranked_entries(top5_models)
        if not entries:
            # An empty ranking would produce an empty section text.
            logger.warning("Top 5 update requested with no models; notification skipped.")
            return False

        leaderboard_text = "".join(
            f"{badge} *{name}* - Score: {score}\n" for badge, name, score in entries
        )
        linkedin_post = self._generate_linkedin_post(top5_models)

        blocks = [
            self._header_block("🏆 Top 5 Leaderboard Update!"),
            self._section_block("*The Top 5 Arabic LLMs have been updated!*"),
            self._section_block(leaderboard_text),
            {"type": "divider"},
            self._section_block("*📱 Suggested LinkedIn Post:*"),
            self._section_block(f"```{linkedin_post}```"),
            self._context_block("Copy the post above and share on LinkedIn!"),
        ]
        return self._send_message(
            blocks=blocks,
            text="Top 5 leaderboard updated!",
        )

    def _generate_linkedin_post(self, top5_models: List[Dict]) -> str:
        """
        Generate a LinkedIn post text for the top 5 models.

        Args:
            top5_models: List of top 5 model dictionaries. Entries beyond the
                fifth are ignored rather than causing an error.

        Returns:
            Formatted LinkedIn post text
        """
        ranking = "".join(
            f"{badge} {name} - {score}\n"
            for badge, name, score in self._ranked_entries(top5_models)
        )
        return (
            "🚀 Arabic LLM Leaderboard Update!\n\n"
            "We're excited to share the latest rankings for Arabic Language Models:\n\n"
            f"{ranking}"
            "\n"
            "These models are pushing the boundaries of Arabic NLP! "
            "Check out our full leaderboard to explore more models and benchmarks.\n\n"
            "#ArabicNLP #LLM #AI #MachineLearning #ArabicAI #OpenSource #HuggingFace"
        )
# Integration helper functions
def integrate_with_submission(original_submit_func):
    """
    Decorator to integrate Slack notifications with the submit_model function.

    The wrapped function is called unchanged and its result is always returned
    as-is. When that result is a string starting with ``"**Success**"`` and a
    Slack webhook is configured, a "new submission" message is posted on a
    best-effort basis: any failure is logged and never raised.

    Do not apply this to a function that already posts its own
    "new submission" message, otherwise two messages are sent.

    Usage:
        @integrate_with_submission
        def submit_model(...):
            # original implementation
    """
    from functools import wraps

    @wraps(original_submit_func)  # keep the wrapped function's name/docstring
    def wrapper(*args, **kwargs):
        result = original_submit_func(*args, **kwargs)

        # Only a string result can signal success; anything else (including
        # None) is passed through untouched.
        if not (isinstance(result, str) and result.startswith("**Success**")):
            return result

        try:
            from backend.config import SLACK_WEBHOOK_URL

            if SLACK_WEBHOOK_URL:
                notifier = SlackNotifier(SLACK_WEBHOOK_URL)
                # Extract submission data from arguments
                submission_data = {
                    "model": args[0] if len(args) > 0 else kwargs.get("model_name"),
                    "revision": "main",
                    "submitted_time": datetime.utcnow().isoformat() + "Z",
                    "slack_thread_ts": None,
                }
                notifier.notify_new_submission(submission_data)
        except Exception as e:
            logger.error(f"Failed to send Slack notification: {e}")
        return result

    return wrapper
def already_in_queue(df: pd.DataFrame, model_name: str) -> bool:
    """
    Report whether *model_name* appears in the ``model`` column of *df*.

    Only the model name is compared. An empty dataframe never matches.
    """
    if df.empty:
        return False
    # True as soon as at least one row carries this model name.
    matches = df["model"] == model_name
    return bool(matches.any())
def check_model_card(repo_id: str) -> Tuple[bool, str]:
    """
    Check that a repo has a usable model card.

    The card must load, declare a license (either ``license`` or both
    ``license_name`` and ``license_link``), and contain at least 200
    characters of text.

    Returns:
        ``(True, "")`` when all checks pass, otherwise ``(False, reason)``.
    """
    try:
        model_card = ModelCard.load(repo_id)
    except Exception:
        return False, "No model card found. Please add a README.md describing your model and license."

    metadata = model_card.data
    # No direct license and no complete name/link pair means no license info.
    if metadata.license is None and not (
        "license_name" in metadata and "license_link" in metadata
    ):
        return False, "No license metadata found in the model card."

    if len(model_card.text) < 200:
        return False, "Model card is too short (<200 chars). Please add more details."

    return True, ""
def is_model_on_hub(
    model_name: str,
    revision: Optional[str],
    token: Optional[str] = None,
    trust_remote_code: bool = False,
    test_tokenizer: bool = True
) -> Tuple[bool, str, Any]:
    """
    Verifies if the model and tokenizer can be loaded from the Hub.

    Args:
        model_name: Hub repo id of the model
        revision: Revision passed to the loaders
        token: Optional Hub access token
        trust_remote_code: Forwarded to the loaders as-is
        test_tokenizer: Also try to load the tokenizer when True

    Returns: (success, error_message, config_object)
        ``config_object`` is None whenever ``success`` is False. The error
        message is a fragment meant to be appended after the model name.
    """
    load_kwargs = {
        "revision": revision,
        "trust_remote_code": trust_remote_code,
        "token": token,
    }

    # 1. Check Configuration
    try:
        config = AutoConfig.from_pretrained(model_name, **load_kwargs)
    except ValueError as e:
        if not trust_remote_code:
            return False, "requires `trust_remote_code=True`. Not automatically allowed.", None
        # Remote code was already allowed, so that hint would be wrong here;
        # report the actual problem instead.
        return False, f"not loadable from hub: {str(e)}", None
    except Exception as e:
        return False, f"not loadable from hub: {str(e)}", None

    # 2. Check Tokenizer (optional but recommended)
    if test_tokenizer:
        try:
            AutoTokenizer.from_pretrained(model_name, **load_kwargs)
        except Exception as e:
            return False, f"tokenizer not loadable: {str(e)}", None

    return True, "", config
def check_org_threshold(org_name: str) -> Tuple[bool, str]:
    """
    Enforce rate limit: Each org can only submit 5 models in the last 7 days.

    All requests are loaded, narrowed to those whose model name starts with
    ``org_name`` (the part before the first "/"), and counted if their
    ``submitted_time`` parses to a datetime within the last 7 days.

    Returns:
        ``(True, "")`` when the org may submit, otherwise ``(False, message)``
        where the message states when the next slot opens.
    """
    max_submissions = 5
    window = timedelta(days=7)

    df_all = load_requests("")  # Load all requests
    if df_all.empty:
        return True, ""

    # Derive the org per request without adding a column to the loaded frame
    # (in case load_requests hands out a shared object). Non-string model
    # values, e.g. NaN from an incomplete request, simply never match.
    request_orgs = df_all["model"].apply(
        lambda m: m.split("/", 1)[0] if isinstance(m, str) else None
    )

    # Filter for specific org
    df_org = df_all[request_orgs == org_name].copy()
    if df_org.empty:
        return True, ""

    # Parse dates and drop rows whose timestamp could not be parsed
    df_org["datetime"] = df_org["submitted_time"].apply(parse_datetime)
    df_org = df_org.dropna(subset=["datetime"])

    # NOTE(review): the cutoff is a naive UTC datetime; this presumes
    # parse_datetime yields naive UTC values as well — confirm.
    week_ago = datetime.utcnow() - window
    df_recent = df_org[df_org["datetime"] >= week_ago]
    if len(df_recent) < max_submissions:
        return True, ""

    # The next slot opens once the oldest submission in the window expires.
    earliest_submission = df_recent["datetime"].min()
    next_slot = earliest_submission + window
    msg_next = next_slot.isoformat(timespec="seconds") + "Z"
    return (
        False,
        f"Your org '{org_name}' has reached the 5-submissions-per-week limit. You can submit again after {msg_next}."
    )
def submit_model(
    model_name: str,
    revision: Optional[str] = "main",
    model_type: str = "",
) -> str:
    """
    Main controller: Validation -> Info Extraction -> Submission Upload.

    Args:
        model_name: Hub repo id in the form 'org/model'
        revision: Model revision; empty or None falls back to "main"
        model_type: Free-form model type, normalized through
            MODEL_TYPE_NORMALIZATION when a mapping exists

    Returns a markdown formatted string message for the UI. The message starts
    with ``**Success**``, ``**Warning**`` or ``**Error**``.

    This function posts the "new submission" Slack message itself (with the
    full submission record), so it is intentionally not wrapped in a
    notification decorator.

    NOTE: base_model / precision / weight_type handling (adapter and delta
    submissions) is currently disabled.
    """
    # --- 1. Input Sanitization ---
    model_name = (model_name or "").strip()
    if not model_name:
        return "**Error**: Model name cannot be empty (use 'org/model')."

    # Exactly one "/" with a non-empty part on each side.
    org, slash, repo_id = model_name.partition("/")
    if not slash or not org or not repo_id or "/" in repo_id:
        return "**Error**: Please specify model as 'org/model'."

    # The signature allows None for revision, so guard before stripping.
    revision = (revision or "").strip() or "main"
    raw_model_type = (model_type or "").strip()
    model_type = MODEL_TYPE_NORMALIZATION.get(raw_model_type.lower(), raw_model_type)

    # --- 2. validation Pipeline ---
    # A. Check Model Card
    card_ok, card_msg = check_model_card(model_name)
    if not card_ok:
        return f"**Error**: {card_msg}"

    # B. Check Hub Existence
    # SECURITY NOTE(review): trust_remote_code=True lets the config/tokenizer
    # loaders run code shipped in the submitted repository, and the repo id is
    # user-supplied. Confirm this is intended for the environment this runs in.
    ok_model, model_err, _ = is_model_on_hub(
        model_name, revision, hf_api_token, trust_remote_code=True
    )
    if not ok_model:
        return f"**Error**: Model '{model_name}' {model_err}"

    # C. Fetch Model Info (Likes, License, Private Status)
    try:
        info = API.model_info(model_name, revision=revision, token=hf_api_token)
    except Exception as e:
        return f"**Error**: Could not fetch model info. {str(e)}"

    # card_data can be absent on the info object; treat that as "no license".
    model_license = getattr(info.card_data, "license", None)
    model_likes = info.likes or 0
    model_private = bool(getattr(info, "private", False))

    # D. Check Queue Duplication
    # NOTE(review): only the "finished" and "pending" request sets are checked;
    # whether requests still in "SUBMITTED" state are covered depends on
    # load_requests — confirm.
    if already_in_queue(load_requests("finished"), model_name):
        return f"**Warning**: '{model_name}' has already been evaluated."
    if already_in_queue(load_requests("pending"), model_name):
        return f"**Warning**: '{model_name}' is already in PENDING."

    # E. Check Rate Limit
    under_threshold, limit_msg = check_org_threshold(org)
    if not under_threshold:
        return f"**Error**: {limit_msg}"

    # --- 3. Submission Construction ---
    model_params = get_model_size(model_info=info)
    current_time = datetime.utcnow().isoformat() + "Z"
    submission_data = {
        "model": model_name,
        "revision": revision,
        "status": "SUBMITTED",
        "submitted_time": current_time,
        "model_type": model_type,
        "likes": model_likes,
        "params": model_params,
        "license": model_license,
        "private": model_private,
        "job_id": None,
        "job_start_time": None,
    }

    # Define path in the requests dataset
    file_path = f"{org}/{repo_id}_eval_request.json"

    # --- 4. Upload to Hub ---
    try:
        API.upload_file(
            path_or_fileobj=json.dumps(submission_data, indent=2).encode("utf-8"),
            path_in_repo=file_path,
            repo_id=REQUESTS_REPO_ID,
            repo_type="dataset",
            token=hf_api_token,
            commit_message=f"Add {model_name} to eval queue"
        )
    except Exception as e:
        logger.error(f"Submission upload failed: {e}")
        return f"**Error**: Could not upload to '{REQUESTS_REPO_ID}': {str(e)}"

    # --- 5. Notify (best-effort) ---
    # The request is already stored at this point, so a notification problem
    # must not turn the result into a failure.
    if SLACK_WEBHOOK_URL:
        try:
            SlackNotifier(SLACK_WEBHOOK_URL).notify_new_submission(submission_data)
        except Exception as e:
            logger.error(f"Failed to send Slack notification: {e}")

    return f"**Success**: Model '{model_name}' submitted for evaluation!"