| from datetime import datetime, timedelta |
| from typing import List |
| from dataclasses import dataclass, astuple |
|
|
| from tabulate import tabulate |
| from lib.log_parser import LogTag, LogItem, WebItem |
| from lib.utils import * |
|
|
| class LogReport: |
| """用于处理 log文件""" |
| def __init__(self): |
| self.items:List[LogItem] = [] |
|
|
| def from_logfile(self, log_file, start_line=0): |
| """将log文件中有效的行转换成 LogItem,返回 LogItem的列表 |
| 和当前文件的行数(用于下一个 case读取 log文件的起始行数)""" |
| print(f"generate LogReport from logfile: {log_file}") |
| with open(log_file, "r") as f: |
| lines = f.readlines() |
| print(f"read log file lines {start_line}:{len(lines)}") |
| for l in lines[start_line:]: |
| for item in LogTag: |
| if item.value in l: |
| log_item = LogItem.from_log(item, l) |
| self.items.append(log_item) |
| return self.items, len(lines) |
|
|
| def item_to_rows(self): |
| """将 LogItem 列表转换成 csv行的格式,每行以 audio开始""" |
| rows = [] |
| current_line = [] |
| for index, item in enumerate(self.items): |
| if item.tag in [LogTag.load_start, LogTag.load_end]: |
| continue |
| |
| if item.tag == LogTag.audio_end: |
| rows.append(current_line) |
| current_line = [] |
| current_line += [item.tag.name, item.timestamp, item.content] |
| return rows |
|
|
| def to_csv(self, csv_path=None): |
| header_mapping = { |
| |
| "audio_end_tag": 0, |
| "audio_end_tsp": 1, |
| "audio_length": 2, |
| "transcribe_cost_tag": 3, |
| "transcribe_cost_tsp": 4, |
| "transcribe_cost": 5, |
| "transcribe_end_tag": 6, |
| "transcribe_end_tsp": 7, |
| "transcribe_output": 8, |
| "translate_start_tag": 9, |
| "translate_start_tsp": 10, |
| "translate_input": 11, |
| "translate_cost_tag": 12, |
| "translate_cost_tsp": 13, |
| "translate_cost": 14, |
| "translate_end_tag": 15, |
| "translate_end_tsp": 16, |
| "translate_output": 17, |
| } |
| rows = self.item_to_rows() |
| header = list(header_mapping.keys()) |
| rows = [[row[i] for i in header_mapping.values() if i < len(row)] for row in rows] |
| save_csv(csv_path, header, rows) |
|
|
| @dataclass |
| class DelaySummary: |
| audio_name:str = "" |
| trans_type: str = "" |
| audio_length:str = "" |
| load_start: datetime =None |
| load_end: datetime=None |
| load: float = 0 |
| avg_audio_len: float = 0 |
| total_tsb: float = 0 |
| avg_tsb_per_second: float = 0 |
| total_tsl: float = 0 |
| avg_tsl_per_second: float = 0 |
| total_web: float = 0 |
| avg_web_per_second: float = 0 |
| avg_web_freq: float = 0 |
|
|
| @dataclass |
| class DelayDetailRow: |
| audio_end_tsp:datetime = "" |
| audio_length:float =0 |
| tsb_end_tsp:datetime ="" |
| tsb_opt:str ="" |
| tsb_cost:float = 0 |
| tsb_cost_per_second: float = 0 |
| tsl_ipt:str ="" |
| tsl_end_tsp:datetime ="" |
| tsl_opt:str ="" |
| tsl_cost:float =0 |
| tsl_cost_per_second: float = 0 |
| web_tsp:datetime ="" |
| web_src:str ="" |
| web_dst:str ="" |
| web_delay: float = 0 |
| web_delay_per_second: float = 0 |
| web_freq: float = 0 |
| def __repr__(self): |
| return f"Row(audio_length={self.audio_length}, tsb_opt={self.tsb_opt})" |
|
|
| @dataclass |
| class DelayItem: |
| """存储delay 报告中每一个 case的结果""" |
| translation_type: str = '' |
| audio: str = "" |
| audio_length: str = "" |
| web_items: List[WebItem] = None |
| log_items: List[LogItem] = None |
|
|
| def to_rows(self): |
| """将 log和 web的结果合并, 返回 DelaySummary和 DelayDetail的列表 |
| 返回 row_0包含音频信息和 load 时间 |
| rows 是每次推理的详细信息""" |
| print(f"length of log_items: {len(self.log_items)}") |
| web_items_dict = {i.src_text + i.dst_text: i for i in self.web_items} |
|
|
| summary = DelaySummary(audio_name=self.audio,trans_type=self.translation_type, |
| audio_length=self.audio_length) |
| detail_rows = [] |
| current_row = DelayDetailRow() |
| for i in self.log_items: |
| if i.tag == LogTag.load_start: |
| summary.load_start = i.timestamp |
| elif i.tag == LogTag.load_end: |
| summary.load_end = i.timestamp |
| summary.load = (summary.load_end-summary.load_start).total_seconds() |
| elif i.tag == LogTag.audio_end: |
| if current_row.audio_length > 0: |
| detail_rows.append(current_row) |
| |
| current_row = DelayDetailRow() |
| current_row.audio_end_tsp = i.timestamp |
| current_row.audio_length = time_to_float(i.content) |
| elif i.tag == LogTag.transcribe_end: |
| current_row.tsb_end_tsp = i.timestamp |
| current_row.tsb_opt = i.content |
| elif i.tag == LogTag.transcribe_cost: |
| current_row.tsb_cost = time_to_float(i.content) |
| current_row.tsb_cost_per_second = current_row.tsb_cost/current_row.audio_length if current_row.audio_length else 0 |
| elif i.tag == LogTag.translate_start: |
| current_row.tsl_ipt = i.content |
| elif i.tag in [LogTag.translate_end, LogTag.translate_large_end]: |
| current_row.tsl_end_tsp = i.timestamp |
| current_row.tsl_opt = i.content |
| |
| if web_item:=web_items_dict.get(current_row.tsb_opt+current_row.tsl_opt): |
| current_row.web_tsp = web_item.timestamp |
| current_row.web_src = web_item.src_text |
| current_row.web_dst = web_item.dst_text |
| current_row.web_delay = (current_row.web_tsp - current_row.audio_end_tsp).total_seconds() |
| current_row.web_delay_per_second = current_row.web_delay / current_row.audio_length if current_row.audio_length else 0 |
| |
| web_items_dict.pop(current_row.tsb_opt+current_row.tsl_opt) |
| if len(detail_rows)>=1 and detail_rows[-1].web_tsp: |
| current_row.web_freq = (current_row.web_tsp - detail_rows[-1].web_tsp).total_seconds() |
|
|
| elif i.tag in [LogTag.translate_cost, LogTag.translate_large_cost]: |
| current_row.tsl_cost = time_to_float(i.content) |
| current_row.tsl_cost_per_second = current_row.tsl_cost/current_row.audio_length if current_row.audio_length else 0 |
| summary = self.get_summary(summary, detail_rows) |
| return summary, detail_rows |
| def get_summary(self,summary: DelaySummary, detail_rows): |
| audio_len = [] |
| total_tsb = [] |
| avg_tsb_per_second = [] |
| total_tsl = [] |
| avg_tsl_per_second = [] |
| total_web = [] |
| avg_web_per_second = [] |
| web_freq = [] |
| for row in detail_rows: |
| if row.audio_length: |
| audio_len.append(row.audio_length) |
| if row.tsb_cost: |
| total_tsb.append(row.tsb_cost) |
| if row.tsb_cost_per_second: |
| avg_tsb_per_second.append(row.tsb_cost_per_second) |
| if row.tsl_cost: |
| total_tsl.append(row.tsl_cost) |
| if row.tsl_cost_per_second: |
| avg_tsl_per_second.append(row.tsl_cost_per_second) |
| if row.web_delay: |
| total_web.append(row.web_delay) |
| if row.web_delay_per_second: |
| avg_web_per_second.append(row.web_delay_per_second) |
| if row.web_freq: |
| web_freq.append(row.web_freq) |
|
|
| summary.avg_audio_len = sum(audio_len) / len(audio_len) if len(audio_len)>0 else 0 |
| summary.total_tsb = sum(total_tsb) |
| summary.avg_tsb_per_second = sum(avg_tsb_per_second) / len(avg_tsb_per_second) if len(avg_tsb_per_second)>0 else 0 |
| summary.total_tsl = sum(total_tsl) |
| summary.avg_tsl_per_second = sum(avg_tsl_per_second) / len(avg_tsl_per_second) if len(avg_tsl_per_second)>0 else 0 |
| summary.total_web = sum(total_web) |
| summary.avg_web_per_second = sum(avg_web_per_second) / len(avg_web_per_second) if len(avg_web_per_second)>0 else 0 |
| summary.avg_web_freq = sum(web_freq) /len(web_freq) if len(web_freq)>0 else 0 |
| return summary |
|
|
|
|
| class DelayReport: |
| """存储delay 报告中所有 case的结果""" |
| start_line = 0 |
| items: List[DelayItem] = [] |
| def print_summary(self, data): |
| print(tabulate(data)) |
|
|
| def to_csv(self, csv_path): |
| summaries = [["audio_name", "translation", "audio_length", |
| "load_start", "load_end", "load", "avg_audio_len", |
| "total_tsb", "avg_tsb_per_sec", "total_tsl", "avg_tsl_per_sec", |
| "total_web", "avg_web_per_sec", "avg_web_freq"]] |
| details = [["audio_end_tsp", "audio_length", |
| "tsb_end_tsp", "tsp_opt", "tsb_cost", "tsb_cost_per_sec", |
| "tsl_ipt", "tsl_end_tsp", "tsl_opt", "tsl_cost", "tsl_cost_per_sec", |
| "web_tsp", "web_src", "web_dst", "web_delay", "web_delay_per_sec", "web_freq"]] |
| for i in self.items: |
| summary, detail_rows = i.to_rows() |
| summaries.append(astuple(summary)) |
| details += [astuple(i) for i in detail_rows] |
| details.append([]) |
| self.print_summary(summaries) |
| save_csv(csv_path, [], summaries+[[]]+details) |
|
|
| @dataclass |
| class AccuracyItem: |
| """存储accuracy 报告中每一个 case的结果""" |
| translation_type: str = '' |
| audio: str = "" |
| audio_length: str = "" |
| audio_text: str = "" |
| src_text: str = "" |
| dst_text: str = "" |
| asr_accuracy: tuple= (0,1) |
| text_compare: str = "" |
| def __post_init__(self): |
|
|
| if self.translation_type == "en2zh": |
| text1 = clean_text_for_comparison_en(self.audio_text) |
| text2 = clean_text_for_comparison_en(self.src_text) |
| spliter = " " |
| else: |
| text1 = clean_text_for_comparison_zh(self.audio_text) |
| text2 = clean_text_for_comparison_zh(self.src_text) |
| spliter = "" |
| self.asr_accuracy = run_textdistance(text1, text2) |
| self.text_compare = highlight_diff(text1, text2, spliter) |
|
|
| def to_list(self): |
| return [self.audio, self.translation_type, self.audio_length, |
| self.asr_accuracy[0], self.asr_accuracy[1], |
| self.src_text, self.audio_text, self.text_compare] |
|
|
| class AccuracyReport: |
| items:List[AccuracyItem] = [] |
|
|
| def print_summary(self): |
| header = ["audio", "distance", "normalized distance"] |
| rows = [[i.audio, i.asr_accuracy[0], i.asr_accuracy[1]] for i in self.items] |
| print(tabulate(rows, header)) |
|
|
| def to_csv(self, csv_path): |
| print("accuracy item length: ", len(self.items)) |
| self.print_summary() |
| header = ["audio_name", "translation", "audio_length", |
| "distance", "normalized distance", |
| "src text", "audio text", "text compare"] |
| save_csv(csv_path, header, [i.to_list() for i in self.items]) |
|
|
|
|
|
|
|
|