# JNU-TSB / event_extractor.py
# HONGRIZON's picture
# Upload 18 files
# cf02581 verified
from __future__ import annotations

import json
import logging
import math
import re
from collections import Counter
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, List, Optional

import pandas as pd
# Event labels used when EventExtractor is built without an explicit
# ``categories`` list. aggregate_to_daily emits one ``cov_<label>_count``
# column per entry, and "other" is the label assigned when nothing matches.
DEFAULT_CATEGORIES = [
    "earnings", "product", "macro", "regulation",
    "supply_chain", "competition",
    "other",
]
# Ordered covariate columns of the daily frame returned by
# EventExtractor.aggregate_to_daily (a "timestamp" column precedes them).
COVARIATE_COLUMNS = (
    # Number of headlines per event category on that day.
    [
        "cov_earnings_count",
        "cov_product_count",
        "cov_macro_count",
        "cov_regulation_count",
        "cov_supply_chain_count",
        "cov_competition_count",
        "cov_other_count",
    ]
    # Number of headlines per sentiment polarity (positive / negative / neutral).
    + [
        "cov_sentiment_pos_count",
        "cov_sentiment_neg_count",
        "cov_sentiment_neu_count",
    ]
    # Day-level summaries: headline count, mean sentiment, mean confidence and
    # the sum of sentiment * confidence.
    + [
        "cov_news_count",
        "cov_sentiment_mean",
        "cov_confidence_mean",
        "cov_event_score",
    ]
)
# Keyword lists per event category, consumed by
# EventExtractor._keyword_fallback: a headline is assigned the category with
# the most case-insensitive substring hits. There is deliberately no "other"
# entry; "other" is what the fallback returns when no keyword matches.
# NOTE(review): the non-ASCII literals below look like mis-decoded UTF-8
# (mojibake) rather than Hangul -- confirm the file's encoding. If they are
# corrupted, only the ASCII keywords (e.g. "HBM", "TSMC") can match real
# Korean headlines.
CATEGORY_KEYWORDS = {
    "earnings": ["์‹ค์ ", "์˜์—…์ด์ต", "๋งค์ถœ", "์ˆœ์ด์ต", "๊ฐ€์ด๋˜์Šค", "์–ด๋‹", "๋ถ„๊ธฐ", "ํ‘์ž", "์ ์ž"],
    "product": ["์‹ ์ œํ’ˆ", "์ถœ์‹œ", "HBM", "AI์นฉ", "๋ฐ˜๋„์ฒด", "์Šค๋งˆํŠธํฐ", "์ œํ’ˆ", "๊ฐœ๋ฐœ", "์–‘์‚ฐ"],
    "macro": ["๊ธˆ๋ฆฌ", "ํ™˜์œจ", "๋ฌผ๊ฐ€", "๊ฒฝ๊ธฐ", "์ฝ”์Šคํ”ผ", "๋‚˜์Šค๋‹ฅ", "์—ฐ์ค€", "๋ฏธ๊ตญ", "์ค‘๊ตญ", "์ˆ˜์ถœ"],
    "regulation": ["๊ทœ์ œ", "์ •๋ถ€", "๊ณต์ •์œ„", "์กฐ์‚ฌ", "์ œ์žฌ", "๋ฒ•์•ˆ", "ํ—ˆ๊ฐ€", "์†Œ์†ก", "๋ฒŒ๊ธˆ"],
    "supply_chain": ["๊ณต๊ธ‰", "์ˆ˜์ฃผ", "๊ณ„์•ฝ", "๊ณต์žฅ", "์ƒ์‚ฐ", "๋ฌผ๋ฅ˜", "๊ณต๊ธ‰๋ง", "์›์žฌ๋ฃŒ", "๋‚ฉํ’ˆ"],
    "competition": ["๊ฒฝ์Ÿ", "์ ์œ ์œจ", "๊ฐ€๊ฒฉ์ธํ•˜", "๊ฒฝ์Ÿ์‚ฌ", "SKํ•˜์ด๋‹‰์Šค", "์—”๋น„๋””์•„", "TSMC"],
}
# Substrings counted as positive signals by EventExtractor._keyword_fallback;
# the headline is labelled +1 when these hits outnumber the negative hits.
# NOTE(review): these literals look like mis-decoded UTF-8 (mojibake) rather
# than Hangul -- confirm the file's encoding before relying on them.
POSITIVE_KEYWORDS = [
    "์ƒ์Šน", "ํ˜ธ์žฌ", "๊ฐœ์„ ", "์ฆ๊ฐ€", "์ˆ˜์ฃผ", "๊ณ„์•ฝ", "์ถœ์‹œ", "์„ฑ์žฅ", "์ตœ๋Œ€", "๋ŒํŒŒ",
    "ํ‘์ž", "๊ฐ•์„ธ", "ํˆฌ์ž", "ํ™•๋Œ€", "ํšŒ๋ณต", "์Šน์ธ", "๊ฐœ๋ฐœ", "์–‘์‚ฐ",
]
# Substrings counted as negative signals by EventExtractor._keyword_fallback;
# the headline is labelled -1 when these hits outnumber the positive hits.
# NOTE(review): these literals look like mis-decoded UTF-8 (mojibake) rather
# than Hangul -- confirm the file's encoding before relying on them.
NEGATIVE_KEYWORDS = [
    "ํ•˜๋ฝ", "์•…์žฌ", "๋‘”ํ™”", "๊ฐ์†Œ", "์šฐ๋ ค", "์ ์ž", "๋ถ€์ง„", "๊ทœ์ œ", "์ œ์žฌ", "์†Œ์†ก",
    "์ค‘๋‹จ", "๊ฐ์‚ฐ", "์•ฝ์„ธ", "๋ฆฌ์ฝœ", "์†์‹ค", "์ทจ์†Œ", "์นจ์ฒด",
]
@dataclass
class EventResult:
    """Outcome of classifying a single news headline.

    Attributes:
        category: Event label assigned to the headline.
        sentiment: Polarity of the headline (-1, 0 or 1 as produced by
            EventExtractor).
        confidence: Confidence score attached to the classification.
        source: Which path produced the result ("llm" or "keyword" in
            EventExtractor).
        raw_text: Text the result was derived from; empty by default.
    """

    category: str
    sentiment: int
    confidence: float
    source: str
    raw_text: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dict.

        ``sentiment`` is coerced to ``int`` and ``confidence`` to ``float`` so
        the values are plain Python numbers regardless of what was stored.
        """
        payload: Dict[str, Any] = dict(category=self.category)
        payload["sentiment"] = int(self.sentiment)
        payload["confidence"] = float(self.confidence)
        payload["source"] = self.source
        payload["raw_text"] = self.raw_text
        return payload
class EventExtractor:
    """Korean financial news -> event/sentiment -> daily covariates.

    The LLM path asks Polyglot-Ko to emit JSON. Since Polyglot-Ko-1.3B is a base
    LM rather than an instruction-tuned JSON extractor, deterministic keyword
    fallback is always available.

    Attributes:
        generate_fn: Optional callable mapping a prompt string to the raw text
            generated by the language model.
        categories: Category labels accepted from the LLM and listed in the
            prompt.
        use_llm: When false, the LLM path is skipped entirely.
    """

    def __init__(
        self,
        generate_fn: Optional[Callable[[str], str]] = None,
        categories: Optional[List[str]] = None,
        use_llm: bool = True,
    ) -> None:
        """Configure the extractor.

        Args:
            generate_fn: Prompt -> generated text callable; ``None`` disables
                the LLM path.
            categories: Allowed category labels; ``None`` or an empty list
                selects ``DEFAULT_CATEGORIES``.
            use_llm: Whether to try ``generate_fn`` before the keyword rules.
        """
        self.generate_fn = generate_fn
        # Copy so later mutation of the caller's list cannot change our labels.
        self.categories = list(categories) if categories else list(DEFAULT_CATEGORIES)
        self.use_llm = bool(use_llm)

    def build_prompt(self, title: str) -> str:
        """Return the prompt asking the model to classify ``title`` as JSON.

        The prompt lists the configured categories and ends with ``"JSON:"``.
        """
        cats = ", ".join(self.categories)
        # NOTE(review): the non-ASCII literals below look like mis-decoded
        # UTF-8 (mojibake) rather than Hangul -- confirm the file's encoding.
        return (
            "๋‹ค์Œ ํ•œ๊ตญ์–ด ๊ธˆ์œต๋‰ด์Šค ์ œ๋ชฉ์„ ์ฃผ๊ฐ€ ์˜ˆ์ธก์šฉ ๊ณต๋ณ€๋Ÿ‰์œผ๋กœ ๋ถ„์„ํ•˜์„ธ์š”.\n"
            f"๊ฐ€๋Šฅํ•œ category: {cats}\n"
            "sentiment๋Š” ์ฃผ๊ฐ€ ๊ด€์ ์—์„œ -1, 0, 1 ์ค‘ ํ•˜๋‚˜์ž…๋‹ˆ๋‹ค.\n"
            "confidence๋Š” 0๊ณผ 1 ์‚ฌ์ด ์ˆซ์ž์ž…๋‹ˆ๋‹ค.\n"
            "๋ฐ˜๋“œ์‹œ JSON๋งŒ ์ถœ๋ ฅํ•˜์„ธ์š”.\n"
            f"๋‰ด์Šค: {title}\n"
            "JSON:"
        )

    def extract(self, title: str) -> Dict[str, Any]:
        """Classify one headline and return ``EventResult.to_dict()``.

        The LLM path is tried first when it is enabled, a ``generate_fn`` is
        configured and the stripped title is non-empty. Any failure there
        (generation error, undecodable or invalid JSON) falls back to the
        keyword rules, so this method is best-effort with respect to the LLM.

        Args:
            title: Headline text; ``None`` or other falsy values are treated
                as an empty string.

        Returns:
            Dict with ``category``, ``sentiment``, ``confidence``, ``source``
            and ``raw_text`` keys.
        """
        title = str(title or "").strip()
        if self.use_llm and self.generate_fn is not None and title:
            try:
                parsed = self._parse_json(self.generate_fn(self.build_prompt(title)))
            except Exception:
                # generate_fn is arbitrary caller code, so the catch stays
                # broad; the failure is recorded instead of vanishing silently.
                logging.getLogger(__name__).debug(
                    "LLM extraction failed; using keyword fallback", exc_info=True
                )
            else:
                if parsed is not None:
                    return parsed.to_dict()
        return self._keyword_fallback(title).to_dict()

    def aggregate_to_daily(self, news: Iterable[Dict[str, Any]]) -> pd.DataFrame:
        """Aggregate per-headline events into one covariate row per day.

        Args:
            news: Mappings with a date under ``date``/``timestamp``/``datetime``
                and a headline under ``title``/``headline``/``text``/``content``.
                Items without a usable date are skipped.

        Returns:
            Frame with a ``timestamp`` column (floored to the day) followed by
            ``COVARIATE_COLUMNS``, sorted by day. When no item has a usable
            date, an empty frame with the same columns is returned.

        Raises:
            Whatever ``pandas.to_datetime`` raises for an unparseable date.
        """
        columns = ["timestamp", *COVARIATE_COLUMNS]
        rows: List[Dict[str, Any]] = []
        for item in news or []:
            date_value = item.get("date") or item.get("timestamp") or item.get("datetime")
            if date_value is None:
                continue
            day = pd.to_datetime(date_value)
            if pd.isna(day):
                # Empty-string / NaT dates cannot be assigned to a day; keeping
                # them used to leave zero groups and crash the final sort.
                continue
            title = (
                item.get("title")
                or item.get("headline")
                or item.get("text")
                or item.get("content")
                or ""
            )
            event = self.extract(str(title))
            event["timestamp"] = day.floor("D")
            rows.append(event)

        if not rows:
            # Typed empty frame so it has the same dtypes as a populated result
            # (e.g. for merging on "timestamp").
            empty = pd.DataFrame(
                {col: pd.Series(dtype="float64") for col in COVARIATE_COLUMNS}
            )
            empty.insert(0, "timestamp", pd.Series(dtype="datetime64[ns]"))
            return empty

        frame = pd.DataFrame(rows)
        known = set(DEFAULT_CATEGORIES)
        daily_rows: List[Dict[str, Any]] = []
        for day, group in frame.groupby("timestamp"):
            # Labels outside the fixed covariate schema are counted as "other"
            # so the per-category counts always add up to cov_news_count.
            counts = Counter(
                cat if cat in known else "other" for cat in group["category"]
            )
            sentiments = group["sentiment"].astype(float)
            confidences = group["confidence"].astype(float).clip(0, 1)
            out: Dict[str, Any] = {"timestamp": pd.to_datetime(day)}
            for cat in DEFAULT_CATEGORIES:
                out[f"cov_{cat}_count"] = float(counts.get(cat, 0))
            out["cov_sentiment_pos_count"] = float((sentiments > 0).sum())
            out["cov_sentiment_neg_count"] = float((sentiments < 0).sum())
            out["cov_sentiment_neu_count"] = float((sentiments == 0).sum())
            # groupby never yields an empty group, so no zero-length guards.
            out["cov_news_count"] = float(len(group))
            out["cov_sentiment_mean"] = float(sentiments.mean())
            out["cov_confidence_mean"] = float(confidences.mean())
            out["cov_event_score"] = float((sentiments * confidences).sum())
            daily_rows.append(out)

        result = pd.DataFrame(daily_rows).sort_values("timestamp").reset_index(drop=True)
        # reindex fixes the column order and zero-fills any covariate that was
        # not produced above.
        return result.reindex(columns=columns, fill_value=0.0)

    @staticmethod
    def _first_json_object(text: str) -> Optional[Dict[str, Any]]:
        """Decode the JSON object starting at the first ``{`` in ``text``.

        A real JSON decoder is used instead of a regex so nested objects and
        braces inside string values are handled. Returns ``None`` when there
        is no ``{`` or the text from there on is not a valid JSON object.
        """
        start = text.find("{")
        if start == -1:
            return None
        try:
            payload, _ = json.JSONDecoder().raw_decode(text[start:])
        except json.JSONDecodeError:
            return None
        return payload if isinstance(payload, dict) else None

    @staticmethod
    def _coerce_sentiment(value: Any) -> int:
        """Map a numeric-like polarity to -1, 0 or 1 by its sign.

        ``None`` and NaN count as neutral. Fractional values keep their sign
        (0.7 -> 1) rather than being truncated to 0.

        Raises:
            ValueError, TypeError: If ``value`` cannot be converted to float.
        """
        if value is None:
            return 0
        number = float(value)
        if math.isnan(number):
            return 0
        return (number > 0) - (number < 0)

    @staticmethod
    def _coerce_confidence(value: Any) -> float:
        """Clamp a numeric-like confidence into [0, 1].

        ``None`` and NaN map to the 0.5 default; NaN would otherwise slip
        through ``min``/``max`` and come out as 1.0.

        Raises:
            ValueError, TypeError: If ``value`` cannot be converted to float.
        """
        if value is None:
            return 0.5
        number = float(value)
        if math.isnan(number):
            return 0.5
        return max(0.0, min(1.0, number))

    def _parse_json(self, raw: str) -> "Optional[EventResult]":
        """Turn raw model output into an ``EventResult``.

        Args:
            raw: Text generated by the model.

        Returns:
            The parsed result with ``source="llm"``, or ``None`` when ``raw``
            is empty or holds no decodable JSON object. Categories outside
            ``self.categories`` become ``"other"``.

        Raises:
            ValueError, TypeError: If ``sentiment`` or ``confidence`` is not
                numeric-like (``extract`` treats this as an LLM failure).
        """
        if not raw:
            return None
        payload = self._first_json_object(str(raw))
        if payload is None:
            return None
        category = str(payload.get("category", "other")).strip()
        if category not in self.categories:
            category = "other"
        return EventResult(
            category=category,
            sentiment=self._coerce_sentiment(payload.get("sentiment")),
            confidence=self._coerce_confidence(payload.get("confidence")),
            source="llm",
            raw_text=str(raw),
        )

    def _keyword_fallback(self, title: str) -> "EventResult":
        """Classify ``title`` with case-insensitive substring keyword rules.

        The category is the ``CATEGORY_KEYWORDS`` entry with the most hits
        (first entry wins ties), or ``"other"`` without any hit. Sentiment is
        the sign of positive minus negative keyword hits. Confidence starts at
        0.55 and grows by 0.1 per unit of evidence, capped at three units.
        """
        text = title.lower()
        scores: Dict[str, int] = {
            category: sum(1 for kw in keywords if kw.lower() in text)
            for category, keywords in CATEGORY_KEYWORDS.items()
        }
        category = max(scores, key=scores.get) if scores else "other"
        hits = scores.get(category, 0)
        if hits == 0:
            category = "other"
        pos = sum(1 for kw in POSITIVE_KEYWORDS if kw.lower() in text)
        neg = sum(1 for kw in NEGATIVE_KEYWORDS if kw.lower() in text)
        sentiment = 1 if pos > neg else (-1 if neg > pos else 0)
        confidence = 0.55 + 0.1 * min(3, abs(pos - neg) + hits)
        confidence = max(0.1, min(0.95, confidence))
        return EventResult(
            category=category,
            sentiment=sentiment,
            confidence=confidence,
            source="keyword",
            raw_text=title,
        )