# KoreanPartyCommunication / rebuilding_crawler_async.py
# hanjunlee's picture
# Upload 23 files
# 3a36548 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
μ‘°κ΅­ν˜μ‹ λ‹Ή 크둀러 - κ³ μ„±λŠ₯ 비동기 버전 + ν—ˆκΉ…νŽ˜μ΄μŠ€ μžλ™ μ—…λ‘œλ“œ
- κΈ°μ‘΄ sync(requests) 방식을 async(aiohttp) 둜 μ „ν™˜
- 증뢄 μ—…λ°μ΄νŠΈ, ν—ˆκΉ…νŽ˜μ΄μŠ€ μžλ™ μ—…λ‘œλ“œ
"""
import os
import json
import re
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import pandas as pd
from tqdm.asyncio import tqdm as async_tqdm
import aiohttp
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from huggingface_hub import HfApi, login
from datasets import Dataset, load_dataset
load_dotenv()
class RebuildingAsyncCrawler:
    """Asynchronous crawler for the Rebuilding Korea Party (μ‘°κ΅­ν˜μ‹ λ‹Ή) site.

    Collects press-release style boards concurrently with aiohttp, keeps
    incremental-crawl state in a JSON file, saves results locally as
    CSV/Excel, and optionally pushes the merged dataset to the Hugging
    Face Hub.
    """

    def __init__(self, config_path="crawler_config.json"):
        """Load configuration and credentials; set up the request throttle.

        Args:
            config_path: Path to the shared crawler config JSON whose
                'rebuilding' section configures this crawler.
        """
        self.base_url = "https://rebuildingkoreaparty.kr"
        self.party_name = "μ‘°κ΅­ν˜μ‹ λ‹Ή"
        self.config_path = config_path
        self.state_path = "crawler_state.json"
        self.load_config()
        self.hf_token = os.getenv("HF_TOKEN")
        self.hf_repo_id = os.getenv("HF_REPO_ID_REBUILDING", "rebuilding-press-releases")
        # Fix: honor the 'concurrent_requests' config key — it existed in
        # the config but the semaphore size was hard-coded to 10.
        self.semaphore = asyncio.Semaphore(self.config.get("concurrent_requests", 10))

    def load_config(self):
        """Populate self.config from the 'rebuilding' config section.

        A partial user config is merged over the built-in defaults, so a
        section that omits a key no longer raises KeyError below.
        """
        default_config = {
            "boards": {
                "기자회견문": "news/press-conference",
                "λ…Όν‰λΈŒλ¦¬ν•‘": "news/commentary-briefing",
                "λ³΄λ„μžλ£Œ": "news/press-release"
            },
            "start_date": "2024-03-04",
            "max_pages": 10000,
            "concurrent_requests": 10,
            "request_delay": 0.5,
            "output_path": "./data"
        }
        if os.path.exists(self.config_path):
            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
            # Robustness fix: merge over defaults instead of replacing
            # the whole dict with a possibly-partial user section.
            self.config = {**default_config, **config.get('rebuilding', {})}
        else:
            self.config = default_config
        self.boards = self.config["boards"]
        self.start_date = self.config["start_date"]
        self.max_pages = self.config["max_pages"]
        self.output_path = self.config["output_path"]

    def load_state(self) -> Dict:
        """Return this crawler's persisted state ({} when none exists)."""
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                state = json.load(f)
            return state.get('rebuilding', {})
        return {}

    def save_state(self, state: Dict):
        """Write state under the 'rebuilding' key, preserving other crawlers' state."""
        all_state = {}
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                all_state = json.load(f)
        all_state['rebuilding'] = state
        with open(self.state_path, 'w', encoding='utf-8') as f:
            json.dump(all_state, f, ensure_ascii=False, indent=2)

    @staticmethod
    def parse_date(date_str: str) -> Optional[datetime]:
        """Parse a 'YYYY-MM-DD' string; return None for anything unparseable."""
        try:
            return datetime.strptime(date_str.strip(), '%Y-%m-%d')
        except (ValueError, TypeError, AttributeError):
            # Narrowed from a bare except: only parse-shaped failures
            # (bad format / non-string input) are expected here.
            return None

    @staticmethod
    def clean_text(text: str) -> str:
        """Remove non-breaking and zero-width spaces, then strip whitespace."""
        # U+00A0 (non-breaking space) and U+200B (zero-width space); the
        # original additionally chained a replace of a literal invisible
        # character that duplicated the '\u200b' escape.
        for ch in ('\xa0', '\u200b'):
            text = text.replace(ch, '')
        return text.strip()

    async def fetch_with_retry(self, session: aiohttp.ClientSession, url: str,
                               max_retries: int = 3) -> Optional[str]:
        """GET `url` with throttling and retries; return HTML text or None.

        The semaphore bounds concurrent requests and a per-request delay
        keeps load on the server polite. Transport errors and non-200
        responses are retried up to `max_retries` times.
        """
        async with self.semaphore:
            for attempt in range(max_retries):
                try:
                    await asyncio.sleep(self.config.get("request_delay", 0.5))
                    async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
                        if response.status == 200:
                            return await response.text()
                        # Non-200: fall through and retry on the next attempt.
                except Exception:
                    if attempt < max_retries - 1:
                        await asyncio.sleep(1)  # brief backoff before retrying
                    else:
                        return None
            return None

    async def fetch_list_page(self, session: aiohttp.ClientSession,
                              board_name: str, board_path: str, page_num: int,
                              start_date: datetime, end_date: datetime) -> tuple:
        """Scrape one listing page of a board.

        Returns:
            (items, stop_flag): `items` is a list of article metadata dicts
            whose dates fall within [start_date, end_date]; `stop_flag` is
            True when an article older than `start_date` was seen (listings
            are newest-first, so pagination may stop) or when the page had
            no article links at all.
        """
        if page_num == 1:
            url = f"{self.base_url}/{board_path}"
        else:
            url = f"{self.base_url}/{board_path}?page={page_num}"
        html = await self.fetch_with_retry(session, url)
        if not html:
            return [], False
        soup = BeautifulSoup(html, 'html.parser')
        # Article links match <a href="/news/{board_path}/...">.
        # NOTE(review): board_path already starts with 'news/', so this
        # pattern matches hrefs like '/news/news/press-conference/...';
        # presumably that nesting mirrors the site's URL scheme — confirm
        # against a live listing page before changing it.
        article_links = soup.find_all('a', href=re.compile(f'^/news/{re.escape(board_path)}/'))
        if not article_links:
            return [], True
        data = []
        stop_flag = False
        seen_urls = set()
        for link in article_links:
            try:
                article_url = link.get('href', '')
                if article_url.startswith('/'):
                    article_url = self.base_url + article_url
                if article_url in seen_urls:
                    continue  # the same article can be linked more than once
                seen_urls.add(article_url)
                title = link.get_text(strip=True).replace('\n', ' ')
                # Date and category live in sibling <li> cells of the
                # enclosing <ul> row.
                parent = link.find_parent('ul')
                if not parent:
                    parent_li = link.find_parent('li')
                    if parent_li:
                        parent = parent_li.find_parent('ul')
                date_str = ""
                category = ""
                if parent:
                    date_li = parent.find('li', {'class': 'td date'})
                    if date_li:
                        date_str = date_li.get_text(strip=True)
                    cate_li = parent.find('li', {'class': 'td category'})
                    if cate_li:
                        category = cate_li.get_text(strip=True)
                if not date_str:
                    continue
                article_date = self.parse_date(date_str)
                if not article_date:
                    continue
                if article_date < start_date:
                    # Newest-first listing: everything after this is older.
                    stop_flag = True
                    break
                if article_date > end_date:
                    continue
                data.append({
                    'board_name': board_name,
                    'category': category,
                    'title': title,
                    'date': date_str,
                    'url': article_url
                })
            except Exception:
                # Narrowed from a bare except; skip malformed rows
                # (best-effort parsing) without aborting the whole page.
                continue
        return data, stop_flag

    async def fetch_article_detail(self, session: aiohttp.ClientSession, url: str) -> Dict:
        """Fetch an article page and extract its body text and writer line."""
        html = await self.fetch_with_retry(session, url)
        if not html:
            return {'text': "λ³Έλ¬Έ 쑰회 μ‹€νŒ¨", 'writer': ""}
        soup = BeautifulSoup(html, 'html.parser')
        text_parts = []
        writer = ""
        # Body paragraphs live in <p> tags inside <div class="editor ck-content">.
        contents_div = soup.find('div', {'class': 'editor ck-content'})
        if contents_div:
            paragraphs = contents_div.find_all('p')
            for p in paragraphs:
                cleaned = self.clean_text(p.get_text(strip=True))
                if cleaned:
                    text_parts.append(cleaned)
            # Writer: scan trailing paragraphs for a party-name /
            # spokesperson / committee signature line.
            for p in reversed(paragraphs):
                cleaned = self.clean_text(p.get_text(strip=True))
                if 'μ‘°κ΅­ν˜μ‹ λ‹Ή' in cleaned or 'λŒ€λ³€μΈ' in cleaned or 'μœ„μ›νšŒ' in cleaned:
                    writer = cleaned
                    break
        return {'text': '\n'.join(text_parts), 'writer': writer}

    async def collect_board(self, board_name: str, board_path: str,
                            start_date: str, end_date: str) -> List[Dict]:
        """Collect all articles of one board within [start_date, end_date].

        Walks listing pages until the date window or page budget is
        exhausted (or 3 consecutive empty pages occur), then fetches
        article details concurrently.
        """
        start_dt = datetime.strptime(start_date, '%Y-%m-%d')
        end_dt = datetime.strptime(end_date, '%Y-%m-%d')
        print(f"\nβ–Ά [{board_name}] λͺ©λ‘ μˆ˜μ§‘ μ‹œμž‘...")
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'ko-KR,ko;q=0.9',
        }
        async with aiohttp.ClientSession(headers=headers) as session:
            all_items = []
            page_num = 1
            empty_pages = 0
            max_empty_pages = 3  # stop after this many consecutive empty pages
            with async_tqdm(desc=f"[{board_name}] λͺ©λ‘", unit="νŽ˜μ΄μ§€") as pbar:
                while page_num <= self.max_pages:
                    items, stop_flag = await self.fetch_list_page(
                        session, board_name, board_path, page_num, start_dt, end_dt
                    )
                    if not items:
                        empty_pages += 1
                        if empty_pages >= max_empty_pages or stop_flag:
                            break
                    else:
                        empty_pages = 0
                        all_items.extend(items)
                        pbar.update(1)
                        pbar.set_postfix({"μˆ˜μ§‘": len(all_items)})
                    if stop_flag:
                        break
                    page_num += 1
            print(f" βœ“ {len(all_items)}개 ν•­λͺ© 발견")
            if all_items:
                print(f" β–Ά 상세 νŽ˜μ΄μ§€ μˆ˜μ§‘ 쀑...")
                tasks = [self.fetch_article_detail(session, item['url']) for item in all_items]
                # Bug fix: the original iterated asyncio.as_completed() and
                # zipped the results against all_items — as_completed yields
                # in *completion* order, so article bodies were attached to
                # the wrong articles. tqdm's gather preserves task order.
                details = await async_tqdm.gather(*tasks, desc=f"[{board_name}] 상세")
                for item, detail in zip(all_items, details):
                    item.update(detail)
        print(f"βœ“ [{board_name}] μ™„λ£Œ: {len(all_items)}개")
        return all_items

    async def collect_all(self, start_date: Optional[str] = None,
                          end_date: Optional[str] = None) -> pd.DataFrame:
        """Crawl every configured board and return one combined DataFrame.

        Boards are crawled concurrently; rows with an empty title or body
        are dropped and 'date' is parsed to datetime (NaT on failure).
        Defaults: start_date from config, end_date = today.
        """
        if not end_date:
            end_date = datetime.now().strftime('%Y-%m-%d')
        if not start_date:
            start_date = self.start_date
        print(f"\n{'='*60}")
        print(f"μ‘°κ΅­ν˜μ‹ λ‹Ή λ³΄λ„μžλ£Œ μˆ˜μ§‘ - 비동기 κ³ μ„±λŠ₯ 버전")
        print(f"κΈ°κ°„: {start_date} ~ {end_date}")
        print(f"{'='*60}")
        tasks = [
            self.collect_board(board_name, board_path, start_date, end_date)
            for board_name, board_path in self.boards.items()
        ]
        results = await asyncio.gather(*tasks)
        all_data = []
        for items in results:
            all_data.extend(items)
        if not all_data:
            print("\n⚠️ μˆ˜μ§‘λœ 데이터 μ—†μŒ")
            return pd.DataFrame()
        df = pd.DataFrame(all_data)
        df = df[['board_name', 'title', 'category', 'date', 'writer', 'text', 'url']]
        df = df[(df['title'] != "") & (df['text'] != "")]
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        print(f"\nβœ“ 총 {len(df)}개 μˆ˜μ§‘ μ™„λ£Œ")
        return df

    def save_local(self, df: pd.DataFrame):
        """Save the DataFrame as timestamped CSV and Excel files."""
        os.makedirs(self.output_path, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        csv_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.csv")
        xlsx_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.xlsx")
        # utf-8-sig keeps Korean text readable when the CSV is opened in Excel.
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')
        df.to_excel(xlsx_path, index=False, engine='openpyxl')
        print(f"βœ“ CSV: {csv_path}")
        print(f"βœ“ Excel: {xlsx_path}")

    def upload_to_huggingface(self, df: pd.DataFrame):
        """Merge `df` into the existing HF dataset (dedup by URL) and push.

        No-op when HF_TOKEN is not configured; falls back to creating a
        fresh dataset when the repo cannot be loaded.
        """
        if not self.hf_token:
            print("\n⚠️ HF_TOKEN이 μ„€μ •λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€.")
            return
        print(f"\nβ–Ά ν—ˆκΉ…νŽ˜μ΄μŠ€ μ—…λ‘œλ“œ 쀑... (repo: {self.hf_repo_id})")
        try:
            login(token=self.hf_token)
            new_dataset = Dataset.from_pandas(df)
            try:
                existing_dataset = load_dataset(self.hf_repo_id, split='train')
                existing_df = existing_dataset.to_pandas()
                combined_df = pd.concat([existing_df, df], ignore_index=True)
                # keep='last' lets re-crawled articles replace older copies.
                combined_df = combined_df.drop_duplicates(subset=['url'], keep='last')
                combined_df = combined_df.sort_values('date', ascending=False).reset_index(drop=True)
                final_dataset = Dataset.from_pandas(combined_df)
                print(f" βœ“ 병합 ν›„: {len(final_dataset)}개")
            except Exception:
                # Narrowed from a bare except: repo missing / first upload.
                final_dataset = new_dataset
                print(f" ℹ️ μ‹ κ·œ 데이터셋 생성")
            final_dataset.push_to_hub(self.hf_repo_id, token=self.hf_token)
            print(f"βœ“ ν—ˆκΉ…νŽ˜μ΄μŠ€ μ—…λ‘œλ“œ μ™„λ£Œ!")
        except Exception as e:
            print(f"βœ— μ—…λ‘œλ“œ μ‹€νŒ¨: {e}")

    async def run_incremental(self):
        """Run one crawl cycle: incremental when prior state exists.

        On success, saves locally, uploads to the Hub, and records the
        crawl window end in the state file.
        """
        state = self.load_state()
        last_date = state.get('last_crawl_date')
        if last_date:
            # Bug fix: the original started at last_crawl_date + 1 day,
            # permanently skipping articles published later on the crawl
            # day itself. Re-crawling that day is safe because the Hub
            # merge de-duplicates by URL.
            start_date = last_date
            print(f"πŸ“… 증뢄 μ—…λ°μ΄νŠΈ: {start_date} 이후 λ°μ΄ν„°λ§Œ μˆ˜μ§‘")
        else:
            start_date = self.start_date
            print(f"πŸ“… 전체 μˆ˜μ§‘: {start_date}λΆ€ν„°")
        end_date = datetime.now().strftime('%Y-%m-%d')
        df = await self.collect_all(start_date, end_date)
        if df.empty:
            print("βœ“ μƒˆλ‘œμš΄ 데이터 μ—†μŒ")
            return
        self.save_local(df)
        self.upload_to_huggingface(df)
        state['last_crawl_date'] = end_date
        state['last_crawl_time'] = datetime.now().isoformat()
        state['last_count'] = len(df)
        self.save_state(state)
        print(f"\n{'='*60}\nβœ“ μ™„λ£Œ!\n{'='*60}\n")
async def main():
    """Entry point: build a crawler and run one incremental crawl cycle."""
    await RebuildingAsyncCrawler().run_incremental()
if __name__ == "__main__":
asyncio.run(main())