# KoreanPartyCommunication / ppp_crawler_async.py
# (Hugging Face file-listing residue removed: "hanjunlee's picture /
#  Upload 23 files / 3a36548 verified" — kept here as a comment so the
#  file remains valid Python.)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ꡭ민의힘 크둀러 - κ³ μ„±λŠ₯ 비동기 버전 + ν—ˆκΉ…νŽ˜μ΄μŠ€ μžλ™ μ—…λ‘œλ“œ
- asyncio + aiohttp (10-20λ°° λΉ λ₯Έ 속도)
- λ™μ‹œ μš”μ²­ 수 μ œμ–΄ (μ„œλ²„ λΆ€λ‹΄ μ΅œμ†Œν™”)
- 증뢄 μ—…λ°μ΄νŠΈ (λ§ˆμ§€λ§‰ λ‚ μ§œ μ΄ν›„λ§Œ 크둀링)
- ν—ˆκΉ…νŽ˜μ΄μŠ€ μžλ™ μ—…λ‘œλ“œ
- 일 λ‹¨μœ„ μŠ€μΌ€μ€„λ§
"""
import os
import json
import re
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import pandas as pd
from tqdm.asyncio import tqdm as async_tqdm
import aiohttp
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from huggingface_hub import HfApi, login
from datasets import Dataset, load_dataset
# Load environment variables (HF_TOKEN, HF_REPO_ID_PPP) from a .env file.
load_dotenv()
class PPPAsyncCrawler:
    """High-performance asynchronous crawler for People Power Party press releases.

    Responsibilities:
    - fetch board list pages and article detail pages concurrently
      (asyncio + aiohttp, bounded by a semaphore),
    - support incremental crawls via a persisted JSON state file,
    - save results locally (CSV/Excel) and push them to the Hugging Face Hub.
    """

    def __init__(self, config_path: str = "crawler_config.json"):
        """Initialize URLs, configuration and Hugging Face credentials.

        Args:
            config_path: path to the JSON crawler configuration; built-in
                defaults are used when the file is missing or has no 'ppp' key.
        """
        self.base_url = "https://www.peoplepowerparty.kr"
        self.party_name = "ꡭ민의힘"
        self.config_path = config_path
        self.state_path = "crawler_state.json"
        # Load board ids, date range and crawl limits.
        self.load_config()
        # Hugging Face settings (HF_TOKEN is loaded from the environment / .env).
        self.hf_token = os.getenv("HF_TOKEN")
        self.hf_repo_id = os.getenv("HF_REPO_ID_PPP", "ppp-press-releases")
        # BUGFIX: honor the 'concurrent_requests' config value instead of a
        # hard-coded 20 (the config key existed but was never used).
        self.semaphore = asyncio.Semaphore(self.config.get("concurrent_requests", 20))

    def load_config(self):
        """Load the crawler configuration, falling back to built-in defaults."""
        default_config = {
            "boards": {
                "λŒ€λ³€μΈ_λ…Όν‰λ³΄λ„μžλ£Œ": "BBSDD0001",
                "원내_λ³΄λ„μžλ£Œ": "BBSDD0002",
                "λ―Έλ””μ–΄νŠΉμœ„_λ³΄λ„μžλ£Œ": "BBSDD0042"
            },
            "start_date": "2000-03-10",
            "max_pages": 10000,
            "concurrent_requests": 20,
            "request_delay": 0.1,
            "output_path": "./data"
        }
        if os.path.exists(self.config_path):
            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
            # The shared config file may describe several parties;
            # only the 'ppp' section applies here.
            self.config = config.get('ppp', default_config)
        else:
            self.config = default_config
        self.boards = self.config["boards"]
        self.start_date = self.config["start_date"]
        self.max_pages = self.config["max_pages"]
        self.output_path = self.config["output_path"]

    def load_state(self) -> Dict:
        """Return the persisted crawl state for 'ppp' ({} when absent)."""
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                state = json.load(f)
            return state.get('ppp', {})
        return {}

    def save_state(self, state: Dict):
        """Persist *state* under the 'ppp' key, preserving other parties' state."""
        all_state = {}
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                all_state = json.load(f)
        all_state['ppp'] = state
        with open(self.state_path, 'w', encoding='utf-8') as f:
            json.dump(all_state, f, ensure_ascii=False, indent=2)

    @staticmethod
    def parse_date(date_str: str) -> Optional[datetime]:
        """Parse a 'YYYY-MM-DD' string; return None for malformed input."""
        try:
            return datetime.strptime(date_str.strip(), '%Y-%m-%d')
        except (ValueError, AttributeError):
            # BUGFIX: narrowed from a bare 'except' so real errors
            # (e.g. KeyboardInterrupt) are no longer swallowed.
            return None

    @staticmethod
    def clean_text(text: str) -> str:
        """Strip whitespace and remove invisible characters (NBSP, ZWSP, BOM)."""
        # One regex pass replaces the previous .replace() chain, which
        # listed the zero-width space twice (once as a literal character).
        return re.sub(r'[\xa0\u200b\ufeff]', '', text).strip()

    async def fetch_with_retry(self, session: "aiohttp.ClientSession", url: str,
                               max_retries: int = 3) -> Optional[str]:
        """GET *url* with retries; return the body text, or None on failure.

        The class-wide semaphore bounds concurrency, and a small delay
        before each attempt keeps the request rate gentle on the server.
        """
        async with self.semaphore:
            for attempt in range(max_retries):
                try:
                    await asyncio.sleep(self.config.get("request_delay", 0.1))
                    timeout = aiohttp.ClientTimeout(total=15)
                    async with session.get(url, timeout=timeout) as response:
                        if response.status == 200:
                            return await response.text()
                        # Non-200: fall through and retry.
                except Exception:
                    if attempt < max_retries - 1:
                        await asyncio.sleep(1)  # brief back-off before retrying
                    else:
                        return None
            return None

    async def fetch_list_page(self, session: "aiohttp.ClientSession",
                              board_id: str, page_num: int,
                              start_date: datetime, end_date: datetime) -> tuple:
        """Fetch and parse one board list page.

        Returns:
            (items, stop_flag): rows dated within [start_date, end_date],
            and a flag that turns True once a row older than *start_date*
            appears (the board is newest-first, so crawling can stop).
        """
        url = f"{self.base_url}/news/comment/{board_id}?page={page_num}"
        html = await self.fetch_with_retry(session, url)
        if not html:
            return [], False
        soup = BeautifulSoup(html, 'html.parser')
        table_div = soup.find('div', {'class': 'board-tbl'})
        if not table_div:
            return [], True
        tbody = table_div.find('tbody')
        if not tbody:
            return [], True
        rows = tbody.find_all('tr')
        if not rows:
            return [], True
        data = []
        stop_flag = False
        for row in rows:
            cols = row.find_all('td')
            if len(cols) < 3:
                continue
            try:
                no_td = row.find('td', {'class': 'no'})
                class_td = row.find('td', {'class': 'class'})
                no = no_td.get_text(strip=True) if no_td else cols[0].get_text(strip=True)
                section = class_td.get_text(strip=True) if class_td else cols[1].get_text(strip=True)
                link_tag = row.find('a')
                if not link_tag:
                    continue
                title = link_tag.get_text(strip=True).replace('\n', ' ')
                article_url = self.base_url + link_tag.get('href', '')
                # Date extraction: prefer the 4th column, fall back to
                # a <dd class="date"> element inside the row.
                date_str = ""
                if len(cols) >= 4:
                    date_str = cols[3].get_text(strip=True)
                if not date_str or not re.match(r'\d{4}-\d{2}-\d{2}', date_str):
                    dd_date = row.find('dd', {'class': 'date'})
                    if dd_date:
                        span = dd_date.find('span')
                        if span:
                            # Drop the label span so only the date text remains.
                            span.decompose()
                        date_str = dd_date.get_text(strip=True)
                article_date = self.parse_date(date_str)
                if not article_date:
                    continue
                if article_date < start_date:
                    # Rows are newest-first: everything below is older still.
                    stop_flag = True
                    break
                if article_date > end_date:
                    continue
                data.append({
                    'no': no,
                    'section': section,
                    'title': title,
                    'date': date_str,
                    'url': article_url
                })
            except Exception:
                # A single malformed row must not abort the whole page.
                continue
        return data, stop_flag

    async def fetch_article_detail(self, session: "aiohttp.ClientSession",
                                   url: str) -> Dict:
        """Fetch an article page; return {'text': body, 'writer': author}."""
        html = await self.fetch_with_retry(session, url)
        if not html:
            return {'text': "λ³Έλ¬Έ 쑰회 μ‹€νŒ¨", 'writer': ""}
        soup = BeautifulSoup(html, 'html.parser')
        text_parts = []
        writer = ""
        conts_tag = soup.select_one('dd.conts')
        if conts_tag:
            # Drop the embedded HWP-editor duplicate of the body, if present.
            hwp_div = conts_tag.find('div', {'id': 'hwpEditorBoardContent'})
            if hwp_div:
                hwp_div.decompose()
            for p in conts_tag.find_all('p'):
                style = p.get('style', '')
                # Centered paragraphs hold either the writer name or a date line.
                is_center = 'text-align:center' in style.replace(' ', '').lower()
                cleaned_text = self.clean_text(p.get_text(strip=True))
                if not cleaned_text:
                    continue
                if is_center:
                    # A centered date line (e.g. '2024. 1. 2') is not the writer.
                    if not re.match(r'\d{4}\.\s*\d{1,2}\.\s*\d{1,2}', cleaned_text):
                        writer = cleaned_text
                else:
                    text_parts.append(cleaned_text)
        return {'text': '\n'.join(text_parts), 'writer': writer}

    async def collect_board(self, board_name: str, board_id: str,
                            start_date: str, end_date: str) -> List[Dict]:
        """Collect one board: walk the list pages, then fetch details in parallel."""
        start_dt = datetime.strptime(start_date, '%Y-%m-%d')
        end_dt = datetime.strptime(end_date, '%Y-%m-%d')
        print(f"\nβ–Ά [{board_name}] λͺ©λ‘ μˆ˜μ§‘ μ‹œμž‘...")
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'ko-KR,ko;q=0.9',
        }
        async with aiohttp.ClientSession(headers=headers) as session:
            # Stage 1: walk list pages until the start date is passed or
            # several consecutive empty pages suggest the board has ended.
            all_items = []
            page_num = 1
            empty_pages = 0
            max_empty_pages = 3
            with async_tqdm(desc=f"[{board_name}] λͺ©λ‘", unit="νŽ˜μ΄μ§€") as pbar:
                while page_num <= self.max_pages:
                    items, stop_flag = await self.fetch_list_page(
                        session, board_id, page_num, start_dt, end_dt
                    )
                    if not items:
                        empty_pages += 1
                        if empty_pages >= max_empty_pages or stop_flag:
                            break
                    else:
                        empty_pages = 0
                        all_items.extend(items)
                        pbar.update(1)
                        pbar.set_postfix({"μˆ˜μ§‘": len(all_items)})
                    if stop_flag:
                        break
                    page_num += 1
            print(f" βœ“ {len(all_items)}개 ν•­λͺ© 발견")
            # Stage 2: fetch all article details concurrently.
            if all_items:
                print(f" β–Ά 상세 νŽ˜μ΄μ§€ μˆ˜μ§‘ 쀑...")
                tasks = [self.fetch_article_detail(session, item['url'])
                         for item in all_items]
                # BUGFIX: the old code consumed asyncio.as_completed(tasks)
                # (completion order) and then zip()-ed the results against
                # all_items (submission order), attaching each article's text
                # and writer to an arbitrary item.  tqdm.asyncio's gather
                # keeps results aligned with the tasks while showing progress.
                details = await async_tqdm.gather(*tasks,
                                                 desc=f"[{board_name}] 상세")
                # Merge detail fields into the matching list items.
                for item, detail in zip(all_items, details):
                    item.update(detail)
                    item['board_name'] = board_name
        print(f"βœ“ [{board_name}] μ™„λ£Œ: {len(all_items)}개")
        return all_items

    async def collect_all(self, start_date: Optional[str] = None,
                          end_date: Optional[str] = None) -> pd.DataFrame:
        """Collect every configured board and return a combined DataFrame.

        Defaults: *end_date* is today, *start_date* comes from the config.
        """
        if not end_date:
            end_date = datetime.now().strftime('%Y-%m-%d')
        if not start_date:
            start_date = self.start_date
        print(f"\n{'='*60}")
        print(f"ꡭ민의힘 λ³΄λ„μžλ£Œ μˆ˜μ§‘ - 비동기 κ³ μ„±λŠ₯ 버전")
        print(f"κΈ°κ°„: {start_date} ~ {end_date}")
        print(f"{'='*60}")
        # Crawl all boards concurrently; gather preserves board order.
        tasks = [
            self.collect_board(board_name, board_id, start_date, end_date)
            for board_name, board_id in self.boards.items()
        ]
        results = await asyncio.gather(*tasks)
        # Flatten per-board lists into one record list.
        all_data = [item for items in results for item in items]
        if not all_data:
            print("\n⚠️ μˆ˜μ§‘λœ 데이터 μ—†μŒ")
            return pd.DataFrame()
        df = pd.DataFrame(all_data)
        df = df[['board_name', 'no', 'title', 'section', 'date', 'writer', 'text', 'url']]
        # Drop rows whose title or body came back empty.
        df = df[(df['title'] != "") & (df['text'] != "")]
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        print(f"\nβœ“ 총 {len(df)}개 μˆ˜μ§‘ μ™„λ£Œ")
        return df

    def save_local(self, df: pd.DataFrame):
        """Write *df* to timestamped CSV and Excel files under output_path."""
        os.makedirs(self.output_path, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        # CSV (utf-8-sig so Excel opens Korean text correctly).
        csv_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.csv")
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')
        # Excel
        xlsx_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.xlsx")
        df.to_excel(xlsx_path, index=False, engine='openpyxl')
        print(f"βœ“ CSV: {csv_path}")
        print(f"βœ“ Excel: {xlsx_path}")

    def upload_to_huggingface(self, df: pd.DataFrame):
        """Merge *df* with the existing Hub dataset (dedup by URL) and push it."""
        if not self.hf_token:
            print("\n⚠️ HF_TOKEN이 μ„€μ •λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€. .env νŒŒμΌμ„ ν™•μΈν•˜μ„Έμš”.")
            return
        print(f"\nβ–Ά ν—ˆκΉ…νŽ˜μ΄μŠ€ μ—…λ‘œλ“œ 쀑... (repo: {self.hf_repo_id})")
        try:
            login(token=self.hf_token)
            api = HfApi()
            new_dataset = Dataset.from_pandas(df)
            # Merge with the existing dataset when it exists; any failure to
            # load it (missing repo, first run) falls back to a fresh dataset.
            try:
                existing_dataset = load_dataset(self.hf_repo_id, split='train')
                print(f" ℹ️ κΈ°μ‘΄ 데이터: {len(existing_dataset)}개")
                existing_df = existing_dataset.to_pandas()
                combined_df = pd.concat([existing_df, df], ignore_index=True)
                # keep='last' so a re-crawled article replaces the old copy.
                combined_df = combined_df.drop_duplicates(subset=['url'], keep='last')
                combined_df = combined_df.sort_values('date', ascending=False).reset_index(drop=True)
                final_dataset = Dataset.from_pandas(combined_df)
                print(f" βœ“ 병합 ν›„: {len(final_dataset)}개 (쀑볡 제거됨)")
            except Exception:
                print(f" ℹ️ μ‹ κ·œ 데이터셋 생성")
                final_dataset = new_dataset
            final_dataset.push_to_hub(self.hf_repo_id, token=self.hf_token)
            print(f"βœ“ ν—ˆκΉ…νŽ˜μ΄μŠ€ μ—…λ‘œλ“œ μ™„λ£Œ!")
            print(f" πŸ”— https://huggingface.co/datasets/{self.hf_repo_id}")
        except Exception as e:
            # Best-effort: report the failure but keep the local files.
            print(f"βœ— μ—…λ‘œλ“œ μ‹€νŒ¨: {e}")

    async def run_incremental(self):
        """Run one incremental crawl: collect, save, upload, persist state."""
        state = self.load_state()
        last_date = state.get('last_crawl_date')
        if last_date:
            # Resume the day after the last successful crawl.
            start_date = (datetime.strptime(last_date, '%Y-%m-%d') + timedelta(days=1)).strftime('%Y-%m-%d')
            print(f"πŸ“… 증뢄 μ—…λ°μ΄νŠΈ: {start_date} 이후 λ°μ΄ν„°λ§Œ μˆ˜μ§‘")
        else:
            start_date = self.start_date
            print(f"πŸ“… 전체 μˆ˜μ§‘: {start_date}λΆ€ν„°")
        end_date = datetime.now().strftime('%Y-%m-%d')
        # Crawl
        df = await self.collect_all(start_date, end_date)
        if df.empty:
            print("βœ“ μƒˆλ‘œμš΄ 데이터 μ—†μŒ")
            return
        # Save locally
        self.save_local(df)
        # Upload to Hugging Face
        self.upload_to_huggingface(df)
        # Persist crawl state for the next incremental run.
        state['last_crawl_date'] = end_date
        state['last_crawl_time'] = datetime.now().isoformat()
        state['last_count'] = len(df)
        self.save_state(state)
        print(f"\n{'='*60}")
        print(f"βœ“ μ™„λ£Œ!")
        print(f"{'='*60}\n")
async def main():
    """Entry point: build a crawler and run one incremental update."""
    await PPPAsyncCrawler().run_incremental()
if __name__ == "__main__":
asyncio.run(main())