File size: 7,746 Bytes
6bed18e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 | import pytest
from datetime import timedelta
from jose import jwt
from backend.src.auth.security import create_access_token, verify_token
from backend.src.core.config import settings
def test_create_valid_access_token():
"""Test that creating a valid access token works correctly"""
user_data = {"user_id": "test_user_123", "role": "user"}
token = create_access_token(data=user_data)
# Verify that the token was created
assert token is not None
assert isinstance(token, str)
assert len(token) > 0
# Verify that the token can be decoded and contains the right data
decoded_payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
assert decoded_payload["user_id"] == "test_user_123"
assert decoded_payload["role"] == "user"
assert "exp" in decoded_payload
def test_create_access_token_with_custom_expiry():
"""Test that creating a token with custom expiry works correctly"""
user_data = {"user_id": "expiry_test_user", "role": "admin"}
# Create a token that expires in 1 hour
token = create_access_token(data=user_data, expires_delta=timedelta(hours=1))
# Decode the token without verification to check expiry
decoded_payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
assert decoded_payload["user_id"] == "expiry_test_user"
assert decoded_payload["role"] == "admin"
# Check that expiry is approximately 1 hour from now
import time
current_time = time.time()
exp_time = decoded_payload["exp"]
expected_exp = current_time + (60 * 60) # 1 hour in seconds
# Allow for a small time difference (max 10 seconds)
assert abs(exp_time - expected_exp) < 10
def test_verify_valid_token():
"""Test that verifying a valid token returns the correct payload"""
user_data = {"user_id": "valid_user", "role": "user"}
token = create_access_token(data=user_data)
payload = verify_token(token)
assert payload is not None
assert payload["user_id"] == "valid_user"
assert payload["role"] == "user"
def test_verify_invalid_token():
"""Test that verifying an invalid token returns None"""
invalid_token = "invalid.token.string"
payload = verify_token(invalid_token)
assert payload is None
def test_verify_expired_token():
"""Test that verifying an expired token returns None"""
user_data = {"user_id": "expired_user", "role": "user"}
# Create a token that expires immediately
expired_token = create_access_token(data=user_data, expires_delta=timedelta(seconds=-1))
payload = verify_token(expired_token)
# The expired token should not be verified successfully
assert payload is None
def test_verify_token_with_different_secret():
"""Test that verifying a token with a different secret returns None"""
user_data = {"user_id": "different_secret_user", "role": "user"}
token = create_access_token(data=user_data)
# Try to decode with a different secret - this should raise an exception
different_secret = "different_secret_key"
try:
payload = jwt.decode(token, different_secret, algorithms=[settings.JWT_ALGORITHM])
# If decoding succeeds with wrong secret, the test should fail
assert False, "Token should not be valid with different secret"
except Exception:
# This is expected - the token should not be valid with a different secret
pass
def test_token_contains_required_claims():
"""Test that tokens contain all required claims"""
user_data = {
"user_id": "claims_test_user",
"role": "admin",
"email": "test@example.com"
}
token = create_access_token(data=user_data)
decoded_payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
# Check that all original data is preserved
assert decoded_payload["user_id"] == "claims_test_user"
assert decoded_payload["role"] == "admin"
assert decoded_payload["email"] == "test@example.com"
# Check that expiration is added
assert "exp" in decoded_payload
def test_token_algorithm_compliance():
"""Test that tokens are created and verified with the configured algorithm"""
user_data = {"user_id": "algorithm_test_user", "role": "user"}
token = create_access_token(data=user_data)
# Verify using the configured algorithm
decoded_payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
assert decoded_payload["user_id"] == "algorithm_test_user"
def test_empty_user_data_token():
"""Test that tokens can be created with minimal data"""
user_data = {} # Empty dictionary
token = create_access_token(data=user_data)
decoded_payload = verify_token(token)
# Should contain expiration but no other claims
assert decoded_payload is not None
assert "exp" in decoded_payload
def test_large_user_data_token():
"""Test that tokens handle large user data payloads correctly"""
large_data = {
"user_id": "large_data_user",
"role": "user",
"profile": "x" * 1000, # Large string
"preferences": {
"theme": "dark",
"notifications": True,
"settings": list(range(100)) # Large list
}
}
token = create_access_token(data=large_data)
payload = verify_token(token)
assert payload is not None
assert payload["user_id"] == "large_data_user"
assert len(payload["profile"]) == 1000
def test_concurrent_token_creation():
"""Test that multiple tokens can be created concurrently without issues"""
import concurrent.futures
import threading
results = []
def create_token_for_user(user_id):
user_data = {"user_id": user_id, "role": "user"}
token = create_access_token(data=user_data)
payload = verify_token(token)
return (user_id, token, payload)
# Create 5 tokens in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(create_token_for_user, f"concurrent_user_{i}")
for i in range(5)
]
for future in concurrent.futures.as_completed(futures):
user_id, token, payload = future.result()
results.append((user_id, token, payload))
# Verify all tokens were created and verified correctly
assert len(results) == 5
for user_id, token, payload in results:
assert token is not None
assert payload is not None
assert payload["user_id"] == user_id
def test_unicode_user_data():
"""Test that tokens handle Unicode characters correctly"""
unicode_data = {
"user_id": "用户测试",
"name": "José María",
"role": "测试角色"
}
token = create_access_token(data=unicode_data)
payload = verify_token(token)
assert payload is not None
assert payload["user_id"] == "用户测试"
assert payload["name"] == "José María"
assert payload["role"] == "测试角色"
def test_token_security_best_practices():
"""Test that tokens follow security best practices"""
user_data = {"user_id": "security_test_user", "role": "user"}
token = create_access_token(data=user_data)
# Verify token format (should be three parts separated by dots)
parts = token.split('.')
assert len(parts) == 3
# Verify that header contains expected algorithm
import base64
header_json = base64.b64decode(parts[0] + '==').decode('utf-8')
import json
header = json.loads(header_json)
assert header["alg"] == settings.JWT_ALGORITHM
assert header["typ"] == "JWT"
if __name__ == "__main__":
pytest.main([__file__]) |