| import os |
| import gradio as gr |
| import requests |
| import inspect |
| import pandas as pd |
| import agents |
| from PIL import Image |
| from io import BytesIO |
| import whisper |
|
|
| |
| |
# Base URL of the scoring service. The questions, random-question, files and
# submit endpoints used in this file are all built from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"


# Currently selected agent instance. It stays None until select_agent() has
# been called; select_agent() is the only place in this file that assigns it.
agent = None
|
|
def select_agent(provider_name: str, model_name: str):
    """
    Create the agent for the chosen provider/model and make it the active agent.

    Falls back to a ``BasicAgent`` when ``agents.get_agent`` returns ``None``
    or raises.

    :param provider_name: Provider label; forwarded to ``agents.get_agent`` as ``model_type``.
    :param model_name: Model identifier; forwarded to ``agents.get_agent`` as ``model_name``.
    :return: The selected agent instance (also stored in the module-level ``agent``).
    """
    global agent
    try:
        agent = agents.get_agent(model_name=model_name, model_type=provider_name)
        if agent is None:
            print(f"Agent not found for provider: {provider_name} and model: {model_name}")
            agent = BasicAgent()
    except Exception as e:
        # Any failure while building the agent degrades to the fixed-answer fallback.
        print(f"Error selecting agent: {e}")
        agent = BasicAgent()

    # Not every agent exposes a ``model`` attribute (the fallback may not), so
    # read it defensively instead of crashing right after a successful selection.
    selected_model = getattr(agent, "model", agent.__class__.__name__)
    print(f"Agent selected: {selected_model}")

    # NOTE(review): this only rewrites the component's stored value; confirm
    # whether the visible textbox is refreshed without an event output.
    agent_info_text.value = get_agent_info()
    return agent
|
|
|
|
def get_agent_info() -> str:
    """
    Describe the currently selected agent for display in the UI.

    :return: ``"No agent selected."`` when no agent is set; otherwise a
        three-line summary with the agent's class name, its ``model``
        attribute (``"unknown"`` when the agent has none) and its docstring.
        ``"Error getting agent info."`` is returned if the summary cannot be built.
    """
    if agent is None:
        return "No agent selected."
    try:
        agent_class_name = agent.__class__.__name__
        # Agents without a ``model`` attribute are still valid selections, so a
        # missing attribute must not turn the whole summary into an error.
        model_name = getattr(agent, "model", "unknown")
        docstring = inspect.getdoc(agent)
        return (
            f"Agent Class: {agent_class_name}\n"
            f"Model Name: {model_name}\n"
            f"Docstring: {docstring}"
        )
    except Exception as e:
        print(f"Error getting agent info: {e}")
        return "Error getting agent info."
|
|
|
|
| |
| |
class BasicAgent:
    """
    Fallback agent that answers every question with the same fixed string.

    Besides being callable, it provides the members this file uses on the
    selected agent (``model``, ``memory.reset()`` and ``run()``), so it can be
    used wherever a selected agent is expected.
    """

    class _NoMemory:
        """Memory stand-in: the fixed-answer agent keeps no state to clear."""

        def reset(self) -> None:
            """Do nothing; present only so ``agent.memory.reset()`` works."""
            return None

    # Label reported wherever the selected agent's ``model`` is displayed.
    model = "basic-agent"

    def __init__(self):
        self.memory = self._NoMemory()
        print("BasicAgent initialized.")

    def __call__(self, question: str) -> str:
        """
        Return the fixed default answer.

        :param question: The question text; only its first 50 characters are logged.
        :return: Always ``"This is a default answer."``.
        """
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        fixed_answer = "This is a default answer."
        print(f"Agent returning fixed answer: {fixed_answer}")
        return fixed_answer

    def run(self, question: str, images=None, **kwargs) -> str:
        """
        Answer ``question`` through the ``run`` entry point.

        :param question: The question text.
        :param images: Accepted for call compatibility and ignored.
        :param kwargs: Any further keyword arguments are ignored.
        :return: The same fixed answer as calling the instance directly.
        """
        return self(question)
|
|
|
|
def get_all_questions():
    """
    List every question from the API without invoking the agent.

    Generator that relays each (status, results) pair produced by
    run_test_on_questions.
    """
    yield from run_test_on_questions(use_random_question=False, run_agent=False)
|
|
def run_test_on_all_questions():
    """
    Run the agent on the full question list.

    Generator that relays each (status, results) pair produced by
    run_test_on_questions.
    """
    yield from run_test_on_questions(use_random_question=False, run_agent=True)
|
|
def run_test_on_random_question():
    """
    Run the agent on one question taken from the random-question endpoint.

    Generator that relays each (status, results) pair produced by
    run_test_on_questions.
    """
    yield from run_test_on_questions(use_random_question=True, run_agent=True)
|
|
|
|
def _load_question_attachment(api_url: str, task_id, transcriber_cache: dict):
    """
    Download the file attached to a task and convert it into agent context.

    :param api_url: Base URL of the scoring service.
    :param task_id: Task whose attachment is fetched from ``/files/{task_id}``.
    :param transcriber_cache: Dict used to keep the loaded Whisper model between
        calls so it is loaded at most once per run.
    :return: ``(images, transcription)``: a list holding one RGB image for
        PNG/JPEG attachments (otherwise empty) and the transcribed text for
        MPEG/WAV audio attachments (otherwise ``None``).
    :raises requests.exceptions.RequestException: If the download fails.
    """
    import tempfile  # local import: only needed for audio attachments

    file_response = requests.get(f"{api_url}/files/{task_id}", timeout=30)
    file_response.raise_for_status()

    # Tolerate a missing header and ignore parameters such as "; charset=...".
    content_type = file_response.headers.get("Content-Type", "")
    content_type = content_type.split(";")[0].strip().lower()

    if content_type in ("image/png", "image/jpeg"):
        picture = Image.open(BytesIO(file_response.content)).convert("RGB")
        return [picture], None

    if content_type in ("audio/mpeg", "audio/wav"):
        if "model" not in transcriber_cache:
            transcriber_cache["model"] = whisper.load_model("base")
        suffix = ".wav" if content_type == "audio/wav" else ".mp3"
        # A unique temp file avoids clashes between concurrent runs and is
        # removed again once the transcription is done.
        fd, audio_path = tempfile.mkstemp(suffix=suffix)
        try:
            with os.fdopen(fd, "wb") as audio_file:
                audio_file.write(file_response.content)
            transcript = transcriber_cache["model"].transcribe(audio_path)
            return [], transcript["text"]
        finally:
            os.remove(audio_path)

    return [], None


def run_test_on_questions(use_random_question: bool, run_agent: bool):
    """
    Fetch questions, optionally run the selected agent on them, and stream progress.

    Nothing is submitted to the scoring service; this is a local test run.

    :param use_random_question: Fetch a single question from ``/random-question``
        instead of the full list from ``/questions``.
    :param run_agent: When False, the agent is not called and a placeholder
        answer is recorded for each question.
    :return: Generator of ``(status_text, results)`` pairs, where ``results`` is
        ``None`` or a ``pandas.DataFrame`` of the collected rows.
    """
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/random-question" if use_random_question else f"{api_url}/questions"

    info = "# started request"
    yield info, None

    if run_agent and agent is None:
        # Without this guard the first use of the agent would raise on None.
        yield info + "\n\nNo agent selected. Please select a provider and model first.", None
        return

    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_dataset_raw = response.json()
    # JSONDecodeError must be handled before RequestException: in current
    # requests releases it is a subclass and would otherwise never be reached.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        yield f"Error decoding server response for questions: {e}", None
        return
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        yield f"Error fetching questions: {e}", None
        return
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        yield f"An unexpected error occurred fetching questions: {e}", None
        return

    # Check the raw payload: wrapping an empty random question in a list would
    # otherwise make the emptiness test pass.
    if not questions_dataset_raw:
        print("Fetched questions list is empty.")
        yield info + "\n\nFetched questions list is empty or invalid format.", None
        return
    questions_dataset = [questions_dataset_raw] if use_random_question else questions_dataset_raw
    print(f"Fetched {len(questions_dataset)} questions.")
    yield info, None

    results_log = []
    answers_payload = []
    transcriber_cache = {}  # holds the Whisper model once it has been loaded
    total = len(questions_dataset)

    for i, questions_data in enumerate(questions_dataset):
        task_id = questions_data.get("task_id")
        question_text = questions_data.get("question")
        file_name = questions_data.get("file_name")

        # Validate before the text is extended below; concatenating onto a
        # missing question would raise instead of reporting the bad record.
        if not task_id or question_text is None:
            yield info + f"\nError in question data: {questions_data}", None
            return

        # Each question starts from a clean memory when the agent has one.
        memory = getattr(agent, "memory", None)
        if memory is not None:
            memory.reset()

        images = []
        if file_name:
            question_text = question_text + f"\n\nYou can download the correspondig file using the download tool with the task id: {task_id}."
            try:
                images, transcription = _load_question_attachment(api_url, task_id, transcriber_cache)
            except Exception as e:
                # Attachments are best-effort context: report the failure and
                # still let the agent try the question.
                print(f"Error loading attachment for task {task_id}: {e}")
                info += f"\n - could not load attached file: {e}"
            else:
                if transcription is not None:
                    question_text = question_text + f"\n\nTranscription: {transcription}"

        info += f"\n\nRunning agent on question {i+1}/{total}:\n - task_id: {task_id}\n - question: {question_text}"
        yield info, None

        try:
            submitted_answer = agent.run(question_text, images=images) if run_agent else "-- no agent interaction --"
            info += f"\n - got answer {submitted_answer}"
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer, "FileInfo": file_name})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            info += f"\n - agent error: {e}"
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}", "FileInfo": file_name})

    if not answers_payload:
        print("Agent did not produce any answers.")
        yield info + "\nAgent did not produce any answers.", pd.DataFrame(results_log)
        return

    yield info + "\nGot an answer from agent", pd.DataFrame(results_log)
|
|
|
|
|
|
# Submission is deliberately switched off for now. While this is False,
# run_and_submit_all() returns its placeholder result without any network access.
_SUBMISSION_ENABLED = False


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetch all questions, answer them with a ``BasicAgent``, and submit the answers.

    While ``_SUBMISSION_ENABLED`` is False (the current setting) the function
    returns ``("We are not there yet", None)`` immediately.

    :param profile: OAuth profile of the logged-in user, or ``None`` when nobody
        is logged in.
    :return: ``(status_text, results)`` where ``results`` is ``None`` or a
        ``pandas.DataFrame`` with one row per processed question.
    """
    if not _SUBMISSION_ENABLED:
        return "We are not there yet", None

    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        fixed_agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    # JSONDecodeError must be handled before RequestException: in current
    # requests releases it is a subclass and would otherwise never be reached.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = fixed_agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    # Every failure path reports the same way: log the message and return it
    # together with whatever results were collected.
    print(status_message)
    return status_message, pd.DataFrame(results_log)
|
|
|
|
def _fetch_local_model_names(url: str, list_key: str, name_key: str, server_label: str) -> list:
    """
    Ask a local model server for its models and return their names.

    :param url: Endpoint that lists the models.
    :param list_key: Key of the JSON response holding the list of model entries.
    :param name_key: Key of each entry holding the model name.
    :param server_label: Human-readable server name used in log messages.
    :return: List of model names, or ``["None"]`` when the server cannot be
        reached, answers in an unexpected format, or reports no models.
    """
    try:
        # A short timeout keeps the UI responsive when the local server is down.
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        names = [entry[name_key] for entry in response.json()[list_key]]
    except requests.exceptions.RequestException as e:
        print(f"Error fetching {server_label} models: {e}")
        return ["None"]
    except (KeyError, TypeError, ValueError) as e:
        # The server answered, but not with the expected JSON structure.
        print(f"Unexpected response from {server_label} server: {e}")
        return ["None"]
    return names or ["None"]


def fetch_ollama_models() -> list:
    """
    Fetches available models from the Ollama server.
    :return: List of available models, or ``["None"]`` if none could be fetched.
    """
    return _fetch_local_model_names("http://localhost:11434/api/tags", "models", "name", "Ollama")


def fetch_lmstudio_models() -> list:
    """
    Fetches available models from the LM Studio server.
    :return: List of available models, or ``["None"]`` if none could be fetched.
    """
    return _fetch_local_model_names("http://localhost:1234/v1/models", "data", "id", "LM Studio")
|
|
|
|
# Model choices for the currently selected provider; also used as the initial
# choices of the model dropdown.
available_models = ["None"]


def update_available_models(provider: str):
    """
    Refresh the model choices for the given provider.

    :param provider: The selected provider name.
    :return: A ``gr.Dropdown`` carrying the new list of choices.
    """
    global available_models
    print(f"Selected provider: {provider}")

    # Each entry produces a fresh list, so local servers are only queried for
    # the provider that was actually chosen.
    model_sources = {
        "hugging face": lambda: ["None", "Qwen/Qwen2.5-Coder-32B-Instruct", "Qwen/Qwen2.5-Omni-7B"],
        "Ollama": fetch_ollama_models,
        "LMStudio": fetch_lmstudio_models,
        "Gemini": lambda: ["None", "Gemini-2.0-flash-exp", "Gemini-2.0-flash-lite"],
        "Anthropic": lambda: ["None", "claude-3"],
        "OpenAI": lambda: ["None", "gpt-4o", "gpt-3.5-turbo"],
        "Basic Agent": lambda: ["None"],
    }
    load_models = model_sources.get(provider, lambda: ["None"])
    available_models = load_models()

    print(f"Available models for {provider}: {available_models}")
    return gr.Dropdown(choices=available_models)
|
|
|
|
|
|
| |
# UI definition: agent selection (provider + model), login, and four actions
# that stream their status text and result table into shared output widgets.
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")

    # Read-only summary of the currently selected agent.
    agent_info_text = gr.Text(label="Agent Name", value=get_agent_info(), interactive=False, visible=True)

    gr.Markdown(
        """
        **Instructions:**

        Select a provider and then model to generate the agent.
        """
    )

    provider_select = gr.Dropdown(
        label="Select Provider",
        choices=["Basic Agent", "LMStudio", "Ollama", "hugging face", "Gemini", "Anthropic", "OpenAI"],
        interactive=True,
        visible=True,
        multiselect=False)

    model_select = gr.Dropdown(
        label="Select Model",
        choices=available_models,
        interactive=True,
        visible=True,
        multiselect=False)

    # Choosing a provider repopulates the model dropdown.
    provider_select.input(fn=update_available_models, inputs=provider_select, outputs=[model_select])

    # Choosing a model builds the agent. The follow-up step pushes the fresh
    # agent summary into the textbox; the selection handler itself has no
    # outputs, so without it the displayed info would never change.
    model_select.change(
        fn=select_agent,
        inputs=[provider_select, model_select]
    ).then(
        fn=get_agent_info,
        outputs=[agent_info_text]
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    run_test_button = gr.Button("Run Test on Random Question")

    run_multiple_tests_button = gr.Button("Run tests on all questions")

    run_get_questions_button = gr.Button("Get Questions")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)

    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    run_test_button.click(
        fn=run_test_on_random_question,
        outputs=[status_output, results_table]
    )

    run_multiple_tests_button.click(
        fn=run_test_on_all_questions,
        outputs=[status_output, results_table]
    )

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )

    run_get_questions_button.click(
        fn=get_all_questions,
        outputs=[status_output, results_table]
    )
|
|
if __name__ == "__main__":
    # Startup banner, then a summary of the Space environment variables.
    print("\n" + "-" * 30 + " App Starting " + "-" * 30)

    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if not space_host_startup:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    else:
        print(
            f"✅ SPACE_HOST found: {space_host_startup}",
            f" Runtime URL should be: https://{space_host_startup}.hf.space",
            sep="\n",
        )

    if not space_id_startup:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    else:
        print(
            f"✅ SPACE_ID found: {space_id_startup}",
            f" Repo URL: https://huggingface.co/spaces/{space_id_startup}",
            f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main",
            sep="\n",
        )

    # Closing rule spans the same width as the opening banner line.
    print("-" * (60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)