#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Basic Income Party crawler — high-performance async version with Hugging Face auto-upload.

- Gnuboard 5 based site (basicincomeparty.kr)
- List rows: td.td_subject / td.td_datetime (YY.MM.DD.); article body: div#bo_v_con
"""
import os
import json
import re
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Optional

import pandas as pd
from tqdm.asyncio import tqdm as async_tqdm
import aiohttp
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from huggingface_hub import HfApi, login
from datasets import Dataset, load_dataset

load_dotenv()


class BasicIncomeAsyncCrawler:
    """Asynchronous crawler for Basic Income Party press releases.

    Collects board list pages and article bodies concurrently, saves
    CSV/Excel locally, pushes to a Hugging Face dataset, and keeps
    incremental crawl state in a JSON file.
    """

    def __init__(self, config_path: str = "crawler_config.json"):
        self.base_url = "https://basicincomeparty.kr"
        self.party_name = "기본소득당"
        self.config_path = config_path
        self.state_path = "crawler_state.json"
        self.load_config()
        self.hf_token = os.getenv("HF_TOKEN")
        self.hf_repo_id = os.getenv("HF_REPO_ID_BASIC_INCOME", "basic-income-press-releases")
        # Fix: honor the configured request concurrency (was hard-coded to 10,
        # silently ignoring the "concurrent_requests" config key).
        self.semaphore = asyncio.Semaphore(self.config.get("concurrent_requests", 10))

    def load_config(self):
        """Load settings from the 'basic_income' section of the config file.

        Falls back to built-in defaults when the file or section is missing.
        """
        default_config = {
            "boards": {
                "논평보도자료": "bikr/press"
            },
            "start_date": "2020-01-08",
            "max_pages": 10000,
            "concurrent_requests": 10,
            "request_delay": 0.3,
            "output_path": "./data"
        }
        if os.path.exists(self.config_path):
            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
            self.config = config.get('basic_income', default_config)
        else:
            self.config = default_config
        self.boards = self.config["boards"]
        self.start_date = self.config["start_date"]
        self.max_pages = self.config["max_pages"]
        self.output_path = self.config["output_path"]

    def load_state(self) -> Dict:
        """Return the saved crawl state for this party (empty dict if none)."""
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                state = json.load(f)
            return state.get('basic_income', {})
        return {}

    def save_state(self, state: Dict):
        """Merge *state* into the shared state file under the 'basic_income' key.

        Other parties' entries in the same file are preserved.
        """
        all_state = {}
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                all_state = json.load(f)
        all_state['basic_income'] = state
        with open(self.state_path, 'w', encoding='utf-8') as f:
            json.dump(all_state, f, ensure_ascii=False, indent=2)

    @staticmethod
    def parse_date(date_str: str) -> Optional[datetime]:
        """Parse 'YY.MM.DD.', 'YYYY.MM.DD.' or 'YYYY-MM-DD'; return None on failure."""
        date_str = date_str.strip().rstrip('.')
        try:
            parts = date_str.split('.')
            if len(parts) >= 3:
                year = int(parts[0])
                # Two-digit years are assumed to be 20xx.
                year = 2000 + year if year < 100 else year
                return datetime(year, int(parts[1]), int(parts[2]))
        except ValueError:
            pass
        try:
            return datetime.strptime(date_str[:10], '%Y-%m-%d')
        except ValueError:
            return None

    @staticmethod
    def clean_text(text: str) -> str:
        """Strip non-breaking spaces, zero-width chars, and surrounding whitespace."""
        # \ufeff (BOM / zero-width no-break space) is also dropped; the original
        # had a redundant duplicate literal U+200B replace instead.
        text = text.replace('\xa0', '').replace('\u200b', '').replace('\ufeff', '')
        return text.strip()

    async def fetch_with_retry(self, session: aiohttp.ClientSession, url: str,
                               max_retries: int = 3) -> Optional[str]:
        """GET *url* with up to *max_retries* attempts; return body text or None.

        Concurrency is bounded by the instance semaphore and each attempt is
        preceded by the configured politeness delay.
        """
        async with self.semaphore:
            for attempt in range(max_retries):
                try:
                    await asyncio.sleep(self.config.get("request_delay", 0.3))
                    async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
                        if response.status == 200:
                            return await response.text()
                except Exception:
                    if attempt < max_retries - 1:
                        await asyncio.sleep(1)
                    else:
                        return None
            return None

    async def fetch_list_page(self, session: aiohttp.ClientSession, board_name: str,
                              board_path: str, page_num: int,
                              start_date: datetime, end_date: datetime) -> tuple:
        """Fetch one board list page; return ``(rows, stop_flag)``.

        ``stop_flag`` is True when an article older than *start_date* is seen
        (the board lists newest first, so pagination can stop) or when the
        page has no table rows at all.
        """
        url = f"{self.base_url}/{board_path}?page={page_num}"
        html = await self.fetch_with_retry(session, url)
        if not html:
            return [], False
        soup = BeautifulSoup(html, 'html.parser')
        rows = soup.select('table tbody tr')
        if not rows:
            return [], True
        data = []
        stop_flag = False
        for row in rows:
            try:
                # Title & URL: td.td_subject div.bo_tit a
                title_a = row.select_one('td.td_subject div.bo_tit a')
                if not title_a:
                    continue
                title = title_a.get_text(strip=True)
                href = title_a.get('href', '')
                # NOTE(review): this strips the ENTIRE query string, not just the
                # 'page' parameter the original comment claimed — confirm article
                # ids live in the URL path on this site, not in the query.
                article_url = re.sub(r'\?.*$', '', href)
                if not article_url.startswith('http'):
                    article_url = self.base_url + article_url
                # Date: td.td_datetime (YY.MM.DD. format)
                date_td = row.select_one('td.td_datetime')
                if not date_td:
                    continue
                date_str = date_td.get_text(strip=True)
                # Category: td.td_num2 a.bo_cate_link
                cate_a = row.select_one('td.td_num2 a.bo_cate_link')
                category = cate_a.get_text(strip=True) if cate_a else ""
                article_date = self.parse_date(date_str)
                if not article_date:
                    continue
                if article_date < start_date:
                    stop_flag = True
                    break
                if article_date > end_date:
                    continue
                data.append({
                    'board_name': board_name,
                    'title': title,
                    'category': category,
                    'date': article_date.strftime('%Y-%m-%d'),  # normalized to YYYY-MM-DD
                    'url': article_url
                })
            except Exception:
                # Best-effort per-row parsing: one malformed row must not abort the page.
                continue
        return data, stop_flag

    async def fetch_article_detail(self, session: aiohttp.ClientSession, url: str) -> Dict:
        """Fetch an article page and extract body text and writer name."""
        html = await self.fetch_with_retry(session, url)
        if not html:
            return {'text': "본문 조회 실패", 'writer': ""}
        soup = BeautifulSoup(html, 'html.parser')
        text_parts = []
        writer = ""
        # Body: div#bo_v_con — one cleaned line per <p>
        contents_div = soup.find('div', id='bo_v_con')
        if contents_div:
            for p in contents_div.find_all('p'):
                cleaned = self.clean_text(p.get_text(strip=True))
                if cleaned:
                    text_parts.append(cleaned)
        # Writer: span.sv_member inside section#bo_v_info div.profile_info_ct
        info_div = soup.select_one('section#bo_v_info div.profile_info_ct')
        if info_div:
            writer_el = info_div.find('span', class_='sv_member')
            if writer_el:
                writer = writer_el.get_text(strip=True)
        return {'text': '\n'.join(text_parts), 'writer': writer}

    async def collect_board(self, board_name: str, board_path: str,
                            start_date: str, end_date: str) -> List[Dict]:
        """Collect all list items of one board within [start_date, end_date],
        then fetch every article's detail page and merge it into each item.
        """
        start_dt = datetime.strptime(start_date, '%Y-%m-%d')
        end_dt = datetime.strptime(end_date, '%Y-%m-%d')
        print(f"\n▶ [{board_name}] 목록 수집 시작...")
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'ko-KR,ko;q=0.9',
        }
        async with aiohttp.ClientSession(headers=headers) as session:
            all_items = []
            page_num = 1
            empty_pages = 0
            max_empty_pages = 3
            with async_tqdm(desc=f"[{board_name}] 목록", unit="페이지") as pbar:
                while page_num <= self.max_pages:
                    items, stop_flag = await self.fetch_list_page(
                        session, board_name, board_path, page_num, start_dt, end_dt
                    )
                    if not items:
                        empty_pages += 1
                        if empty_pages >= max_empty_pages or stop_flag:
                            break
                    else:
                        empty_pages = 0
                        all_items.extend(items)
                        pbar.update(1)
                        pbar.set_postfix({"수집": len(all_items)})
                    if stop_flag:
                        break
                    page_num += 1
            print(f"  ✓ {len(all_items)}개 항목 발견")
            if all_items:
                print(f"  ▶ 상세 페이지 수집 중...")
                tasks = [self.fetch_article_detail(session, item['url']) for item in all_items]
                # BUGFIX: the original iterated asyncio.as_completed() (completion
                # order) and then zip()ped against all_items (submission order),
                # attaching bodies/writers to the WRONG articles. tqdm.gather
                # preserves argument order while still showing progress.
                details = await async_tqdm.gather(*tasks, desc=f"[{board_name}] 상세")
                for item, detail in zip(all_items, details):
                    item.update(detail)
            print(f"✓ [{board_name}] 완료: {len(all_items)}개")
            return all_items

    async def collect_all(self, start_date: Optional[str] = None,
                          end_date: Optional[str] = None) -> pd.DataFrame:
        """Collect every configured board concurrently; return a cleaned DataFrame."""
        if not end_date:
            end_date = datetime.now().strftime('%Y-%m-%d')
        if not start_date:
            start_date = self.start_date
        print(f"\n{'='*60}")
        print(f"기본소득당 보도자료 수집 - 비동기 고성능 버전")
        print(f"기간: {start_date} ~ {end_date}")
        print(f"{'='*60}")
        tasks = [
            self.collect_board(board_name, board_path, start_date, end_date)
            for board_name, board_path in self.boards.items()
        ]
        results = await asyncio.gather(*tasks)
        all_data = []
        for items in results:
            all_data.extend(items)
        if not all_data:
            print("\n⚠️ 수집된 데이터 없음")
            return pd.DataFrame()
        df = pd.DataFrame(all_data)
        df = df[['board_name', 'title', 'category', 'date', 'writer', 'text', 'url']]
        # Drop rows with an empty title or body (e.g. failed detail fetches leave "").
        df = df[(df['title'] != "") & (df['text'] != "")]
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        print(f"\n✓ 총 {len(df)}개 수집 완료")
        return df

    def save_local(self, df: pd.DataFrame):
        """Write the DataFrame to timestamped CSV and Excel files in output_path."""
        os.makedirs(self.output_path, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        csv_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.csv")
        xlsx_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.xlsx")
        # utf-8-sig so Korean text opens correctly in Excel.
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')
        df.to_excel(xlsx_path, index=False, engine='openpyxl')
        print(f"✓ CSV: {csv_path}")
        print(f"✓ Excel: {xlsx_path}")

    def upload_to_huggingface(self, df: pd.DataFrame):
        """Push *df* to the Hugging Face Hub, merging with any existing dataset.

        Duplicate URLs keep the newest row; no-op when HF_TOKEN is unset.
        """
        if not self.hf_token:
            print("\n⚠️ HF_TOKEN이 설정되지 않았습니다.")
            return
        print(f"\n▶ 허깅페이스 업로드 중... (repo: {self.hf_repo_id})")
        try:
            login(token=self.hf_token)
            new_dataset = Dataset.from_pandas(df)
            try:
                existing_dataset = load_dataset(self.hf_repo_id, split='train')
                existing_df = existing_dataset.to_pandas()
                combined_df = pd.concat([existing_df, df], ignore_index=True)
                combined_df = combined_df.drop_duplicates(subset=['url'], keep='last')
                combined_df = combined_df.sort_values('date', ascending=False).reset_index(drop=True)
                final_dataset = Dataset.from_pandas(combined_df)
                print(f"  ✓ 병합 후: {len(final_dataset)}개")
            except Exception:
                # Repo missing or unreadable: upload the fresh data as-is.
                final_dataset = new_dataset
                print(f"  ℹ️ 신규 데이터셋 생성")
            final_dataset.push_to_hub(self.hf_repo_id, token=self.hf_token)
            print(f"✓ 허깅페이스 업로드 완료!")
        except Exception as e:
            print(f"✗ 업로드 실패: {e}")

    async def run_incremental(self):
        """Run one crawl cycle: resume from the saved state, save, upload, persist state."""
        state = self.load_state()
        last_date = state.get('last_crawl_date')
        if last_date:
            start_date = (datetime.strptime(last_date, '%Y-%m-%d')
                          + timedelta(days=1)).strftime('%Y-%m-%d')
            print(f"📅 증분 업데이트: {start_date} 이후 데이터만 수집")
        else:
            start_date = self.start_date
            print(f"📅 전체 수집: {start_date}부터")
        end_date = datetime.now().strftime('%Y-%m-%d')
        df = await self.collect_all(start_date, end_date)
        if df.empty:
            print("✓ 새로운 데이터 없음")
            return
        self.save_local(df)
        self.upload_to_huggingface(df)
        state['last_crawl_date'] = end_date
        state['last_crawl_time'] = datetime.now().isoformat()
        state['last_count'] = len(df)
        self.save_state(state)
        print(f"\n{'='*60}\n✓ 완료!\n{'='*60}\n")


async def main():
    """Entry point: build the crawler and run one incremental crawl."""
    crawler = BasicIncomeAsyncCrawler()
    await crawler.run_incremental()


if __name__ == "__main__":
    asyncio.run(main())