|
|
|
|
| """
|
| κΈ°λ³ΈμλλΉ ν¬λ‘€λ¬ - κ³ μ±λ₯ λΉλκΈ° λ²μ + νκΉ
νμ΄μ€ μλ μ
λ‘λ
|
| - κ·Έλ보λ 5 κΈ°λ° μ¬μ΄νΈ (basicincomeparty.kr)
|
| - td.td_subject / td.td_datetime(YY.MM.DD.) / div#bo_v_con ꡬ쑰
|
| """
|
|
|
| import os
|
| import json
|
| import re
|
| import asyncio
|
| from datetime import datetime, timedelta
|
| from typing import List, Dict, Optional
|
| import pandas as pd
|
| from tqdm.asyncio import tqdm as async_tqdm
|
| import aiohttp
|
| from bs4 import BeautifulSoup
|
| from dotenv import load_dotenv
|
| from huggingface_hub import HfApi, login
|
| from datasets import Dataset, load_dataset
|
|
|
| load_dotenv()
|
|
|
|
|
class BasicIncomeAsyncCrawler:
    """Async crawler for Basic Income Party (basicincomeparty.kr) press releases.

    Targets GNUBOARD 5 pages: list rows expose ``td.td_subject`` /
    ``td.td_datetime`` (``YY.MM.DD.``) cells, and article bodies live in
    ``div#bo_v_con``.  Supports incremental runs via a JSON state file and
    optional dataset upload to the Hugging Face Hub.
    """

    def __init__(self, config_path: str = "crawler_config.json"):
        """Set site constants, load configuration, and read HF settings from env."""
        self.base_url = "https://basicincomeparty.kr"
        self.party_name = "기본소득당"
        self.config_path = config_path
        self.state_path = "crawler_state.json"

        self.load_config()

        # Hugging Face credentials/targets come from the environment (.env).
        self.hf_token = os.getenv("HF_TOKEN")
        self.hf_repo_id = os.getenv("HF_REPO_ID_BASIC_INCOME", "basic-income-press-releases")

        # Caps the number of in-flight HTTP requests.
        self.semaphore = asyncio.Semaphore(10)

    def load_config(self):
        """Load the 'basic_income' section of the config file, falling back to defaults."""
        default_config = {
            "boards": {
                "논평보도자료": "bikr/press"
            },
            "start_date": "2020-01-08",
            "max_pages": 10000,
            "concurrent_requests": 10,
            "request_delay": 0.3,
            "output_path": "./data"
        }

        if os.path.exists(self.config_path):
            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
            self.config = config.get('basic_income', default_config)
        else:
            self.config = default_config

        self.boards = self.config["boards"]
        self.start_date = self.config["start_date"]
        self.max_pages = self.config["max_pages"]
        self.output_path = self.config["output_path"]

    def load_state(self) -> Dict:
        """Return this crawler's saved state; empty dict when no state file exists."""
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                state = json.load(f)
            return state.get('basic_income', {})
        return {}

    def save_state(self, state: Dict):
        """Persist state under the 'basic_income' key, preserving other crawlers' entries."""
        all_state = {}
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                all_state = json.load(f)
        all_state['basic_income'] = state
        with open(self.state_path, 'w', encoding='utf-8') as f:
            json.dump(all_state, f, ensure_ascii=False, indent=2)

    @staticmethod
    def parse_date(date_str: str) -> Optional[datetime]:
        """Parse 'YY.MM.DD.', 'YYYY.MM.DD.' or 'YYYY-MM-DD'; return None when unparseable."""
        date_str = date_str.strip().rstrip('.')
        try:
            parts = date_str.split('.')
            if len(parts) >= 3:
                year = int(parts[0])
                # Two-digit years are assumed to mean 20xx.
                year = 2000 + year if year < 100 else year
                return datetime(year, int(parts[1]), int(parts[2]))
        except (ValueError, IndexError):
            pass
        try:
            return datetime.strptime(date_str[:10], '%Y-%m-%d')
        except ValueError:
            return None

    @staticmethod
    def clean_text(text: str) -> str:
        """Strip non-breaking / zero-width characters and surrounding whitespace."""
        # NOTE(review): the third stripped character was garbled in the review
        # copy; assumed to be another invisible formatting character (U+200D
        # zero-width joiner) — confirm against the original source.
        text = text.replace('\xa0', '').replace('\u200b', '').replace('\u200d', '')
        return text.strip()

    async def fetch_with_retry(self, session: "aiohttp.ClientSession", url: str,
                               max_retries: int = 3) -> Optional[str]:
        """GET `url` with retries; return the body text, or None on failure/non-200."""
        async with self.semaphore:
            for attempt in range(max_retries):
                try:
                    # Politeness delay before every request.
                    await asyncio.sleep(self.config.get("request_delay", 0.3))
                    async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
                        if response.status == 200:
                            return await response.text()
                except (aiohttp.ClientError, asyncio.TimeoutError):
                    if attempt < max_retries - 1:
                        await asyncio.sleep(1)
                    else:
                        return None
            return None

    async def fetch_list_page(self, session: "aiohttp.ClientSession",
                              board_name: str, board_path: str, page_num: int,
                              start_date: datetime, end_date: datetime) -> tuple:
        """Fetch one list page and extract per-row metadata.

        Returns (items, stop_flag); stop_flag becomes True once a row older
        than `start_date` is seen — the board is newest-first, so paging can
        stop there.
        """
        url = f"{self.base_url}/{board_path}?page={page_num}"

        html = await self.fetch_with_retry(session, url)
        if not html:
            return [], False

        soup = BeautifulSoup(html, 'html.parser')
        rows = soup.select('table tbody tr')
        if not rows:
            # No rows at all: treat as the end of the board.
            return [], True

        data = []
        stop_flag = False

        for row in rows:
            try:
                title_a = row.select_one('td.td_subject div.bo_tit a')
                if not title_a:
                    continue

                title = title_a.get_text(strip=True)
                href = title_a.get('href', '')

                # Drop the query string (page/search params) for a stable article URL.
                article_url = re.sub(r'\?.*$', '', href)
                if not article_url.startswith('http'):
                    article_url = self.base_url + article_url

                date_td = row.select_one('td.td_datetime')
                if not date_td:
                    continue
                date_str = date_td.get_text(strip=True)

                cate_a = row.select_one('td.td_num2 a.bo_cate_link')
                category = cate_a.get_text(strip=True) if cate_a else ""

                article_date = self.parse_date(date_str)
                if not article_date:
                    continue
                if article_date < start_date:
                    stop_flag = True
                    break
                if article_date > end_date:
                    continue

                data.append({
                    'board_name': board_name,
                    'title': title,
                    'category': category,
                    'date': article_date.strftime('%Y-%m-%d'),
                    'url': article_url
                })
            except (AttributeError, ValueError, TypeError):
                # Skip a malformed row rather than aborting the whole page.
                continue

        return data, stop_flag

    async def fetch_article_detail(self, session: "aiohttp.ClientSession", url: str) -> Dict:
        """Fetch one article page; return {'text': body text, 'writer': author name}."""
        html = await self.fetch_with_retry(session, url)
        if not html:
            return {'text': "본문 조회 실패", 'writer': ""}

        soup = BeautifulSoup(html, 'html.parser')
        text_parts = []
        writer = ""

        # Article body: one cleaned line per <p> inside div#bo_v_con.
        contents_div = soup.find('div', id='bo_v_con')
        if contents_div:
            for p in contents_div.find_all('p'):
                cleaned = self.clean_text(p.get_text(strip=True))
                if cleaned:
                    text_parts.append(cleaned)

        # Author name from the profile block, when present.
        info_div = soup.select_one('section#bo_v_info div.profile_info_ct')
        if info_div:
            writer_el = info_div.find('span', class_='sv_member')
            if writer_el:
                writer = writer_el.get_text(strip=True)

        return {'text': '\n'.join(text_parts), 'writer': writer}

    async def collect_board(self, board_name: str, board_path: str,
                            start_date: str, end_date: str) -> List[Dict]:
        """Crawl one board: page through the list, then fetch every article body."""
        start_dt = datetime.strptime(start_date, '%Y-%m-%d')
        end_dt = datetime.strptime(end_date, '%Y-%m-%d')

        print(f"\n▶ [{board_name}] 목록 수집 시작...")

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'ko-KR,ko;q=0.9',
        }

        async with aiohttp.ClientSession(headers=headers) as session:
            all_items = []
            page_num = 1
            empty_pages = 0
            max_empty_pages = 3

            with async_tqdm(desc=f"[{board_name}] 목록", unit="페이지") as pbar:
                while page_num <= self.max_pages:
                    items, stop_flag = await self.fetch_list_page(
                        session, board_name, board_path, page_num, start_dt, end_dt
                    )

                    if not items:
                        empty_pages += 1
                        # Give up after several consecutive empty pages.
                        if empty_pages >= max_empty_pages or stop_flag:
                            break
                    else:
                        empty_pages = 0
                        all_items.extend(items)

                    pbar.update(1)
                    pbar.set_postfix({"수집": len(all_items)})

                    if stop_flag:
                        break

                    page_num += 1

            print(f" ✅ {len(all_items)}개 항목 발견")

            if all_items:
                print(f" ▶ 상세 페이지 수집 중...")
                tasks = [self.fetch_article_detail(session, item['url']) for item in all_items]

                # BUG FIX: the previous version consumed results via
                # asyncio.as_completed (completion order) and then zipped them
                # against all_items (submission order), so article bodies and
                # writers were attached to the wrong items.  tqdm's gather
                # preserves input order while still showing progress.
                details = await async_tqdm.gather(*tasks, desc=f"[{board_name}] 상세")

                for item, detail in zip(all_items, details):
                    item.update(detail)

        print(f"✅ [{board_name}] 완료: {len(all_items)}개")
        return all_items

    async def collect_all(self, start_date: Optional[str] = None,
                          end_date: Optional[str] = None) -> pd.DataFrame:
        """Crawl every configured board concurrently; return one combined DataFrame."""
        if not end_date:
            end_date = datetime.now().strftime('%Y-%m-%d')
        if not start_date:
            start_date = self.start_date

        print(f"\n{'='*60}")
        print(f"기본소득당 보도자료 수집 - 비동기 고성능 버전")
        print(f"기간: {start_date} ~ {end_date}")
        print(f"{'='*60}")

        tasks = [
            self.collect_board(board_name, board_path, start_date, end_date)
            for board_name, board_path in self.boards.items()
        ]
        results = await asyncio.gather(*tasks)

        all_data = []
        for items in results:
            all_data.extend(items)

        if not all_data:
            print("\n⚠️ 수집된 데이터 없음")
            return pd.DataFrame()

        df = pd.DataFrame(all_data)
        df = df[['board_name', 'title', 'category', 'date', 'writer', 'text', 'url']]
        # Drop rows with an empty title or body.
        df = df[(df['title'] != "") & (df['text'] != "")]
        df['date'] = pd.to_datetime(df['date'], errors='coerce')

        print(f"\n✅ 총 {len(df)}개 수집 완료")
        return df

    def save_local(self, df: pd.DataFrame):
        """Write `df` to timestamped CSV and Excel files under `output_path`."""
        os.makedirs(self.output_path, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        csv_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.csv")
        xlsx_path = os.path.join(self.output_path, f"{self.party_name}_{timestamp}.xlsx")
        # utf-8-sig so Excel opens the CSV with Korean intact.
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')
        df.to_excel(xlsx_path, index=False, engine='openpyxl')
        print(f"✅ CSV: {csv_path}")
        print(f"✅ Excel: {xlsx_path}")

    def upload_to_huggingface(self, df: pd.DataFrame):
        """Push `df` to the HF Hub, merging with and deduplicating any existing dataset."""
        if not self.hf_token:
            print("\n⚠️ HF_TOKEN이 설정되지 않았습니다.")
            return

        print(f"\n▶ 허깅페이스 업로드 중... (repo: {self.hf_repo_id})")
        try:
            login(token=self.hf_token)
            new_dataset = Dataset.from_pandas(df)
            try:
                existing_dataset = load_dataset(self.hf_repo_id, split='train')
                existing_df = existing_dataset.to_pandas()
                combined_df = pd.concat([existing_df, df], ignore_index=True)
                # keep='last' so the freshest crawl of each URL wins.
                combined_df = combined_df.drop_duplicates(subset=['url'], keep='last')
                # Normalize 'date' (hub data may hold strings while the new
                # frame holds datetimes) so sorting is well-defined.
                combined_df['date'] = pd.to_datetime(combined_df['date'], errors='coerce')
                combined_df = combined_df.sort_values('date', ascending=False).reset_index(drop=True)
                final_dataset = Dataset.from_pandas(combined_df)
                print(f" ✅ 병합 후: {len(final_dataset)}개")
            except Exception:
                # No existing dataset (or it could not be loaded): start fresh.
                final_dataset = new_dataset
                print(f" ℹ️ 신규 데이터셋 생성")
            final_dataset.push_to_hub(self.hf_repo_id, token=self.hf_token)
            print(f"✅ 허깅페이스 업로드 완료!")
        except Exception as e:
            print(f"❌ 업로드 실패: {e}")

    async def run_incremental(self):
        """Crawl only what is newer than the last run, save, upload, and update state."""
        state = self.load_state()
        last_date = state.get('last_crawl_date')

        if last_date:
            # Resume from the day after the last successful crawl.
            start_date = (datetime.strptime(last_date, '%Y-%m-%d') + timedelta(days=1)).strftime('%Y-%m-%d')
            print(f"📅 증분 업데이트: {start_date} 이후 데이터만 수집")
        else:
            start_date = self.start_date
            print(f"📅 전체 수집: {start_date}부터")

        end_date = datetime.now().strftime('%Y-%m-%d')
        df = await self.collect_all(start_date, end_date)

        if df.empty:
            print("❌ 새로운 데이터 없음")
            return

        self.save_local(df)
        self.upload_to_huggingface(df)

        state['last_crawl_date'] = end_date
        state['last_crawl_time'] = datetime.now().isoformat()
        state['last_count'] = len(df)
        self.save_state(state)

        print(f"\n{'='*60}\n✅ 완료!\n{'='*60}\n")
|
|
|
|
|
async def main():
    """Build the crawler and perform a single incremental run."""
    await BasicIncomeAsyncCrawler().run_incremental()
|
|
|
|
|
| if __name__ == "__main__":
|
| asyncio.run(main())
|
|
|