#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
λ”λΆˆμ–΄λ―Όμ£Όλ‹Ή 크둀러 - κ³ μ„±λŠ₯ 비동기 버전 + ν—ˆκΉ…νŽ˜μ΄μŠ€ μžλ™ μ—…λ‘œλ“œ
- asyncio + aiohttp (10-20λ°° λΉ λ₯Έ 속도)
- λ™μ‹œ μš”μ²­ 수 μ œμ–΄ (μ„œλ²„ λΆ€λ‹΄ μ΅œμ†Œν™”)
- 증뢄 μ—…λ°μ΄νŠΈ (λ§ˆμ§€λ§‰ λ‚ μ§œ μ΄ν›„λ§Œ 크둀링)
- ν—ˆκΉ…νŽ˜μ΄μŠ€ μžλ™ μ—…λ‘œλ“œ
- 일 λ‹¨μœ„ μŠ€μΌ€μ€„λ§
"""
import os
import json
import re
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import pandas as pd
from tqdm.asyncio import tqdm as async_tqdm
import aiohttp
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from huggingface_hub import login
from datasets import Dataset, load_dataset
# Load the .env file (expects HF_TOKEN and, optionally, HF_REPO_ID)
load_dotenv()
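# An example .env might look like this (placeholder values, not real
# credentials; HF_TOKEN should be a Hugging Face token with write access):
#   HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxx
#   HF_REPO_ID=your-username/minjoo-press-releases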
class MinjooAsyncCrawler:
def __init__(self, config_path="crawler_config.json"):
self.base_url = "https://theminjoo.kr/main/sub"
self.party_name = "λ”λΆˆμ–΄λ―Όμ£Όλ‹Ή"
self.config_path = config_path
self.state_path = "crawler_state.json"
        # Load configuration
        self.load_config()
        # Hugging Face settings
        self.hf_token = os.getenv("HF_TOKEN")
        self.hf_repo_id = os.getenv("HF_REPO_ID", "minjoo-press-releases")
        # Cap on in-flight requests (avoids hammering the server); read from the
        # config so the "concurrent_requests" setting is actually honored
        self.semaphore = asyncio.Semaphore(self.config.get("concurrent_requests", 20))
    def load_config(self):
        """Load the config file, falling back to built-in defaults."""
        default_config = {
            "boards": {
                "λ³΄λ„μžλ£Œ": "188",      # press releases
                "논평_λΈŒλ¦¬ν•‘": "11",    # commentary / briefings
                "λͺ¨λ‘λ°œμ–Έ": "230"       # opening remarks
            },
"start_date": "2003-11-11",
"max_pages": 10000,
"concurrent_requests": 20,
"request_delay": 0.1,
"output_path": "./data"
}
if os.path.exists(self.config_path):
with open(self.config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
            # Use only the "minjoo" section of the shared config
if 'minjoo' in config:
self.config = config['minjoo']
else:
self.config = default_config
else:
self.config = default_config
self.boards = self.config["boards"]
self.start_date = self.config["start_date"]
self.max_pages = self.config["max_pages"]
self.output_path = self.config["output_path"]
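    # A crawler_config.json accepted by load_config() above might look like this
    # (a sketch mirroring default_config; board IDs are the site's own):
    #   {
    #     "minjoo": {
    #       "boards": {"λ³΄λ„μžλ£Œ": "188", "논평_λΈŒλ¦¬ν•‘": "11", "λͺ¨λ‘λ°œμ–Έ": "230"},
    #       "start_date": "2003-11-11",
    #       "max_pages": 10000,
    #       "concurrent_requests": 20,
    #       "request_delay": 0.1,
    #       "output_path": "./data"
    #     }
    #   }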
    def load_state(self) -> Dict:
        """Load crawler state (the date of the last crawl)."""
if os.path.exists(self.state_path):
with open(self.state_path, 'r', encoding='utf-8') as f:
state = json.load(f)
return state.get('minjoo', {})
return {}
    def save_state(self, state: Dict):
        """Persist crawler state."""
all_state = {}
if os.path.exists(self.state_path):
with open(self.state_path, 'r', encoding='utf-8') as f:
all_state = json.load(f)
all_state['minjoo'] = state
with open(self.state_path, 'w', encoding='utf-8') as f:
json.dump(all_state, f, ensure_ascii=False, indent=2)
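    # crawler_state.json then holds per-party state keyed by party, e.g.
    # (illustrative values, matching the keys written in run_incremental):
    #   {"minjoo": {"last_crawl_date": "2024-01-01",
    #               "last_crawl_time": "2024-01-01T06:00:00",
    #               "last_count": 42}}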
@staticmethod
    def parse_date(date_str: str) -> Optional[datetime]:
        """Parse a 'YYYY-MM-DD ...' string; return None if it doesn't parse."""
        try:
            return datetime.strptime(date_str.strip().split()[0], '%Y-%m-%d')
        except (ValueError, IndexError):
            return None
@staticmethod
    def clean_text(text: str) -> str:
        """Normalize text: replace non-breaking spaces, strip zero-width spaces."""
        # \xa0 becomes a regular space so adjacent words are not glued together
        text = text.replace('\xa0', ' ').replace('\u200b', '')
return text.strip()
async def fetch_with_retry(self, session: aiohttp.ClientSession, url: str,
                               max_retries: int = 3) -> Optional[str]:
        """Async GET with retries; returns the response body or None."""
async with self.semaphore:
for attempt in range(max_retries):
try:
await asyncio.sleep(self.config.get("request_delay", 0.1))
async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
if response.status == 200:
return await response.text()
                except Exception:
                    if attempt < max_retries - 1:
                        await asyncio.sleep(1)  # brief pause before retrying
else:
return None
return None
async def fetch_list_page(self, session: aiohttp.ClientSession,
board_id: str, page_num: int,
                              start_date: datetime, end_date: datetime) -> tuple:
        """Fetch one list page; return (items, stop_flag)."""
if page_num == 0:
url = f"{self.base_url}/news/list.php?brd={board_id}"
else:
url = f"{self.base_url}/news/list.php?sno={page_num}&par=&&brd={board_id}"
html = await self.fetch_with_retry(session, url)
if not html:
return [], False
soup = BeautifulSoup(html, 'html.parser')
board_items = soup.find_all('div', {'class': 'board-item'})
if not board_items:
            return [], True  # empty page: treat as the end of the board
data = []
stop_flag = False
for item in board_items:
try:
link_tag = item.find('a')
if not link_tag:
continue
title_span = link_tag.find('span')
if not title_span:
continue
title = title_span.get_text(strip=True).replace('\n', ' ')
                # Resolve relative article URLs against the base URL
article_url = link_tag.get('href', '')
if article_url.startswith('./'):
article_url = self.base_url + '/news/' + article_url[2:]
elif not article_url.startswith('http'):
article_url = self.base_url + article_url
                # Category label, if present
category_tag = item.find('p', {'class': 'category'})
category = ""
if category_tag:
category_span = category_tag.find('span')
if category_span:
category = category_span.get_text(strip=True)
                # Posting date
time_tag = item.find('time')
if not time_tag:
continue
date_str = time_tag.get('datetime', '') or time_tag.get_text(strip=True)
article_date = self.parse_date(date_str)
if not article_date:
continue
                if article_date < start_date:
                    # lists are newest-first, so anything older ends the board
                    stop_flag = True
                    break
if article_date > end_date:
continue
data.append({
'category': category,
'title': title,
'date': date_str.split()[0] if ' ' in date_str else date_str,
'url': article_url
})
            except Exception:
                continue  # skip malformed items
return data, stop_flag
async def fetch_article_detail(self, session: aiohttp.ClientSession,
                                   url: str) -> Dict:
        """Fetch and parse an article detail page."""
        html = await self.fetch_with_retry(session, url)
        if not html:
            # Korean sentinel kept as-is: "failed to fetch article body"
            return {'text': "λ³Έλ¬Έ 쑰회 μ‹€νŒ¨", 'writer': "", 'published_date': ""}
soup = BeautifulSoup(html, 'html.parser')
text_parts = []
writer = ""
published_date = ""
# κ²Œμ‹œμΌ
date_li = soup.find('li', {'class': 'date'})
if date_li:
date_text = date_li.get_text(strip=True)
match = re.search(r'(\d{4}-\d{2}-\d{2})', date_text)
if match:
published_date = match.group(1)
        # Body text
contents_div = soup.find('div', {'class': 'board-view__contents'})
if contents_div:
for element in contents_div.descendants:
if element.name == 'p':
text = element.get_text(strip=True)
cleaned = self.clean_text(text)
if cleaned:
text_parts.append(cleaned)
elif element.name == 'b':
text = element.get_text(strip=True)
cleaned = self.clean_text(text)
                    # Heuristic: take the first bold line naming the party
                    # (λ―Όμ£Όλ‹Ή), its PR office (곡보ꡭ), or a spokesperson
                    # (λŒ€λ³€μΈ) as the writer
                    if cleaned and not writer:
                        if 'λ―Όμ£Όλ‹Ή' in cleaned or '곡보ꡭ' in cleaned or 'λŒ€λ³€μΈ' in cleaned:
                            writer = cleaned
return {
'text': '\n'.join(text_parts),
'writer': writer,
'published_date': published_date
}
async def collect_board(self, board_name: str, board_id: str,
                            start_date: str, end_date: str) -> List[Dict]:
        """Collect one full board (async)."""
start_dt = datetime.strptime(start_date, '%Y-%m-%d')
end_dt = datetime.strptime(end_date, '%Y-%m-%d')
        print(f"\nβ–Ά [{board_name}] starting list collection...")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'ko-KR,ko;q=0.9',
}
async with aiohttp.ClientSession(headers=headers) as session:
            # Step 1: collect list pages
            all_items = []
            page_num = 0
            empty_pages = 0
            max_empty_pages = 3
            with async_tqdm(desc=f"[{board_name}] list", unit="page") as pbar:
                while page_num <= self.max_pages * 20:  # sno advances 20 items per page
items, stop_flag = await self.fetch_list_page(
session, board_id, page_num, start_dt, end_dt
)
if not items:
empty_pages += 1
if empty_pages >= max_empty_pages or stop_flag:
break
else:
empty_pages = 0
all_items.extend(items)
pbar.update(1)
                        pbar.set_postfix({"collected": len(all_items)})
if stop_flag:
break
page_num += 20
            print(f" βœ“ found {len(all_items)} items")
            # Step 2: fetch detail pages concurrently
            if all_items:
                print(" β–Ά fetching detail pages...")
tasks = [self.fetch_article_detail(session, item['url']) for item in all_items]
                # Run concurrently with a progress bar. tqdm's gather preserves
                # input order, so details[i] matches all_items[i]; as_completed
                # would yield results in completion order and misalign the zip
                # below.
                details = await async_tqdm.gather(*tasks, desc=f"[{board_name}] detail")
                # Merge detail fields into the list items
for item, detail in zip(all_items, details):
item.update(detail)
item['board_name'] = board_name
            print(f"βœ“ [{board_name}] done: {len(all_items)} items")
return all_items
async def collect_all(self, start_date: Optional[str] = None,
                          end_date: Optional[str] = None) -> pd.DataFrame:
        """Collect every configured board."""
if not end_date:
end_date = datetime.now().strftime('%Y-%m-%d')
if not start_date:
start_date = self.start_date
print(f"\n{'='*60}")
        print("λ”λΆˆμ–΄λ―Όμ£Όλ‹Ή press release collection - high-performance async version")
        print(f"Period: {start_date} ~ {end_date}")
        print(f"{'='*60}")
        # Collect all boards in parallel
tasks = [
self.collect_board(board_name, board_id, start_date, end_date)
for board_name, board_id in self.boards.items()
]
results = await asyncio.gather(*tasks)
        # Combine per-board results
all_data = []
for items in results:
all_data.extend(items)
if not all_data:
            print("\n⚠️ no data collected")
return pd.DataFrame()
        df = pd.DataFrame(all_data)
        df = df[['board_name', 'title', 'category', 'published_date', 'writer', 'text', 'url']]
        df = df[(df['title'] != "") & (df['text'] != "")]  # drop rows missing a title or body
        df['published_date'] = pd.to_datetime(df['published_date'], errors='coerce')
        df = df.rename(columns={'published_date': 'date'})
        print(f"\nβœ“ collected {len(df)} items in total")
return df
    def save_local(self, df: pd.DataFrame):
        """Save CSV and Excel copies locally."""
os.makedirs(self.output_path, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
# CSV
csv_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.csv")
df.to_csv(csv_path, index=False, encoding='utf-8-sig')
# Excel
xlsx_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.xlsx")
df.to_excel(xlsx_path, index=False, engine='openpyxl')
print(f"βœ“ CSV: {csv_path}")
print(f"βœ“ Excel: {xlsx_path}")
    def upload_to_huggingface(self, df: pd.DataFrame):
        """Upload to the Hugging Face Hub, merging with any existing dataset."""
        if not self.hf_token:
            print("\n⚠️ HF_TOKEN is not set. Check your .env file.")
            return
        print(f"\nβ–Ά uploading to Hugging Face... (repo: {self.hf_repo_id})")
try:
            # Log in
            login(token=self.hf_token)
            # Build a dataset from the newly crawled rows
            new_dataset = Dataset.from_pandas(df)
            # Merge with the existing dataset if one is already published
            try:
                existing_dataset = load_dataset(self.hf_repo_id, split='train')
                print(f" ℹ️ existing data: {len(existing_dataset)} rows")
                # Deduplicate by URL, keeping the freshest copy of each article
                existing_df = existing_dataset.to_pandas()
                combined_df = pd.concat([existing_df, df], ignore_index=True)
                combined_df = combined_df.drop_duplicates(subset=['url'], keep='last')
                combined_df = combined_df.sort_values('date', ascending=False).reset_index(drop=True)
                final_dataset = Dataset.from_pandas(combined_df)
                print(f" βœ“ after merge: {len(final_dataset)} rows (duplicates removed)")
            except Exception:
                print(" ℹ️ creating a new dataset instead")
                final_dataset = new_dataset
final_dataset = new_dataset
            # Push to the Hub
            final_dataset.push_to_hub(self.hf_repo_id, token=self.hf_token)
            print("βœ“ Hugging Face upload complete!")
            print(f" πŸ”— https://huggingface.co/datasets/{self.hf_repo_id}")
        except Exception as e:
            print(f"βœ— upload failed: {e}")
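    # Consumers can then load the published dataset directly (illustrative;
    # substitute the actual repo id):
    #   from datasets import load_dataset
    #   ds = load_dataset("your-username/minjoo-press-releases", split="train")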
    async def run_incremental(self):
        """Run an incremental update (only items after the last crawl date)."""
        state = self.load_state()
        last_date = state.get('last_crawl_date')
        if last_date:
            # Resume from the day after the last crawl
            start_date = (datetime.strptime(last_date, '%Y-%m-%d') + timedelta(days=1)).strftime('%Y-%m-%d')
            print(f"πŸ“… incremental update: collecting from {start_date} onward")
        else:
            start_date = self.start_date
            print(f"πŸ“… full collection: from {start_date}")
end_date = datetime.now().strftime('%Y-%m-%d')
        # Crawl
        df = await self.collect_all(start_date, end_date)
        if df.empty:
            print("βœ“ no new data")
return
        # Save locally
        self.save_local(df)
        # Upload to Hugging Face
        self.upload_to_huggingface(df)
        # Persist state so the next run resumes after end_date
state['last_crawl_date'] = end_date
state['last_crawl_time'] = datetime.now().isoformat()
state['last_count'] = len(df)
self.save_state(state)
print(f"\n{'='*60}")
        print("βœ“ done! next run: tomorrow")
print(f"{'='*60}\n")
async def main():
    """Entry point: run one incremental crawl."""
crawler = MinjooAsyncCrawler()
await crawler.run_incremental()
if __name__ == "__main__":
asyncio.run(main())
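
# This script performs a single incremental pass; the daily cadence mentioned in
# the module docstring is expected to come from an external scheduler. A cron
# entry might look like this (illustrative; the path is a placeholder):
#   0 6 * * * cd /path/to/KoreanPartyCommunication && python3 minjoo_crawler_async.py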