Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| import json | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Tuple, Optional, Any, Dict, List | |
| import requests | |
| import pandas as pd | |
| from huggingface_hub import ModelCard, HfApi | |
| from transformers import AutoConfig, AutoTokenizer | |
| # Import local modules | |
| from backend.config import API, REQUESTS_REPO_ID, hf_api_token, SLACK_WEBHOOK_URL | |
| from backend.data_loader import load_requests | |
| from backend.helpers import unify_precision, get_model_size, parse_datetime | |
| # Configure logger | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| MODEL_TYPE_NORMALIZATION = { | |
| # "pt": "pre-trained", | |
| "base": "base", | |
| # "pre-trained": "pre-trained", | |
| # "fine-tuned": "finetuned", | |
| # "finetuned": "finetuned", | |
| "instruct": "instruct", | |
| # "chat": "finetuned", | |
| } | |
| class SlackNotifier: | |
| """ | |
| Handles all Slack notifications for the Arabic leaderboard system. | |
| """ | |
| def __init__(self, webhook_url: str): | |
| """ | |
| Initialize with Slack webhook URL. | |
| Args: | |
| webhook_url: Slack incoming webhook URL | |
| """ | |
| self.webhook_url = webhook_url | |
| def _send_message(self, blocks: List[Dict], text: str = "") -> bool: | |
| """ | |
| Send a message to Slack using Block Kit. | |
| Args: | |
| blocks: List of Slack block elements | |
| text: Fallback plain text | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| try: | |
| payload = { | |
| "blocks": blocks, | |
| "text": text # Fallback for notifications | |
| } | |
| response = requests.post( | |
| self.webhook_url, | |
| json=payload, | |
| headers={"Content-Type": "application/json"}, | |
| timeout=10 | |
| ) | |
| if response.status_code != 200: | |
| logger.error(f"Slack API error: {response.status_code} - {response.text}") | |
| return False | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to send Slack message: {e}") | |
| return False | |
| def notify_new_submission(self, submission_data: Dict) -> bool: | |
| """ | |
| Notify when a new model is submitted for evaluation. | |
| Args: | |
| submission_data: Dictionary containing submission details | |
| """ | |
| model_name = submission_data.get("model", "Unknown") | |
| org = model_name.split("/")[0] if "/" in model_name else "Unknown" | |
| # precision = submission_data.get("precision", "UNK") | |
| # weight_type = submission_data.get("weight_type", "Unknown") | |
| params = submission_data.get("params", "Unknown") | |
| blocks = [ | |
| { | |
| "type": "header", | |
| "text": { | |
| "type": "plain_text", | |
| "text": "🆕 New Model Submission", | |
| "emoji": True | |
| } | |
| }, | |
| { | |
| "type": "section", | |
| "fields": [ | |
| { | |
| "type": "mrkdwn", | |
| "text": f"*Model:*\n{model_name}" | |
| }, | |
| { | |
| "type": "mrkdwn", | |
| "text": f"*Organization:*\n{org}" | |
| }, | |
| # { | |
| # "type": "mrkdwn", | |
| # "text": f"*Precision:*\n{precision}" | |
| # }, | |
| # { | |
| # "type": "mrkdwn", | |
| # "text": f"*Weight Type:*\n{weight_type}" | |
| # }, | |
| { | |
| "type": "mrkdwn", | |
| "text": f"*Parameters:*\n{params}" | |
| }, | |
| { | |
| "type": "mrkdwn", | |
| "text": f"*Status:*\n⏳ PENDING" | |
| } | |
| ] | |
| }, | |
| { | |
| "type": "context", | |
| "elements": [ | |
| { | |
| "type": "mrkdwn", | |
| "text": f"Submitted at: {submission_data.get('submitted_time', 'Unknown')}" | |
| } | |
| ] | |
| } | |
| ] | |
| return self._send_message( | |
| blocks=blocks, | |
| text=f"New submission: {model_name}" | |
| ) | |
| def notify_evaluation_failed(self, model_name: str, error_message: str, | |
| submission_data: Optional[Dict] = None) -> bool: | |
| """ | |
| Notify when model evaluation fails. | |
| Args: | |
| model_name: Name of the model | |
| error_message: Description of the failure | |
| submission_data: Optional submission details | |
| """ | |
| blocks = [ | |
| { | |
| "type": "header", | |
| "text": { | |
| "type": "plain_text", | |
| "text": "❌ Evaluation Failed", | |
| "emoji": True | |
| } | |
| }, | |
| { | |
| "type": "section", | |
| "text": { | |
| "type": "mrkdwn", | |
| "text": f"*Model:* {model_name}\n*Error:* {error_message}" | |
| } | |
| } | |
| ] | |
| # if submission_data: | |
| # blocks.append({ | |
| # "type": "section", | |
| # "fields": [ | |
| # { | |
| # "type": "mrkdwn", | |
| # "text": f"*Precision:*\n{submission_data.get('precision', 'UNK')}" | |
| # }, | |
| # { | |
| # "type": "mrkdwn", | |
| # "text": f"*Revision:*\n{submission_data.get('revision', 'main')}" | |
| # } | |
| # ] | |
| # }) | |
| blocks.append({ | |
| "type": "context", | |
| "elements": [ | |
| { | |
| "type": "mrkdwn", | |
| "text": f"Failed at: {datetime.utcnow().isoformat()}Z" | |
| } | |
| ] | |
| }) | |
| return self._send_message( | |
| blocks=blocks, | |
| text=f"Evaluation failed: {model_name}" | |
| ) | |
| def notify_evaluation_success(self, model_name: str, results: Dict) -> bool: | |
| """ | |
| Notify when model evaluation succeeds and is added to leaderboard. | |
| Args: | |
| model_name: Name of the model | |
| results: Dictionary containing evaluation results/metrics | |
| """ | |
| blocks = [ | |
| { | |
| "type": "header", | |
| "text": { | |
| "type": "plain_text", | |
| "text": "✅ Evaluation Completed", | |
| "emoji": True | |
| } | |
| }, | |
| { | |
| "type": "section", | |
| "text": { | |
| "type": "mrkdwn", | |
| "text": f"*Model:* {model_name}\n*Status:* Successfully added to leaderboard! 🎉" | |
| } | |
| } | |
| ] | |
| # Add metrics if available | |
| if results: | |
| metric_fields = [] | |
| for key, value in results.items(): | |
| if isinstance(value, (int, float)): | |
| metric_fields.append({ | |
| "type": "mrkdwn", | |
| "text": f"*{key}:*\n{value:.4f}" if isinstance(value, float) else f"*{key}:*\n{value}" | |
| }) | |
| if metric_fields: | |
| blocks.append({ | |
| "type": "section", | |
| "fields": metric_fields[:10] # Limit to 10 fields | |
| }) | |
| blocks.append({ | |
| "type": "context", | |
| "elements": [ | |
| { | |
| "type": "mrkdwn", | |
| "text": f"Completed at: {datetime.utcnow().isoformat()}Z" | |
| } | |
| ] | |
| }) | |
| return self._send_message( | |
| blocks=blocks, | |
| text=f"Evaluation success: {model_name}" | |
| ) | |
| def notify_top5_update(self, top5_models: List[Dict], changed: bool = True) -> bool: | |
| """ | |
| Notify about new top 5 models with LinkedIn post suggestion. | |
| Args: | |
| top5_models: List of top 5 model dictionaries with scores | |
| changed: Whether the top 5 has changed | |
| """ | |
| if not changed: | |
| return True # Don't send if nothing changed | |
| blocks = [ | |
| { | |
| "type": "header", | |
| "text": { | |
| "type": "plain_text", | |
| "text": "🏆 Top 5 Leaderboard Update!", | |
| "emoji": True | |
| } | |
| }, | |
| { | |
| "type": "section", | |
| "text": { | |
| "type": "mrkdwn", | |
| "text": "*The Top 5 Arabic LLMs have been updated!*" | |
| } | |
| } | |
| ] | |
| # Add top 5 list | |
| leaderboard_text = "" | |
| for idx, model in enumerate(top5_models[:5], 1): | |
| model_name = model.get("model", "Unknown") | |
| score = model.get("average_score", model.get("score", 0)) | |
| medal = ["🥇", "🥈", "🥉", "4️⃣", "5️⃣"][idx - 1] | |
| leaderboard_text += f"{medal} *{model_name}* - Score: {score:.2f}\n" | |
| blocks.append({ | |
| "type": "section", | |
| "text": { | |
| "type": "mrkdwn", | |
| "text": leaderboard_text | |
| } | |
| }) | |
| # Generate LinkedIn post | |
| linkedin_post = self._generate_linkedin_post(top5_models[:5]) | |
| blocks.extend([ | |
| { | |
| "type": "divider" | |
| }, | |
| { | |
| "type": "section", | |
| "text": { | |
| "type": "mrkdwn", | |
| "text": "*📱 Suggested LinkedIn Post:*" | |
| } | |
| }, | |
| { | |
| "type": "section", | |
| "text": { | |
| "type": "mrkdwn", | |
| "text": f"```{linkedin_post}```" | |
| } | |
| }, | |
| { | |
| "type": "context", | |
| "elements": [ | |
| { | |
| "type": "mrkdwn", | |
| "text": "Copy the post above and share on LinkedIn!" | |
| } | |
| ] | |
| } | |
| ]) | |
| return self._send_message( | |
| blocks=blocks, | |
| text="Top 5 leaderboard updated!" | |
| ) | |
| def _generate_linkedin_post(self, top5_models: List[Dict]) -> str: | |
| """ | |
| Generate a LinkedIn post text for the top 5 models. | |
| Args: | |
| top5_models: List of top 5 model dictionaries | |
| Returns: | |
| Formatted LinkedIn post text | |
| """ | |
| post = "🚀 Arabic LLM Leaderboard Update!\n\n" | |
| post += "We're excited to share the latest rankings for Arabic Language Models:\n\n" | |
| for idx, model in enumerate(top5_models, 1): | |
| model_name = model.get("model", "Unknown") | |
| score = model.get("average_score", model.get("score", 0)) | |
| medal = ["🥇", "🥈", "🥉", "4️⃣", "5️⃣"][idx - 1] | |
| post += f"{medal} {model_name} - {score:.2f}\n" | |
| post += "\n" | |
| post += "These models are pushing the boundaries of Arabic NLP! " | |
| post += "Check out our full leaderboard to explore more models and benchmarks.\n\n" | |
| post += "#ArabicNLP #LLM #AI #MachineLearning #ArabicAI #OpenSource #HuggingFace" | |
| return post | |
| # Integration helper functions | |
| def integrate_with_submission(original_submit_func): | |
| """ | |
| Decorator to integrate Slack notifications with the submit_model function. | |
| Usage: | |
| @integrate_with_submission | |
| def submit_model(...): | |
| # original implementation | |
| """ | |
| def wrapper(*args, **kwargs): | |
| result = original_submit_func(*args, **kwargs) | |
| # If submission was successful, send notification | |
| if result.startswith("**Success**"): | |
| try: | |
| from backend.config import SLACK_WEBHOOK_URL | |
| notifier = SlackNotifier(SLACK_WEBHOOK_URL) | |
| # Extract submission data from arguments | |
| submission_data = { | |
| "model": args[0] if len(args) > 0 else kwargs.get("model_name"), | |
| # "base_model": args[1] if len(args) > 1 else kwargs.get("base_model"), | |
| "revision": "main", # args[2] if len(args) > 2 else kwargs.get("revision"), | |
| # "precision": args[3] if len(args) > 3 else kwargs.get("precision"), | |
| # "weight_type": args[4] if len(args) > 4 else kwargs.get("weight_type"), | |
| "submitted_time": datetime.utcnow().isoformat() + "Z", | |
| "slack_thread_ts": null | |
| } | |
| notifier.notify_new_submission(submission_data) | |
| except Exception as e: | |
| logger.error(f"Failed to send Slack notification: {e}") | |
| return result | |
| return wrapper | |
| def already_in_queue(df: pd.DataFrame, model_name: str) -> bool: | |
| """ | |
| Check if (model, revision, precision) is already in the provided dataframe. | |
| """ | |
| if df.empty: | |
| return False | |
| # Create a boolean mask for matching rows | |
| mask = ( | |
| (df["model"] == model_name) | |
| ) | |
| return not df[mask].empty | |
| def check_model_card(repo_id: str) -> Tuple[bool, str]: | |
| """ | |
| Validate that the model card exists, has a license, and is of sufficient length. | |
| """ | |
| try: | |
| card = ModelCard.load(repo_id) | |
| except Exception: | |
| return False, "No model card found. Please add a README.md describing your model and license." | |
| # Check for license metadata | |
| has_license = card.data.license is not None or ( | |
| "license_name" in card.data and "license_link" in card.data | |
| ) | |
| if not has_license: | |
| return False, "No license metadata found in the model card." | |
| # Check content length | |
| if len(card.text) < 200: | |
| return False, "Model card is too short (<200 chars). Please add more details." | |
| return True, "" | |
| def is_model_on_hub( | |
| model_name: str, | |
| revision: Optional[str], | |
| token: Optional[str] = None, | |
| trust_remote_code: bool = False, | |
| test_tokenizer: bool = True | |
| ) -> Tuple[bool, str, Any]: | |
| """ | |
| Verifies if the model and tokenizer can be loaded from the Hub. | |
| Returns: (success, error_message, config_object) | |
| """ | |
| # 1. Check Configuration | |
| try: | |
| config = AutoConfig.from_pretrained( | |
| model_name, | |
| revision=revision, | |
| trust_remote_code=trust_remote_code, | |
| token=token | |
| ) | |
| except ValueError: | |
| return False, "requires `trust_remote_code=True`. Not automatically allowed.", None | |
| except Exception as e: | |
| return False, f"not loadable from hub: {str(e)}", None | |
| # 2. Check Tokenizer (optional but recommended) | |
| if test_tokenizer: | |
| try: | |
| _ = AutoTokenizer.from_pretrained( | |
| model_name, | |
| revision=revision, | |
| trust_remote_code=trust_remote_code, | |
| token=token | |
| ) | |
| except Exception as e: | |
| return False, f"tokenizer not loadable: {str(e)}", None | |
| return True, "", config | |
| def check_org_threshold(org_name: str) -> Tuple[bool, str]: | |
| """ | |
| Enforce rate limit: Each org can only submit 5 models in the last 7 days. | |
| """ | |
| df_all = load_requests("") # Load all requests | |
| if df_all.empty: | |
| return True, "" | |
| # Extract organization name safely | |
| df_all["org_name"] = df_all["model"].apply(lambda m: m.split("/")[0] if "/" in m else m) | |
| # Filter for specific org | |
| df_org = df_all[df_all["org_name"] == org_name].copy() | |
| if df_org.empty: | |
| return True, "" | |
| # Parse dates and clean data | |
| df_org["datetime"] = df_org["submitted_time"].apply(parse_datetime) | |
| df_org = df_org.dropna(subset=["datetime"]) | |
| # Calculate threshold | |
| now = datetime.utcnow() | |
| week_ago = now - timedelta(days=7) | |
| df_recent = df_org[df_org["datetime"] >= week_ago] | |
| if len(df_recent) >= 5: | |
| # Calculate when the next slot opens | |
| earliest_submission = df_recent.sort_values(by="datetime").iloc[0]["datetime"] | |
| next_slot = earliest_submission + timedelta(days=7) | |
| msg_next = next_slot.isoformat(timespec="seconds") + "Z" | |
| return ( | |
| False, | |
| f"Your org '{org_name}' has reached the 5-submissions-per-week limit. You can submit again after {msg_next}." | |
| ) | |
| return True, "" | |
| def submit_model( | |
| model_name: str, | |
| # base_model: Optional[str] = None, | |
| revision: Optional[str] = "main", | |
| # precision: str = "", | |
| # weight_type: str = "", | |
| model_type: str = "", | |
| ) -> str: | |
| """ | |
| Main controller: Validation -> Info Extraction -> Submission Upload. | |
| Returns a markdown formatted string message for the UI. | |
| """ | |
| # --- 1. Input Sanitization --- | |
| model_name = model_name.strip() | |
| # if base_model: | |
| # base_model = base_model.strip() | |
| revision = revision.strip() or "main" | |
| # precision = precision.strip() | |
| model_type = MODEL_TYPE_NORMALIZATION.get(model_type.strip().lower(), model_type.strip()) | |
| if not model_name: | |
| return "**Error**: Model name cannot be empty (use 'org/model')." | |
| try: | |
| org, repo_id = model_name.split("/") | |
| except ValueError: | |
| return "**Error**: Please specify model as 'org/model'." | |
| # --- 2. validation Pipeline --- | |
| # A. Check Model Card | |
| card_ok, card_msg = check_model_card(model_name) | |
| if not card_ok: | |
| return f"**Error**: {card_msg}" | |
| # B. Check Hub Existence (Base vs Target) | |
| # if weight_type.lower() in ["adapter", "delta"]: | |
| # if not base_model: | |
| # return "**Error**: For adapter/delta, you must provide a valid `base_model`." | |
| # ok_base, base_err, _ = is_model_on_hub( | |
| # base_model, revision, hf_api_token, trust_remote_code=True | |
| # ) | |
| # if not ok_base: | |
| # return f"**Error**: Base model '{base_model}' {base_err}" | |
| # else: | |
| ok_model, model_err, _ = is_model_on_hub( | |
| model_name, revision, hf_api_token, trust_remote_code=True | |
| ) | |
| if not ok_model: | |
| return f"**Error**: Model '{model_name}' {model_err}" | |
| # C. Fetch Model Info (Likes, License, Private Status) | |
| try: | |
| info = API.model_info(model_name, revision=revision, token=hf_api_token) | |
| except Exception as e: | |
| return f"**Error**: Could not fetch model info. {str(e)}" | |
| model_license = info.card_data.license | |
| model_likes = info.likes or 0 | |
| model_private = bool(getattr(info, "private", False)) | |
| # D. Check Queue Duplication | |
| if already_in_queue(load_requests("finished"), model_name): | |
| return f"**Warning**: '{model_name}') has already been evaluated." | |
| if already_in_queue(load_requests("pending"), model_name): | |
| return f"**Warning**: '{model_name}') is already in PENDING." | |
| # E. Check Rate Limit | |
| under_threshold, limit_msg = check_org_threshold(org) | |
| if not under_threshold: | |
| return f"**Error**: {limit_msg}" | |
| # --- 3. Submission Construction --- | |
| # precision_final = unify_precision(precision) | |
| # if precision_final == "Missing": | |
| # precision_final = "UNK" | |
| model_params = get_model_size(model_info=info) | |
| current_time = datetime.utcnow().isoformat() + "Z" | |
| submission_data = { | |
| "model": model_name, | |
| # "base_model": base_model, | |
| "revision": revision, | |
| # "precision": precision_final, | |
| # "weight_type": weight_type, | |
| "status": "SUBMITTED", | |
| "submitted_time": current_time, | |
| "model_type": model_type, | |
| "likes": model_likes, | |
| "params": model_params, | |
| "license": model_license, | |
| "private": model_private, | |
| "job_id": None, | |
| "job_start_time": None, | |
| } | |
| # Define path in the requests dataset | |
| file_path = f"{org}/{repo_id}_eval_request.json" | |
| # --- 4. Upload to Hub --- | |
| try: | |
| API.upload_file( | |
| path_or_fileobj=json.dumps(submission_data, indent=2).encode("utf-8"), | |
| path_in_repo=file_path, | |
| repo_id=REQUESTS_REPO_ID, | |
| repo_type="dataset", | |
| token=hf_api_token, | |
| commit_message=f"Add {model_name} to eval queue" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Submission upload failed: {e}") | |
| return f"**Error**: Could not upload to '{REQUESTS_REPO_ID}': {str(e)}" | |
| if SLACK_WEBHOOK_URL: | |
| notifier = SlackNotifier(SLACK_WEBHOOK_URL) | |
| notifier.notify_new_submission(submission_data) | |
| return f"**Success**: Model '{model_name}' submitted for evaluation!" | |