#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""People Power Party (국민의힘) press-release crawler.

High-performance asynchronous crawler with automatic Hugging Face upload:
- asyncio + aiohttp for concurrent fetching
- bounded concurrency so the target server is not overloaded
- incremental updates (only crawls dates after the last recorded run)
- automatic Hugging Face dataset upload
- intended to be scheduled as a daily job
"""

import os
import json
import re
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Optional

import pandas as pd
from tqdm.asyncio import tqdm as async_tqdm
import aiohttp
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from huggingface_hub import HfApi, login
from datasets import Dataset, load_dataset

# Load HF_TOKEN / HF_REPO_ID_PPP from a local .env file.
load_dotenv()


class PPPAsyncCrawler:
    """Asynchronous crawler for the peoplepowerparty.kr press-release boards."""

    def __init__(self, config_path="crawler_config.json"):
        self.base_url = "https://www.peoplepowerparty.kr"
        self.party_name = "국민의힘"
        self.config_path = config_path
        self.state_path = "crawler_state.json"

        # Load crawler configuration (board ids, date range, output path).
        self.load_config()

        # Hugging Face credentials / target dataset repository.
        self.hf_token = os.getenv("HF_TOKEN")
        self.hf_repo_id = os.getenv("HF_REPO_ID_PPP", "ppp-press-releases")

        # Cap the number of in-flight HTTP requests.
        self.semaphore = asyncio.Semaphore(20)

    def load_config(self):
        """Load the config file, falling back to built-in defaults.

        If the config file exists and contains a 'ppp' section, only that
        section is used; otherwise the defaults below apply.
        """
        default_config = {
            "boards": {
                "대변인_논평보도자료": "BBSDD0001",
                "원내_보도자료": "BBSDD0002",
                "미디어특위_보도자료": "BBSDD0042"
            },
            "start_date": "2000-03-10",
            "max_pages": 10000,
            "concurrent_requests": 20,
            "request_delay": 0.1,
            "output_path": "./data"
        }
        if os.path.exists(self.config_path):
            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
            # Extract only the PPP-specific section of a shared config file.
            if 'ppp' in config:
                self.config = config['ppp']
            else:
                self.config = default_config
        else:
            self.config = default_config

        self.boards = self.config["boards"]
        self.start_date = self.config["start_date"]
        self.max_pages = self.config["max_pages"]
        self.output_path = self.config["output_path"]

    def load_state(self) -> Dict:
        """Return the persisted crawler state for the 'ppp' section ({} if none)."""
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                state = json.load(f)
            return state.get('ppp', {})
        return {}

    def save_state(self, state: Dict):
        """Persist crawler state under the 'ppp' key, preserving other sections."""
        all_state = {}
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                all_state = json.load(f)
        all_state['ppp'] = state
        with open(self.state_path, 'w', encoding='utf-8') as f:
            json.dump(all_state, f, ensure_ascii=False, indent=2)

    @staticmethod
    def parse_date(date_str: str) -> Optional[datetime]:
        """Parse a 'YYYY-MM-DD' string; return None on any malformed input."""
        try:
            return datetime.strptime(date_str.strip(), '%Y-%m-%d')
        except (ValueError, AttributeError):
            # ValueError: wrong format; AttributeError: non-string input.
            return None

    @staticmethod
    def clean_text(text: str) -> str:
        """Strip non-breaking and zero-width spaces and surrounding whitespace."""
        # The original source contained a literal U+200B as the third target;
        # both escaped and literal forms denote the same character.
        text = text.replace('\xa0', '').replace('\u200b', '').replace('\u200b', '')
        return text.strip()

    async def fetch_with_retry(self, session: aiohttp.ClientSession, url: str,
                               max_retries: int = 3) -> Optional[str]:
        """GET *url* with up to *max_retries* attempts; return body text or None.

        Concurrency is bounded by the instance semaphore, and each attempt is
        preceded by a configurable politeness delay. Non-200 responses retry
        immediately (after the delay); exceptions back off 1s between attempts.
        """
        async with self.semaphore:
            for attempt in range(max_retries):
                try:
                    await asyncio.sleep(self.config.get("request_delay", 0.1))
                    async with session.get(
                            url, timeout=aiohttp.ClientTimeout(total=15)) as response:
                        if response.status == 200:
                            return await response.text()
                except Exception:
                    # Network/timeout errors: brief backoff, give up on last try.
                    if attempt < max_retries - 1:
                        await asyncio.sleep(1)
                    else:
                        return None
            return None

    async def fetch_list_page(self, session: aiohttp.ClientSession, board_id: str,
                              page_num: int, start_date: datetime,
                              end_date: datetime) -> tuple:
        """Fetch one board list page.

        Returns (items, stop_flag): *items* are rows within [start_date,
        end_date]; *stop_flag* is True when crawling should stop (page layout
        missing, or an article older than start_date was reached — the board
        is newest-first).
        """
        url = f"{self.base_url}/news/comment/{board_id}?page={page_num}"
        html = await self.fetch_with_retry(session, url)
        if not html:
            return [], False

        soup = BeautifulSoup(html, 'html.parser')
        table_div = soup.find('div', {'class': 'board-tbl'})
        if not table_div:
            return [], True
        tbody = table_div.find('tbody')
        if not tbody:
            return [], True
        rows = tbody.find_all('tr')
        if not rows:
            return [], True

        data = []
        stop_flag = False
        for row in rows:
            cols = row.find_all('td')
            if len(cols) < 3:
                continue
            try:
                no_td = row.find('td', {'class': 'no'})
                class_td = row.find('td', {'class': 'class'})
                no = no_td.get_text(strip=True) if no_td else cols[0].get_text(strip=True)
                section = class_td.get_text(strip=True) if class_td else cols[1].get_text(strip=True)
                link_tag = row.find('a')
                if not link_tag:
                    continue
                title = link_tag.get_text(strip=True).replace('\n', ' ')
                article_url = self.base_url + link_tag.get('href', '')

                # Date: prefer the 4th column; fall back to the <dd class="date">
                # element with its inner <span> (extra markup) removed.
                date_str = ""
                if len(cols) >= 4:
                    date_str = cols[3].get_text(strip=True)
                if not date_str or not re.match(r'\d{4}-\d{2}-\d{2}', date_str):
                    dd_date = row.find('dd', {'class': 'date'})
                    if dd_date:
                        span = dd_date.find('span')
                        if span:
                            span.decompose()
                        date_str = dd_date.get_text(strip=True)

                article_date = self.parse_date(date_str)
                if not article_date:
                    continue
                if article_date < start_date:
                    # Rows are newest-first, so everything after this is older.
                    stop_flag = True
                    break
                if article_date > end_date:
                    continue

                data.append({
                    'no': no,
                    'section': section,
                    'title': title,
                    'date': date_str,
                    'url': article_url
                })
            except Exception:
                # Best-effort per-row parsing: skip malformed rows.
                continue
        return data, stop_flag

    async def fetch_article_detail(self, session: aiohttp.ClientSession, url: str) -> Dict:
        """Fetch an article page; return {'text': body, 'writer': attribution}.

        Centered paragraphs that are not dates are treated as the writer line;
        all other non-empty paragraphs are joined into the body text.
        """
        html = await self.fetch_with_retry(session, url)
        if not html:
            return {'text': "본문 조회 실패", 'writer': ""}

        soup = BeautifulSoup(html, 'html.parser')
        text_parts = []
        writer = ""
        conts_tag = soup.select_one('dd.conts')
        if conts_tag:
            # Drop the embedded HWP-editor duplicate of the content.
            hwp_div = conts_tag.find('div', {'id': 'hwpEditorBoardContent'})
            if hwp_div:
                hwp_div.decompose()
            p_tags = conts_tag.find_all('p')
            for p in p_tags:
                style = p.get('style', '')
                is_center = 'text-align:center' in style.replace(' ', '').lower()
                raw_text = p.get_text(strip=True)
                cleaned_text = self.clean_text(raw_text)
                if not cleaned_text:
                    continue
                if is_center:
                    # Centered text is the writer unless it looks like a date.
                    if not re.match(r'\d{4}\.\s*\d{1,2}\.\s*\d{1,2}', cleaned_text):
                        writer = cleaned_text
                else:
                    text_parts.append(cleaned_text)
        return {'text': '\n'.join(text_parts), 'writer': writer}

    async def collect_board(self, board_name: str, board_id: str,
                            start_date: str, end_date: str) -> List[Dict]:
        """Collect one board completely: list pages, then detail pages in parallel."""
        start_dt = datetime.strptime(start_date, '%Y-%m-%d')
        end_dt = datetime.strptime(end_date, '%Y-%m-%d')

        print(f"\n▶ [{board_name}] 목록 수집 시작...")
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'ko-KR,ko;q=0.9',
        }

        async with aiohttp.ClientSession(headers=headers) as session:
            # Phase 1: walk list pages until out-of-range or repeatedly empty.
            all_items = []
            page_num = 1
            empty_pages = 0
            max_empty_pages = 3

            with async_tqdm(desc=f"[{board_name}] 목록", unit="페이지") as pbar:
                while page_num <= self.max_pages:
                    items, stop_flag = await self.fetch_list_page(
                        session, board_id, page_num, start_dt, end_dt
                    )
                    if not items:
                        empty_pages += 1
                        if empty_pages >= max_empty_pages or stop_flag:
                            break
                    else:
                        empty_pages = 0
                        all_items.extend(items)
                        pbar.update(1)
                        pbar.set_postfix({"수집": len(all_items)})
                        if stop_flag:
                            break
                    page_num += 1

            print(f" ✓ {len(all_items)}개 항목 발견")

            # Phase 2: fetch detail pages concurrently.
            if all_items:
                print(f" ▶ 상세 페이지 수집 중...")
                tasks = [self.fetch_article_detail(session, item['url'])
                         for item in all_items]
                # BUGFIX: asyncio.as_completed yields results in *completion*
                # order, which mis-aligned details with all_items on zip().
                # tqdm.gather preserves submission order, so details[i]
                # always corresponds to all_items[i].
                details = await async_tqdm.gather(*tasks, desc=f"[{board_name}] 상세")

                for item, detail in zip(all_items, details):
                    item.update(detail)
                    item['board_name'] = board_name

            print(f"✓ [{board_name}] 완료: {len(all_items)}개")
            return all_items

    async def collect_all(self, start_date: Optional[str] = None,
                          end_date: Optional[str] = None) -> pd.DataFrame:
        """Collect every configured board concurrently into one DataFrame.

        Defaults: *end_date* = today, *start_date* = configured start date.
        Rows with an empty title or body are dropped; 'date' becomes datetime.
        """
        if not end_date:
            end_date = datetime.now().strftime('%Y-%m-%d')
        if not start_date:
            start_date = self.start_date

        print(f"\n{'='*60}")
        print(f"국민의힘 보도자료 수집 - 비동기 고성능 버전")
        print(f"기간: {start_date} ~ {end_date}")
        print(f"{'='*60}")

        # Crawl all boards in parallel.
        tasks = [
            self.collect_board(board_name, board_id, start_date, end_date)
            for board_name, board_id in self.boards.items()
        ]
        results = await asyncio.gather(*tasks)

        all_data = []
        for items in results:
            all_data.extend(items)

        if not all_data:
            print("\n⚠️ 수집된 데이터 없음")
            return pd.DataFrame()

        df = pd.DataFrame(all_data)
        df = df[['board_name', 'no', 'title', 'section', 'date', 'writer', 'text', 'url']]
        df = df[(df['title'] != "") & (df['text'] != "")]
        df['date'] = pd.to_datetime(df['date'], errors='coerce')

        print(f"\n✓ 총 {len(df)}개 수집 완료")
        return df

    def save_local(self, df: pd.DataFrame):
        """Write the DataFrame to timestamped CSV and Excel files locally."""
        os.makedirs(self.output_path, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # CSV (utf-8-sig so Excel on Windows detects the encoding).
        csv_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.csv")
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')

        # Excel
        xlsx_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.xlsx")
        df.to_excel(xlsx_path, index=False, engine='openpyxl')

        print(f"✓ CSV: {csv_path}")
        print(f"✓ Excel: {xlsx_path}")

    def upload_to_huggingface(self, df: pd.DataFrame):
        """Merge *df* with the existing HF dataset (dedup by url) and push it."""
        if not self.hf_token:
            print("\n⚠️ HF_TOKEN이 설정되지 않았습니다. .env 파일을 확인하세요.")
            return

        print(f"\n▶ 허깅페이스 업로드 중... (repo: {self.hf_repo_id})")
        try:
            login(token=self.hf_token)
            api = HfApi()
            new_dataset = Dataset.from_pandas(df)

            # Merge with any existing dataset; fall back to a fresh one.
            try:
                existing_dataset = load_dataset(self.hf_repo_id, split='train')
                print(f" ℹ️ 기존 데이터: {len(existing_dataset)}개")
                existing_df = existing_dataset.to_pandas()
                combined_df = pd.concat([existing_df, df], ignore_index=True)
                # keep='last' so re-crawled articles overwrite stale copies.
                combined_df = combined_df.drop_duplicates(subset=['url'], keep='last')
                combined_df = combined_df.sort_values('date', ascending=False).reset_index(drop=True)
                final_dataset = Dataset.from_pandas(combined_df)
                print(f" ✓ 병합 후: {len(final_dataset)}개 (중복 제거됨)")
            except Exception:
                # Repo missing or unreadable: create the dataset from scratch.
                print(f" ℹ️ 신규 데이터셋 생성")
                final_dataset = new_dataset

            final_dataset.push_to_hub(self.hf_repo_id, token=self.hf_token)
            print(f"✓ 허깅페이스 업로드 완료!")
            print(f" 🔗 https://huggingface.co/datasets/{self.hf_repo_id}")
        except Exception as e:
            print(f"✗ 업로드 실패: {e}")

    async def run_incremental(self):
        """Run an incremental update: crawl, save locally, upload, persist state."""
        state = self.load_state()
        last_date = state.get('last_crawl_date')

        if last_date:
            # Resume the day after the last successful crawl.
            start_date = (datetime.strptime(last_date, '%Y-%m-%d')
                          + timedelta(days=1)).strftime('%Y-%m-%d')
            print(f"📅 증분 업데이트: {start_date} 이후 데이터만 수집")
        else:
            start_date = self.start_date
            print(f"📅 전체 수집: {start_date}부터")

        end_date = datetime.now().strftime('%Y-%m-%d')

        # Crawl
        df = await self.collect_all(start_date, end_date)
        if df.empty:
            print("✓ 새로운 데이터 없음")
            return

        # Save locally
        self.save_local(df)

        # Upload to Hugging Face
        self.upload_to_huggingface(df)

        # Persist state for the next incremental run
        state['last_crawl_date'] = end_date
        state['last_crawl_time'] = datetime.now().isoformat()
        state['last_count'] = len(df)
        self.save_state(state)

        print(f"\n{'='*60}")
        print(f"✓ 완료!")
        print(f"{'='*60}\n")


async def main():
    """Entry point: run one incremental crawl."""
    crawler = PPPAsyncCrawler()
    await crawler.run_incremental()


if __name__ == "__main__":
    asyncio.run(main())