至此,FastAPI的基本使用和整个流程都过完了。接下来看一下登录认证,是一个系统里最基本的功能,有些页面只能登录之后才能够访问。
有Session和Token两种方式实现,目前最流行的下面token方式:
- 用户登录后,返回一个token给前端,后台不存储该token
- 前端再带着这个token去访问需要登录的页面
后端根据token去判断用户,确定当前登录用户
Bearer的简单OAuth2,前面已聊过FastAPI: Security(14),当时还没有生成token,OAuth2不能生成token,我们一般用
JWT
来实现
JWT
JWT全称是Json Web Token
,JWT认证目前是前后端分离中非常流行的一种认证方式,生成的token分为三个部分: HEADER.PAYLOAD.SIGNATURE, 这三个部分都是可逆算法base64加密后的字符串, 最后用.
拼接
HEADER
通常是加密算法
PAYLOAD
存储的自定义信息和token的过期时间
SIGNATURE
这个签证信息由三部分组成:
- header (base64后的)
- payload (base64后的)
- secret (盐值(salt): 指的是加密时加入的自定义的字符串, 最好是随机或者杂乱的字符串, 这样更能够确定加密后字符串的唯一性, 可以使用settings中的SECRET_KEY)
这个部分需要base64加密后的header和base64加密后的payload使用
.
连接组成的字符串,然后通过header中声明的加密方式进行加盐secret
组合加密,然后就构成了jwt的第三部分。
python中有好几多库可以实现JWT认证,这里也用官网使用的python-jose
,Python-jose需要一个额外的加密后端。这里我们使用的是推荐的后端:pyca/cryptography
1 | pip install python-jose[cryptography] |
生成token
1 | from jose import jwt |
解密token
1 | from jose.exceptions import ExpiredSignatureError, JWTError |
FastAPI实现JWT认证
加密解密的公共方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21def create_jwt_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
token = jwt.encode(claims=to_encode, key=config.SECRET_KEY, algorithm=config.ALGORITHM)
return token
def decode_jwt_token(token: str):
try:
payload = jwt.decode(token=token, key=config.SECRET_KEY, algorithms=config.ALGORITHM)
return payload
except (jwt.JWTError, jwt.ExpiredSignatureError, AttributeError) as e:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
detail=f'access token failed: {e}',
# 根据OAuth2规范, 认证失败需要在响应头中添加如下键值对
headers={'WWW-Authenticate': "Bearer"}
)
登录生成Token
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/login")
@router.post('/api/login', tags=['users'])
def user_login(username: str = Form(...), password: str = Form(...), db: Session = Depends(get_db)):
db_user = crud.get_user_by_mail(db, username)
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
if password != db_user.hash_password:
raise HTTPException(status_code=400, detail='passwod incorrect')
data = {'email': db_user.email, 'user_id': db_user.id}
token = jwttoken.create_jwt_token(data)
return {"token": token, "token_type": "bearer"}
@router.get('/current/user', tags=['users'])
def get_current_user(token: Optional[str] = Header(...), db: Session = Depends(get_db)):
payload = jwttoken.decode_jwt_token(token)
email: str = payload.get("email")
if email is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
detail=f'access token failed',
# 根据OAuth2规范, 认证失败需要在响应头中添加如下键值对
headers={'WWW-Authenticate': "Bearer"}
)
user = crud.get_user_by_mail(db, email=email)
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return user
验证Token