|
|
|
|
| """
|
| μ‘°κ΅νμ λΉ ν¬λ‘€λ¬ - κ³ μ±λ₯ λΉλκΈ° λ²μ + νκΉ
νμ΄μ€ μλ μ
λ‘λ
|
| - κΈ°μ‘΄ sync(requests) λ°©μμ async(aiohttp) λ‘ μ ν
|
| - μ¦λΆ μ
λ°μ΄νΈ, νκΉ
νμ΄μ€ μλ μ
λ‘λ
|
| """
|
|
|
| import os
|
| import json
|
| import re
|
| import asyncio
|
| from datetime import datetime, timedelta
|
| from typing import List, Dict, Optional
|
| import pandas as pd
|
| from tqdm.asyncio import tqdm as async_tqdm
|
| import aiohttp
|
| from bs4 import BeautifulSoup
|
| from dotenv import load_dotenv
|
| from huggingface_hub import HfApi, login
|
| from datasets import Dataset, load_dataset
|
|
|
# Pull HF_TOKEN / HF_REPO_ID_REBUILDING from a local .env file into the environment.
load_dotenv()
|
|
|
|
|
class RebuildingAsyncCrawler:
    """Async crawler for Rebuilding Korea Party press releases.

    Collects article listings and bodies from the party website with
    aiohttp under bounded concurrency, saves the result locally as
    CSV/XLSX, and merges/pushes it to a Hugging Face dataset repo.
    A JSON state file enables incremental (date-based) re-runs.
    """

    def __init__(self, config_path="crawler_config.json"):
        # Site root; board paths from the config are appended to this.
        self.base_url = "https://rebuildingkoreaparty.kr"
        # Party display name, used in local output file names.
        self.party_name = "μ‘°κ΅νμ λΉ"
        self.config_path = config_path
        # Stores last crawl date/time so run_incremental() can resume.
        self.state_path = "crawler_state.json"

        self.load_config()

        # Hugging Face credentials/destination come from the environment (.env).
        self.hf_token = os.getenv("HF_TOKEN")
        self.hf_repo_id = os.getenv("HF_REPO_ID_REBUILDING", "rebuilding-press-releases")

        # Global cap on in-flight HTTP requests; previously hard-coded to 10,
        # now honours config["concurrent_requests"] (same default).
        self.semaphore = asyncio.Semaphore(self.config.get("concurrent_requests", 10))

    def load_config(self) -> None:
        """Load the 'rebuilding' section of the crawler config, with defaults."""
        default_config = {
            "boards": {
                # Board display name -> URL path under base_url.
                "κΈ°μν견문": "news/press-conference",
                "λΌνλΈλ¦¬ν": "news/commentary-briefing",
                "보λμλ£": "news/press-release"
            },
            "start_date": "2024-03-04",
            "max_pages": 10000,
            "concurrent_requests": 10,
            "request_delay": 0.5,
            "output_path": "./data"
        }

        if os.path.exists(self.config_path):
            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
            self.config = config.get('rebuilding', default_config)
        else:
            self.config = default_config

        # Convenience aliases; a malformed config fails fast with KeyError here.
        self.boards = self.config["boards"]
        self.start_date = self.config["start_date"]
        self.max_pages = self.config["max_pages"]
        self.output_path = self.config["output_path"]

    def load_state(self) -> Dict:
        """Return this crawler's saved state ({} when no state file exists)."""
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                state = json.load(f)
            return state.get('rebuilding', {})
        return {}

    def save_state(self, state: Dict) -> None:
        """Write state under the 'rebuilding' key, preserving other crawlers' entries."""
        all_state = {}
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                all_state = json.load(f)
        all_state['rebuilding'] = state
        with open(self.state_path, 'w', encoding='utf-8') as f:
            json.dump(all_state, f, ensure_ascii=False, indent=2)

    @staticmethod
    def parse_date(date_str: str) -> Optional[datetime]:
        """Parse a 'YYYY-MM-DD' string; return None on any parse failure."""
        try:
            return datetime.strptime(date_str.strip(), '%Y-%m-%d')
        # Narrowed from a bare `except:` which also swallowed
        # KeyboardInterrupt/SystemExit. AttributeError covers a None input.
        except (ValueError, AttributeError):
            return None

    @staticmethod
    def clean_text(text: str) -> str:
        """Strip NBSP, zero-width spaces and a stray artifact character, then trim."""
        text = text.replace('\xa0', '').replace('\u200b', '').replace('β', '')
        return text.strip()

    async def fetch_with_retry(self, session: aiohttp.ClientSession, url: str,
                               max_retries: int = 3) -> Optional[str]:
        """GET `url` up to `max_retries` times; return the body text or None.

        Runs under the shared semaphore and sleeps `request_delay` before
        each attempt to throttle the crawl. A non-200 status retries on
        the next iteration; exceptions back off 1s between attempts.
        """
        async with self.semaphore:
            for attempt in range(max_retries):
                try:
                    await asyncio.sleep(self.config.get("request_delay", 0.5))
                    async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
                        if response.status == 200:
                            return await response.text()
                except Exception:
                    if attempt < max_retries - 1:
                        await asyncio.sleep(1)
                    else:
                        return None
            return None

    async def fetch_list_page(self, session: aiohttp.ClientSession,
                              board_name: str, board_path: str, page_num: int,
                              start_date: datetime, end_date: datetime) -> tuple:
        """Fetch one listing page and extract article metadata rows.

        Returns (items, stop_flag). `items` holds dicts with
        board_name/category/title/date/url. `stop_flag` is True when the
        page has no article links or an article older than `start_date`
        was reached (listings are assumed newest-first), signalling the
        caller to stop paginating.
        """
        if page_num == 1:
            url = f"{self.base_url}/{board_path}"
        else:
            url = f"{self.base_url}/{board_path}?page={page_num}"

        html = await self.fetch_with_retry(session, url)
        if not html:
            return [], False

        soup = BeautifulSoup(html, 'html.parser')

        # board_path already includes the "news/" prefix (see default config),
        # so anchor on the path itself. The previous pattern prepended another
        # "/news/", producing "^/news/news/..." which matched nothing.
        article_links = soup.find_all('a', href=re.compile(f'^/{re.escape(board_path)}/'))
        if not article_links:
            return [], True

        data = []
        stop_flag = False
        seen_urls = set()  # listing markup can repeat the same article link

        for link in article_links:
            try:
                article_url = link.get('href', '')
                if article_url.startswith('/'):
                    article_url = self.base_url + article_url
                if article_url in seen_urls:
                    continue
                seen_urls.add(article_url)

                title = link.get_text(strip=True).replace('\n', ' ')

                # Date/category live in sibling <li> cells of the row's <ul>.
                parent = link.find_parent('ul')
                if not parent:
                    parent_li = link.find_parent('li')
                    if parent_li:
                        parent = parent_li.find_parent('ul')

                date_str = ""
                category = ""
                if parent:
                    date_li = parent.find('li', {'class': 'td date'})
                    if date_li:
                        date_str = date_li.get_text(strip=True)
                    cate_li = parent.find('li', {'class': 'td category'})
                    if cate_li:
                        category = cate_li.get_text(strip=True)

                if not date_str:
                    continue

                article_date = self.parse_date(date_str)
                if not article_date:
                    continue
                # Newest-first listing: once we pass start_date, everything
                # after is older, so stop paginating.
                if article_date < start_date:
                    stop_flag = True
                    break
                if article_date > end_date:
                    continue

                data.append({
                    'board_name': board_name,
                    'category': category,
                    'title': title,
                    'date': date_str,
                    'url': article_url
                })
            # Narrowed from bare `except:`; a malformed row is skipped without
            # masking process-level interrupts.
            except Exception:
                continue

        return data, stop_flag

    async def fetch_article_detail(self, session: aiohttp.ClientSession, url: str) -> Dict:
        """Fetch one article page; return {'text': body, 'writer': attribution}.

        On fetch failure 'text' carries a sentinel failure message instead of
        raising, so one bad article does not abort the batch.
        """
        html = await self.fetch_with_retry(session, url)
        if not html:
            return {'text': "λ³Έλ¬Έ μ‘°ν μ€ν¨", 'writer': ""}

        soup = BeautifulSoup(html, 'html.parser')
        text_parts = []
        writer = ""

        # Article body is rendered inside a CKEditor content container.
        contents_div = soup.find('div', {'class': 'editor ck-content'})
        if contents_div:
            paragraphs = contents_div.find_all('p')
            for p in paragraphs:
                cleaned = self.clean_text(p.get_text(strip=True))
                if cleaned:
                    text_parts.append(cleaned)

            # The attribution line is usually one of the last paragraphs;
            # scan from the end for a paragraph containing known keywords.
            for p in reversed(paragraphs):
                cleaned = self.clean_text(p.get_text(strip=True))
                if 'μ‘°κ΅νμ λΉ' in cleaned or 'λλ³μΈ' in cleaned or 'μμν' in cleaned:
                    writer = cleaned
                    break

        return {'text': '\n'.join(text_parts), 'writer': writer}

    async def collect_board(self, board_name: str, board_path: str,
                            start_date: str, end_date: str) -> List[Dict]:
        """Collect all articles of one board within [start_date, end_date].

        Pass 1 walks listing pages until `max_pages`, a stop flag, or three
        consecutive empty pages. Pass 2 fetches article details concurrently
        and merges them into the listing items.
        """
        start_dt = datetime.strptime(start_date, '%Y-%m-%d')
        end_dt = datetime.strptime(end_date, '%Y-%m-%d')

        print(f"\nβΆ [{board_name}] λͺ©λ‘ μμ§ μμ...")

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'ko-KR,ko;q=0.9',
        }

        async with aiohttp.ClientSession(headers=headers) as session:
            all_items = []
            page_num = 1
            empty_pages = 0
            max_empty_pages = 3  # tolerate gaps before concluding the board is done

            with async_tqdm(desc=f"[{board_name}] λͺ©λ‘", unit="νμ΄μ§") as pbar:
                while page_num <= self.max_pages:
                    items, stop_flag = await self.fetch_list_page(
                        session, board_name, board_path, page_num, start_dt, end_dt
                    )

                    if not items:
                        empty_pages += 1
                        if empty_pages >= max_empty_pages or stop_flag:
                            break
                    else:
                        empty_pages = 0
                        all_items.extend(items)

                    pbar.update(1)
                    pbar.set_postfix({"μμ§": len(all_items)})

                    if stop_flag:
                        break

                    page_num += 1

            print(f" β {len(all_items)}κ° νλͺ© λ°κ²¬")

            if all_items:
                print(f" βΆ μμΈ νμ΄μ§ μμ§ μ€...")
                tasks = [self.fetch_article_detail(session, item['url']) for item in all_items]

                # BUGFIX: the previous code awaited asyncio.as_completed()
                # (completion order) and then zip()ed the results back onto
                # all_items (list order), attaching bodies to the wrong
                # articles. tqdm's gather preserves input order and still
                # shows progress.
                details = await async_tqdm.gather(*tasks,
                                                  total=len(tasks),
                                                  desc=f"[{board_name}] μμΈ")

                for item, detail in zip(all_items, details):
                    item.update(detail)

        print(f"β [{board_name}] μλ£: {len(all_items)}κ°")
        return all_items

    async def collect_all(self, start_date: Optional[str] = None,
                          end_date: Optional[str] = None) -> pd.DataFrame:
        """Crawl every configured board concurrently and return one DataFrame.

        Defaults: `end_date` = today, `start_date` = configured start date.
        Rows with an empty title or body are dropped; 'date' is parsed to
        datetime (NaT on failure).
        """
        if not end_date:
            end_date = datetime.now().strftime('%Y-%m-%d')
        if not start_date:
            start_date = self.start_date

        print(f"\n{'='*60}")
        print(f"μ‘°κ΅νμ λΉ λ³΄λμλ£ μμ§ - λΉλκΈ° κ³ μ±λ₯ λ²μ ")
        print(f"κΈ°κ°: {start_date} ~ {end_date}")
        print(f"{'='*60}")

        # One task per board; each opens its own session.
        tasks = [
            self.collect_board(board_name, board_path, start_date, end_date)
            for board_name, board_path in self.boards.items()
        ]
        results = await asyncio.gather(*tasks)

        all_data = []
        for items in results:
            all_data.extend(items)

        if not all_data:
            print("\nβ οΈ μμ§λ λ°μ΄ν° μμ")
            return pd.DataFrame()

        df = pd.DataFrame(all_data)
        df = df[['board_name', 'title', 'category', 'date', 'writer', 'text', 'url']]
        df = df[(df['title'] != "") & (df['text'] != "")]
        df['date'] = pd.to_datetime(df['date'], errors='coerce')

        print(f"\nβ μ΄ {len(df)}κ° μμ§ μλ£")
        return df

    def save_local(self, df: pd.DataFrame) -> None:
        """Write the DataFrame to timestamped CSV and XLSX files under output_path."""
        os.makedirs(self.output_path, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        csv_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.csv")
        xlsx_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.xlsx")
        # utf-8-sig BOM keeps Korean text readable when opened in Excel.
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')
        df.to_excel(xlsx_path, index=False, engine='openpyxl')
        print(f"β CSV: {csv_path}")
        print(f"β Excel: {xlsx_path}")

    def upload_to_huggingface(self, df: pd.DataFrame) -> None:
        """Merge `df` with the existing HF dataset (dedup by URL) and push it.

        No-op with a warning when HF_TOKEN is missing; upload failures are
        reported but never raised, so a local run still completes.
        """
        if not self.hf_token:
            print("\nβ οΈ HF_TOKENμ΄ μ€μ λμ§ μμμ΅λλ€.")
            return

        print(f"\nβΆ νκΉνμ΄μ€ μλ‘λ μ€... (repo: {self.hf_repo_id})")
        try:
            login(token=self.hf_token)
            new_dataset = Dataset.from_pandas(df)
            try:
                existing_dataset = load_dataset(self.hf_repo_id, split='train')
                existing_df = existing_dataset.to_pandas()
                combined_df = pd.concat([existing_df, df], ignore_index=True)
                # keep='last' prefers the freshly crawled version of a URL.
                combined_df = combined_df.drop_duplicates(subset=['url'], keep='last')
                combined_df = combined_df.sort_values('date', ascending=False).reset_index(drop=True)
                final_dataset = Dataset.from_pandas(combined_df)
                print(f" β λ³ν© ν: {len(final_dataset)}κ°")
            # Narrowed from bare `except:`; repo-not-found (first run) lands here.
            except Exception:
                final_dataset = new_dataset
                print(f" βΉοΈ μ κ· λ°μ΄ν°μ μμ±")
            final_dataset.push_to_hub(self.hf_repo_id, token=self.hf_token)
            print(f"β νκΉνμ΄μ€ μλ‘λ μλ£!")
        except Exception as e:
            print(f"β μλ‘λ μ€ν¨: {e}")

    async def run_incremental(self) -> None:
        """Run one crawl from the day after the last saved date, then persist state.

        Falls back to a full crawl from the configured start date when no
        state exists. State is only updated after save/upload complete.
        """
        state = self.load_state()
        last_date = state.get('last_crawl_date')

        if last_date:
            start_date = (datetime.strptime(last_date, '%Y-%m-%d') + timedelta(days=1)).strftime('%Y-%m-%d')
            print(f"π μ¦λΆ μλ°μ΄νΈ: {start_date} μ΄ν λ°μ΄ν°λ§ μμ§")
        else:
            start_date = self.start_date
            print(f"π μ 체 μμ§: {start_date}λΆν°")

        end_date = datetime.now().strftime('%Y-%m-%d')
        df = await self.collect_all(start_date, end_date)

        if df.empty:
            print("β μλ‘μ΄ λ°μ΄ν° μμ")
            return

        self.save_local(df)
        self.upload_to_huggingface(df)

        state['last_crawl_date'] = end_date
        state['last_crawl_time'] = datetime.now().isoformat()
        state['last_count'] = len(df)
        self.save_state(state)

        print(f"\n{'='*60}\nβ μλ£!\n{'='*60}\n")
|
|
|
|
|
async def main():
    """Construct the crawler and perform one incremental collection run."""
    await RebuildingAsyncCrawler().run_incremental()
|
|
|
|
|
| if __name__ == "__main__":
|
| asyncio.run(main())
|
|
|