Subhadip007 commited on
Commit
233102d
Β·
0 Parent(s):

feat: data ingestion and processing pipeline complete

Browse files

- ArXiv API fetcher with deduplication and rate limiting
- PDF downloader with exponential backoff retry
- PDF text extractor using PyMuPDF
- Text cleaner: hyphenation fix, artifact removal, reference stripping
- Pydantic schema validation for all paper metadata
- Idempotent pipeline: safe to re-run without reprocessing

Papers in system: 303 fetched, 270+ processed

.env.example ADDED
@@ -0,0 +1 @@
 
 
1
+ GROQ_API_KEY=your_groq_api_key_here
.gitignore ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
 
1
+ venv/
2
+ .env
3
+ __pycache__/
4
+ *.pyc
5
+ data/raw/
6
+ data/processed/
7
+ data/embeddings/
8
+ *.log
9
+ .DS_Store
.vscode/settings.json ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ {
2
+ }
config/__init__.py ADDED
File without changes
config/settings.py ADDED
@@ -0,0 +1,100 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Central configuration for ResearchPilot
3
+
4
+ RULE: No hardcoded values anywhere else in this codebase.
5
+ Every constant lives here. This make the system to
6
+ tune without hunting through multiple files.
7
+ """
8
+
9
+ import os
10
+ from pathlib import Path
11
+ from dotenv import load_dotenv
12
+
13
+ # Load environment variables from .env file
14
+ # This must happen before anything else reads os.environ
15
+ load_dotenv()
16
+
17
+ # ------------------------------------------
18
+ # PROJECT PATHS
19
+ # ------------------------------------------
20
+ # Path(__file__) = config/setting.py
21
+ # .parent = = config/
22
+ # .parent.parent = researchpilot/ <- project root
23
+ ROOT_DIR = Path(__file__).parent.parent
24
+
25
+ DATA_DIR = ROOT_DIR / "data"
26
+ RAW_DIR = DATA_DIR / "raw"
27
+ PROCESSED_DIR = DATA_DIR / "processed"
28
+ CHUNKS_DIR = DATA_DIR / "chunks"
29
+ EMBEDDINGS_DIR = DATA_DIR / "embeddings"
30
+ LOGS_DIR = ROOT_DIR / "logs"
31
+
32
+
33
+ # Create directories if they don't exist
34
+ # This ensures the app works on any machine without manual setup
35
+ for directory in [RAW_DIR, PROCESSED_DIR, CHUNKS_DIR, EMBEDDINGS_DIR, LOGS_DIR]:
36
+ directory.mkdir(
37
+ parents = True,
38
+ exist_ok = True
39
+ )
40
+
41
+ # ------------------------------------------
42
+ # DATA INGESTION SETTINGS
43
+ # ------------------------------------------
44
+ ARXIV_CATEGORIES = ["cs.LG", "cs.AI"] # Machine Learning + AI
45
+ MAX_PAPERS_PER_FETCH = 100 # Papers per API call
46
+ TOTAL_PAPERS_TARGET = 100 # Total papers to collect
47
+ ARXIV_API_DELAY_SECONDS = 3.0 # ArXiv rate limit: be respectful
48
+ PDF_DOWNLOAD_TIMEOUT = 30 # Seconds before giving up on a PDF
49
+ MAX_DOWNLOAD_RETRIES = 3 # Retry failed downloads N times
50
+
51
+ # ------------------------------------------
52
+ # DOCUMENT PROCESSING SETTINGS
53
+ # ------------------------------------------
54
+ MIN_TEXT_LENGTH = 500 # Skip papers with less that 500 chars
55
+ MAX_TEXT_LENGTH = 500_000 # Skip papers larger than 100k chars (corrupted)
56
+
57
+ # ------------------------------------------
58
+ # CHUNKING SETTINGS
59
+ # ------------------------------------------
60
+ CHUNK_SIZE = 512 # Charaters per chunk
61
+ CHUNK_OVERLAP = 50 # Overlap between consecutive chunks
62
+ MIN_CHUNK_SIZE = 100 # Discard chunks smaller than this
63
+
64
+ # ------------------------------------------
65
+ # EMBEDDING SETTINGS
66
+ # ------------------------------------------
67
+ EMBEDDING_MODEL_NAME = "BAAI/bge-base-en-v1.5"
68
+ EMBEDDING_BATCH_SIZE = 32 # Process N chunks at once
69
+ EMBEDDING_DIMENSION = 768 # BGE-base output dimension
70
+
71
+ # ------------------------------------------
72
+ # VECTOR STORE SETTINGS
73
+ # ------------------------------------------
74
+ QDRANT_COLLECTION_NAME = 'research_papers'
75
+ QDRANT_PATH = str(ROOT_DIR / 'data' / 'qdrant_db') # Local Storage path
76
+ TOP_K_RETRIEVAL = 20 # Retieve top 20 candidates
77
+ TOP_K_RERANK = 5 # Keep top 5 after reranking
78
+
79
+ # ------------------------------------------
80
+ # LLM SETTINGS
81
+ # ------------------------------------------
82
+ GROQ_API_KEY = os.getenv('GROQ_API_KEY') # Loaded from .env
83
+ LLM_MODEL_NAME = 'llama3-8b-8192' # Groq model ID
84
+ LLM_TEMPERATURE = 0.1 # Low = More factual/consistent
85
+ LLM_MAX_TOKENS = 1024 # Max response tokens
86
+
87
+ # ------------------------------------------
88
+ # API SETTINGS
89
+ # ------------------------------------------
90
+ API_HOST = "0.0.0.0"
91
+ API_PORT = 8000
92
+ API_RELOAD = True # Auto-reload on code change (dev-only)
93
+
94
+ # ------------------------------------------
95
+ # LOGGING SETTINGS
96
+ # ------------------------------------------
97
+ LOG_LEVEL = "INFO"
98
+ LOG_FILE = LOGS_DIR / "researchpilot.log"
99
+ LOG_ROTATION = "10 MB" # Create new log file after 10MB
100
+ LOG_RETENTION = "7 days" # Keep logs for 7 days
output.txt ADDED
Binary file (10.4 kB). View file
 
run_ingestion.py ADDED
@@ -0,0 +1,76 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Master script to run the data ingestion pipeline.
3
+
4
+ Run this from the project root:
5
+ python run_ingestion.py
6
+
7
+ This script orchestrates:
8
+ 1. Fetch paper metadata from ArXiv
9
+ 2. Download PDFs for fetched papers
10
+ """
11
+
12
+ import json
13
+ from pathlib import Path
14
+
15
+ from src.utils.logger import get_logger, setup_logger
16
+ from src.ingestion.arxiv_fetcher import ArXivFetcher
17
+ from src.ingestion.pdf_downloader import PDFDownloader
18
+ from src.processing.pdf_extractor import PDFExtractor
19
+ from config.settings import RAW_DIR, PROCESSED_DIR, TOTAL_PAPERS_TARGET
20
+
21
+
22
+ setup_logger()
23
+ logger = get_logger(__name__)
24
+
25
+
26
+ def load_all_raw_papers() -> list[dict]:
27
+ papers = []
28
+
29
+ for f in RAW_DIR.glob("*.json"):
30
+ if f.name == "paper_index.json":
31
+ continue
32
+ with open(f, encoding = 'utf-8') as fp:
33
+ papers.append(json.load(fp))
34
+ return papers
35
+
36
+
37
+
38
+ def print_section(title: str):
39
+ logger.info("=" * 60)
40
+ logger.info(title)
41
+ logger.info("=" * 60)
42
+
43
+
44
+
45
+ def main():
46
+ print_section("RESEARCHPILOT β€” FULL PIPELINE")
47
+
48
+ # -------- PHASE 1: Fetch Metadata --------
49
+ print_section("PHASE 1: Fetching ArXiv Metadata")
50
+ fetcher = ArXivFetcher()
51
+ new_papers = fetcher.fetch_papers(max_papers = TOTAL_PAPERS_TARGET)
52
+ logger.info(f"New papers fetched: {len(new_papers)}")
53
+
54
+ # -------- PHASE 2: Download PDFs --------
55
+ print_section("PHASE 2: Downloading PDFs")
56
+ all_papers = load_all_raw_papers()
57
+ downloader = PDFDownloader()
58
+ dl_stats = downloader.download_all(all_papers)
59
+ logger.info(f"Download stats: {dl_stats}")
60
+
61
+ # -------- PHASE 3: Extract Text --------
62
+ print_section("PHASE 3: Extracting and Cleaning Text")
63
+ extractor = PDFExtractor()
64
+ proc_stats = extractor.process_all()
65
+ logger.info(f"Processing stats: {proc_stats}")
66
+
67
+ # -------- SUMMARY --------
68
+ processed_files = list(PROCESSED_DIR.glob("*.json"))
69
+ print_section("PIPELINE COMPLETE")
70
+ logger.info(f"Papers in processed/: {len(processed_files)}")
71
+ logger.info("Ready for Phase 5: Chunking")
72
+
73
+
74
+
75
+ if __name__ == "__main__":
76
+ main()
src/__init__.py ADDED
File without changes
src/ingestion/__init__.py ADDED
File without changes
src/ingestion/arxiv_fetcher.py ADDED
@@ -0,0 +1,324 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ ArXiv API client for fetching ML paper metadata.
3
+
4
+ RESPONSIBILITY: This module has ONE job - fetch paper metadata from ArXiv
5
+ and return validated, structured data. It does NOT download PDFs (that's
6
+ the pdf extractor's job). Single Responsibility Principle.
7
+
8
+ Why ArXiv LIBRARY:
9
+ The arxiv Python library wraps the raw XML API response into clean
10
+ Python objects. We could parse XML ourselves with BeautifulSoup,
11
+ but using the official library means we benefit from their bug fixes
12
+ and API changes without rewriting our code.
13
+ """
14
+
15
+ import time
16
+ import json
17
+ from pathlib import Path
18
+ from datetime import datetime
19
+ from typing import Optional
20
+
21
+
22
+ import arxiv
23
+ from pydantic import BaseModel, field_validator
24
+
25
+ from src.utils.logger import get_logger
26
+ from config.settings import (
27
+ RAW_DIR,
28
+ ARXIV_CATEGORIES,
29
+ MAX_PAPERS_PER_FETCH,
30
+ TOTAL_PAPERS_TARGET,
31
+ ARXIV_API_DELAY_SECONDS
32
+ )
33
+
34
+
35
+ # Get a named logger for this module
36
+ # Every module gets its own named logger - makes debugging trivial
37
+ logger = get_logger(__name__)
38
+
39
+
40
+ # -------------------------------------------
41
+ # DATA MODEL
42
+ # -------------------------------------------
43
+
44
+ class PaperMetadata(BaseModel):
45
+ """
46
+ Pydantic model defining the exact schema for a paper's metadata.
47
+
48
+ WHY PYDANTIC:
49
+ Pydantic enforces data types at runtime. If ArXiv returns a date
50
+ in an unexpected format, Pydantic raises a clear error immediately
51
+ instead of silently storing bad data that breaks things 3 steps later.
52
+
53
+ This is called "fail fast" - catch bad data as early as possible.
54
+ """
55
+ paper_id: str
56
+ title: str
57
+ abstract: str
58
+ authors: list[str]
59
+ categories: list[str]
60
+ primary_categories: str
61
+ published_date: str # ISO Format: "2023-01-17"
62
+ updated_date: str
63
+ arxiv_url: str
64
+ pdf_url: str
65
+
66
+ # Pipeline stage flags - track what processing has been done
67
+ pdf_downloaded: bool = False
68
+ text_extracted: bool = False
69
+ chunked: bool = False
70
+ embedded: bool = False
71
+
72
+
73
+ @field_validator("title", "abstract")
74
+ @classmethod
75
+ def clean_whitespace(cls, value: str) -> str:
76
+ """
77
+ Strip excess whitespace from text fields
78
+ ArXiv abstracts often contain \n and multiple spaces
79
+ """
80
+ return " ".join(value.split())
81
+
82
+
83
+ @field_validator("paper_id")
84
+ @classmethod
85
+ def extract_short_id(cls, value: str) -> str:
86
+ """
87
+ ArXiv returns IDs like 'http://arxiv.org/abs/2301.07041v1'
88
+ We want just '2301.07041'
89
+ """
90
+
91
+ # Split on "/" and take the last part, then remove version suffix
92
+ short_id = value.split("/")[-1]
93
+
94
+ if "v" in short_id:
95
+ short_id = short_id.split("v")[0]
96
+
97
+ return short_id
98
+
99
+
100
+
101
+
102
+ # -------------------------------------------
103
+ # FETCHER CLASS
104
+ # -------------------------------------------
105
+
106
+ class ArXivFetcher:
107
+ """
108
+ Fetches and persists paper metadata from the ArXiv API.
109
+
110
+ DESIGN PATTERN: This class is stateless β€” it doesn't store any
111
+ papers in memory. It fetches, validates, and immediately saves
112
+ to disk. This means if the process crashes at paper #347,
113
+ papers 1-346 are already saved and we can resume.
114
+ """
115
+
116
+ def __init__(self):
117
+ # arxiv.Client lets us configure rate limiting behavior
118
+ self.client = arxiv.Client(
119
+ page_size = MAX_PAPERS_PER_FETCH,
120
+ # Delay between API page requests (ArXiv policy: >= 3 Seconds)
121
+ delay_seconds = ARXIV_API_DELAY_SECONDS,
122
+ num_retries = 3 # Retry failed requests automatically
123
+ )
124
+
125
+
126
+ # File to track which paper IDs we've already downloaded
127
+ # This enables idempotent runs - safe to run pipeline multiple times
128
+ self.index_file = RAW_DIR / "paper_index.json"
129
+ self.existing_ids = self._load_existing_ids()
130
+
131
+
132
+ logger.info(
133
+ f"ArXivFetcher initialized. "
134
+ f"Already have {len(self.existing_ids)} papers indexed."
135
+ )
136
+
137
+
138
+ def _load_existing_ids(self) -> set[str]:
139
+ """
140
+ Load set of already-fetched paper IDs from disk
141
+
142
+ WHY A SET: Checking 'if paper_id in existing_ids' is O(1) with a set
143
+ versus O(n) with a list. At 10,000 papers, this matters.
144
+ """
145
+ if self.index_file.exists():
146
+ with open(self.index_file, "r") as f:
147
+ data = json.load(f)
148
+ return set(data.get("paper_ids", []))
149
+
150
+ return set()
151
+
152
+
153
+
154
+ def _save_paper_metadata(self, paper: PaperMetadata) -> Path:
155
+ """
156
+ Save a single paper's metadata as JSON to disk.
157
+
158
+ Each paper gets its own JSON file named by its ID.
159
+ WHY NOT A DATABASE: For a pipeline this size, flat JSON files
160
+ are simpler, portable, and Git-friendly. We add a database
161
+ later when we need querying capabilities.
162
+ """
163
+
164
+ # e.g., data/raw/2301.07041.json
165
+ file_path = RAW_DIR / f"{paper.paper_id}.json"
166
+
167
+ with open(file_path, "w", encoding = 'utf-8') as f:
168
+ # model.dump() converts Pydantic model to dict
169
+ # indent = 2 makes the JSON human-readable
170
+ json.dump(paper.model_dump(), f, indent = 2, ensure_ascii = False)
171
+
172
+
173
+ return file_path
174
+
175
+
176
+ def _update_index(self, paper_id: str):
177
+ """
178
+ Add paper_id to our index file and memory set.
179
+ Called after every successful save
180
+ """
181
+ self.existing_ids.add(paper_id)
182
+
183
+ with open(self.index_file, "w") as f:
184
+ json.dump(
185
+ {
186
+ "paper_ids": list(self.existing_ids),
187
+ "last_updated": datetime.now().isoformat(),
188
+ "total_count": len(self.existing_ids)
189
+ },
190
+ f, indent = 2
191
+ )
192
+
193
+
194
+
195
+ def _parse_arxiv_result(self, result: arxiv.Result) -> Optional[PaperMetadata]:
196
+ """
197
+ Convert a raw arxiv.Result object into our PaperMetadata model.
198
+
199
+ WHY THIS WRAPPER EXISTS:
200
+ The arxiv library's Result object has its own structure that
201
+ may change across library versions. By converting to our own
202
+ PaperMetadata model here, the rest of our codebase never
203
+ depends on the arxiv library directly. If arxiv changes its
204
+ API tomorrow, we only fix this one function.
205
+
206
+ This is called the ADAPTER PATTERN.
207
+ """
208
+ try:
209
+ metadata = PaperMetadata(
210
+ paper_id = result.entry_id,
211
+ title = result.title,
212
+ abstract = result.summary,
213
+ authors = [str(a) for a in result.authors],
214
+ categories = result.categories,
215
+ primary_categories = result.primary_category,
216
+ published_date = result.published.strftime("%Y-%m-%d"),
217
+ updated_date = result.updated.strftime("%Y-%m-%d"),
218
+ arxiv_url = result.entry_id,
219
+ pdf_url = result.pdf_url,
220
+ )
221
+
222
+ return metadata
223
+
224
+ except Exception as e:
225
+ # Log warning but don't crash - one bad paper shouldn't
226
+ # stop the entire pipeline
227
+ logger.warning(f"Failed to parse paper: {result.entry_id}: {e}")
228
+ return None
229
+
230
+
231
+
232
+ def fetch_papers(
233
+ self,
234
+ categories: list[str] = None,
235
+ max_papers: int = None,
236
+ date_filter_year: Optional[int] = None
237
+ ) -> list[PaperMetadata]:
238
+ """
239
+ Main method: fetch papers from ArXiv for given categories.
240
+
241
+ Args:
242
+ categories: ArXiv category codes e.g. ["cs.LG", "cs.AI"]
243
+ max_papers: Maximum papers to fetch
244
+ date_filter_year: Only fetch papers from this year onwards
245
+
246
+ Returns:
247
+ List of validated PaperMetaData objects
248
+
249
+ HOW THE QUERY WORKS:
250
+ ArXiv search syntax uses boolean operators.
251
+ 'cat:cs.LG' OR 'cat:cs.AI' means "Papers in cs.LG OR cs.AI category"
252
+ We sort by submission date (newest first) to get fresh papers.
253
+ """
254
+
255
+ if categories is None:
256
+ categories = ARXIV_CATEGORIES
257
+ if max_papers is None:
258
+ max_papers = TOTAL_PAPERS_TARGET
259
+
260
+ # Build search query: "cat:cs.LG OR cat:cs.AI"
261
+ category_query = " OR ".join([f"cat:{cat}" for cat in categories])
262
+ logger.info(f"Search query: '{category_query}'")
263
+ logger.info(f"Target: '{max_papers} papers'")
264
+
265
+
266
+ # Configure ArXiv search
267
+ search = arxiv.Search(
268
+ query = category_query,
269
+ max_results = max_papers * 2, # Fetch extra to account for skips
270
+ sort_by = arxiv.SortCriterion.SubmittedDate,
271
+ sort_order = arxiv.SortOrder.Descending,
272
+ )
273
+
274
+ fetched_papers = []
275
+ skipped_duplicate = 0
276
+ skipped_invalid = 0
277
+
278
+ logger.info("Starting ArXiv fetch...")
279
+
280
+ # self.client.results() is a GENERATOR
281
+ # WHY GENERATOR: It fetches pages lazily - doesn't load all 500
282
+ # papers into memory at once. Memory efficient.
283
+ for result in self.client.results(search):
284
+
285
+ # Stop if we've reached our target
286
+ if len(fetched_papers) >= max_papers:
287
+ break
288
+
289
+ # Skip papers we already have
290
+ raw_id = result.entry_id.split("/")[-1].split("v")[0]
291
+ if raw_id in self.existing_ids:
292
+ skipped_duplicate += 1
293
+ continue
294
+
295
+ # Apply year filter if specified
296
+ if date_filter_year and result.published.year < date_filter_year:
297
+ continue
298
+
299
+ # Parse and validate
300
+ paper = self._parse_arxiv_result(result)
301
+ if paper is None:
302
+ skipped_invalid += 1
303
+ continue
304
+
305
+ # Save to disk immediately
306
+ self._save_paper_metadata(paper)
307
+ self._update_index(paper.paper_id)
308
+ fetched_papers.append(paper)
309
+
310
+ # Progress logging every 10 papers
311
+ if len(fetched_papers) % 10 == 0:
312
+ logger.info(
313
+ f"Progress: {len(fetched_papers)}/{max_papers} papers fetched"
314
+ )
315
+
316
+
317
+ logger.info(
318
+ f"Fetch complete."
319
+ f"Fetched: {len(fetched_papers)} | "
320
+ f"Skipped (duplicate): {skipped_duplicate} | "
321
+ f"Skipped (invalid): {skipped_invalid}"
322
+ )
323
+
324
+ return fetched_papers
src/ingestion/pdf_downloader.py ADDED
@@ -0,0 +1,204 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Downloads PDF files for papers that have been fetched from ArXiv.
3
+
4
+ SEPARATION OF CONCERNS:
5
+ arxiv_fetcher.py β†’ Gets metadata (fast, no large files)
6
+ pdf_downloader.py β†’ Downloads PDFs (slow, large files)
7
+
8
+ This separation means if PDF download fails, metadata is safe.
9
+ We can retry ONLY the failed PDFs without re-fetching metadata.
10
+ """
11
+
12
+ import time
13
+ import json
14
+ import requests
15
+ from pathlib import Path
16
+
17
+ from tqdm import tqdm # Progress bar
18
+
19
+ from src.utils.logger import get_logger
20
+ from config.settings import (
21
+ RAW_DIR,
22
+ PDF_DOWNLOAD_TIMEOUT,
23
+ MAX_DOWNLOAD_RETRIES,
24
+ ARXIV_API_DELAY_SECONDS
25
+ )
26
+
27
+ logger = get_logger(__name__)
28
+
29
+
30
+
31
+ class PDFDownloader:
32
+ """
33
+ Download PDFs from ArXiv with retry logic and progress tracking
34
+ """
35
+
36
+ def __init__(self):
37
+ # Configure requests session
38
+ # WHY SESSION: Reuses TCP Connection across requests
39
+ # (faster than creating new connection per download)
40
+ self.session = requests.Session()
41
+ self.session.headers.update(
42
+ {
43
+ # Identify ourselves to ArXiv - polite and avoids blocks
44
+ "User-Agent": "ResearchPilot/1.0 (educational research project)"
45
+ }
46
+ )
47
+
48
+
49
+ # Directory for downloaded PDFs
50
+ self.pdf_dir = RAW_DIR / "pdfs"
51
+ self.pdf_dir.mkdir(exist_ok = True)
52
+
53
+
54
+ def download_pdf(self, paper_id: str, pdf_url: str) -> bool:
55
+ """
56
+ Download a single PDF with retry logic.
57
+
58
+ Args:
59
+ paper_id: ArXiv paper ID (used for filename)
60
+ pdf_url: Direct URL to the PDF
61
+
62
+ Returns:
63
+ True if downloaded successfully, False otherwise
64
+
65
+ RETRY PATTERN (Exponential Backoff):
66
+ Attempt 1: fail β†’ wait 2 seconds
67
+ Attempt 2: fail β†’ wait 4 seconds
68
+ Attempt 3: fail β†’ wait 8 seconds
69
+ β†’ give up, log error, continue to next paper
70
+
71
+ WHY EXPONENTIAL BACKOFF:
72
+ If a server is overloaded, hammering it with immediate retries
73
+ makes things worse. Waiting longer between retries gives the
74
+ server time to recover. This is standard practice for all
75
+ production systems that call external services.
76
+ """
77
+ output_path = self.pdf_dir / f"{paper_id}.pdf"
78
+
79
+
80
+ # Skip if already downloaded (idempotent)
81
+ if output_path.exists() and output_path.stat().st_size > 1000:
82
+ logger.debug(f"PDF already exists: {paper_id}")
83
+ return True
84
+
85
+ for attempt in range(1, MAX_DOWNLOAD_RETRIES + 1):
86
+ try:
87
+ logger.debug(f"Downloading {paper_id} (attempt {attempt})")
88
+
89
+ # stream = True means we download in chunks, not all at once
90
+ # This prevents running out of memory on large PDFs
91
+ response = self.session.get(
92
+ pdf_url,
93
+ timeout = PDF_DOWNLOAD_TIMEOUT,
94
+ stream = True
95
+ )
96
+
97
+ # Raise exception for 4xx or 5xx status codes
98
+ response.raise_for_status()
99
+
100
+ # Write PDF to disk in chunks of 8KB
101
+ with open(output_path, "wb") as f:
102
+ for chunk in response.iter_content(chunk_size = 8192):
103
+ if chunk: # Filter out keep-alive empty chunks
104
+ f.write(chunk)
105
+
106
+ # Verify file is not empty or suspiciously small
107
+ file_size = output_path.stat().st_size
108
+ if file_size < 1000:
109
+ logger.warning(f"Suspiciously small PDF: {paper_id} ({file_size} bytes)")
110
+ output_path.unlink() # Delete Bad File
111
+ return False
112
+
113
+ logger.debug(f"Downloaded {paper_id}: {file_size / 1024:.1f} KB")
114
+ return True
115
+
116
+
117
+ except requests.exceptions.RequestException as e:
118
+ logger.warning(f"Download attempt {attempt} failed for {paper_id}: {e}")
119
+
120
+ if attempt < MAX_DOWNLOAD_RETRIES:
121
+ # Exponential backoff: 2^attempt seconds
122
+ wait_time = 2 ** attempt
123
+ logger.debug(f"Waiting {wait_time}s before retry...")
124
+ time.sleep(wait_time)
125
+ else:
126
+ logger.error(f"All {MAX_DOWNLOAD_RETRIES} attemps failed for {paper_id}")
127
+ return False
128
+
129
+
130
+ return False
131
+
132
+
133
+
134
+ def download_all(self, papers: list[str]) -> dict:
135
+ """
136
+ Download PDFs for a list of papers with progress tracking.
137
+
138
+ Args:
139
+ papers: List of paper metadata dicts (loaded from JSON files)
140
+
141
+ Returns:
142
+ Summary statistics dict
143
+ """
144
+
145
+ successful = 0
146
+ failed = 0
147
+ skipped = 0
148
+
149
+
150
+ # tqdm wraps our list to show a progress bar
151
+ # desc= sets the label on the progress bar
152
+ for paper in tqdm(papers, desc = "Downloading PDFs"):
153
+ paper_id = paper['paper_id']
154
+ pdf_url = paper['pdf_url']
155
+
156
+
157
+ # Skip already downloaded papers
158
+ if paper.get("pdf_downloaded"):
159
+ skipped += 1
160
+ continue
161
+
162
+ # Download with delay to respect rate limits
163
+ success = self.download_pdf(paper_id, pdf_url)
164
+
165
+
166
+ if success:
167
+ successful += 1
168
+ # Update the paper's JSON file to mark pdf_downloaded = True
169
+ self._mark_downloaded(paper_id)
170
+ time.sleep(ARXIV_API_DELAY_SECONDS)
171
+ else:
172
+ failed += 1
173
+
174
+
175
+
176
+ summary = {
177
+ "successful": successful,
178
+ "failed": failed,
179
+ "skipped": skipped,
180
+ "total": len(papers)
181
+ }
182
+
183
+ logger.info(f"PDF download complete: {summary}")
184
+ return summary
185
+
186
+
187
+ def _mark_downloaded(self, paper_id: str):
188
+ """
189
+ Update the paper's JSON metadata to mark pdf_downloaded = True.
190
+ This updates our pipeline state flag.
191
+ """
192
+ json_path = RAW_DIR / f"{paper_id}.json"
193
+
194
+ if not json_path.exists():
195
+ return
196
+
197
+ with open(json_path, 'r', encoding = 'utf-8') as f:
198
+ data = json.load(f)
199
+
200
+
201
+ data["pdf_downloaded"] = True
202
+
203
+ with open(json_path, "w") as f:
204
+ json.dump(data, f, indent = 2)
src/processing/__init__.py ADDED
File without changes
src/processing/pdf_extractor.py ADDED
@@ -0,0 +1,253 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Extracts and cleans text from downloaded PDF files.
3
+
4
+ WHY PYMUPDF (fitz) over alternatives:
5
+ Library | Speed | Quality | Handles columns?
6
+ ---------------|--------|------------|-----------------
7
+ PyMuPDF | Fast | β˜…β˜…β˜…β˜…β˜… | Yes (sort=True)
8
+ pdfplumber | Medium | β˜…β˜…β˜…β˜…β˜† | Partial
9
+ pypdf2 | Medium | β˜…β˜…β˜…β˜†β˜† | No
10
+ pdfminer | Slow | β˜…β˜…β˜…β˜…β˜† | Partial
11
+
12
+ PyMuPDF's sort=True parameter reads text in natural reading order
13
+ (top-to-bottom, left-to-right) which is critical for multi-column
14
+ academic papers.
15
+ """
16
+
17
+ import json
18
+ from pathlib import Path
19
+
20
+ import fitz # PyMuPDF - imported as 'fitz' (legacy name from founder)
21
+ from tqdm import tqdm
22
+
23
+ from src.processing.text_cleaner import clean_text
24
+ from src.utils.logger import get_logger
25
+ from config.settings import (
26
+ RAW_DIR,
27
+ PROCESSED_DIR,
28
+ MIN_TEXT_LENGTH,
29
+ MAX_TEXT_LENGTH
30
+ )
31
+
32
+ logger = get_logger(__name__)
33
+
34
+
35
+
36
+ class PDFExtractor:
37
+ """
38
+ Extracts clean text from PDF files and saves to processed directory.
39
+
40
+ Output structure for each paper:
41
+ data/processed/2301.07041.json ← cleaned text + original metadata
42
+ """
43
+
44
+ def __init__(self):
45
+ self.pdf_dir = RAW_DIR / 'pdfs'
46
+
47
+
48
+ def extract_text_from_pdf(self, pdf_path: Path) -> str:
49
+ """
50
+ Extract raw text from a PDF using PyMuPDF.
51
+
52
+ Args:
53
+ pdf_path: Path to the PDF file
54
+
55
+ Returns:
56
+ Raw extracted text string (not yet cleaned)
57
+
58
+ HOW PYMUPDF READS PDFS:
59
+ PDF is a page-based format. We iterate each page,
60
+ extract text with sort=True (reading order), then
61
+ join all pages. The 'text' flag tells PyMuPDF to
62
+ extract plain text (vs HTML or dict formats).
63
+ """
64
+ try:
65
+ # Open PDF - fitz.open() handles file reading
66
+ doc = fitz.open(str(pdf_path))
67
+
68
+
69
+ pages_text = []
70
+
71
+ for page_num, page in enumerate(doc):
72
+ # get_text("text", sort = True)
73
+ # "text" -> plain text extraction mode
74
+ # sort = True -> respect reading order (critical for columns)
75
+ page_text = page.get_text("text", sort = True)
76
+
77
+ if page_text.strip():
78
+ pages_text.append(page_text)
79
+
80
+ # Close the document to free memory
81
+ doc.close()
82
+
83
+
84
+ # Join all pages with double newline (paragraph separator)
85
+ full_text = '\n\n'.join(pages_text)
86
+ return full_text
87
+
88
+
89
+ except Exception as e:
90
+ logger.error(f"Failed to extract text from {pdf_path.name}: {e}")
91
+ return ""
92
+
93
+
94
+
95
+ def validate_extracted_text(self, text: str, paper_id: str) -> tuple[bool, str]:
96
+ """
97
+ Validate that extracted text is usable.
98
+
99
+ Returns:
100
+ (is_valid: bool, reason: str)
101
+
102
+ VALIDATION RULES:
103
+ 1. Not empty
104
+ 2. Long enough to be a real paper (not a 1-page erratum)
105
+ 3. Not too long (might indicate extraction corruption)
106
+ 4. Contains alphabetic characters (not just symbols/numbers)
107
+ 5. Is primarily English (our embedding model is English-optimized)
108
+ """
109
+ if not text:
110
+ return False, "Empty text"
111
+
112
+ if len(text) < MIN_TEXT_LENGTH:
113
+ return False, f"Too short: {len(text)} chars < {MIN_TEXT_LENGTH}"
114
+
115
+ if len(text) > MAX_TEXT_LENGTH:
116
+ return False, f"Too long: {len(text)} chars > {MAX_TEXT_LENGTH}"
117
+
118
+
119
+ # Check that text contains substantial alphabetic content
120
+ # (not just numbers, equations, or garbled encoding)
121
+ alpha_chars = sum(1 for c in text if c.isalpha())
122
+ alpha_ratio = alpha_chars / len(text)
123
+
124
+
125
+ if alpha_ratio < 0.4:
126
+ return False, f"Low alphanumeric ration: {alpha_ratio:.2f} (likely encoding issue)"
127
+
128
+ return True, "Valid"
129
+
130
+
131
+
132
+ def process_paper(self, paper_metadata: dict) -> bool:
133
+ """
134
+ Full pipeline for one paper: extract -> clean -> validate -> save.
135
+
136
+ Args:
137
+ paper_metadata: dict loaded from data/raw/{paper_id}.json
138
+
139
+ Returns:
140
+ True if processed successfully, False otherwise
141
+ """
142
+ paper_id = paper_metadata['paper_id']
143
+
144
+ # Skip if already processed (idempotent)
145
+ output_path = PROCESSED_DIR / f'{paper_id}.json'
146
+ if output_path.exists():
147
+ logger.debug(f"Already processed: {paper_id}")
148
+ return True
149
+
150
+ # Check PDF exists
151
+ pdf_path = self.pdf_dir / f"{paper_id}.pdf"
152
+ if not pdf_path.exists():
153
+ logger.warning(f"PDF not found for {paper_id}, using abstract only")
154
+ # FALLBACK: Use abstract as the text source
155
+ # Abstract is short but better than nothing
156
+ # This handles cases where PDF download failed
157
+ text = paper_metadata.get("abstract", "")
158
+ if not text:
159
+ return False
160
+
161
+ else:
162
+ # Extract from PDF
163
+ raw_text = self.extract_text_from_pdf(pdf_path)
164
+
165
+
166
+ # Clean the text
167
+ text = clean_text(raw_text)
168
+
169
+
170
+ # Validate
171
+ is_valid, reason = self.validate_extracted_text(text, paper_id)
172
+ if not is_valid:
173
+ logger.warning(f"Validation failed for {paper_id}: {reason}")
174
+ return False
175
+
176
+ # Build processed document
177
+ processed_doc = {
178
+ # Copy all original metadata
179
+ **paper_metadata,
180
+
181
+ # Add processed text
182
+ "full_text": text,
183
+ "text_length": len(text),
184
+ "word_count": len(text.split()),
185
+
186
+ # Update pipeline state
187
+ "text_extracted": True,
188
+ "pdf_downloaded": paper_metadata.get("pdf_downloaded", False),
189
+ }
190
+
191
+
192
+ # Save to processed directory
193
+ with open(output_path, "w", encoding = 'utf-8') as f:
194
+ json.dump(processed_doc, f, indent = 2, ensure_ascii = False)
195
+
196
+ logger.debug(
197
+ f"Processed {paper_id}: "
198
+ f"{processed_doc['word_count']} words, "
199
+ f"{len(text)} chars"
200
+ )
201
+
202
+ return True
203
+
204
+
205
+
206
+ def process_all(self) -> dict:
207
+ """
208
+ Process all papers that have been fetched.
209
+
210
+ Loads metadata from data/raw/, extracts text,
211
+ saves results to data/processed/.
212
+ """
213
+ # Load all paper metadata from raw directory
214
+ raw_files = [
215
+ f for f in RAW_DIR.glob("*.json")
216
+ if f.name != "paper_index.json"
217
+ ]
218
+
219
+
220
+ logger.info(f"Found {len(raw_files)} papers to process")
221
+
222
+ successful = 0
223
+ failed = 0
224
+ skipped = 0
225
+
226
+
227
+
228
+ for raw_file in tqdm(raw_files, desc = "Extracting text"):
229
+ with open(raw_file, 'r', encoding = 'utf-8') as f:
230
+ metadata = json.load(f)
231
+
232
+ # Skip if already processed
233
+ output_path = PROCESSED_DIR / f"{metadata['paper_id']}.json"
234
+ if output_path.exists():
235
+ skipped += 1
236
+ continue
237
+
238
+ success = self.process_paper(metadata)
239
+ if success:
240
+ successful += 1
241
+ else:
242
+ failed += 1
243
+
244
+
245
+ stats = {
246
+ "total": len(raw_files),
247
+ "successful": successful,
248
+ "failed": failed,
249
+ "skipped": skipped,
250
+ }
251
+
252
+ logger.info(f"Processing complete: {stats}")
253
+ return stats
src/processing/text_cleaner.py ADDED
@@ -0,0 +1,262 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Text normalization utilities for extracted PDF content.
3
+
4
+ These functions are PURE FUNCTIONS β€” they take a string,
5
+ return a string, have no side effects, and are independently
6
+ testable. This is the correct way to write data transformation
7
+ logic.
8
+ """
9
+
10
+ import re
11
+ import unicodedata
12
+ import ftfy
13
+
14
+ from src.utils.logger import get_logger
15
+
16
+ logger = get_logger(__name__)
17
+
18
+
19
def fix_hyphenated_linebreaks(text: str) -> str:
    """
    Rejoin words that PDF line-wrapping split with a trailing hyphen.

    Justified research-paper text hyphenates across lines:

        "This is a demon-
         stration of the problem"

    becomes:

        "This is a demonstration of the problem"

    The pattern requires a letter immediately before the hyphen and a
    letter at the start of the continuation (after the newline and any
    leading whitespace), so genuine in-word hyphens that do not sit at
    a line break ("state-of-the-art") are left untouched.
    """
    hyphen_break = re.compile(r'([a-zA-Z])-\n\s*([a-zA-Z])')
    # Splice the two captured letters back together, dropping the
    # hyphen, the newline, and any indentation on the next line.
    return hyphen_break.sub(lambda m: m.group(1) + m.group(2), text)
39
+
40
+
41
+
42
def remove_page_artifacts(text: str) -> str:
    """
    Remove common PDF page artifacts that pollute extracted text.

    Handles:
    - Form feed characters (0x0C) that mark page boundaries
    - Standalone page numbers (lines containing only digits)
    - Running headers/footers (arXiv watermarks, venue names, footnotes)

    Args:
        text: Raw extracted PDF text.

    Returns:
        Text with artifact lines dropped. Blank lines are preserved so a
        later whitespace-normalization pass can handle paragraph breaks.

    FIX: the artifact pattern list used to be rebuilt — and every regex
    re-scanned from its source string — inside the per-line loop. The
    patterns are now compiled once before the loop.
    """
    # Form feeds mark page breaks in extracted PDFs; turn them into newlines.
    text = text.replace('\x0c', '\n')

    # Header/footer patterns, compiled once (case-insensitive search,
    # matching anywhere in the stripped line).
    artifact_patterns = [
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'arXiv:\d{4}\.\d+',                    # arXiv:2301.07041v2
            r'^doi:\s*10\.',                        # DOI lines
            r'Preprint\.\s*Under review',           # "Preprint. Under review"
            r'Under review',                        # Review notice
            r'Proceedings of (ICML|NeurIPS|ICLR|CVPR|ACL|EMNLP)',
            r'(ICML|NeurIPS|ICLR|CVPR|ACL|EMNLP)\s+20\d{2}',  # "ICML 2023"
            r'Workshop on',                         # Workshop lines
            r'^\*+Equal contribution',              # Footnotes
            r'^\dDepartment of',                    # Affiliation footnotes
            r'^\d+University of',                   # University affiliations
            r'Correspondence to:',                  # Contact info
        )
    ]

    cleaned_lines = []
    for line in text.split('\n'):
        stripped = line.strip()

        # Keep empty lines (spacing is normalized in a later pass).
        if not stripped:
            cleaned_lines.append('')
            continue

        # Skip standalone page numbers: lines that are ONLY digits,
        # e.g. "12", "247".
        if re.match(r'^\d{1,4}$', stripped):
            continue

        # Skip lines matching any known header/footer pattern.
        if any(p.search(stripped) for p in artifact_patterns):
            continue

        cleaned_lines.append(line)

    return "\n".join(cleaned_lines)
104
+
105
+
106
def normalize_whitespace(text: str) -> str:
    """
    Normalize all forms of whitespace to standard single spaces.

    PDFs produce messy whitespace: runs of spaces from column alignment,
    tabs, and non-breaking spaces (U+00A0).

    STRATEGY:
    1. Replace tabs and non-breaking spaces with regular spaces
    2. Collapse runs of spaces into one
    3. Strip each line individually
    4. Collapse 3+ consecutive newlines into exactly 2
       (preserving paragraph breaks without excessive gaps)

    FIX: lines are stripped BEFORE newlines are collapsed. Previously a
    whitespace-only line (e.g. "x\\n \\n \\nx") prevented the newline run
    from matching \\n{3,}, leaving excessive blank lines behind.

    Args:
        text: Text to normalize.

    Returns:
        Normalized text: single spaces, at most double newlines,
        no leading/trailing whitespace.
    """
    # Map exotic horizontal whitespace onto plain spaces.
    text = text.replace('\t', ' ')
    text = text.replace('\xa0', ' ')

    # Collapse multiple spaces into one.
    text = re.sub(r' +', ' ', text)

    # Strip each line individually so whitespace-only lines become truly
    # empty (must happen before newline collapsing — see FIX above).
    text = '\n'.join(line.strip() for line in text.split('\n'))

    # Collapse 3+ consecutive newlines into exactly 2.
    text = re.sub(r'\n{3,}', '\n\n', text)

    return text.strip()
140
+
141
+
142
+
143
def fix_unicode(text: str) -> str:
    """
    Repair mojibake and other broken Unicode from PDF extraction.

    Typical problems in extracted PDF text: UTF-8 bytes decoded as
    Latin-1 (curly quotes turned into multi-character junk), doubly
    encoded accented letters, and single-glyph ligatures (e.g. "fi")
    in place of their plain-letter equivalents.

    ftfy ("Fixes Text For You") detects and repairs all of these
    automatically; this is a thin wrapper around it.
    """
    repaired = ftfy.fix_text(text)
    return repaired
156
+
157
+
158
+
159
def remove_reference_section(text: str) -> str:
    """
    Strip the bibliography/references section from a paper's text.

    WHY: references are dense lists of author names, venues, and years
    that would pollute the vector index — a query about "attention
    mechanisms" should not retrieve a chunk that is just citations like
    "Vaswani, A., Shazeer, N., ... (2017). Attention is all you need."

    APPROACH: find the LAST occurrence of a references-style header
    (some papers mention "References" earlier in the body) and cut
    there — but only when it sits in the final 40% of the document,
    so an early mention never truncates real content.

    FIXES: renamed the misspelled local `referece_patterns`; the
    match-position check is now `>= 0` so a header at offset 0 is
    handled through the normal threshold logic instead of being
    silently ignored by the old `> 0` test.

    Args:
        text: Paper text (ideally already artifact-cleaned).

    Returns:
        Text with everything after the references header removed, or
        the input unchanged if no plausible header is found.
    """
    # Header patterns: explicit upper/lower variants plus numbered
    # section forms ("7. References", "7 References").
    reference_patterns = [
        r'\n\s*References\s*\n',
        r'\n\s*REFERENCES\s*\n',
        r'\n\s*Bibliography\s*\n',
        r'\n\s*BIBLIOGRAPHY\s*\n',
        r'\n\s*\d+\.\s*References\s*\n',
        r'\n\s*\d+\s+References\s*\n',
        # Header at end of a line (effective because of re.MULTILINE).
        r'\nReferences$',
    ]

    # Position of the last header match across all patterns; -1 = none.
    last_match_pos = -1
    for pattern in reference_patterns:
        matches = list(re.finditer(pattern, text, re.MULTILINE))
        if matches:
            last_match_pos = max(last_match_pos, matches[-1].start())

    if last_match_pos >= 0:
        # Only cut when the header lies in the last 40% of the document;
        # anything earlier is unlikely to be the actual bibliography.
        cutoff_threshold = len(text) * 0.60
        if last_match_pos > cutoff_threshold:
            text = text[:last_match_pos]
            logger.debug('References section removed')
        else:
            logger.debug(
                f"Reference found at {last_match_pos/len(text):.0%} "
                f"- too early to be bibliography, keeping"
            )

    return text
215
+
216
+
217
+
218
+
219
def remove_short_lines(text: str, min_length: int = 3) -> str:
    """
    Drop lines too short to carry meaningful content.

    Very short lines in extracted PDFs are usually stray characters
    from column separators, figure/table labels ("Fig.", "Table 1"),
    or single-letter section markers.

    Empty lines are kept so paragraph structure survives; any other
    line must be at least *min_length* characters (after stripping)
    to be retained.
    """
    kept = []
    for line in text.split('\n'):
        content = line.strip()
        # Non-empty but shorter than the threshold -> discard.
        if content and len(content) < min_length:
            continue
        kept.append(line)
    return '\n'.join(kept)
237
+
238
+
239
def clean_text(text: str) -> str:
    """
    Master cleaning function — runs every transformation in order.

    ORDER MATTERS:
    1. Fix encoding first (so later regexes see clean characters)
    2. Fix hyphenation (before whitespace normalization)
    3. Remove page artifacts (before whitespace normalization)
    4. Remove references (on mostly clean text)
    5. Remove short lines
    6. Normalize whitespace LAST (cleans up after all other steps)

    Empty or whitespace-only input short-circuits to "".
    """
    if not text or not text.strip():
        return ""

    pipeline = (
        fix_unicode,
        fix_hyphenated_linebreaks,
        remove_page_artifacts,
        remove_reference_section,
        remove_short_lines,
        normalize_whitespace,
    )
    for step in pipeline:
        text = step(text)

    return text
src/utils/__init__.py ADDED
File without changes
src/utils/logger.py ADDED
@@ -0,0 +1,78 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Centralized logging setup using loguru.
3
+
4
+ WHY LOGURU over Python's built-in logging:
5
+ - Built-in logging requires 10+ lines of boilerplate to set up properly
6
+ - Loguru does it in 3 lines
7
+ - Loguru auto-formats with colors, timestamps, and file/line info
8
+ - Loguru handles file rotation automatically
9
+ - Every serious Python project at companies like Stripe uses structured logging
10
+ """
11
+
12
+ import sys
13
+ from loguru import logger
14
+ from config.settings import LOG_LEVEL, LOG_FILE, LOG_ROTATION, LOG_RETENTION
15
+
16
+
17
def setup_logger():
    """
    Configure loguru with a console sink and a rotating file sink.

    Call once at application startup; returns the configured logger.
    Console output respects the configured LOG_LEVEL; the file sink
    always records DEBUG and up, rotating and expiring per settings.
    """
    # Drop loguru's default handler so we fully control formatting.
    logger.remove()

    # Console sink: colorized, human-readable.
    # e.g. 2024-01-15 10:23:45 | INFO | module:function:42 | Message
    console_format = (
        "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
        "<level>{level: <8}</level> | "
        "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
        "<level>{message}</level>"
    )
    logger.add(
        sys.stdout,
        level=LOG_LEVEL,
        format=console_format,
        colorize=True
    )

    # File sink: everything at DEBUG and above, for debugging/monitoring.
    # LOG_ROTATION (e.g. "10 MB") starts a new file at that size;
    # LOG_RETENTION (e.g. "7 days") deletes files older than that.
    logger.add(
        LOG_FILE,
        level='DEBUG',
        format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} | {message}",
        rotation=LOG_ROTATION,
        retention=LOG_RETENTION,
        encoding='utf-8'
    )

    return logger
58
+
59
+
60
+
61
+ # Create the logger instance
62
+ # Other modules import this directly:
63
+ # from src.utils.logger import get_logger
64
+ # logger = get_logger(__name__)
65
def get_logger(name: str):
    """
    Return a logger bound to *name*.

    The bound name appears in log output, so every message can be
    traced back to the module that emitted it.

    Usage:
        from src.utils.logger import get_logger
        logger = get_logger(__name__)
        logger.info("Starting ingestion...")
    """
    bound = logger.bind(name=name)
    return bound
76
+
77
# NOTE: setup_logger() is deliberately NOT called on import — the
# application entry point must call it once at startup before logging.
# setup_logger()
test_fetch.py ADDED
@@ -0,0 +1,10 @@
 
 
 
 
 
 
 
 
 
 
 
1
"""Quick manual smoke test: fetch a handful of papers and print them."""
from src.utils.logger import setup_logger
from src.ingestion.arxiv_fetcher import ArXivFetcher

setup_logger()

fetched = ArXivFetcher().fetch_papers(max_papers=5)

for paper in fetched:
    print(f"{paper.paper_id}: {paper.title[:60]}...")
test_fetch_2.py ADDED
@@ -0,0 +1,128 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # test_fetch.py
2
+ """
3
+ Smart test script that handles existing data correctly.
4
+ Tests three things:
5
+ 1. Can we load existing papers from disk?
6
+ 2. Can we fetch NEW papers (beyond what we have)?
7
+ 3. Is our data schema correct?
8
+ """
9
+
10
+ import json
11
+ from pathlib import Path
12
+ from src.utils.logger import setup_logger, get_logger
13
+ from src.ingestion.arxiv_fetcher import ArXivFetcher
14
+ from config.settings import RAW_DIR
15
+
16
+ setup_logger()
17
+ logger = get_logger(__name__)
18
+
19
def test_existing_data():
    """Report what is already on disk and return a small sample of it."""
    paper_files = [
        path for path in RAW_DIR.glob("*.json")
        if path.name != "paper_index.json"
    ]

    logger.info(f"Papers already on disk: {len(paper_files)}")

    if not paper_files:
        logger.warning("No papers found on disk. Run fetch first.")
        return []

    # Load and display only the first few papers as a spot check.
    sample = []
    for path in paper_files[:3]:
        with open(path) as fh:
            record = json.load(fh)
        sample.append(record)
        logger.info(f" -> {record['paper_id']}: {record['title'][:60]}...")
        logger.info(f" Category: {record['primary_categories']} | Date: {record['published_date']}")

    return sample
41
+
42
def test_schema_validation():
    """Verify the Pydantic schema's validators normalize messy input."""
    from src.ingestion.arxiv_fetcher import PaperMetadata

    logger.info("Testing schema validation...")

    # Deliberately messy-but-valid input: raw URL-style ID with a
    # version suffix, and a title with stray surrounding spaces.
    sample = dict(
        paper_id="http://arxiv.org/abs/2301.07041v2",
        title=" Test Paper With Extra Spaces ",
        abstract="This is a test abstract.",
        authors=["Author One", "Author Two"],
        categories=["cs.LG", "cs.AI"],
        primary_categories="cs.LG",
        published_date="2023-01-17",
        updated_date="2023-03-15",
        arxiv_url="https://arxiv.org/abs/2301.07041",
        pdf_url="https://arxiv.org/pdf/2301.07041",
    )

    try:
        paper = PaperMetadata(**sample)

        # The field validators should have stripped the URL/version from
        # the ID and the whitespace from the title.
        assert paper.paper_id == "2301.07041", f"ID cleanup failed: {paper.paper_id}"
        assert paper.title == "Test Paper With Extra Spaces", f"Whitespace cleanup failed: {paper.title}"

        logger.info(" -> Schema validation: PASSED")
        logger.info(f" paper_id cleaned: '2301.07041'")
        logger.info(f" title cleaned: '{paper.title}'")
        return True

    except Exception as e:
        logger.error(f" -> Schema validation FAILED: {e}")
        return False
75
+
76
def test_fresh_fetch(n: int = 3):
    """
    Fetch papers while temporarily ignoring the on-disk index, forcing
    fresh results so the fetch path can be exercised.
    """
    logger.info(f"Fetching {n} fresh papers from ArXiv...")

    fetcher = ArXivFetcher()

    # In-memory only: stash the known IDs and pretend we have none, so
    # deduplication doesn't filter everything out. Nothing on disk is
    # deleted or modified.
    saved_ids = fetcher.existing_ids.copy()
    fetcher.existing_ids = set()

    papers = fetcher.fetch_papers(max_papers=n)

    # Put the real ID set back.
    fetcher.existing_ids = saved_ids

    if not papers:
        logger.warning(" -> Fresh fetch returned 0 papers. Check network connection.")
    else:
        logger.info(f" -> Fresh fetch: PASSED. Got {len(papers)} papers")
        for p in papers:
            logger.info(f" {p.paper_id}: {p.title[:55]}...")

    return papers
103
+
104
def main():
    """Run the three ingestion checks in sequence and print a summary."""
    banner = "=" * 55
    logger.info(banner)
    logger.info("RESEARCHPILOT — INGESTION TEST SUITE")
    logger.info(banner)

    # Test 1: what is already on disk?
    logger.info("\n[TEST 1] Checking existing data on disk...")
    existing = test_existing_data()

    # Test 2: do the schema validators behave?
    logger.info("\n[TEST 2] Schema validation...")
    test_schema_validation()

    # Test 3: can we fetch fresh papers over the network?
    logger.info("\n[TEST 3] Fresh fetch from ArXiv...")
    fresh = test_fresh_fetch(n=3)

    logger.info("\n" + banner)
    logger.info("TEST SUITE COMPLETE")
    logger.info(f"Existing papers: {len(existing)} shown (may have more)")
    logger.info(f"Fresh papers fetched: {len(fresh)}")
    logger.info(banner)
126
+
127
# Script entry point: run the full test suite when executed directly.
if __name__ == "__main__":
    main()
test_processing.py ADDED
@@ -0,0 +1,38 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from src.utils.logger import setup_logger, get_logger
2
+ from src.processing.text_cleaner import clean_text
3
+
4
+ setup_logger()
5
+ logger = get_logger(__name__)
6
+
7
+ # Simulate dirty PDF text
8
+ dirty_text = """
9
+ arXiv:2301.07041v2 [cs.LG] 17 Jan 2023
10
+
11
+ We propose a novel at-
12
+ tention mechanism that re-
13
+ duces computational com-
14
+ plexity significantly.
15
+
16
+ This method achieves state-of-the-art results.
17
+
18
+ 2
19
+
20
+ ICML 2023 Workshop
21
+
22
+ The key insight is that sparse attention patterns
23
+ can approximate full attention with minimal quality loss.
24
+
25
+ References
26
+
27
+ Vaswani, A., et al. (2017). Attention is all you need.
28
+ Brown, T., et al. (2020). Language models are few-shot learners.
29
+ """
30
+
31
+ cleaned = clean_text(dirty_text)
32
+
33
+ logger.info("─── DIRTY TEXT ───")
34
+ print(dirty_text[:300])
35
+ logger.info("─── CLEANED TEXT ───")
36
+ print(cleaned)
37
+ logger.info(f"Original length: {len(dirty_text)}")
38
+ logger.info(f"Cleaned length: {len(cleaned)}")