# ============================================
# plagiarism_check.py
# Plagiarism-check module
# Gemini Google Search + Brave + KCI / RISS / arXiv
# ============================================
"""Multi-stage plagiarism check.

Pipeline implemented by :func:`run_plagiarism`:

1. Gemini with Google Search grounding
2. Brave Search (DuckDuckGo HTML scraping when no Brave key is configured)
3. Academic search (arXiv, plus KCI / RISS for Korean input)
4. Source verification by crawling and n-gram comparison
5. Sentence-level comparison through an in-memory ChromaDB collection

NOTE(review): the copy of this file that was reviewed had its whitespace
collapsed and every HTML-like token stripped out of string literals and
regular expressions.  Visible text, log messages, thresholds and weights
are kept as found; HTML markup and the tag parts of the scraping regexes
are reconstructions and should be confirmed against the deployed version.
"""
import hashlib
import html as _html
import json
import logging
import os
import re
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeoutError
from datetime import datetime

# Every third-party dependency is optional; each HAS_* flag records
# whether the import succeeded so callers can degrade gracefully.
try:
    import gradio as gr
    HAS_GRADIO = True
except ImportError:
    gr = None
    HAS_GRADIO = False

try:
    import httpx
    HAS_HTTPX = True
except ImportError:
    HAS_HTTPX = False

try:
    import chromadb
    HAS_CHROMADB = True
except ImportError:
    HAS_CHROMADB = False

try:
    import numpy as np
    HAS_NUMPY = True
except ImportError:
    HAS_NUMPY = False

try:
    from google import genai
    from google.genai import types as gtypes
    HAS_GENAI = True
except ImportError:
    HAS_GENAI = False

GEMINI_KEY = os.getenv("GEMINI_API_KEY", "")
BRAVE_KEY = os.getenv("BRAVE_API_KEY", "")

_log = logging.getLogger(__name__)

_BROWSER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
}


# ============================================
# Lightweight text utilities (no app.py dependency)
# ============================================
def _split_sentences(text):
    """Split *text* into sentences on ``.``, ``!``, ``?`` or ``。`` followed by whitespace.

    Whitespace runs are collapsed first; empty fragments are dropped.
    """
    text = re.sub(r'\s+', ' ', text).strip()
    sents = re.split(r'(?<=[.!?。])\s+', text)
    return [s.strip() for s in sents if s.strip()]


def _split_words(text):
    """Return the runs of Hangul syllables, ASCII letters and digits in *text*."""
    return re.findall(r'[가-힣a-zA-Z0-9]+', text)


def _strip_tags(fragment):
    """Remove anything that looks like an HTML tag and trim the result."""
    return re.sub(r'<[^>]+>', '', fragment).strip()


def _http_get(url, headers=None, timeout=15):
    """GET *url* and return the body as text, or ``None`` on any failure.

    Uses httpx when available (following redirects, accepting only HTTP 200)
    and falls back to ``urllib`` otherwise.  This is a best-effort helper:
    failures are logged at debug level and reported as ``None``.
    """
    try:
        if HAS_HTTPX:
            r = httpx.get(url, headers=headers or {}, timeout=timeout, follow_redirects=True)
            return r.text if r.status_code == 200 else None
        req = urllib.request.Request(url, headers=headers or {})
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.read().decode('utf-8', errors='replace')
    except Exception as exc:
        _log.debug("GET %s failed: %s", url, exc)
        return None


def _extract_percent(label, text):
    """Return the last ``<label>: NN`` number found in *text*, clamped to 0..100.

    The prompts ask the model to put its verdict on the final line, so the
    last occurrence is the verdict; earlier ones may be per-source figures.
    Returns 0 when no number follows *label*.
    """
    hits = re.findall(re.escape(label) + r'[:\s]*(\d+)', text or "")
    return min(int(hits[-1]), 100) if hits else 0


# ============================================
# Web search functions
# ============================================
def brave_search(query, count=5):
    """Query the Brave Search API.

    Returns a list of ``{"title", "url", "snippet", "source"}`` dicts, or an
    empty list when no API key is configured or the request/parse fails.
    """
    if not BRAVE_KEY:
        return []
    # urlencode so spaces, '&', '#' and non-ASCII text cannot corrupt the URL.
    url = "https://api.search.brave.com/res/v1/web/search?" + urllib.parse.urlencode(
        {"q": query, "count": count}
    )
    raw = _http_get(
        url,
        headers={"X-Subscription-Token": BRAVE_KEY, "Accept": "application/json"},
        timeout=10,
    )
    if not raw:
        return []
    try:
        data = json.loads(raw)
    except ValueError:
        return []
    if not isinstance(data, dict):
        return []
    items = (data.get("web") or {}).get("results") or []
    return [
        {
            "title": item.get("title", ""),
            "url": item.get("url", ""),
            "snippet": item.get("description", ""),
            "source": "Brave",
        }
        for item in items
    ]


# NOTE(review): tag names are reconstructed (the reviewed copy had them
# stripped); the pattern tolerates both CDATA-wrapped and plain values.
_KCI_RECORD_RE = re.compile(
    r'<article-title[^>]*>\s*(?:<!\[CDATA\[)?(.*?)(?:\]\]>)?\s*</article-title>'
    r'.*?<url[^>]*>\s*(?:<!\[CDATA\[)?(.*?)(?:\]\]>)?\s*</url>',
    re.S,
)


def _parse_kci(xml_text):
    """Extract up to three ``(title, url)`` records from a KCI response body."""
    found = [
        {"title": m.group(1).strip(), "url": m.group(2).strip(), "snippet": "", "source": "KCI"}
        for m in _KCI_RECORD_RE.finditer(xml_text or "")
    ]
    return found[:3]


def search_kci(query):
    """Search KCI (Korean academic papers) by title; returns at most three results."""
    q = urllib.parse.quote(query)
    url = (
        "https://open.kci.go.kr/po/openapi/openApiSearch.kci"
        f"?apiCode=articleSearch&title={q}&displayCount=3"
    )
    resp = _http_get(url, timeout=8)
    return _parse_kci(resp) if resp else []


def search_riss(query):
    """Search RISS (Korean academic papers) by scraping the result page."""
    q = urllib.parse.quote(query)
    url = (
        f"http://www.riss.kr/search/Search.do?isDetailSearch=N&searchGubun=true"
        f"&viewYn=OP&queryText=&strQuery={q}&iStartCount=0&iGroupView=5&icate=all"
    )
    resp = _http_get(url, timeout=8)
    results = []
    if resp:
        # NOTE(review): the <a ...></a> parts of this pattern are reconstructed.
        for m in re.finditer(
            r'class="title"[^>]*>.*?<a[^>]*href="([^"]+)"[^>]*>(.*?)</a>', resp, re.S
        ):
            title = _strip_tags(m.group(2))
            if title:
                results.append({
                    "title": title,
                    # urljoin copes with both relative and absolute hrefs.
                    "url": urllib.parse.urljoin(
                        "https://www.riss.kr", _html.unescape(m.group(1))
                    ),
                    "snippet": "",
                    "source": "RISS",
                })
    return results[:3]


def _parse_arxiv(feed_text):
    """Extract up to three results from an arXiv Atom feed.

    Each ``<entry>`` is parsed on its own so a title can never be paired
    with the id or summary of a neighbouring entry.
    """
    results = []
    for entry in re.finditer(r'<entry>(.*?)</entry>', feed_text or "", re.S):
        body = entry.group(1)
        title_m = re.search(r'<title[^>]*>(.*?)</title>', body, re.S)
        id_m = re.search(r'<id>(.*?)</id>', body, re.S)
        summary_m = re.search(r'<summary[^>]*>(.*?)</summary>', body, re.S)
        if not (title_m and id_m):
            continue
        summary = summary_m.group(1) if summary_m else ""
        results.append({
            "title": re.sub(r'\s+', ' ', title_m.group(1)).strip(),
            "url": id_m.group(1).strip(),
            "snippet": re.sub(r'\s+', ' ', summary).strip()[:150],
            "source": "arXiv",
        })
    return results[:3]


def search_arxiv(query):
    """Search arXiv through its Atom API; returns at most three results."""
    q = urllib.parse.quote(query)
    url = (
        f"https://export.arxiv.org/api/query?search_query=all:{q}"
        "&start=0&max_results=3&sortBy=relevance"
    )
    resp = _http_get(url, timeout=12)
    return _parse_arxiv(resp) if resp else []


def parallel_brave_search(queries, max_workers=10):
    """Run :func:`brave_search` (3 results each) for every query in parallel.

    Returns ``{query: results}``; a query whose search raised maps to ``[]``.
    The pool is capped at 20 threads.
    """
    all_results = {}
    if not queries:
        return all_results
    workers = max(1, min(max_workers, 20))
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(brave_search, q, 3): q for q in queries}
        for future in as_completed(futures):
            q = futures[future]
            try:
                all_results[q] = future.result()
            except Exception as exc:
                _log.debug("Brave search for %r failed: %s", q, exc)
                all_results[q] = []
    return all_results


def duckduckgo_search(query, max_results=5):
    """Scrape DuckDuckGo's HTML endpoint — a fallback that needs no API key."""
    results = []
    q = urllib.parse.quote(query)
    resp = _http_get(f"https://html.duckduckgo.com/html/?q={q}",
                     headers=_BROWSER_HEADERS, timeout=10)
    if not resp:
        return results
    # NOTE(review): the <a ...></a> parts of this pattern are reconstructed.
    pattern = (
        r'<a[^>]+class="result__a"[^>]+href="([^"]+)"[^>]*>(.*?)</a>'
        r'.*?<a[^>]+class="result__snippet"[^>]*>(.*?)</a>'
    )
    for m in re.finditer(pattern, resp, re.S):
        href = m.group(1)
        title = _strip_tags(m.group(2))
        snippet = _strip_tags(m.group(3))
        real_url = href
        # Result links are redirects carrying the target in ``uddg``.
        if 'uddg=' in href:
            um = re.search(r'uddg=([^&]+)', href)
            if um:
                real_url = urllib.parse.unquote(um.group(1))
        if title:
            results.append({"title": title, "url": real_url, "snippet": snippet, "source": "Web"})
        if len(results) >= max_results:
            break
    return results


def self_crawl_search(query, max_results=3):
    """DuckDuckGo search plus one extra academic-flavoured query.

    The extra query (``"<query> 논문 학술"``, two results) is skipped when the
    query already mentions ``논문`` or ``paper``.
    """
    all_results = list(duckduckgo_search(query, max_results))
    if '논문' not in query and 'paper' not in query.lower():
        all_results.extend(duckduckgo_search(f"{query} 논문 학술", 2))
    return all_results


# ============================================
# Gemini Google Search Grounding
# ============================================
def gemini_plagiarism_check(text_chunk):
    """Ask Gemini (with Google Search grounding) whether *text_chunk* exists online.

    Returns ``None`` when the SDK or API key is missing, otherwise
    ``{"pct", "response", "sources"}``.  On an API error ``pct`` is 0 and
    ``response`` holds the first 100 characters of the error.
    """
    if not HAS_GENAI or not GEMINI_KEY:
        return None
    try:
        client = genai.Client(api_key=GEMINI_KEY)
        tool = gtypes.Tool(googleSearch=gtypes.GoogleSearch())
        prompt = f"""다음 텍스트가 인터넷에 존재하는지 Google Search로 확인하세요.
유사한 문장이 발견되면 출처 URL과 유사도(%)를 보고하세요.
마지막 줄에 "유사도: XX%" 형식으로 작성.

[텍스트]
{text_chunk[:1000]}"""
        resp = client.models.generate_content(
            model="gemini-flash-lite-latest",
            contents=prompt,
            config=gtypes.GenerateContentConfig(
                tools=[tool], temperature=0.1, max_output_tokens=600
            ),
        )
        text_resp = resp.text if resp.text else ""
        sources = []
        if hasattr(resp, 'candidates') and resp.candidates:
            gc = resp.candidates[0].grounding_metadata
            if gc and hasattr(gc, 'grounding_chunks'):
                for chunk in gc.grounding_chunks or []:
                    if hasattr(chunk, 'web') and chunk.web:
                        sources.append({
                            "title": chunk.web.title or "",
                            "url": chunk.web.uri or "",
                            "source": "Google",
                        })
        return {"pct": _extract_percent("유사도", text_resp),
                "response": text_resp, "sources": sources}
    except Exception as e:
        return {"pct": 0, "response": str(e)[:100], "sources": []}


# ============================================
# Similarity analysis
# ============================================
def _ngram_set(text, n=3):
    """Return the set of word n-grams of *text* (lower-cased).

    Texts with fewer than *n* words yield the set of their single words.
    """
    text = re.sub(r'\s+', ' ', text.lower().strip())
    words = re.findall(r'[가-힣a-zA-Z0-9]+', text)
    if len(words) < n:
        return set(words)
    return {' '.join(words[i:i + n]) for i in range(len(words) - n + 1)}


def _text_similarity(text_a, text_b, n=3):
    """Jaccard similarity (0.0-1.0) of the n-gram sets of two texts."""
    if not text_a or not text_b:
        return 0.0
    set_a = _ngram_set(text_a, n)
    set_b = _ngram_set(text_b, n)
    if not set_a or not set_b:
        return 0.0
    union = set_a | set_b
    return len(set_a & set_b) / len(union) if union else 0.0


def _containment_similarity(original, source_text, n=3):
    """Fraction (0.0-1.0) of *original*'s n-grams that also occur in *source_text*."""
    if not original or not source_text:
        return 0.0
    set_orig = _ngram_set(original, n)
    if not set_orig:
        return 0.0
    return len(set_orig & _ngram_set(source_text, n)) / len(set_orig)


def _fetch_page_text(url, timeout=8):
    """Download *url* and return up to 5000 characters of tag-free text.

    Returns ``""`` on any failure.
    """
    page = _http_get(url, headers=_BROWSER_HEADERS, timeout=timeout)
    if not page:
        return ""
    # NOTE(review): reconstructed — drops script/style blocks before de-tagging.
    page = re.sub(r'<script[^>]*>.*?</script>', '', page, flags=re.S | re.I)
    page = re.sub(r'<style[^>]*>.*?</style>', '', page, flags=re.S | re.I)
    text = re.sub(r'<[^>]+>', ' ', page)
    text = re.sub(r'\s+', ' ', text).strip()
    return text[:5000]


def _find_matching_sentences(original, source_text, threshold=0.3):
    """Return up to three sentences of *original* (first 60 chars each) found in *source_text*.

    Only the first 20 sentences are examined and sentences shorter than 10
    characters are skipped.  Containment is used rather than Jaccard: the
    Jaccard index of one sentence against a whole page is dominated by the
    page's size and practically never reaches *threshold*.
    """
    matched = []
    for sent in _split_sentences(original)[:20]:
        if len(sent) < 10:
            continue
        if _containment_similarity(sent, source_text, n=2) >= threshold:
            matched.append(sent[:60])
            if len(matched) >= 3:
                break
    return matched


def _verify_sources_parallel(original_text, sources, min_similarity=0.02, max_workers=8):
    """Crawl candidate sources and keep those that really resemble the text.

    Only the first 20 sources are checked.  A source passes when either its
    snippet (bigram Jaccard against the first 500 chars) or its crawled page
    (max of trigram Jaccard and containment) reaches *min_similarity*.

    Returns ``(verified, avg_similarity, crawled_texts)`` where *verified* is
    sorted by descending ``similarity`` (a percentage), *avg_similarity* is
    the mean raw score (0-1) and *crawled_texts* maps url to
    ``{"text", "title", "source"}`` for every page longer than 50 chars,
    verified or not (used for vector indexing).
    """
    if not sources:
        return [], 0.0, {}

    candidates = sources[:20]
    verified = []
    total_sim = 0.0
    crawled_texts = {}

    def _check_one(src):
        url = src.get("url", "")
        snippet = src.get("snippet", "")
        # Crawl even when the snippet alone would verify the source: the
        # page text is needed later for the vector index.
        page_text = _fetch_page_text(url, timeout=6) if url else ""

        if snippet and len(snippet) > 20:
            sim = _text_similarity(original_text[:500], snippet, n=2)
            if sim >= min_similarity:
                hit = {**src, "similarity": round(sim * 100, 1), "method": "snippet"}
                return hit, sim, page_text

        if page_text and len(page_text) > 50:
            sim = max(
                _text_similarity(original_text, page_text, n=3),
                _containment_similarity(original_text, page_text, n=3),
            )
            if sim >= min_similarity:
                hit = {
                    **src,
                    "similarity": round(sim * 100, 1),
                    "method": "crawl",
                    "matched": _find_matching_sentences(original_text, page_text),
                }
                return hit, sim, page_text

        return None, 0.0, page_text

    workers = max(1, min(max_workers, len(candidates)))
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(_check_one, src): src for src in candidates}
        try:
            for future in as_completed(futures, timeout=30):
                try:
                    result, sim, page_text = future.result()
                except Exception as exc:
                    _log.debug("source verification failed: %s", exc)
                    continue
                src_info = futures[future]
                url = src_info.get("url", "")
                if page_text and len(page_text) > 50 and url:
                    crawled_texts[url] = {
                        "text": page_text,
                        "title": src_info.get("title", ""),
                        "source": src_info.get("source", ""),
                    }
                if result is not None:
                    verified.append(result)
                    total_sim += sim
        except FuturesTimeoutError:
            # as_completed raises from the iterator itself; without this the
            # whole check crashed whenever a single host was slow.
            for pending in futures:
                pending.cancel()

    verified.sort(key=lambda x: x.get("similarity", 0), reverse=True)
    avg_sim = total_sim / len(verified) if verified else 0.0
    return verified, avg_sim, crawled_texts


# ============================================
# Vector-DB based fine-grained check
# ============================================
def _chunk_text(text, chunk_size=200, overlap=50):
    """Group sentences into chunks of at least *chunk_size* characters.

    Chunks always end on a sentence boundary.  When *overlap* is positive
    and the chunk held more than one sentence, its last sentence also
    starts the next chunk (*overlap* acts as an on/off switch only).
    """
    chunks = []
    current = []
    current_len = 0
    for sent in _split_sentences(text):
        current.append(sent)
        current_len += len(sent)
        if current_len >= chunk_size:
            chunks.append(' '.join(current))
            if overlap > 0 and len(current) > 1:
                current = current[-1:]
                current_len = len(current[0])
            else:
                current = []
                current_len = 0
    if current:
        chunks.append(' '.join(current))
    return chunks


class VectorPlagiarismDB:
    """Stores crawled source text in ChromaDB and looks up sentences by similarity."""

    def __init__(self):
        # Defined up front so every attribute exists even when ChromaDB is
        # missing or fails to initialise.
        self.available = False
        self.client = None
        self.collection = None
        self._doc_count = 0  # chunks added through this instance
        if not HAS_CHROMADB:
            return
        try:
            self.client = chromadb.Client()
            self.collection = self.client.get_or_create_collection(
                name="plag_sources",
                metadata={"hnsw:space": "cosine"},
            )
            self.available = True
        except Exception as e:
            print(f"ChromaDB 초기화 실패: {e}")

    def index_source(self, url, title, text, source_type="Web"):
        """Chunk *text* and add it to the collection; returns the number of chunks stored.

        At most 50 chunks per source are stored; texts under 30 characters
        are ignored.  Chunk ids derive from an MD5 prefix of *url*.
        """
        if not self.available or not text or len(text.strip()) < 30:
            return 0
        try:
            chunks = _chunk_text(text, chunk_size=200, overlap=50)[:50]
            if not chunks:
                return 0
            doc_hash = hashlib.md5(url.encode()).hexdigest()[:10]
            ids = [f"{doc_hash}_{i}" for i in range(len(chunks))]
            metadatas = [
                {
                    "url": url[:200],
                    "title": (title or "")[:100],
                    "source": source_type,
                    "chunk_idx": i,
                }
                for i in range(len(chunks))
            ]
            self.collection.add(ids=ids, documents=chunks, metadatas=metadatas)
            self._doc_count += len(ids)
            return len(ids)
        except Exception as e:
            print(f"벡터DB 인덱싱 오류: {e}")
            return 0

    def query_sentence(self, sentence, n_results=3):
        """Return the stored chunks most similar to *sentence*.

        Cosine distance ``d`` is mapped to ``1 - d / 2``; matches below 30 %
        are dropped.  Each match has ``similarity`` (percent), ``url``,
        ``title``, ``source`` and ``matched_text`` (first 150 chars).
        """
        if not self.available or self._doc_count == 0:
            return []
        try:
            results = self.collection.query(
                query_texts=[sentence],
                n_results=min(n_results, self._doc_count),
            )
            matches = []
            if results and results['distances'] and results['distances'][0]:
                for j, dist in enumerate(results['distances'][0]):
                    similarity = max(0, 1 - dist / 2)
                    if similarity < 0.3:
                        continue
                    meta = results['metadatas'][0][j] if results['metadatas'] else {}
                    doc_text = results['documents'][0][j] if results['documents'] else ""
                    matches.append({
                        "similarity": round(similarity * 100, 1),
                        "url": meta.get("url", ""),
                        "title": meta.get("title", ""),
                        "source": meta.get("source", ""),
                        "matched_text": doc_text[:150],
                    })
            return matches
        except Exception as e:
            print(f"벡터DB 쿼리 오류: {e}")
            return []

    def check_document(self, text, min_similarity=30):
        """Check every sentence of *text* (15+ chars) against the index.

        Returns ``(flagged, plag_ratio, source_hits)``: the flagged sentences
        with their best match, the percentage of checked sentences that were
        flagged, and a per-URL hit count.
        """
        if not self.available or self._doc_count == 0:
            return [], 0.0, {}

        flagged = []
        source_hits = {}
        total_checked = 0
        for sent in _split_sentences(text):
            if len(sent) < 15:
                continue
            total_checked += 1
            matches = self.query_sentence(sent, n_results=3)
            if not matches:
                continue
            best = matches[0]
            if best["similarity"] >= min_similarity:
                flagged.append({
                    "sentence": sent[:80],
                    "similarity": best["similarity"],
                    "url": best["url"],
                    "title": best["title"],
                    "matched_text": best["matched_text"][:100],
                })
                source_hits[best["url"]] = source_hits.get(best["url"], 0) + 1

        plag_ratio = (len(flagged) / total_checked * 100) if total_checked > 0 else 0
        return flagged, plag_ratio, source_hits

    def get_stats(self):
        """Return ``{"available": bool, "chunks": int}``."""
        if not self.available:
            return {"available": False, "chunks": 0}
        return {"available": True, "chunks": self._doc_count}


# ============================================
# Key-phrase extraction
# ============================================
def _extract_key_phrases(text, max_phrases=6):
    """Pick search phrases: the longest sentences, each cut to 80 characters.

    Sentences shorter than 15 characters are ignored; if fewer than two
    phrases result, the first sentence (cut to 80 chars) is appended.
    """
    sents = _split_sentences(text)
    phrases = []
    for s in sorted(sents, key=len, reverse=True):
        if len(s) >= 15:
            phrases.append(s[:80])
        if len(phrases) >= max_phrases:
            break
    if len(phrases) < 2 and sents:
        phrases.append(sents[0][:80])
    return phrases


# ============================================
# Pipeline phases (private helpers of run_plagiarism)
# ============================================
def _gemini_grounded_search(text, log_lines, prog):
    """Phase 1 — Gemini with Google Search grounding.

    Returns ``(pct, sources, response)``; whatever was gathered before an
    API error is still returned.  Appends one line to *log_lines*.
    """
    pct, sources, response = 0, [], ""
    if not (HAS_GENAI and GEMINI_KEY):
        log_lines.append("[Gemini] API 키 없음 — 건너뜀")
        return pct, sources, response

    prog(0.10, "① Gemini Google Search Grounding...")
    try:
        client = genai.Client(api_key=GEMINI_KEY)
        prompt = f"""당신은 표절 검사 전문가입니다. 아래 텍스트가 인터넷에 이미 존재하는 내용인지 Google Search로 철저히 검색하세요.

[검사 대상 텍스트]
{text[:3000]}

[응답 형식]
1. 발견된 유사 콘텐츠를 각각 "제목 | URL | 유사도(높음/중간/낮음)" 형식으로 나열
2. 발견 못하면 "유사 콘텐츠 없음"
3. 마지막 줄에 반드시 "표절율: XX%" 형식으로 종합 판정"""
        contents = [
            gtypes.Content(role="user", parts=[gtypes.Part.from_text(text=prompt)]),
        ]
        config = gtypes.GenerateContentConfig(
            thinking_config=gtypes.ThinkingConfig(thinking_budget=0),
            tools=[gtypes.Tool(googleSearch=gtypes.GoogleSearch())],
            temperature=0.1,
            max_output_tokens=4000,
        )

        prog(0.20, "① Google Search 실행 중...")
        parts = []
        for chunk in client.models.generate_content_stream(
            model="gemini-flash-lite-latest", contents=contents, config=config,
        ):
            if chunk.text:
                parts.append(chunk.text)
        response = "".join(parts)
        pct = _extract_percent("표절율", response)

        # Grounding metadata is read from a second, non-streaming call.
        prog(0.28, "① 출처 메타데이터 추출...")
        try:
            resp_full = client.models.generate_content(
                model="gemini-flash-lite-latest",
                contents=prompt,
                config=gtypes.GenerateContentConfig(
                    tools=[gtypes.Tool(googleSearch=gtypes.GoogleSearch())],
                    temperature=0.1,
                    max_output_tokens=2000,
                ),
            )
            candidates = getattr(resp_full, 'candidates', None) or []
            gm = getattr(candidates[0], 'grounding_metadata', None) if candidates else None
            if gm:
                # Supports refer to positions in grounding_chunks, so keep a
                # chunk-index map: chunks without a URL are skipped and
                # would otherwise shift every later index.
                by_chunk = {}
                for ci, gc in enumerate(getattr(gm, 'grounding_chunks', None) or []):
                    web = getattr(gc, 'web', None)
                    uri = (getattr(web, 'uri', '') or '') if web else ''
                    if not uri:
                        continue
                    entry = {
                        "title": getattr(web, 'title', '') or '',
                        "url": uri,
                        "source": "Google",
                        "snippet": "",
                    }
                    sources.append(entry)
                    by_chunk[ci] = entry
                for sup in getattr(gm, 'grounding_supports', None) or []:
                    seg = getattr(sup, 'segment', None)
                    snippet_text = (getattr(seg, 'text', '') or '') if seg else ''
                    if not snippet_text:
                        continue
                    for idx in getattr(sup, 'grounding_chunk_indices', None) or []:
                        entry = by_chunk.get(idx)
                        if entry is not None:
                            entry["snippet"] = snippet_text[:120]
        except Exception as e2:
            print(f"Gemini 메타데이터 추출 오류: {e2}")

        # URLs quoted in the answer text that the metadata did not list.
        existing_urls = {s["url"] for s in sources}
        for m in re.finditer(r'https?://[^\s\)\]\,\"\']{10,}', response):
            url = m.group(0).rstrip('.')
            if url not in existing_urls:
                pieces = url.split('/')
                domain = pieces[2] if len(pieces) > 2 else url
                sources.append({"title": domain, "url": url, "source": "Google", "snippet": ""})
                existing_urls.add(url)

        log_lines.append(f"[Gemini] 표절율={pct}%, 출처={len(sources)}건")
    except Exception as e:
        log_lines.append(f"[Gemini] 오류: {str(e)[:100]}")
        print(f"Gemini 오류: {str(e)}")
    return pct, sources, response


def _absorb_results(results, label, seen_urls, into):
    """Append unseen results to *into* (title cut to 80, snippet to 120 chars)."""
    for r in results:
        url = r.get("url", "")
        if url and url not in seen_urls:
            into.append({
                "title": r.get("title", "")[:80],
                "url": url,
                "source": label,
                "snippet": r.get("snippet", "")[:120],
            })
            seen_urls.add(url)


def _collect_web_sources(key_phrases, seen_urls, log_lines):
    """Phase 2 — Brave search, or DuckDuckGo (first 3 phrases) without a Brave key."""
    found = []
    if BRAVE_KEY and key_phrases:
        try:
            for results in parallel_brave_search(key_phrases, max_workers=10).values():
                _absorb_results(results, "Brave", seen_urls, found)
            log_lines.append(f"[Brave] 쿼리={len(key_phrases)}개, 출처={len(found)}건")
        except Exception as e:
            log_lines.append(f"[Brave] 오류: {str(e)[:80]}")
    elif not BRAVE_KEY:
        try:
            for phrase in key_phrases[:3]:
                _absorb_results(duckduckgo_search(phrase, max_results=3), "Web", seen_urls, found)
            log_lines.append(f"[DuckDuckGo] 폴백, 출처={len(found)}건")
        except Exception as e:
            log_lines.append(f"[DuckDuckGo] 오류: {str(e)[:80]}")
    return found


def _collect_academic_sources(text, seen_urls, log_lines):
    """Phase 3 — arXiv always, KCI and RISS only when the text contains Hangul."""
    found = []
    academic_query = text[:100].replace('\n', ' ')
    has_korean = bool(re.search(r'[가-힣]', text))
    try:
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = {executor.submit(search_arxiv, academic_query[:60]): "arXiv"}
            if has_korean:
                futures[executor.submit(search_kci, academic_query[:40])] = "KCI"
                futures[executor.submit(search_riss, academic_query[:40])] = "RISS"
            for future in as_completed(futures, timeout=15):
                try:
                    _absorb_results(future.result(), futures[future], seen_urls, found)
                except Exception as exc:
                    _log.debug("%s search failed: %s", futures[future], exc)
        log_lines.append(f"[학술] KCI·RISS·arXiv 출처={len(found)}건")
    except Exception as e:
        log_lines.append(f"[학술] 오류: {str(e)[:80]}")
    return found


def _vector_check(text, crawled_texts, log_lines, prog):
    """Phase 5 — index crawled pages and check *text* sentence by sentence.

    Returns ``(vdb, flagged, plag_ratio, source_hits)``; *vdb* is ``None``
    when the phase was skipped.
    """
    vdb, flagged, ratio, hits = None, [], 0.0, {}
    if not HAS_CHROMADB:
        log_lines.append("[벡터DB] chromadb 미설치 — 건너뜀")
        return vdb, flagged, ratio, hits
    if not crawled_texts:
        log_lines.append("[벡터DB] 크롤링 데이터 없음 — 건너뜀")
        return vdb, flagged, ratio, hits

    prog(0.80, "⑤ 벡터DB 문장별 정밀 대조...")
    try:
        vdb = VectorPlagiarismDB()
        if vdb.available:
            indexed_count = sum(
                vdb.index_source(
                    url=url,
                    title=info.get("title", ""),
                    text=info["text"],
                    source_type=info.get("source", "Web"),
                )
                for url, info in crawled_texts.items()
            )
            log_lines.append(
                f"[벡터DB] {len(crawled_texts)}개 출처 → {indexed_count}개 청크 인덱싱"
            )
            if indexed_count > 0:
                flagged, ratio, hits = vdb.check_document(text, min_similarity=35)
                log_lines.append(
                    f"[벡터DB] 표절 문장={len(flagged)}건, "
                    f"문장표절율={ratio:.1f}%, "
                    f"히트출처={len(hits)}건"
                )
    except Exception as e:
        log_lines.append(f"[벡터DB] 오류: {str(e)[:80]}")
    return vdb, flagged, ratio, hits


def _combine_scores(gemini_pct, verified_sources, avg_similarity, vector_flagged,
                    vector_plag_ratio):
    """Blend the individual signals into the final percentage.

    Returns ``(plag_pct, ngram_score, vector_score)``.

    * n-gram score: 40 % best source similarity, 30 % average similarity,
      30 % source-count factor (3 points per source, max 30), capped at 100.
    * With flagged vector sentences the n-gram and vector scores are mixed
      40:60; otherwise the n-gram score is used alone.
    * Gemini's own estimate is advisory: 20 % weight when verified sources
      exist, otherwise a tenth of it capped at 20.
    """
    max_sim = max((s.get("similarity", 0) for s in verified_sources), default=0)
    count_factor = min(len(verified_sources) * 3, 30)
    avg_factor = avg_similarity * 100
    ngram_score = min(round(max_sim * 0.4 + avg_factor * 0.3 + count_factor * 0.3), 100)

    vector_score = round(vector_plag_ratio) if vector_flagged else 0
    if vector_flagged:
        combined_score = round(ngram_score * 0.4 + vector_score * 0.6)
    else:
        combined_score = ngram_score

    if gemini_pct > 0 and verified_sources:
        plag_pct = min(round(gemini_pct * 0.2 + combined_score * 0.8), 100)
    elif verified_sources:
        plag_pct = combined_score
    else:
        plag_pct = min(round(gemini_pct * 0.1), 20)
    return plag_pct, ngram_score, vector_score


_GRADES = (
    (50, "🚨 표절 의심", "#FF4444"),
    (30, "⚠️ 주의 필요", "#FF8800"),
    (15, "📌 유사표현", "#DDAA00"),
    (5, "✓ 양호", "#4ECDC4"),
)


def _grade_for(plag_pct):
    """Map a percentage to ``(grade label, colour)``."""
    for floor, label, colour in _GRADES:
        if plag_pct >= floor:
            return label, colour
    return "✅ 우수", "#22AA44"


# ============================================
# Report rendering
# NOTE(review): tags and inline styles below are reconstructed; only the
# visible text comes from the reviewed copy.  All scraped or model-produced
# text is HTML-escaped because it originates from untrusted web pages.
# ============================================
_BADGE_COLORS = {
    "Google": "#4285F4", "Brave": "#FB542B", "Web": "#888",
    "KCI": "#2E7D32", "RISS": "#1565C0", "arXiv": "#B71C1C",
}
_TD = 'style="padding:6px 8px;border-bottom:1px solid #eee;font-size:12px;vertical-align:top;"'
_TH = 'style="padding:6px 8px;text-align:left;font-size:12px;background:#f5f7fa;"'


def _esc(value):
    """HTML-escape *value* (including quotes) after converting it to ``str``."""
    return _html.escape(str(value), quote=True)


def _source_badge(src):
    """Render the coloured label for a source type."""
    c = _BADGE_COLORS.get(src, "#666")
    return (
        f'<span style="background:{c};color:#fff;padding:2px 6px;'
        f'border-radius:4px;font-size:11px;">{_esc(src)}</span>'
    )


def _render_source_rows(sources):
    """Render the table rows for up to 30 verified sources."""
    rows = []
    for i, s in enumerate(sources[:30]):
        url = s.get('url', '') or ''
        title = s.get('title', '') or ''
        if title:
            title_display = title[:55]
        else:
            pieces = url.split('/')
            title_display = pieces[2] if len(pieces) > 2 else url[:40]

        sim_val = s.get("similarity", 0)
        if sim_val >= 30:
            sim_color = "#FF4444"
        elif sim_val >= 10:
            sim_color = "#FF8800"
        elif sim_val >= 3:
            sim_color = "#DDAA00"
        else:
            sim_color = "#999"
        sim_html = f'<span style="color:{sim_color};font-weight:bold;">{sim_val:.1f}%</span>'

        method = s.get("method", "")
        method_tag = (
            f' <span style="color:#999;font-size:10px;">({_esc(method)})</span>' if method else ""
        )
        snippet_html = (
            f'<div style="color:#777;font-size:11px;">{_esc(s["snippet"][:100])}</div>'
            if s.get("snippet") else ""
        )
        matched = s.get("matched", [])
        matched_html = ""
        if matched:
            matched_html = (
                '<div style="color:#C62828;font-size:11px;">매칭: '
                + " | ".join(_esc(m) for m in matched[:2])
                + '</div>'
            )
        # Only http(s) URLs become links; anything else is shown as text.
        if url.lower().startswith(("http://", "https://")):
            url_html = (
                f'<a href="{_esc(url)}" target="_blank" rel="noopener noreferrer">'
                f'{_esc(url[:60])}</a>'
            )
        else:
            url_html = _esc(url[:60])

        rows.append(
            f"""<tr>
<td {_TD}>{i + 1}</td>
<td {_TD}>{_source_badge(s.get('source', ''))}</td>
<td {_TD}>{sim_html}{method_tag}</td>
<td {_TD}>{_esc(title_display)}{snippet_html}{matched_html}</td>
<td {_TD}>{url_html}</td>
</tr>"""
        )
    if not rows:
        return (
            f'<tr><td colspan="5" {_TD}>'
            '검증된 유사 출처 없음 — 표절 가능성 낮음</td></tr>'
        )
    return "\n".join(rows)


def _render_gemini_summary(gemini_response):
    """Render the collapsible Gemini answer (empty string when there is none)."""
    if not gemini_response:
        return ""
    safe_resp = _esc(gemini_response).replace('\n', '<br>')
    return f"""<details style="margin:12px 0;">
<summary style="cursor:pointer;font-weight:bold;">🤖 Gemini 분석 상세</summary>
<div style="padding:10px;font-size:12px;background:#fafafa;">{safe_resp}</div>
</details>"""


def _render_vector_section(vector_flagged, vector_plag_ratio, chunk_count):
    """Render the sentence-level match table (empty string when nothing was flagged)."""
    if not vector_flagged:
        return ""
    rows = []
    for k, vf in enumerate(vector_flagged[:15]):
        sim_val = vf["similarity"]
        sim_color = "#FF4444" if sim_val >= 70 else "#FF8800" if sim_val >= 50 else "#DDAA00"
        rows.append(
            f"""<tr>
<td {_TD}>{k + 1}</td>
<td {_TD}>{_esc(vf["sentence"][:70])}</td>
<td {_TD}><span style="color:{sim_color};font-weight:bold;">{sim_val:.0f}%</span></td>
<td {_TD}>{_esc(vf["matched_text"][:90])}</td>
<td {_TD}>{_esc(vf["title"][:40])}</td>
</tr>"""
        )
    vf_rows = "\n".join(rows)
    return f"""<div style="margin:12px 0;">
<div style="font-weight:bold;margin-bottom:6px;">🧬 벡터DB 문장별 정밀 대조 — 표절 문장 {len(vector_flagged)}건 / 문장표절율 {vector_plag_ratio:.1f}% (DB: {chunk_count}청크)</div>
<table style="width:100%;border-collapse:collapse;">
<tr><th {_TH}>#</th><th {_TH}>입력 문장</th><th {_TH}>유사도</th><th {_TH}>매칭 출처 내용</th><th {_TH}>출처</th></tr>
{vf_rows}
</table>
</div>"""


def _render_report(*, doc_id, now, plag_pct, grade, grade_color, source_count,
                   raw_count, verified_count, unverified_count, char_count,
                   word_count, gemini_summary, vector_section, src_rows):
    """Assemble the final HTML report from pre-rendered parts."""
    HDR = '#3B7DD8'
    card = 'style="flex:1;text-align:center;padding:14px;border:1px solid #e5e5e5;"'
    return f"""<div style="font-family:sans-serif;max-width:960px;margin:0 auto;">
<div style="background:{HDR};color:#fff;padding:16px 20px;border-radius:8px 8px 0 0;">
<div style="font-size:20px;font-weight:bold;">표절 검사 결과</div>
<div style="font-size:12px;opacity:0.9;">Gemini + Brave + KCI·RISS·arXiv + 벡터DB 정밀대조</div>
<div style="font-size:11px;opacity:0.8;">문서: {doc_id}</div>
<div style="font-size:11px;opacity:0.8;">{now}</div>
</div>
<div style="display:flex;">
<div {card}>
<div style="font-size:28px;font-weight:bold;color:{grade_color};">{plag_pct}%</div>
<div style="font-size:11px;color:#777;">종합 표절율</div>
</div>
<div {card}>
<div style="font-size:18px;font-weight:bold;color:{grade_color};">{grade}</div>
<div style="font-size:11px;color:#777;">판정</div>
</div>
<div {card}>
<div style="font-size:28px;font-weight:bold;">{source_count}</div>
<div style="font-size:11px;color:#777;">검증 출처</div>
</div>
<div {card}>
<div style="font-size:12px;">{raw_count} 수집</div>
<div style="font-size:12px;">{verified_count} 검증</div>
<div style="font-size:12px;">{unverified_count} 제외</div>
<div style="font-size:11px;color:#777;">크롤링 대조</div>
</div>
</div>
<div style="padding:12px 4px;">
<div style="font-weight:bold;margin-bottom:4px;">📋 검사 정보</div>
<div style="font-size:12px;">글자수 {char_count:,}자 단어수 {word_count:,}단어 검색엔진 Google + Brave + KCI·RISS·arXiv</div>
</div>
{gemini_summary}{vector_section}
<div style="margin:12px 0;">
<div style="font-weight:bold;margin-bottom:6px;">🔍 검증된 출처 ({source_count}건)</div>
<table style="width:100%;border-collapse:collapse;">
<tr><th {_TH}>#</th><th {_TH}>소스</th><th {_TH}>유사도</th><th {_TH}>출처</th><th {_TH}>URL</th></tr>
{src_rows}
</table>
</div>
</div>"""


# ============================================
# Main entry point
# ============================================
def run_plagiarism(text, progress=gr.Progress() if gr is not None else None):
    """Run the full plagiarism check and return ``(report_html, log_text)``.

    Args:
        text: The document to check; fewer than 50 non-blank characters
            yields a short warning and an empty log.
        progress: Callable ``progress(fraction, message)`` (a
            ``gr.Progress`` by default when gradio is installed) or
            ``None``.  Progress errors never abort the check.

    Returns:
        A tuple of the HTML report and a newline-joined phase log.
    """
    if not text or len(text.strip()) < 50:
        return (
            "<div style='padding:20px;text-align:center;color:#888;'>"
            "⚠️ 최소 50자 이상</div>",
            "",
        )

    text = text.strip()
    now = datetime.now().strftime("%Y-%m-%d %H:%M")
    doc_id = hashlib.md5(text[:100].encode()).hexdigest()[:8].upper()
    log_lines = []

    def _prog(val, msg=""):
        if progress is None:
            return
        try:
            progress(val, msg)
        except Exception as exc:  # progress display is cosmetic only
            _log.debug("progress update failed: %s", exc)

    # PHASE 1: Gemini Google Search grounding
    gemini_pct, gemini_sources, gemini_response = _gemini_grounded_search(
        text, log_lines, _prog
    )

    # PHASE 2: web search (Brave, or DuckDuckGo fallback)
    _prog(0.40, "② Brave Search 웹 검색...")
    seen_urls = {s["url"] for s in gemini_sources}
    brave_sources = _collect_web_sources(
        _extract_key_phrases(text, max_phrases=6), seen_urls, log_lines
    )

    # PHASE 3: academic databases
    _prog(0.60, "③ 학술 DB 검색 (KCI·RISS·arXiv)...")
    seen_urls = {s["url"] for s in gemini_sources + brave_sources}
    academic_sources = _collect_academic_sources(text, seen_urls, log_lines)

    # PHASE 4: verify sources by crawling and measuring real similarity
    _prog(0.75, "④ 출처 검증 (크롤링 대조)...")
    raw_sources = gemini_sources + brave_sources + academic_sources
    log_lines.append(f"[수집] 총 {len(raw_sources)}건 (검증 전)")
    verified_sources, avg_similarity, crawled_texts = _verify_sources_parallel(
        text, raw_sources, min_similarity=0.02, max_workers=8
    )
    verified_urls = {s["url"] for s in verified_sources}
    unverified_sources = [s for s in raw_sources if s["url"] not in verified_urls]
    log_lines.append(
        f"[검증] 통과={len(verified_sources)}건, 미검증={len(unverified_sources)}건, "
        f"크롤링={len(crawled_texts)}건, 평균유사도={avg_similarity:.3f}"
    )

    # PHASE 5: sentence-level vector comparison
    vdb, vector_flagged, vector_plag_ratio, _vector_source_hits = _vector_check(
        text, crawled_texts, log_lines, _prog
    )

    # Verdict
    _prog(0.90, "보고서 생성...")
    plag_pct, ngram_score, vector_score = _combine_scores(
        gemini_pct, verified_sources, avg_similarity, vector_flagged, vector_plag_ratio
    )
    log_lines.append(
        f"[판정] Gemini={gemini_pct}%, n-gram={ngram_score}%, "
        f"벡터={vector_score}%, → 종합={plag_pct}%"
    )
    grade, grade_color = _grade_for(plag_pct)

    vdb_stats = vdb.get_stats() if vdb else {"chunks": 0}
    report = _render_report(
        doc_id=doc_id,
        now=now,
        plag_pct=plag_pct,
        grade=grade,
        grade_color=grade_color,
        source_count=len(verified_sources),  # only verified sources are listed
        raw_count=len(raw_sources),
        verified_count=len(verified_sources),
        unverified_count=len(unverified_sources),
        char_count=len(text),
        word_count=len(_split_words(text)),
        gemini_summary=_render_gemini_summary(gemini_response),
        vector_section=_render_vector_section(
            vector_flagged, vector_plag_ratio, vdb_stats['chunks']
        ),
        src_rows=_render_source_rows(verified_sources),
    )

    _prog(0.95, "완료!")
    return report, "\n".join(log_lines)