"""Authentication endpoints: register, login, and current-user lookup."""

from datetime import timedelta
from typing import Optional

from bson import ObjectId
from bson.errors import InvalidId
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from models import UserCreate, UserLogin, UserResponse, UserInDB, Token
from auth import get_password_hash, verify_password, create_access_token, decode_access_token
from database import get_database
from config import settings

router = APIRouter(prefix="/auth", tags=["authentication"])
security = HTTPBearer()


def _unauthorized(detail: str) -> HTTPException:
    """Build (not raise) a 401 response carrying the Bearer challenge header."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def _issue_access_token(subject) -> str:
    """Create an access token whose ``sub`` claim is *subject*.

    The lifetime comes from ``settings.access_token_expire_minutes``.
    """
    return create_access_token(
        data={"sub": subject},
        expires_delta=timedelta(minutes=settings.access_token_expire_minutes),
    )


def _to_user_response(user: UserInDB) -> UserResponse:
    """Project a stored user onto the public response model."""
    return UserResponse(
        _id=user.id,
        email=user.email,
        username=user.username,
        created_at=user.created_at,
    )


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> UserInDB:
    """Resolve the bearer token to the stored user.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, carries a ``sub`` that is not a valid ObjectId, or refers
            to a user that is not in the database.
    """
    payload = decode_access_token(credentials.credentials)
    if payload is None:
        raise _unauthorized("Invalid authentication credentials")

    user_id = payload.get("sub")
    if user_id is None:
        raise _unauthorized("Invalid authentication credentials")

    # A malformed or non-string subject must be rejected as bad credentials;
    # without this guard ObjectId() raises and the request ends in a 500.
    try:
        object_id = ObjectId(user_id)
    except (InvalidId, TypeError):
        raise _unauthorized("Invalid authentication credentials") from None

    db = get_database()
    user_dict = await db.users.find_one({"_id": object_id})
    if user_dict is None:
        raise _unauthorized("User not found")

    # The model is built from a string id, so convert the stored ObjectId.
    user_dict["_id"] = str(user_dict["_id"])
    return UserInDB(**user_dict)


@router.post("/register", response_model=Token, status_code=status.HTTP_201_CREATED)
async def register(user: UserCreate):
    """Register a new user and return an access token for them.

    Raises:
        HTTPException: 400 when the email or the username is already in use.
    """
    db = get_database()

    # NOTE(review): these look-ups and the insert below are separate
    # operations, so two concurrent requests could both pass the checks.
    # Presumably unique indexes on email/username back this up - confirm.
    if await db.users.find_one({"email": user.email}):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    if await db.users.find_one({"username": user.username}):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already taken",
        )

    # Only the password hash is handed to the stored model.
    user_in_db = UserInDB(
        email=user.email,
        username=user.username,
        hashed_password=get_password_hash(user.password),
    )

    user_dict = user_in_db.model_dump(by_alias=True)
    user_dict["_id"] = ObjectId(user_dict["_id"])
    result = await db.users.insert_one(user_dict)
    new_id = str(result.inserted_id)

    user_response = UserResponse(
        _id=new_id,
        email=user.email,
        username=user.username,
        created_at=user_in_db.created_at,
    )
    return Token(access_token=_issue_access_token(new_id), user=user_response)


@router.post("/login", response_model=Token)
async def login(user_login: UserLogin):
    """Authenticate by email and password and return an access token.

    Raises:
        HTTPException: 401 with the same message whether the email is unknown
            or the password does not match.
    """
    db = get_database()

    user_dict = await db.users.find_one({"email": user_login.email})
    if not user_dict:
        raise _unauthorized("Incorrect email or password")

    user_dict["_id"] = str(user_dict["_id"])
    user = UserInDB(**user_dict)

    if not verify_password(user_login.password, user.hashed_password):
        raise _unauthorized("Incorrect email or password")

    return Token(
        access_token=_issue_access_token(user.id),
        user=_to_user_response(user),
    )


@router.get("/me", response_model=UserResponse)
async def get_me(current_user: UserInDB = Depends(get_current_user)):
    """Return the public profile of the authenticated user."""
    return _to_user_response(current_user)