from __future__ import annotations

import json
import logging
import math
import re
from collections import Counter
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, List, Optional

import pandas as pd

logger = logging.getLogger(__name__)

# Event categories recognised by default; "other" is the catch-all bucket.
DEFAULT_CATEGORIES = [
    "earnings",
    "product",
    "macro",
    "regulation",
    "supply_chain",
    "competition",
    "other",
]

# Fixed output schema (besides "timestamp") of EventExtractor.aggregate_to_daily.
COVARIATE_COLUMNS = [
    "cov_earnings_count",
    "cov_product_count",
    "cov_macro_count",
    "cov_regulation_count",
    "cov_supply_chain_count",
    "cov_competition_count",
    "cov_other_count",
    "cov_sentiment_pos_count",
    "cov_sentiment_neg_count",
    "cov_sentiment_neu_count",
    "cov_news_count",
    "cov_sentiment_mean",
    "cov_confidence_mean",
    "cov_event_score",
]

# Substrings whose presence in a headline votes for the given category.
CATEGORY_KEYWORDS = {
    "earnings": ["실적", "영업이익", "매출", "순이익", "가이던스", "어닝", "분기", "흑자", "적자"],
    "product": ["신제품", "출시", "HBM", "AI칩", "반도체", "스마트폰", "제품", "개발", "양산"],
    "macro": ["금리", "환율", "물가", "경기", "코스피", "나스닥", "연준", "미국", "중국", "수출"],
    "regulation": ["규제", "정부", "공정위", "조사", "제재", "법안", "허가", "소송", "벌금"],
    "supply_chain": ["공급", "수주", "계약", "공장", "생산", "물류", "공급망", "원재료", "납품"],
    "competition": ["경쟁", "점유율", "가격인하", "경쟁사", "SK하이닉스", "엔비디아", "TSMC"],
}

# Substrings counted as positive / negative sentiment votes by the fallback.
POSITIVE_KEYWORDS = [
    "상승", "호재", "개선", "증가", "수주", "계약", "출시", "성장", "최대",
    "돌파", "흑자", "강세", "투자", "확대", "회복", "승인", "개발", "양산",
]

NEGATIVE_KEYWORDS = [
    "하락", "악재", "둔화", "감소", "우려", "적자", "부진", "규제", "제재",
    "소송", "중단", "감산", "약세", "리콜", "손실", "취소", "침체",
]

# Candidate start positions of a JSON object inside free-form LLM output.
_JSON_OBJECT_START = re.compile(r"\{")


@dataclass
class EventResult:
    """One extracted event: category, sentiment sign, confidence and origin."""

    category: str
    sentiment: int
    confidence: float
    source: str  # "llm" or "keyword", depending on which path produced it
    raw_text: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dict with numeric fields coerced."""
        return {
            "category": self.category,
            "sentiment": int(self.sentiment),
            "confidence": float(self.confidence),
            "source": self.source,
            "raw_text": self.raw_text,
        }


class EventExtractor:
    """Korean financial news -> event/sentiment -> daily covariates.

    The LLM path asks Polyglot-Ko to emit JSON. Since Polyglot-Ko-1.3B is a
    base LM rather than an instruction-tuned JSON extractor, a deterministic
    keyword fallback is always available and is used whenever the LLM path is
    disabled, fails, or produces output that cannot be interpreted.
    """

    def __init__(
        self,
        generate_fn: Optional[Callable[[str], str]] = None,
        categories: Optional[List[str]] = None,
        use_llm: bool = True,
    ) -> None:
        """Configure the extractor.

        Args:
            generate_fn: Callable mapping a prompt to generated text. When
                ``None`` only the keyword fallback is used.
            categories: Categories accepted from the LLM; defaults to a copy
                of ``DEFAULT_CATEGORIES``.
            use_llm: Set to ``False`` to skip ``generate_fn`` entirely.
        """
        self.generate_fn = generate_fn
        self.categories = categories or list(DEFAULT_CATEGORIES)
        self.use_llm = bool(use_llm)

    def build_prompt(self, title: str) -> str:
        """Build the (Korean) prompt asking the LM for a JSON verdict on *title*."""
        cats = ", ".join(self.categories)
        return (
            "다음 한국어 금융뉴스 제목을 주가 예측용 공변량으로 분석하세요.\n"
            f"가능한 category: {cats}\n"
            "sentiment는 주가 관점에서 -1, 0, 1 중 하나입니다.\n"
            "confidence는 0과 1 사이 숫자입니다.\n"
            "반드시 JSON만 출력하세요.\n"
            f"뉴스: {title}\n"
            "JSON:"
        )

    def extract(self, title: str) -> Dict[str, Any]:
        """Classify a single headline.

        The LLM is tried first when enabled, configured and the title is
        non-empty; any failure there is logged at debug level and the keyword
        fallback is used instead, so this method does not raise for LLM
        problems.

        Returns:
            The ``EventResult.to_dict()`` mapping for the headline.
        """
        title = str(title or "").strip()
        if self.use_llm and self.generate_fn is not None and title:
            try:
                parsed = self._parse_json(self.generate_fn(self.build_prompt(title)))
            except Exception:
                # Best effort by design: generation errors must never break
                # covariate building, but they should stay diagnosable.
                logger.debug(
                    "LLM event extraction failed; using keyword fallback",
                    exc_info=True,
                )
                parsed = None
            if parsed is not None:
                return parsed.to_dict()
        return self._keyword_fallback(title).to_dict()

    def aggregate_to_daily(self, news: Iterable[Dict[str, Any]]) -> pd.DataFrame:
        """Extract an event per news item and aggregate them per calendar day.

        Each item is a mapping; the date is read from ``date``, ``timestamp``
        or ``datetime`` and the text from ``title``, ``headline``, ``text`` or
        ``content`` (first truthy value wins). Items without a date are
        skipped.

        Returns:
            A frame with ``timestamp`` plus ``COVARIATE_COLUMNS``, one row per
            day sorted ascending. It is empty (columns only) when no item
            carries a date.
        """
        rows: List[Dict[str, Any]] = []
        for item in news or []:
            date_value = item.get("date") or item.get("timestamp") or item.get("datetime")
            title = (
                item.get("title")
                or item.get("headline")
                or item.get("text")
                or item.get("content")
                or ""
            )
            if date_value is None:
                continue
            day = pd.to_datetime(date_value).floor("D")
            event = self.extract(str(title))
            event["timestamp"] = day
            rows.append(event)

        if not rows:
            return pd.DataFrame(columns=["timestamp", *COVARIATE_COLUMNS])

        known_categories = set(DEFAULT_CATEGORIES)
        events = pd.DataFrame(rows)
        daily_rows: List[Dict[str, Any]] = []
        for day, group in events.groupby("timestamp"):
            # The output schema only has columns for the default categories.
            # Custom categories are folded into "other" so the per-category
            # counts always add up to cov_news_count.
            counter = Counter(
                cat if cat in known_categories else "other"
                for cat in group["category"]
            )
            sentiments = group["sentiment"].astype(float)
            confidences = group["confidence"].astype(float).clip(0, 1)

            out: Dict[str, Any] = {"timestamp": pd.to_datetime(day)}
            for cat in DEFAULT_CATEGORIES:
                out[f"cov_{cat}_count"] = float(counter.get(cat, 0))
            out["cov_sentiment_pos_count"] = float((sentiments > 0).sum())
            out["cov_sentiment_neg_count"] = float((sentiments < 0).sum())
            out["cov_sentiment_neu_count"] = float((sentiments == 0).sum())
            out["cov_news_count"] = float(len(group))
            # groupby never yields an empty group, so the means are defined.
            out["cov_sentiment_mean"] = float(sentiments.mean())
            out["cov_confidence_mean"] = float(confidences.mean())
            # Confidence-weighted net sentiment for the day.
            out["cov_event_score"] = float((sentiments * confidences).sum())
            daily_rows.append(out)

        result = pd.DataFrame(daily_rows).sort_values("timestamp").reset_index(drop=True)
        for col in COVARIATE_COLUMNS:
            if col not in result.columns:
                result[col] = 0.0
        return result[["timestamp", *COVARIATE_COLUMNS]]

    @staticmethod
    def _first_json_object(text: str) -> Optional[Dict[str, Any]]:
        """Return the first decodable JSON object embedded in *text*, if any."""
        decoder = json.JSONDecoder()
        for hit in _JSON_OBJECT_START.finditer(text):
            try:
                candidate, _ = decoder.raw_decode(text, hit.start())
            except json.JSONDecodeError:
                continue
            if isinstance(candidate, dict):
                return candidate
        return None

    @staticmethod
    def _finite_float(value: Any) -> Optional[float]:
        """Convert *value* to a finite float, or return ``None`` if impossible."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        return number if math.isfinite(number) else None

    def _parse_json(self, raw: str) -> Optional[EventResult]:
        """Interpret raw LLM output as an ``EventResult``.

        The first JSON object found in the text is used; a real JSON decoder
        is used so nested objects and braces inside strings do not truncate
        it. Unknown categories map to ``"other"``, sentiment is reduced to its
        sign and confidence is clamped to ``[0, 1]``.

        Returns:
            The parsed result, or ``None`` when no object is found or when
            ``sentiment`` / ``confidence`` is not a finite number.
        """
        if not raw:
            return None
        payload = self._first_json_object(str(raw))
        if payload is None:
            return None

        category = str(payload.get("category", "other")).strip()
        if category not in self.categories:
            category = "other"

        # Parse as float so fractional scores such as 0.7 keep their sign
        # instead of being truncated to 0.
        score = self._finite_float(payload.get("sentiment", 0))
        confidence = self._finite_float(payload.get("confidence", 0.5))
        if score is None or confidence is None:
            return None

        sentiment = -1 if score < 0 else (1 if score > 0 else 0)
        confidence = max(0.0, min(1.0, confidence))
        return EventResult(
            category=category,
            sentiment=sentiment,
            confidence=confidence,
            source="llm",
            raw_text=str(raw),
        )

    def _keyword_fallback(self, title: str) -> EventResult:
        """Classify *title* by case-insensitive substring matching.

        The category with the most keyword hits wins (first one listed in
        ``CATEGORY_KEYWORDS`` on ties, ``"other"`` when nothing matches).
        Sentiment is the sign of positive minus negative keyword hits.
        """
        text = title.lower()
        scores: Dict[str, int] = {
            category: sum(1 for kw in keywords if kw.lower() in text)
            for category, keywords in CATEGORY_KEYWORDS.items()
        }
        category = max(scores, key=scores.get) if scores else "other"
        if scores.get(category, 0) == 0:
            category = "other"

        pos = sum(1 for kw in POSITIVE_KEYWORDS if kw.lower() in text)
        neg = sum(1 for kw in NEGATIVE_KEYWORDS if kw.lower() in text)
        sentiment = 1 if pos > neg else (-1 if neg > pos else 0)

        # Base 0.55, plus 0.1 per unit of evidence (sentiment margin plus
        # category hits), capped at three units.
        confidence = 0.55 + 0.1 * min(3, abs(pos - neg) + scores.get(category, 0))
        confidence = max(0.1, min(0.95, confidence))
        return EventResult(
            category=category,
            sentiment=sentiment,
            confidence=confidence,
            source="keyword",
            raw_text=title,
        )