File size: 6,580 Bytes
6bed18e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import pytest
from datetime import timedelta
from jose import jwt
from backend.src.auth.security import create_access_token, verify_token
from backend.src.core.config import settings


def test_create_valid_jwt_token():
    """A freshly issued access token is a non-empty string that decodes to its claims."""
    issued = create_access_token(data={"user_id": "test_user_123", "role": "user"})

    # Basic shape of the token: present, a string, and not empty.
    assert issued is not None
    assert isinstance(issued, str)
    assert len(issued) > 0

    # Decoding with the configured key and algorithm must give back the
    # original claims together with an expiry claim.
    claims = jwt.decode(
        issued,
        settings.SECRET_KEY,
        algorithms=[settings.JWT_ALGORITHM],
    )
    for key, expected in (("user_id", "test_user_123"), ("role", "user")):
        assert claims[key] == expected
    assert "exp" in claims


def test_create_token_with_custom_expiry():
    """A token created with a 30-minute ``expires_delta`` expires about 30 minutes from now."""
    import time

    token = create_access_token(
        data={"user_id": "expiry_test_user", "role": "user"},
        expires_delta=timedelta(minutes=30),
    )

    # Decode with the configured key and algorithm to read the expiry claim.
    claims = jwt.decode(
        token,
        settings.SECRET_KEY,
        algorithms=[settings.JWT_ALGORITHM],
    )

    # Compare the "exp" claim against the wall clock: the gap should be
    # roughly 30 minutes (1800 seconds).
    now = time.time()
    expires_at = claims["exp"]
    drift = expires_at - now - 1800

    assert abs(drift) < 10  # Allow 10 second tolerance


def test_verify_valid_token():
    """``verify_token`` returns the original claims for a token it was issued for."""
    issued = create_access_token(
        data={"user_id": "verify_test_user", "role": "admin"}
    )

    claims = verify_token(issued)

    assert claims is not None
    assert claims["user_id"] == "verify_test_user"
    assert claims["role"] == "admin"


def test_verify_invalid_token():
    """``verify_token`` yields ``None`` for a string that is not a well-formed JWT."""
    result = verify_token("this.is.not.a.valid.jwt.token")

    assert result is None


def test_verify_expired_token():
    """``verify_token`` yields ``None`` for a token whose expiry is already in the past."""
    # A negative delta puts the expiry one second before the creation time.
    already_expired = timedelta(seconds=-1)
    stale_token = create_access_token(
        data={"user_id": "expired_test_user", "role": "user"},
        expires_delta=already_expired,
    )

    result = verify_token(stale_token)

    assert result is None


def test_verify_token_with_different_secret():
    """Test that decoding a token with the wrong secret raises ``JWTError``.

    The token is issued through ``create_access_token`` and then decoded
    directly with ``jwt.decode`` using a secret that differs from the one
    passed to the decoder elsewhere in this module (``settings.SECRET_KEY``).
    """
    user_data = {"user_id": "wrong_secret_user", "role": "user"}
    token = create_access_token(data=user_data)

    different_secret = "different_secret_key_that_does_not_match"

    # pytest.raises fails the test by itself when no JWTError is raised, so no
    # manual ``assert False`` / ``except ...: pass`` scaffolding is needed.
    with pytest.raises(jwt.JWTError):
        jwt.decode(token, different_secret, algorithms=[settings.JWT_ALGORITHM])


def test_token_contains_correct_claims():
    """Every claim supplied at creation time survives a decode, and ``exp`` is added."""
    user_data = {
        "user_id": "claims_test_user",
        "role": "tester",
        "email": "test@example.com",
        "custom_field": "custom_value",
    }
    # Snapshot the expected values before handing the dict to the token factory.
    expected_claims = dict(user_data)

    token = create_access_token(data=user_data)
    decoded = jwt.decode(
        token,
        settings.SECRET_KEY,
        algorithms=[settings.JWT_ALGORITHM],
    )

    # Each original claim must come back unchanged.
    for claim_name, claim_value in expected_claims.items():
        assert decoded[claim_name] == claim_value

    # The expiry claim must be present as well.
    assert "exp" in decoded


def test_token_algorithm_compliance():
    """A created token decodes when only the configured algorithm is allowed."""
    token = create_access_token(
        data={"user_id": "algorithm_test_user", "role": "user"}
    )

    # Restrict decoding to the configured algorithm only.
    allowed_algorithms = [settings.JWT_ALGORITHM]
    decoded = jwt.decode(token, settings.SECRET_KEY, algorithms=allowed_algorithms)

    assert decoded["user_id"] == "algorithm_test_user"


def test_empty_user_data_token():
    """A token built from an empty claims dict still verifies and carries ``exp``."""
    token = create_access_token(data={})  # no caller-supplied claims at all

    claims = verify_token(token)

    # Verification must succeed and the expiry claim must be present.
    assert claims is not None
    assert "exp" in claims


def test_large_user_data_token():
    """A token carrying a 1000-character claim round-trips through ``verify_token``."""
    filler = "x" * 1000  # Large string
    token = create_access_token(
        data={"user_id": "large_data_user", "data": filler}
    )

    claims = verify_token(token)

    assert claims is not None
    assert claims["user_id"] == "large_data_user"
    assert len(claims["data"]) == 1000


def test_token_unicode_support():
    """Non-ASCII claim values come back unchanged after create + verify."""
    token = create_access_token(
        data={"user_id": "用户测试", "name": "José María", "role": "测试角色"}
    )

    claims = verify_token(token)

    assert claims is not None
    # Check each claim against its literal expected value.
    for claim_name, expected in (
        ("user_id", "用户测试"),
        ("name", "José María"),
        ("role", "测试角色"),
    ):
        assert claims[claim_name] == expected


def test_concurrent_token_creation():
    """Test that tokens can be created and verified from several threads at once.

    Five threads each issue a token for a distinct user and verify it right
    away. An exception raised inside a worker is recorded and fails the test
    with the original error, instead of being lost in the thread and showing
    up only as a wrong result count.
    """
    import threading

    results = []
    errors = []

    def create_token_and_store(index):
        """Issue and verify one token, recording either the outcome or the error."""
        try:
            user_data = {"user_id": f"concurrent_user_{index}", "role": "user"}
            token = create_access_token(data=user_data)
            payload = verify_token(token)
        except Exception as exc:  # surfaced through the ``errors`` assertion below
            errors.append((index, exc))
        else:
            results.append((index, token, payload))

    # Create 5 tokens in parallel
    threads = []
    for i in range(5):
        thread = threading.Thread(target=create_token_and_store, args=(i,))
        threads.append(thread)
        thread.start()

    # Wait for all threads to complete
    for thread in threads:
        thread.join()

    # Report worker failures first so the real cause is visible.
    assert not errors, f"worker threads raised: {errors!r}"

    # Verify all tokens were created and verified correctly
    assert len(results) == 5
    for index, token, payload in results:
        assert token is not None
        assert payload is not None
        assert payload["user_id"] == f"concurrent_user_{index}"


if __name__ == "__main__":
    # Allow running this file directly. pytest.main returns the run's exit
    # code; propagate it so a failing run does not exit with status 0.
    raise SystemExit(pytest.main([__file__]))