| import os |
| import time |
| import requests |
| import random |
| import re |
| from difflib import SequenceMatcher |
| from typing import List, Optional, Dict, Any |
| from urllib.parse import quote_plus |
|
|
| from fastapi import FastAPI, UploadFile, File, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| import uvicorn |
| from pydantic import BaseModel |
| from PyPDF2 import PdfReader |
|
|
| from langchain_groq import ChatGroq |
| from langchain_core.prompts import ChatPromptTemplate |
|
|
| |
| |
| |
# --- Configuration (read from the environment at import time) ---
GROQ_API_KEY = os.getenv("GROQ_API_KEY")  # LLM provider key (Groq)
SERPER_API_KEY = os.getenv("SERPER_API_KEY")  # Google Serper web-search key
SEMANTIC_SCHOLAR_API_KEY = os.getenv("SEMANTIC_SCHOLAR_API_KEY")  # optional; raises rate limits
SEMANTIC_SCHOLAR_BASE_URL = "https://api.semanticscholar.org/graph/v1"
# Client-side throttle: minimum spacing between Semantic Scholar calls.
SEMANTIC_SCHOLAR_MIN_INTERVAL_SECONDS = 1.2
# How many attempts (including the first) before giving up on HTTP 429.
SEMANTIC_SCHOLAR_MAX_RETRIES = 4

# Warn (but do not abort) when required keys are missing; requests that need
# them will fail later at call time.
if not GROQ_API_KEY or not SERPER_API_KEY:
    print("WARNING: GROQ_API_KEY or SERPER_API_KEY is missing!")

# Shared LLM client. NOTE(review): ChatGroq presumably reads GROQ_API_KEY
# from the environment — confirm against langchain_groq docs.
llm = ChatGroq(model="openai/gpt-oss-120b", temperature=0.1)

# In-process caches (unbounded, live for the process lifetime).
query_cache = {}  # combined web+academic results keyed by truncated query
semantic_query_cache: Dict[str, List[Dict[str, str]]] = {}  # Semantic Scholar results
# Timestamp of the most recent Semantic Scholar request (GET or POST),
# used by the rate limiter.
_last_semantic_scholar_call_ts = 0.0
|
|
| |
| |
| |
class MatchReport(BaseModel):
    """Per-chunk scan result: whether a chunk matched a source and how."""

    # The exact chunk of document text that was analyzed.
    chunk_text: str
    # True when the chunk was flagged as an exact or paraphrased match.
    is_plagiarized: bool
    # "Exact/Heavy Match" or "Paraphrased Match (Mosaic)"; None when clean.
    plagiarism_type: Optional[str] = None
    # URL of the best-matching source, when one was found.
    source_url: Optional[str] = None
    # Source category label, e.g. "Web (Google)" or "Academic (Semantic Scholar)".
    source_type: Optional[str] = None
    # Best similarity ratio observed (0.0-1.0), rounded to 2 decimals.
    similarity_score: float
|
|
class PlagiarismReport(BaseModel):
    """Top-level result of a basic scan (/scan-paper)."""

    filename: str  # uploaded file name
    total_words: int  # word count of the extracted document text
    plagiarized_words: int  # words in flagged chunks (capped at total_words)
    overall_plagiarism_score: float  # percentage, 0-100
    severity_level: str  # "Low" / "Medium" / "High" / "Very High"
    details: List[MatchReport]  # per-chunk match results
|
|
class DetailedPlagiarismReport(BaseModel):
    """Comprehensive report combining computed stats with LLM-written analysis."""

    filename: str  # uploaded file name
    scan_timestamp: str  # ISO-8601 timestamp of when the scan ran
    executive_summary: str  # short overview (LLM-written or fallback)
    overall_score: float  # plagiarism percentage, 0-100
    severity_level: str  # "Low" / "Medium" / "High" / "Very High"
    # Unique matched sources: dicts with "url", "type", "max_similarity".
    matched_sources: List[Dict[str, Any]]
    key_findings: List[str]  # bullet findings parsed from the LLM reply
    plagiarism_breakdown: Dict[str, Any]  # percentage, type counts, source counts
    detailed_analysis: str  # narrative analysis text from the LLM
    # Up to 10 flagged chunks with snippet, score, type and source.
    affected_sections: List[Dict[str, Any]]
    recommendations: List[str]  # remediation advice (LLM-parsed or defaults)
    academic_integrity_risk: str  # human-readable risk statement per score band
|
|
# FastAPI application instance.
app = FastAPI(title="Pro Plagiarism Detector (Turnitin Clone)")

# Wide-open CORS so any frontend origin can call the API.
# NOTE(review): allow_origins=["*"] together with allow_credentials=True is
# overly permissive for production — consider restricting origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
| |
| |
|
|
def _semantic_scholar_headers() -> Dict[str, str]:
    """Build request headers for Semantic Scholar calls.

    Returns an ``x-api-key`` header when an API key is configured,
    otherwise an empty mapping (unauthenticated access).
    """
    if SEMANTIC_SCHOLAR_API_KEY:
        return {"x-api-key": SEMANTIC_SCHOLAR_API_KEY}
    return {}
|
|
|
|
def _semantic_scholar_get(path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """GET a Semantic Scholar endpoint with client-side throttling and 429 retries.

    Args:
        path: Endpoint path appended to SEMANTIC_SCHOLAR_BASE_URL, e.g. "/paper/search".
        params: Query parameters; entries whose value is None are dropped.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: on a non-2xx response, or when every attempt was
            answered with HTTP 429.
    """
    global _last_semantic_scholar_call_ts
    # Drop None-valued params so they are not serialized into the query string.
    filtered_params = {k: v for k, v in (params or {}).items() if v is not None}

    for attempt in range(SEMANTIC_SCHOLAR_MAX_RETRIES):
        # Client-side throttle: space calls at least MIN_INTERVAL apart.
        # The timestamp is shared with _semantic_scholar_post.
        elapsed = time.time() - _last_semantic_scholar_call_ts
        if elapsed < SEMANTIC_SCHOLAR_MIN_INTERVAL_SECONDS:
            time.sleep(SEMANTIC_SCHOLAR_MIN_INTERVAL_SECONDS - elapsed)

        response = requests.get(
            f"{SEMANTIC_SCHOLAR_BASE_URL}{path}",
            headers=_semantic_scholar_headers(),
            params=filtered_params,
            timeout=20,
        )
        _last_semantic_scholar_call_ts = time.time()

        # 429 = rate limited: honor an integer Retry-After header when
        # present, otherwise exponential backoff with jitter, then retry.
        if response.status_code == 429 and attempt < SEMANTIC_SCHOLAR_MAX_RETRIES - 1:
            retry_after = response.headers.get("Retry-After")
            if retry_after and retry_after.isdigit():
                wait_seconds = float(retry_after)
            else:
                wait_seconds = (2 ** attempt) + random.uniform(0.2, 0.7)
            time.sleep(wait_seconds)
            continue

        # Any other non-2xx raises here; 2xx falls through to the JSON body.
        response.raise_for_status()
        return response.json()

    # Only reachable when every attempt returned 429.
    raise requests.HTTPError("Semantic Scholar request failed after retries")
|
|
|
|
def _semantic_scholar_post(path: str, body: Dict[str, Any], params: Optional[Dict[str, Any]] = None) -> Any:
    """POST a JSON body to a Semantic Scholar endpoint with throttling and 429 retries.

    Args:
        path: Endpoint path appended to SEMANTIC_SCHOLAR_BASE_URL, e.g. "/paper/batch".
        body: JSON payload sent as the request body.
        params: Query parameters; entries whose value is None are dropped.

    Returns:
        The decoded JSON response (a list for batch endpoints, hence Any).

    Raises:
        requests.HTTPError: on a non-2xx response, or when every attempt was
            answered with HTTP 429.
    """
    global _last_semantic_scholar_call_ts
    # Drop None-valued params so they are not serialized into the query string.
    filtered_params = {k: v for k, v in (params or {}).items() if v is not None}

    for attempt in range(SEMANTIC_SCHOLAR_MAX_RETRIES):
        # Client-side throttle shared with _semantic_scholar_get via the
        # module-level timestamp.
        elapsed = time.time() - _last_semantic_scholar_call_ts
        if elapsed < SEMANTIC_SCHOLAR_MIN_INTERVAL_SECONDS:
            time.sleep(SEMANTIC_SCHOLAR_MIN_INTERVAL_SECONDS - elapsed)

        response = requests.post(
            f"{SEMANTIC_SCHOLAR_BASE_URL}{path}",
            headers=_semantic_scholar_headers(),
            params=filtered_params,
            json=body,
            timeout=25,
        )
        _last_semantic_scholar_call_ts = time.time()

        # 429 = rate limited: honor an integer Retry-After header when
        # present, otherwise exponential backoff with jitter, then retry.
        if response.status_code == 429 and attempt < SEMANTIC_SCHOLAR_MAX_RETRIES - 1:
            retry_after = response.headers.get("Retry-After")
            if retry_after and retry_after.isdigit():
                wait_seconds = float(retry_after)
            else:
                wait_seconds = (2 ** attempt) + random.uniform(0.2, 0.7)
            time.sleep(wait_seconds)
            continue

        response.raise_for_status()
        return response.json()

    # Only reachable when every attempt returned 429.
    raise requests.HTTPError("Semantic Scholar request failed after retries")
|
|
|
|
def s2_paper_autocomplete(query: str) -> Dict[str, Any]:
    """Suggest paper-title completions for a partial query (first 100 chars sent)."""
    truncated = query[:100]
    return _semantic_scholar_get("/paper/autocomplete", {"query": truncated})
|
|
|
|
def s2_paper_batch(ids: List[str], fields: Optional[str] = None) -> Any:
    """Fetch details for up to 500 papers in a single POST /paper/batch call."""
    payload = {"ids": ids[:500]}
    return _semantic_scholar_post("/paper/batch", payload, {"fields": fields})
|
|
|
|
def s2_paper_search(
    query: str,
    fields: Optional[str] = None,
    limit: int = 100,
    offset: int = 0,
    year: Optional[str] = None,
    fields_of_study: Optional[str] = None,
    open_access_pdf: bool = False,
) -> Dict[str, Any]:
    """Relevance-ranked paper search; limit is clamped to 1-100 (API maximum)."""
    clamped_limit = min(max(limit, 1), 100)
    params: Dict[str, Any] = {
        "query": query,
        "fields": fields,
        "limit": clamped_limit,
        "offset": max(offset, 0),
        "year": year,
        "fieldsOfStudy": fields_of_study,
    }
    if open_access_pdf:
        # The API checks the flag's presence; an empty value is sufficient.
        params["openAccessPdf"] = ""
    return _semantic_scholar_get("/paper/search", params)
|
|
|
|
def s2_paper_search_bulk(
    query: str,
    fields: Optional[str] = None,
    token: Optional[str] = None,
    sort: Optional[str] = None,
) -> Dict[str, Any]:
    """Bulk paper search; pass the continuation *token* to page through results."""
    params = {
        "query": query,
        "fields": fields,
        "token": token,
        "sort": sort,
    }
    return _semantic_scholar_get("/paper/search/bulk", params)
|
|
|
|
def s2_paper_search_match(query: str, fields: Optional[str] = None) -> Dict[str, Any]:
    """Find the single paper that best matches a title-like query."""
    params = {"query": query, "fields": fields}
    return _semantic_scholar_get("/paper/search/match", params)
|
|
|
|
def s2_paper_details(paper_id: str, fields: Optional[str] = None) -> Dict[str, Any]:
    """Fetch one paper's details; the id is URL-escaped before interpolation."""
    encoded = quote_plus(paper_id)
    return _semantic_scholar_get(f"/paper/{encoded}", {"fields": fields})
|
|
|
|
def s2_paper_authors(
    paper_id: str,
    fields: Optional[str] = None,
    limit: int = 100,
    offset: int = 0,
) -> Dict[str, Any]:
    """List a paper's authors; limit is clamped to 1-1000."""
    encoded = quote_plus(paper_id)
    params = {
        "fields": fields,
        "limit": min(max(limit, 1), 1000),
        "offset": max(offset, 0),
    }
    return _semantic_scholar_get(f"/paper/{encoded}/authors", params)
|
|
|
|
def s2_paper_citations(
    paper_id: str,
    fields: Optional[str] = None,
    limit: int = 100,
    offset: int = 0,
    publication_date_or_year: Optional[str] = None,
) -> Dict[str, Any]:
    """List papers citing the given paper, optionally filtered by date/year."""
    encoded = quote_plus(paper_id)
    params = {
        "fields": fields,
        "limit": min(max(limit, 1), 1000),
        "offset": max(offset, 0),
        "publicationDateOrYear": publication_date_or_year,
    }
    return _semantic_scholar_get(f"/paper/{encoded}/citations", params)
|
|
|
|
def s2_paper_references(
    paper_id: str,
    fields: Optional[str] = None,
    limit: int = 100,
    offset: int = 0,
) -> Dict[str, Any]:
    """List the papers referenced by the given paper; limit clamped to 1-1000."""
    encoded = quote_plus(paper_id)
    params = {
        "fields": fields,
        "limit": min(max(limit, 1), 1000),
        "offset": max(offset, 0),
    }
    return _semantic_scholar_get(f"/paper/{encoded}/references", params)
|
|
|
|
def s2_author_batch(ids: List[str], fields: Optional[str] = None) -> Any:
    """Fetch details for up to 1000 authors in a single POST /author/batch call."""
    payload = {"ids": ids[:1000]}
    return _semantic_scholar_post("/author/batch", payload, {"fields": fields})
|
|
|
|
def s2_author_search(
    query: str,
    fields: Optional[str] = None,
    limit: int = 100,
    offset: int = 0,
) -> Dict[str, Any]:
    """Search authors by name; limit is clamped to 1-1000."""
    params = {
        "query": query,
        "fields": fields,
        "limit": min(max(limit, 1), 1000),
        "offset": max(offset, 0),
    }
    return _semantic_scholar_get("/author/search", params)
|
|
|
|
def s2_author_details(author_id: str, fields: Optional[str] = None) -> Dict[str, Any]:
    """Fetch one author's details; the id is URL-escaped before interpolation."""
    encoded = quote_plus(author_id)
    return _semantic_scholar_get(f"/author/{encoded}", {"fields": fields})
|
|
|
|
def s2_author_papers(
    author_id: str,
    fields: Optional[str] = None,
    limit: int = 100,
    offset: int = 0,
    publication_date_or_year: Optional[str] = None,
) -> Dict[str, Any]:
    """List an author's papers, optionally filtered by publication date/year."""
    encoded = quote_plus(author_id)
    params = {
        "fields": fields,
        "limit": min(max(limit, 1), 1000),
        "offset": max(offset, 0),
        "publicationDateOrYear": publication_date_or_year,
    }
    return _semantic_scholar_get(f"/author/{encoded}/papers", params)
|
|
|
|
def s2_snippet_search(
    query: str,
    fields: Optional[str] = None,
    limit: int = 10,
    year: Optional[str] = None,
    fields_of_study: Optional[str] = None,
) -> Dict[str, Any]:
    """Search verbatim text snippets from papers; limit is clamped to 1-1000."""
    params = {
        "query": query,
        "fields": fields,
        "limit": min(max(limit, 1), 1000),
        "year": year,
        "fieldsOfStudy": fields_of_study,
    }
    return _semantic_scholar_get("/snippet/search", params)
|
|
|
|
def build_search_query(text: str, max_terms: int = 10) -> str:
    """Condense free text into a compact keyword query.

    Drops common stopwords and tokens of one or two characters, keeping at
    most *max_terms* keywords. When nothing survives filtering, falls back
    to the first *max_terms* raw tokens so the query is never needlessly empty.
    """
    stopwords = frozenset({
        "the", "and", "for", "that", "with", "this", "from", "into", "our", "their",
        "were", "have", "has", "had", "been", "are", "was", "will", "would", "can",
        "could", "should", "about", "through", "using", "based", "than", "then", "also",
        "such", "these", "those", "while", "where", "when", "what", "which", "who",
    })
    tokens = re.findall(r"[A-Za-z0-9]+", text.lower())
    keywords = [t for t in tokens if len(t) > 2 and t not in stopwords]
    chosen = keywords if keywords else tokens
    return " ".join(chosen[:max_terms])
|
|
def search_google_serper(query: str) -> List[Dict]:
    """Search the open web using the Google Serper API.

    Args:
        query: Free-text search query.

    Returns:
        Up to three results as dicts with "text" (snippet), "url" and
        "source_type" keys; an empty list on any error.
    """
    url = "https://google.serper.dev/search"
    payload = {"q": query}
    headers = {
        'X-API-KEY': SERPER_API_KEY,
        'Content-Type': 'application/json'
    }

    try:
        # Fix: the original call had no timeout, so a stalled connection
        # could hang a request handler indefinitely.
        response = requests.post(url, headers=headers, json=payload, timeout=15)
        response.raise_for_status()
        data = response.json()

        results = []
        for item in data.get("organic", [])[:3]:
            results.append({
                "text": item.get("snippet", ""),
                "url": item.get("link", ""),
                "source_type": "Web (Google)"
            })
        return results
    except Exception as e:
        # Best-effort: a failed web search degrades to "no web results".
        print(f"Serper Error: {e}")
        return []
|
|
def search_semantic_scholar(query: str) -> List[Dict]:
    """Search academic sources (snippets + paper abstracts) via Semantic Scholar.

    Results are memoized per normalized query for the process lifetime.
    Failures are logged and yield an empty, UNcached result so a transient
    API error does not poison the cache.

    Returns:
        Dicts with "text", "url" and "source_type" keys; empty list on error.
    """
    prepared_query = build_search_query(query, max_terms=10)
    normalized_query = " ".join(prepared_query.split()).lower()
    if normalized_query in semantic_query_cache:
        return semantic_query_cache[normalized_query]

    try:
        results = []

        # Snippet search: verbatim passages, best for exact-match detection.
        snippet_data = s2_snippet_search(
            query=prepared_query,
            fields="snippet.text,snippet.snippetKind",
            limit=3,
        )
        for item in snippet_data.get("data", []):
            snippet = item.get("snippet", {})
            paper = item.get("paper", {})
            snippet_text = snippet.get("text", "")
            if snippet_text:
                corpus_id = paper.get("corpusId")
                paper_url = f"https://www.semanticscholar.org/paper/{corpus_id}" if corpus_id else None
                results.append({
                    "text": snippet_text,
                    "url": paper_url,
                    "source_type": "Academic (Semantic Scholar Snippet)",
                })

        # Relevance search: abstracts, useful for paraphrase detection.
        data = s2_paper_search(
            query=prepared_query,
            limit=2,
            fields="title,abstract,url",
        )
        for item in data.get("data", []):
            if item.get("abstract"):
                # Fix: the original used item.get("url", f"...{item['paperId']}"),
                # which evaluates the fallback f-string eagerly and raised
                # KeyError when "paperId" was missing even if "url" existed.
                paper_url = item.get("url")
                if not paper_url:
                    paper_url = f"https://www.semanticscholar.org/paper/{item.get('paperId', '')}"
                results.append({
                    "text": item["abstract"],
                    "url": paper_url,
                    "source_type": "Academic (Semantic Scholar)"
                })
        semantic_query_cache[normalized_query] = results
        return results
    except Exception as e:
        print(f"Semantic Scholar Error: {e}")
        return []
|
|
def aggregate_search(query: str) -> List[Dict]:
    """Query web (Serper) and academic (Semantic Scholar) sources, with caching.

    The query is truncated to its first 15 words. Combined results are cached
    in-process; a one-second sleep after uncached lookups throttles upstream APIs.
    """
    truncated = " ".join(query.split()[:15])

    cached = query_cache.get(truncated)
    if cached is not None:
        return cached

    combined = search_google_serper(truncated) + search_semantic_scholar(truncated)
    query_cache[truncated] = combined

    # Be polite to the upstream APIs between fresh lookups.
    time.sleep(1)

    return combined
|
|
| |
| |
| |
|
|
def calculate_exact_similarity(text1: str, text2: str) -> float:
    """Case-insensitive character-level similarity ratio in [0.0, 1.0]."""
    left = text1.lower()
    right = text2.lower()
    matcher = SequenceMatcher(None, left, right)
    return matcher.ratio()
|
|
def check_paraphrasing_with_llm(chunk: str, source_text: str) -> bool:
    """Ask the LLM whether *chunk* paraphrases *source_text* (YES/NO verdict)."""
    system_message = "You are an expert academic plagiarism detector. Determine if TEXT A is a direct paraphrase, stolen idea, or highly similar structure to TEXT B. Ignore generic academic phrases like 'In this paper we demonstrate'. Respond ONLY with 'YES' or 'NO'."
    user_template = "TEXT A: {chunk}\n\nTEXT B: {source_text}"
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", user_template),
    ])
    verdict = (prompt | llm).invoke({"chunk": chunk, "source_text": source_text})
    return "YES" in verdict.content.upper()
|
|
def generate_detailed_report_with_llm(
    filename: str,
    match_reports: List[MatchReport],
    total_words: int,
    overall_score: float
) -> DetailedPlagiarismReport:
    """Build a comprehensive report by asking the LLM to analyze scan results.

    Args:
        filename: Name of the scanned document (used in the report and prompt).
        match_reports: Per-chunk results from analyze_chunk().
        total_words: Word count of the full document.
        overall_score: Plagiarism percentage (0-100).

    Returns:
        A DetailedPlagiarismReport combining computed statistics with the
        LLM's narrative analysis (with static fallbacks when parsing fails).
    """
    from datetime import datetime

    plagiarized_reports = [r for r in match_reports if r.is_plagiarized]

    # Tally plagiarism types and collect UNIQUE sources per category.
    # Fix: the original tested `source_url not in sources_by_type[...]`, i.e.
    # a string against a list of dicts — always True, so duplicates piled up.
    plagiarism_types: Dict[str, int] = {}
    sources_by_type: Dict[str, List[Dict[str, Any]]] = {"Academic": [], "Web": []}
    seen_urls: Dict[str, set] = {"Academic": set(), "Web": set()}

    for report in plagiarized_reports:
        ptype = report.plagiarism_type or "Unknown"
        plagiarism_types[ptype] = plagiarism_types.get(ptype, 0) + 1

        if report.source_type:
            category = "Academic" if "Academic" in report.source_type else "Web"
            if report.source_url not in seen_urls[category]:
                seen_urls[category].add(report.source_url)
                sources_by_type[category].append({
                    "url": report.source_url,
                    "type": report.source_type,
                    "max_similarity": report.similarity_score
                })

    # Map the overall score onto a severity band and risk description.
    if overall_score < 15:
        severity = "Low"
        risk_level = "Minimal - Normal citation variations detected"
    elif overall_score < 30:
        severity = "Medium"
        risk_level = "Moderate - Multiple sources match detected"
    elif overall_score < 50:
        severity = "High"
        risk_level = "Significant - Substantial plagiarism detected"
    else:
        severity = "Very High"
        risk_level = "Critical - Extensive plagiarism detected"

    # Context handed to the LLM. Fix: the original hard-coded
    # "Document: (unknown)" instead of using the actual filename.
    plagiarism_context = f"""
Document: {filename}
Total Words: {total_words}
Plagiarism Score: {overall_score}%
Plagiarism Types Found: {plagiarism_types}
Academic Matches: {len(sources_by_type['Academic'])}
Web Matches: {len(sources_by_type['Web'])}

Suspicious Sections (samples):
{chr(10).join([f"- {r.chunk_text[:100]}..." for r in plagiarized_reports[:5]])}
"""

    analysis_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are an expert academic integrity analyzer and plagiarism report generator.
Generate a professional, detailed plagiarism analysis report.
Focus on: severity assessment, academic integrity concerns, specific problem areas, and recommendations.
Be thorough but concise."""),
        ("user", """Create a detailed plagiarism analysis for this document:

{plagiarism_context}

Provide:
1. Executive Summary (2-3 sentences)
2. Key Findings (3-4 bullet points)
3. Detailed Analysis (2-3 paragraphs explaining the plagiarism pattern)
4. Recommendations (3-4 specific actions to remediate)

Format clearly with section headers.""")
    ])

    chain = analysis_prompt | llm
    llm_response = chain.invoke({"plagiarism_context": plagiarism_context})
    llm_analysis = llm_response.content

    # Parse the free-text LLM reply into findings / recommendations / analysis
    # by tracking which section header we last saw.
    lines = llm_analysis.split('\n')
    key_findings: List[str] = []
    recommendations: List[str] = []
    detailed_analysis = ""

    in_findings = False
    in_recommendations = False

    for line in lines:
        if 'Key Findings' in line:
            in_findings = True
            in_recommendations = False
        elif 'Recommendations' in line:
            in_findings = False
            in_recommendations = True
        elif 'Analysis' in line:
            # Original checked "'Detailed Analysis' in line or 'Analysis' in
            # line" — the first clause is redundant (it implies the second).
            in_findings = False
            in_recommendations = False
        elif in_findings and line.strip().startswith(('-', '*', '•')):
            key_findings.append(line.strip().lstrip('-*•').strip())
        elif in_recommendations and line.strip().startswith(('-', '*', '•')):
            recommendations.append(line.strip().lstrip('-*•').strip())
        elif not in_findings and not in_recommendations and line.strip():
            detailed_analysis += line + "\n"

    # Static fallbacks when the LLM reply could not be parsed into bullets.
    if not key_findings:
        key_findings = [
            f"Overall plagiarism score: {overall_score}%",
            f"Primary plagiarism type: {max(plagiarism_types.keys(), key=plagiarism_types.get) if plagiarism_types else 'Not detected'}",
            f"Multiple sources detected: {len(sources_by_type['Academic']) + len(sources_by_type['Web'])} sources"
        ]

    if not recommendations:
        recommendations = [
            "Properly cite all sources according to your institution's guidelines",
            "Use quotation marks for direct quotes and provide page numbers",
            "Paraphrase content properly and cite original sources",
            "Use plagiarism detection tools during the writing process"
        ]

    # Up to 10 flagged chunks, summarized for the report payload.
    affected_sections = []
    for i, report in enumerate(plagiarized_reports[:10]):
        affected_sections.append({
            "section_number": i + 1,
            "text_snippet": report.chunk_text[:150],
            "similarity_score": report.similarity_score,
            "plagiarism_type": report.plagiarism_type,
            "source": report.source_url,
            "source_type": report.source_type
        })

    # Fix: the original took split('\n')[0], which could be an empty line or
    # a bare section header; prefer the first non-empty line instead.
    summary_line = next((ln for ln in llm_analysis.splitlines() if ln.strip()), "")
    executive_summary = summary_line or f"Document contains {overall_score}% plagiarized content"

    return DetailedPlagiarismReport(
        filename=filename,
        scan_timestamp=datetime.now().isoformat(),
        executive_summary=executive_summary,
        overall_score=round(overall_score, 2),
        severity_level=severity,
        matched_sources=sources_by_type["Academic"] + sources_by_type["Web"],
        key_findings=key_findings,
        plagiarism_breakdown={
            "total_plagiarism_percentage": round(overall_score, 2),
            "types": plagiarism_types,
            "academic_sources": len(sources_by_type["Academic"]),
            "web_sources": len(sources_by_type["Web"])
        },
        detailed_analysis=detailed_analysis or llm_analysis,
        affected_sections=affected_sections,
        recommendations=recommendations,
        academic_integrity_risk=risk_level
    )
|
|
def analyze_chunk(chunk: str) -> MatchReport:
    """Classify one text chunk against aggregated web + academic sources.

    Exact similarity above 0.50 flags a direct match; similarity in
    (0.25, 0.50] triggers an LLM paraphrase check. Scanning stops at the
    first confirmed match.
    """
    candidates = aggregate_search(chunk)

    top_score = 0.0
    top_url = None
    top_source = None
    match_kind = None
    flagged = False

    for candidate in candidates:
        source_text = candidate['text']
        sim = calculate_exact_similarity(chunk, source_text)

        # Track the best exact-similarity candidate seen so far.
        if sim > top_score:
            top_score = sim
            top_url = candidate['url']
            top_source = candidate['source_type']

        if sim > 0.50:
            flagged = True
            match_kind = "Exact/Heavy Match"
            break

        # Below the hard threshold but suspicious: ask the LLM.
        if sim > 0.25 and check_paraphrasing_with_llm(chunk, source_text):
            flagged = True
            match_kind = "Paraphrased Match (Mosaic)"
            top_url = candidate['url']
            top_source = candidate['source_type']
            # Confirmed paraphrase is reported with at least 0.85 similarity.
            top_score = max(top_score, 0.85)
            break

    return MatchReport(
        chunk_text=chunk,
        is_plagiarized=flagged,
        plagiarism_type=match_kind,
        source_url=top_url,
        source_type=top_source,
        similarity_score=round(top_score, 2)
    )
|
|
| |
| |
| |
|
|
def format_report_json(detailed_report: DetailedPlagiarismReport) -> Dict[str, Any]:
    """Serialize the detailed report into the API's JSON response shape."""
    summary = {
        "overall_plagiarism_score": detailed_report.overall_score,
        "severity_level": detailed_report.severity_level,
        "academic_integrity_risk": detailed_report.academic_integrity_risk,
    }
    payload: Dict[str, Any] = {
        "filename": detailed_report.filename,
        "scan_timestamp": detailed_report.scan_timestamp,
        "overall_score": detailed_report.overall_score,
        "severity_level": detailed_report.severity_level,
        "academic_integrity_risk": detailed_report.academic_integrity_risk,
        "summary": summary,
        "executive_summary": detailed_report.executive_summary,
        "key_findings": detailed_report.key_findings,
        "plagiarism_breakdown": detailed_report.plagiarism_breakdown,
        "matched_sources": detailed_report.matched_sources,
        "affected_sections": detailed_report.affected_sections,
        "detailed_analysis": detailed_report.detailed_analysis,
        "recommendations": detailed_report.recommendations,
    }
    return payload
|
|
def format_report_text(detailed_report: DetailedPlagiarismReport) -> str:
    """Render the detailed report as plain text.

    Builds a parts list joined once at the end instead of the original's
    repeated `+=` string concatenation (quadratic in the worst case).
    """
    banner = "=" * 80
    rule = "-" * 80
    parts: List[str] = []

    parts.append(banner + "\n")
    parts.append("DETAILED PLAGIARISM DETECTION REPORT\n")
    parts.append(banner + "\n\n")

    parts.append(f"FILE: {detailed_report.filename}\n")
    parts.append(f"SCAN DATE: {detailed_report.scan_timestamp}\n")
    parts.append(rule + "\n\n")

    parts.append("SUMMARY\n")
    parts.append(rule + "\n")
    parts.append(f"Overall Plagiarism Score: {detailed_report.overall_score}%\n")
    parts.append(f"Severity Level: {detailed_report.severity_level}\n")
    parts.append(f"Academic Integrity Risk: {detailed_report.academic_integrity_risk}\n\n")

    parts.append("EXECUTIVE SUMMARY\n")
    parts.append(rule + "\n")
    parts.append(f"{detailed_report.executive_summary}\n\n")

    parts.append("KEY FINDINGS\n")
    parts.append(rule + "\n")
    for i, finding in enumerate(detailed_report.key_findings, 1):
        parts.append(f"{i}. {finding}\n")
    parts.append("\n")

    parts.append("PLAGIARISM BREAKDOWN\n")
    parts.append(rule + "\n")
    breakdown = detailed_report.plagiarism_breakdown
    parts.append(f"Total Plagiarism %: {breakdown['total_plagiarism_percentage']}%\n")
    parts.append(f"Academic Sources: {breakdown['academic_sources']}\n")
    parts.append(f"Web Sources: {breakdown['web_sources']}\n")
    if breakdown.get('types'):
        parts.append("Types Detected:\n")
        for ptype, count in breakdown['types'].items():
            parts.append(f" - {ptype}: {count} instances\n")
    parts.append("\n")

    parts.append("MATCHED SOURCES\n")
    parts.append(rule + "\n")
    if detailed_report.matched_sources:
        for i, source in enumerate(detailed_report.matched_sources[:10], 1):
            parts.append(f"{i}. URL: {source.get('url', 'N/A')}\n")
            parts.append(f" Type: {source.get('type', 'N/A')}\n")
            parts.append(f" Similarity: {source.get('max_similarity', 'N/A')}\n\n")
    else:
        parts.append("No sources matched.\n\n")

    parts.append("DETAILED ANALYSIS\n")
    parts.append(rule + "\n")
    parts.append(f"{detailed_report.detailed_analysis}\n\n")

    if detailed_report.affected_sections:
        parts.append("AFFECTED SECTIONS (Top Issues)\n")
        parts.append(rule + "\n")
        for section in detailed_report.affected_sections[:5]:
            parts.append(f"\nSection {section['section_number']}:\n")
            parts.append(f"Text Snippet: {section['text_snippet']}\n")
            parts.append(f"Similarity Score: {section['similarity_score']}\n")
            parts.append(f"Plagiarism Type: {section['plagiarism_type']}\n")
            parts.append(f"Source: {section['source']}\n")
            parts.append("\n")

    parts.append("RECOMMENDATIONS\n")
    parts.append(rule + "\n")
    for i, rec in enumerate(detailed_report.recommendations, 1):
        parts.append(f"{i}. {rec}\n")
    parts.append("\n")

    parts.append(banner + "\n")
    parts.append("End of Report\n")
    parts.append(banner + "\n")

    return "".join(parts)
|
|
def format_report_html(detailed_report: DetailedPlagiarismReport) -> str:
    """Render the detailed report as a standalone HTML page.

    The whole page is a single f-string; doubled braces ({{ }}) escape CSS
    braces inside the f-string. NOTE(review): field values are interpolated
    without HTML-escaping — LLM output or source URLs containing markup
    would be rendered verbatim; consider html.escape() for untrusted fields.
    """
    # chr(10) is "\n" — f-string expressions could not contain backslashes
    # before Python 3.12, hence the chr() workaround in the body below.
    html = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Plagiarism Detection Report - {detailed_report.filename}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; background-color: #f5f5f5; }}
.container {{ background-color: white; padding: 30px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
h1 {{ color: #333; border-bottom: 3px solid #2196F3; padding-bottom: 10px; }}
h2 {{ color: #2196F3; margin-top: 30px; }}
.summary {{ background-color: #f0f7ff; padding: 15px; border-left: 4px solid #2196F3; margin: 20px 0; }}
.score {{ font-size: 24px; font-weight: bold; color: #d32f2f; }}
.severity-low {{ color: #4caf50; }}
.severity-medium {{ color: #ff9800; }}
.severity-high {{ color: #f44336; }}
.severity-very-high {{ color: #c41c3b; }}
.findings {{ background-color: #fff3e0; padding: 15px; border-left: 4px solid #ff9800; }}
.source-item {{ background-color: #f5f5f5; padding: 10px; margin: 10px 0; border-radius: 4px; }}
.recommendation {{ background-color: #e8f5e9; padding: 10px; margin: 10px 0; border-left: 3px solid #4caf50; }}
table {{ width: 100%; border-collapse: collapse; margin: 15px 0; }}
th, td {{ padding: 10px; text-align: left; border-bottom: 1px solid #ddd; }}
th {{ background-color: #2196F3; color: white; }}
.affected-section {{ background-color: #fce4ec; padding: 15px; margin: 10px 0; border-radius: 4px; }}
</style>
</head>
<body>
<div class="container">
<h1>🔍 Plagiarism Detection Report</h1>

<div class="summary">
<p><strong>File:</strong> {detailed_report.filename}</p>
<p><strong>Scan Date:</strong> {detailed_report.scan_timestamp}</p>
<p><strong>Overall Plagiarism Score:</strong> <span class="score">{detailed_report.overall_score}%</span></p>
<p><strong>Severity Level:</strong> <span class="severity-{detailed_report.severity_level.lower().replace(' ', '-')}">{detailed_report.severity_level}</span></p>
<p><strong>Academic Integrity Risk:</strong> {detailed_report.academic_integrity_risk}</p>
</div>

<h2>Executive Summary</h2>
<p>{detailed_report.executive_summary}</p>

<h2>Key Findings</h2>
<div class="findings">
<ul>
{"".join([f"<li>{finding}</li>" for finding in detailed_report.key_findings])}
</ul>
</div>

<h2>Plagiarism Breakdown</h2>
<table>
<tr>
<th>Category</th>
<th>Value</th>
</tr>
<tr>
<td>Total Plagiarism %</td>
<td>{detailed_report.plagiarism_breakdown['total_plagiarism_percentage']}%</td>
</tr>
<tr>
<td>Academic Sources</td>
<td>{detailed_report.plagiarism_breakdown['academic_sources']}</td>
</tr>
<tr>
<td>Web Sources</td>
<td>{detailed_report.plagiarism_breakdown['web_sources']}</td>
</tr>
</table>

<h2>Matched Sources</h2>
{"".join([f'<div class="source-item"><strong>{source.get("type", "Unknown")}</strong><br/><a href="{source.get("url", "#")}" target="_blank">{source.get("url", "N/A")}</a><br/>Similarity: {source.get("max_similarity", "N/A")}</div>' for source in detailed_report.matched_sources[:10]])}

<h2>Detailed Analysis</h2>
<p>{detailed_report.detailed_analysis.replace(chr(10), "<br/>")}</p>

{"<h2>Affected Sections (Top Issues)</h2>" + "".join([f'<div class="affected-section"><strong>Section {section["section_number"]}</strong><br/><em>Text:</em> {section["text_snippet"]}...<br/><em>Similarity:</em> {section["similarity_score"]}<br/><em>Type:</em> {section["plagiarism_type"]}</div>' for section in detailed_report.affected_sections[:5]]) if detailed_report.affected_sections else ""}

<h2>Recommendations</h2>
<div>
{"".join([f'<div class="recommendation"><strong>✓</strong> {rec}</div>' for rec in detailed_report.recommendations])}
</div>
</div>
</body>
</html>
"""
    return html
|
|
| |
| |
| |
|
|
def extract_text_from_pdf(file_bytes) -> str:
    """Extract text from every page of a PDF, newline-separated.

    Pages that yield no text (e.g. scanned images) are skipped.

    Args:
        file_bytes: A binary file-like object accepted by PyPDF2's PdfReader.

    Returns:
        Concatenated page texts, each followed by a newline; "" for an
        image-only PDF.
    """
    reader = PdfReader(file_bytes)
    parts = []
    for page in reader.pages:
        # Fix: the original called the expensive page.extract_text() twice
        # per page (once in the filter, once in the join); call it once.
        page_text = page.extract_text()
        if page_text:
            parts.append(page_text + "\n")
    return "".join(parts)
|
|
def chunk_text(text: str, words_per_chunk: int = 40, overlap: int = 10) -> List[str]:
    """Split text into overlapping word windows.

    Consecutive chunks share *overlap* words so matches spanning a chunk
    boundary can still be detected. Windows of 15 words or fewer are
    dropped (too little signal for a meaningful search query).

    Args:
        text: Raw document text.
        words_per_chunk: Target window size in words.
        overlap: Words shared between consecutive windows (new parameter;
            the default of 10 preserves the original stride).

    Returns:
        List of chunk strings; empty for short inputs.
    """
    words = text.split()
    # Fix: the original used a raw step of (words_per_chunk - 10), which made
    # range() raise ValueError for words_per_chunk <= 10; clamp to >= 1.
    step = max(words_per_chunk - overlap, 1)
    chunks = []
    for start in range(0, len(words), step):
        window = words[start:start + words_per_chunk]
        if len(window) > 15:
            chunks.append(" ".join(window))
    return chunks
|
|
@app.post("/scan-paper", response_model=PlagiarismReport)
async def scan_paper(file: UploadFile = File(...)):
    """Run a basic plagiarism scan over an uploaded PDF and return the raw report."""
    document_text = extract_text_from_pdf(file.file)
    word_count = len(document_text.split())

    if word_count == 0:
        raise HTTPException(status_code=400, detail="Could not extract text. Is this a scanned PDF?")

    # Bound work (and upstream API usage) to the first 20 chunks.
    chunks = chunk_text(document_text)[:20]

    chunk_reports = [analyze_chunk(c) for c in chunks]
    flagged_words = sum(
        len(r.chunk_text.split()) for r in chunk_reports if r.is_plagiarized
    )

    # Overlapping chunks can double-count words; cap at the document size.
    flagged_words = min(flagged_words, word_count)
    score = (flagged_words / word_count) * 100

    if score < 15:
        severity = "Low"
    elif score < 30:
        severity = "Medium"
    elif score < 50:
        severity = "High"
    else:
        severity = "Very High"

    return PlagiarismReport(
        filename=file.filename,
        total_words=word_count,
        plagiarized_words=flagged_words,
        overall_plagiarism_score=round(score, 2),
        severity_level=severity,
        details=chunk_reports
    )
|
|
@app.post("/generate-detailed-report")
async def generate_detailed_report(file: UploadFile = File(...)):
    """Scan the uploaded PDF, then return an LLM-written report as JSON."""
    document_text = extract_text_from_pdf(file.file)
    word_count = len(document_text.split())

    if word_count == 0:
        raise HTTPException(status_code=400, detail="Could not extract text. Is this a scanned PDF?")

    # Bound work (and upstream API usage) to the first 20 chunks.
    chunks = chunk_text(document_text)[:20]

    chunk_reports = [analyze_chunk(c) for c in chunks]
    flagged_words = min(
        sum(len(r.chunk_text.split()) for r in chunk_reports if r.is_plagiarized),
        word_count,
    )
    score = (flagged_words / word_count) * 100

    detailed = generate_detailed_report_with_llm(
        filename=file.filename,
        match_reports=chunk_reports,
        total_words=word_count,
        overall_score=score,
    )

    return format_report_json(detailed)
|
|
@app.post("/report/text")
async def report_text(file: UploadFile = File(...)):
    """Scan the uploaded PDF and return the detailed report as plain text."""
    document_text = extract_text_from_pdf(file.file)
    word_count = len(document_text.split())

    if word_count == 0:
        raise HTTPException(status_code=400, detail="Could not extract text. Is this a scanned PDF?")

    # Bound work (and upstream API usage) to the first 20 chunks.
    chunks = chunk_text(document_text)[:20]

    chunk_reports = [analyze_chunk(c) for c in chunks]
    flagged_words = min(
        sum(len(r.chunk_text.split()) for r in chunk_reports if r.is_plagiarized),
        word_count,
    )
    score = (flagged_words / word_count) * 100

    detailed = generate_detailed_report_with_llm(
        filename=file.filename,
        match_reports=chunk_reports,
        total_words=word_count,
        overall_score=score,
    )

    from fastapi.responses import PlainTextResponse
    return PlainTextResponse(format_report_text(detailed))
|
|
@app.post("/report/html")
async def report_html(file: UploadFile = File(...)):
    """Scan the uploaded PDF and return the detailed report as an HTML page."""
    document_text = extract_text_from_pdf(file.file)
    word_count = len(document_text.split())

    if word_count == 0:
        raise HTTPException(status_code=400, detail="Could not extract text. Is this a scanned PDF?")

    # Bound work (and upstream API usage) to the first 20 chunks.
    chunks = chunk_text(document_text)[:20]

    chunk_reports = [analyze_chunk(c) for c in chunks]
    flagged_words = min(
        sum(len(r.chunk_text.split()) for r in chunk_reports if r.is_plagiarized),
        word_count,
    )
    score = (flagged_words / word_count) * 100

    detailed = generate_detailed_report_with_llm(
        filename=file.filename,
        match_reports=chunk_reports,
        total_words=word_count,
        overall_score=score,
    )

    from fastapi.responses import HTMLResponse
    return HTMLResponse(format_report_html(detailed))
|
|
@app.get("/")
async def root():
    """Describe the service and list its available endpoints."""
    endpoints = {
        "scan": "/scan-paper (POST - basic scan)",
        "detailed_report": "/generate-detailed-report (POST - JSON report with LLM analysis)",
        "text_report": "/report/text (POST - plain text report)",
        "html_report": "/report/html (POST - HTML report)",
    }
    return {"message": "Pro Plagiarism Detector API", "endpoints": endpoints}
|
|
# Development entry point: run an embedded uvicorn server when this file is
# executed directly (python <this_file>.py); binds all interfaces on :8000.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)