Paijo commited on
update tests/integration/test_admin_security.py
Browse files
tests/integration/test_admin_security.py
ADDED
|
@@ -0,0 +1,149 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""
|
| 2 |
+
Integration tests for admin endpoint security.
|
| 3 |
+
Tests that admin endpoints properly enforce authorization.
|
| 4 |
+
"""
|
| 5 |
+
|
| 6 |
+
import pytest
|
| 7 |
+
from fastapi.testclient import TestClient
|
| 8 |
+
from app.main import app
|
| 9 |
+
from app.dependencies import get_current_user
|
| 10 |
+
from app.db_models import User
|
| 11 |
+
|
| 12 |
+
client = TestClient(app)
|
| 13 |
+
|
| 14 |
+
|
| 15 |
+
def test_admin_endpoints_require_authentication():
|
| 16 |
+
"""Test that admin endpoints return 401 when not authenticated."""
|
| 17 |
+
# Try to access admin stats without authentication
|
| 18 |
+
response = client.get("/api/v1/admin/validation-stats")
|
| 19 |
+
assert response.status_code == 401 or response.status_code == 403
|
| 20 |
+
|
| 21 |
+
response = client.get("/api/v1/admin/quality-distribution")
|
| 22 |
+
assert response.status_code == 401 or response.status_code == 403
|
| 23 |
+
|
| 24 |
+
response = client.get("/api/v1/admin/recent-validations")
|
| 25 |
+
assert response.status_code == 401 or response.status_code == 403
|
| 26 |
+
|
| 27 |
+
|
| 28 |
+
def test_admin_endpoints_reject_regular_users():
|
| 29 |
+
"""Test that admin endpoints return 403 when accessed by non-admin users."""
|
| 30 |
+
|
| 31 |
+
# Mock a regular user (not admin)
|
| 32 |
+
def get_mock_user():
|
| 33 |
+
return User(
|
| 34 |
+
id=1,
|
| 35 |
+
oauth_provider="test",
|
| 36 |
+
oauth_id="test123",
|
| 37 |
+
email="user@test.com",
|
| 38 |
+
username="testuser",
|
| 39 |
+
role="user", # NOT admin
|
| 40 |
+
)
|
| 41 |
+
|
| 42 |
+
app.dependency_overrides[get_current_user] = get_mock_user
|
| 43 |
+
|
| 44 |
+
try:
|
| 45 |
+
response = client.get("/api/v1/admin/validation-stats")
|
| 46 |
+
assert response.status_code == 403
|
| 47 |
+
assert "Admin access required" in response.json().get("detail", "")
|
| 48 |
+
|
| 49 |
+
response = client.get("/api/v1/admin/quality-distribution")
|
| 50 |
+
assert response.status_code == 403
|
| 51 |
+
|
| 52 |
+
response = client.get("/api/v1/admin/recent-validations")
|
| 53 |
+
assert response.status_code == 403
|
| 54 |
+
finally:
|
| 55 |
+
app.dependency_overrides.clear()
|
| 56 |
+
|
| 57 |
+
|
| 58 |
+
def test_scrape_endpoints_require_admin():
|
| 59 |
+
"""Test that scrape endpoints require admin authentication."""
|
| 60 |
+
# Test without authentication
|
| 61 |
+
response = client.post(
|
| 62 |
+
"/api/v1/proxies/scrape",
|
| 63 |
+
json={"url": "https://example.com/proxies.txt", "type": "github_raw"},
|
| 64 |
+
)
|
| 65 |
+
assert response.status_code == 401 or response.status_code == 403
|
| 66 |
+
|
| 67 |
+
response = client.post("/api/v1/proxies/demo")
|
| 68 |
+
assert response.status_code == 401 or response.status_code == 403
|
| 69 |
+
|
| 70 |
+
response = client.post("/api/v1/proxies/scrape-all")
|
| 71 |
+
assert response.status_code == 401 or response.status_code == 403
|
| 72 |
+
|
| 73 |
+
|
| 74 |
+
def test_scrape_endpoints_reject_regular_users():
|
| 75 |
+
"""Test that scrape endpoints reject regular users."""
|
| 76 |
+
|
| 77 |
+
def get_mock_user():
|
| 78 |
+
return User(
|
| 79 |
+
id=1,
|
| 80 |
+
oauth_provider="test",
|
| 81 |
+
oauth_id="test123",
|
| 82 |
+
email="user@test.com",
|
| 83 |
+
username="testuser",
|
| 84 |
+
role="user",
|
| 85 |
+
)
|
| 86 |
+
|
| 87 |
+
app.dependency_overrides[get_current_user] = get_mock_user
|
| 88 |
+
|
| 89 |
+
try:
|
| 90 |
+
response = client.post(
|
| 91 |
+
"/api/v1/proxies/scrape",
|
| 92 |
+
json={"url": "https://example.com/proxies.txt", "type": "github_raw"},
|
| 93 |
+
)
|
| 94 |
+
assert response.status_code == 403
|
| 95 |
+
|
| 96 |
+
response = client.post("/api/v1/proxies/demo")
|
| 97 |
+
assert response.status_code == 403
|
| 98 |
+
|
| 99 |
+
response = client.post("/api/v1/proxies/scrape-all")
|
| 100 |
+
assert response.status_code == 403
|
| 101 |
+
finally:
|
| 102 |
+
app.dependency_overrides.clear()
|
| 103 |
+
|
| 104 |
+
|
| 105 |
+
def test_admin_user_can_access_admin_endpoints():
|
| 106 |
+
"""Test that admin users CAN access admin endpoints."""
|
| 107 |
+
|
| 108 |
+
def get_mock_admin():
|
| 109 |
+
return User(
|
| 110 |
+
id=1,
|
| 111 |
+
oauth_provider="test",
|
| 112 |
+
oauth_id="admin123",
|
| 113 |
+
email="admin@test.com",
|
| 114 |
+
username="admin",
|
| 115 |
+
role="admin", # IS admin
|
| 116 |
+
)
|
| 117 |
+
|
| 118 |
+
app.dependency_overrides[get_current_user] = get_mock_admin
|
| 119 |
+
|
| 120 |
+
try:
|
| 121 |
+
# Admin should be able to access these endpoints
|
| 122 |
+
# (They might return errors due to missing data, but should not return 403)
|
| 123 |
+
response = client.get("/api/v1/admin/validation-stats")
|
| 124 |
+
assert response.status_code != 403 # Should not be forbidden
|
| 125 |
+
|
| 126 |
+
response = client.get("/api/v1/admin/quality-distribution")
|
| 127 |
+
assert response.status_code != 403
|
| 128 |
+
|
| 129 |
+
response = client.get("/api/v1/admin/recent-validations")
|
| 130 |
+
assert response.status_code != 403
|
| 131 |
+
finally:
|
| 132 |
+
app.dependency_overrides.clear()
|
| 133 |
+
|
| 134 |
+
|
| 135 |
+
def test_public_endpoints_remain_accessible():
|
| 136 |
+
"""Test that public endpoints don't require authentication."""
|
| 137 |
+
# These endpoints should be accessible without auth
|
| 138 |
+
response = client.get("/")
|
| 139 |
+
assert response.status_code == 200
|
| 140 |
+
|
| 141 |
+
response = client.get("/health")
|
| 142 |
+
assert response.status_code == 200
|
| 143 |
+
|
| 144 |
+
response = client.get("/api/v1/sources")
|
| 145 |
+
assert response.status_code == 200
|
| 146 |
+
|
| 147 |
+
# Proxies endpoint should work without auth (public service)
|
| 148 |
+
response = client.get("/api/v1/proxies?limit=10")
|
| 149 |
+
assert response.status_code == 200
|