# TeXray-backup / plagiarism_check.py
# (Hugging Face listing metadata: uploaded by openfree, commit 4997198 verified)
# ============================================
# plagiarism_check.py
# 표절 검사 전용 모듈
# Gemini Google Search + Brave + KCI·RISS·arXiv
# ============================================
import hashlib
import os
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import gradio as gr

# Optional dependencies — probe each one and record a HAS_* flag so the
# rest of the module can degrade gracefully when a package is missing.
try:
    import httpx
    HAS_HTTPX = True
except ImportError:
    HAS_HTTPX = False

try:
    import chromadb
    HAS_CHROMADB = True
except ImportError:
    HAS_CHROMADB = False

try:
    import numpy as np
    HAS_NUMPY = True
except ImportError:
    HAS_NUMPY = False

try:
    from google import genai
    from google.genai import types as gtypes
    HAS_GENAI = True
except ImportError:
    HAS_GENAI = False

# API keys are read once at import time from the environment.
GEMINI_KEY = os.getenv("GEMINI_API_KEY", "")
BRAVE_KEY = os.getenv("BRAVE_API_KEY", "")
# ============================================
# 경량 텍스트 유틸 (app.py 의존성 제거용)
# ============================================
def _split_sentences(text):
"""문장 분리 (경량 regex 버전)"""
text = re.sub(r'\s+', ' ', text).strip()
sents = re.split(r'(?<=[.!?。])\s+', text)
return [s.strip() for s in sents if s.strip()]
def _split_words(text):
"""단어 분리"""
return [w for w in re.findall(r'[가-힣a-zA-Z0-9]+', text) if w]
def _http_get(url, headers=None, timeout=15):
"""HTTP GET"""
try:
if HAS_HTTPX:
r = httpx.get(url, headers=headers or {}, timeout=timeout, follow_redirects=True)
return r.text if r.status_code == 200 else None
else:
import urllib.request
req = urllib.request.Request(url, headers=headers or {})
with urllib.request.urlopen(req, timeout=timeout) as resp:
return resp.read().decode('utf-8', errors='replace')
except:
return None
# ============================================
# 웹 검색 함수
# ============================================
def brave_search(query, count=5):
    """Query the Brave Search web API.

    Args:
        query: search phrase (will be percent-encoded).
        count: maximum number of results to request.

    Returns:
        List of {title, url, snippet, source} dicts; [] when the API key is
        missing, httpx is unavailable, or any request/parse error occurs.
    """
    if not BRAVE_KEY:
        return []
    import urllib.parse
    # Percent-encode the query: raw '&', '+', spaces or Korean text would
    # otherwise corrupt the query string (sibling searchers already quote).
    url = (
        "https://api.search.brave.com/res/v1/web/search"
        f"?q={urllib.parse.quote(query)}&count={count}"
    )
    try:
        if HAS_HTTPX:
            r = httpx.get(
                url,
                headers={"X-Subscription-Token": BRAVE_KEY, "Accept": "application/json"},
                timeout=10,
            )
            if r.status_code == 200:
                data = r.json()
                return [
                    {
                        "title": item.get("title", ""),
                        "url": item.get("url", ""),
                        "snippet": item.get("description", ""),
                        "source": "Brave",
                    }
                    for item in data.get("web", {}).get("results", [])
                ]
    except Exception:
        # Best-effort search: fall through to the empty result below.
        pass
    return []
def search_kci(query):
    """Search KCI (Korea Citation Index) articles by title.

    Parses the open-API XML response with a regex over CDATA sections.
    Returns up to 3 {title, url, snippet, source} dicts; [] on any failure.
    """
    try:
        url = (
            "https://open.kci.go.kr/po/openapi/openApiSearch.kci"
            f"?apiCode=articleSearch&title={query}&displayCount=3"
        )
        resp = _http_get(url, timeout=8)
        if resp:
            results = [
                {"title": m.group(1), "url": m.group(2), "snippet": "", "source": "KCI"}
                for m in re.finditer(
                    r'<article-title><!\[CDATA\[(.+?)\]\]></article-title>.*?<url><!\[CDATA\[(.+?)\]\]></url>',
                    resp, re.S,
                )
            ]
            return results[:3]
    except Exception:
        # Was a bare `except:`; Exception preserves the silent-failure
        # contract without trapping interpreter-exit signals.
        pass
    return []
def search_riss(query):
    """Search RISS (Korean academic portal) by scraping its HTML result page.

    Returns up to 3 {title, url, snippet, source} dicts; [] on any failure.
    """
    results = []
    try:
        url = (
            f"http://www.riss.kr/search/Search.do?isDetailSearch=N&searchGubun=true"
            f"&viewYn=OP&queryText=&strQuery={query}&iStartCount=0&iGroupView=5&icate=all"
        )
        resp = _http_get(url, timeout=8)
        if resp:
            for m in re.finditer(
                r'class="title"[^>]*>.*?<a[^>]*href="([^"]+)"[^>]*>(.*?)</a>', resp, re.S
            ):
                # Strip any nested markup from the anchor text.
                title = re.sub(r'<[^>]+>', '', m.group(2)).strip()
                if title:
                    results.append({
                        "title": title,
                        "url": "https://www.riss.kr" + m.group(1),
                        "snippet": "",
                        "source": "RISS",
                    })
    except Exception:
        # Was a bare `except:` — narrowed to Exception, behavior otherwise kept.
        pass
    return results[:3]
def search_arxiv(query):
    """Search the arXiv Atom API, sorted by relevance.

    Returns up to 3 {title, url, snippet, source} dicts; [] on any failure.
    The snippet is the abstract, whitespace-collapsed and cut to 150 chars.
    """
    results = []
    try:
        import urllib.parse
        q = urllib.parse.quote(query)
        url = f"https://export.arxiv.org/api/query?search_query=all:{q}&start=0&max_results=3&sortBy=relevance"
        resp = _http_get(url, timeout=12)
        if resp:
            for m in re.finditer(
                r'<entry>.*?<title>(.*?)</title>.*?<id>(.*?)</id>.*?<summary>(.*?)</summary>',
                resp, re.S,
            ):
                title = re.sub(r'\s+', ' ', m.group(1)).strip()
                results.append({
                    "title": title,
                    "url": m.group(2).strip(),  # the <id> element is the abs-page URL
                    "snippet": re.sub(r'\s+', ' ', m.group(3)).strip()[:150],
                    "source": "arXiv",
                })
    except Exception:
        # Was a bare `except:` — narrowed to Exception, behavior otherwise kept.
        pass
    return results[:3]
def parallel_brave_search(queries, max_workers=10):
    """Run brave_search(query, 3) for each query concurrently.

    Args:
        queries: iterable of search phrases.
        max_workers: thread-pool size (capped at 20).

    Returns:
        Dict mapping each query to its result list ([] on per-query failure).
    """
    all_results = {}
    if not queries:
        # Avoid spinning up a thread pool for nothing.
        return all_results
    with ThreadPoolExecutor(max_workers=min(max_workers, 20)) as executor:
        futures = {executor.submit(brave_search, q, 3): q for q in queries}
        for future in as_completed(futures):
            q = futures[future]
            try:
                all_results[q] = future.result()
            except Exception:
                # Was a bare `except:`; a failed query simply yields [].
                all_results[q] = []
    return all_results
def duckduckgo_search(query, max_results=5):
    """Scrape DuckDuckGo's HTML endpoint — key-less fallback search.

    Returns up to *max_results* {title, url, snippet, source} dicts;
    [] on any failure.
    """
    results = []
    try:
        import urllib.parse
        q = urllib.parse.quote(query)
        url = f"https://html.duckduckgo.com/html/?q={q}"
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        resp = _http_get(url, headers=headers, timeout=10)
        if resp:
            for m in re.finditer(
                r'<a[^>]+class="result__a"[^>]+href="([^"]+)"[^>]*>(.*?)</a>'
                r'.*?<a[^>]+class="result__snippet"[^>]*>(.*?)</a>',
                resp, re.S,
            ):
                href = m.group(1)
                title = re.sub(r'<[^>]+>', '', m.group(2)).strip()
                snippet = re.sub(r'<[^>]+>', '', m.group(3)).strip()
                # DuckDuckGo wraps targets in a redirect URL; the real
                # destination is percent-encoded in the uddg= parameter.
                real_url = href
                if 'uddg=' in href:
                    um = re.search(r'uddg=([^&]+)', href)
                    if um:
                        real_url = urllib.parse.unquote(um.group(1))
                if title:
                    results.append({"title": title, "url": real_url, "snippet": snippet, "source": "Web"})
                    if len(results) >= max_results:
                        break
    except Exception:
        # Was a bare `except:` — narrowed to Exception, behavior otherwise kept.
        pass
    return results
def self_crawl_search(query, max_results=3):
    """DuckDuckGo crawl plus an academic-flavored follow-up query.

    When the query does not already look academic ('논문'/'paper' absent),
    a second search with academic keywords appended supplements the results.
    """
    combined = []
    combined.extend(duckduckgo_search(query, max_results))
    looks_academic = '논문' in query or 'paper' in query.lower()
    if not looks_academic:
        combined.extend(duckduckgo_search(f"{query} 논문 학술", 2))
    return combined
# ============================================
# Gemini Google Search Grounding
# ============================================
def gemini_plagiarism_check(text_chunk):
    """Plagiarism check for one chunk via Gemini with Google Search grounding.

    Sends the first 1000 chars of *text_chunk* to Gemini with the
    GoogleSearch tool enabled, then parses a "유사도: XX%" line out of the
    reply and collects grounded web sources from the response metadata.

    Returns:
        None when the genai SDK or API key is unavailable; otherwise a dict
        {"pct": int, "response": str, "sources": [{title, url, source}]}.
        On API errors, pct is 0 and "response" carries the error text.
    """
    if not HAS_GENAI or not GEMINI_KEY:
        return None
    try:
        client = genai.Client(api_key=GEMINI_KEY)
        tool = gtypes.Tool(googleSearch=gtypes.GoogleSearch())
        prompt = f"""다음 텍스트가 인터넷에 존재하는지 Google Search로 확인하세요.
유사한 문장이 발견되면 출처 URL과 유사도(%)를 보고하세요.
마지막 줄에 "유사도: XX%" 형식으로 작성.
[텍스트]
{text_chunk[:1000]}"""
        resp = client.models.generate_content(
            model="gemini-flash-lite-latest",
            contents=prompt,
            config=gtypes.GenerateContentConfig(tools=[tool], temperature=0.1, max_output_tokens=600),
        )
        text_resp = resp.text if resp.text else ""
        sources = []
        # Grounding metadata lists the web pages the model actually consulted.
        # NOTE(review): grounding_metadata may be absent on some candidates —
        # an AttributeError here is absorbed by the except below.
        if hasattr(resp, 'candidates') and resp.candidates:
            gc = resp.candidates[0].grounding_metadata
            if gc and hasattr(gc, 'grounding_chunks'):
                for chunk in gc.grounding_chunks:
                    if hasattr(chunk, 'web') and chunk.web:
                        sources.append({
                            "title": chunk.web.title or "",
                            "url": chunk.web.uri or "",
                            "source": "Google",
                        })
        # Pull the self-reported similarity percentage out of the reply text.
        pm = re.search(r'유사도[:\s]*(\d+)', text_resp)
        pct = int(pm.group(1)) if pm else 0
        return {"pct": pct, "response": text_resp, "sources": sources}
    except Exception as e:
        # Report the failure in-band so callers can show it instead of crashing.
        return {"pct": 0, "response": str(e)[:100], "sources": []}
# ============================================
# 유사도 분석 함수
# ============================================
def _ngram_set(text, n=3):
"""텍스트에서 n-gram 집합 생성"""
text = re.sub(r'\s+', ' ', text.lower().strip())
words = re.findall(r'[가-힣a-zA-Z0-9]+', text)
if len(words) < n:
return set(words)
return {' '.join(words[i:i + n]) for i in range(len(words) - n + 1)}
def _text_similarity(text_a, text_b, n=3):
"""두 텍스트 간 n-gram Jaccard 유사도 (0.0~1.0)"""
if not text_a or not text_b:
return 0.0
set_a = _ngram_set(text_a, n)
set_b = _ngram_set(text_b, n)
if not set_a or not set_b:
return 0.0
intersection = set_a & set_b
union = set_a | set_b
return len(intersection) / len(union) if union else 0.0
def _containment_similarity(original, source_text, n=3):
"""원문 n-gram이 출처에 얼마나 포함되는지 (Containment, 0.0~1.0)"""
if not original or not source_text:
return 0.0
set_orig = _ngram_set(original, n)
set_src = _ngram_set(source_text, n)
if not set_orig:
return 0.0
overlap = set_orig & set_src
return len(overlap) / len(set_orig)
def _fetch_page_text(url, timeout=8):
    """Fetch *url* and return up to 5000 chars of tag-stripped body text.

    Returns "" when httpx is unavailable, the status is not 200, or any
    error occurs (lightweight best-effort crawl).
    """
    if not HAS_HTTPX:
        return ""
    try:
        ua_headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        response = httpx.get(url, headers=ua_headers, timeout=timeout, follow_redirects=True)
        if response.status_code != 200:
            return ""
        # Remove script/style blocks first, then every remaining tag,
        # and finally collapse runs of whitespace.
        body = re.sub(r'<script[^>]*>.*?</script>', '', response.text, flags=re.S | re.I)
        body = re.sub(r'<style[^>]*>.*?</style>', '', body, flags=re.S | re.I)
        plain = re.sub(r'<[^>]+>', ' ', body)
        plain = re.sub(r'\s+', ' ', plain).strip()
        return plain[:5000]
    except Exception:
        return ""
def _verify_sources_parallel(original_text, sources, min_similarity=0.02, max_workers=8):
"""출처 URL을 크롤링하여 원문과 대조 검증 — 유사도 미달 출처 제거, 크롤링 본문도 반환"""
if not sources:
return [], 0.0, {}
verified = []
total_sim = 0.0
crawled_texts = {} # url → page_text (벡터DB 인덱싱용)
def _check_one(src):
url = src.get("url", "")
snippet = src.get("snippet", "")
page_text = ""
# snippet만으로 검증되는 경우에도 본문 크롤링 시도
page_text = _fetch_page_text(url, timeout=6)
if snippet and len(snippet) > 20:
sim = _text_similarity(original_text[:500], snippet, n=2)
if sim >= min_similarity:
return {**src, "similarity": round(sim * 100, 1), "method": "snippet"}, sim, page_text
if page_text and len(page_text) > 50:
jaccard = _text_similarity(original_text, page_text, n=3)
containment = _containment_similarity(original_text, page_text, n=3)
sim = max(jaccard, containment)
if sim >= min_similarity:
matched = _find_matching_sentences(original_text, page_text)
return {**src, "similarity": round(sim * 100, 1), "method": "crawl", "matched": matched}, sim, page_text
return None, 0.0, page_text
with ThreadPoolExecutor(max_workers=min(max_workers, len(sources))) as executor:
futures = {executor.submit(_check_one, src): src for src in sources[:20]}
for future in as_completed(futures, timeout=30):
try:
result, sim, page_text = future.result()
src_info = futures[future]
url = src_info.get("url", "")
# 크롤링 본문 저장 (검증 여부 무관하게)
if page_text and len(page_text) > 50 and url:
crawled_texts[url] = {
"text": page_text,
"title": src_info.get("title", ""),
"source": src_info.get("source", ""),
}
if result is not None:
verified.append(result)
total_sim += sim
except Exception:
pass
verified.sort(key=lambda x: x.get("similarity", 0), reverse=True)
avg_sim = total_sim / len(verified) if verified else 0.0
return verified, avg_sim, crawled_texts
def _find_matching_sentences(original, source_text, threshold=0.3):
    """Return up to three sentences of *original* that resemble *source_text*.

    Only the first 20 sentences are examined; very short sentences (<10
    chars) are skipped; matches are truncated to 60 chars.
    """
    matched = []
    for sentence in _split_sentences(original)[:20]:
        if len(sentence) < 10:
            continue
        if _text_similarity(sentence, source_text, n=2) >= threshold:
            matched.append(sentence[:60])
            if len(matched) >= 3:
                break
    return matched
# ============================================
# 벡터 DB 기반 정밀 표절 검사
# ============================================
def _chunk_text(text, chunk_size=200, overlap=50):
    """Split *text* into ~chunk_size-character chunks along sentence boundaries.

    When *overlap* > 0, the last sentence of a finished chunk is carried
    into the next one so neighboring chunks share context.
    """
    chunks = []
    pending = []
    pending_len = 0
    for sentence in _split_sentences(text):
        pending.append(sentence)
        pending_len += len(sentence)
        if pending_len < chunk_size:
            continue
        chunks.append(' '.join(pending))
        if overlap > 0 and len(pending) > 1:
            # Carry the final sentence over as the start of the next chunk.
            pending = pending[-1:]
            pending_len = len(pending[0])
        else:
            pending = []
            pending_len = 0
    if pending:
        chunks.append(' '.join(pending))
    return chunks
class VectorPlagiarismDB:
    """In-memory ChromaDB store of crawled source texts with per-sentence
    similarity lookup (cosine space)."""

    def __init__(self):
        # Degrade gracefully: self.available gates every other method.
        if not HAS_CHROMADB:
            self.available = False
            return
        try:
            self.client = chromadb.Client()
            self.collection = self.client.get_or_create_collection(
                name="plag_sources",
                metadata={"hnsw:space": "cosine"},
            )
            self.available = True
            self._doc_count = 0  # number of chunks indexed so far
        except Exception as e:
            print(f"ChromaDB 초기화 실패: {e}")
            self.available = False

    def index_source(self, url, title, text, source_type="Web"):
        """Chunk a source's text and add it to the collection.

        Returns the number of chunks added (0 when unavailable, the text is
        too short, or any error occurs).
        """
        if not self.available or not text or len(text.strip()) < 30:
            return 0
        try:
            chunks = _chunk_text(text, chunk_size=200, overlap=50)
            if not chunks:
                return 0
            # Chunk ids are derived from the URL hash so the same page
            # indexed twice overwrites rather than duplicates.
            doc_hash = hashlib.md5(url.encode()).hexdigest()[:10]
            ids = []
            documents = []
            metadatas = []
            for i, chunk in enumerate(chunks[:50]):  # cap: 50 chunks per source
                chunk_id = f"{doc_hash}_{i}"
                ids.append(chunk_id)
                documents.append(chunk)
                metadatas.append({
                    "url": url[:200],
                    "title": title[:100],
                    "source": source_type,
                    "chunk_idx": i,
                })
            self.collection.add(ids=ids, documents=documents, metadatas=metadatas)
            self._doc_count += len(ids)
            return len(ids)
        except Exception as e:
            print(f"벡터DB 인덱싱 오류: {e}")
            return 0

    def query_sentence(self, sentence, n_results=3):
        """Return the most similar indexed chunks for a single sentence.

        Each match is {similarity(%), url, title, source, matched_text};
        matches under 30% similarity are dropped.
        """
        if not self.available or self._doc_count == 0:
            return []
        try:
            results = self.collection.query(
                query_texts=[sentence],
                n_results=min(n_results, self._doc_count),
            )
            matches = []
            if results and results['distances'] and results['distances'][0]:
                for j, dist in enumerate(results['distances'][0]):
                    # ChromaDB cosine distance: 0 = identical, 2 = opposite,
                    # so similarity = 1 - (distance / 2).
                    similarity = max(0, 1 - dist / 2)
                    if similarity < 0.3:  # ignore matches under 30%
                        continue
                    meta = results['metadatas'][0][j] if results['metadatas'] else {}
                    doc_text = results['documents'][0][j] if results['documents'] else ""
                    matches.append({
                        "similarity": round(similarity * 100, 1),
                        "url": meta.get("url", ""),
                        "title": meta.get("title", ""),
                        "source": meta.get("source", ""),
                        "matched_text": doc_text[:150],
                    })
            return matches
        except Exception as e:
            print(f"벡터DB 쿼리 오류: {e}")
            return []

    def check_document(self, text, min_similarity=30):
        """Scan the whole document sentence by sentence against the index.

        Returns (flagged, plag_ratio, source_hits): flagged sentence dicts,
        the percentage of checked sentences that were flagged, and a
        hit-count per source URL. Sentences shorter than 15 chars are skipped.
        """
        if not self.available or self._doc_count == 0:
            return [], 0.0, {}
        sents = _split_sentences(text)
        flagged = []  # sentences suspected of plagiarism
        all_sims = []  # best similarity per checked sentence
        source_hits = {}  # hit count per source URL
        for sent in sents:
            if len(sent) < 15:
                continue
            matches = self.query_sentence(sent, n_results=3)
            if not matches:
                all_sims.append(0)
                continue
            best = matches[0]
            all_sims.append(best["similarity"])
            if best["similarity"] >= min_similarity:
                flagged.append({
                    "sentence": sent[:80],
                    "similarity": best["similarity"],
                    "url": best["url"],
                    "title": best["title"],
                    "matched_text": best["matched_text"][:100],
                })
                url = best["url"]
                source_hits[url] = source_hits.get(url, 0) + 1
        # Plagiarism ratio: share of flagged sentences among those checked.
        total_checked = len(all_sims)
        flagged_count = len(flagged)
        plag_ratio = (flagged_count / total_checked * 100) if total_checked > 0 else 0
        # Average similarity over flagged sentences only (computed but unused
        # by callers visible in this file).
        avg_sim = sum(s["similarity"] for s in flagged) / len(flagged) if flagged else 0
        return flagged, plag_ratio, source_hits

    def get_stats(self):
        """Report availability and the number of indexed chunks."""
        if not self.available:
            return {"available": False, "chunks": 0}
        return {"available": True, "chunks": self._doc_count}
# ============================================
# 핵심 구문 추출
# ============================================
def _extract_key_phrases(text, max_phrases=6):
    """Pick up to *max_phrases* search phrases from *text*, longest sentences first.

    Sentences over 80 chars are truncated to 80; sentences under 15 chars
    are skipped. If fewer than two phrases were found, the first sentence
    (truncated) is appended as a fallback.
    """
    sentences = _split_sentences(text)
    phrases = []
    for sentence in sorted(sentences, key=len, reverse=True):
        if len(sentence) > 80:
            phrases.append(sentence[:80])
        elif len(sentence) >= 15:
            phrases.append(sentence)
        if len(phrases) >= max_phrases:
            break
    if len(phrases) < 2 and sentences:
        phrases.append(sentences[0][:80])
    return phrases
# ============================================
# 메인 표절 검사 함수
# ============================================
def run_plagiarism(text, progress=gr.Progress()):
    """Plagiarism check — multi-phase pipeline.

    ① Gemini with Google Search grounding → ② Brave web search (DuckDuckGo
    fallback) → ③ academic DBs (KCI·RISS·arXiv) → ④ crawl-based source
    verification → ⑤ vector-DB sentence-level matching, combined into one
    weighted score and an HTML report.

    Args:
        text: document to check (minimum 50 chars after stripping).
        progress: Gradio progress tracker (gr.Progress() default is the
            gradio-idiomatic way to receive the tracker).

    Returns:
        (report_html, log_text) for the Gradio UI.
    """
    if not text or len(text.strip()) < 50:
        return "<div style='padding:20px;text-align:center;color:#888;'>⚠️ 최소 50자 이상</div>", ""
    text = text.strip()
    now = datetime.now().strftime("%Y-%m-%d %H:%M")
    doc_id = hashlib.md5(text[:100].encode()).hexdigest()[:8].upper()
    log_lines = []

    # Progress helper — UI plumbing must never break the analysis itself.
    def _prog(val, msg=""):
        try:
            progress(val, msg)
        except Exception:
            # BUGFIX: was a bare `except:` (also trapped KeyboardInterrupt).
            pass

    gemini_pct = 0
    gemini_sources = []
    gemini_response = ""
    brave_sources = []
    academic_sources = []

    # ═══════════════════════════════════════
    # PHASE 1: Gemini Google Search Grounding
    # ═══════════════════════════════════════
    if HAS_GENAI and GEMINI_KEY:
        _prog(0.10, "① Gemini Google Search Grounding...")
        try:
            client = genai.Client(api_key=GEMINI_KEY)
            prompt = f"""당신은 표절 검사 전문가입니다. 아래 텍스트가 인터넷에 이미 존재하는 내용인지 Google Search로 철저히 검색하세요.
[검사 대상 텍스트]
{text[:3000]}
[응답 형식]
1. 발견된 유사 콘텐츠를 각각 "제목 | URL | 유사도(높음/중간/낮음)" 형식으로 나열
2. 발견 못하면 "유사 콘텐츠 없음"
3. 마지막 줄에 반드시 "표절율: XX%" 형식으로 종합 판정"""
            contents = [
                gtypes.Content(
                    role="user",
                    parts=[gtypes.Part.from_text(text=prompt)],
                )
            ]
            tools = [gtypes.Tool(googleSearch=gtypes.GoogleSearch())]
            generate_content_config = gtypes.GenerateContentConfig(
                thinking_config=gtypes.ThinkingConfig(thinking_budget=0),
                tools=tools,
                temperature=0.1,
                max_output_tokens=4000,
            )
            _prog(0.20, "① Google Search 실행 중...")
            full_response = ""
            for chunk in client.models.generate_content_stream(
                model="gemini-flash-lite-latest",
                contents=contents,
                config=generate_content_config,
            ):
                if chunk.text:
                    full_response += chunk.text
            gemini_response = full_response
            # Pull the model's self-reported plagiarism percentage.
            pm = re.search(r'표절율[:\s]*(\d+)', full_response)
            if pm:
                gemini_pct = int(pm.group(1))
            # Grounding metadata is only exposed on a non-streaming response,
            # so re-issue the request once more without streaming.
            _prog(0.28, "① 출처 메타데이터 추출...")
            try:
                resp_full = client.models.generate_content(
                    model="gemini-flash-lite-latest",
                    contents=prompt,
                    config=gtypes.GenerateContentConfig(
                        tools=[gtypes.Tool(googleSearch=gtypes.GoogleSearch())],
                        temperature=0.1,
                        max_output_tokens=2000,
                    ),
                )
                if hasattr(resp_full, 'candidates') and resp_full.candidates:
                    cand = resp_full.candidates[0]
                    gm = getattr(cand, 'grounding_metadata', None)
                    if gm:
                        chunks = getattr(gm, 'grounding_chunks', None) or []
                        for gc in chunks:
                            web = getattr(gc, 'web', None)
                            if web:
                                title = getattr(web, 'title', '') or ''
                                uri = getattr(web, 'uri', '') or ''
                                if uri:
                                    gemini_sources.append({
                                        "title": title,
                                        "url": uri,
                                        "source": "Google",
                                        "snippet": "",
                                    })
                        # grounding_supports maps reply segments back to the
                        # chunk list; use them to fill in source snippets.
                        supports = getattr(gm, 'grounding_supports', None) or []
                        for sup in supports:
                            seg = getattr(sup, 'segment', None)
                            snippet_text = getattr(seg, 'text', '') if seg else ''
                            idxs = getattr(sup, 'grounding_chunk_indices', []) or []
                            for idx in idxs:
                                if idx < len(gemini_sources) and snippet_text:
                                    gemini_sources[idx]["snippet"] = snippet_text[:120]
            except Exception as e2:
                print(f"Gemini 메타데이터 추출 오류: {e2}")
            # Harvest any extra URLs mentioned verbatim in the reply text.
            existing_urls = {s["url"] for s in gemini_sources}
            for m in re.finditer(r'https?://[^\s\)\]\,\"\']{10,}', full_response):
                url = m.group(0).rstrip('.')
                if url not in existing_urls:
                    domain = url.split('/')[2] if len(url.split('/')) > 2 else url
                    gemini_sources.append({"title": domain, "url": url, "source": "Google", "snippet": ""})
                    existing_urls.add(url)
            log_lines.append(f"[Gemini] 표절율={gemini_pct}%, 출처={len(gemini_sources)}건")
        except Exception as e:
            log_lines.append(f"[Gemini] 오류: {str(e)[:100]}")
            print(f"Gemini 오류: {str(e)}")
    else:
        log_lines.append("[Gemini] API 키 없음 — 건너뜀")

    # ═══════════════════════════════════════
    # PHASE 2: parallel Brave web search (DuckDuckGo fallback without a key)
    # ═══════════════════════════════════════
    _prog(0.40, "② Brave Search 웹 검색...")
    key_phrases = _extract_key_phrases(text, max_phrases=6)
    if BRAVE_KEY and key_phrases:
        try:
            brave_results = parallel_brave_search(key_phrases, max_workers=10)
            seen_urls = {s["url"] for s in gemini_sources}
            for query, results in brave_results.items():
                for r in results:
                    url = r.get("url", "")
                    if url and url not in seen_urls:
                        brave_sources.append({
                            "title": r.get("title", "")[:80],
                            "url": url,
                            "source": "Brave",
                            "snippet": r.get("snippet", "")[:120],
                        })
                        seen_urls.add(url)
            log_lines.append(f"[Brave] 쿼리={len(key_phrases)}개, 출처={len(brave_sources)}건")
        except Exception as e:
            log_lines.append(f"[Brave] 오류: {str(e)[:80]}")
    elif not BRAVE_KEY:
        try:
            seen_urls = {s["url"] for s in gemini_sources}
            for phrase in key_phrases[:3]:
                for r in duckduckgo_search(phrase, max_results=3):
                    url = r.get("url", "")
                    if url and url not in seen_urls:
                        brave_sources.append({
                            "title": r.get("title", "")[:80],
                            "url": url,
                            "source": "Web",
                            "snippet": r.get("snippet", "")[:120],
                        })
                        seen_urls.add(url)
            log_lines.append(f"[DuckDuckGo] 폴백, 출처={len(brave_sources)}건")
        except Exception as e:
            log_lines.append(f"[DuckDuckGo] 오류: {str(e)[:80]}")

    # ═══════════════════════════════════════
    # PHASE 3: academic DB search (KCI · RISS · arXiv)
    # ═══════════════════════════════════════
    _prog(0.60, "③ 학술 DB 검색 (KCI·RISS·arXiv)...")
    academic_query = text[:100].replace('\n', ' ')
    has_korean = bool(re.search(r'[가-힣]', text))
    try:
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = {}
            futures[executor.submit(search_arxiv, academic_query[:60])] = "arXiv"
            if has_korean:
                # Korean-language portals only make sense for Korean text.
                futures[executor.submit(search_kci, academic_query[:40])] = "KCI"
                futures[executor.submit(search_riss, academic_query[:40])] = "RISS"
            seen_urls = {s["url"] for s in gemini_sources + brave_sources}
            for future in as_completed(futures, timeout=15):
                src_name = futures[future]
                try:
                    results = future.result()
                    for r in results:
                        url = r.get("url", "")
                        if url and url not in seen_urls:
                            academic_sources.append({
                                "title": r.get("title", "")[:80],
                                "url": url,
                                "source": src_name,
                                "snippet": r.get("snippet", "")[:120],
                            })
                            seen_urls.add(url)
                except Exception:
                    pass
        log_lines.append(f"[학술] KCI·RISS·arXiv 출처={len(academic_sources)}건")
    except Exception as e:
        log_lines.append(f"[학술] 오류: {str(e)[:80]}")

    # ═══════════════════════════════════════
    # PHASE 4: source verification (crawl and compare against the input)
    # ═══════════════════════════════════════
    _prog(0.75, "④ 출처 검증 (크롤링 대조)...")
    raw_sources = gemini_sources + brave_sources + academic_sources
    log_lines.append(f"[수집] 총 {len(raw_sources)}건 (검증 전)")
    # Crawl each candidate and measure real similarity; weak matches are dropped.
    verified_sources, avg_similarity, crawled_texts = _verify_sources_parallel(
        text, raw_sources, min_similarity=0.02, max_workers=8
    )
    # Sources that failed verification are kept separately as "unverified".
    verified_urls = {s["url"] for s in verified_sources}
    unverified_sources = [s for s in raw_sources if s["url"] not in verified_urls]
    log_lines.append(f"[검증] 통과={len(verified_sources)}건, 미검증={len(unverified_sources)}건, 크롤링={len(crawled_texts)}건, 평균유사도={avg_similarity:.3f}")

    # ═══════════════════════════════════════
    # PHASE 5: vector-DB sentence-level comparison
    # ═══════════════════════════════════════
    vector_flagged = []
    vector_plag_ratio = 0.0
    vector_source_hits = {}
    vdb = None
    if HAS_CHROMADB and crawled_texts:
        _prog(0.80, "⑤ 벡터DB 문장별 정밀 대조...")
        try:
            vdb = VectorPlagiarismDB()
            if vdb.available:
                # Index every crawled source body into the vector DB.
                indexed_count = 0
                for url, info in crawled_texts.items():
                    n = vdb.index_source(
                        url=url,
                        title=info.get("title", ""),
                        text=info["text"],
                        source_type=info.get("source", "Web"),
                    )
                    indexed_count += n
                log_lines.append(f"[벡터DB] {len(crawled_texts)}개 출처 → {indexed_count}개 청크 인덱싱")
                if indexed_count > 0:
                    # Query the index sentence by sentence.
                    vector_flagged, vector_plag_ratio, vector_source_hits = vdb.check_document(
                        text, min_similarity=35
                    )
                    log_lines.append(
                        f"[벡터DB] 표절 문장={len(vector_flagged)}건, "
                        f"문장표절율={vector_plag_ratio:.1f}%, "
                        f"히트출처={len(vector_source_hits)}건"
                    )
        except Exception as e:
            log_lines.append(f"[벡터DB] 오류: {str(e)[:80]}")
    elif not HAS_CHROMADB:
        log_lines.append("[벡터DB] chromadb 미설치 — 건너뜀")
    else:
        log_lines.append("[벡터DB] 크롤링 데이터 없음 — 건너뜀")

    # ═══════════════════════════════════════
    # Final verdict (n-gram + vector-DB combined)
    # ═══════════════════════════════════════
    _prog(0.90, "보고서 생성...")
    all_sources = verified_sources  # only verified sources are displayed
    # Score layer ①: n-gram evidence (max hit, average, and count factor).
    max_sim = max((s.get("similarity", 0) for s in verified_sources), default=0)
    count_factor = min(len(verified_sources) * 3, 30)
    avg_factor = avg_similarity * 100
    ngram_score = min(round(max_sim * 0.4 + avg_factor * 0.3 + count_factor * 0.3), 100)
    # Score layer ②: vector-DB per-sentence matching ratio.
    vector_score = round(vector_plag_ratio) if vector_flagged else 0
    # Combined: weight toward the vector score when it exists.
    if vector_flagged:
        combined_score = round(ngram_score * 0.4 + vector_score * 0.6)
    else:
        combined_score = ngram_score
    # Gemini's self-reported rate is advisory only (20% weight, and only
    # when at least one source survived verification).
    if gemini_pct > 0 and len(verified_sources) > 0:
        plag_pct = min(round(gemini_pct * 0.2 + combined_score * 0.8), 100)
    elif len(verified_sources) > 0:
        plag_pct = combined_score
    else:
        # Nothing verified: heavily discount Gemini's number, cap at 20%.
        plag_pct = min(round(gemini_pct * 0.1), 20)
    log_lines.append(
        f"[판정] Gemini={gemini_pct}%, n-gram={ngram_score}%, "
        f"벡터={vector_score}%, → 종합={plag_pct}%"
    )
    if plag_pct >= 50:
        grade, gc = "🚨 표절 의심", "#FF4444"
    elif plag_pct >= 30:
        grade, gc = "⚠️ 주의 필요", "#FF8800"
    elif plag_pct >= 15:
        grade, gc = "📌 유사표현", "#DDAA00"
    elif plag_pct >= 5:
        grade, gc = "✓ 양호", "#4ECDC4"
    else:
        grade, gc = "✅ 우수", "#22AA44"
    word_count = len(_split_words(text))
    char_count = len(text)

    # ═══════════════════════════════════════
    # Source table HTML
    # ═══════════════════════════════════════
    def _source_badge(src):
        # Colored badge per search engine / database.
        colors = {
            "Google": "#4285F4", "Brave": "#FB542B", "Web": "#888",
            "KCI": "#2E7D32", "RISS": "#1565C0", "arXiv": "#B71C1C",
        }
        c = colors.get(src, "#666")
        return f'<span style="display:inline-block;padding:2px 6px;border-radius:3px;background:{c};color:#fff;font-size:9px;font-weight:700;">{src}</span>'

    src_rows = ""
    for i, s in enumerate(all_sources[:30]):
        title_display = s['title'][:55] if s['title'] else (
            s['url'].split('/')[2] if len(s['url'].split('/')) > 2 else s['url'][:40]
        )
        sim_val = s.get("similarity", 0)
        sim_color = "#FF4444" if sim_val >= 30 else "#FF8800" if sim_val >= 10 else "#DDAA00" if sim_val >= 3 else "#999"
        sim_html = f'<span style="font-weight:700;color:{sim_color};">{sim_val:.1f}%</span>'
        method = s.get("method", "")
        method_tag = f' <span style="font-size:8px;color:#aaa;">({method})</span>' if method else ""
        snippet_html = (
            f'<div style="font-size:9px;color:#888;margin-top:2px;">{s["snippet"][:100]}</div>'
            if s.get("snippet") else ""
        )
        matched = s.get("matched", [])
        matched_html = ""
        if matched:
            matched_html = '<div style="font-size:9px;color:#C62828;margin-top:2px;">매칭: ' + " | ".join(matched[:2]) + '</div>'
        src_rows += f"""<tr style="border-bottom:1px solid #E8E8E8;">
            <td style="padding:8px;text-align:center;font-size:11px;color:#666;">{i + 1}</td>
            <td style="padding:8px;">{_source_badge(s.get('source', ''))}</td>
            <td style="padding:8px;text-align:center;">{sim_html}{method_tag}</td>
            <td style="padding:8px;"><a href="{s['url']}" target="_blank" rel="noopener noreferrer" style="color:#2E86C1;text-decoration:none;font-weight:600;font-size:11px;">{title_display}</a>{snippet_html}{matched_html}</td>
            <td style="padding:8px;font-size:9px;color:#999;word-break:break-all;max-width:180px;"><a href="{s['url']}" target="_blank" rel="noopener noreferrer" style="color:#999;text-decoration:none;">{s['url'][:60]}</a></td>
        </tr>"""
    if not src_rows:
        src_rows = '<tr><td colspan="5" style="padding:20px;text-align:center;color:#999;">검증된 유사 출처 없음 — 표절 가능성 낮음</td></tr>'

    # Collapsible Gemini analysis detail.
    gemini_summary = ""
    if gemini_response:
        safe_resp = gemini_response.replace('<', '&lt;').replace('>', '&gt;').replace('\n', '<br>')
        gemini_summary = f"""
        <div style="padding:16px 24px;border-bottom:1px solid #E0E0E0;">
            <details>
                <summary style="cursor:pointer;font-size:13px;font-weight:700;color:#1A3C6E;">🤖 Gemini 분석 상세</summary>
                <div style="margin-top:10px;padding:12px;background:#F8F9FA;border-radius:6px;font-size:11px;line-height:1.7;color:#333;max-height:300px;overflow-y:auto;">{safe_resp}</div>
            </details>
        </div>"""

    # Vector-DB per-sentence match table.
    vector_section = ""
    if vector_flagged:
        vf_rows = ""
        for k, vf in enumerate(vector_flagged[:15]):
            sim_val = vf["similarity"]
            sim_color = "#FF4444" if sim_val >= 70 else "#FF8800" if sim_val >= 50 else "#DDAA00"
            sent_safe = vf["sentence"][:70].replace('<', '&lt;')
            matched_safe = vf["matched_text"][:90].replace('<', '&lt;')
            title_safe = vf["title"][:40].replace('<', '&lt;')
            vf_rows += f"""<tr style="border-bottom:1px solid #F0F0F0;">
                <td style="padding:6px;text-align:center;font-size:10px;color:#666;">{k+1}</td>
                <td style="padding:6px;font-size:10px;color:#333;">{sent_safe}</td>
                <td style="padding:6px;text-align:center;"><span style="font-weight:700;color:{sim_color};">{sim_val:.0f}%</span></td>
                <td style="padding:6px;font-size:9px;color:#666;">{matched_safe}</td>
                <td style="padding:6px;font-size:9px;"><a href="{vf['url']}" target="_blank" rel="noopener noreferrer" style="color:#2E86C1;text-decoration:none;">{title_safe}</a></td>
            </tr>"""
        vdb_stats = vdb.get_stats() if vdb else {"chunks": 0}
        vector_section = f"""
        <div style="padding:16px 24px;border-bottom:1px solid #E0E0E0;">
            <details open>
                <summary style="cursor:pointer;font-size:13px;font-weight:700;color:#1A3C6E;">
                    🧬 벡터DB 문장별 정밀 대조 — 표절 문장 {len(vector_flagged)}건 / 문장표절율 {vector_plag_ratio:.1f}%
                    <span style="font-size:10px;font-weight:400;color:#888;margin-left:8px;">(DB: {vdb_stats['chunks']}청크)</span>
                </summary>
                <table style="width:100%;border-collapse:collapse;font-size:11px;margin-top:10px;">
                    <thead>
                        <tr style="background:#E8EAF6;color:#333;">
                            <th style="padding:8px;width:30px;">#</th>
                            <th style="padding:8px;text-align:left;">입력 문장</th>
                            <th style="padding:8px;width:55px;">유사도</th>
                            <th style="padding:8px;text-align:left;">매칭 출처 내용</th>
                            <th style="padding:8px;width:120px;">출처</th>
                        </tr>
                    </thead>
                    <tbody>{vf_rows}</tbody>
                </table>
            </details>
        </div>"""

    HDR = '#3B7DD8'
    html = f"""<div style="font-family:'Noto Sans KR',sans-serif;max-width:900px;margin:20px auto;background:#fff;border:1px solid #E0E0E0;border-radius:8px;box-shadow:0 2px 8px rgba(0,0,0,0.06);">
    <div style="background:linear-gradient(135deg,{HDR},#4A8DE0);padding:24px;color:#fff;border-radius:8px 8px 0 0;">
        <div style="display:flex;justify-content:space-between;align-items:center;">
            <div>
                <div style="font-size:24px;font-weight:900;">표절 검사 결과</div>
                <div style="font-size:12px;opacity:0.9;margin-top:4px;">Gemini + Brave + KCI·RISS·arXiv + 벡터DB 정밀대조</div>
            </div>
            <div style="text-align:right;font-size:11px;opacity:0.9;">
                <div>문서: {doc_id}</div>
                <div>{now}</div>
            </div>
        </div>
    </div>
    <div style="padding:24px;background:#FAFBFE;border-bottom:1px solid #E0E0E0;">
        <div style="display:grid;grid-template-columns:1fr 1fr 1fr 1fr;gap:12px;">
            <div style="text-align:center;padding:16px;background:#fff;border-radius:6px;border:1px solid #E0E0E0;">
                <div style="font-size:42px;font-weight:900;color:{gc};">{plag_pct}%</div>
                <div style="font-size:11px;color:#666;margin-top:6px;">종합 표절율</div>
            </div>
            <div style="text-align:center;padding:16px;background:#fff;border-radius:6px;border:1px solid #E0E0E0;">
                <div style="font-size:22px;font-weight:900;color:{gc};margin-top:6px;">{grade}</div>
                <div style="font-size:11px;color:#666;margin-top:6px;">판정</div>
            </div>
            <div style="text-align:center;padding:16px;background:#fff;border-radius:6px;border:1px solid #E0E0E0;">
                <div style="font-size:28px;font-weight:900;color:#555;">{len(all_sources)}</div>
                <div style="font-size:11px;color:#666;margin-top:6px;">검증 출처</div>
            </div>
            <div style="text-align:center;padding:16px;background:#fff;border-radius:6px;border:1px solid #E0E0E0;">
                <div style="font-size:14px;font-weight:700;color:#4285F4;margin-top:4px;">{len(raw_sources)} 수집</div>
                <div style="font-size:14px;font-weight:700;color:#22AA44;">{len(verified_sources)} 검증</div>
                <div style="font-size:14px;font-weight:700;color:#999;">{len(unverified_sources)} 제외</div>
                <div style="font-size:9px;color:#666;margin-top:2px;">크롤링 대조</div>
            </div>
        </div>
    </div>
    <div style="padding:16px 24px;border-bottom:1px solid #E0E0E0;">
        <div style="font-size:13px;font-weight:700;color:#1A3C6E;margin-bottom:8px;">📋 검사 정보</div>
        <div style="display:flex;gap:24px;font-size:12px;">
            <span><span style="color:#888;">글자수</span> <b>{char_count:,}자</b></span>
            <span><span style="color:#888;">단어수</span> <b>{word_count:,}단어</b></span>
            <span><span style="color:#888;">검색엔진</span> <b>Google + Brave + KCI·RISS·arXiv</b></span>
        </div>
    </div>{gemini_summary}{vector_section}
    <div style="padding:24px;border-bottom:1px solid #E0E0E0;">
        <div style="font-size:13px;font-weight:700;color:#1A3C6E;margin-bottom:12px;">🔍 검증된 출처 ({len(all_sources)}건)</div>
        <table style="width:100%;border-collapse:collapse;font-size:11px;">
            <thead>
                <tr style="background:{HDR};color:white;">
                    <th style="padding:10px;text-align:center;width:35px;">#</th>
                    <th style="padding:10px;text-align:center;width:55px;">소스</th>
                    <th style="padding:10px;text-align:center;width:65px;">유사도</th>
                    <th style="padding:10px;text-align:left;">출처</th>
                    <th style="padding:10px;text-align:left;width:180px;">URL</th>
                </tr>
            </thead>
            <tbody>{src_rows}</tbody>
        </table>
    </div>
</div>"""
    _prog(0.95, "완료!")
    log_text = "\n".join(log_lines)
    return html, log_text