← full-stack-fastapi-template / backend/app/crud.py
| 1 | import uuid |
| 2 | from typing import Any |
| 3 | |
| 4 | from sqlmodel import Session, select |
| 5 | |
| 6 | from app.core.security import get_password_hash, verify_password |
| 7 | from app.models import Item, ItemCreate, User, UserCreate, UserUpdate |
| 8 | |
| 9 | |
def create_user(*, session: Session, user_create: UserCreate) -> User:
    """Persist a new user built from ``user_create``.

    The plain-text ``user_create.password`` is hashed with
    ``get_password_hash`` and stored on the new row as ``hashed_password``.

    Args:
        session: Database session used to add, commit and refresh the row.
        user_create: Incoming payload describing the user to create.

    Returns:
        The newly created ``User``, refreshed from the database after commit.
    """
    # Hash up front so the validated model receives the derived value
    # through ``update`` together with the payload's own fields.
    password_hash = get_password_hash(user_create.password)
    new_user = User.model_validate(
        user_create,
        update={"hashed_password": password_hash},
    )

    session.add(new_user)
    session.commit()
    session.refresh(new_user)
    return new_user
| 18 | |
| 19 | |
def update_user(*, session: Session, db_user: User, user_in: UserUpdate) -> User:
    """Apply a partial update to an existing user and persist it.

    Only the fields explicitly set on ``user_in`` are applied
    (``model_dump(exclude_unset=True)``). When a plain-text ``password`` is
    supplied it is hashed with ``get_password_hash`` and written to
    ``hashed_password``.

    Args:
        session: Database session used to add, commit and refresh the row.
        db_user: The already-loaded user row to modify in place.
        user_in: Partial update payload.

    Returns:
        The same ``db_user`` instance, refreshed from the database after
        commit.
    """
    user_data = user_in.model_dump(exclude_unset=True)
    extra_data: dict[str, str] = {}

    # ``exclude_unset`` keeps keys that were explicitly set, including ones
    # explicitly set to None. Guard so that a null password is treated as
    # "no password change" instead of being handed to the hasher.
    # NOTE(review): assumes UserUpdate.password is optional -- confirm
    # against the model definition.
    password = user_data.get("password")
    if password is not None:
        extra_data["hashed_password"] = get_password_hash(password)

    db_user.sqlmodel_update(user_data, update=extra_data)
    session.add(db_user)
    session.commit()
    session.refresh(db_user)
    return db_user
| 32 | |
| 33 | |
def get_user_by_email(*, session: Session, email: str) -> User | None:
    """Fetch the first user whose ``email`` column equals ``email``.

    Args:
        session: Database session the query is executed on.
        email: Email address to match with an equality comparison.

    Returns:
        The first matching ``User``, or ``None`` when the query yields no row.
    """
    query = select(User).where(User.email == email)
    return session.exec(query).first()
| 38 | |
| 39 | |
# Placeholder password hash used for timing-attack mitigation.
# authenticate() verifies the submitted password against this value when no
# user matches the given email, so the "unknown email" path still performs a
# password verification and takes a similar amount of time to the
# "known email" path.
# Per the original author's note this is an Argon2 hash of a random password;
# the encoded string declares argon2id, v=19, m=65536, t=3, p=4.
DUMMY_HASH = "$argon2id$v=19$m=65536,t=3,p=4$MjQyZWE1MzBjYjJlZTI0Yw$YTU4NGM5ZTZmYjE2NzZlZjY0ZWY3ZGRkY2U2OWFjNjk"
| 43 | |
| 44 | |
def authenticate(*, session: Session, email: str, password: str) -> User | None:
    """Check an email/password pair against the stored users.

    Args:
        session: Database session used for the lookup and, when the stored
            hash is replaced, for persisting the change.
        email: Email address identifying the account.
        password: Plain-text password to verify.

    Returns:
        The matching ``User`` when the password verifies, otherwise ``None``
        (both for an unknown email and for a wrong password).
    """
    account = get_user_by_email(session=session, email=email)
    if not account:
        # Timing-attack mitigation: still run one verification (against the
        # module-level DUMMY_HASH) so an unknown email costs about as much
        # time as a known one. The result is deliberately discarded.
        verify_password(password, DUMMY_HASH)
        return None

    # verify_password yields a (verified, replacement_hash) pair.
    password_ok, replacement_hash = verify_password(password, account.hashed_password)
    if not password_ok:
        return None

    if replacement_hash:
        # A replacement hash was returned alongside a successful check
        # (presumably a re-hash with current parameters -- confirm against
        # verify_password); store it on the account.
        account.hashed_password = replacement_hash
        session.add(account)
        session.commit()
        session.refresh(account)

    return account
| 61 | |
| 62 | |
def create_item(*, session: Session, item_in: ItemCreate, owner_id: uuid.UUID) -> Item:
    """Persist a new item owned by ``owner_id``.

    Args:
        session: Database session used to add, commit and refresh the row.
        item_in: Incoming payload describing the item to create.
        owner_id: Identifier stored on the new item as ``owner_id``.

    Returns:
        The newly created ``Item``, refreshed from the database after commit.
    """
    # The owner is not part of the payload; inject it while validating.
    ownership = {"owner_id": owner_id}
    new_item = Item.model_validate(item_in, update=ownership)

    session.add(new_item)
    session.commit()
    session.refresh(new_item)
    return new_item
| 69 |