| |
|
|
| import time |
| import streamlit as st |
| from typing import Dict, List, Optional, Tuple |
| from datetime import datetime |
|
|
# Import guard: if the OpenAI SDK is missing, surface a friendly Streamlit
# error instead of crashing the whole app at import time.
try:
    from openai import OpenAI
except ImportError:
    OPENAI_AVAILABLE = False
    st.error("❌ OpenAI library nie jest dostępna")
else:
    OPENAI_AVAILABLE = True
|
|
| from config import REPORT_PROMPTS, MODEL_SETTINGS, INTERVIEW_TYPES |
|
|
class ReportGenerator:
    """Multi-stage generator of long research reports with self-prompting.

    Pipeline: detect interview type -> generate an outline -> write each
    section -> expand sections that came out too short -> assemble the
    final report. Progress, warnings and statistics are surfaced through
    Streamlit widgets, so instances are expected to run inside a
    Streamlit app.
    """

    def __init__(self, api_key: str):
        """Bind the generator to an OpenAI API key.

        Args:
            api_key: OpenAI API key used for all chat-completion calls.

        Raises:
            RuntimeError: if the openai package could not be imported.
        """
        if not OPENAI_AVAILABLE:
            raise RuntimeError("OpenAI library nie jest dostępna")

        self.client = OpenAI(api_key=api_key)
        self.api_key = api_key
        # Counters accumulated over one generation run.
        self.generation_stats = {
            'sections_generated': 0,
            'sections_expanded': 0,
            'total_tokens_used': 0,
            'total_cost_estimate': 0.0,
            'generation_time': 0.0
        }

    def generate_comprehensive_report(self, transcriptions: Dict[str, str], brief: str = "") -> str:
        """Generate the complete report from the given transcriptions.

        Runs the four-stage self-prompting strategy (outline, sections,
        expansion, final assembly) and reports progress via Streamlit.

        Args:
            transcriptions: mapping of source filename -> transcription text.
            brief: optional research brief guiding the analysis.

        Returns:
            The final report text with a metadata footer appended.

        Raises:
            Exception: any stage failure is reported via st.error and
                re-raised for the caller.
        """
        start_time = time.time()

        try:
            st.info("📋 Rozpoczynam generowanie raportu...")

            combined_transcriptions = self._combine_transcriptions(transcriptions)
            interview_type = self._detect_interview_type(combined_transcriptions)

            st.info(f"🔍 Wykryto typ: {INTERVIEW_TYPES.get(interview_type, 'nieznany')}")

            st.info("📝 Etap 1/4: Tworzenie struktury raportu...")
            outline = self._generate_outline(combined_transcriptions, brief, interview_type)

            if not outline:
                raise RuntimeError("Nie udało się wygenerować struktury raportu")

            st.info("✍️ Etap 2/4: Generowanie treści sekcji...")
            sections = self._generate_sections_iteratively(
                outline, combined_transcriptions, brief, interview_type
            )

            st.info("🔍 Etap 3/4: Pogłębianie analizy...")
            expanded_sections = self._expand_short_sections(
                sections, combined_transcriptions, brief
            )

            st.info("📄 Etap 4/4: Finalne scalenie...")
            final_report = self._assemble_final_report(
                expanded_sections, brief, interview_type, len(transcriptions)
            )

            self.generation_stats['generation_time'] = time.time() - start_time

            st.success(f"🎉 Raport wygenerowany! ({self.generation_stats['generation_time']:.1f}s)")
            self._log_generation_stats()

            return final_report

        except Exception as e:
            st.error(f"❌ Błąd generowania raportu: {str(e)}")
            # Bare raise keeps the original traceback intact for the caller.
            raise

    def _combine_transcriptions(self, transcriptions: Dict[str, str]) -> str:
        """Concatenate all transcriptions, each preceded by a numbered header."""
        combined = []

        for i, (filename, transcription) in enumerate(transcriptions.items(), 1):
            # BUGFIX: the header previously hard-coded "(unknown)" and ignored
            # the unpacked filename, losing the interview's provenance.
            header = f"\n\n=== WYWIAD {i}: {filename} ===\n\n"
            combined.append(header + transcription)

        return "\n".join(combined)

    def _detect_interview_type(self, transcriptions: str) -> str:
        """Heuristically classify the material as 'fgi', 'idi' or 'auto'.

        Counts how many indicator phrases of each type occur in the text;
        'auto' is returned on a tie (including zero hits on both sides).
        """
        text_lower = transcriptions.lower()

        # Phrases typical for focus-group sessions (FGI).
        fgi_indicators = [
            'moderator', 'grupa', 'wszyscy', 'uczestnicy', 'dyskusja',
            'czy zgadzacie się', 'co myślicie', 'focus group'
        ]

        # Phrases typical for individual in-depth interviews (IDI).
        idi_indicators = [
            'wywiad indywidualny', 'jeden na jeden', 'interviewer',
            'opowiedz mi', 'jak się czujesz', 'twoje doświadczenie'
        ]

        fgi_score = sum(1 for indicator in fgi_indicators if indicator in text_lower)
        idi_score = sum(1 for indicator in idi_indicators if indicator in text_lower)

        if fgi_score > idi_score:
            return 'fgi'
        elif idi_score > fgi_score:
            return 'idi'
        else:
            return 'auto'

    def _generate_outline(self, transcriptions: str, brief: str, interview_type: str) -> Dict[str, List[str]]:
        """Ask the model for a report outline; return {} on failure.

        Only the first 8000 characters of the transcriptions are sent:
        enough context to plan structure while bounding token usage.
        """
        try:
            prompt = REPORT_PROMPTS['outline_generator'].format(
                transcriptions=transcriptions[:8000],
                brief=brief or "Brak szczegółowego briefu",
                interview_type=INTERVIEW_TYPES.get(interview_type, 'wywiad')
            )

            response = self._call_gpt(prompt)
            outline = self._parse_outline(response)

            st.success(f"✅ Outline: {len(outline)} sekcji zaplanowanych")
            return outline

        except Exception as e:
            st.error(f"❌ Błąd generowania outline: {e}")
            return {}

    def _generate_sections_iteratively(self, outline: Dict, transcriptions: str, brief: str, interview_type: str) -> Dict[str, str]:
        """Generate report sections one at a time from the outline.

        A failed section does not abort the run: an error placeholder is
        stored so the report structure stays complete.
        """
        sections = {}

        for section_title, section_points in outline.items():
            # Skip empty titles and leftover markdown heading artifacts.
            if not section_title or section_title.startswith('#'):
                continue

            st.info(f"📝 Generuję: {section_title}")

            try:
                prompt = REPORT_PROMPTS['section_generator'].format(
                    transcriptions=transcriptions,
                    brief=brief or "Brak szczegółowego briefu",
                    interview_type=INTERVIEW_TYPES.get(interview_type, 'wywiad'),
                    outline=str(outline),
                    section_title=section_title,
                    section_points=section_points
                )

                section_content = self._call_gpt(prompt)
                sections[section_title] = section_content

                self.generation_stats['sections_generated'] += 1
                st.success(f"✅ {section_title} ({len(section_content.split())} słów)")

                # Pace API calls to stay clear of rate limits.
                time.sleep(2)

            except Exception as e:
                st.warning(f"⚠️ Błąd sekcji '{section_title}': {e}")
                sections[section_title] = f"[BŁĄD GENEROWANIA SEKCJI: {e}]"

        return sections

    def _expand_short_sections(self, sections: Dict, transcriptions: str, brief: str) -> Dict[str, str]:
        """Self-prompting pass: re-prompt the model to expand short sections.

        Sections under 500 words are sent back for expansion; on failure
        the original content is kept unchanged.
        """
        expanded_sections = {}

        for section_title, section_content in sections.items():
            word_count = len(section_content.split())

            # 500 words is the minimum depth expected of a report section.
            if word_count < 500:
                st.info(f"🔍 Rozszerzam: {section_title} ({word_count} słów)")

                try:
                    prompt = REPORT_PROMPTS['section_expander'].format(
                        current_section=section_content,
                        transcriptions=transcriptions,
                        brief=brief or "Brak szczegółowego briefu"
                    )

                    expanded_content = self._call_gpt(prompt)
                    expanded_sections[section_title] = expanded_content

                    new_word_count = len(expanded_content.split())
                    self.generation_stats['sections_expanded'] += 1

                    st.success(f"✅ Rozszerzone: {section_title} ({word_count} → {new_word_count} słów)")

                    # Pace API calls to stay clear of rate limits.
                    time.sleep(2)

                except Exception as e:
                    # Best effort: fall back to the unexpanded content.
                    st.warning(f"⚠️ Nie udało się rozszerzyć '{section_title}': {e}")
                    expanded_sections[section_title] = section_content
            else:
                expanded_sections[section_title] = section_content
                st.success(f"✅ {section_title} OK ({word_count} słów)")

        return expanded_sections

    def _assemble_final_report(self, sections: Dict, brief: str, interview_type: str, interviews_count: int) -> str:
        """Merge the sections into one report via a final model pass.

        Falls back to a plain concatenated report if the assembly call
        fails, so the user never loses the generated content.
        """
        try:
            sections_text = "\n\n".join([
                f"## {title}\n\n{content}"
                for title, content in sections.items()
            ])

            prompt = REPORT_PROMPTS['final_assembly'].format(
                sections=sections_text,
                brief=brief or "Brak szczegółowego briefu",
                interview_type=INTERVIEW_TYPES.get(interview_type, 'wywiad'),
                interviews_count=interviews_count,
                date=datetime.now().strftime("%Y-%m-%d")
            )

            final_report = self._call_gpt(prompt, max_tokens=4000)

            # Machine-readable footer describing how the report was produced.
            metadata = f"""

---

## METADATA RAPORTU
- **Wygenerowano**: {datetime.now().strftime("%Y-%m-%d %H:%M")}
- **Typ badania**: {INTERVIEW_TYPES.get(interview_type, 'nieznany')}
- **Liczba wywiadów**: {interviews_count}
- **Sekcji wygenerowanych**: {self.generation_stats['sections_generated']}
- **Sekcji rozszerzonych**: {self.generation_stats['sections_expanded']}
- **Czas generowania**: {self.generation_stats['generation_time']:.1f}s
- **Generator**: FGI/IDI Research Analyzer v1.0
"""

            return final_report + metadata

        except Exception as e:
            st.error(f"❌ Błąd finalnego scalenia: {e}")
            return self._create_fallback_report(sections, brief, interview_type)

    def _call_gpt(self, prompt: str, max_tokens: int = 3000) -> str:
        """Single chat-completion call with usage tracking and retry.

        Retries after a 60 s pause whenever the error message mentions a
        rate limit; every other error propagates to the caller.

        Args:
            prompt: the user prompt to send.
            max_tokens: completion token budget for this call.

        Returns:
            The model's response text.
        """
        try:
            response = self.client.chat.completions.create(
                model=MODEL_SETTINGS['gpt']['model'],
                messages=[
                    {"role": "system", "content": "Jesteś ekspertem analizy badań jakościowych. Tworzysz profesjonalne, szczegółowe raporty badawcze."},
                    {"role": "user", "content": prompt}
                ],
                temperature=MODEL_SETTINGS['gpt']['temperature'],
                max_tokens=max_tokens
            )

            # Track usage when the SDK returns it (guard against usage=None,
            # which hasattr alone would not catch).
            if getattr(response, 'usage', None):
                self.generation_stats['total_tokens_used'] += response.usage.total_tokens
                # Rough cost estimate at $0.00015 per 1K tokens.
                self.generation_stats['total_cost_estimate'] += (response.usage.total_tokens / 1000) * 0.00015

            return response.choices[0].message.content

        except Exception as e:
            if "rate limit" in str(e).lower():
                st.warning("⏳ Rate limit - czekam 60s...")
                time.sleep(60)
                return self._call_gpt(prompt, max_tokens)
            # Bare raise keeps the original traceback intact for the caller.
            raise

    def _parse_outline(self, outline_text: str) -> Dict[str, List[str]]:
        """Parse a markdown-style outline into {section title: [bullet points]}.

        Recognizes '## ' lines as section titles and '- ' lines as points
        belonging to the most recent section; everything else is ignored.
        """
        outline = {}
        current_section = None

        for line in outline_text.split('\n'):
            line = line.strip()

            if line.startswith('## '):
                current_section = line[3:].strip()
                outline[current_section] = []
            elif line.startswith('- ') and current_section:
                outline[current_section].append(line[2:].strip())

        return outline

    def _create_fallback_report(self, sections: Dict, brief: str, interview_type: str) -> str:
        """Build a plain concatenated report when final assembly fails."""
        report_parts = [
            f"# RAPORT Z BADANIA {INTERVIEW_TYPES.get(interview_type, 'INTERVIEW').upper()}",
            f"\n**Data**: {datetime.now().strftime('%Y-%m-%d')}",
            f"**Brief**: {brief or 'Brak szczegółowego briefu'}",
            "\n---\n"
        ]

        for title, content in sections.items():
            report_parts.append(f"## {title}\n\n{content}\n\n")

        return "\n".join(report_parts)

    def _log_generation_stats(self):
        """Display the accumulated generation statistics in the UI."""
        stats = self.generation_stats

        st.info(f"""
📊 **Statystyki generowania:**
- Sekcji: {stats['sections_generated']} wygenerowanych, {stats['sections_expanded']} rozszerzonych
- Tokeny: ~{stats['total_tokens_used']:,}
- Koszt: ~${stats['total_cost_estimate']:.4f}
- Czas: {stats['generation_time']:.1f}s
""")

    def evaluate_section_quality(self, section_content: str) -> Dict:
        """Score a section via the model's quality-checker prompt (debug aid).

        Returns:
            A dict with 'scores' (criterion -> int), 'needs_improvement'
            (True if the evaluation contains 'TAK'), and the raw
            'evaluation_text'; or {'error': ...} on failure.
        """
        try:
            prompt = REPORT_PROMPTS['quality_checker'].format(section=section_content)
            evaluation = self._call_gpt(prompt, max_tokens=500)

            # Extract "criterion: N/10" lines from the free-text evaluation.
            lines = evaluation.split('\n')
            scores = {}

            for line in lines:
                if ':' in line and '/10' in line:
                    criterion = line.split(':')[0].strip()
                    score = line.split(':')[1].strip().split('/')[0]
                    try:
                        scores[criterion] = int(score)
                    except ValueError:
                        # Skip lines whose score is not a plain integer;
                        # the old bare except hid all errors here.
                        pass

            needs_improvement = 'TAK' in evaluation.upper()

            return {
                'scores': scores,
                'needs_improvement': needs_improvement,
                'evaluation_text': evaluation
            }

        except Exception as e:
            return {'error': str(e)}

    def get_generation_stats(self) -> Dict:
        """Return a copy of the generation statistics."""
        return self.generation_stats.copy()
|
|
| |
def estimate_report_length(transcriptions: Dict[str, str]) -> Dict:
    """Estimate the size of the final report from the input transcriptions.

    Heuristic: the report condenses to roughly 20% of the source word
    volume, at ~250 words per printed page, with ~120 s of generation
    time per interview.
    """
    transcription_words = sum(len(text.split()) for text in transcriptions.values())
    report_words = int(transcription_words * 0.2)

    return {
        'transcription_words': transcription_words,
        'estimated_report_words': report_words,
        'estimated_pages': report_words / 250,
        'estimated_generation_time': len(transcriptions) * 120,
    }
|
|
| |
if __name__ == "__main__":
    # Smoke test: instantiate the generator and run the length estimator
    # on synthetic transcriptions (no API calls are made here).
    print("🧪 Test ReportGenerator")

    try:
        gen = ReportGenerator("test-key")
        print("✅ ReportGenerator zainicjalizowany")

        sample = {
            "test1.mp3": "To jest przykładowa transkrypcja wywiadu. " * 100,
            "test2.mp3": "To jest druga transkrypcja z badania. " * 150,
        }

        stats = estimate_report_length(sample)
        print(f"📊 Estymacja: {stats['estimated_report_words']} słów, {stats['estimated_pages']:.1f} stron")

    except Exception as e:
        print(f"❌ Błąd testu: {e}")

    print("✅ Test zakończony")