ORM
SQLAlchemy

概述

在前面的文章介绍过PyMySQL操作数据库,在pymysql中是编写原生sql语句来操作数据库的,pymysql一般用于小项目,因为pymysql存在以下两方面的问题:

  • sql语句的执行效率:应用开发人员需要耗费一部分精力去优化sql语句
  • 数据库迁移:针对mysql开发的sql语句无法直接应用到oracle数据库上,一旦需要迁移数据库,便需要考虑跨平台问题
    为了解决上述问题,便引出了ORM框架

ORM

ORM(Object Relational Mapping) 即对象关系映射,是一种为了解决面向对象与关系数据库存在的互不匹配的现象的技术
它的作用是在关系型数据库和业务实体对象之间作一个映射,这样,我们在具体的操作业务对象的时候,就不需要再去和复杂的SQL语句打交道,只需简单的操作对象的属性和方法
【以下内容来自ChatGPT】
ORM全称为Object Relational Mapping,即对象关系映射。ORM是一种程序设计技术,通过使用描述对象和数据库之间映射的元数据,将面向对象的编程语言程序中的对象自动持久化到关系型数据库中
ORM的主要目的是将面向对象的编程语言和关系型数据库这两种不同的编程世界进行“桥接”,使得程序员能够使用面向对象的方式操作关系型数据库,从而降低了程序员的开发难度和工作量
ORM的优点包括:

  1. 提高开发效率:ORM可以让程序员以面向对象的方式操作数据库,从而避免了写SQL语句的繁琐和容易出错的问题
  2. 降低维护成本:ORM可以自动处理数据库的操作,从而减少了程序员对数据库操作的细节处理,降低了维护成本
  3. 提高程序的可移植性:ORM可以将程序和数据库之间的细节隐藏起来,使得程序可以更加容易地适配不同的数据库
    目前,ORM框架在Python中被广泛使用,比较流行的Python ORM框架有Django ORM、SQLAlchemy、Peewee等

SQLAlchemy

SQL databases behave less like object collections the more size and performance start to matter; object collections behave less like tables and rows the more abstraction starts to matter. SQLAlchemy aims to accommodate both of these principles.
SQLAlchemy considers the database to be a relational algebra engine, not just a collection of tables. Rows can be selected from not only tables but also joins and other select statements; any of these units can be composed into a larger structure. SQLAlchemy's expression language builds on this concept from its core.

  • 关系模型和Python对象之间的映射,如表student,字段为id int,name varchar,age int
    • 表table ——> python中的类,如Student类
    • 行row ——> python中的类实例,如s1 = Student(id=1, name='jerry', age=28)
    • 列column ——> python中的属性(类属性/实例属性,使用python中的描述器实现)

相关地址

SQLAlchemy安装

pip install SQLAlchemy

SQLAlchemy的使用

import sqlalchemy
print(sqlalchemy.__version__)
############输出结果###########
1.4.35

创建连接(创建引擎)

详细参看官方文档Engine and Connection Use

  • Database Urls :dialect+driver://username:password@host:port/database
    • dialect:如sqlite, mysql, postgresql, oracle, mssql
    • driver:即DBAPI如支持MySQL and MariaDB的DBAPI有mysqlclient、PyMySQL、MySQL Connector等
# 如mysql+PyMySQL示例
engine = create_engine('mysql+pymysql://scott:tiger@localhost/foo')
  • Lazy Connecting(懒连接):创建引擎并不会马上连接数据库,直到让数据库执行任务时才连接

The Engine, when first returned by create_engine(), has not actually tried to connect to the database yet; that happens only the first time it is asked to perform a task against the database. This is a software design pattern known as lazy initialization.

from sqlalchemy import create_engine

IP = "192.168.1.2"
USERNAME = "root"
PASSWORD = "monkey_jerry"
DB_NAME = "test"
PORT = 3306

# dialect+driver://username:password@host:port/database
engine = create_engine(f"mysql+pymysql://{USERNAME}:{PASSWORD}@{IP}:{PORT}/{DB_NAME}", echo=True)
print(engine)  # 懒连接(修改端口或者错误的数据库信息,查看engine)
#########################输出结果##########################
Engine(mysql+pymysql://root:***@192.168.1.2:3306/test)

如果报ModuleNotFoundError: No module named 'pymysql',则需要通过pip install pymysql安装driver;
echo=True:引擎是否打印执行的语句,一般在调试的时候打开

创建映射

  • 创建基类
from sqlalchemy.orm import declarative_base
Base = declarative_base()

创建基类,便于实体类继承。SQLAlchemy大量使用了元编程

  • 创建实体类(声明模型)
# 如创建如下表
CREATE TABLE student (
	id INTEGER NOT NULL AUTO_INCREMENT, 
	name VARCHAR(64) NOT NULL, 
	age INTEGER NOT NULL, 
	PRIMARY KEY (id)
)
# 实体类
from sqlalchemy import Column, Integer, String

class Student(Base):
    __tablename__ = 'student'  # 指定表名

    id = Column(Integer, primary_key=True, autoincrement=True)  # 第一参数是字段名,如果和属性名一致,则可不填写(一般一致),否则必须指定
    name = Column(String(64), nullable=False)
    age = Column(Integer, nullable=False)

    def __repr__(self):
        return f'<{self.__class__.__name__}>,id={self.id}, name={self.name}, age={self.age}'

print(Student.__dict__)
print(repr(Student.__table__))
#########################输出结果################
<class '__main__.Student'>
{'__module__': '__main__', '__tablename__': 'student', 'id': <sqlalchemy.orm.attributes.InstrumentedAttribute object at 0x108d24c20>, 'name': <sqlalchemy.orm.attributes.InstrumentedAttribute object at 0x108d24cc0>, 'age': <sqlalchemy.orm.attributes.InstrumentedAttribute object at 0x108d24d60>, '__repr__': <function Student.__repr__ at 0x108d088b0>, '__doc__': None, '_sa_class_manager': <ClassManager of <class '__main__.Student'> at 108c66220>, '__table__': Table('student', MetaData(), Column('id', Integer(), table=<student>, primary_key=True, nullable=False), Column('name', String(length=64), table=<student>, nullable=False), Column('age', Integer(), table=<student>, nullable=False), schema=None), '__init__': <function __init__ at 0x108d08c10>, '__mapper__': <Mapper at 0x108cfc6d0; Student>}
Table('student', MetaData(), Column('id', Integer(), table=<student>, primary_key=True, nullable=False), Column('name', String(length=64), table=<student>, nullable=False), Column('age', Integer(), table=<student>, nullable=False), schema=None)
  • 创建表
# 删除继承自Base的所有表
Base.metadata.drop_all(engine)
# 创建继承自Base的所有表
Base.metadata.create_all(engine)

生产环境其实很少这样创建表,一般是在系统上线的时候由脚本生成,删除表则更少用

创建表完整示例

运行我之前docker创建的mariadb运行如下代码来看看效果

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base

IP = "192.168.1.2"
USERNAME = "root"
PASSWORD = "monkey_jerry"
DB_NAME = "test"
PORT = 3306

# 1. 创建基类
Base = declarative_base()

# 2. 声明模型
class Student(Base):
    __tablename__ = 'student'

    id = Column(Integer, primary_key=True, autoincrement=True)  # 第一参数是字段名,如果和属性名一致,则可不填写(一般一致),否则必须指定
    name = Column(String(64), nullable=False)
    age = Column(Integer, nullable=False)

    def __repr__(self):
        return f'<{self.__class__.__name__}>, id={self.id}, name={self.name}, age={self.age}'

# 3. 创建引擎
# dialect+driver://username:password@host:port/database
engine = create_engine(f"mysql+pymysql://{USERNAME}:{PASSWORD}@{IP}:{PORT}/{DB_NAME}", echo=True)

# 4. 创建/删除表
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
##############输出结果##############
......
......
DROP TABLE student
......
CREATE TABLE student (
	id INTEGER NOT NULL AUTO_INCREMENT, 
	name VARCHAR(64) NOT NULL, 
	age INTEGER NOT NULL, 
	PRIMARY KEY (id)
)
......

由于创建引擎时echo=True,注意观察控制台的输出

  • 实例化
s1 = Student(id=1, name="jerry")
s1.name = 'tom'
s1.age = 20
print(s1)
print(s1.age)

s2 = Student()
s2.name = 'lily'
print(s2)
print(s2.id)
#############输出结果#############
<Student>, id=1, name=tom, age=20
20
<Student>, id=None, name=lily, age=None
None

session

  • 创建会话session
    • 在一个会话中操作数据库,会话建立在连接上,连接被引擎管理
from sqlalchemy.orm import sessionmaker

# 创建session
Session = sessionmaker(bind=engine)  # 工厂方法返回类
session = Session()  # 实例化

session对象线程不安全。不同线程应该使用不同的session对象,或者参考官方文档的其他方式

  • session.add、session.commit、session.rollback
from sqlalchemy.orm import sessionmaker

s1 = Student(name="jerry")
s1.name = 'tom'
s1.age = 20

Session = sessionmaker(bind=engine)
session = Session()

try:
    session.add(s1)
    session.commit()
    print('---' * 20)
except:
    session.rollback()
    print('===' * 20)
from sqlalchemy.orm import sessionmaker
s1 = Student(name='lily', age=29)
s2 = Student(name='lucy', age=28)

Session = sessionmaker(bind=engine)

# 1
with Session() as session:
    session.add(s1)
    session.commit()

# 2
with Session.begin() as session:
    session.add(s1)  # 这里再增加一次s1,查看是否增加上了?
    session.add(s2)

CRUD操作

增删改查

  • 增:add():增加一个对象、add_all():可迭代对象
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker

IP = "192.168.1.2"
USERNAME = "root"
PASSWORD = "monkey_jerry"
DB_NAME = "test"
PORT = 3306

Base = declarative_base()

class Student(Base):
    __tablename__ = 'student'

    id = Column(Integer, primary_key=True, autoincrement=True)  # 第一参数是字段名,如果和属性名一致,则可不填写(一般一致),否则必须指定
    name = Column(String(64), nullable=False)
    age = Column(Integer, nullable=False)

    def __repr__(self):
        return f'<{self.__class__.__name__}>, id={self.id}, name={self.name}, age={self.age}'

engine = create_engine(f"mysql+pymysql://{USERNAME}:{PASSWORD}@{IP}:{PORT}/{DB_NAME}", echo=True)

Session = sessionmaker(bind=engine)
session = Session()

s1 = Student(name="tom", age=20)

try:
    session.add(s1)
    session.commit()

    session.add_all([s1])  # add_all() s1不会提交成功的,因为之前s1成功提交后,s1的主键就有了值,只要s1没有修改过,就认为没有改动
    session.commit()
    s1.age = 18
    session.add_all([s1])  # 修改s1后再提交则可提交成功了
    session.commit()
    print('---' * 20)
except:
    session.rollback()
    print('===' * 20)
#################输出结果################
......
......
INSERT INTO student (name, age) VALUES (%(name)s, %(age)s)
INFO sqlalchemy.engine.Engine [generated in 0.00016s] {'name': 'tom', 'age': 20}
INFO sqlalchemy.engine.Engine COMMIT
INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO sqlalchemy.engine.Engine SELECT student.id AS student_id, student.name AS student_name 
FROM student 
WHERE student.id = %(pk_1)s
INFO sqlalchemy.engine.Engine [generated in 0.00014s] {'pk_1': 9}
INFO sqlalchemy.engine.Engine UPDATE student SET age=%(age)s WHERE student.id = %(student_id)s
INFO sqlalchemy.engine.Engine [generated in 0.00015s] {'age': 18, 'student_id': 9}
INFO sqlalchemy.engine.Engine COMMIT
......

s主键没有值,就是新增;主键有值,就是找到主键对应的记录修改

  • 查:使用query()方法,返回一个Query对象
students = session.query(Student)  # 类似select * from student
print(students, type(students))  # 无内容,惰性的
for student in students:
    print(student)

print('~~~~~~~~~~~~~')

student = session.query(Student).get(5)  # 通过主键查询 select * from student where id=5
print(student)

query方法将实体类传入,返回类的对象可迭代对象,这时候并不查询。迭代它就执行SQL来查询数据库,封装数据到指定类的实例
get方法使用主键查询,返回一条传入类的一个实例

  • 改:需要先查回来,修改后,再提交更改
s2 = session.query(Student).get(5)
print(s2)  # 原数据
s2.name = 'sam'
s2.age = 30
print(s2)  # 修改后的数据
try:
    session.add(s2)
    session.commit()
except:
    session.rollback()
############输出结果##############
......
SELECT student.id AS student_id, student.name AS student_name, student.age AS student_age 
FROM student 
WHERE student.id = %(pk_1)s
......
<Student>, id=5, name=tom, age=18
<Student>, id=5, name=sam, age=30
......
UPDATE student SET name=%(name)s, age=%(age)s WHERE student.id = %(student_id)s
[generated in 0.00018s] {'name': 'sam', 'age': 30, 'student_id': 5}
COMMIT
  • 删:
'''
当前数据库中的数据如下
id name age
1  sam  30
2  jerry 20 
'''
# 例1
s = session.query(Student).get(1)
print(s)
try:
    session.delete(s)
    session.commit()
except:
    session.rollback()

# 例2
s = Student(id=2, name='jerry', age=20)
print(s)
try:
    session.delete(s)
    session.commit()
except Exception as e:
    session.rollback()
    print(e)
################输出结果################
......
SELECT student.id AS student_id, student.name AS student_name, student.age AS student_age 
FROM student 
WHERE student.id = %(pk_1)s

[generated in 0.00032s] {'pk_1': 1}
<Student>, id=1, name=sam, age=30
DELETE FROM student WHERE student.id = %(id)s

<Student>, id=2, name=jerry, age=20
Instance '<Student at 0x1052a6250>' is not persisted

例2会产生一个异常Instance '<Student at 0x1052a6250>' is not persisted未持久的异常

状态

如上例2,会抛出 not persisted,为什么会抛出异常,这跟实例的状态属性_sa_instance_state有关

s = Student(id=2, name='jerry', age=20)
print(s.__dict__)
print(s._sa_instance_state)

from sqlalchemy.orm.state import InstanceState  # 跳转到InstanceState类查看源码
'''
   >>> from sqlalchemy import inspect
   >>> insp = inspect(some_mapped_object) # 可以使用inspect(some_mapped_object)函数查看状态
'''

from sqlalchemy import inspect
ins_state: InstanceState = inspect(s)
print(ins_state)
####################输出结果#################3
{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x1076c5d00>, 'id': 2, 'name': 'jerry', 'age': 20}
<sqlalchemy.orm.state.InstanceState object at 0x1076c5d00>
<sqlalchemy.orm.state.InstanceState object at 0x1076c5d00>

每一个实例都有一个状态属性_sa_instance_state,其类型是sqlalchemy.orm.state.InstanceState,可以使用
sqlalchemy.inspect函数查看状态

# 定义get_state函数,来分析各个状态下的状态值
def get_state(instance, index):
    inp: InstanceState = inspect(instance)
    return f"{index}-->{inp.key} {inp.session_id}\n{'---' * 20}\nattach={inp._attached}" \
           f" transient={inp.transient} pending={inp.pending} persistent={inp.persistent} " \
           f"deleted={inp.deleted} detached={inp.detached}\n{'===' * 20}"
  • 先来看下构造的实体和从数据库中查询到的实体状态之间的区别
s1 = Student(id=2, name='jerry', age=20)
print(get_state(s1, 1))
s2 = session.query(Student).get(2)
print(get_state(s2, 2))
##################输出结果###############
1-->None None
------------------------------------------------------------
attach=False transient=True pending=False persistent=False deleted=False detached=False
============================================================
...
...
2-->(<class '__main__.Student'>, (2,), None) 1
------------------------------------------------------------
attach=True transient=False pending=False persistent=True deleted=False detached=False
============================================================
  • 状态说明
状态 说明
transient 实体类尚未加入到session中,同时并没有保存到数据库中
pending transient的实体被add()到session中,状态切换到pending,但它还没有flush到数据库中
persistent session中的实体对象对应着数据库中的真实记录。pending状态在提交成功后可以变成
persistent状态,或者查询成功返回的实体也是persistent状态
deleted 实体被删除且已经flush但未commit完成。事务提交成功了,实体变成detached,事务失败,
返回persistent状态
detached 删除成功的实体进入这个状态
  • 验证示例
student = session.query(Student).get(2)
print(get_state(student, 1))

try:
    student = Student(id=2, name='sam', age=30)
    print(get_state(student, 2))
    student = Student(name='sammy', age=30)
    print(get_state(student, 3))

    session.add(student)  # add后变成pending
    print(get_state(student, 4))
    # session.delete(student) # 抛出异常,删除的前提必须是persistent,也就是说先查后删
    # get_state(student, 5)
    session.commit()  # 提交后,变成persistent
    print(get_state(student, 6))
except Exception as e:
    session.rollback()
    print(e, '~~~~~~~~~~~~~~~~')

student = session.query(Student).get(2)
print(get_state(student, 10))
try:
    session.delete(student)  # 删除的前提是persistent
    print(get_state(student, 11))
    session.flush()
    print(get_state(student, 12))
    session.commit()
    print(get_state(student, 13))
except Exception as e:
    session.rollback()
    print(e, '~~~~~~~~')
  • 结论
    • 新建一个实体,状态是transient临时的,一旦add()后从transient变成pending状态,成功commit()后从pending变成persistent状态
    • 成功查询返回的实体对象,也是persistent状态
    • persistent状态的实体,修改依然是persistent状态
    • persistent状态的实体,删除后,flush后但没有commit,就变成deteled状态,成功提交,变为detached状态,提交失败,还原到persistent状态
    • flush方法,主动把改变应用到数据库中去
    • 删除、修改操作,需要对应一个真实的记录,所以要求实体对象是persistent状态

复杂查询

这里先导入官方测试数据库test_db

$  docker ps
CONTAINER ID   IMAGE     COMMAND                  CREATED        STATUS      PORTS                                       NAMES
b5029f1075ed   mariadb   "docker-entrypoint.s…"   9 months ago   Up 4 days   0.0.0.0:3306->3306/tcp, :::3306->3306/tcp   monkey_mariadb

$ docker cp /Users/xmly/Downloads/test_db-master b5029f1075ed:/
$ docker exec -it monkey_mariadb /bin/bash
$ root@b5029f1075ed:/test_db-master# ls employees.sql
employees.sql
$ root@b5029f1075ed:/test_db-master# mysql -uroot -pmonkey_jerry -t < employees.sql
  • 实体类及相关函数
from sqlalchemy import create_engine, Column, Integer, String, Date, Enum, ForeignKey, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
import enum

IP = "192.168.1.2"
USERNAME = "root"
PASSWORD = "monkey_jerry"
DB_NAME = "employees"
PORT = 3306

Base = declarative_base()

engine = create_engine(f"mysql+pymysql://{USERNAME}:{PASSWORD}@{IP}:{PORT}/{DB_NAME}", echo=True)

Session = sessionmaker(bind=engine)
session = Session()


# python内建库中的枚举enum
class MyEnum(enum.Enum):
    M = 'M'
    F = 'F'


class Employee(Base):
    __tablename__ = 'employees'

    emp_no = Column(Integer, primary_key=True)
    birth_date = Column(Date, nullable=False)
    first_name = Column(String(14), nullable=False)
    last_name = Column(String(16), nullable=False)
    gender = Column(Enum(MyEnum), nullable=False)  # 一般很少用枚举,使用Integer 0 1 2
    hire_date = Column(Date, nullable=False)

    def __repr__(self):
        return f"<{self.__class__.__name__} no={self.emp_no} name={self.first_name} {self.last_name} " \
               f"gender={self.gender.value}>"


def show(emps):
    for x in emps:
        print(x)
        print('~~~~~~~~~~~~~~~\n')


# 简单条件查询
emps = session.query(Employee).filter(Employee.emp_no > 499995)
show(emps)
  • 与或非/in/ like
    • filter
    • or_、and_、not_,运算符&、|、~,注意一定要在表达式上加上括号
    • in_/ notin_
    • like/notlike/ilike:ilike可以忽略大小写匹配
# 与或非
from sqlalchemy import or_, and_, not_

# AND
# 1. filter 类似where条件可链式调用
emps = session.query(Employee).filter(Employee.emp_no > 499995).filter(Employee.gender == MyEnum.F)
show(emps)
# 2. filter 多条件
emps = session.query(Employee).filter(Employee.emp_no > 499995, Employee.emp_no < 499998)
show(emps)
# 3. and_函数
emps = session.query(Employee).filter(and_(Employee.emp_no > 499995, Employee.gender == MyEnum.M))
show(emps)
# 4. & 一定要注意&符号两边表达式都要加括号,推荐用法
emps = session.query(Employee).filter((Employee.emp_no > 499995) & (Employee.gender == MyEnum.M))
show(emps)


# OR 条件
emps = session.query(Employee).filter((Employee.emp_no > 499995) | (Employee.emp_no < 10003))
show(emps)
emps = session.query(Employee).filter(or_(Employee.emp_no > 499995, Employee.emp_no < 10003))
show(emps)

# Not
emps = session.query(Employee).filter(not_(Employee.emp_no < 499995))
show(emps)
# ~
emps = session.query(Employee).filter(~(Employee.emp_no < 499995))
show(emps)

# in
emplist = [10010, 10015, 10018]
emps = session.query(Employee).filter(Employee.emp_no.in_(emplist))
show(emps)
# not in
# emps = session.query(Employee).filter(~Employee.emp_no.in_(emplist))
# show(emps)
# emps = session.query(Employee).filter(Employee.emp_no.notin_(emplist))
# show(emps)

# like
emps = session.query(Employee).filter(Employee.last_name.like('P%'))
show(emps)
# not like
# emps = session.query(Employee).filter(Employee.last_name.notlike('P%'))
# show(emps)
  • 排序
    • order_by
    • asc、desc
# 升序(默认)
emps = session.query(Employee).filter(Employee.emp_no > 499995).order_by(Employee.emp_no)
show(emps)
emps = session.query(Employee).filter(Employee.emp_no > 499995).order_by(Employee.emp_no.asc())
show(emps)
# 降序
emps = session.query(Employee).filter(Employee.emp_no > 499995).order_by(Employee.emp_no.desc())
show(emps)
# 多列排序
emps = session.query(Employee).filter(Employee.emp_no > 499995).order_by(Employee.last_name).order_by(Employee.emp_no.desc())
show(emps)
  • 分页
    • limit
    • offset
emps = session.query(Employee).limit(4)  # 等效于SELECT * FROM employees LIMIT 4
show(emps)
# 取第2条到第5条的数据
# 等效于 SELECT * FROM employees LIMIT 1,4 或SELECT * FROM employees LIMIT 4 OFFSET 1
emps = session.query(Employee).limit(4).offset(1) 
show(emps)
  • 消费者方法:session.query返回的Query对象(可迭代)作用在消费者方法上后就转换成了一个容器
    • count()
    • all(): 返回列表,查不到返回空列表
    • first():返回首行,查不到返回None,等价limit
    • one():如果查询结果是多行抛异常
    • delete()
# 行数
emps = session.query(Employee).limit(4)
# print(len(emps)) # TypeError: object of type 'Query' has no len()
print(len(list(emps)))  # 查询得到结果集,转成list,然后取长度
print(emps.count())  # 聚合函数count(*)的查询

# 取所有数据
print(emps.all())  # 返回列表,查不到返回空列表
# 取首行
print(emps.first())  # 返回首行,查不到返回None,等价limit


# 有且只能有一行
# print(emps.one())  # 如果查询结果是多行抛异常 sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when exactly one was required
print(emps.limit(1).one())

# 删除 delete by query
session.query(Employee).filter(Employee.emp_no > 499995).delete()
# session.commit()  # 提交则删除
  • 聚合和分组:sqlalchemy.func
    • count
    • max、min、avg
    • all、first、one、scalar
    • group_by
from sqlalchemy import func

query = session.query(func.count(Employee.emp_no))
print(query.all())  # 列表中一个元素 [(300024,)] # SELECT count(employees.emp_no) AS count_1 FROM employees
print(query.first())  # 一个只有一个元素的元组 (300024,)
print(query.one())  # 只能有一行返回,一个元组 (300024,)
print(query.scalar())  # 取one()的第一个元素 300024

# max/min/avg
print(session.query(func.max(Employee.emp_no)).scalar())  # 499999
print(session.query(func.min(Employee.emp_no)).scalar())  # 10001
print(session.query(func.avg(Employee.emp_no)).scalar())  # 253321.7634

# 分组
# SELECT employees.gender AS employees_gender, count(employees.emp_no) AS count_1 FROM employees GROUP BY employees.gender
query = session.query(Employee.gender, func.count(Employee.emp_no)).group_by(Employee.gender).all()
print(query)
for g, y in query:
    print(g.value, y)

关联查询

这里仍使用官方测试数据库test_db来做例子
通过查看DDL,可看到员工employees表、部门departments表之间的关系是多对多关系

CREATE TABLE `employees` (
  `emp_no` int(11) NOT NULL,
  `birth_date` date NOT NULL,
  `first_name` varchar(14) NOT NULL,
  `last_name` varchar(16) NOT NULL,
  `gender` enum('M','F') NOT NULL,
  `hire_date` date NOT NULL,
  PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `departments` (
  `dept_no` char(4) NOT NULL,
  `dept_name` varchar(40) NOT NULL,
  PRIMARY KEY (`dept_no`),
  UNIQUE KEY `dept_name` (`dept_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `dept_emp` (
  `emp_no` int(11) NOT NULL,
  `dept_no` char(4) NOT NULL,
  `from_date` date NOT NULL,
  `to_date` date NOT NULL,
  PRIMARY KEY (`emp_no`,`dept_no`),
  KEY `dept_no` (`dept_no`),
  CONSTRAINT `dept_emp_ibfk_1` FOREIGN KEY (`emp_no`) REFERENCES `employees` (`emp_no`) ON DELETE CASCADE,
  CONSTRAINT `dept_emp_ibfk_2` FOREIGN KEY (`dept_no`) REFERENCES `departments` (`dept_no`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  • 创建实体类

数据库连接等相关信息同上例题,这里就不详细列出

class Employee(Base):
    __tablename__ = 'employees'

    emp_no = Column(Integer, primary_key=True)
    birth_date = Column(Date, nullable=False)
    first_name = Column(String(14), nullable=False)
    last_name = Column(String(16), nullable=False)
    gender = Column(Enum(MyEnum), nullable=False)  # 一般很少用枚举,使用Integer 0 1 2
    hire_date = Column(Date, nullable=False)

    def __repr__(self):
        return f"<{self.__class__.__name__} no={self.emp_no} name={self.first_name} {self.last_name} " \
               f"gender={self.gender.value}>"


class Department(Base):
    __tablename__ = 'departments'

    dept_no = Column(String(4), primary_key=True)
    dept_name = Column(String(40), nullable=False, unique=True)

    def __repr__(self):
        return f"{self.__class__.__name__} no={self.dept_no} name={self.dept_name}"


class Dept_emp(Base):
    __tablename__ = "dept_emp"

    emp_no = Column(Integer, ForeignKey('employees.emp_no', ondelete='CASCADE'), primary_key=True)
    dept_no = Column(String(4), ForeignKey('departments.dept_no', ondelete='CASCADE'), primary_key=True)
    from_date = Column(Date, nullable=False)
    to_date = Column(Date, nullable=False)

    def __repr__(self):
        return f"{self.__class__.__name__} emp_no={self.emp_no} dept_no={self.dept_no}"
  • 其中如Column(Integer, ForeignKey('employees.emp_no', ondelete='CASCADE')中ForeignKey('employees.emp_no', ondelete='CASCADE') 定义外键约束
  • ForeignKey(column, ondelete=None) column为tablename.columnkeyschema.tablename.columnkey,ondelete可参看引用完整性

例:查询10010员工的所在的部门编号及员工信息

  • 使用隐式内连接
results = session.query(Employee, Dept_emp).filter(Employee.emp_no == Dept_emp.emp_no).filter(Employee.emp_no == 10010).all()
show(results)
########输出结果(返回两条结果)###########
(<Employee no=10010 name=Duangkaew Piveteau gender=F>, Dept_emp emp_no=10010 dept_no=d004)
~~~~~~~~~~~~~~~

(<Employee no=10010 name=Duangkaew Piveteau gender=F>, Dept_emp emp_no=10010 dept_no=d006)
~~~~~~~~~~~~~~~

结果分析:等效sql为:SELECT * FROM employees, dept_emp WHERE employees.emp_no = dept_emp.emp_no AND employees.emp_no = 10010

  • 使用join
# 方法1
results1 = session.query(Employee).join(Dept_emp).filter(Employee.emp_no == 10010).all()
# 方法2:
results2 = session.query(Employee).join(Dept_emp, Employee.emp_no == Dept_emp.emp_no).filter(Employee.emp_no == 10010).all()
print(results1, results2)
##################输出结果(方法1和方法2都返回一行数据)####################
[<Employee no=10010 name=Duangkaew Piveteau gender=F>] [<Employee no=10010 name=Duangkaew Piveteau gender=F>]

结果分析:
等效sql如下

SELECT
	employees.emp_no AS employees_emp_no,
	employees.birth_date AS employees_birth_date,
	employees.first_name AS employees_first_name,
	employees.last_name AS employees_last_name,
	employees.gender AS employees_gender,
	employees.hire_date AS employees_hire_date 
FROM
	employees
	INNER JOIN dept_emp ON employees.emp_no = dept_emp.emp_no 
WHERE
	employees.emp_no = 10010

执行该SQL语句返回确实是2行记录,但Python中的返回值列表中只有一个元素,原因在于query(Employee)只能返回一个实体对象中去,为了解决这个问题,需要修改实体类Employee来增加属性用来存放部门信息

  • sqlalchemy.orm.relationship("实体类名字符串")
    修改实体类Employee如下:
class Employee(Base):
    __tablename__ = 'employees'

    emp_no = Column(Integer, primary_key=True)
    birth_date = Column(Date, nullable=False)
    first_name = Column(String(14), nullable=False)
    last_name = Column(String(16), nullable=False)
    gender = Column(Enum(MyEnum), nullable=False)  # 一般很少用枚举,使用Integer 0 1 2
    hire_date = Column(Date, nullable=False)

    departments = relationship('Dept_emp')  # 新增类属性

    def __repr__(self):  # 新增self.departments打印
        return f"<{self.__class__.__name__} no={self.emp_no} name={self.first_name} {self.last_name} " \
               f"gender={self.gender.value} depts={self.departments}>"

修改完善后的查询

# 方法1,不建议使用
results1 = session.query(Employee).join(Dept_emp).filter(Employee.emp_no == 10010).all()
# 方法2,推荐
results2 = session.query(Employee).join(Dept_emp, Employee.emp_no == Dept_emp.emp_no).filter(Employee.emp_no == 10010).all()
# 方法3
results3 = session.query(Employee).join(Dept_emp, (Employee.emp_no == Dept_emp.emp_no) & (Employee.emp_no == 10010)).all()
show(results1)
show(results2)
show(results3)
results = session.query(Employee).join(Dept_emp, Employee.emp_no == Dept_emp.emp_no).filter(Employee.emp_no == 10010)

for x in results:
    print(x.emp_no)
    # print(x.departments)
    # print(x)

对比观察打印的结果及生成的sql语句变化,发现只要不访问departments属性,就不会查dept_emp这张表

参考及扩展阅读