asyncio

引子

#例1:串行执行
import time

def a():
    for i in range(3):
        time.sleep(0.1)
        print(i)

def b():
    for i in 'abc':
        time.sleep(0.1)
        print(i)

a()
b()

# 例2:使用生成器函数实现交替执行
import time

def a():
    for i in range(3):
        time.sleep(0.1)
        print(i)
        yield

def b():
    for i in 'abc':
        time.sleep(0.1)
        print(i)
        yield

x = a()
y = b()

for i in range(3):
    next(x)
    next(y)

例2是一个线程内通过生成器完成了调度,让两个函数都有机会执行,这样的调度不是操作系统的进程、线程完成的,而是由程序控制的,这样的程序需要两个条件:1. 需要使用yield来让出控制权;2. 需要循环帮助交替执行

协程

  • 协程不是进程、也不是线程,它是用户空间调度的完成并发处理的方式
  • 进程、线程由操作系统完成调度,而协程是线程内完成调度。它不需要更多的线程,没有多线程切换带来的开销
  • 协程是非抢占式调度,只有一个协程主动让出控制权,另一个协程才会被调度
  • 协程也不需要使用锁机制,因为是在同一个线程中执行
  • 多CPU下,可以使用多进程和协程配合,既能进程并发又能发挥协程在单线程中的优势
  • Python中协程是基于生成器的
  • asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持

进程、线程和协程

  • 进程:一个运行的程序(代码)就是一个进程,没有运行的代码叫程序。进程是系统资源分配的最小单位,进程拥有自己独立的内存空间,所有进程间数据不共享,开销大
  • 线程:CPU调度执行的最小单位,也叫执行路径。线程不能独立存在,依赖进程存在,一个进程至少有一个线程,叫主线程,而多个线程共享内存(数据共享,共享全局变量),从而极大地提高了程序的运行效率
  • 协程:是一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。协程调度时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操中栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快

【以下内容来自ChatGPT】

  • Python中的进程线程和协程
  1. 资源占用:进程是操作系统中资源分配的基本单位,每个进程都有自己独立的内存空间、文件描述符等资源,因此进程之间的资源是相互独立的;线程是进程中的执行单位,线程之间共享进程的资源,包括内存空间、文件描述符等;协程则是轻量级的线程,它与线程类似,也可以共享进程资源,但不同的是,协程是在用户空间中运行的,不需要操作系统的支持,因此占用的资源更少

  2. 切换方式:在进程之间切换时,需要进行上下文切换,即保存当前进程的状态并加载下一个进程的状态,这是由操作系统实现的,因此上下文切换的开销比较大;在线程之间切换时,也需要进行上下文切换,但由于线程之间共享进程资源,因此上下文切换的开销比进程小;在协程之间切换时,不需要进行上下文切换,只需要保存当前协程的状态,然后加载下一个协程的状态,这个过程是在用户空间中完成的,因此开销更小

  3. 并发性:由于进程之间的资源是相互独立的,因此进程之间的并发性比较高,但进程之间的切换开销也比较大;线程之间共享进程资源,因此并发性比进程高,但线程之间的切换开销比进程小;协程是在用户空间中运行的,不需要操作系统的支持,因此切换开销很小,可以实现更高的并发性

  4. 编程模型:在进程和线程中,由于资源是独立的或共享的,因此编程模型比较复杂,需要考虑进程/线程之间的通信、同步等问题;而协程是在同一线程中运行的,因此编程模型相对简单,可以使用更简单的同步机制实现并发控制

综上可以简单概括为:
进程是操作系统中资源分配的基本单位,每个进程都有自己独立的内存空间和资源。线程是进程中的执行单位,线程之间共享进程的资源。协程是轻量级的线程,它与线程类似,也可以共享进程资源,但不同的是,协程是在用户空间中运行的,不需要操作系统的支持,因此占用的资源更少。进程之间的切换开销最大,线程之间的切换开销比进程小,协程之间的切换开销最小。并发性方面,进程之间并发性高,线程之间并发性比进程高,协程之间并发性最高。在编程模型方面,协程相对于进程和线程更简单,可以使用更简单的同步机制实现并发控制

相关文档

asyncio

  • asyncio的编程模型就是一个消息循环。从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,实现了异步IO
  • 协程对象及协程函数:协程对象是调用协程函数返回的对象
  • asyncio.iscoroutine(obj) 判断是不是协程对象
  • asyncio.iscoroutinefunction(func) 判断是不是协程函数
import asyncio
import time

def a():
    for i in range(3):
        time.sleep(0.1)
        print(i)
        yield

print(asyncio.iscoroutine(a()))  # True
print(asyncio.iscoroutinefunction(a))  # False

# 老语法
# DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
@asyncio.coroutine  
def b():
    for i in 'abc':
        time.sleep(0.1)
        print(i)
        yield

print(asyncio.iscoroutine(b()))  # True
print(asyncio.iscoroutinefunction(b))  # True

事件循环

事件循环是每个 asyncio 应用的核心。 事件循环会运行异步任务和回调,执行网络 IO 操作,以及运行子进程

  • asyncio.get_event_loop():返回一个事件循环对象,是asyncio.BaseEventLoop的实例
  • AbstractEventLoop.run_until_complete(future):运行直至Future对象运行完,返回Future的结果。参数可以是Future类或子类Task的对象。如果是协程对象也会被封装成Task对象

run_until_complete源码如下:

def run_until_complete(self, future):
    """Run until the Future is done.

    If the argument is a coroutine, it is wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.
    """
    self._check_closed()
    self._check_running()

    new_task = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)
    if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except:
        if new_task and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()

注意其中的代码:
future = tasks.ensure_future(future, loop=self)
future.add_done_callback(_run_until_complete_cb)

  • AbstractEventLoop.close():关闭事件循环
# 例1: run_until_complete
import asyncio

@asyncio.coroutine
def sleep(t):
    for i in range(t):
        print(i, '~~~~')
        yield from asyncio.sleep(1)

loop = asyncio.get_event_loop()
# 内部会调用ensure_future,内部会执行loop.run_forever()
loop.run_until_complete(sleep(3))
loop.close() 
# 例2: ensure_future
import asyncio

@asyncio.coroutine
def sleep(t):
    for i in range(t):
        print(i, '~~~~')
        yield from asyncio.sleep(1)
    return 1000

loop = asyncio.get_event_loop()
# 本质就是tasks的ensure_future,把协程包装进一个Future对象中,并使用create_task返回一个task
future = asyncio.ensure_future(sleep(3))
print(1, future, '+++++')   # pending
loop.run_until_complete(future)
print(2, future, '+++++')    # finished
loop.close()
print(3, future.result(), '------')
-----------------------输出结果------------------------
1 <Task pending name='Task-1' coro=<sleep() running at xxxx >> +++++
0 ~~~~
1 ~~~~
2 ~~~~
2 <Task finished name='Task-1' coro=<sleep() done, defined at xxx result=1000> +++++
3 1000 ------
  • future和task
    • 通过Future对象可以了解任务执行的状态数据。事件循环来监控Future对象是否完成
    • Task类是Future的子类,它的作用就是把协程包装成一个Future对象
  • ensure_future(coro_or_future, *, loop=None):如果参数已经是future直接返回;如果是协程,则使用loop.create_task()创建task,并返回task
# ensure_future部分源码
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
...
...
# 例3: create_task
import asyncio

@asyncio.coroutine
def sleep(t):
    for i in range(t):
        print(i, '~~~~')
        yield from asyncio.sleep(1)

loop = asyncio.get_event_loop()
task = loop.create_task(sleep(3))
loop.run_until_complete(task)
loop.close()

回调

  • future对象都可以调用add_done_callback(fn)增加回调函数,回调函数是单参的,参数就是future对象
  • run_until_complete函数的返回值就是其参数future对象的返回结果
import asyncio
import time

@asyncio.coroutine
def sleep(t):
    for i in range(t):
        print(i, '~~~~')
        yield from asyncio.sleep(1)
    return 1000

# 回调函数,参数必须是future
def cb(fut):
    print('in callback:{}'.format(fut))
    print(fut.result(), '========')

loop = asyncio.get_event_loop()
future = asyncio.ensure_future(sleep(3))
print(1, future, '+++++')

future.add_done_callback(cb)  # 增加回调
print(2, future, '-----')

ret = loop.run_until_complete(future)
print('-' * 30)

loop.close()
time.sleep(5)
print(3, future.result())  # 拿return值
print(4, ret)

其中的增加了回调的future Task finished 后立马调用回调函数并拿到返回结果

多个任务

import asyncio

@asyncio.coroutine
def a():
    for i in range(3):
        print(i, 'in func a()')
        yield
    return 'a() finished'

@asyncio.coroutine
def b():
    for i in 'abcdefg':
        print(i, 'in func b()')
        yield
    return 'b() finished'

t1 = a()
t2 = b()
tasks = [t1, t2]

loop = asyncio.get_event_loop()

# asyncio.wait 会迭代列表中的对象并封装成{f1,f2},返回一个协程对象f
# 循环执行f,它内部等价yield from {f1,f2}
ret = loop.run_until_complete(asyncio.wait(tasks))
loop.close()

print(ret)
  • 使用回调
import asyncio

@asyncio.coroutine
def a():
    for i in range(3):
        print(i, 'in func a()')
        yield
    return 'a() finished'

@asyncio.coroutine
def b():
    for i in 'abcdefg':
        print(i, 'in func b()')
        yield
    return 'b() finished'

def cb(future):
    print(future.result(), '~~~~~~')

loop = asyncio.get_event_loop()

fs = set()
for t in (a(), b()):
    fut = asyncio.ensure_future(t)
    fut.add_done_callback(cb)
    fs.add(fut)

ret = loop.run_until_complete(asyncio.wait(fs))
loop.close()

print(ret)

asyncio新语法

上面的介绍只是用来理解协程,偏底层api,而且相关的语法每个版本都有更新和变动,而且一些api会在新的python版本中被弃用,因此建议使用新版本的api及更高层的api,避免使用底层的api

  1. async def
  2. 简单地调用一个协程并不会使其被调度执行
  • asyncio.run()
  • await 等待一个协程
  • asyncio.create_task()
  1. await + awaitable object
  • await后是可等待对象,有三种主要类型: 协程, 任务 和 Future
    • a coroutine function / a coroutine object
    • task:任务被用来“并行的”调度协程,当一个协程通过 asyncio.create_task() 等函数被封装为一个任务,该协程会被自动调度执行
    • Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的最终结果;当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕
  1. asyncio.create_task(coro, *, name=None):并发运行作为 asyncio 任务 的多个协程,将 coro 协程 封装为一个 Task 并调度其执行。返回 Task 对象
  2. asyncio.sleep(delay, result=None, *, loop=None):阻塞 delay 指定的秒数,sleep() 总是会挂起当前任务,以允许其他任务运行
  3. asyncio.gather(*aws, loop=None, return_exceptions=False):并发 运行 aws 序列中的 可等待对象
  4. asyncio.run(coro, *, debug=False):运行最高层级的入口点,此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次
  5. loop.run_in_executor(executor, func, *args):在指定的执行器中调用 func
# 例:改写上面的例子
import asyncio

async def a():
    for i in range(3):
        print(i, 'in func a()')
        await asyncio.sleep(0.1)
    return 'a() finished'

async def b():
    for i in 'abcdefg':
        print(i, 'in func b()')
        await asyncio.sleep(0.1)
    return 'b() finished'

def cb(future):
    print(future.result(), '~~~~~~')

async def main():
    fs = list()
    for t in (a(), b()):
        fut = asyncio.create_task(t)
        fut.add_done_callback(cb)
        fs.append(fut)
    ret = await asyncio.gather(*fs)
    return ret

ret = asyncio.run(main())
print(ret)

async def 函数中不能使用yield,如果需要做交替执行,可使用await

深入理解基于协程的异步⾼并发

  • 准备工作:使用fastapi搭建本地服务来模拟请求页面耗时uvicorn server:app --reload
# server.py
import time
from fastapi import FastAPI

app = FastAPI()

@app.get('/sleep/{num}')
def sleep(num: int):
    time.sleep(num)
    return {'success': True, 'msg': f'等待时间: {num}秒'}

串行执行

import time
import requests

def run_request():
    resp1 = requests.get('http://127.0.0.1:8000/sleep/3').json()
    print(resp1)
    resp1 = requests.get('http://127.0.0.1:8000/sleep/5').json()
    print(resp1)
    resp1 = requests.get('http://127.0.0.1:8000/sleep/8').json()
    print(resp1)

start = time.time()
run_request()
end = time.time()

print(f'请求3个页面,总共耗时:{end - start}')
---------------输出结果----------------
{'success': True, 'msg': '等待时间: 3秒'}
{'success': True, 'msg': '等待时间: 5秒'}
{'success': True, 'msg': '等待时间: 8秒'}
请求3个页面,总共耗时:16.050989151000977

asyncio调用顺序

import asyncio

async def i_am_async_func():
    print('我是一个异步函数')
    return 'msg'

async def i_will_call_you():
    print('我来调用异步函数')
    result = await i_am_async_func()
    print('异步函数返回的值是:', result)

async def main():
    print('我去调用那个调用异步函数的异步函数')
    await i_will_call_you()

# def main():
#     print('我去调用那个调用异步函数的异步函数')
#     asyncio.run(i_will_call_you())

if __name__ == '__main__':
    # main()
    asyncio.run(main())
----------输出结果-----------
我去调用那个调用异步函数的异步函数
我来调用异步函数
我是一个异步函数
异步函数返回的值是: msg
  • asyncio.run() 函数用来运行最高层级的入口点
  • await 用来调用协程函数,碰到 await ,程序就会阻塞在这,进入被调用的协程函数,执行完毕返回后再继续,而这也是 await 的字面意思

并发的协程

# 例1:未并发的协程
import asyncio
import httpx
import time

async def request(num):
    async with httpx.AsyncClient(timeout=20) as client:
        response = await client.get(f'http://127.0.0.1:8000/sleep/{num}')
        result = response.json()
        print(result)

async def main():
    start = time.time()
    await request(3)
    await request(5)
    await request(8)
    end = time.time()
    print(f'使用协程请求三个网址,耗时:{end - start}')

asyncio.run(main())
-----------------输出结果-----------------------
{'success': True, 'msg': '等待时间: 3秒'}
{'success': True, 'msg': '等待时间: 5秒'}
{'success': True, 'msg': '等待时间: 8秒'}
使用协程请求三个网址,耗时:16.131494283676147

await 是同步调用,进入被调用的协程函数,执行完毕返回后再继续
上面的示例相当于我们用异步接口写了个同步代码

# 例2:使用asyncio.create_task来并发
import asyncio
import httpx
import time

async def do_request(sleep_time):
    async with httpx.AsyncClient(timeout=20) as session:
        resp = await session.get(f'http://127.0.0.1:8000/sleep/{sleep_time}')
        print(resp.json())

async def main():
    start = time.time()
    # task1 = asyncio.create_task(do_request(3))
    # task2 = asyncio.create_task(do_request(5))
    # task3 = asyncio.create_task(do_request(8))
    # await task3  # 用耗时最长的任务防止退出函数
    # await task1
    # await task2
    # for i in range(10):
    #     await asyncio.sleep(1)  # 如果不知道谁耗时长,也可以用asyncio.sleep
    tasks = [asyncio.create_task(do_request(delay)) for delay in (3, 5, 8)]
    for task in tasks:
        await task
    end = time.time()
    print(f'使用协程请求三个网址,耗时:{end - start}')

asyncio.run(main())
-----------------输出结果------------------
{'success': True, 'msg': '等待时间: 3秒'}
{'success': True, 'msg': '等待时间: 5秒'}
{'success': True, 'msg': '等待时间: 8秒'}
使用协程请求三个网址,耗时:8.096357107162476
  • asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程
  • 当一个协程通过 asyncio.create_task() 等函数被封装为一个 任务,该协程会被自动调度执行
# 例3:使用asyncio.gather() 改写如上例子
import asyncio
import httpx
import time

async def request(sleep_time):
    async with httpx.AsyncClient(timeout=20) as session:
        resp = await session.get(f'http://127.0.0.1:8000/sleep/{sleep_time}')
        print(resp.json())

async def main():
    start = time.time()
    # task1 = request(3)
    # task2 = request(5)
    # task3 = request(8)
    # task_list = [task1, task2, task3]
    # await asyncio.gather(*task_list)
    tasks = [asyncio.create_task(request(delay)) for delay in (3, 5, 8)]
    await asyncio.gather(*tasks)
    end = time.time()
    print(f'使用协程请求三个网址,耗时:{end - start}')

asyncio.run(main())
---------------------输出结果-------------
{'success': True, 'msg': '等待时间: 3秒'}
{'success': True, 'msg': '等待时间: 5秒'}
{'success': True, 'msg': '等待时间: 8秒'}
使用协程请求三个网址,耗时:8.027670860290527

深入理解create_task

import asyncio
import httpx


async def req(delay):
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
        data = resp.json()
        print(data)


async def task1():
    print('进入task1里面')
    print(1, 'in task1')
    print(2, 'in task1')
    await req(3)
    print(3, 'task1 end')


async def task2():
    print('进入 task2')
    print(1, 'in task2')
    print(2, 'in task2')
    await req(5)
    print(3, 'task2 end')


async def task_without_await():
    print(1, '没有await的task')
    print(2, '没有await的task')


async def main():
    print('===1111111===')
    t1 = asyncio.create_task(task1())
    print('===2222222===')
    t = asyncio.create_task(task_without_await())
    t2 = asyncio.create_task(task2())
    # await t1
    print('===3333333===')
    # await t2
    # await asyncio.sleep(6)

asyncio.run(main())
--------------输出结果----------
===1111111===
===2222222===
===3333333===
进入task1里面
1 in task1
2 in task1
1 没有await的task
2 没有await的task
进入 task2
1 in task2
2 in task2

放开await t1 、await t2 和 await asyncio.sleep(6)再看看结果

  • asyncio.run()开启事件循环
  • asyncio.create_task() 会将task加入到事件循环中,并进入事件循环等待运行
  • 同步代码并不会阻塞,按照加入事件循环的顺序顺序执行
  • 遇到await 则会发生切换,交出控制权并发执行事件循环中未阻塞的协程

asyncio处理cpu密集型代码

  • 计算斐波那契数列示例
# 例1:cpu密集型
import time

def fib(n):
    if n in [1, 2]:
        return 1
    return fib(n - 1) + fib(n - 2)

start = time.time()
result = fib(36)
end = time.time()

print(f'计算斐波拉契数列第36项,耗时:{end - start}')
-----------------输出结果-----------------
计算斐波拉契数列第36项,耗时:3.686440944671631
  • 强行把 CPU 密集型代码放到 async 中
# 例2:强行将cpu密集型代码放入async中
import asyncio
import httpx
import time

async def fib(n):
    if n in [1, 2]:
        return 1
    return await fib(n - 1) + await fib(n - 2)

async def request(sleep_time):
    async with httpx.AsyncClient(timeout=20) as session:
        resp = await session.get(f'http://127.0.0.1:8000/sleep/{sleep_time}')
        print(resp.json())

async def main():
    start = time.time()
    task1 = fib(36)
    task2 = request(5)
    await asyncio.gather(task1, task2)
    end = time.time()
    print(f'强行把 CPU 密集型代码放到 async 中,耗时:{end - start}')

asyncio.run(main())
----------输出结果:反而执行时间增长了----------------
{'success': True, 'msg': '等待时间: 5秒'}
强行把 CPU 密集型代码放到 async 中,耗时:13.791552066802979

强行使用协程执行CPU密集型代码,执行时间反而增长了

  • 正确的处理cpu密集型任务
import time
import httpx
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def request(sleep_time):
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(f'http://127.0.0.1:8000/sleep/{sleep_time}')
        resp_json = resp.json()
        print(resp_json)

def sync_calc_fib(n):
    if n in [1, 2]:
        return 1
    return sync_calc_fib(n - 1) + sync_calc_fib(n - 2)

def calc_fib(n):
    result = sync_calc_fib(n)
    print(f'第 {n} 项计算完成,结果是:{result}')
    return result

async def main():
    start = time.perf_counter()
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor(max_workers=4) as executor:
        tasks_list = [
            loop.run_in_executor(executor, calc_fib, 36),
            asyncio.create_task(request(5))
        ]
        await asyncio.gather(*tasks_list)
        end = time.perf_counter()
        print(f'总计耗时:{end - start}')

asyncio.run(main())
---------------输出结果---------------
第 36 项计算完成,结果是:14930352
{'success': True, 'msg': '等待时间: 5秒'}
总计耗时:6.703363878

至于concurrent包可进一步参看concurrent

补充:asyncio.gather中的return_exceptions参数

import asyncio

async def task1():
    await asyncio.sleep(1)
    return 1

async def task2():
    await asyncio.sleep(2)
    return 2 / 0

async def task3():
    await asyncio.sleep(3)
    return 3

async def main():
    task_1 = asyncio.create_task(task1())
    task_2 = asyncio.create_task(task2())
    task_3 = asyncio.create_task(task2())

    tasks = [task_1, task_2, task_3]

    await asyncio.sleep(2)
    task_3.cancel()

    ret = await asyncio.gather(*tasks, return_exceptions=True)  # 注:设置return_exceptions=True
    print(ret)

asyncio.run(main())
---------------输出结果------------
[1, ZeroDivisionError('division by zero'), CancelledError()]

协程应用场景

协程的主要应用场景是 IO 密集型任务,常见的使用场景:

  • 网络请求,aiohttp、gevent
  • 文件读取, aiofile
  • web 框架, aiohttp、fastapi
  • 数据库, aiomysql、asyncpg

参考及扩展阅读