# NOTE(review): the lines "Spaces: / Sleeping / Sleeping" were Hugging Face
# Spaces UI residue captured in the copy-paste, not part of the script itself.
| """ | |
| Inference Script Example | |
| =================================== | |
| MANDATORY | |
| - Before submitting, ensure the following variables are defined in your environment configuration: | |
| API_BASE_URL The API endpoint for the LLM. | |
| MODEL_NAME The model identifier to use for inference. | |
| HF_TOKEN Your Hugging Face / API key. | |
| LOCAL_IMAGE_NAME The name of the local image to use for the environment if you are using from_docker_image() | |
| method | |
| """ | |
| import asyncio | |
| import os | |
| import textwrap | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| from typing import List, Optional | |
| from openai import OpenAI | |
try:
    from my_env_v4 import MyEnvV4Action, MyEnvV4Env
except ImportError:
    # Fallback shims used when the real `my_env_v4` package is not installed.
    # They talk to a locally running environment server over HTTP and degrade
    # to canned responses when the server is unreachable.

    class MyEnvV4Action:
        """Minimal stand-in for the real action type: a single text message."""

        def __init__(self, message: str):
            self.message = message

    class MyEnvV4Env:
        """HTTP shim for the environment, mirroring the real client's API."""

        def __init__(self):
            # Default address where the local environment server is expected.
            self.base_url = "http://127.0.0.1:7860"

        # BUG FIX: this was a plain instance method, so the module-level call
        # MyEnvV4Env.from_docker_image(IMAGE_NAME) bound the image name to
        # `cls` and crashed when calling cls(). It must be a classmethod.
        @classmethod
        async def from_docker_image(cls, image_name):
            import asyncio
            # Give Uvicorn a moment to bind before the first request.
            # NOTE(review): the shim does not actually launch `image_name`;
            # presumably a server is started externally — confirm with caller.
            await asyncio.sleep(2)
            return cls()

        @staticmethod
        def _wrap(data, *, with_reward):
            """Convert a raw response dict into an object with attribute access.

            `with_reward` selects step-shaped results (reward present, `done`
            defaults to True) versus reset-shaped ones (`done` defaults False).
            """
            class Obj: pass
            class Obs: pass
            res = Obj()
            res.observation = Obs()
            res.observation.echoed_message = data.get("observation", {}).get("echoed_message", "")
            if with_reward:
                res.reward = data.get("reward", 0.0)
                res.done = data.get("done", True)
            else:
                res.done = data.get("done", False)
            return res

        async def reset(self):
            """POST /reset; fall back to canned data if the server is unreachable."""
            import httpx
            async with httpx.AsyncClient() as client:
                try:
                    resp = await client.post(f"{self.base_url}/reset", json={}, timeout=5.0)
                    data = resp.json()
                except Exception:
                    # Best-effort fallback keeps the rollout alive without a server.
                    data = {"observation": {"echoed_message": "fallback data"}, "done": False}
            return self._wrap(data, with_reward=False)

        async def step(self, action):
            """POST /step with the action; fall back to a terminal zero-reward result."""
            import httpx
            async with httpx.AsyncClient() as client:
                try:
                    resp = await client.post(
                        f"{self.base_url}/step",
                        json={"action": {"message": action.message}},
                        timeout=5.0,
                    )
                    data = resp.json()
                except Exception:
                    data = {"observation": {"echoed_message": ""}, "reward": 0.0, "done": True}
            return self._wrap(data, with_reward=True)

        async def close(self):
            # The real client shuts down the container; the shim holds nothing.
            pass
# --- Runtime configuration (read once at import time) -----------------------
# Docker image for the environment. CONSISTENCY FIX: the module docstring
# mandates LOCAL_IMAGE_NAME, but the code only read IMAGE_NAME; honor
# LOCAL_IMAGE_NAME first and keep IMAGE_NAME as a backward-compatible fallback.
IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME") or os.getenv("IMAGE_NAME")
# API credentials: HF_TOKEN preferred, API_KEY accepted as an alias.
API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY")
API_BASE_URL = os.getenv("API_BASE_URL")
MODEL_NAME = os.getenv("MODEL_NAME") or "gpt-4o-mini"
TASK_NAME = os.getenv("MY_ENV_V4_TASK", "moderate")
BENCHMARK = os.getenv("MY_ENV_V4_BENCHMARK", "my_env_v4")

# --- Rollout hyperparameters -------------------------------------------------
MAX_STEPS = 8                   # hard cap on environment steps per episode
TEMPERATURE = 0.1               # near-deterministic sampling for verdicts
MAX_TOKENS = 10                 # verdicts are single words; keep replies short
SUCCESS_SCORE_THRESHOLD = 0.1   # normalized score in [0, 1] counted as success
_MAX_REWARD_PER_STEP = 1.0      # 1.0 for perfect match
MAX_TOTAL_REWARD = MAX_STEPS * _MAX_REWARD_PER_STEP
# Instruction prompt sent as the "system" message on every model call.
# It constrains the model to emit exactly one verdict token so the short
# MAX_TOKENS budget is sufficient.
SYSTEM_PROMPT = textwrap.dedent(
    """
    You are a content moderation AI.
    The user will provide a piece of text to moderate.
    You must output exactly one of these actions: "allow" or "flag" or "remove".
    Do not add any reasoning, quotes, or additional text.
    "allow": mostly safe content.
    "flag": somewhat toxic, sarcasm, or slightly risky content.
    "remove": hate speech, threats, go die/disappear, or extremely toxic.
    """
).strip()
def log_start(task: str, env: str, model: str) -> None:
    """Emit the [START] marker announcing task, environment, and model."""
    details = f"task={task} env={env} model={model}"
    print("[START] " + details, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Emit one [STEP] marker line; a falsy error renders as the literal 'null'."""
    fields = [
        f"step={step}",
        f"action={action}",
        f"reward={reward:.2f}",
        f"done={str(done).lower()}",
        f"error={error if error else 'null'}",
    ]
    print("[STEP] " + " ".join(fields), flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the final [END] summary with comma-joined per-step rewards."""
    formatted = [format(r, ".2f") for r in rewards]
    print(
        f"[END] success={str(success).lower()} steps={steps} "
        f"score={score:.3f} rewards={','.join(formatted)}",
        flush=True,
    )
def build_user_prompt(step: int, last_echoed: str, last_reward: float, history: List[str]) -> str:
    """Compose the per-turn user message asking for one moderation verdict.

    Only `last_echoed` feeds the prompt; the remaining parameters are kept
    for interface compatibility with the rollout loop.
    """
    lines = [
        "Please moderate the following text:",
        repr(last_echoed),
        'Output exactly "allow", "flag", or "remove".',
    ]
    return "\n".join(lines)
def get_model_message(client: OpenAI, step: int, last_echoed: str, last_reward: float, history: List[str]) -> str:
    """Ask the model for a moderation verdict; default to "allow" on any failure.

    Any exception (network, auth, malformed response) is logged and swallowed
    so a single bad request never aborts the rollout.
    """
    prompt = build_user_prompt(step, last_echoed, last_reward, history)
    request_messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ]
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=request_messages,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        verdict = (response.choices[0].message.content or "").strip()
        return verdict or "allow"
    except Exception as exc:
        print(f"[DEBUG] Model request failed: {exc}", flush=True)
        return "allow"
async def main() -> None:
    """Run one moderation rollout: reset the env, loop model->step, log a summary.

    Emits [START], per-step [STEP], and final [END] marker lines on stdout.
    """
    # Route requests through a custom endpoint when API_BASE_URL is configured.
    if API_BASE_URL:
        client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
    else:
        client = OpenAI(api_key=API_KEY)
    env = await MyEnvV4Env.from_docker_image(IMAGE_NAME)
    history: List[str] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False
    log_start(task=TASK_NAME, env=BENCHMARK, model=MODEL_NAME)
    try:
        result = await env.reset()  # OpenENV.reset()
        last_echoed = result.observation.echoed_message
        last_reward = 0.0
        for step in range(1, MAX_STEPS + 1):
            # Stop immediately if the environment already reported completion.
            if result.done:
                break
            message = get_model_message(client, step, last_echoed, last_reward, history)
            result = await env.step(MyEnvV4Action(message=message))
            obs = result.observation
            reward = result.reward or 0.0  # treat a missing/None reward as 0
            done = result.done
            error = None
            rewards.append(reward)
            steps_taken = step
            last_echoed = obs.echoed_message
            last_reward = reward
            log_step(step=step, action=message, reward=reward, done=done, error=error)
            history.append(f"Step {step}: {message!r} -> reward {reward:+.2f}")
            if done:
                break
        # Normalize the accumulated reward against the best achievable total.
        score = sum(rewards) / MAX_TOTAL_REWARD if MAX_TOTAL_REWARD > 0 else 0.0
        score = min(max(score, 0.0), 1.0)  # clamp to [0, 1]
        success = score >= SUCCESS_SCORE_THRESHOLD
    finally:
        # Always attempt container cleanup, even when the rollout raised.
        try:
            await env.close()
        except Exception as e:
            print(f"[DEBUG] env.close() error (container cleanup): {e}", flush=True)
    log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
| if __name__ == "__main__": | |
| asyncio.run(main()) |