# KoreanPartyCommunication / jinbo_crawler_async.py
# (Hugging Face listing residue: hanjunlee — "Upload 23 files" — commit 3a36548 verified)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
진보당 크둀러 - κ³ μ„±λŠ₯ 비동기 버전 + ν—ˆκΉ…νŽ˜μ΄μŠ€ μžλ™ μ—…λ‘œλ“œ
- jinboparty.com 자체 CMS μ‚¬μš©
- λ³΄λ„μžλ£Œ: μΉ΄λ“œν˜• λ ˆμ΄μ•„μ›ƒ (div.img_list_item)
- 논평/λͺ¨λ‘λ°œμ–Έ: ν…Œμ΄λΈ”ν˜• λ ˆμ΄μ•„μ›ƒ (div#moTable)
- js_board_view('ID') β†’ /pages/?p=...&b=...&bn=ID&m=read νŒ¨ν„΄
"""
import os
import json
import re
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import pandas as pd
from tqdm.asyncio import tqdm as async_tqdm
import aiohttp
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from huggingface_hub import HfApi, login
from datasets import Dataset, load_dataset
load_dotenv()
class JinboAsyncCrawler:
def __init__(self, config_path="crawler_config.json"):
self.base_url = "https://jinboparty.com"
self.party_name = "진보당"
self.config_path = config_path
self.state_path = "crawler_state.json"
self.load_config()
self.hf_token = os.getenv("HF_TOKEN")
self.hf_repo_id = os.getenv("HF_REPO_ID_JINBO", "jinbo-press-releases")
self.semaphore = asyncio.Semaphore(10)
def load_config(self):
# boards 값은 {"p": "...", "b": "..."} ν˜•νƒœμ˜ dict
default_config = {
"boards": {
"λ³΄λ„μžλ£Œ": {"p": "286", "b": "b_1_111"},
"논평": {"p": "15", "b": "b_1_2"},
"λͺ¨λ‘λ°œμ–Έ": {"p": "14", "b": "b_1_1"}
},
"start_date": "2017-10-14",
"max_pages": 10000,
"concurrent_requests": 10,
"request_delay": 0.3,
"output_path": "./data"
}
if os.path.exists(self.config_path):
with open(self.config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
self.config = config.get('jinbo', default_config)
else:
self.config = default_config
self.boards = self.config["boards"]
self.start_date = self.config["start_date"]
self.max_pages = self.config["max_pages"]
self.output_path = self.config["output_path"]
def load_state(self) -> Dict:
if os.path.exists(self.state_path):
with open(self.state_path, 'r', encoding='utf-8') as f:
state = json.load(f)
return state.get('jinbo', {})
return {}
def save_state(self, state: Dict):
all_state = {}
if os.path.exists(self.state_path):
with open(self.state_path, 'r', encoding='utf-8') as f:
all_state = json.load(f)
all_state['jinbo'] = state
with open(self.state_path, 'w', encoding='utf-8') as f:
json.dump(all_state, f, ensure_ascii=False, indent=2)
@staticmethod
def parse_date(date_str: str) -> Optional[datetime]:
"""YYYY.MM.DD λ˜λŠ” YYYY-MM-DD νŒŒμ‹±"""
date_str = date_str.strip()
for fmt in ('%Y.%m.%d', '%Y-%m-%d'):
try:
return datetime.strptime(date_str[:10], fmt)
except:
continue
return None
@staticmethod
def clean_text(text: str) -> str:
text = text.replace('\xa0', '').replace('\u200b', '').replace('​', '')
return text.strip()
@staticmethod
def extract_board_id(href: str) -> Optional[str]:
"""js_board_view('ID') μ—μ„œ ID μΆ”μΆœ"""
match = re.search(r"js_board_view\('(\d+)'\)", href)
return match.group(1) if match else None
async def fetch_with_retry(self, session: aiohttp.ClientSession, url: str,
max_retries: int = 3) -> Optional[str]:
async with self.semaphore:
for attempt in range(max_retries):
try:
await asyncio.sleep(self.config.get("request_delay", 0.3))
async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
if response.status == 200:
return await response.text()
except Exception:
if attempt < max_retries - 1:
await asyncio.sleep(1)
else:
return None
return None
async def fetch_list_page(self, session: aiohttp.ClientSession,
board_name: str, board_cfg: Dict, page_num: int,
start_date: datetime, end_date: datetime) -> tuple:
p = board_cfg['p']
b = board_cfg['b']
url = f"{self.base_url}/pages/index.php?nPage={page_num}&p={p}&b={b}"
html = await self.fetch_with_retry(session, url)
if not html:
return [], False
soup = BeautifulSoup(html, 'html.parser')
data = []
stop_flag = False
# ── μΉ΄λ“œν˜• λ ˆμ΄μ•„μ›ƒ (λ³΄λ„μžλ£Œ) ──────────────────────────────
card_items = soup.select('div.img_list_item')
if card_items:
for item in card_items:
try:
link = item.select_one('a[href]')
if not link:
continue
bn = self.extract_board_id(link.get('href', ''))
if not bn:
continue
title_el = item.select_one('h4._tit span')
title = title_el.get_text(strip=True) if title_el else ""
# λ‚ μ§œ: icon_cal λ‹€μŒ span
date_str = ""
for span in item.select('div.item_bottom span'):
text = span.get_text(strip=True)
if re.match(r'\d{4}\.\d{2}\.\d{2}', text):
date_str = text[:10]
break
if not date_str:
continue
article_date = self.parse_date(date_str)
if not article_date:
continue
if article_date < start_date:
stop_flag = True
break
if article_date > end_date:
continue
detail_url = f"{self.base_url}/pages/?p={p}&b={b}&bn={bn}&m=read"
data.append({
'board_name': board_name,
'title': title,
'category': board_name,
'date': article_date.strftime('%Y-%m-%d'),
'url': detail_url
})
except:
continue
return data, stop_flag
# ── ν…Œμ΄λΈ”ν˜• λ ˆμ΄μ•„μ›ƒ (논평·λͺ¨λ‘λ°œμ–Έ) ──────────────────────
table_items = soup.select('div#moTable li:not(.t_head)')
if table_items:
for item in table_items:
try:
link = item.select_one('div.tb_title_area a')
if not link:
continue
bn = self.extract_board_id(link.get('href', ''))
if not bn:
continue
title_el = item.select_one('p.title')
title = title_el.get_text(strip=True) if title_el else ""
# λ‚ μ§œ: div.col.wid_140 ("등둝일 YYYY.MM.DD")
date_div = item.select_one('div.col.wid_140')
date_str = ""
if date_div:
raw = re.sub(r'등둝일\s*', '', date_div.get_text(strip=True)).strip()
date_str = raw[:10]
if not date_str:
continue
article_date = self.parse_date(date_str)
if not article_date:
continue
if article_date < start_date:
stop_flag = True
break
if article_date > end_date:
continue
detail_url = f"{self.base_url}/pages/?p={p}&b={b}&bn={bn}&m=read"
data.append({
'board_name': board_name,
'title': title,
'category': board_name,
'date': article_date.strftime('%Y-%m-%d'),
'url': detail_url
})
except:
continue
return data, stop_flag
# λ‘˜ λ‹€ μ—†μœΌλ©΄ 빈 νŽ˜μ΄μ§€
return [], True
async def fetch_article_detail(self, session: aiohttp.ClientSession, url: str) -> Dict:
html = await self.fetch_with_retry(session, url)
if not html:
return {'text': "λ³Έλ¬Έ 쑰회 μ‹€νŒ¨", 'writer': ""}
soup = BeautifulSoup(html, 'html.parser')
text_parts = []
writer = ""
# λ³Έλ¬Έ: div.content_box (class="td wid_full content_box")
contents_div = soup.select_one('div.content_box')
if contents_div:
for p in contents_div.find_all('p'):
cleaned = self.clean_text(p.get_text(strip=True))
if cleaned:
text_parts.append(cleaned)
# μž‘μ„±μž: ul.info_list li 쀑 "μž‘μ„±μž" ν•­λͺ©
for li in soup.select('ul.info_list li'):
b_tag = li.find('b')
if b_tag and 'μž‘μ„±μž' in b_tag.get_text():
writer = li.get_text(strip=True).replace(b_tag.get_text(strip=True), '').strip()
break
return {'text': '\n'.join(text_parts), 'writer': writer}
async def collect_board(self, board_name: str, board_cfg: Dict,
start_date: str, end_date: str) -> List[Dict]:
start_dt = datetime.strptime(start_date, '%Y-%m-%d')
end_dt = datetime.strptime(end_date, '%Y-%m-%d')
print(f"\nβ–Ά [{board_name}] λͺ©λ‘ μˆ˜μ§‘ μ‹œμž‘...")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'ko-KR,ko;q=0.9',
}
async with aiohttp.ClientSession(headers=headers) as session:
all_items = []
page_num = 1
empty_pages = 0
max_empty_pages = 3
with async_tqdm(desc=f"[{board_name}] λͺ©λ‘", unit="νŽ˜μ΄μ§€") as pbar:
while page_num <= self.max_pages:
items, stop_flag = await self.fetch_list_page(
session, board_name, board_cfg, page_num, start_dt, end_dt
)
if not items:
empty_pages += 1
if empty_pages >= max_empty_pages or stop_flag:
break
else:
empty_pages = 0
all_items.extend(items)
pbar.update(1)
pbar.set_postfix({"μˆ˜μ§‘": len(all_items)})
if stop_flag:
break
page_num += 1
print(f" βœ“ {len(all_items)}개 ν•­λͺ© 발견")
if all_items:
print(f" β–Ά 상세 νŽ˜μ΄μ§€ μˆ˜μ§‘ 쀑...")
tasks = [self.fetch_article_detail(session, item['url']) for item in all_items]
details = []
for coro in async_tqdm(asyncio.as_completed(tasks),
total=len(tasks),
desc=f"[{board_name}] 상세"):
detail = await coro
details.append(detail)
for item, detail in zip(all_items, details):
item.update(detail)
print(f"βœ“ [{board_name}] μ™„λ£Œ: {len(all_items)}개")
return all_items
async def collect_all(self, start_date: Optional[str] = None,
end_date: Optional[str] = None) -> pd.DataFrame:
if not end_date:
end_date = datetime.now().strftime('%Y-%m-%d')
if not start_date:
start_date = self.start_date
print(f"\n{'='*60}")
print(f"진보당 λ³΄λ„μžλ£Œ μˆ˜μ§‘ - 비동기 κ³ μ„±λŠ₯ 버전")
print(f"κΈ°κ°„: {start_date} ~ {end_date}")
print(f"{'='*60}")
tasks = [
self.collect_board(board_name, board_cfg, start_date, end_date)
for board_name, board_cfg in self.boards.items()
]
results = await asyncio.gather(*tasks)
all_data = []
for items in results:
all_data.extend(items)
if not all_data:
print("\n⚠️ μˆ˜μ§‘λœ 데이터 μ—†μŒ")
return pd.DataFrame()
df = pd.DataFrame(all_data)
df = df[['board_name', 'title', 'category', 'date', 'writer', 'text', 'url']]
df = df[(df['title'] != "") & (df['text'] != "")]
df['date'] = pd.to_datetime(df['date'], errors='coerce')
print(f"\nβœ“ 총 {len(df)}개 μˆ˜μ§‘ μ™„λ£Œ")
return df
def save_local(self, df: pd.DataFrame):
os.makedirs(self.output_path, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
csv_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.csv")
xlsx_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.xlsx")
df.to_csv(csv_path, index=False, encoding='utf-8-sig')
df.to_excel(xlsx_path, index=False, engine='openpyxl')
print(f"βœ“ CSV: {csv_path}")
print(f"βœ“ Excel: {xlsx_path}")
def upload_to_huggingface(self, df: pd.DataFrame):
if not self.hf_token:
print("\n⚠️ HF_TOKEN이 μ„€μ •λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€.")
return
print(f"\nβ–Ά ν—ˆκΉ…νŽ˜μ΄μŠ€ μ—…λ‘œλ“œ 쀑... (repo: {self.hf_repo_id})")
try:
login(token=self.hf_token)
new_dataset = Dataset.from_pandas(df)
try:
existing_dataset = load_dataset(self.hf_repo_id, split='train')
existing_df = existing_dataset.to_pandas()
combined_df = pd.concat([existing_df, df], ignore_index=True)
combined_df = combined_df.drop_duplicates(subset=['url'], keep='last')
combined_df = combined_df.sort_values('date', ascending=False).reset_index(drop=True)
final_dataset = Dataset.from_pandas(combined_df)
print(f" βœ“ 병합 ν›„: {len(final_dataset)}개")
except:
final_dataset = new_dataset
print(f" ℹ️ μ‹ κ·œ 데이터셋 생성")
final_dataset.push_to_hub(self.hf_repo_id, token=self.hf_token)
print(f"βœ“ ν—ˆκΉ…νŽ˜μ΄μŠ€ μ—…λ‘œλ“œ μ™„λ£Œ!")
except Exception as e:
print(f"βœ— μ—…λ‘œλ“œ μ‹€νŒ¨: {e}")
async def run_incremental(self):
state = self.load_state()
last_date = state.get('last_crawl_date')
if last_date:
start_date = (datetime.strptime(last_date, '%Y-%m-%d') + timedelta(days=1)).strftime('%Y-%m-%d')
print(f"πŸ“… 증뢄 μ—…λ°μ΄νŠΈ: {start_date} 이후 λ°μ΄ν„°λ§Œ μˆ˜μ§‘")
else:
start_date = self.start_date
print(f"πŸ“… 전체 μˆ˜μ§‘: {start_date}λΆ€ν„°")
end_date = datetime.now().strftime('%Y-%m-%d')
df = await self.collect_all(start_date, end_date)
if df.empty:
print("βœ“ μƒˆλ‘œμš΄ 데이터 μ—†μŒ")
return
self.save_local(df)
self.upload_to_huggingface(df)
state['last_crawl_date'] = end_date
state['last_crawl_time'] = datetime.now().isoformat()
state['last_count'] = len(df)
self.save_state(state)
print(f"\n{'='*60}\nβœ“ μ™„λ£Œ!\n{'='*60}\n")
async def main() -> None:
    """Entry point: run one incremental crawl-and-upload cycle."""
    await JinboAsyncCrawler().run_incremental()


if __name__ == "__main__":
    asyncio.run(main())