| from langchain_core.output_parsers import PydanticOutputParser |
| from langchain_core.prompts import ChatPromptTemplate |
| from langchain_openai import ChatOpenAI |
| from pydantic import BaseModel, Field |
| import pandas as pd |
|
|
|
|
| class AnswerEvaluation(BaseModel): |
| is_valid: bool = Field( |
| description="Является ли ответ валидным и корректным относительно вопроса и оригинального текста" |
| ) |
| relevance_score: float = Field( |
| description="Оценка релевантности ответа вопросу от 0.0 до 1.0", |
| ge=0.0, |
| le=1.0 |
| ) |
| completeness_score: float = Field( |
| description="Оценка полноты ответа от 0.0 до 1.0 (насколько ответ покрывает всю необходимую информацию)", |
| ge=0.0, |
| le=1.0 |
| ) |
| factual_accuracy_score: float = Field( |
| description="Оценка фактической точности ответа от 0.0 до 1.0 (соответствие фактам из оригинального текста)", |
| ge=0.0, |
| le=1.0 |
| ) |
|
|
|
|
| class QuestionBatchIterator: |
| def __init__(self, questions, batch_size): |
| self.questions = questions |
| self.batch_size = batch_size |
| self.current_idx = 0 |
| |
| def __iter__(self): |
| return self |
| |
| def __next__(self): |
| if self.current_idx >= len(self.questions): |
| raise StopIteration |
| |
| batch = self.questions[self.current_idx:self.current_idx + self.batch_size] |
| self.current_idx += self.batch_size |
| return batch |
| |
| def __len__(self): |
| return (len(self.questions) + self.batch_size - 1) // self.batch_size |
| |
| def reset(self): |
| self.current_idx = 0 |
|
|
|
|
| class QAEvaluator: |
| def __init__( |
| self, |
| df, |
| text_column="original_text", |
| model="qwen/qwen3-next-80b-a3b-instruct", |
| temperature=0.0, |
| api_key=None, |
| api_base="https://api.proxyapi.ru/openrouter/v1" |
| ): |
| self.df = df.copy() |
| self.original_text_column = text_column |
| self.api_key = api_key |
| |
| self.llm = ChatOpenAI( |
| model=model, |
| temperature=temperature, |
| openai_api_key=self.api_key, |
| openai_api_base=api_base, |
| ) |
| |
| self._setup_evaluation_agent() |
| self._current_question_column = None |
| self._questions_data = None |
| |
| def _setup_evaluation_agent(self): |
| self.parser = PydanticOutputParser(pydantic_object=AnswerEvaluation) |
| |
| self.prompt = ChatPromptTemplate.from_messages([ |
| ("system", """Ты - эксперт по оценке качества ответов на вопросы по новостным текстам. |
| |
| Твоя задача - оценить, насколько ответ корректен и полон относительно заданного вопроса и оригинального текста. |
| |
| ## Критерии оценки: |
| |
| ### is_valid (валидность): |
| - True: ответ корректно отвечает на вопрос и соответствует фактам из текста |
| - False: ответ неверный, не по теме, или содержит фактические ошибки |
| |
| ### relevance_score (релевантность, 0.0-1.0): |
| - 1.0: ответ полностью по теме вопроса |
| - 0.5: ответ частично по теме |
| - 0.0: ответ не имеет отношения к вопросу |
| |
| ### completeness_score (полнота, 0.0-1.0): |
| - 1.0: ответ содержит всю необходимую информацию |
| - 0.5: ответ содержит часть информации |
| - 0.0: ответ пустой или не содержит нужной информации |
| |
| ### factual_accuracy_score (фактическая точность, 0.0-1.0): |
| - 1.0: все факты в ответе соответствуют оригинальному тексту |
| - 0.5: есть небольшие неточности |
| - 0.0: факты в ответе противоречат оригинальному тексту |
| |
| {format_instructions}"""), |
| ("human", """Оцени следующий ответ: |
| |
| ## Оригинальный текст поста: |
| {original_text} |
| |
| ## Вопрос: |
| {question} |
| |
| ## Ответ для оценки: |
| {answer} |
| |
| Проанализируй и выдай оценку.""") |
| ]) |
| |
| self.evaluation_chain = self.prompt | self.llm | self.parser |
| |
| def get_questions(self, question_column, batch_size=10): |
| if question_column not in self.df.columns: |
| raise ValueError(f"Колонка '{question_column}' не найдена в DataFrame. " |
| f"Доступные колонки: {list(self.df.columns)}") |
| |
| self._current_question_column = question_column |
| |
| self._questions_data = [] |
| for idx, row in self.df.iterrows(): |
| self._questions_data.append({ |
| "index": idx, |
| "question": row[question_column], |
| "original_text": row[self.original_text_column] |
| }) |
| |
| questions = [item["question"] for item in self._questions_data] |
| |
| return QuestionBatchIterator(questions, batch_size) |
| |
| def evaluate_answers(self, answers, show_progress=True): |
| if self._questions_data is None: |
| raise ValueError("Сначала вызовите get_questions() для получения вопросов") |
| |
| if len(answers) != len(self._questions_data): |
| raise ValueError( |
| f"Количество ответов ({len(answers)}) не совпадает с количеством " |
| f"вопросов ({len(self._questions_data)})" |
| ) |
| |
| total_questions = len(answers) |
| valid_answers = 0 |
| invalid_answers = 0 |
| detailed_results = [] |
| |
| relevance_scores = [] |
| completeness_scores = [] |
| factual_accuracy_scores = [] |
| |
| if show_progress: |
| from tqdm import tqdm |
| iterator = tqdm( |
| zip(self._questions_data, answers), |
| total=len(answers), |
| desc="Оценка ответов" |
| ) |
| else: |
| iterator = zip(self._questions_data, answers) |
| |
| for qa_data, answer in iterator: |
| try: |
| evaluation = self._evaluate_single_answer( |
| original_text=qa_data["original_text"], |
| question=qa_data["question"], |
| answer=answer |
| ) |
| |
| if evaluation.is_valid: |
| valid_answers += 1 |
| else: |
| invalid_answers += 1 |
| |
| relevance_scores.append(evaluation.relevance_score) |
| completeness_scores.append(evaluation.completeness_score) |
| factual_accuracy_scores.append(evaluation.factual_accuracy_score) |
| |
| detailed_results.append({ |
| "index": qa_data["index"], |
| "question": qa_data["question"], |
| "answer": answer, |
| "is_valid": evaluation.is_valid, |
| "relevance_score": evaluation.relevance_score, |
| "completeness_score": evaluation.completeness_score, |
| "factual_accuracy_score": evaluation.factual_accuracy_score, |
| }) |
| |
| except Exception as e: |
| print(f"Ошибка при оценке ответа: {e}") |
| invalid_answers += 1 |
| relevance_scores.append(0.0) |
| completeness_scores.append(0.0) |
| factual_accuracy_scores.append(0.0) |
| |
| detailed_results.append({ |
| "index": qa_data["index"], |
| "question": qa_data["question"], |
| "answer": answer, |
| "is_valid": False, |
| "relevance_score": 0.0, |
| "completeness_score": 0.0, |
| "factual_accuracy_score": 0.0 |
| }) |
| |
| avg_relevance = sum(relevance_scores) / len(relevance_scores) if relevance_scores else 0.0 |
| avg_completeness = sum(completeness_scores) / len(completeness_scores) if completeness_scores else 0.0 |
| avg_factual_accuracy = sum(factual_accuracy_scores) / len(factual_accuracy_scores) if factual_accuracy_scores else 0.0 |
| |
| accuracy = valid_answers / total_questions if total_questions > 0 else 0.0 |
| combined_score = (avg_relevance + avg_completeness + avg_factual_accuracy) / 3 |
| |
| return { |
| "total_questions": total_questions, |
| "valid_answers": valid_answers, |
| "invalid_answers": invalid_answers, |
| "accuracy": accuracy, |
| "avg_relevance": avg_relevance, |
| "avg_completeness": avg_completeness, |
| "avg_factual_accuracy": avg_factual_accuracy, |
| "combined_score": combined_score, |
| "detailed_results": detailed_results, |
| } |
| |
| def _evaluate_single_answer(self, original_text, question, answer): |
| if answer is None or (isinstance(answer, str) and answer.strip() == ""): |
| return AnswerEvaluation( |
| is_valid=False, |
| relevance_score=0.0, |
| completeness_score=0.0, |
| factual_accuracy_score=0.0, |
| ) |
| |
| result = self.evaluation_chain.invoke({ |
| "original_text": original_text, |
| "question": question, |
| "answer": answer, |
| "format_instructions": self.parser.get_format_instructions() |
| }) |
| |
| return result |
| |
| def get_detailed_results_df(self, metrics): |
| return pd.DataFrame(metrics["detailed_results"]) |
|
|