import json
from functools import partial
from pathlib import Path
from typing import List

from .base_agent import BaseAgent
from prompt.template import METHOD_CRITIQUE_PROMPT
from utils.convert_format import markdown_to_json_method
from utils.utils import parse_llm_output_to_json
from utils.embedding import EmbeddingScorer


class MethodScorer:
    """Scores a method tree with `score_func` and, for every leaf method,
    blends the leaf's own score with the average of its ancestors' scores."""

    def __init__(self, score_func, parent_weight=0.5, child_weight=0.5):
        # score_func takes a list of {"method", "description"} dicts and
        # returns a list of {"score": float} dicts in the same order.
        self.score_func = score_func
        self.parent_weight = parent_weight
        self.child_weight = child_weight
        self.leaves = []

    def process(self, data):
        """Score every node under the root nodes in `data`, then return the
        flattened list of scored leaf methods."""
        self.leaves = []
        for root_node in data:
            self._process_node(root_node, parent_scores=[])
        for root_node in data:
            self._collect_leaves(root_node)
        return self.leaves

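    # For orientation, the tree handed to process() looks roughly like the
    # sketch below (shape inferred from _process_node/_collect_leaves; the
    # names are illustrative, not taken from the real method_en_v1.json):
    #
    # [{"children": [
    #     {"method_class": "Optimization", "description": "...",
    #      "children": [
    #          {"method": "Linear programming", "description": "..."}]}]}]
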
    def _process_node(self, node, parent_scores):
        if 'children' not in node:
            return  # leaf nodes are scored by their parent
        children = node.get('children', [])
        if not children:
            return
        if 'method_class' in children[0]:
            # Interior level: score the method classes, then recurse with this
            # node's own score appended to the running ancestor scores.
            input_for_llm = [{"method": child["method_class"], "description": child.get("description", "")} for child in children]
            llm_result = self.score_func(input_for_llm)
            for idx, child in enumerate(children):
                # Fall back to 0 when the scorer returns fewer results than children.
                child['score'] = llm_result[idx]['score'] if idx < len(llm_result) else 0
            new_parent = parent_scores.copy()
            if node.get('score') is not None:
                new_parent.append(node['score'])
            for child in children:
                self._process_node(child, new_parent)
        else:
            # Leaf level: score the methods, then blend each score with the
            # average of the ancestor scores (this node's own score included,
            # matching how the interior branch threads scores downwards).
            input_for_llm = [{"method": child["method"], "description": child.get("description", "")} for child in children]
            llm_result = self.score_func(input_for_llm)
            ancestors = parent_scores.copy()
            if node.get('score') is not None:
                ancestors.append(node['score'])
            parent_avg = sum(ancestors) / len(ancestors) if ancestors else 0
            for idx, child in enumerate(children):
                child_score = llm_result[idx]['score'] if idx < len(llm_result) else 0
                child['score'] = child_score
                child['final_score'] = parent_avg * self.parent_weight + child_score * self.child_weight
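
    # Worked example with the default 0.5/0.5 weights: ancestor scores
    # [0.8, 0.6] give parent_avg = 0.7, so a leaf scored 0.9 ends up with
    # final_score = 0.7 * 0.5 + 0.9 * 0.5 = 0.8.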

    def _collect_leaves(self, node):
        if 'children' in node:
            for child in node['children']:
                self._collect_leaves(child)
        elif 'final_score' in node:
            self.leaves.append({
                "method": node["method"],
                "description": node.get("description", ""),
                "score": node['final_score'],
            })


class MethodRanking(BaseAgent):
    def __init__(self, llm, rag=True):
        super().__init__(llm)
        self.rag = rag
        self.embedding_scorer = EmbeddingScorer()
        current_file = Path(__file__).resolve()
        json_path = current_file.parent.parent.parent / 'data/actor_data/docs/method_en_v1.json'
        md_path = current_file.parent.parent.parent / 'data/actor_data/docs/method_en_v1.md'

        # Parse the methods markdown into a tree and cache the result as JSON.
        with open(md_path, "r", encoding="utf-8") as f:
            self.markdown_text = f.read()
        self.method_tree = markdown_to_json_method(self.markdown_text)
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(self.method_tree, f, ensure_ascii=False, indent=4)

    def llm_score_method(self, problem_description: str, methods: List[dict]):
        """Ask the LLM to critique each method and return per-method scores."""
        methods_str = '\n'.join([f"{i+1}. {method['method']} {method.get('description', '')}" for i, method in enumerate(methods)])
        prompt = METHOD_CRITIQUE_PROMPT.format(problem_description=problem_description, methods=methods_str)
        answer = self.llm.generate(prompt)
        method_scores = parse_llm_output_to_json(answer).get('methods', [])
        # Restore the input order, then average the per-criterion scores.
        method_scores = sorted(method_scores, key=lambda x: x['method_index'])
        for method in method_scores:
            method['score'] = sum(method['scores'].values()) / len(method['scores'])
        return method_scores
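
    # The parsed LLM answer is expected to look roughly like the sketch below
    # (shape inferred from the parsing above; the criterion names are
    # illustrative assumptions, not taken from METHOD_CRITIQUE_PROMPT):
    # {"methods": [{"method_index": 1,
    #               "scores": {"relevance": 4, "feasibility": 5}}, ...]}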

    def format_methods(self, methods: List[dict]):
        return '\n'.join([f"**{method['method']}:** {method['description']}" for method in methods])

    def top_methods(self, problem_description: str, top_k: int = 6, method: str = 'embedding'):
        """Return the top_k highest-scoring methods for the problem; with RAG
        disabled, fall back to the full methods document instead."""
        if not self.rag:
            return self.markdown_text
        if method == 'embedding':
            score_func = partial(self.embedding_scorer.score_method, problem_description)
        else:
            score_func = partial(self.llm_score_method, problem_description)
        method_scores = MethodScorer(score_func).process(self.method_tree)
        method_scores.sort(key=lambda x: x['score'], reverse=True)
        return self.format_methods(method_scores[:top_k])
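
    # Both scorers share one contract, inferred from MethodScorer: accept a
    # list of {"method", "description"} dicts and return a list of
    # {"score": ...} dicts aligned with the input order. EmbeddingScorer
    # (defined in utils.embedding) is assumed to follow it as well.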


if __name__ == "__main__":
    from input.test_middle_result import problem_str
    from llm.llm import LLM

    llm = LLM('deepseek-chat')
    mr = MethodRanking(llm)
    # Smoke test: rank methods for the sample problem and print the result.
    print(mr.top_methods(problem_str))