#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Reform Party (개혁신당) press-release crawler - high-throughput async
version with automatic Hugging Face dataset upload.

Target is a Gnuboard 5 based site (reformparty.kr); the relevant markup is
``td.td_subject`` / ``td.td_datetime`` for list rows and ``div#bo_v_con``
for article bodies.
"""

import asyncio
import json
import os
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

import aiohttp
import pandas as pd
from bs4 import BeautifulSoup
from datasets import Dataset, load_dataset
from dotenv import load_dotenv
from huggingface_hub import HfApi, login
from tqdm.asyncio import tqdm as async_tqdm

load_dotenv()


class ReformAsyncCrawler:
    """Async crawler for reformparty.kr boards with incremental-state and
    Hugging Face upload support."""

    def __init__(self, config_path="crawler_config.json"):
        self.base_url = "https://www.reformparty.kr"
        self.party_name = "개혁신당"
        self.config_path = config_path
        self.state_path = "crawler_state.json"
        self.load_config()
        self.hf_token = os.getenv("HF_TOKEN")
        self.hf_repo_id = os.getenv("HF_REPO_ID_REFORM", "reform-press-releases")
        # Cap on concurrent in-flight HTTP requests; honors the config value
        # (previously hardcoded to 10 even when the config said otherwise).
        self.semaphore = asyncio.Semaphore(self.config.get("concurrent_requests", 10))

    def load_config(self):
        """Load the 'reform' section of the JSON config, falling back to
        built-in defaults when the file or section is missing."""
        default_config = {
            "boards": {
                "보도자료": "press",
                "논평브리핑": "briefing"
            },
            "start_date": "2024-02-13",
            "max_pages": 10000,
            "concurrent_requests": 10,
            "request_delay": 0.3,
            "output_path": "./data"
        }
        if os.path.exists(self.config_path):
            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
            self.config = config.get('reform', default_config)
        else:
            self.config = default_config
        self.boards = self.config["boards"]
        self.start_date = self.config["start_date"]
        self.max_pages = self.config["max_pages"]
        self.output_path = self.config["output_path"]

    def load_state(self) -> Dict:
        """Return the persisted crawl state for this party ({} when absent)."""
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                state = json.load(f)
            return state.get('reform', {})
        return {}

    def save_state(self, state: Dict):
        """Merge *state* under the 'reform' key of the shared state file,
        preserving state stored by other crawlers."""
        all_state = {}
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                all_state = json.load(f)
        all_state['reform'] = state
        with open(self.state_path, 'w', encoding='utf-8') as f:
            json.dump(all_state, f, ensure_ascii=False, indent=2)

    @staticmethod
    def parse_date(date_str: str) -> Optional[datetime]:
        """Parse 'YYYY-MM-DD HH:MM:SS' or 'YYYY-MM-DD'; return None on failure."""
        try:
            # Only the date portion matters for the crawl-window comparison.
            return datetime.strptime(date_str.strip()[:10], '%Y-%m-%d')
        except ValueError:
            return None

    @staticmethod
    def clean_text(text: str) -> str:
        """Remove NBSP and zero-width-space artifacts and trim whitespace."""
        # NOTE: the original chained a third .replace() with a literal
        # U+200B character — identical to '\u200b', so it was redundant.
        return text.replace('\xa0', '').replace('\u200b', '').strip()

    async def fetch_with_retry(self, session: aiohttp.ClientSession, url: str,
                               max_retries: int = 3) -> Optional[str]:
        """GET *url* up to *max_retries* times; return the body text on a 200
        response, or None after exhausting retries.

        The semaphore bounds concurrency across ALL callers; request_delay is
        applied before every attempt as polite rate limiting.
        """
        async with self.semaphore:
            for attempt in range(max_retries):
                try:
                    await asyncio.sleep(self.config.get("request_delay", 0.3))
                    async with session.get(
                            url, timeout=aiohttp.ClientTimeout(total=15)) as response:
                        if response.status == 200:
                            return await response.text()
                        # Non-200: fall through and retry on the next loop pass.
                except Exception:
                    if attempt < max_retries - 1:
                        await asyncio.sleep(1)  # back off before retrying
                    else:
                        return None
            return None

    async def fetch_list_page(self, session: aiohttp.ClientSession,
                              board_name: str, board_slug: str, page_num: int,
                              start_date: datetime,
                              end_date: datetime) -> Tuple[List[Dict], bool]:
        """Scrape one list page of *board_slug*.

        Returns (items, stop_flag). stop_flag is True when an article older
        than *start_date* was seen — the board is assumed to be sorted
        newest-first, so pagination can stop there.
        """
        url = f"{self.base_url}/{board_slug}?page={page_num}"
        html = await self.fetch_with_retry(session, url)
        if not html:
            return [], False
        soup = BeautifulSoup(html, 'html.parser')
        rows = soup.select('table tbody tr')
        if not rows:
            # An empty table means we ran past the last page.
            return [], True

        data = []
        stop_flag = False
        for row in rows:
            try:
                # Title & URL live under td.td_subject div.bo_tit a.
                title_a = row.select_one('td.td_subject div.bo_tit a')
                if not title_a:
                    continue  # notice rows / malformed rows
                title = title_a.get_text(strip=True)
                href = title_a.get('href', '')
                # Strip the query string (page=… etc.) then prefix the host.
                # NOTE(review): assumes href is site-relative — if Gnuboard
                # emits absolute URLs here this would double-prefix; confirm.
                article_url = self.base_url + re.sub(r'\?.*$', '', href)

                # Posting date: td.td_datetime carries 'YYYY-MM-DD HH:MM:SS'.
                date_td = row.select_one('td.td_datetime')
                if not date_td:
                    continue
                date_str = date_td.get_text(strip=True)[:10]

                # Optional category badge: td.td_cate a.bo_cate.
                cate_a = row.select_one('td.td_cate a.bo_cate')
                category = cate_a.get_text(strip=True) if cate_a else ""

                article_date = self.parse_date(date_str)
                if not article_date:
                    continue
                if article_date < start_date:
                    stop_flag = True  # everything below is older — stop paging
                    break
                if article_date > end_date:
                    continue

                data.append({
                    'board_name': board_name,
                    'title': title,
                    'category': category,
                    'date': date_str,
                    'url': article_url
                })
            except Exception:
                # Best-effort per row: a broken row must not kill the page.
                continue
        return data, stop_flag

    async def fetch_article_detail(self, session: aiohttp.ClientSession,
                                   url: str) -> Dict:
        """Fetch one article page; return {'text': body, 'writer': author}.

        On fetch failure the text is the sentinel string '본문 조회 실패'
        (kept for downstream compatibility).
        """
        html = await self.fetch_with_retry(session, url)
        if not html:
            return {'text': "본문 조회 실패", 'writer': ""}
        soup = BeautifulSoup(html, 'html.parser')
        text_parts = []
        writer = ""

        # Body paragraphs: div#bo_v_con > p.
        contents_div = soup.find('div', id='bo_v_con')
        if contents_div:
            for p in contents_div.find_all('p'):
                cleaned = self.clean_text(p.get_text(strip=True))
                if cleaned:
                    text_parts.append(cleaned)

        # Author: p.name span.content span.sv_member.
        writer_el = soup.select_one('p.name span.content span.sv_member')
        if writer_el:
            writer = writer_el.get_text(strip=True)

        return {'text': '\n'.join(text_parts), 'writer': writer}

    async def collect_board(self, board_name: str, board_slug: str,
                            start_date: str, end_date: str) -> List[Dict]:
        """Crawl one board within [start_date, end_date] and return its rows
        with article details merged in."""
        start_dt = datetime.strptime(start_date, '%Y-%m-%d')
        end_dt = datetime.strptime(end_date, '%Y-%m-%d')
        print(f"\n▶ [{board_name}] 목록 수집 시작...")

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'ko-KR,ko;q=0.9',
        }
        async with aiohttp.ClientSession(headers=headers) as session:
            all_items = []
            page_num = 1
            empty_pages = 0
            max_empty_pages = 3  # tolerate transient fetch failures

            with async_tqdm(desc=f"[{board_name}] 목록", unit="페이지") as pbar:
                while page_num <= self.max_pages:
                    items, stop_flag = await self.fetch_list_page(
                        session, board_name, board_slug, page_num,
                        start_dt, end_dt
                    )
                    if not items:
                        empty_pages += 1
                        if empty_pages >= max_empty_pages or stop_flag:
                            break
                    else:
                        empty_pages = 0
                        all_items.extend(items)
                        pbar.update(1)
                        pbar.set_postfix({"수집": len(all_items)})
                        if stop_flag:
                            break
                    page_num += 1

            print(f" ✓ {len(all_items)}개 항목 발견")

            if all_items:
                print(f" ▶ 상세 페이지 수집 중...")
                tasks = [self.fetch_article_detail(session, item['url'])
                         for item in all_items]
                # BUGFIX: the original iterated asyncio.as_completed() (which
                # yields in COMPLETION order) and then zipped the results with
                # all_items (submission order), attaching article bodies to
                # the wrong rows. tqdm.gather preserves input order.
                details = await async_tqdm.gather(
                    *tasks, desc=f"[{board_name}] 상세")
                for item, detail in zip(all_items, details):
                    item.update(detail)

        print(f"✓ [{board_name}] 완료: {len(all_items)}개")
        return all_items

    async def collect_all(self, start_date: Optional[str] = None,
                          end_date: Optional[str] = None) -> pd.DataFrame:
        """Crawl every configured board concurrently and return one cleaned,
        column-ordered DataFrame (empty DataFrame when nothing was found)."""
        if not end_date:
            end_date = datetime.now().strftime('%Y-%m-%d')
        if not start_date:
            start_date = self.start_date

        print(f"\n{'='*60}")
        print(f"개혁신당 보도자료 수집 - 비동기 고성능 버전")
        print(f"기간: {start_date} ~ {end_date}")
        print(f"{'='*60}")

        tasks = [
            self.collect_board(board_name, board_slug, start_date, end_date)
            for board_name, board_slug in self.boards.items()
        ]
        results = await asyncio.gather(*tasks)

        all_data = []
        for items in results:
            all_data.extend(items)

        if not all_data:
            print("\n⚠️ 수집된 데이터 없음")
            return pd.DataFrame()

        df = pd.DataFrame(all_data)
        df = df[['board_name', 'title', 'category', 'date', 'writer', 'text', 'url']]
        # Drop rows whose title or body came back empty.
        df = df[(df['title'] != "") & (df['text'] != "")]
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        print(f"\n✓ 총 {len(df)}개 수집 완료")
        return df

    def save_local(self, df: pd.DataFrame):
        """Write *df* to timestamped CSV (utf-8-sig for Excel) and XLSX files
        under the configured output directory."""
        os.makedirs(self.output_path, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        csv_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.csv")
        xlsx_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.xlsx")
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')
        df.to_excel(xlsx_path, index=False, engine='openpyxl')
        print(f"✓ CSV: {csv_path}")
        print(f"✓ Excel: {xlsx_path}")

    def upload_to_huggingface(self, df: pd.DataFrame):
        """Push *df* to the Hugging Face Hub, merging with any existing
        dataset (dedup by URL, newest kept, sorted by date desc)."""
        if not self.hf_token:
            print("\n⚠️ HF_TOKEN이 설정되지 않았습니다.")
            return
        print(f"\n▶ 허깅페이스 업로드 중... (repo: {self.hf_repo_id})")
        try:
            login(token=self.hf_token)
            new_dataset = Dataset.from_pandas(df)
            try:
                existing_dataset = load_dataset(self.hf_repo_id, split='train')
                existing_df = existing_dataset.to_pandas()
                combined_df = pd.concat([existing_df, df], ignore_index=True)
                # keep='last' so freshly-crawled rows win over stale copies.
                combined_df = combined_df.drop_duplicates(subset=['url'], keep='last')
                combined_df = combined_df.sort_values(
                    'date', ascending=False).reset_index(drop=True)
                final_dataset = Dataset.from_pandas(combined_df)
                print(f" ✓ 병합 후: {len(final_dataset)}개")
            except Exception:
                # Repo missing or unreadable: start fresh with this crawl.
                final_dataset = new_dataset
                print(f" ℹ️ 신규 데이터셋 생성")
            final_dataset.push_to_hub(self.hf_repo_id, token=self.hf_token)
            print(f"✓ 허깅페이스 업로드 완료!")
        except Exception as e:
            print(f"✗ 업로드 실패: {e}")

    async def run_incremental(self):
        """Incremental crawl: resume from the day after the last recorded
        crawl date, save/upload results, then persist the new state."""
        state = self.load_state()
        last_date = state.get('last_crawl_date')
        if last_date:
            start_date = (datetime.strptime(last_date, '%Y-%m-%d')
                          + timedelta(days=1)).strftime('%Y-%m-%d')
            print(f"📅 증분 업데이트: {start_date} 이후 데이터만 수집")
        else:
            start_date = self.start_date
            print(f"📅 전체 수집: {start_date}부터")

        end_date = datetime.now().strftime('%Y-%m-%d')
        df = await self.collect_all(start_date, end_date)
        if df.empty:
            print("✓ 새로운 데이터 없음")
            return

        self.save_local(df)
        self.upload_to_huggingface(df)

        state['last_crawl_date'] = end_date
        state['last_crawl_time'] = datetime.now().isoformat()
        state['last_count'] = len(df)
        self.save_state(state)
        print(f"\n{'='*60}\n✓ 완료!\n{'='*60}\n")


async def main():
    crawler = ReformAsyncCrawler()
    await crawler.run_incremental()


if __name__ == "__main__":
    asyncio.run(main())