| import pytest |
| from datetime import timedelta |
| from jose import jwt |
| from backend.src.auth.security import create_access_token, verify_token |
| from backend.src.core.config import settings |
|
|
|
|
| def test_create_valid_jwt_token(): |
| """Test that creating a JWT token works correctly""" |
| user_data = {"user_id": "test_user_123", "role": "user"} |
|
|
| token = create_access_token(data=user_data) |
|
|
| |
| assert token is not None |
| assert isinstance(token, str) |
| assert len(token) > 0 |
|
|
| |
| decoded_payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]) |
| assert decoded_payload["user_id"] == "test_user_123" |
| assert decoded_payload["role"] == "user" |
| assert "exp" in decoded_payload |
|
|
|
|
| def test_create_token_with_custom_expiry(): |
| """Test that creating a token with custom expiry works""" |
| user_data = {"user_id": "expiry_test_user", "role": "user"} |
| expiry_delta = timedelta(minutes=30) |
|
|
| token = create_access_token(data=user_data, expires_delta=expiry_delta) |
|
|
| |
| decoded_payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]) |
|
|
| |
| import time |
| current_time = time.time() |
| exp_time = decoded_payload["exp"] |
|
|
| |
| assert abs(exp_time - current_time - 1800) < 10 |
|
|
|
|
| def test_verify_valid_token(): |
| """Test that verifying a valid token returns the correct payload""" |
| user_data = {"user_id": "verify_test_user", "role": "admin"} |
| token = create_access_token(data=user_data) |
|
|
| payload = verify_token(token) |
|
|
| assert payload is not None |
| assert payload["user_id"] == "verify_test_user" |
| assert payload["role"] == "admin" |
|
|
|
|
| def test_verify_invalid_token(): |
| """Test that verifying an invalid token returns None""" |
| invalid_token = "this.is.not.a.valid.jwt.token" |
|
|
| payload = verify_token(invalid_token) |
|
|
| assert payload is None |
|
|
|
|
| def test_verify_expired_token(): |
| """Test that verifying an expired token returns None""" |
| user_data = {"user_id": "expired_test_user", "role": "user"} |
| |
| token = create_access_token(data=user_data, expires_delta=timedelta(seconds=-1)) |
|
|
| payload = verify_token(token) |
|
|
| assert payload is None |
|
|
|
|
| def test_verify_token_with_different_secret(): |
| """Test that verifying a token with wrong secret returns None""" |
| user_data = {"user_id": "wrong_secret_user", "role": "user"} |
| token = create_access_token(data=user_data) |
|
|
| |
| different_secret = "different_secret_key_that_does_not_match" |
| try: |
| payload = jwt.decode(token, different_secret, algorithms=[settings.JWT_ALGORITHM]) |
| |
| assert False, "Expected JWTError was not raised" |
| except jwt.JWTError: |
| |
| pass |
|
|
|
|
| def test_token_contains_correct_claims(): |
| """Test that tokens contain the expected claims""" |
| user_data = { |
| "user_id": "claims_test_user", |
| "role": "tester", |
| "email": "test@example.com", |
| "custom_field": "custom_value" |
| } |
| token = create_access_token(data=user_data) |
|
|
| decoded_payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]) |
|
|
| |
| assert decoded_payload["user_id"] == "claims_test_user" |
| assert decoded_payload["role"] == "tester" |
| assert decoded_payload["email"] == "test@example.com" |
| assert decoded_payload["custom_field"] == "custom_value" |
|
|
| |
| assert "exp" in decoded_payload |
|
|
|
|
| def test_token_algorithm_compliance(): |
| """Test that tokens are created and verified with the configured algorithm""" |
| user_data = {"user_id": "algorithm_test_user", "role": "user"} |
| token = create_access_token(data=user_data) |
|
|
| |
| payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]) |
|
|
| assert payload["user_id"] == "algorithm_test_user" |
|
|
|
|
| def test_empty_user_data_token(): |
| """Test creating and verifying a token with minimal data""" |
| user_data = {} |
| token = create_access_token(data=user_data) |
|
|
| payload = verify_token(token) |
|
|
| |
| assert payload is not None |
| assert "exp" in payload |
|
|
|
|
| def test_large_user_data_token(): |
| """Test creating and verifying a token with large data payload""" |
| large_data = {"user_id": "large_data_user", "data": "x" * 1000} |
| token = create_access_token(data=large_data) |
|
|
| payload = verify_token(token) |
|
|
| assert payload is not None |
| assert payload["user_id"] == "large_data_user" |
| assert len(payload["data"]) == 1000 |
|
|
|
|
| def test_token_unicode_support(): |
| """Test that tokens handle Unicode characters correctly""" |
| unicode_data = {"user_id": "用户测试", "name": "José María", "role": "测试角色"} |
| token = create_access_token(data=unicode_data) |
|
|
| payload = verify_token(token) |
|
|
| assert payload is not None |
| assert payload["user_id"] == "用户测试" |
| assert payload["name"] == "José María" |
| assert payload["role"] == "测试角色" |
|
|
|
|
| def test_concurrent_token_creation(): |
| """Test that multiple tokens can be created concurrently without issues""" |
| import threading |
| import time |
|
|
| results = [] |
|
|
| def create_token_and_store(index): |
| user_data = {"user_id": f"concurrent_user_{index}", "role": "user"} |
| token = create_access_token(data=user_data) |
| payload = verify_token(token) |
| results.append((index, token, payload)) |
|
|
| |
| threads = [] |
| for i in range(5): |
| thread = threading.Thread(target=create_token_and_store, args=(i,)) |
| threads.append(thread) |
| thread.start() |
|
|
| |
| for thread in threads: |
| thread.join() |
|
|
| |
| assert len(results) == 5 |
| for index, token, payload in results: |
| assert token is not None |
| assert payload is not None |
| assert payload["user_id"] == f"concurrent_user_{index}" |
|
|
|
|
| if __name__ == "__main__": |
| pytest.main([__file__]) |