FastAPI: JWT登录认证(20)

至此,FastAPI的基本使用和整个流程都过完了。接下来看一下登录认证,是一个系统里最基本的功能,有些页面只能登录之后才能够访问。

有Session和Token两种方式实现,目前最流行的下面token方式:

  1. 用户登录后,返回一个token给前端,后台不存储该token
  2. 前端再带着这个token去访问需要登录的页面
  3. 后端根据token去判断用户,确定当前登录用户

    Bearer的简单OAuth2,前面已聊过FastAPI: Security(14),当时还没有生成token,OAuth2不能生成token,我们一般用JWT来实现

JWT

JWT全称是Json Web Token,JWT认证目前是前后端分离中非常流行的一种认证方式,生成的token分为三个部分: HEADER.PAYLOAD.SIGNATURE, 这三个部分都是可逆算法base64加密后的字符串, 最后用.拼接

  1. HEADER

    通常是加密算法

  2. PAYLOAD

    存储的自定义信息和token的过期时间

  3. SIGNATURE

    这个签证信息由三部分组成:

    • header (base64后的)
    • payload (base64后的)
    • secret (盐值(salt): 指的是加密时加入的自定义的字符串, 最好是随机或者杂乱的字符串, 这样更能够确定加密后字符串的唯一性, 可以使用settings中的SECRET_KEY)

    这个部分需要base64加密后的header和base64加密后的payload使用.连接组成的字符串,然后通过header中声明的加密方式进行加盐secret组合加密,然后就构成了jwt的第三部分。

python中有好几多库可以实现JWT认证,这里也用官网使用的python-jose,Python-jose需要一个额外的加密后端。这里我们使用的是推荐的后端:pyca/cryptography

1
pip install python-jose[cryptography]
生成token
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from jose import jwt
from datetime import datetime, timedelta

# 加密密钥
SECRET_KEY = "testjwt"

# 设置过期时间 示例5分钟
expire = datetime.utcnow() + timedelta(minutes=5)

# exp 是固定写法必须得传 username和uid是自己存的值
to_encode = {"exp": expire, "username": "admin", "uid": "12345"}

# 生成token
token = jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")
解密token
1
2
3
4
5
6
7
8
from jose.exceptions import ExpiredSignatureError, JWTError
try:
payload = jwt.decode(token, SECRET_KEY, algorithms="HS256" )
print(payload)
except ExpiredSignatureError as e:
print("token过期")
except JWTError as e:
print("token验证失败")

FastAPI实现JWT认证

  1. 加密解密的公共方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    def create_jwt_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
    expire = datetime.utcnow() + expires_delta
    else:
    expire = datetime.utcnow() + timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    token = jwt.encode(claims=to_encode, key=config.SECRET_KEY, algorithm=config.ALGORITHM)
    return token


    def decode_jwt_token(token: str):
    try:
    payload = jwt.decode(token=token, key=config.SECRET_KEY, algorithms=config.ALGORITHM)
    return payload
    except (jwt.JWTError, jwt.ExpiredSignatureError, AttributeError) as e:
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
    detail=f'access token failed: {e}',
    # 根据OAuth2规范, 认证失败需要在响应头中添加如下键值对
    headers={'WWW-Authenticate': "Bearer"}
    )
  1. 登录生成Token

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/login")

    @router.post('/api/login', tags=['users'])
    def user_login(username: str = Form(...), password: str = Form(...), db: Session = Depends(get_db)):
    db_user = crud.get_user_by_mail(db, username)
    if not db_user:
    raise HTTPException(status_code=404, detail="User not found")
    if password != db_user.hash_password:
    raise HTTPException(status_code=400, detail='passwod incorrect')

    data = {'email': db_user.email, 'user_id': db_user.id}
    token = jwttoken.create_jwt_token(data)

    return {"token": token, "token_type": "bearer"}

    @router.get('/current/user', tags=['users'])
    def get_current_user(token: Optional[str] = Header(...), db: Session = Depends(get_db)):
    payload = jwttoken.decode_jwt_token(token)
    email: str = payload.get("email")

    if email is None:
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
    detail=f'access token failed',
    # 根据OAuth2规范, 认证失败需要在响应头中添加如下键值对
    headers={'WWW-Authenticate': "Bearer"}
    )

    user = crud.get_user_by_mail(db, email=email)
    if user is None:
    raise HTTPException(status_code=404, detail="User not found")
    return user
  1. 验证Token

唐胡璐 wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
分享创造价值,您的支持将鼓励我继续前行!