from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request, UploadFile, File, Form, Query
from fastapi import status
from pathlib import Path
from typing import List, Dict, Any, Optional

from ...core.neo4j_store import Neo4jStore
from ...retrieval.agent import AgentRetrievalSystem
from ...ingestion.pipeline import IngestionPipeline
from ...config import settings
from ...api.models import *
from ...api.auth import get_current_user, User
from ...workers.celery_worker import ingest_document_task, celery_app
from celery.result import AsyncResult
import redis
from ..dependencies import get_graph_store, get_retrieval_agent, get_ingestion_pipeline, get_redis_client

router = APIRouter()

from ...core.storage import get_storage
storage = get_storage()

@router.post("/api/documents/upload", response_model=DocumentUploadResponse, tags=["Documents"])
async def upload_document(request: Request, 
    file: UploadFile = File(...),
    current_user: User = Depends(get_current_user)
):
    """
    Upload a document for ingestion.
    Returns a task ID for tracking ingestion progress.
    """
    
    # Validate file type
    file_extension = Path(file.filename).suffix.lower()
    if file_extension not in settings.allowed_file_types:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"File type {file_extension} not allowed. Allowed types: {settings.allowed_file_types}"
        )

    # Validate MIME type using python-magic
    import magic
    file_header = await file.read(2048)
    await file.seek(0)
    mime_type = magic.from_buffer(file_header, mime=True)
    
    # Basic mapping of extension to MIME types for allowed_file_types
    allowed_mimes = {
        ".pdf": ["application/pdf"],
        ".txt": ["text/plain"],
        ".md": ["text/plain", "text/markdown"],
        ".csv": ["text/csv", "text/plain"],
        ".xlsx": ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"],
        ".pptx": ["application/vnd.openxmlformats-officedocument.presentationml.presentation"]
    }
    
    if file_extension in allowed_mimes and mime_type not in allowed_mimes[file_extension]:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"File content ({mime_type}) does not match extension {file_extension}"
        )

    # SECURITY: sanitize filename to prevent path traversal (e.g. "../../../etc/passwd")
    import re as _re
    safe_stem = _re.sub(r"[^\w\-]", "_", Path(file.filename).stem)[:100]
    safe_name = f"{safe_stem}{file_extension}"
    file_path = settings.upload_dir / safe_name
    # Ensure the resolved path is still inside upload_dir
    try:
        file_path.resolve().relative_to(settings.upload_dir.resolve())
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid filename"
        )

    # Stream the upload to disk, enforcing the size limit while writing so an
    # oversized file is rejected without being fully persisted first.
    import aiofiles
    max_bytes = settings.max_upload_size_mb * 1024 * 1024
    file_size = 0
    async with aiofiles.open(file_path, "wb") as buffer:
        while chunk := await file.read(8192):
            file_size += len(chunk)
            if file_size > max_bytes:
                break
            await buffer.write(chunk)

    if file_size > max_bytes:
        file_path.unlink(missing_ok=True)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"File too large. Maximum size: {settings.max_upload_size_mb}MB"
        )

    # Derive a deterministic document id from the stored path and its modification time
    import hashlib
    hasher = hashlib.sha256()
    hasher.update(str(file_path).encode())
    hasher.update(str(file_path.stat().st_mtime).encode())
    doc_id = hasher.hexdigest()[:16]
    
    # Queue ingestion task
    task = ingest_document_task.delay(
        str(file_path),
        ontology_dict=None,
        tenant_id=current_user.tenant_id
    )
    
    return DocumentUploadResponse(
        document_id=doc_id,
        filename=file.filename,
        size_bytes=file_size,
        task_id=task.id,
        message="Document uploaded successfully. Ingestion in progress."
    )
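
# Example usage (illustrative only; the host, port and auth header below are
# assumptions, not part of this module):
#
#   curl -X POST http://localhost:8000/api/documents/upload \
#        -H "Authorization: Bearer <token>" \
#        -F "file=@report.pdf"
#
# The response carries document_id, filename, size_bytes and task_id; poll
# /api/documents/status/{task_id} to follow ingestion progress.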



@router.post("/api/documents/scrape", response_model=DocumentUploadResponse, tags=["Documents"])
async def scrape_url(
    request: ScrapeRequest,
    current_user: User = Depends(get_current_user)
):
    """
    Scrape URL content into text and ingest it.
    """
    import re
    from ...ingestion.web_crawler import WebCrawler

    try:
        # Force UTF-8 stdout/stderr so crawler log output containing non-ASCII
        # characters does not raise encoding errors (mainly a Windows console issue).
        import sys
        if hasattr(sys.stdout, 'reconfigure'):
            sys.stdout.reconfigure(encoding='utf-8')
            sys.stderr.reconfigure(encoding='utf-8')

        # Use the Playwright-backed WebCrawler (headless browser). Rendering the page
        # in a real browser tends to avoid the simple 403 bot-blocks that a plain HTTP
        # client often hits.
        crawler = WebCrawler(max_depth=0, max_pages=1)
        results = await crawler.crawl(request.url)
        
        if not results or not results[0].get("markdown"):
            raise ValueError("No content was returned by the crawler.")
            
        text = results[0]["markdown"]
        title = results[0].get("title", "scraped_page")
        if not title:
            title = "scraped_page"
            
        safe_title = re.sub(r'[^a-zA-Z0-9_\-]', '_', title)
        filename = f"{safe_title}.md"
        
        # Save to disk
        file_path = settings.upload_dir / filename
        
        import aiofiles
        async with aiofiles.open(file_path, "w", encoding="utf-8") as buffer:
            await buffer.write(text)
            
        file_size = file_path.stat().st_size
        import hashlib
        hasher = hashlib.sha256()
        hasher.update(str(file_path).encode())
        hasher.update(str(file_path.stat().st_mtime).encode())
        doc_id = hasher.hexdigest()[:16]
        
        # Queue ingestion
        task = ingest_document_task.delay(
            str(file_path),
            ontology_dict=None,
            tenant_id=current_user.tenant_id
        )
        
        return DocumentUploadResponse(
            document_id=doc_id,
            filename=filename,
            size_bytes=file_size,
            task_id=task.id,
            message="URL scraped and ingestion initiated successfully."
        )
        
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to scrape URL: {str(e)}"
        )
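
# Example request body (illustrative; ScrapeRequest is defined in api.models and
# only the `url` field used above is shown, any other fields it defines are omitted):
#
#   POST /api/documents/scrape
#   {"url": "https://example.com/article"}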



@router.post("/api/documents/crawl", tags=["Documents"])
async def crawl_urls(
    request: CrawlRequest,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_user)
):
    """
    Asynchronous web crawling using the locally hosted Crawl4AI (Playwright) crawler.
    Extracts clean Markdown from each page and queues it through Celery for graph ingestion.
    """
    from ...ingestion.web_crawler import WebCrawler
    import re
    import hashlib
    
    crawler = WebCrawler(max_depth=request.max_depth, max_pages=request.max_pages)
    
    async def run_crawl_and_ingest():
        try:
            results = await crawler.crawl(request.url)
            for page in results:
                if not page.get("markdown"):
                    continue
                    
                # Create a safe filename
                safe_title = re.sub(r'[^a-zA-Z0-9_\-]', '_', page.get("title", "page_") or "page_")
                url_hash = hashlib.md5(page['url'].encode()).hexdigest()[:6]
                filename = f"crawled_{safe_title}_{url_hash}.txt"
                
                file_content = f"# Source Metadata\n- URL: {page['url']}\n- Title: {page['title']}\n\n"
                file_content += page["markdown"]
                
                storage.save_file(filename, file_content.encode("utf-8"))
                    
                # Queue parsing
                ingest_document_task.delay(
                    filename,
                    ontology_dict=None,
                    tenant_id=current_user.tenant_id
                )
                
        except Exception as e:
            import logging
            logging.error(f"Crawling pipeline failed for {request.url}: {e}")
            
    background_tasks.add_task(run_crawl_and_ingest)
    
    return {
        "message": f"Crawler started asynchronously for {request.url} (up to {request.max_pages} pages)",
        "status": "processing"
    }
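
# Example request body (illustrative; the field names mirror how CrawlRequest is
# used above -- url, max_depth, max_pages):
#
#   POST /api/documents/crawl
#   {"url": "https://example.com/docs", "max_depth": 1, "max_pages": 10}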



@router.get("/api/documents", response_model=DocumentListResponse, tags=["Documents"])
async def list_documents(request: Request, current_user: User = Depends(get_current_user)):
    """List all ingested documents for the current tenant"""
    tenant_id = current_user.tenant_id
    if tenant_id:
        query = """
        MATCH (d:Document {tenant_id: $tenant_id})
        RETURN d.id as id, d.filename as filename, d.file_type as file_type,
               d.size_bytes as size_bytes, toString(d.upload_date) as upload_date
        ORDER BY d.upload_date DESC
        """
        results = await request.app.state.graph_store.execute_query(query, {"tenant_id": tenant_id})
    else:
        query = """
        MATCH (d:Document)
        RETURN d.id as id, d.filename as filename, d.file_type as file_type,
               d.size_bytes as size_bytes, toString(d.upload_date) as upload_date
        ORDER BY d.upload_date DESC
        """
        results = await request.app.state.graph_store.execute_query(query)
    docs = [
        DocumentInfo(
            id=r["id"] or "",
            filename=r["filename"] or "",
            file_type=r["file_type"] or "",
            size_bytes=r["size_bytes"] or 0,
            upload_date=str(r["upload_date"] or "")[:19]
        )
        for r in results
    ]
    return DocumentListResponse(documents=docs, total=len(docs))
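
# Example response shape (illustrative values, mirroring the fields returned above):
#
#   GET /api/documents
#   {"documents": [{"id": "...", "filename": "report.pdf", "file_type": ".pdf",
#                   "size_bytes": 12345, "upload_date": "2024-01-01T00:00:00"}],
#    "total": 1}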



@router.delete("/api/documents/{document_id}", tags=["Documents"])
async def delete_document(request: Request, 
    document_id: str,
    current_user: User = Depends(get_current_user)
):
    """Delete a document and all its chunks and entity links from the graph"""
    # Remove chunks + document node; entities shared with other docs are kept
    # We must retrieve the filename from the graph before deleting the node
    query = "MATCH (d:Document {id: $doc_id}) RETURN d.filename as filename"
    results = await request.app.state.graph_store.execute_query(query, {"doc_id": document_id})
    filename_to_delete = results[0]["filename"] if results and results[0].get("filename") else None

    delete_query = """
    MATCH (d:Document {id: $doc_id})
    OPTIONAL MATCH (d)-[:CONTAINS]->(c:Chunk)
    DETACH DELETE c, d
    """
    await request.app.state.graph_store.execute_query(delete_query, {"doc_id": document_id})

    # Remove uploaded file from storage
    if filename_to_delete:
        try:
            storage.delete_file(filename_to_delete)
        except Exception:
            # Best-effort cleanup: the graph deletion above already succeeded, and
            # the file may simply no longer exist on disk.
            pass

    return {"status": "deleted", "document_id": document_id}
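
# Example usage (illustrative; host and auth header are assumptions):
#
#   curl -X DELETE http://localhost:8000/api/documents/<document_id> \
#        -H "Authorization: Bearer <token>"
#
#   -> {"status": "deleted", "document_id": "<document_id>"}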


@router.get("/api/documents/{document_id}/download", tags=["Documents"])
async def download_document(request: Request, 
    document_id: str,
    current_user: User = Depends(get_current_user)
):
    """Download an uploaded document"""
    from fastapi.responses import FileResponse
    
    # 1. Look up the real filename associated with this hashed ID
    query = "MATCH (d:Document {id: $doc_id}) RETURN d.filename as filename"
    results = await request.app.state.graph_store.execute_query(query, {"doc_id": document_id})
    
    filename_target = results[0]["filename"] if results and results[0].get("filename") else None
    
    if filename_target:
        possible_path = settings.upload_dir / filename_target
        if possible_path.exists():
            return FileResponse(
                path=possible_path,
                filename=filename_target,
                content_disposition_type="inline"
            )
            
    # 2. Fallback: scan the upload directory for a file whose stem or prefix matches the document id
    for f in settings.upload_dir.iterdir():
        if f.stem == document_id or f.name.startswith(document_id):
            return FileResponse(
                path=f,
                filename=f.name,
                content_disposition_type="inline"
            )
            
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Document file '{filename_target}' not found on disk"
    )




@router.get("/api/documents/{document_id}/preview", tags=["Documents"])
async def preview_document(request: Request, 
    document_id: str,
    current_user: User = Depends(get_current_user)
):
    """Return raw text content of a document for in-app preview (works for .txt, .md scraped files)."""
    from fastapi.responses import JSONResponse

    query = "MATCH (d:Document {id: $doc_id}) RETURN d.filename as filename, d.file_type as file_type"
    results = await request.app.state.graph_store.execute_query(query, {"doc_id": document_id})

    if not results or not results[0].get("filename"):
        raise HTTPException(status_code=404, detail="Document not found in graph")

    filename = results[0]["filename"]
    file_type = results[0]["file_type"] or ""
    file_path = settings.upload_dir / filename

    if not file_path.exists():
        raise HTTPException(status_code=404, detail=f"File '{filename}' not found on disk")

    if file_type.lower() not in (".txt", ".md", ""):
        raise HTTPException(status_code=415, detail="Preview only supported for text files. Use download for PDFs.")

    try:
        content = file_path.read_text(encoding="utf-8", errors="replace")
        word_count = len(content.split())
        char_count = len(content)
        return JSONResponse({
            "filename": filename,
            "file_type": file_type,
            "word_count": word_count,
            "char_count": char_count,
            "content": content
        })
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to read file: {str(e)}")



@router.get("/api/documents/status/{task_id}", response_model=IngestionStatusResponse, tags=["Documents"])
async def get_ingestion_status(request: Request, 
    task_id: str,
    current_user: User = Depends(get_current_user)
):
    """Get ingestion task status"""
    
    task = AsyncResult(task_id, app=celery_app)
    
    if task.state == 'PENDING':
        response = {
            "task_id": task_id,
            "status": "pending",
            "progress": None,
            "result": None
        }
    elif task.state == 'PROCESSING':
        response = {
            "task_id": task_id,
            "status": "processing",
            "progress": task.info,
            "result": None
        }
    elif task.state == 'SUCCESS':
        response = {
            "task_id": task_id,
            "status": "completed",
            "progress": None,
            "result": task.info
        }
    else:
        response = {
            "task_id": task_id,
            "status": task.state.lower(),
            "progress": None,
            "result": str(task.info) if task.info else None
        }
    
    return IngestionStatusResponse(**response)
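
# Example client-side polling loop (illustrative sketch; assumes the `requests`
# library plus a host and bearer token that are not part of this module):
#
#   import time, requests
#   while True:
#       r = requests.get(
#           f"http://localhost:8000/api/documents/status/{task_id}",
#           headers={"Authorization": f"Bearer {token}"},
#       ).json()
#       if r["status"] in ("completed", "failure"):
#           break
#       time.sleep(2)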


# Conversations / Memory Endpoints