← full-stack-fastapi-template  /  backend/tests/api/routes/test_users.py

1
import uuid
2
from unittest.mock import patch
3
4
from fastapi.testclient import TestClient
5
from sqlmodel import Session, select
6
7
from app import crud
8
from app.core.config import settings
9
from app.core.security import verify_password
10
from app.models import User, UserCreate
11
from tests.utils.user import create_random_user
12
from tests.utils.utils import random_email, random_lower_string
13
14
15
def test_get_users_superuser_me(
    client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
    """GET /users/me with superuser headers describes the first superuser."""
    me_url = f"{settings.API_V1_STR}/users/me"
    response = client.get(me_url, headers=superuser_token_headers)
    me = response.json()
    # The payload must be non-empty before its fields are inspected.
    assert me
    assert me["is_active"] is True
    assert me["is_superuser"]
    assert me["email"] == settings.FIRST_SUPERUSER
24
25
26
def test_get_users_normal_user_me(
    client: TestClient, normal_user_token_headers: dict[str, str]
) -> None:
    """GET /users/me with normal-user headers describes the test user."""
    me_url = f"{settings.API_V1_STR}/users/me"
    response = client.get(me_url, headers=normal_user_token_headers)
    me = response.json()
    # The payload must be non-empty before its fields are inspected.
    assert me
    assert me["is_active"] is True
    assert me["is_superuser"] is False
    assert me["email"] == settings.EMAIL_TEST_USER
35
36
37
def test_create_user_new_email(
    client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
    """POST /users/ as superuser stores a user with a previously unused email.

    ``app.utils.send_email`` is replaced with a stub and the SMTP settings are
    patched for the duration of the request.
    """
    stub_send_email = patch("app.utils.send_email", return_value=None)
    fake_smtp_host = patch("app.core.config.settings.SMTP_HOST", "smtp.example.com")
    fake_smtp_user = patch("app.core.config.settings.SMTP_USER", "admin@example.com")
    with stub_send_email, fake_smtp_host, fake_smtp_user:
        new_email = random_email()
        new_password = random_lower_string()
        response = client.post(
            f"{settings.API_V1_STR}/users/",
            headers=superuser_token_headers,
            json={"email": new_email, "password": new_password},
        )
        assert 200 <= response.status_code < 300
        payload = response.json()
        # The row must exist in the database and match what the API reported.
        stored = crud.get_user_by_email(session=db, email=new_email)
        assert stored
        assert stored.email == payload["email"]
58
59
60
def test_get_existing_user_as_superuser(
    client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
    """A superuser can fetch another user's record by id."""
    email = random_email()
    secret = random_lower_string()
    created = crud.create_user(
        session=db, user_create=UserCreate(email=email, password=secret)
    )
    target_id = created.id

    response = client.get(
        f"{settings.API_V1_STR}/users/{target_id}",
        headers=superuser_token_headers,
    )
    assert 200 <= response.status_code < 300
    payload = response.json()

    # Compare the API answer against what is stored for the same email.
    stored = crud.get_user_by_email(session=db, email=email)
    assert stored
    assert stored.email == payload["email"]
77
78
79
def test_get_non_existing_user_as_superuser(
    client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
    """Fetching a freshly generated UUID as superuser yields 404."""
    unknown_id = uuid.uuid4()
    response = client.get(
        f"{settings.API_V1_STR}/users/{unknown_id}",
        headers=superuser_token_headers,
    )
    assert response.status_code == 404
    assert response.json() == {"detail": "User not found"}
88
89
90
def test_get_existing_user_current_user(client: TestClient, db: Session) -> None:
    """A newly created user can log in and fetch their own record by id."""
    email = random_email()
    secret = random_lower_string()
    created = crud.create_user(
        session=db, user_create=UserCreate(email=email, password=secret)
    )
    own_id = created.id

    # Log in as the new user to obtain a bearer token.
    login_response = client.post(
        f"{settings.API_V1_STR}/login/access-token",
        data={"username": email, "password": secret},
    )
    access_token = login_response.json()["access_token"]
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    response = client.get(
        f"{settings.API_V1_STR}/users/{own_id}",
        headers=auth_headers,
    )
    assert 200 <= response.status_code < 300
    payload = response.json()

    stored = crud.get_user_by_email(session=db, email=email)
    assert stored
    assert stored.email == payload["email"]
115
116
117
def test_get_existing_user_permissions_error(
    db: Session,
    client: TestClient,
    normal_user_token_headers: dict[str, str],
) -> None:
    """A normal user requesting another existing user's record gets 403."""
    other_user = create_random_user(db)
    target_url = f"{settings.API_V1_STR}/users/{other_user.id}"

    response = client.get(target_url, headers=normal_user_token_headers)

    assert response.status_code == 403
    assert response.json() == {"detail": "The user doesn't have enough privileges"}
130
131
132
def test_get_non_existing_user_permissions_error(
    client: TestClient,
    normal_user_token_headers: dict[str, str],
) -> None:
    """A normal user requesting a random, unknown id gets 403."""
    unknown_id = uuid.uuid4()
    target_url = f"{settings.API_V1_STR}/users/{unknown_id}"

    response = client.get(target_url, headers=normal_user_token_headers)

    assert response.status_code == 403
    assert response.json() == {"detail": "The user doesn't have enough privileges"}
144
145
146
def test_create_user_existing_username(
    client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
    """POST /users/ with an email that is already registered returns 400."""
    # The email doubles as the username.
    taken_email = random_email()
    secret = random_lower_string()
    crud.create_user(
        session=db, user_create=UserCreate(email=taken_email, password=secret)
    )

    response = client.post(
        f"{settings.API_V1_STR}/users/",
        headers=superuser_token_headers,
        json={"email": taken_email, "password": secret},
    )
    payload = response.json()
    assert response.status_code == 400
    assert "_id" not in payload
163
164
165
def test_create_user_by_normal_user(
    client: TestClient, normal_user_token_headers: dict[str, str]
) -> None:
    """POST /users/ with normal-user headers is rejected with 403."""
    new_email = random_email()
    new_password = random_lower_string()
    response = client.post(
        f"{settings.API_V1_STR}/users/",
        headers=normal_user_token_headers,
        json={"email": new_email, "password": new_password},
    )
    assert response.status_code == 403
177
178
179
def test_retrieve_users(
    client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
    """GET /users/ as superuser lists more than one user plus a count."""
    # Create two users so the listing holds more than one entry.
    first_email = random_email()
    first_password = random_lower_string()
    crud.create_user(
        session=db,
        user_create=UserCreate(email=first_email, password=first_password),
    )

    second_email = random_email()
    second_password = random_lower_string()
    crud.create_user(
        session=db,
        user_create=UserCreate(email=second_email, password=second_password),
    )

    response = client.get(
        f"{settings.API_V1_STR}/users/", headers=superuser_token_headers
    )
    listing = response.json()

    assert len(listing["data"]) > 1
    assert "count" in listing
    assert all("email" in entry for entry in listing["data"])
199
200
201
def test_update_user_me(
    client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
    """PATCH /users/me changes the caller's full name and email."""
    new_name = "Updated Name"
    new_email = random_email()
    response = client.patch(
        f"{settings.API_V1_STR}/users/me",
        headers=normal_user_token_headers,
        json={"full_name": new_name, "email": new_email},
    )
    assert response.status_code == 200
    payload = response.json()
    assert payload["email"] == new_email
    assert payload["full_name"] == new_name

    # The change must also be visible when reading the row by its new email.
    stored = db.exec(select(User).where(User.email == new_email)).first()
    assert stored
    assert stored.email == new_email
    assert stored.full_name == new_name
222
223
224
def test_update_password_me(
    client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
    """Change the superuser password through the API, then change it back."""
    temporary_password = random_lower_string()
    original_password = settings.FIRST_SUPERUSER_PASSWORD
    password_url = f"{settings.API_V1_STR}/users/me/password"

    response = client.patch(
        password_url,
        headers=superuser_token_headers,
        json={
            "current_password": original_password,
            "new_password": temporary_password,
        },
    )
    assert response.status_code == 200
    assert response.json()["message"] == "Password updated successfully"

    superuser = db.exec(
        select(User).where(User.email == settings.FIRST_SUPERUSER)
    ).first()
    assert superuser
    assert superuser.email == settings.FIRST_SUPERUSER
    matches, _ = verify_password(temporary_password, superuser.hashed_password)
    assert matches

    # Revert to the old password to keep consistency in test
    response = client.patch(
        password_url,
        headers=superuser_token_headers,
        json={
            "current_password": temporary_password,
            "new_password": original_password,
        },
    )
    # Reload the row so the hash checked below is the one now stored.
    db.refresh(superuser)

    assert response.status_code == 200
    matches, _ = verify_password(original_password, superuser.hashed_password)
    assert matches
265
266
267
def test_update_password_me_incorrect_password(
    client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
    """A password change with a wrong current password is rejected with 400."""
    # A random string is sent as both the current and the new password.
    bogus_password = random_lower_string()
    response = client.patch(
        f"{settings.API_V1_STR}/users/me/password",
        headers=superuser_token_headers,
        json={"current_password": bogus_password, "new_password": bogus_password},
    )
    assert response.status_code == 400
    assert response.json()["detail"] == "Incorrect password"
280
281
282
def test_update_user_me_email_exists(
    client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
    """PATCH /users/me with another user's email is rejected with 409."""
    taken_email = random_email()
    secret = random_lower_string()
    other_user = crud.create_user(
        session=db, user_create=UserCreate(email=taken_email, password=secret)
    )

    response = client.patch(
        f"{settings.API_V1_STR}/users/me",
        headers=normal_user_token_headers,
        json={"email": other_user.email},
    )
    assert response.status_code == 409
    assert response.json()["detail"] == "User with this email already exists"
298
299
300
def test_update_password_me_same_password_error(
    client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
    """Reusing the current password as the new password is rejected with 400."""
    same_password = settings.FIRST_SUPERUSER_PASSWORD
    response = client.patch(
        f"{settings.API_V1_STR}/users/me/password",
        headers=superuser_token_headers,
        json={"current_password": same_password, "new_password": same_password},
    )
    assert response.status_code == 400
    detail = response.json()["detail"]
    assert detail == "New password cannot be the same as the current one"
317
318
319
def test_register_user(client: TestClient, db: Session) -> None:
    """POST /users/signup without auth headers creates a user that can be verified."""
    email = random_email()
    secret = random_lower_string()
    display_name = random_lower_string()
    response = client.post(
        f"{settings.API_V1_STR}/users/signup",
        json={"email": email, "password": secret, "full_name": display_name},
    )
    assert response.status_code == 200
    payload = response.json()
    assert payload["email"] == email
    assert payload["full_name"] == display_name

    # Check the stored row, including that the stored hash matches the password.
    stored = db.exec(select(User).where(User.email == email)).first()
    assert stored
    assert stored.email == email
    assert stored.full_name == display_name
    matches, _ = verify_password(secret, stored.hashed_password)
    assert matches
340
341
342
def test_register_user_already_exists_error(client: TestClient) -> None:
    """Signing up with the first superuser's email is rejected with 400."""
    secret = random_lower_string()
    display_name = random_lower_string()
    signup_payload = {
        "email": settings.FIRST_SUPERUSER,
        "password": secret,
        "full_name": display_name,
    }
    response = client.post(
        f"{settings.API_V1_STR}/users/signup", json=signup_payload
    )
    assert response.status_code == 400
    detail = response.json()["detail"]
    assert detail == "The user with this email already exists in the system"
356
357
358
def test_update_user(
    client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
    """PATCH /users/{id} as superuser updates another user's full name.

    The change is checked both in the API response and in the database row.
    """
    username = random_email()
    password = random_lower_string()
    user_in = UserCreate(email=username, password=password)
    user = crud.create_user(session=db, user_create=user_in)

    new_full_name = "Updated_full_name"
    r = client.patch(
        f"{settings.API_V1_STR}/users/{user.id}",
        headers=superuser_token_headers,
        json={"full_name": new_full_name},
    )
    assert r.status_code == 200
    updated_user = r.json()

    assert updated_user["full_name"] == new_full_name

    user_query = select(User).where(User.email == username)
    user_db = db.exec(user_query).first()
    # Assert the row exists BEFORE refreshing it: refreshing a missing (None)
    # result would raise from inside the session instead of failing this
    # assertion.
    assert user_db
    # Reload the row so the value compared below is the one now stored.
    db.refresh(user_db)
    assert user_db.full_name == new_full_name
382
383
384
def test_update_user_not_exists(
    client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
    """PATCH /users/{id} for a random, unknown id returns 404."""
    patch_payload = {"full_name": "Updated_full_name"}
    unknown_id = uuid.uuid4()
    response = client.patch(
        f"{settings.API_V1_STR}/users/{unknown_id}",
        headers=superuser_token_headers,
        json=patch_payload,
    )
    assert response.status_code == 404
    detail = response.json()["detail"]
    assert detail == "The user with this id does not exist in the system"
395
396
397
def test_update_user_email_exists(
    client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
    """Changing a user's email to one owned by another user returns 409."""
    target_email = random_email()
    target_password = random_lower_string()
    target_user = crud.create_user(
        session=db,
        user_create=UserCreate(email=target_email, password=target_password),
    )

    owner_email = random_email()
    owner_password = random_lower_string()
    email_owner = crud.create_user(
        session=db,
        user_create=UserCreate(email=owner_email, password=owner_password),
    )

    # Try to give the first user the second user's email.
    response = client.patch(
        f"{settings.API_V1_STR}/users/{target_user.id}",
        headers=superuser_token_headers,
        json={"email": email_owner.email},
    )
    assert response.status_code == 409
    assert response.json()["detail"] == "User with this email already exists"
418
419
420
def test_delete_user_me(client: TestClient, db: Session) -> None:
    """DELETE /users/me lets a logged-in user delete their own account.

    A fresh user is created, logged in through the access-token endpoint, and
    deleted; afterwards the row must no longer be found by id.
    """
    username = random_email()
    password = random_lower_string()
    user_in = UserCreate(email=username, password=password)
    user = crud.create_user(session=db, user_create=user_in)
    user_id = user.id

    login_data = {
        "username": username,
        "password": password,
    }
    r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
    tokens = r.json()
    a_token = tokens["access_token"]
    headers = {"Authorization": f"Bearer {a_token}"}

    r = client.delete(
        f"{settings.API_V1_STR}/users/me",
        headers=headers,
    )
    assert r.status_code == 200
    deleted_user = r.json()
    assert deleted_user["message"] == "User deleted successfully"

    # One lookup through `Session.exec` is enough to confirm the deletion; the
    # former second lookup through `Session.execute` repeated the same check.
    result = db.exec(select(User).where(User.id == user_id)).first()
    assert result is None
449
450
451
def test_delete_user_me_as_superuser(
    client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
    """DELETE /users/me with superuser headers is rejected with 403."""
    me_url = f"{settings.API_V1_STR}/users/me"
    response = client.delete(me_url, headers=superuser_token_headers)
    assert response.status_code == 403
    payload = response.json()
    assert payload["detail"] == "Super users are not allowed to delete themselves"
461
462
463
def test_delete_user_super_user(
    client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
    """A superuser can delete another user by id; the row is then gone."""
    email = random_email()
    secret = random_lower_string()
    victim = crud.create_user(
        session=db, user_create=UserCreate(email=email, password=secret)
    )
    victim_id = victim.id

    response = client.delete(
        f"{settings.API_V1_STR}/users/{victim_id}",
        headers=superuser_token_headers,
    )
    assert response.status_code == 200
    assert response.json()["message"] == "User deleted successfully"

    # The id captured before the request must no longer match any row.
    remaining = db.exec(select(User).where(User.id == victim_id)).first()
    assert remaining is None
480
481
482
def test_delete_user_not_found(
    client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
    """DELETE /users/{id} for a random, unknown id returns 404."""
    unknown_id = uuid.uuid4()
    response = client.delete(
        f"{settings.API_V1_STR}/users/{unknown_id}",
        headers=superuser_token_headers,
    )
    assert response.status_code == 404
    assert response.json()["detail"] == "User not found"
491
492
493
def test_delete_user_current_super_user_error(
    client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
    """A superuser deleting their own account by id is rejected with 403."""
    superuser = crud.get_user_by_email(session=db, email=settings.FIRST_SUPERUSER)
    assert superuser
    own_id = superuser.id

    response = client.delete(
        f"{settings.API_V1_STR}/users/{own_id}",
        headers=superuser_token_headers,
    )
    assert response.status_code == 403
    detail = response.json()["detail"]
    assert detail == "Super users are not allowed to delete themselves"
506
507
508
def test_delete_user_without_privileges(
    client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
    """A normal user deleting another user by id is rejected with 403."""
    email = random_email()
    secret = random_lower_string()
    other_user = crud.create_user(
        session=db, user_create=UserCreate(email=email, password=secret)
    )

    response = client.delete(
        f"{settings.API_V1_STR}/users/{other_user.id}",
        headers=normal_user_token_headers,
    )
    assert response.status_code == 403
    assert response.json()["detail"] == "The user doesn't have enough privileges"
522