"""Model submission pipeline and Slack notifications for the Arabic leaderboard.

The module validates a Hugging Face model repository, enforces a per-organisation
rate limit, uploads an evaluation request to the requests dataset, and posts
Slack notifications about submissions and evaluation outcomes.
"""

import functools
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import Tuple, Optional, Any, Dict, List

import requests
import pandas as pd
from huggingface_hub import ModelCard, HfApi
from transformers import AutoConfig, AutoTokenizer

# Import local modules
from backend.config import API, REQUESTS_REPO_ID, hf_api_token, SLACK_WEBHOOK_URL
from backend.data_loader import load_requests
from backend.helpers import unify_precision, get_model_size, parse_datetime

# Configure logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Maps lower-cased user input to a canonical model-type label. Values that are
# not listed pass through unchanged (see submit_model). Aliases for
# pre-trained / fine-tuned / chat models are currently disabled.
MODEL_TYPE_NORMALIZATION = {
    "base": "base",
    "instruct": "instruct",
}

# Rate limit applied by check_org_threshold.
ORG_SUBMISSION_LIMIT = 5
ORG_SUBMISSION_WINDOW = timedelta(days=7)

# Slack rejects a section block whose text exceeds 3000 characters.
_SLACK_SECTION_TEXT_LIMIT = 3000

# Emoji are spelled as escapes so that they survive any re-encoding of this file.
_EMOJI_NEW = "\U0001F195"      # "NEW" button
_EMOJI_PENDING = "\u23F3"      # hourglass
_EMOJI_FAILED = "\u274C"       # cross mark
_EMOJI_SUCCESS = "\u2705"      # check mark
_EMOJI_PARTY = "\U0001F389"    # party popper
_EMOJI_TROPHY = "\U0001F3C6"   # trophy
_EMOJI_PHONE = "\U0001F4F1"    # mobile phone
_EMOJI_ROCKET = "\U0001F680"   # rocket
# Gold / silver / bronze medals followed by the keycap digits 4 and 5.
_RANK_BADGES = (
    "\U0001F947",
    "\U0001F948",
    "\U0001F949",
    "4\uFE0F\u20E3",
    "5\uFE0F\u20E3",
)


def _utc_now() -> datetime:
    """Return the current UTC time as a naive datetime.

    Same value as the deprecated ``datetime.utcnow()``; it stays naive so that
    existing comparisons and ``isoformat() + "Z"`` formatting are unchanged.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _utc_timestamp() -> str:
    """Return the current UTC time as an ISO-8601 string with a trailing ``Z``."""
    return _utc_now().isoformat() + "Z"


def _clip(text: str, limit: int = _SLACK_SECTION_TEXT_LIMIT) -> str:
    """Shorten ``text`` to at most ``limit`` characters, marking the cut with ``...``."""
    if len(text) <= limit:
        return text
    return text[: limit - 3] + "..."


class SlackNotifier:
    """
    Handles all Slack notifications for the Arabic leaderboard system.
    """

    def __init__(self, webhook_url: str):
        """
        Initialize with Slack webhook URL.

        Args:
            webhook_url: Slack incoming webhook URL
        """
        self.webhook_url = webhook_url

    def _send_message(self, blocks: List[Dict], text: str = "") -> bool:
        """
        Send a message to Slack using Block Kit.

        Delivery is best effort: every failure is logged and reported through
        the return value, never raised.

        Args:
            blocks: List of Slack block elements
            text: Fallback plain text

        Returns:
            True if successful, False otherwise
        """
        if not self.webhook_url:
            logger.warning("Slack webhook URL is not configured; message not sent.")
            return False

        payload = {
            "blocks": blocks,
            "text": text,  # Fallback for notifications
        }
        try:
            response = requests.post(
                self.webhook_url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=10,
            )
        except Exception as e:  # best effort: a Slack outage must not break callers
            logger.error("Failed to send Slack message: %s", e)
            return False

        if response.status_code != 200:
            logger.error("Slack API error: %s - %s", response.status_code, response.text)
            return False
        return True

    def notify_new_submission(self, submission_data: Dict) -> bool:
        """
        Notify when a new model is submitted for evaluation.

        Args:
            submission_data: Dictionary containing submission details. The keys
                read here are "model", "params" and "submitted_time"; each one
                falls back to "Unknown" when missing.

        Returns:
            True if the message was delivered, False otherwise
        """
        # `or` also covers an explicit None value, not just a missing key.
        model_name = submission_data.get("model") or "Unknown"
        org = model_name.split("/")[0] if "/" in model_name else "Unknown"
        params = submission_data.get("params", "Unknown")
        submitted_at = submission_data.get("submitted_time", "Unknown")

        # NOTE: precision / weight-type fields are currently not reported.
        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{_EMOJI_NEW} New Model Submission",
                    "emoji": True,
                },
            },
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*Model:*\n{model_name}"},
                    {"type": "mrkdwn", "text": f"*Organization:*\n{org}"},
                    {"type": "mrkdwn", "text": f"*Parameters:*\n{params}"},
                    {"type": "mrkdwn", "text": f"*Status:*\n{_EMOJI_PENDING} PENDING"},
                ],
            },
            {
                "type": "context",
                "elements": [
                    {"type": "mrkdwn", "text": f"Submitted at: {submitted_at}"},
                ],
            },
        ]

        return self._send_message(
            blocks=blocks,
            text=f"New submission: {model_name}",
        )

    def notify_evaluation_failed(
        self,
        model_name: str,
        error_message: str,
        submission_data: Optional[Dict] = None,
    ) -> bool:
        """
        Notify when model evaluation fails.

        Args:
            model_name: Name of the model
            error_message: Description of the failure
            submission_data: Optional submission details (accepted for
                compatibility; currently not included in the message)

        Returns:
            True if the message was delivered, False otherwise
        """
        # Long tracebacks would exceed Slack's section limit and the whole
        # message would be rejected, so the text is clipped.
        summary = _clip(f"*Model:* {model_name}\n*Error:* {error_message}")

        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{_EMOJI_FAILED} Evaluation Failed",
                    "emoji": True,
                },
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": summary},
            },
            {
                "type": "context",
                "elements": [
                    {"type": "mrkdwn", "text": f"Failed at: {_utc_timestamp()}"},
                ],
            },
        ]

        return self._send_message(
            blocks=blocks,
            text=f"Evaluation failed: {model_name}",
        )

    def notify_evaluation_success(self, model_name: str, results: Dict) -> bool:
        """
        Notify when model evaluation succeeds and is added to leaderboard.

        Args:
            model_name: Name of the model
            results: Dictionary containing evaluation results/metrics. Only
                int/float values are shown, floats with four decimals.

        Returns:
            True if the message was delivered, False otherwise
        """
        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{_EMOJI_SUCCESS} Evaluation Completed",
                    "emoji": True,
                },
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": (
                        f"*Model:* {model_name}\n"
                        f"*Status:* Successfully added to leaderboard! {_EMOJI_PARTY}"
                    ),
                },
            },
        ]

        # Add metrics if available
        if results:
            metric_fields = []
            for key, value in results.items():
                if not isinstance(value, (int, float)):
                    continue
                shown = f"{value:.4f}" if isinstance(value, float) else f"{value}"
                metric_fields.append({"type": "mrkdwn", "text": f"*{key}:*\n{shown}"})
            if metric_fields:
                blocks.append({
                    "type": "section",
                    "fields": metric_fields[:10],  # Limit to 10 fields
                })

        blocks.append({
            "type": "context",
            "elements": [
                {"type": "mrkdwn", "text": f"Completed at: {_utc_timestamp()}"},
            ],
        })

        return self._send_message(
            blocks=blocks,
            text=f"Evaluation success: {model_name}",
        )

    @staticmethod
    def _rank_entries(models: Optional[List[Dict]]) -> List[Tuple[str, str, str]]:
        """
        Build (badge, model name, formatted score) for the leading entries.

        At most ``len(_RANK_BADGES)`` entries are returned, so indexing the
        badge table can never overflow. A score that cannot be converted to a
        number is rendered as "N/A" instead of raising.
        """
        entries = []
        for badge, model in zip(_RANK_BADGES, models or []):
            name = model.get("model", "Unknown")
            raw_score = model.get("average_score", model.get("score", 0))
            try:
                score = f"{float(raw_score):.2f}"
            except (TypeError, ValueError):
                score = "N/A"
            entries.append((badge, name, score))
        return entries

    def notify_top5_update(self, top5_models: List[Dict], changed: bool = True) -> bool:
        """
        Notify about new top 5 models with LinkedIn post suggestion.

        Args:
            top5_models: List of top 5 model dictionaries with scores
            changed: Whether the top 5 has changed

        Returns:
            True if nothing had to be sent or the message was delivered,
            False if there was nothing to list or delivery failed
        """
        if not changed:
            return True  # Don't send if nothing changed

        entries = self._rank_entries(top5_models)
        if not entries:
            # An empty section text is rejected by Slack, so do not send at all.
            logger.warning("Top 5 update requested with no models; message not sent.")
            return False

        leaderboard_text = "".join(
            f"{badge} *{name}* - Score: {score}\n" for badge, name, score in entries
        )
        linkedin_post = self._generate_linkedin_post(top5_models)

        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{_EMOJI_TROPHY} Top 5 Leaderboard Update!",
                    "emoji": True,
                },
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "*The Top 5 Arabic LLMs have been updated!*",
                },
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": leaderboard_text},
            },
            {"type": "divider"},
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*{_EMOJI_PHONE} Suggested LinkedIn Post:*",
                },
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": f"```{linkedin_post}```"},
            },
            {
                "type": "context",
                "elements": [
                    {
                        "type": "mrkdwn",
                        "text": "Copy the post above and share on LinkedIn!",
                    },
                ],
            },
        ]

        return self._send_message(
            blocks=blocks,
            text="Top 5 leaderboard updated!",
        )

    def _generate_linkedin_post(self, top5_models: List[Dict]) -> str:
        """
        Generate a LinkedIn post text for the top 5 models.

        Args:
            top5_models: List of top 5 model dictionaries; entries beyond the
                fifth are ignored

        Returns:
            Formatted LinkedIn post text
        """
        ranking = "".join(
            f"{badge} {name} - {score}\n"
            for badge, name, score in self._rank_entries(top5_models)
        )
        return (
            f"{_EMOJI_ROCKET} Arabic LLM Leaderboard Update!\n\n"
            "We're excited to share the latest rankings for Arabic Language Models:\n\n"
            f"{ranking}"
            "\n"
            "These models are pushing the boundaries of Arabic NLP! "
            "Check out our full leaderboard to explore more models and benchmarks.\n\n"
            "#ArabicNLP #LLM #AI #MachineLearning #ArabicAI #OpenSource #HuggingFace"
        )


# Integration helper functions
def integrate_with_submission(original_submit_func):
    """
    Decorator that posts a Slack notification after a successful submission.

    The wrapped function must return a message string; a notification is sent
    only when that string starts with "**Success**". Notification problems are
    logged and never change the wrapped function's result.

    Do not apply this to a function that already notifies Slack itself
    (submit_model in this module does), otherwise every submission is
    announced twice.

    Usage:
        @integrate_with_submission
        def submit_model(...):
            # original implementation
    """

    @functools.wraps(original_submit_func)
    def wrapper(*args, **kwargs):
        result = original_submit_func(*args, **kwargs)

        # Only a successful submission is announced.
        if not (isinstance(result, str) and result.startswith("**Success**")):
            return result

        try:
            # Read at call time so the currently configured webhook is used.
            from backend.config import SLACK_WEBHOOK_URL as webhook_url

            if webhook_url:
                # Extract submission data from arguments
                submission_data = {
                    "model": args[0] if args else kwargs.get("model_name"),
                    "revision": "main",
                    "submitted_time": _utc_timestamp(),
                    "slack_thread_ts": None,
                }
                SlackNotifier(webhook_url).notify_new_submission(submission_data)
        except Exception as e:  # best effort: never fail the submission itself
            logger.error("Failed to send Slack notification: %s", e)

        return result

    return wrapper


def already_in_queue(df: pd.DataFrame, model_name: str) -> bool:
    """
    Check whether ``model_name`` already appears in the "model" column of ``df``.

    Only the model name is compared; revision and precision are not considered.
    An empty frame, or one without a "model" column, yields False.
    """
    if df.empty or "model" not in df.columns:
        return False
    return bool((df["model"] == model_name).any())


def check_model_card(repo_id: str) -> Tuple[bool, str]:
    """
    Validate that the model card exists, has a license, and is of sufficient length.

    Returns:
        (ok, message) where message is empty on success and otherwise explains
        what is missing.
    """
    try:
        card = ModelCard.load(repo_id)
    except Exception:
        return False, "No model card found. Please add a README.md describing your model and license."

    # Check for license metadata: either a standard license or a custom
    # license_name + license_link pair.
    has_license = card.data.license is not None or (
        "license_name" in card.data and "license_link" in card.data
    )
    if not has_license:
        return False, "No license metadata found in the model card."

    # Check content length
    if len(card.text or "") < 200:
        return False, "Model card is too short (<200 chars). Please add more details."

    return True, ""


def is_model_on_hub(
    model_name: str,
    revision: Optional[str],
    token: Optional[str] = None,
    trust_remote_code: bool = False,
    test_tokenizer: bool = True
) -> Tuple[bool, str, Any]:
    """
    Verifies if the model and tokenizer can be loaded from the Hub.

    Args:
        model_name: Repository id of the model
        revision: Revision (branch, tag or commit) to load
        token: Optional Hugging Face access token
        trust_remote_code: Passed through to the transformers loaders
        test_tokenizer: Also try to load the tokenizer when True

    Returns:
        (success, error_message, config_object); the config is None on failure.
    """
    # 1. Check Configuration
    try:
        config = AutoConfig.from_pretrained(
            model_name,
            revision=revision,
            trust_remote_code=trust_remote_code,
            token=token
        )
    except ValueError as e:
        # Blaming trust_remote_code is only accurate when it was not granted;
        # otherwise report the real cause.
        if not trust_remote_code:
            return False, "requires `trust_remote_code=True`. Not automatically allowed.", None
        return False, f"not loadable from hub: {str(e)}", None
    except Exception as e:
        return False, f"not loadable from hub: {str(e)}", None

    # 2. Check Tokenizer (optional but recommended)
    if test_tokenizer:
        try:
            AutoTokenizer.from_pretrained(
                model_name,
                revision=revision,
                trust_remote_code=trust_remote_code,
                token=token
            )
        except Exception as e:
            return False, f"tokenizer not loadable: {str(e)}", None

    return True, "", config


def check_org_threshold(org_name: str) -> Tuple[bool, str]:
    """
    Enforce rate limit: Each org can only submit 5 models in the last 7 days.

    Returns:
        (allowed, message); when the limit is reached the message states when
        the next slot opens.
    """
    df_all = load_requests("")  # Load all requests
    if df_all.empty:
        return True, ""

    # Owner of each request: the part before "/" (or the whole name when there
    # is no "/"). Computed as a separate Series so the frame returned by
    # load_requests is not modified; astype(str) guards against non-string cells.
    owners = df_all["model"].astype(str).str.split("/").str[0]

    # Filter for specific org
    df_org = df_all[owners == org_name].copy()
    if df_org.empty:
        return True, ""

    # Parse dates and drop rows whose timestamp could not be parsed
    df_org["datetime"] = df_org["submitted_time"].apply(parse_datetime)
    df_org = df_org.dropna(subset=["datetime"])

    # Keep only submissions inside the rate-limit window
    window_start = _utc_now() - ORG_SUBMISSION_WINDOW
    df_recent = df_org[df_org["datetime"] >= window_start]

    if len(df_recent) >= ORG_SUBMISSION_LIMIT:
        # The next slot opens when the oldest submission leaves the window.
        next_slot = df_recent["datetime"].min() + ORG_SUBMISSION_WINDOW
        msg_next = next_slot.isoformat(timespec="seconds") + "Z"
        return (
            False,
            f"Your org '{org_name}' has reached the {ORG_SUBMISSION_LIMIT}-submissions-per-week limit. "
            f"You can submit again after {msg_next}."
        )

    return True, ""


# NOTE: @integrate_with_submission is deliberately not applied here. This
# function already sends the Slack notification itself (with the full
# submission record), so decorating it as well would announce every
# submission twice.
def submit_model(
    model_name: str,
    revision: Optional[str] = "main",
    model_type: str = "",
) -> str:
    """
    Main controller: Validation -> Info Extraction -> Submission Upload.

    Args:
        model_name: Repository id in the form 'org/model'
        revision: Revision to evaluate; None or blank means "main"
        model_type: Free-text model type, normalised through
            MODEL_TYPE_NORMALIZATION when it matches a known key

    Returns:
        A markdown formatted string message for the UI, starting with
        "**Success**", "**Warning**" or "**Error**".
    """
    # --- 1. Input Sanitization ---
    # `or ""` tolerates None coming from optional UI fields.
    model_name = (model_name or "").strip()
    revision = (revision or "").strip() or "main"
    raw_type = (model_type or "").strip()
    model_type = MODEL_TYPE_NORMALIZATION.get(raw_type.lower(), raw_type)

    if not model_name:
        return "**Error**: Model name cannot be empty (use 'org/model')."

    # Exactly one "/" with a non-empty part on each side.
    org, sep, repo_id = model_name.partition("/")
    if not sep or not org or not repo_id or "/" in repo_id:
        return "**Error**: Please specify model as 'org/model'."

    # --- 2. Validation Pipeline ---

    # A. Check Model Card
    card_ok, card_msg = check_model_card(model_name)
    if not card_ok:
        return f"**Error**: {card_msg}"

    # B. Check Hub Existence
    # NOTE: adapter/delta submissions (which need a separate base model check)
    # are currently not supported.
    # NOTE(review): trust_remote_code=True lets transformers execute Python code
    # shipped in a user-submitted repository on this host - confirm this is an
    # accepted risk for the environment this runs in.
    ok_model, model_err, _ = is_model_on_hub(
        model_name, revision, hf_api_token, trust_remote_code=True
    )
    if not ok_model:
        return f"**Error**: Model '{model_name}' {model_err}"

    # C. Fetch Model Info (Likes, License, Private Status)
    try:
        info = API.model_info(model_name, revision=revision, token=hf_api_token)
    except Exception as e:
        return f"**Error**: Could not fetch model info. {str(e)}"

    # card_data is None for repositories without card metadata.
    model_license = getattr(info.card_data, "license", None)
    model_likes = info.likes or 0
    model_private = bool(getattr(info, "private", False))

    # D. Check Queue Duplication
    if already_in_queue(load_requests("finished"), model_name):
        return f"**Warning**: '{model_name}' has already been evaluated."
    if already_in_queue(load_requests("pending"), model_name):
        return f"**Warning**: '{model_name}' is already in PENDING."

    # E. Check Rate Limit
    under_threshold, limit_msg = check_org_threshold(org)
    if not under_threshold:
        return f"**Error**: {limit_msg}"

    # --- 3. Submission Construction ---
    # NOTE: precision / weight type / base model are currently not recorded.
    submission_data = {
        "model": model_name,
        "revision": revision,
        "status": "SUBMITTED",
        "submitted_time": _utc_timestamp(),
        "model_type": model_type,
        "likes": model_likes,
        "params": get_model_size(model_info=info),
        "license": model_license,
        "private": model_private,
        "job_id": None,
        "job_start_time": None,
    }

    # Define path in the requests dataset
    file_path = f"{org}/{repo_id}_eval_request.json"

    # --- 4. Upload to Hub ---
    try:
        API.upload_file(
            path_or_fileobj=json.dumps(submission_data, indent=2).encode("utf-8"),
            path_in_repo=file_path,
            repo_id=REQUESTS_REPO_ID,
            repo_type="dataset",
            token=hf_api_token,
            commit_message=f"Add {model_name} to eval queue"
        )
    except Exception as e:
        logger.error("Submission upload failed: %s", e)
        return f"**Error**: Could not upload to '{REQUESTS_REPO_ID}': {str(e)}"

    # --- 5. Notify Slack (best effort) ---
    if SLACK_WEBHOOK_URL:
        try:
            SlackNotifier(SLACK_WEBHOOK_URL).notify_new_submission(submission_data)
        except Exception as e:  # the upload already succeeded; do not fail now
            logger.error("Failed to send Slack notification: %s", e)

    return f"**Success**: Model '{model_name}' submitted for evaluation!"