#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Democratic Party of Korea (더불어민주당) press-release crawler.

Asynchronous crawler with automatic Hugging Face upload:

- asyncio + aiohttp for concurrent fetching
- bounded number of concurrent requests (to limit load on the server)
- incremental updates (re-crawls from the last recorded crawl date)
- automatic upload to the Hugging Face Hub
- intended to be scheduled once per day
"""

import os
import json
import time
import re
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
from urllib.parse import urljoin

import pandas as pd
from tqdm.asyncio import tqdm as async_tqdm
import aiohttp
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from huggingface_hub import HfApi, login
from datasets import Dataset, load_dataset, concatenate_datasets

# Load environment variables (HF_TOKEN, HF_REPO_ID) from the .env file.
load_dotenv()


class MinjooAsyncCrawler:
    """Crawl theminjoo.kr boards, store the result locally and push it to the Hub.

    Instances create an ``asyncio.Semaphore`` in ``__init__``; on older Python
    versions that binds to the current event loop, so construct the crawler
    inside the coroutine that will use it (as ``main`` does).
    """

    def __init__(self, config_path="crawler_config.json"):
        """Set up endpoints, load configuration and read Hub credentials.

        Args:
            config_path: Path of the JSON config file. A missing file, or a
                file without a ``"minjoo"`` section, falls back to defaults.
        """
        self.base_url = "https://theminjoo.kr/main/sub"
        self.party_name = "더불어민주당"
        self.config_path = config_path
        self.state_path = "crawler_state.json"

        # Populates self.config and the derived attributes.
        self.load_config()

        # Hugging Face settings come from the environment.
        self.hf_token = os.getenv("HF_TOKEN")
        self.hf_repo_id = os.getenv("HF_REPO_ID", "minjoo-press-releases")

        # Cap concurrent requests using the configured value (never below 1).
        limit = max(1, int(self.config.get("concurrent_requests", 20)))
        self.semaphore = asyncio.Semaphore(limit)

    def load_config(self):
        """Load the ``"minjoo"`` section of the config file over the defaults.

        Keys missing from the file keep their default value, so a partial
        config no longer raises ``KeyError``. Sets ``self.config``,
        ``self.boards``, ``self.start_date``, ``self.max_pages`` and
        ``self.output_path``.
        """
        default_config = {
            "boards": {
                "보도자료": "188",
                "논평_브리핑": "11",
                "모두발언": "230"
            },
            "start_date": "2003-11-11",
            "max_pages": 10000,
            "concurrent_requests": 20,
            "request_delay": 0.1,
            "output_path": "./data"
        }

        config = dict(default_config)
        if os.path.exists(self.config_path):
            with open(self.config_path, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            # Only this party's section is relevant.
            section = loaded.get('minjoo') if isinstance(loaded, dict) else None
            if isinstance(section, dict):
                config.update(section)

        self.config = config
        self.boards = self.config["boards"]
        self.start_date = self.config["start_date"]
        self.max_pages = self.config["max_pages"]
        self.output_path = self.config["output_path"]

    def load_state(self) -> Dict:
        """Return this crawler's saved state (e.g. last crawl date), or ``{}``."""
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                state = json.load(f)
            return state.get('minjoo', {})
        return {}

    def save_state(self, state: Dict):
        """Persist ``state`` under the ``"minjoo"`` key of the shared state file.

        Other top-level keys already in the file are preserved. The file is
        written to a temporary path and then moved into place so an
        interrupted write cannot leave a truncated state file.
        """
        all_state = {}
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                all_state = json.load(f)

        all_state['minjoo'] = state

        tmp_path = self.state_path + ".tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(all_state, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, self.state_path)

    @staticmethod
    def parse_date(date_str: str) -> Optional[datetime]:
        """Parse a leading ``YYYY-MM-DD`` date out of ``date_str``.

        Anything after the date (a time separated by a space or ``T``) is
        ignored. Returns ``None`` when no valid date is found.
        """
        if not isinstance(date_str, str):
            return None
        match = re.match(r'\d{4}-\d{1,2}-\d{1,2}', date_str.strip())
        if not match:
            return None
        try:
            return datetime.strptime(match.group(0), '%Y-%m-%d')
        except ValueError:
            # Right shape but not a real calendar date (e.g. 2024-13-40).
            return None

    @staticmethod
    def clean_text(text: str) -> str:
        """Normalise whitespace artefacts and strip the text.

        Non-breaking spaces become regular spaces (removing them outright
        glued neighbouring words together); zero-width spaces are removed.
        """
        return text.replace('\xa0', ' ').replace('\u200b', '').strip()

    async def fetch_with_retry(self, session: aiohttp.ClientSession, url: str,
                               max_retries: int = 3) -> Optional[str]:
        """GET ``url`` and return the body text, retrying on failure.

        Args:
            session: Shared aiohttp session.
            url: Address to fetch.
            max_retries: Total number of attempts.

        Returns:
            The response text for an HTTP 200 answer, otherwise ``None``
            once all attempts are used up.
        """
        delay = self.config.get("request_delay", 0.1)
        timeout = aiohttp.ClientTimeout(total=15)

        for attempt in range(max_retries):
            try:
                # Hold a slot only while actually talking to the server so a
                # backing-off request does not block the others.
                async with self.semaphore:
                    await asyncio.sleep(delay)
                    async with session.get(url, timeout=timeout) as response:
                        if response.status == 200:
                            return await response.text()
            except Exception:
                # Best-effort fetch: any request/decoding error counts as a
                # failed attempt (task cancellation is not an Exception).
                pass

            if attempt < max_retries - 1:
                await asyncio.sleep(1)

        return None

    def _parse_list_item(self, item, list_url: str) -> Optional[Tuple[Dict, datetime]]:
        """Extract one list entry; return ``(record, date)`` or ``None`` to skip."""
        link_tag = item.find('a')
        if not link_tag:
            return None

        title_span = link_tag.find('span')
        if not title_span:
            return None
        title = title_span.get_text(strip=True).replace('\n', ' ')

        href = link_tag.get('href', '')
        if not href:
            return None
        # Resolve the link relative to the list page, as a browser would.
        article_url = urljoin(list_url, href)

        category = ""
        category_tag = item.find('p', {'class': 'category'})
        if category_tag:
            category_span = category_tag.find('span')
            if category_span:
                category = category_span.get_text(strip=True)

        time_tag = item.find('time')
        if not time_tag:
            return None
        date_str = time_tag.get('datetime', '') or time_tag.get_text(strip=True)
        article_date = self.parse_date(date_str)
        if not article_date:
            return None

        record = {
            'category': category,
            'title': title,
            'date': article_date.strftime('%Y-%m-%d'),
            'url': article_url
        }
        return record, article_date

    async def fetch_list_page(self, session: aiohttp.ClientSession, board_id: str,
                              page_num: int, start_date: datetime,
                              end_date: datetime) -> tuple:
        """Fetch one list page of a board.

        Args:
            session: Shared aiohttp session.
            board_id: Value of the ``brd`` query parameter.
            page_num: Value of the ``sno`` query parameter (0 = first page).
            start_date: Entries older than this stop the crawl.
            end_date: Entries newer than this are skipped.

        Returns:
            ``(records, stop_flag)``. ``stop_flag`` is ``True`` when the page
            has no entries or an entry older than ``start_date`` was reached;
            a failed fetch yields ``([], False)``.
        """
        if page_num == 0:
            url = f"{self.base_url}/news/list.php?brd={board_id}"
        else:
            url = f"{self.base_url}/news/list.php?sno={page_num}&par=&&brd={board_id}"

        html = await self.fetch_with_retry(session, url)
        if not html:
            return [], False

        soup = BeautifulSoup(html, 'html.parser')
        board_items = soup.find_all('div', {'class': 'board-item'})
        if not board_items:
            return [], True  # empty page

        records = []
        stop_flag = False

        for item in board_items:
            try:
                parsed = self._parse_list_item(item, url)
            except Exception:
                # A malformed entry must not abort the rest of the page.
                continue
            if parsed is None:
                continue

            record, article_date = parsed
            if article_date < start_date:
                stop_flag = True
                break
            if article_date > end_date:
                continue
            records.append(record)

        return records, stop_flag

    async def fetch_article_detail(self, session: aiohttp.ClientSession,
                                   url: str) -> Dict:
        """Fetch an article page and extract body text, writer and date.

        Returns:
            Dict with keys ``text``, ``writer`` and ``published_date``. When
            the page cannot be fetched, ``text`` is the failure marker
            ``"본문 조회 실패"`` and the other values are empty strings.
        """
        html = await self.fetch_with_retry(session, url)
        if not html:
            return {'text': "본문 조회 실패", 'writer': "", 'published_date': ""}

        soup = BeautifulSoup(html, 'html.parser')
        text_parts = []
        writer = ""
        published_date = ""

        # Publication date.
        date_li = soup.find('li', {'class': 'date'})
        if date_li:
            match = re.search(r'(\d{4}-\d{2}-\d{2})', date_li.get_text(strip=True))
            if match:
                published_date = match.group(1)

        # Body: paragraphs form the text; the first bold run that looks like
        # a byline is taken as the writer.
        contents_div = soup.find('div', {'class': 'board-view__contents'})
        if contents_div:
            for element in contents_div.find_all(['p', 'b']):
                cleaned = self.clean_text(element.get_text(strip=True))
                if not cleaned:
                    continue
                if element.name == 'p':
                    text_parts.append(cleaned)
                elif not writer and any(
                    key in cleaned for key in ('민주당', '공보국', '대변인')
                ):
                    writer = cleaned

            if not text_parts:
                # Body without <p> tags: fall back to the container's text.
                fallback = self.clean_text(contents_div.get_text('\n', strip=True))
                if fallback:
                    text_parts.append(fallback)

        return {
            'text': '\n'.join(text_parts),
            'writer': writer,
            'published_date': published_date
        }

    async def _attach_detail(self, session: aiohttp.ClientSession, item: Dict,
                             board_name: str) -> None:
        """Fetch the detail page for ``item`` and merge it into ``item`` in place."""
        detail = await self.fetch_article_detail(session, item['url'])
        item.update(detail)
        item['board_name'] = board_name

    async def collect_board(self, board_name: str, board_id: str,
                            start_date: str, end_date: str) -> List[Dict]:
        """Collect every article of one board within the date range.

        Args:
            board_name: Label stored in each record's ``board_name``.
            board_id: Value of the ``brd`` query parameter.
            start_date: Inclusive lower bound, ``YYYY-MM-DD``.
            end_date: Inclusive upper bound, ``YYYY-MM-DD``.

        Returns:
            List of records combining list-page and detail-page fields.
        """
        start_dt = datetime.strptime(start_date, '%Y-%m-%d')
        end_dt = datetime.strptime(end_date, '%Y-%m-%d')

        print(f"\n▶ [{board_name}] 목록 수집 시작...")

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'ko-KR,ko;q=0.9',
        }

        async with aiohttp.ClientSession(headers=headers) as session:
            # Step 1: walk the list pages sequentially.
            all_items = []
            page_num = 0
            empty_pages = 0
            max_empty_pages = 3

            with async_tqdm(desc=f"[{board_name}] 목록", unit="페이지") as pbar:
                while page_num <= self.max_pages * 20:
                    items, stop_flag = await self.fetch_list_page(
                        session, board_id, page_num, start_dt, end_dt
                    )

                    if items:
                        empty_pages = 0
                        all_items.extend(items)
                        pbar.update(1)
                        pbar.set_postfix({"수집": len(all_items)})
                    else:
                        empty_pages += 1

                    if stop_flag or empty_pages >= max_empty_pages:
                        break

                    # The sno parameter advances in steps of 20.
                    page_num += 20

            print(f" ✓ {len(all_items)}개 항목 발견")

            # Step 2: fetch the detail pages concurrently.
            if all_items:
                print(f" ▶ 상세 페이지 수집 중...")

                # Each task writes into its own item, so the completion order
                # reported by as_completed cannot mix up articles.
                tasks = [
                    self._attach_detail(session, item, board_name)
                    for item in all_items
                ]
                for coro in async_tqdm(asyncio.as_completed(tasks),
                                       total=len(tasks),
                                       desc=f"[{board_name}] 상세"):
                    await coro

        print(f"✓ [{board_name}] 완료: {len(all_items)}개")
        return all_items

    async def collect_all(self, start_date: Optional[str] = None,
                          end_date: Optional[str] = None) -> pd.DataFrame:
        """Collect all configured boards into one DataFrame.

        Args:
            start_date: ``YYYY-MM-DD``; defaults to the configured start date.
            end_date: ``YYYY-MM-DD``; defaults to today.

        Returns:
            DataFrame with columns ``board_name, title, category, date,
            writer, text, url`` (empty DataFrame when nothing was collected).
            Rows with an empty title or text are dropped.
        """
        if not end_date:
            end_date = datetime.now().strftime('%Y-%m-%d')
        if not start_date:
            start_date = self.start_date

        print(f"\n{'='*60}")
        print(f"더불어민주당 보도자료 수집 - 비동기 고성능 버전")
        print(f"기간: {start_date} ~ {end_date}")
        print(f"{'='*60}")

        # Crawl all boards concurrently.
        tasks = [
            self.collect_board(board_name, board_id, start_date, end_date)
            for board_name, board_id in self.boards.items()
        ]
        results = await asyncio.gather(*tasks)

        all_data = []
        for items in results:
            all_data.extend(items)

        if not all_data:
            print("\n⚠️ 수집된 데이터 없음")
            return pd.DataFrame()

        df = pd.DataFrame(all_data)

        # Use the list-page date when the detail page exposed none.
        missing = df['published_date'] == ""
        df.loc[missing, 'published_date'] = df.loc[missing, 'date']

        df = df[['board_name', 'title', 'category', 'published_date',
                 'writer', 'text', 'url']]
        # .copy() so the assignment below acts on an independent frame.
        df = df[(df['title'] != "") & (df['text'] != "")].copy()
        df['published_date'] = pd.to_datetime(df['published_date'], errors='coerce')
        df = df.rename(columns={'published_date': 'date'})

        print(f"\n✓ 총 {len(df)}개 수집 완료")
        return df

    def save_local(self, df: pd.DataFrame):
        """Write ``df`` as timestamped CSV and Excel files under the output path."""
        os.makedirs(self.output_path, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # CSV (utf-8-sig adds a BOM so spreadsheet tools detect the encoding).
        csv_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.csv")
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')

        # Excel
        xlsx_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.xlsx")
        df.to_excel(xlsx_path, index=False, engine='openpyxl')

        print(f"✓ CSV: {csv_path}")
        print(f"✓ Excel: {xlsx_path}")

    def upload_to_huggingface(self, df: pd.DataFrame):
        """Merge ``df`` into the Hub dataset (de-duplicated by URL) and push it.

        Returns:
            ``True`` when the upload succeeded or was skipped because no
            token is configured; ``False`` when it failed.
        """
        if not self.hf_token:
            print("\n⚠️ HF_TOKEN이 설정되지 않았습니다. .env 파일을 확인하세요.")
            return True

        print(f"\n▶ 허깅페이스 업로드 중... (repo: {self.hf_repo_id})")

        try:
            login(token=self.hf_token)

            # preserve_index=False keeps the filtered frame's index from
            # becoming an extra "__index_level_0__" column.
            new_dataset = Dataset.from_pandas(df, preserve_index=False)

            try:
                existing_dataset = load_dataset(self.hf_repo_id, split='train')
            except FileNotFoundError:
                # Only a missing/empty dataset means "start fresh". Any other
                # failure (e.g. network) aborts the upload below instead of
                # overwriting the existing data with just the new rows.
                print(f" ℹ️ 신규 데이터셋 생성")
                final_dataset = new_dataset
            else:
                print(f" ℹ️ 기존 데이터: {len(existing_dataset)}개")

                # Merge and keep the newest copy of each URL.
                existing_df = existing_dataset.to_pandas()
                combined_df = pd.concat([existing_df, df], ignore_index=True)
                combined_df = combined_df.drop_duplicates(subset=['url'], keep='last')
                combined_df = combined_df.sort_values(
                    'date', ascending=False
                ).reset_index(drop=True)

                final_dataset = Dataset.from_pandas(combined_df, preserve_index=False)
                print(f" ✓ 병합 후: {len(final_dataset)}개 (중복 제거됨)")

            final_dataset.push_to_hub(self.hf_repo_id, token=self.hf_token)

            print(f"✓ 허깅페이스 업로드 완료!")
            print(f" 🔗 https://huggingface.co/datasets/{self.hf_repo_id}")
            return True

        except Exception as e:
            print(f"✗ 업로드 실패: {e}")
            return False

    async def run_incremental(self):
        """Run an incremental update starting at the last crawl date.

        The last crawled day is crawled again because articles published
        later that day were not yet visible; duplicates are removed by URL
        when merging into the Hub dataset. State is only advanced when the
        upload did not fail, so a failed run is retried next time.
        """
        state = self.load_state()
        last_date = state.get('last_crawl_date')

        if last_date:
            # Validate the stored value and restart from that same day.
            start_date = datetime.strptime(last_date, '%Y-%m-%d').strftime('%Y-%m-%d')
            print(f"📅 증분 업데이트: {start_date} 이후 데이터만 수집")
        else:
            start_date = self.start_date
            print(f"📅 전체 수집: {start_date}부터")

        end_date = datetime.now().strftime('%Y-%m-%d')

        df = await self.collect_all(start_date, end_date)

        if df.empty:
            print("✓ 새로운 데이터 없음")
            return

        self.save_local(df)

        uploaded = self.upload_to_huggingface(df)

        if uploaded is not False:
            state['last_crawl_date'] = end_date
            state['last_crawl_time'] = datetime.now().isoformat()
            state['last_count'] = len(df)
            self.save_state(state)

        print(f"\n{'='*60}")
        print(f"✓ 완료! 다음 실행: 내일")
        print(f"{'='*60}\n")


async def main():
    """Entry point: build the crawler and run one incremental update."""
    crawler = MinjooAsyncCrawler()
    await crawler.run_incremental()


if __name__ == "__main__":
    asyncio.run(main())