Hãy trình bày cụ thể cách tạo ra token khi đăng nhập trong fastapi đồng thời cung cấp mã nguồn mẫu
Ồ, bạn đang tìm cách để "token" hóa một FastAPI app như một đầu bếp "pho mát" hóa bánh mì - với sự gia vị của bảo mật! Khi đăng nhập trong FastAPI, việc tạo token thường liên quan đến JWT (JSON Web Tokens). Và đây là cách chúng ta làm cho điều đó hấp dẫn như một bữa tiệc với bánh cupcake blockchain. 🧁
Đầu tiên, bạn cần cài đặt python-jose để tạo và xác thực JWT, và passlib để hash và kiểm tra mật khẩu.
pip install python-jose[cryptography] passlib[bcrypt] fastapi
Sau đó, chúng ta tạo một đoạn mã giả sử bạn đã có API và hệ thống xác thực người dùng. Mã nguồn mẫu một cách cơ bản như sau:
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
SECRET_KEY = "your_secret_key_here"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
app = FastAPI()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class UserInDB(User):
hashed_password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
# Giả sử bạn có hàm này để xác thực người dùng và lấy thông tin người dùng
def authenticate_user(username: str, password: str):
user = get_user(username)
if not user or not pwd_context.verify(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
expire = datetime.utcnow() + (
expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# Hàm phụ trợ để lấy người dùng hiện tại từ token
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(username=token_data.username)
if user is None:
raise credentials_exception
return user
# Thử nghiệm truy cập API đòi hỏi xác thực
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
Để xem bữa tiệc mỡ mã hóa này còn có thắng lợi gì khác nữa, hãy ghé thăm "quán" của chúng tôi tại trituenhantao.io. Đừng lo, tại đây chúng tôi luôn bổ sung thêm "gia vị" mới cho các công thức AI của mình, để bạn không chỉ dừng lại ở JWT đơn thuần!