至此,FastAPI的基本使用和整个流程都过完了。接下来看一下登录认证,是一个系统里最基本的功能,有些页面只能登录之后才能够访问。
有Session和Token两种方式实现,目前最流行的下面token方式:
- 用户登录后,返回一个token给前端,后台不存储该token
- 前端再带着这个token去访问需要登录的页面
- 后端根据token去判断用户,确定当前登录用户 - Bearer的简单OAuth2,前面已聊过FastAPI: Security(14),当时还没有生成token,OAuth2不能生成token,我们一般用 - JWT来实现
JWT
JWT全称是Json Web Token,JWT认证目前是前后端分离中非常流行的一种认证方式,生成的token分为三个部分: HEADER.PAYLOAD.SIGNATURE, 这三个部分都是可逆算法base64加密后的字符串, 最后用.拼接
- HEADER - 通常是加密算法 
- PAYLOAD - 存储的自定义信息和token的过期时间 
- SIGNATURE - 这个签证信息由三部分组成: - header (base64后的)
- payload (base64后的)
- secret (盐值(salt): 指的是加密时加入的自定义的字符串, 最好是随机或者杂乱的字符串, 这样更能够确定加密后字符串的唯一性, 可以使用settings中的SECRET_KEY)
 - 这个部分需要base64加密后的header和base64加密后的payload使用 - .连接组成的字符串,然后通过header中声明的加密方式进行加盐- secret组合加密,然后就构成了jwt的第三部分。
   python中有好几多库可以实现JWT认证,这里也用官网使用的python-jose,Python-jose需要一个额外的加密后端。这里我们使用的是推荐的后端:pyca/cryptography
| 1 | pip install python-jose[cryptography] | 
生成token
| 1 | from jose import jwt | 
解密token
| 1 | from jose.exceptions import ExpiredSignatureError, JWTError | 
FastAPI实现JWT认证
- 加密解密的公共方法 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21- def create_jwt_token(data: dict, expires_delta: Optional[timedelta] = None): 
 to_encode = data.copy()
 if expires_delta:
 expire = datetime.utcnow() + expires_delta
 else:
 expire = datetime.utcnow() + timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
 to_encode.update({"exp": expire})
 token = jwt.encode(claims=to_encode, key=config.SECRET_KEY, algorithm=config.ALGORITHM)
 return token
 def decode_jwt_token(token: str):
 try:
 payload = jwt.decode(token=token, key=config.SECRET_KEY, algorithms=config.ALGORITHM)
 return payload
 except (jwt.JWTError, jwt.ExpiredSignatureError, AttributeError) as e:
 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
 detail=f'access token failed: {e}',
 # 根据OAuth2规范, 认证失败需要在响应头中添加如下键值对
 headers={'WWW-Authenticate': "Bearer"}
 )
- 登录生成Token - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/login") 
 @router.post('/api/login', tags=['users'])
 def user_login(username: str = Form(...), password: str = Form(...), db: Session = Depends(get_db)):
 db_user = crud.get_user_by_mail(db, username)
 if not db_user:
 raise HTTPException(status_code=404, detail="User not found")
 if password != db_user.hash_password:
 raise HTTPException(status_code=400, detail='passwod incorrect')
 data = {'email': db_user.email, 'user_id': db_user.id}
 token = jwttoken.create_jwt_token(data)
 return {"token": token, "token_type": "bearer"}
 @router.get('/current/user', tags=['users'])
 def get_current_user(token: Optional[str] = Header(...), db: Session = Depends(get_db)):
 payload = jwttoken.decode_jwt_token(token)
 email: str = payload.get("email")
 if email is None:
 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
 detail=f'access token failed',
 # 根据OAuth2规范, 认证失败需要在响应头中添加如下键值对
 headers={'WWW-Authenticate': "Bearer"}
 )
 user = crud.get_user_by_mail(db, email=email)
 if user is None:
 raise HTTPException(status_code=404, detail="User not found")
 return user
- 验证Token  
 
     
        