← full-stack-fastapi-template  /  backend/app/crud.py

1
import uuid
2
from typing import Any
3
4
from sqlmodel import Session, select
5
6
from app.core.security import get_password_hash, verify_password
7
from app.models import Item, ItemCreate, User, UserCreate, UserUpdate
8
9
10
def create_user(*, session: Session, user_create: UserCreate) -> User:
11
    db_obj = User.model_validate(
12
        user_create, update={"hashed_password": get_password_hash(user_create.password)}
13
    )
14
    session.add(db_obj)
15
    session.commit()
16
    session.refresh(db_obj)
17
    return db_obj
18
19
20
def update_user(*, session: Session, db_user: User, user_in: UserUpdate) -> Any:
21
    user_data = user_in.model_dump(exclude_unset=True)
22
    extra_data = {}
23
    if "password" in user_data:
24
        password = user_data["password"]
25
        hashed_password = get_password_hash(password)
26
        extra_data["hashed_password"] = hashed_password
27
    db_user.sqlmodel_update(user_data, update=extra_data)
28
    session.add(db_user)
29
    session.commit()
30
    session.refresh(db_user)
31
    return db_user
32
33
34
def get_user_by_email(*, session: Session, email: str) -> User | None:
35
    statement = select(User).where(User.email == email)
36
    session_user = session.exec(statement).first()
37
    return session_user
38
39
40
# Dummy hash to use for timing attack prevention when user is not found
41
# This is an Argon2 hash of a random password, used to ensure constant-time comparison
42
DUMMY_HASH = "$argon2id$v=19$m=65536,t=3,p=4$MjQyZWE1MzBjYjJlZTI0Yw$YTU4NGM5ZTZmYjE2NzZlZjY0ZWY3ZGRkY2U2OWFjNjk"
43
44
45
def authenticate(*, session: Session, email: str, password: str) -> User | None:
46
    db_user = get_user_by_email(session=session, email=email)
47
    if not db_user:
48
        # Prevent timing attacks by running password verification even when user doesn't exist
49
        # This ensures the response time is similar whether or not the email exists
50
        verify_password(password, DUMMY_HASH)
51
        return None
52
    verified, updated_password_hash = verify_password(password, db_user.hashed_password)
53
    if not verified:
54
        return None
55
    if updated_password_hash:
56
        db_user.hashed_password = updated_password_hash
57
        session.add(db_user)
58
        session.commit()
59
        session.refresh(db_user)
60
    return db_user
61
62
63
def create_item(*, session: Session, item_in: ItemCreate, owner_id: uuid.UUID) -> Item:
64
    db_item = Item.model_validate(item_in, update={"owner_id": owner_id})
65
    session.add(db_item)
66
    session.commit()
67
    session.refresh(db_item)
68
    return db_item
69