""" Task 3: Identify the function that violates a specified property. """ import json from typing import Any, Dict, Tuple from data.data_loader import ( get_function_by_name, list_function_names, list_state_variable_names, get_state_variable_by_name ) from env.schemas import Reward, ActionType def list_functions(ctx: Any, qkey: str, params: Dict) -> Tuple[str, Reward]: """Handle LIST_FUNCTIONS action.""" if ctx._is_repeated(qkey): return "Repeated query.", Reward(value=ActionType.REPEATED.cost, reason="Repeated query") names = list_function_names(ctx._contract) return ( f"Functions in {ctx._contract['contract_name']}: {', '.join(names)}", Reward(value=ActionType.LIST_FUNCTIONS.cost, reason="list_functions cost"), ) def get_function_metadata(ctx: Any, qkey: str, params: Dict) -> Tuple[str, Reward]: """Handle GET_FUNCTION_METADATA action.""" fn_name = params.get("function_name", "") if ctx._is_repeated(qkey): return "Repeated query.", Reward(value=ActionType.REPEATED.cost, reason="Repeated query") fn = get_function_by_name(ctx._contract, fn_name) if fn is None: return ( f"Function '{fn_name}' not found. " f"Available: {list_function_names(ctx._contract)}", Reward(value=ActionType.GET_FUNCTION_METADATA.cost, reason="Unknown function"), ) params_list = fn.get("parameters", []) modifiers = fn.get("modifiers", []) lines = [ f"Function : {fn.get('signature', fn_name)}", f"Visibility : {fn.get('visibility', 'unknown')}", f"Modifiers : {', '.join(modifiers) if modifiers else 'none'}", ] if params_list: lines.append("Parameters :") for p in params_list: lines.append(f" {p['type']} {p['name']} — {p.get('description','')}") else: lines.append("Parameters : none") lines.append(f"Returns : {fn.get('returns','') or 'void'}") lines.append(f"Summary : {fn.get('comment','')}") return "\n".join(lines), Reward(value=ActionType.GET_FUNCTION_METADATA.cost, reason="get_function_metadata cost") def get_function_code(ctx: Any, qkey: str, params: Dict) -> Tuple[str, Reward]: """Handle GET_FUNCTION_CODE action.""" fn_name = params.get("function_name", "") if ctx._is_repeated(qkey): return "Repeated query.", Reward(value=ActionType.REPEATED.cost, reason="Repeated query") fn = get_function_by_name(ctx._contract, fn_name) if fn is None: return ( f"Function '{fn_name}' not found. " f"Available: {list_function_names(ctx._contract)}", Reward(value=ActionType.GET_FUNCTION_CODE.cost, reason="Unknown function — extra penalty"), ) code = fn.get("code", "// no code available") return ( f"// {fn_name}\n{code}", Reward(value=ActionType.GET_FUNCTION_CODE.cost, reason="get_function_code cost"), ) def get_state_variable(ctx: Any, qkey: str, params: Dict) -> Tuple[str, Reward]: """Handle GET_STATE_VARIABLE action.""" var_name = params.get("variable_name", "") if ctx._is_repeated(qkey): return "Repeated query.", Reward(value=ActionType.REPEATED.cost, reason="Repeated query") if not var_name: names = list_state_variable_names(ctx._contract) return ( f"State variables: {', '.join(names)}", Reward(value=ActionType.GET_STATE_VARIABLE.cost, reason="Listed state variables"), ) sv = get_state_variable_by_name(ctx._contract, var_name) if sv is None: return ( f"Variable '{var_name}' not found.", Reward(value=ActionType.GET_STATE_VARIABLE.cost, reason="Unknown state variable"), ) return ( f"{sv['type']} {sv['visibility']} {sv['name']}: {sv.get('description','')}", Reward(value=ActionType.GET_STATE_VARIABLE.cost, reason="get_state_variable cost"), ) def get_call_graph(ctx: Any, qkey: str, params: Dict) -> Tuple[str, Reward]: """Handle GET_CALL_GRAPH action.""" if ctx._is_repeated(qkey): return "Repeated query.", Reward(value=ActionType.REPEATED.cost, reason="Repeated query") cg = ctx._contract.get("call_graph", {}) cg_str = "; ".join( f"{fn} → [{', '.join(callees)}]" for fn, callees in cg.items() ) return ( f"Call graph: {cg_str}", Reward(value=ActionType.GET_CALL_GRAPH.cost, reason="get_call_graph cost"), ) def get_property_specification(ctx: Any, qkey: str, params: Dict) -> Tuple[str, Reward]: """Handle GET_PROPERTY_SPECIFICATION action.""" if ctx._is_repeated(qkey): return "Repeated query.", Reward(value=ActionType.REPEATED.cost, reason="Repeated query") rule = ctx._target_fn.get("property_specification", {}) if not rule: rule = "No rule specification available for this property." rule_parsed = json.dumps(rule) if isinstance(rule, dict) else rule return ( f"Formal property:\n{rule_parsed}", Reward(value=ActionType.GET_PROPERTY_SPECIFICATION.cost, reason="get_property_specification cost"), ) def submit_function(ctx: Any, qkey: str, params: Dict) -> Tuple[str, Reward]: """Handle SUBMIT_FUNCTION action for Task 3. Expected params --------------- function_name : str – name of the function that violates the given property """ if ctx._done: return ( "Only ONE submission is allowed. Reset environment to start again.", Reward(value=ActionType.SUBMIT.cost, reason="Second submit_function attempt"), ) fn_name = params.get("function_name", "").strip() if not fn_name: return ( "submit_function requires 'function_name' in params.", Reward(value=ActionType.SUBMIT.cost, reason="Malformed submission"), ) ctx._done = True score = ctx._grader.grade(fn_name, ctx._step_count, ctx._cum_reward) return (f"Correct Answer: {ctx._grader.get_canonical_answer}"), Reward( value=score, reason=f"submit_function score={score:.1f}" ) def unknown_action(ctx: Any, qkey: str, params: Dict, action_type: str) -> Tuple[str, Reward]: """Fallback for unknown actions.""" return ( f"Unknown action '{action_type}'. Valid: {[a.value for a in ActionType]}", Reward(value=-0.10, reason="Unknown action"), )