| import os |
| import json |
| import pandas as pd |
| import streamlit as st |
| from collections import defaultdict |
|
|
def clean_git_patch(git_patch):
    """Strip any text that precedes the first ``diff`` marker in a patch.

    Args:
        git_patch: Patch text, possibly prefixed with unrelated output.
            Values that are not strings (for example ``None`` coming from a
            JSON ``null``) are returned unchanged.

    Returns:
        The patch starting at the first occurrence of ``'diff'``, or the
        input unchanged when it is not a string or contains no ``'diff'``.
    """
    if not isinstance(git_patch, str):
        # A missing patch must not abort loading of the surrounding record.
        return git_patch
    start = git_patch.find('diff')
    return git_patch if start == -1 else git_patch[start:]
|
|
|
|
def _load_report_legacy(instance_id_to_status, report):
    """Record per-instance flags from a legacy-format report.

    ``report`` maps a status name to a collection of instance ids. Each
    recognised status sets one boolean flag on the matching entry of
    ``instance_id_to_status``; statuses not listed below set nothing.
    """
    # Status name in the report -> flag name stored for the instance.
    flag_for_status = {
        'resolved': 'resolved',
        'applied': 'applied',
        'test_timeout': 'test_timeout',
        'test_errored': 'test_errored',
        'no_generation': 'empty_generation',
    }
    for status, ids in report.items():
        flag = flag_for_status.get(status)
        for iid in ids:
            if flag is None:
                continue
            instance_id_to_status[iid][flag] = True
|
|
def _load_report_new(instance_id_to_status, report):
    """Record per-instance flags from a report that carries ``*_ids`` lists.

    Ids under ``report['resolved_ids']`` get ``'resolved'`` set to True and
    ids under ``report['error_ids']`` get ``'error_eval'`` set to True.
    """
    # (key in the report, flag stored for each listed instance)
    sources = (
        ('resolved_ids', 'resolved'),
        ('error_ids', 'error_eval'),
    )
    for report_key, flag in sources:
        for iid in report[report_key]:
            instance_id_to_status[iid][flag] = True
|
|
def load_df_from_selected_filepaths(select_filepaths):
    """Load one or more JSON-lines output files into a single DataFrame.

    For each file, a ``report.json`` in the same directory (if present) is
    read and the per-instance status it yields is attached to matching
    records under the ``'fine_grained_report'`` key. Any ``'git_patch'``
    value is passed through ``clean_git_patch``.

    Args:
        select_filepaths: A single path or an iterable of paths to files
            holding one JSON object per line. Blank lines are skipped.

    Returns:
        A ``pandas.DataFrame`` with one row per JSON record across all files.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record has no ``'instance_id'``.
    """
    if isinstance(select_filepaths, str):
        select_filepaths = [select_filepaths]

    data = []
    for filepath in select_filepaths:
        report_json = os.path.join(os.path.dirname(filepath), 'report.json')

        # Entries are only created for ids the report mentions, so the
        # membership test further down tells whether an instance was listed.
        instance_id_to_status = defaultdict(lambda: {'resolved': False})
        if os.path.exists(report_json):
            # JSON is UTF-8; do not depend on the platform default encoding.
            with open(report_json, 'r', encoding='utf-8') as f:
                report = json.load(f)
            # The presence of 'resolved_ids' selects the newer report layout.
            if 'resolved_ids' in report:
                _load_report_new(instance_id_to_status, report)
            else:
                _load_report_legacy(instance_id_to_status, report)

        with open(filepath, 'r', encoding='utf-8') as f:
            # Stream the file instead of materialising it with readlines().
            for line in f:
                if not line.strip():
                    continue
                d = json.loads(line)
                if 'git_patch' in d:
                    d['git_patch'] = clean_git_patch(d['git_patch'])
                if d['instance_id'] in instance_id_to_status:
                    d['fine_grained_report'] = dict(
                        instance_id_to_status[d['instance_id']]
                    )
                data.append(d)
    return pd.DataFrame(data)
|
|
|
|
def _dict_or_empty(value):
    """Return *value* if it is a dict, otherwise a new empty dict."""
    return value if isinstance(value, dict) else {}


def agg_stats(df):
    """Flatten per-instance evaluation records into a summary DataFrame.

    Args:
        df: DataFrame with one row per instance. Each row is read for
            ``instance_id``, ``metadata`` and ``test_result`` and, when
            present, ``error``, ``metrics``, ``fine_grained_report``,
            ``report`` and ``swe_instance``.

    Returns:
        A DataFrame with one row per input row holding the row index, the
        instance id, agent class, model name, the test-result fields, the
        error flags, the accumulated cost and (when ``swe_instance`` is
        present) the repo.
    """
    stats = []
    for idx, entry in df.iterrows():
        raw_result = entry['test_result']
        if 'result' in raw_result:
            raw_result = raw_result['result']
        # Work on a copy so the dicts stored in ``df`` are never modified.
        test_result = dict(raw_result)

        error = entry.get('error', None)
        if isinstance(error, str):
            agent_stuck_in_loop = 'Agent got stuck in a loop' in error
            contains_error = bool(error) and not agent_stuck_in_loop
        else:
            agent_stuck_in_loop = False
            contains_error = False

        # A column that exists in the frame can still hold None/NaN for an
        # individual row; such values are treated as an empty report.
        if 'fine_grained_report' in entry:
            report = _dict_or_empty(entry['fine_grained_report'])
            test_result['resolved'] = report.get('resolved', False)
            test_result['test_timeout'] = report.get('test_timeout', False)
            test_result['test_errored'] = report.get('test_errored', False)
            test_result['patch_applied'] = report.get('applied', False)
        elif 'report' in entry:
            report = _dict_or_empty(entry['report'])
            test_result['resolved'] = bool(report.get('resolved', False))
            test_result['test_timeout'] = bool(report.get('test_timeout', False))
            test_result['test_errored'] = bool(report.get('test_errored', False))
            test_result['patch_applied'] = bool(
                report.get('apply_test_patch_success', False)
            )

        # ``entry.get`` only falls back to the default when the column is
        # absent, so a None/NaN cell has to be normalised explicitly.
        metrics = _dict_or_empty(entry.get('metrics', {}))
        cost = metrics.get('accumulated_cost', None)

        metadata = entry['metadata']
        if 'llm_config' in metadata:
            model_name = metadata['llm_config']['model']
        else:
            model_name = metadata['model_name']

        d = {
            'idx': idx,
            'instance_id': entry['instance_id'],
            'agent_class': metadata['agent_class'],
            'model_name': model_name,
            **test_result,
            'agent_stuck_in_loop': agent_stuck_in_loop,
            'contains_error': contains_error,
            'cost': cost,
        }
        if 'swe_instance' in entry:
            d['repo'] = entry['swe_instance']['repo']
        stats.append(d)
    return pd.DataFrame(stats)
|
|
@st.cache_data
def get_resolved_stats_from_filepath(filepath):
    """Compute summary statistics for one output file.

    Args:
        filepath: Path handed to ``load_df_from_selected_filepaths``.

    Returns:
        A dict with the keys ``success_rate``, ``n_solved``, ``n_error``,
        ``n_stuck_in_loop``, ``total`` and ``total_cost``. Every value is
        ``None`` when the file yields no rows.
    """
    # The loaded frame is not bound to a name, so it can be released as soon
    # as the aggregated stats have been built.
    stats = agg_stats(load_df_from_selected_filepaths(filepath))
    tot_instances = len(stats)
    if not tot_instances:
        # Same keys as the populated result so callers see a stable schema.
        return {
            'success_rate': None,
            'n_solved': None,
            'n_error': None,
            'n_stuck_in_loop': None,
            'total': None,
            'total_cost': None,
        }
    n_solved = stats['resolved'].sum()
    return {
        'success_rate': n_solved / tot_instances,
        'n_solved': n_solved,
        'n_error': stats['contains_error'].sum(),
        'n_stuck_in_loop': stats['agent_stuck_in_loop'].sum(),
        'total': tot_instances,
        'total_cost': stats['cost'].sum(),
    }
|
|