# KoreanPartyCommunication / basic_income_crawler_async.py
# Provenance: uploaded by hanjunlee ("Upload 23 files", commit 3a36548, verified)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
κΈ°λ³Έμ†Œλ“λ‹Ή 크둀러 - κ³ μ„±λŠ₯ 비동기 버전 + ν—ˆκΉ…νŽ˜μ΄μŠ€ μžλ™ μ—…λ‘œλ“œ
- κ·Έλˆ„λ³΄λ“œ 5 기반 μ‚¬μ΄νŠΈ (basicincomeparty.kr)
- td.td_subject / td.td_datetime(YY.MM.DD.) / div#bo_v_con ꡬ쑰
"""
import os
import json
import re
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import pandas as pd
from tqdm.asyncio import tqdm as async_tqdm
import aiohttp
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from huggingface_hub import HfApi, login
from datasets import Dataset, load_dataset
# Load environment variables from a local .env file (HF_TOKEN and
# HF_REPO_ID_BASIC_INCOME are read below in BasicIncomeAsyncCrawler.__init__).
load_dotenv()
class BasicIncomeAsyncCrawler:
    """Async crawler for Basic Income Party press releases (basicincomeparty.kr).

    The site runs Gnuboard 5: list rows use ``td.td_subject`` /
    ``td.td_datetime`` ("YY.MM.DD.") cells and article bodies live in
    ``div#bo_v_con``. Collected articles are saved locally as CSV/XLSX and
    pushed (merged + de-duplicated by URL) to a Hugging Face dataset repo.
    """

    def __init__(self, config_path="crawler_config.json"):
        self.base_url = "https://basicincomeparty.kr"
        self.party_name = "기본소득당"
        self.config_path = config_path
        self.state_path = "crawler_state.json"
        self.load_config()
        # Hugging Face credentials / target repo come from the environment (.env).
        self.hf_token = os.getenv("HF_TOKEN")
        self.hf_repo_id = os.getenv("HF_REPO_ID_BASIC_INCOME", "basic-income-press-releases")
        # Bound the number of in-flight HTTP requests. Previously hard-coded
        # to 10, ignoring the "concurrent_requests" config key; the default
        # is unchanged so behavior is identical for the default config.
        self.semaphore = asyncio.Semaphore(self.config.get("concurrent_requests", 10))

    def load_config(self):
        """Load the 'basic_income' section of the JSON config file.

        User-supplied keys are merged *over* the defaults, so a partial
        config overrides individual settings without dropping the rest
        (the old code replaced the whole dict and could KeyError below).
        """
        default_config = {
            "boards": {
                "논평보도자료": "bikr/press"
            },
            "start_date": "2020-01-08",
            "max_pages": 10000,
            "concurrent_requests": 10,
            "request_delay": 0.3,
            "output_path": "./data"
        }
        user_config = {}
        if os.path.exists(self.config_path):
            with open(self.config_path, 'r', encoding='utf-8') as f:
                user_config = json.load(f).get('basic_income', {})
        self.config = {**default_config, **user_config}
        self.boards = self.config["boards"]
        self.start_date = self.config["start_date"]
        self.max_pages = self.config["max_pages"]
        self.output_path = self.config["output_path"]

    def load_state(self) -> Dict:
        """Return the saved 'basic_income' crawl state, or {} if none exists."""
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                state = json.load(f)
            return state.get('basic_income', {})
        return {}

    def save_state(self, state: Dict):
        """Persist *state* under the 'basic_income' key of the shared state file.

        The file is shared with other party crawlers, so existing keys are
        preserved.
        """
        all_state = {}
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                all_state = json.load(f)
        all_state['basic_income'] = state
        with open(self.state_path, 'w', encoding='utf-8') as f:
            json.dump(all_state, f, ensure_ascii=False, indent=2)

    @staticmethod
    def parse_date(date_str: str) -> Optional[datetime]:
        """Parse 'YY.MM.DD.', 'YYYY.MM.DD.' or 'YYYY-MM-DD' into a datetime.

        Returns None when the string matches neither format.
        """
        date_str = date_str.strip().rstrip('.')
        try:
            parts = date_str.split('.')
            if len(parts) >= 3:
                year = int(parts[0])
                # Two-digit years (Gnuboard's "YY.MM.DD.") are assumed 20xx.
                year = 2000 + year if year < 100 else year
                return datetime(year, int(parts[1]), int(parts[2]))
        except ValueError:
            pass
        try:
            return datetime.strptime(date_str[:10], '%Y-%m-%d')
        except ValueError:
            return None

    @staticmethod
    def clean_text(text: str) -> str:
        """Strip NBSP and zero-width spaces, then surrounding whitespace."""
        # The original chained a third, duplicate replace of a literal
        # zero-width space (identical to '\u200b'); one pass suffices.
        return text.replace('\xa0', '').replace('\u200b', '').strip()

    async def fetch_with_retry(self, session: aiohttp.ClientSession, url: str,
                               max_retries: int = 3) -> Optional[str]:
        """GET *url* with up to *max_retries* attempts; None on failure.

        Concurrency is bounded by the shared semaphore; every attempt is
        preceded by a politeness delay. Non-200 responses and exceptions
        both lead to a retry; exhausting all attempts returns None.
        """
        async with self.semaphore:
            for attempt in range(max_retries):
                try:
                    # Politeness delay before every request, even retries.
                    await asyncio.sleep(self.config.get("request_delay", 0.3))
                    async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
                        if response.status == 200:
                            return await response.text()
                except Exception:
                    if attempt < max_retries - 1:
                        await asyncio.sleep(1)  # brief backoff before retrying
                    else:
                        return None
            return None

    async def fetch_list_page(self, session: aiohttp.ClientSession,
                              board_name: str, board_path: str, page_num: int,
                              start_date: datetime, end_date: datetime) -> tuple:
        """Fetch one list page and extract (items, stop_flag).

        stop_flag is True once an article older than *start_date* appears,
        signalling the caller to stop paging (the board lists newest first
        -- NOTE(review): assumed from this early-break logic; confirm).
        Returns ([], False) on fetch failure and ([], True) on an empty page.
        """
        url = f"{self.base_url}/{board_path}?page={page_num}"
        html = await self.fetch_with_retry(session, url)
        if not html:
            return [], False
        soup = BeautifulSoup(html, 'html.parser')
        rows = soup.select('table tbody tr')
        if not rows:
            return [], True
        data = []
        stop_flag = False
        for row in rows:
            try:
                # Title + URL: td.td_subject div.bo_tit a
                title_a = row.select_one('td.td_subject div.bo_tit a')
                if not title_a:
                    continue
                title = title_a.get_text(strip=True)
                href = title_a.get('href', '')
                # Strip the query string (page parameter), then absolutize.
                article_url = re.sub(r'\?.*$', '', href)
                if not article_url.startswith('http'):
                    article_url = self.base_url + article_url
                # Date: td.td_datetime ("YY.MM.DD." format)
                date_td = row.select_one('td.td_datetime')
                if not date_td:
                    continue
                date_str = date_td.get_text(strip=True)
                # Category: td.td_num2 a.bo_cate_link (optional)
                cate_a = row.select_one('td.td_num2 a.bo_cate_link')
                category = cate_a.get_text(strip=True) if cate_a else ""
                article_date = self.parse_date(date_str)
                if not article_date:
                    continue
                if article_date < start_date:
                    stop_flag = True
                    break
                if article_date > end_date:
                    continue
                data.append({
                    'board_name': board_name,
                    'title': title,
                    'category': category,
                    'date': article_date.strftime('%Y-%m-%d'),  # normalized YYYY-MM-DD
                    'url': article_url
                })
            except Exception:
                # Skip rows with unexpected markup instead of aborting the page.
                continue
        return data, stop_flag

    async def fetch_article_detail(self, session: aiohttp.ClientSession, url: str) -> Dict:
        """Fetch one article page; return {'text': body, 'writer': author}.

        On fetch failure the text is a sentinel message and writer is "".
        """
        html = await self.fetch_with_retry(session, url)
        if not html:
            return {'text': "본문 조회 실패", 'writer': ""}
        soup = BeautifulSoup(html, 'html.parser')
        text_parts = []
        writer = ""
        # Body: <p> tags inside div#bo_v_con
        contents_div = soup.find('div', id='bo_v_con')
        if contents_div:
            for p in contents_div.find_all('p'):
                cleaned = self.clean_text(p.get_text(strip=True))
                if cleaned:
                    text_parts.append(cleaned)
        # Author: span.sv_member inside section#bo_v_info div.profile_info_ct
        info_div = soup.select_one('section#bo_v_info div.profile_info_ct')
        if info_div:
            writer_el = info_div.find('span', class_='sv_member')
            if writer_el:
                writer = writer_el.get_text(strip=True)
        return {'text': '\n'.join(text_parts), 'writer': writer}

    async def collect_board(self, board_name: str, board_path: str,
                            start_date: str, end_date: str) -> List[Dict]:
        """Collect all articles of one board within [start_date, end_date].

        Pages through the list until max_pages, three consecutive empty
        pages, or an article older than start_date, then fetches every
        article body concurrently.
        """
        start_dt = datetime.strptime(start_date, '%Y-%m-%d')
        end_dt = datetime.strptime(end_date, '%Y-%m-%d')
        print(f"\n▶ [{board_name}] 목록 수집 시작...")
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'ko-KR,ko;q=0.9',
        }
        async with aiohttp.ClientSession(headers=headers) as session:
            all_items = []
            page_num = 1
            empty_pages = 0
            max_empty_pages = 3
            with async_tqdm(desc=f"[{board_name}] 목록", unit="페이지") as pbar:
                while page_num <= self.max_pages:
                    items, stop_flag = await self.fetch_list_page(
                        session, board_name, board_path, page_num, start_dt, end_dt
                    )
                    if not items:
                        empty_pages += 1
                        if empty_pages >= max_empty_pages or stop_flag:
                            break
                    else:
                        empty_pages = 0
                        all_items.extend(items)
                    pbar.update(1)
                    pbar.set_postfix({"수집": len(all_items)})
                    if stop_flag:
                        break
                    page_num += 1
            print(f" ✓ {len(all_items)}개 항목 발견")
            if all_items:
                print(f" ▶ 상세 페이지 수집 중...")
                tasks = [self.fetch_article_detail(session, item['url']) for item in all_items]
                # BUGFIX: the previous code iterated asyncio.as_completed()
                # (completion order) and then zip()ped the results back onto
                # all_items (submission order), attaching bodies/writers to
                # the WRONG articles. tqdm.gather preserves input order, so
                # details[i] really belongs to all_items[i].
                details = await async_tqdm.gather(*tasks, desc=f"[{board_name}] 상세")
                for item, detail in zip(all_items, details):
                    item.update(detail)
        print(f"✓ [{board_name}] 완료: {len(all_items)}개")
        return all_items

    async def collect_all(self, start_date: Optional[str] = None,
                          end_date: Optional[str] = None) -> pd.DataFrame:
        """Crawl every configured board and return one merged DataFrame.

        Defaults: start_date from config, end_date = today. Rows with an
        empty title or body are dropped; 'date' is parsed to datetime64
        (invalid dates become NaT).
        """
        if not end_date:
            end_date = datetime.now().strftime('%Y-%m-%d')
        if not start_date:
            start_date = self.start_date
        print(f"\n{'='*60}")
        print(f"기본소득당 보도자료 수집 - 비동기 고성능 버전")
        print(f"기간: {start_date} ~ {end_date}")
        print(f"{'='*60}")
        tasks = [
            self.collect_board(board_name, board_path, start_date, end_date)
            for board_name, board_path in self.boards.items()
        ]
        results = await asyncio.gather(*tasks)
        all_data = []
        for items in results:
            all_data.extend(items)
        if not all_data:
            print("\n⚠️ 수집된 데이터 없음")
            return pd.DataFrame()
        df = pd.DataFrame(all_data)
        df = df[['board_name', 'title', 'category', 'date', 'writer', 'text', 'url']]
        df = df[(df['title'] != "") & (df['text'] != "")]
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        print(f"\n✓ 총 {len(df)}개 수집 완료")
        return df

    def save_local(self, df: pd.DataFrame):
        """Write *df* to timestamped CSV (utf-8-sig for Excel) and XLSX files."""
        os.makedirs(self.output_path, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        csv_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.csv")
        xlsx_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.xlsx")
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')
        df.to_excel(xlsx_path, index=False, engine='openpyxl')
        print(f"✓ CSV: {csv_path}")
        print(f"✓ Excel: {xlsx_path}")

    def upload_to_huggingface(self, df: pd.DataFrame):
        """Push *df* to the Hugging Face Hub, merging with any existing split.

        Existing rows are combined with the new ones, de-duplicated by URL
        (newest wins) and sorted by date descending. No-op without HF_TOKEN.
        """
        if not self.hf_token:
            print("\n⚠️ HF_TOKEN이 설정되지 않았습니다.")
            return
        print(f"\n▶ 허깅페이스 업로드 중... (repo: {self.hf_repo_id})")
        try:
            login(token=self.hf_token)
            new_dataset = Dataset.from_pandas(df)
            try:
                existing_dataset = load_dataset(self.hf_repo_id, split='train')
                existing_df = existing_dataset.to_pandas()
                combined_df = pd.concat([existing_df, df], ignore_index=True)
                combined_df = combined_df.drop_duplicates(subset=['url'], keep='last')
                combined_df = combined_df.sort_values('date', ascending=False).reset_index(drop=True)
                final_dataset = Dataset.from_pandas(combined_df)
                print(f" ✓ 병합 후: {len(final_dataset)}개")
            except Exception:
                # No existing dataset (or it failed to load): push fresh.
                final_dataset = new_dataset
                print(f" ℹ️ 신규 데이터셋 생성")
            final_dataset.push_to_hub(self.hf_repo_id, token=self.hf_token)
            print(f"✓ 허깅페이스 업로드 완료!")
        except Exception as e:
            print(f"✗ 업로드 실패: {e}")

    async def run_incremental(self):
        """Run one crawl cycle: resume from saved state, save, upload, persist state."""
        state = self.load_state()
        last_date = state.get('last_crawl_date')
        if last_date:
            # Resume the day after the last successful crawl.
            start_date = (datetime.strptime(last_date, '%Y-%m-%d') + timedelta(days=1)).strftime('%Y-%m-%d')
            print(f"📅 증분 업데이트: {start_date} 이후 데이터만 수집")
        else:
            start_date = self.start_date
            print(f"📅 전체 수집: {start_date}부터")
        end_date = datetime.now().strftime('%Y-%m-%d')
        df = await self.collect_all(start_date, end_date)
        if df.empty:
            print("✓ 새로운 데이터 없음")
            return
        self.save_local(df)
        self.upload_to_huggingface(df)
        state['last_crawl_date'] = end_date
        state['last_crawl_time'] = datetime.now().isoformat()
        state['last_count'] = len(df)
        self.save_state(state)
        print(f"\n{'='*60}\n✓ 완료!\n{'='*60}\n")
async def main():
    """Build a crawler and run one incremental crawl/upload cycle."""
    await BasicIncomeAsyncCrawler().run_incremental()
# Script entry point: drive the async crawl with a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())