FastAPI: Mysql数据库操作(15)

FastAPI可以通过SQLAlchemy将其轻松的适应于任何的数据库。SQLAlchemy是一个ORM(object-relational mapping)的框架。在ORM中,你创建一个类就会通过SQLAlchemy将其自动转成一张表,在类中的每一个属性就会将其转成表中的字段。

需要安装

1
pip install sqlalchemy
pip install pymysql

统一来管理数据库相关:

1
2
3
4
5
6
├─db
│ crud.py
│ database.py
│ models.py
│ schemas.py
│ __init__.py

database.py 数据库配置相关

在数据库相关的配置文件中,首先创建一个SQLAlchemy的”engine”,然后创建SessionLocal实例进行会话,最后创建模型类的基类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:123456@127.0.0.1:3306/test"

engine = create_engine(
SQLALCHEMY_DATABASE_URL, encoding='utf8', echo=True
)

# SQLAlchemy中,CRUD是通过会话进行管理的,所以需要先创建会话,
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 创建基本映射类
Base = declarative_base()

models.py 数据库模型表

通过数据库配置文件中的基类来创建模型类。

1
2
3
4
5
6
7
8
9
10
from sqlalchemy import Boolean, Column, Integer, String
from database import Base


class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String(32), unique=True, index=True)
hash_password = Column(String(32))
is_active = Column(Boolean, default=True)

schemas.py 模型验证

定义请求参数模型验证与响应模型验证的Pydantic模型,其中响应模型中设置orm_mode=True参数,表示与ORM模型兼容,因为后续中返回的数据库查询是orm模型,通过设置这个参数可以将orm模型通过pydantic模型进行验证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from pydantic import BaseModel


class UserBase(BaseModel):
email: str


class UserCreate(UserBase):
"""
请求模型验证:
email:
password:
"""

password: str

class UserUpdate(UserBase):
is_active: bool

class User(UserBase):
"""
响应模型:
id:
email:
is_active
并且设置orm_mode与之兼容
"""

id: int
is_active: bool

class Config:
orm_mode = True

crud.py 数据库操作相关

通过传入数据库连接以及参数等进行数据库操作,包括创建用户、查询用户等,返回的是orm模型对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from sqlalchemy.orm import Session
import models, schemas


# Get user by id
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()

# Get all users
def get_users(db: Session, start: int=0, limit: int=10):
return db.query(models.User).offset(start).limit(limit).all()

# creae user
def create_user(db: Session, user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
user = models.User(email=user.email, hash_password=fake_hashed_password)
db.add(user)
db.commit() # 提交保存到数据库中
db.refresh(user) # 刷新
return user

# Delete user
def delete_user(db: Session, user_id: int):
user = db.query(models.User).filter(models.User.id == user_id).first()
if user:
db.delete(user)
db.commit()
db.flush()
return user


# Update user
def update_user(db: Session, user_id: int, update_user: schemas.UserUpdate):
user = db.query(models.User).filter(models.User.id == user_id).first()
if user:
update_user = update_user.dict(exclude_unset=True)
for k, v in update_user.items():
setattr(user, k, v)
db.commit()
db.flush()
db.refresh(user)
return user

调用验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
Base.metadata.create_all(bind=engine) #数据库初始化,如果没有库或者表,会自动创建

# 实例化 FastAPI
app = FastAPI()

# Dependency
def get_db():
"""
每一个请求处理完毕后会关闭当前连接,不同的请求使用不同的连接
:return:
"""

db = SessionLocal()
try:
yield db
finally:
db.close()


# 新建用户
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
return crud.db_create_user(db=db, user=user)


# 通过id查询用户
@app.get("/user/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
return db_user

# 所有用户
@app.get('/users/', response_model=List[schemas.User])
def read_users(start: int=0, limit: int=10, db: Session = Depends(get_db)):
return crud.get_users(db, start, limit)

# Delete user
@app.delete('/user/{user_id}', response_model=schemas.User)
def delete_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.delete_user(db, user_id=user_id)
if not db_user:
raise HTTPException(status_code=400, detail='User not found')
return db_user

# Update User
@app.put('/user/{user_id}', response_model=schemas.User)
def update_user(user_id: int, update_user: schemas.UserUpdate, db: Session = Depends(get_db)):
db_user = crud.update_user(db, user_id=user_id, update_user=update_user)
if not db_user:
raise HTTPException(status_code=400, detail='User not found')
return db_user

DB:

1
2
3
4
5
6
7
8
9
10
mysql> desc users;
+---------------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+-------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| email | varchar(32) | YES | UNI | NULL | |
| hash_password | varchar(32) | YES | | NULL | |
| is_active | tinyint(1) | YES | | NULL | |
+---------------+-------------+------+-----+---------+----------------+
4 rows in set (0.00 sec)

唐胡璐 wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
分享创造价值,您的支持将鼓励我继续前行!