| """ |
| 파일 처리 및 API 관련 함수 |
| - 캐시 시스템 통합 버전 |
| """ |
| import os |
| import re |
| import zlib |
| import zipfile |
| import tempfile |
| import requests |
| from pathlib import Path |
| from typing import Optional, Tuple, List, Dict, Generator |
| from xml.etree import ElementTree as ET |
| from datetime import datetime |
|
|
| from utils import ( |
| API_URL, API_KEY, GROQ_API_KEY, CATEGORY_CODES, |
| OLEFILE_AVAILABLE, PYPDF2_AVAILABLE, PDFPLUMBER_AVAILABLE, GROQ_AVAILABLE, |
| extract_region_from_text, extract_region_from_hashtags, classify_org_type, |
| parse_deadline, is_ongoing |
| ) |
|
|
| if OLEFILE_AVAILABLE: |
| import olefile |
| if PYPDF2_AVAILABLE: |
| import PyPDF2 |
| if PDFPLUMBER_AVAILABLE: |
| import pdfplumber |
| if GROQ_AVAILABLE: |
| from groq import Groq |
|
|
| import pandas as pd |
| from bs4 import BeautifulSoup |
|
|
|
|
| |
| |
| |
# Optional cache layer: when cache_db is importable, announcement queries can
# be served from a local cache instead of hitting the remote API every time.
# CACHE_AVAILABLE gates all cache usage below (see fetch_with_cache).
try:
    from cache_db import (
        get_cache, get_cached_announcements, sync_from_api,
        manual_sync, get_sync_status, initialize_cache_system
    )
    CACHE_AVAILABLE = True
except ImportError:
    CACHE_AVAILABLE = False
    print("Warning: cache_db module not available")
|
|
|
|
| |
| |
| |
def extract_text_from_hwpx(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Extract plain text from an HWPX document (a zipped XML package).

    Args:
        file_path: Path to the .hwpx file.

    Returns:
        (text, error): the extracted text and None on success, or
        None and a user-facing error message on failure.
    """
    try:
        text_parts = []
        with zipfile.ZipFile(file_path, 'r') as zf:
            file_list = zf.namelist()
            # Body text normally lives in Contents/section*.xml; fall back to
            # any XML whose name mentions "section" for non-standard archives.
            section_files = sorted([f for f in file_list if f.startswith('Contents/section') and f.endswith('.xml')])
            if not section_files:
                section_files = sorted([f for f in file_list if 'section' in f.lower() and f.endswith('.xml')])
            for section_file in section_files:
                try:
                    with zf.open(section_file) as sf:
                        content = sf.read()
                        content_str = content.decode('utf-8')
                        # Strip namespace declarations and prefixes so tag
                        # names can be matched without namespace handling.
                        content_str = re.sub(r'\sxmlns[^"]*"[^"]*"', '', content_str)
                        content_str = re.sub(r'<[a-zA-Z]+:', '<', content_str)
                        content_str = re.sub(r'</[a-zA-Z]+:', '</', content_str)
                        try:
                            root = ET.fromstring(content_str)
                            texts = []
                            for elem in root.iter():
                                # HWPX stores run text in <t> elements.
                                if elem.tag.endswith('t') or elem.tag == 't':
                                    if elem.text:
                                        texts.append(elem.text)
                                elif elem.text and elem.text.strip():
                                    if any(x in elem.tag.lower() for x in ['text', 'run', 'para', 'char']):
                                        texts.append(elem.text.strip())
                            if texts:
                                text_parts.append(' '.join(texts))
                        except ET.ParseError:
                            # Malformed XML: scrape any text between tags.
                            text_matches = re.findall(r'>([^<]+)<', content.decode('utf-8', errors='ignore'))
                            clean_texts = [t.strip() for t in text_matches if t.strip() and len(t.strip()) > 1]
                            if clean_texts:
                                text_parts.append(' '.join(clean_texts))
                except Exception:
                    # A broken section must not abort the whole document
                    # (was a bare except, which also swallowed KeyboardInterrupt).
                    continue
        if text_parts:
            result = '\n\n'.join(text_parts)
            result = re.sub(r'\s+', ' ', result)
            result = re.sub(r'\n{3,}', '\n\n', result)
            return result.strip(), None
        return None, "HWPX에서 텍스트를 찾을 수 없습니다"
    except zipfile.BadZipFile:
        return None, "유효하지 않은 HWPX 파일"
    except Exception as e:
        return None, f"HWPX 처리 오류: {str(e)}"
|
|
|
|
def extract_hwp_section_text(data: bytes) -> Optional[str]:
    """Extract text from a raw HWP v5 BodyText section stream.

    Walks the HWP record structure: each record begins with a 4-byte
    little-endian header packing tag id (low 10 bits) and size (top 12
    bits); a size of 0xFFF means the real size follows in the next 4 bytes.

    Args:
        data: Decompressed bytes of a BodyText/Section* stream.

    Returns:
        Paragraph texts joined with newlines, or None if none were found.
    """
    texts = []
    pos = 0
    while pos < len(data) - 4:
        try:
            header = int.from_bytes(data[pos:pos+4], 'little')
            tag_id = header & 0x3FF
            size = (header >> 20) & 0xFFF
            pos += 4
            if size == 0xFFF:
                # Extended size: the actual record length follows the header.
                if pos + 4 > len(data):
                    break
                size = int.from_bytes(data[pos:pos+4], 'little')
                pos += 4
            if pos + size > len(data):
                break
            record_data = data[pos:pos+size]
            pos += size
            # Tag 67 — HWP paragraph-text record (HWPTAG_PARA_TEXT).
            if tag_id == 67 and size > 0:
                text = decode_para_text(record_data)
                if text:
                    texts.append(text)
        except Exception:
            # Corrupt record: resynchronize one byte forward
            # (was a bare except, which also swallowed KeyboardInterrupt).
            pos += 1
            continue
    return '\n'.join(texts) if texts else None
|
|
|
|
def decode_para_text(data: bytes) -> Optional[str]:
    """Decode an HWP paragraph-text record.

    The payload is a sequence of little-endian 16-bit code units; values
    below 32 are inline control codes (tab, line break, hyphen, spaces, or
    extended controls carrying a 14-byte payload), everything else is a
    printable character.

    Returns the cleaned text, or None when fewer than 3 characters survive.
    """
    simple_controls = {9: '\t', 10: '\n', 13: '\n', 24: '-', 30: ' ', 31: ' '}
    chars = []
    offset = 0
    limit = len(data) - 1
    while offset < limit:
        code = data[offset] | (data[offset + 1] << 8)
        if code in (1, 2, 3):
            # Extended control: 2-byte code followed by 14 payload bytes.
            offset += 16
            continue
        if code in simple_controls:
            chars.append(simple_controls[code])
        elif code >= 32:
            try:
                ch = chr(code)
                if ch.isprintable() or ch in '\n\t ':
                    chars.append(ch)
            except Exception:
                pass
        offset += 2
    decoded = ''.join(chars).strip()
    decoded = re.sub(r'[ \t]+', ' ', decoded)
    decoded = re.sub(r'\n{3,}', '\n\n', decoded)
    return decoded if len(decoded) > 2 else None
|
|
|
|
def extract_text_from_hwp(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Extract plain text from a binary HWP v5 (OLE compound) document.

    Requires the optional ``olefile`` dependency.

    Args:
        file_path: Path to the .hwp file.

    Returns:
        (text, error): the extracted text and None on success, or
        None and a user-facing error message on failure.
    """
    if not OLEFILE_AVAILABLE:
        return None, "olefile 모듈 없음"
    try:
        ole = olefile.OleFileIO(file_path)
    except Exception as e:
        return None, f"olefile 오류: {str(e)}"
    try:
        if not ole.exists('FileHeader'):
            return None, "HWP 파일 헤더 없음"
        header_data = ole.openstream('FileHeader').read()
        # Bit 0 of the flags byte at offset 36 marks zlib-compressed body
        # streams; assume compressed when the header is unexpectedly short.
        is_compressed = (header_data[36] & 1) == 1 if len(header_data) > 36 else True
        all_texts = []
        for entry in ole.listdir():
            entry_path = '/'.join(entry)
            if not entry_path.startswith('BodyText/Section'):
                continue
            try:
                stream_data = ole.openstream(entry).read()
                if is_compressed:
                    try:
                        # HWP body streams are raw deflate (no zlib header).
                        stream_data = zlib.decompress(stream_data, -15)
                    except zlib.error:
                        try:
                            stream_data = zlib.decompress(stream_data)
                        except zlib.error:
                            pass  # use as-is; the record walker may still cope
                section_text = extract_hwp_section_text(stream_data)
                if section_text:
                    all_texts.append(section_text)
            except Exception:
                # One unreadable section must not abort the whole document.
                continue
        if all_texts:
            return '\n\n'.join(all_texts).strip(), None
        return None, "텍스트를 찾을 수 없습니다"
    except Exception as e:
        return None, f"olefile 오류: {str(e)}"
    finally:
        # Always release the OLE handle, even on error paths that previously
        # leaked it (the original only closed on success / missing header).
        ole.close()
|
|
|
|
def extract_text_from_pdf(file_path: str) -> Optional[str]:
    """Extract text from a PDF, trying pdfplumber first, then PyPDF2.

    Args:
        file_path: Path to the .pdf file.

    Returns:
        The extracted text, or None when no backend is available or every
        backend failed to produce text.
    """
    if PDFPLUMBER_AVAILABLE:
        try:
            text_parts = []
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    text = page.extract_text()
                    if text:
                        text_parts.append(text)
            if text_parts:
                return "\n\n".join(text_parts)
        except Exception as e:
            print(f"pdfplumber error: {e}")
    if PYPDF2_AVAILABLE:
        try:
            # Fresh list: the original reused text_parts here, so a partially
            # successful pdfplumber run leaked duplicate pages into this
            # fallback's output.
            text_parts = []
            with open(file_path, 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                for page in reader.pages:
                    text = page.extract_text()
                    if text:
                        text_parts.append(text)
            if text_parts:
                return "\n\n".join(text_parts)
        except Exception as e:
            print(f"PyPDF2 error: {e}")
    return None
|
|
|
|
def extract_text_from_file(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Extract text from a file, dispatching on its extension.

    Supports .hwpx, .hwp, .pdf and plain-text formats (.txt/.md/.csv).

    Args:
        file_path: Path to the input file.

    Returns:
        (text, error): the extracted text and None on success, or
        None and a user-facing error message on failure.
    """
    if not os.path.exists(file_path):
        return None, "파일을 찾을 수 없습니다"
    ext = Path(file_path).suffix.lower()
    if ext == '.hwpx':
        return extract_text_from_hwpx(file_path)
    elif ext == '.hwp':
        return extract_text_from_hwp(file_path)
    elif ext == '.pdf':
        text = extract_text_from_pdf(file_path)
        if text:
            return text, None
        return None, "PDF에서 텍스트 추출 실패"
    elif ext in ['.txt', '.md', '.csv']:
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read(), None
        except Exception:
            # Not UTF-8 (was a bare except): legacy Korean documents are
            # commonly EUC-KR/CP949 encoded, so retry with that codec.
            try:
                with open(file_path, 'r', encoding='cp949') as f:
                    return f.read(), None
            except Exception as e:
                return None, f"텍스트 파일 읽기 오류: {str(e)}"
    else:
        return None, f"지원하지 않는 파일 형식: {ext}"
|
|
|
|
def extract_zip_files(zip_path: str, extract_dir: str) -> List[str]:
    """Extract document-type members (.hwp/.hwpx/.pdf/.txt/.doc/.docx) from a ZIP.

    Args:
        zip_path: Path to the archive.
        extract_dir: Directory to extract into.

    Returns:
        Paths of the extracted files; an empty list on any archive-level error
        (best-effort behavior, matching the original contract).
    """
    extracted_files = []
    document_exts = {'.hwp', '.hwpx', '.pdf', '.txt', '.doc', '.docx'}
    try:
        with zipfile.ZipFile(zip_path, 'r') as zf:
            for name in zf.namelist():
                if name.endswith('/'):
                    continue  # directory entry
                if Path(name).suffix.lower() not in document_exts:
                    continue
                try:
                    # ZipFile.extract sanitizes absolute paths and '..'
                    # components, so hostile member names stay in extract_dir.
                    zf.extract(name, extract_dir)
                    extracted_files.append(os.path.join(extract_dir, name))
                except Exception:
                    # Skip unreadable members, keep the rest
                    # (was a bare except, which swallowed KeyboardInterrupt).
                    continue
    except Exception:
        pass  # bad/missing archive: deliberately yield an empty list
    return extracted_files
|
|
|
|
| |
| |
| |
def fetch_all_from_api(category: str = "전체", region: str = "전체(지역)", keyword: str = "") -> Tuple[List[Dict], str]:
    """Collect all announcements from the Bizinfo API via pagination.

    Args:
        category: Category label; "전체" means no category filter.
        region: Region label; "전체(지역)" means no region filter.
        keyword: Free-text term added to the hashtag filter.

    Returns:
        (items, error): the collected items plus an empty string on success,
        or a (possibly partial) item list with a user-facing error message.
    """
    if not API_KEY:
        return [], "❌ API 키가 설정되지 않았습니다. (BIZ_API 환경변수)"
    all_items = []
    page_size = 100
    max_pages = 10
    headers = {"User-Agent": "Mozilla/5.0", "Accept": "application/json"}
    hashtags = []
    if category and category != "전체":
        hashtags.append(category)
    if region and region != "전체(지역)":
        hashtags.append(region)
    if keyword and keyword.strip():
        hashtags.append(keyword.strip())

    # Use a while-loop so max_pages can shrink once page 1 reveals the real
    # total count. The original `for page_idx in range(1, max_pages + 1)`
    # captured max_pages when the range was built, so reassigning it inside
    # the loop never reduced the number of iterations.
    page_idx = 1
    while page_idx <= max_pages:
        try:
            params = {"crtfcKey": API_KEY, "dataType": "json", "pageUnit": page_size, "pageIndex": page_idx}
            if category and category != "전체" and category in CATEGORY_CODES:
                if CATEGORY_CODES[category]:
                    params["searchLclasId"] = CATEGORY_CODES[category]
            if hashtags:
                params["hashtags"] = ",".join(hashtags)
            response = requests.get(API_URL, params=params, headers=headers, timeout=(15, 60), verify=True)
            response.raise_for_status()
            result = response.json()
            items = []
            json_array = result.get("jsonArray", result)
            if isinstance(json_array, dict):
                items = json_array.get("item", [])
                if isinstance(items, dict):
                    items = [items]  # a single result comes back as a bare dict
            elif isinstance(json_array, list):
                items = json_array
            if not items:
                break
            all_items.extend(items)
            if page_idx == 1 and items:
                # The first page carries the total count; shrink the page budget.
                total_cnt = items[0].get("totCnt", 0) if isinstance(items[0], dict) else 0
                try:
                    total_cnt = int(total_cnt)
                except (TypeError, ValueError):
                    total_cnt = len(items)
                needed_pages = (total_cnt + page_size - 1) // page_size
                max_pages = min(max_pages, needed_pages)
            if len(items) < page_size:
                break  # a short page means we reached the end
        except requests.exceptions.Timeout:
            return all_items, f"⏱️ 페이지 {page_idx} 요청 시간 초과"
        except requests.exceptions.RequestException as e:
            if all_items:
                break  # keep what was already fetched
            return [], f"❌ API 요청 오류: {str(e)[:50]}"
        except Exception as e:
            if all_items:
                break
            return [], f"❌ 오류: {str(e)[:50]}"
        page_idx += 1
    return all_items, ""
|
|
|
|
def fetch_with_cache(category: str = "전체", region: str = "전체(지역)", keyword: str = "") -> Tuple[List[Dict], str]:
    """Fetch announcements, preferring the local cache over the live API.

    Falls back to fetch_all_from_api() whenever the cache module is missing,
    the cache is empty, or any cache error occurs. Keyword terms are
    AND-matched as substrings across several item fields; category and
    region are then applied as additional substring filters.

    Returns:
        (items, status): the filtered items and a human-readable status string.
    """
    if not CACHE_AVAILABLE:
        return fetch_all_from_api(category, region, keyword)

    try:
        # Load the full cached item list (previously synced from the API).
        items, status = get_cached_announcements()

        if not items:
            # Empty cache: go straight to the API.
            return fetch_all_from_api(category, region, keyword)

        # --- Keyword filter: every term must appear in the combined text ---
        if keyword and keyword.strip():
            kw = keyword.strip().lower()
            kw_terms = [t.strip() for t in kw.replace(",", " ").split() if t.strip()]
            matched = []
            for item in items:
                # Items may carry either cache-side or raw API field names,
                # hence the or-chains of alternative keys per field.
                search_text = " ".join([
                    str(item.get("title", "") or item.get("pblancNm", "") or ""),
                    str(item.get("description", "") or item.get("bsnsSumryCn", "") or ""),
                    str(item.get("author", "") or item.get("jrsdInsttNm", "") or ""),
                    str(item.get("lcategory", "") or item.get("pldirSportRealmLclasCodeNm", "") or item.get("category", "") or ""),
                    str(item.get("hashTags", "") or ""),
                    str(item.get("trgetNm", "") or item.get("target", "") or ""),
                    str(item.get("excInsttNm", "") or ""),
                ]).lower()

                # AND semantics: every keyword term must match somewhere.
                if all(t in search_text for t in kw_terms):
                    matched.append(item)
            items = matched
            status = f"🔍 '{keyword}' 검색 ({len(items)}건 매칭)"

        filtered = items
        filter_info = []

        # --- Category filter: substring match on category-like fields ---
        if category and category != "전체":
            filtered = [
                i for i in filtered
                if category.lower() in (i.get("lcategory", "") or i.get("pldirSportRealmLclasCodeNm", "") or i.get("category", "") or "").lower()
            ]
            filter_info.append(f"분야:{category}")

        # --- Region filter: match in hashtags, issuing org, or title ---
        if region and region != "전체(지역)":
            region_filtered = []
            for i in filtered:
                hash_tags = i.get("hashTags", "") or ""
                author = i.get("author", "") or i.get("jrsdInsttNm", "") or ""
                title = i.get("title", "") or i.get("pblancNm", "") or ""

                if region in hash_tags or region in author or region in title:
                    region_filtered.append(i)
            filtered = region_filtered
            filter_info.append(f"지역:{region}")

        filter_str = ", ".join(filter_info) if filter_info else "전체"
        result_status = f"⚡ 캐시에서 {len(filtered)}건 필터링 ({filter_str})"

        return filtered, result_status

    except Exception as e:
        print(f"Cache filter error: {e}")
        # Any cache failure degrades gracefully to a direct API fetch.
        return fetch_all_from_api(category, region, keyword)
|
|
|
|
def download_file(url: str, save_dir: str, hint_filename: Optional[str] = None) -> Tuple[Optional[str], Optional[str]]:
    """Download a file and derive a safe local filename for it.

    Filename resolution order: Content-Disposition header, then
    ``hint_filename``, then the URL path, then a hash-based name with an
    extension guessed from Content-Type. The result is sanitized for
    Windows-illegal characters, length-capped, and UTF-8 checked.

    Args:
        url: Download URL.
        save_dir: Existing directory to save into.
        hint_filename: Optional preferred filename.

    Returns:
        (path, error): the saved file path and None on success, or
        None and a user-facing error message on failure.
    """
    from urllib.parse import unquote, urlparse
    try:
        headers = {"User-Agent": "Mozilla/5.0", "Accept": "*/*", "Referer": "https://www.bizinfo.go.kr/"}
        # NOTE(review): verify=False disables TLS verification — presumably a
        # workaround for the portal's certificate chain; confirm before keeping.
        # `with` ensures the streamed connection is released even on error
        # (the original leaked it when an exception fired mid-download).
        with requests.get(url, headers=headers, timeout=60, stream=True, verify=False, allow_redirects=True) as response:
            response.raise_for_status()
            cd = response.headers.get('Content-Disposition', '')
            filename = None
            if cd:
                # RFC 5987 form filename*=UTF-8''…; stop at ';' so trailing
                # header parameters are not swallowed into the name (the
                # original `(.+)` captured to the end of the header).
                match = re.search(r"filename\*=(?:UTF-8''|utf-8'')([^;]+)", cd, re.IGNORECASE)
                if match:
                    filename = unquote(match.group(1).strip())
                else:
                    match = re.search(r'filename=(["\']?)(.+?)\1(?:;|$)', cd)
                    if match:
                        filename = match.group(2)
            if not filename and hint_filename:
                filename = hint_filename
            if not filename:
                filename = urlparse(url).path.split('/')[-1]
            content_type = response.headers.get('Content-Type', '').lower()
            if not filename or '.' not in filename:
                # No usable name: synthesize one from the content type.
                if 'pdf' in content_type:
                    filename = f"document_{hash(url) % 10000}.pdf"
                elif 'hwp' in content_type:
                    filename = f"document_{hash(url) % 10000}.hwp"
                else:
                    filename = f"file_{hash(url) % 10000}.bin"

            # Strip characters that are illegal on Windows filesystems.
            filename = re.sub(r'[<>:"/\\|?*]', '_', filename)

            name_part, ext = os.path.splitext(filename)
            if not ext:
                # Missing extension: guess from the content type.
                if 'pdf' in content_type:
                    ext = '.pdf'
                elif 'hwp' in content_type:
                    ext = '.hwp'
                else:
                    ext = '.bin'

            # Cap the base name at ~100 chars, keeping head and tail plus a
            # hash so different long names stay distinct.
            max_name_len = 100 - len(ext)
            if len(name_part) > max_name_len:
                name_hash = f"_{hash(name_part) % 10000:04d}_"
                name_part = name_part[:50] + name_hash + name_part[-30:]

            filename = name_part + ext

            try:
                filename.encode('utf-8')
            except UnicodeEncodeError:
                # Un-encodable name (e.g. lone surrogates): hash fallback.
                filename = f"document_{hash(url) % 100000}{ext}"

            file_path = os.path.join(save_dir, filename)

            # Guard against OS path-length limits.
            if len(file_path) > 250:
                filename = f"doc_{hash(url) % 100000}{ext}"
                file_path = os.path.join(save_dir, filename)

            with open(file_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        if os.path.getsize(file_path) == 0:
            os.remove(file_path)
            return None, "빈 파일이 다운로드됨"
        return file_path, None
    except Exception as e:
        return None, f"다운로드 실패: {str(e)}"
|
|
|
|
def call_groq_api_stream(messages: List[Dict]) -> Generator[str, None, None]:
    """Stream chat-completion text from the Groq API.

    Yields content fragments as they arrive. Missing dependency/key and API
    failures are yielded as a single user-facing message rather than raised.
    """
    if not GROQ_AVAILABLE:
        yield "❌ Groq 라이브러리가 설치되지 않았습니다."
        return
    if not GROQ_API_KEY:
        yield "❌ GROQ_API_KEY 환경변수가 설정되지 않았습니다."
        return
    try:
        stream = Groq(api_key=GROQ_API_KEY).chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=messages,
            temperature=0.3,
            max_tokens=4096,
            stream=True,
        )
        for piece in stream:
            fragment = piece.choices[0].delta.content
            if fragment:
                yield fragment
    except Exception as e:
        yield f"❌ API 오류: {str(e)}"
|
|
|
|
def fetch_announcement_detail(url: str) -> Tuple[str, List[Dict], Optional[Dict]]:
    """Scrape an announcement detail page for body text and attachments.

    Returns:
        (content_text, attachments, print_file)
        - content_text: the announcement body text
        - attachments: regular attachment files (forms, templates, etc.)
        - print_file: the "printable body" file (announcement PDF/HWP),
          singled out for AI analysis; None when none is identified.
    """
    try:
        # Relative links come from the list API; prefix the portal host.
        if url.startswith('/'):
            url = f"https://www.bizinfo.go.kr{url}"

        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
        }
        response = requests.get(url, headers=headers, timeout=30, verify=False)
        response.raise_for_status()
        html_text = response.text
        soup = BeautifulSoup(html_text, 'html.parser')

        # Body text: prefer tables that look like the summary box (they
        # contain the 사업개요/지원대상/신청기간 labels), else fall back to
        # the main content container.
        content_text = ""
        tables = soup.find_all('table')
        for table in tables:
            text = table.get_text(separator='\n', strip=True)
            if '사업개요' in text or '지원대상' in text or '신청기간' in text:
                content_text += text + "\n\n"
        main_content = soup.find('div', {'id': 'container'}) or soup.find('main') or soup.find('article')
        if main_content and not content_text:
            content_text = main_content.get_text(separator='\n', strip=True)

        attachments = []
        print_file = None

        # Attachment links are recognized by the portal's download endpoints.
        for a_tag in soup.find_all('a', href=True):
            href = a_tag.get('href', '')
            href_clean = re.sub(r';jsessionid=[^?]*', '', href)

            if 'getImageFile.do' in href_clean or 'fileDown' in href_clean or 'atchFileId' in href_clean:
                filename = a_tag.get_text(strip=True)

                # Link text is often just "다운로드"/"바로보기"; look for a
                # sibling text node that carries the real filename.
                if filename in ['다운로드', '바로보기', '내려받기', '']:
                    parent = a_tag.parent
                    if parent:
                        parent_text = parent.get_text(separator='|', strip=True)
                        parts = [p.strip() for p in parent_text.split('|') if p.strip()]
                        for part in parts:
                            if part not in ['다운로드', '바로보기', '내려받기'] and ('.' in part):
                                filename = part
                                break
                # The title attribute may name the file explicitly.
                title = a_tag.get('title', '')
                if title and '첨부파일' in title:
                    match = re.search(r'첨부파일\s+(.+?)\s+다운로드', title)
                    if match:
                        filename = match.group(1)

                if not filename or filename in ['다운로드', '바로보기', '내려받기']:
                    filename = f"첨부파일_{len(attachments)+1}"

                # Resolve to an absolute URL; skip javascript:/anchor links.
                if href_clean.startswith('/'):
                    full_url = f"https://www.bizinfo.go.kr{href_clean}"
                elif href_clean.startswith('http'):
                    full_url = href_clean
                else:
                    continue

                ext = Path(filename).suffix.lower()
                if not ext:
                    ext = '.unknown'

                file_info = {
                    "filename": filename,
                    "url": full_url,
                    "type": ext[1:] if ext.startswith('.') else ext
                }

                # Heuristics for spotting the official announcement document
                # ("print file") among the attachments.
                is_print_file = False
                filename_lower = filename.lower()  # NOTE(review): currently unused

                # Official notice numbers, e.g. "(제2024-123호)".
                if re.search(r'\(제\d+[-_]?\d*호\)', filename):
                    is_print_file = True
                # Keywords meaning "announcement document".
                elif any(kw in filename for kw in ['공고문', '모집공고', '공고(안)', '공고 안', '_공고_', '_공고.']):
                    is_print_file = True
                # Explicit "printable body" label in the filename.
                elif '본문출력' in filename or '본문 출력' in filename:
                    is_print_file = True

                # Or the surrounding markup labels it as the printable body.
                parent = a_tag.parent
                grandparent = parent.parent if parent else None
                for ancestor in [parent, grandparent]:
                    if ancestor:
                        ancestor_text = ancestor.get_text(strip=True)
                        if '본문출력파일' in ancestor_text or '본문출력 파일' in ancestor_text:
                            is_print_file = True
                            break

                # Keep the first print file; everything else is deduplicated
                # by URL into the regular attachment list.
                if is_print_file and not print_file:
                    print_file = file_info
                elif not any(att['url'] == full_url for att in attachments):
                    attachments.append(file_info)

        return content_text, attachments, print_file

    except Exception as e:
        import traceback
        return f"상세 정보 조회 실패: {str(e)}\n{traceback.format_exc()}", [], None