| import json |
| import logging |
| import os |
| import subprocess |
| import threading |
| import time |
| from pathlib import Path |
|
|
| import pipe |
| from app_env import ( |
| HF_GSK_HUB_HF_TOKEN, |
| HF_GSK_HUB_KEY, |
| HF_GSK_HUB_PROJECT_KEY, |
| HF_GSK_HUB_UNLOCK_TOKEN, |
| HF_GSK_HUB_URL, |
| HF_REPO_ID, |
| HF_SPACE_ID, |
| HF_WRITE_TOKEN, |
| ) |
| from io_utils import LOG_FILE, get_yaml_path, write_log_to_user_file |
| from isolated_env import prepare_venv |
| from leaderboard import LEADERBOARD |
|
|
# Flag polled by run_job(): start_process_run_job() sets it to True and
# stop_thread() clears it to ask the worker loop to exit.
is_running = False

# Handle of the background worker thread. start_process_run_job() assigns it
# through ``global thread``; it is initialised here so the module-level name
# exists (as None) before the worker has ever been started.
thread = None

# NOTE(review): the logger is named after the file path (__file__) rather than
# the conventional __name__. Kept unchanged because logging configuration
# elsewhere may key on this name -- confirm before renaming.
logger = logging.getLogger(__file__)
|
|
|
|
def start_process_run_job():
    """Start the background worker thread that runs ``run_job``.

    The thread is created as a daemon thread and stored in the module-level
    ``thread`` global. ``is_running`` is set to True *before* the thread is
    started so that the worker's ``while is_running`` loop does not exit on
    its very first check.

    Failures while creating or starting the thread are logged (with
    traceback) instead of being raised, and ``is_running`` is reset so the
    module does not claim a worker is active when none was started.
    """
    global thread, is_running
    try:
        logger.debug("Running jobs in thread")
        thread = threading.Thread(target=run_job)
        thread.daemon = True
        is_running = True
        thread.start()
    except Exception:
        # Best-effort start: report the problem but never crash the caller.
        is_running = False
        logger.exception("Failed to start thread")
|
|
|
|
def stop_thread():
    """Ask the background worker to stop by clearing ``is_running``.

    This only flips the flag; the worker loop in ``run_job`` notices it the
    next time it evaluates its loop condition. The thread is not joined here.
    """
    global is_running
    # Use the module logger (like the rest of the file) rather than the root
    # logger so the message is attributed to this module.
    logger.debug("Stop thread")
    is_running = False
|
|
|
|
def prepare_env_and_get_command(
    m_id,
    d_id,
    config,
    split,
    inference,
    inference_token,
    uid,
    label_mapping,
    feature_mapping,
):
    """Build the scanner command line for one evaluation job.

    The executable is obtained from ``prepare_venv(uid, <requirements.txt
    content>)``; if reading the requirements file or preparing the
    environment fails for any reason, the plain ``giskard_scanner``
    executable is used as a fallback. Optional flags are appended only when
    the corresponding environment variable (or value) is set. A "start"
    message is written to the user's log file before returning.

    Args:
        m_id: Model identifier, passed as ``--model``.
        d_id: Dataset identifier, passed as ``--dataset``.
        config: Dataset configuration, passed as ``--dataset_config``.
        split: Dataset split, passed as ``--dataset_split``.
        inference: Truthy to request ``hf_inference_api``; only honoured
            when ``inference_token`` is truthy too, otherwise
            ``hf_pipeline`` is used.
        inference_token: Value for ``--inference_api_token``. ``None`` is
            sent as an empty string so the argument vector never contains a
            non-string element.
        uid: Job identifier used for the environment, the scan-config path
            (``get_yaml_path(uid)``) and the user log file.
        label_mapping: JSON-serialisable object, passed as
            ``--label_mapping``.
        feature_mapping: JSON-serialisable object, passed as
            ``--feature_mapping``.

    Returns:
        list: The argument vector, executable first.
    """
    leaderboard_dataset = None
    if os.environ.get("SPACE_ID") == "giskardai/giskard-evaluator":
        leaderboard_dataset = LEADERBOARD

    inference_type = "hf_inference_api" if inference and inference_token else "hf_pipeline"

    try:
        # read() keeps the file as written; joining readlines() with "\n"
        # would double every line break.
        with open("requirements.txt", "r") as f:
            requirements = f.read()
        executable = prepare_venv(uid, requirements)
        logger.info(f"Using {executable} as executable")
    except Exception as e:
        # Deliberately broad: any failure here must not block the job, the
        # scanner from the current environment is an acceptable fallback.
        logger.warning(f"Create env failed due to {e}, using the current env as fallback.")
        executable = "giskard_scanner"

    command = [
        executable,
        "--loader",
        "huggingface",
        "--model",
        m_id,
        "--dataset",
        d_id,
        "--dataset_config",
        config,
        "--dataset_split",
        split,
        "--output_format",
        "markdown",
        "--output_portal",
        "huggingface",
        "--feature_mapping",
        json.dumps(feature_mapping),
        "--label_mapping",
        json.dumps(label_mapping),
        "--scan_config",
        get_yaml_path(uid),
        "--inference_type",
        inference_type,
        "--inference_api_token",
        # subprocess rejects None in argv; keep the flag but send "".
        inference_token or "",
    ]

    def add_option(flag, value):
        # Append "flag value" only when a non-empty value is available.
        if value:
            command.extend((flag, value))

    env = os.environ.get

    add_option("--hf_token", env(HF_WRITE_TOKEN))
    # The repo id takes precedence over the space id for discussions.
    add_option("--discussion_repo", env(HF_REPO_ID) or env(HF_SPACE_ID))
    add_option("--leaderboard_dataset", leaderboard_dataset)

    hub_options = (
        ("--giskard_hub_api_key", HF_GSK_HUB_KEY),
        ("--giskard_hub_url", HF_GSK_HUB_URL),
        ("--giskard_hub_project_key", HF_GSK_HUB_PROJECT_KEY),
        ("--giskard_hub_hf_token", HF_GSK_HUB_HF_TOKEN),
        ("--giskard_hub_unlock_token", HF_GSK_HUB_UNLOCK_TOKEN),
    )
    for flag, env_name in hub_options:
        add_option(flag, env(env_name))

    eval_str = f"[{m_id}]<{d_id}({config}, {split} set)>"

    write_log_to_user_file(
        uid,
        f"Start local evaluation on {eval_str}. Please wait for your job to start...\n",
    )

    return command
|
|
|
|
def save_job_to_pipe(task_id, job, description, lock):
    """Queue a job on ``pipe.jobs`` while holding ``lock``.

    Args:
        task_id: Identifier of the job.
        job: Job payload stored alongside the identifier.
        description: Description stored as the third element of the record.
        lock: Context manager held for the duration of the append.
    """
    record = (task_id, job, description)
    with lock:
        pipe.jobs.append(record)
|
|
|
|
def pop_job_from_pipe():
    """Take one queued job off ``pipe.jobs`` and run it to completion.

    Does nothing when the queue is empty. Otherwise the job's description is
    published as ``pipe.current``, ``LOG_FILE`` is re-pointed (as a symlink)
    to the job's own log file, the command is built with
    ``prepare_env_and_get_command`` and executed with its stdout and stderr
    appended to ``./tmp/<task_id>.log``. ``pipe.current`` is always reset to
    ``None`` afterwards, including when the job fails to start.

    Any exception raised while preparing or launching the job propagates to
    the caller.
    """
    if not pipe.jobs:
        return

    # NOTE(review): list.pop() takes the most recently appended job, so the
    # queue is served last-in-first-out -- confirm this ordering is intended.
    job_info = pipe.jobs.pop()
    pipe.current = job_info[2]
    task_id = job_info[0]
    job_log = f"./tmp/{task_id}.log"

    try:
        log_file_path = Path(LOG_FILE)
        # exists() follows symlinks and is False for a dangling link, which
        # would make os.symlink() fail; check the link itself as well.
        if log_file_path.is_symlink() or log_file_path.exists():
            log_file_path.unlink()
        os.symlink(job_log, LOG_FILE)

        write_log_to_user_file(task_id, f"Running job id {task_id}\n")
        command = prepare_env_and_get_command(*job_info[1])

        with open(job_log, "a") as log_file:
            p = subprocess.Popen(command, stdout=log_file, stderr=subprocess.STDOUT)
            p.wait()
    finally:
        # Never leave a stale "current job" behind, even on failure.
        pipe.current = None
|
|
|
|
def run_job(poll_interval=10):
    """Worker loop: run queued jobs until ``is_running`` is cleared.

    Each iteration calls ``pop_job_from_pipe()`` and then sleeps for
    ``poll_interval`` seconds. A failure inside a job is logged and the loop
    keeps going, so one bad job cannot silently kill the worker while
    ``is_running`` still reports it as active.

    Args:
        poll_interval: Seconds to sleep between queue checks (default 10).
    """
    global is_running
    try:
        while is_running:
            try:
                pop_job_from_pipe()
            except Exception:
                # Boundary of the worker thread: log and carry on polling.
                logger.exception("Job execution failed")
            time.sleep(poll_interval)
    except KeyboardInterrupt:
        # KeyboardInterrupt is only raised in the main thread, so this
        # matters when run_job() is called there directly.
        logger.debug("KeyboardInterrupt stop background thread")
        is_running = False
|
|