asyncio
引子
#例1:串行执行
import time
def a():
for i in range(3):
time.sleep(0.1)
print(i)
def b():
for i in 'abc':
time.sleep(0.1)
print(i)
a()
b()
# 例2:使用生成器函数实现交替执行
import time
def a():
for i in range(3):
time.sleep(0.1)
print(i)
yield
def b():
for i in 'abc':
time.sleep(0.1)
print(i)
yield
x = a()
y = b()
for i in range(3):
next(x)
next(y)
例2是一个线程内通过生成器完成了调度,让两个函数都有机会执行,这样的调度不是操作系统的进程、线程完成的,而是由程序控制的,这样的程序需要两个条件:1. 需要使用yield来让出控制权;2. 需要循环帮助交替执行
协程
- 协程不是进程、也不是线程,它是用户空间调度的完成并发处理的方式
- 进程、线程由操作系统完成调度,而协程是线程内完成调度。它不需要更多的线程,没有多线程切换带来的开销
- 协程是非抢占式调度,只有一个协程主动让出控制权,另一个协程才会被调度
- 协程也不需要使用锁机制,因为是在同一个线程中执行
- 多CPU下,可以使用多进程和协程配合,既能进程并发又能发挥协程在单线程中的优势
- Python中协程是基于生成器的
- asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持
进程、线程和协程
- 进程:一个运行的程序(代码)就是一个进程,没有运行的代码叫程序。进程是系统资源分配的最小单位,进程拥有自己独立的内存空间,所有进程间数据不共享,开销大
- 线程:CPU调度执行的最小单位,也叫执行路径。线程不能独立存在,依赖进程存在,一个进程至少有一个线程,叫主线程,而多个线程共享内存(数据共享,共享全局变量),从而极大地提高了程序的运行效率
- 协程:是一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。协程调度时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操中栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快
【以下内容来自ChatGPT】
- Python中的进程线程和协程
-
资源占用:进程是操作系统中资源分配的基本单位,每个进程都有自己独立的内存空间、文件描述符等资源,因此进程之间的资源是相互独立的;线程是进程中的执行单位,线程之间共享进程的资源,包括内存空间、文件描述符等;协程则是轻量级的线程,它与线程类似,也可以共享进程资源,但不同的是,协程是在用户空间中运行的,不需要操作系统的支持,因此占用的资源更少
-
切换方式:在进程之间切换时,需要进行上下文切换,即保存当前进程的状态并加载下一个进程的状态,这是由操作系统实现的,因此上下文切换的开销比较大;在线程之间切换时,也需要进行上下文切换,但由于线程之间共享进程资源,因此上下文切换的开销比进程小;在协程之间切换时,不需要进行上下文切换,只需要保存当前协程的状态,然后加载下一个协程的状态,这个过程是在用户空间中完成的,因此开销更小
-
并发性:由于进程之间的资源是相互独立的,因此进程之间的并发性比较高,但进程之间的切换开销也比较大;线程之间共享进程资源,因此并发性比进程高,但线程之间的切换开销比进程小;协程是在用户空间中运行的,不需要操作系统的支持,因此切换开销很小,可以实现更高的并发性
-
编程模型:在进程和线程中,由于资源是独立的或共享的,因此编程模型比较复杂,需要考虑进程/线程之间的通信、同步等问题;而协程是在同一线程中运行的,因此编程模型相对简单,可以使用更简单的同步机制实现并发控制
综上可以简单概括为:
进程是操作系统中资源分配的基本单位,每个进程都有自己独立的内存空间和资源。线程是进程中的执行单位,线程之间共享进程的资源。协程是轻量级的线程,它与线程类似,也可以共享进程资源,但不同的是,协程是在用户空间中运行的,不需要操作系统的支持,因此占用的资源更少。进程之间的切换开销最大,线程之间的切换开销比进程小,协程之间的切换开销最小。并发性方面,进程之间并发性高,线程之间并发性比进程高,协程之间并发性最高。在编程模型方面,协程相对于进程和线程更简单,可以使用更简单的同步机制实现并发控制
相关文档
asyncio
- asyncio的编程模型就是一个消息循环。从asyncio模块中直接获取一个
EventLoop
的引用,然后把需要执行的协程扔到EventLoop
中执行,实现了异步IO - 协程对象及协程函数:协程对象是调用协程函数返回的对象
- asyncio.iscoroutine(obj) 判断是不是协程对象
- asyncio.iscoroutinefunction(func) 判断是不是协程函数
import asyncio
import time
def a():
for i in range(3):
time.sleep(0.1)
print(i)
yield
print(asyncio.iscoroutine(a())) # True
print(asyncio.iscoroutinefunction(a)) # False
# 老语法
# DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
@asyncio.coroutine
def b():
for i in 'abc':
time.sleep(0.1)
print(i)
yield
print(asyncio.iscoroutine(b())) # True
print(asyncio.iscoroutinefunction(b)) # True
事件循环
事件循环是每个 asyncio 应用的核心。 事件循环会运行异步任务和回调,执行网络 IO 操作,以及运行子进程
asyncio.get_event_loop()
:返回一个事件循环对象,是asyncio.BaseEventLoop的实例AbstractEventLoop.run_until_complete(future)
:运行直至Future对象运行完,返回Future的结果。参数可以是Future类或子类Task的对象。如果是协程对象也会被封装成Task对象
run_until_complete源码如下:
def run_until_complete(self, future):
"""Run until the Future is done.
If the argument is a coroutine, it is wrapped in a Task.
WARNING: It would be disastrous to call run_until_complete()
with the same coroutine twice -- it would wrap it in two
different Tasks and that can't be good.
Return the Future's result, or raise its exception.
"""
self._check_closed()
self._check_running()
new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self)
if new_task:
# An exception is raised if the future didn't complete, so there
# is no need to log the "destroy pending task" message
future._log_destroy_pending = False
future.add_done_callback(_run_until_complete_cb)
try:
self.run_forever()
except:
if new_task and future.done() and not future.cancelled():
# The coroutine raised a BaseException. Consume the exception
# to not log a warning, the caller doesn't have access to the
# local task.
future.exception()
raise
finally:
future.remove_done_callback(_run_until_complete_cb)
if not future.done():
raise RuntimeError('Event loop stopped before Future completed.')
return future.result()
注意其中的代码:
future = tasks.ensure_future(future, loop=self)
future.add_done_callback(_run_until_complete_cb)
AbstractEventLoop.close()
:关闭事件循环
# 例1: run_until_complete
import asyncio
@asyncio.coroutine
def sleep(t):
for i in range(t):
print(i, '~~~~')
yield from asyncio.sleep(1)
loop = asyncio.get_event_loop()
# 内部会调用ensure_future,内部会执行loop.run_forever()
loop.run_until_complete(sleep(3))
loop.close()
# 例2: ensure_future
import asyncio
@asyncio.coroutine
def sleep(t):
for i in range(t):
print(i, '~~~~')
yield from asyncio.sleep(1)
return 1000
loop = asyncio.get_event_loop()
# 本质就是tasks的ensure_future,把协程包装进一个Future对象中,并使用create_task返回一个task
future = asyncio.ensure_future(sleep(3))
print(1, future, '+++++') # pending
loop.run_until_complete(future)
print(2, future, '+++++') # finished
loop.close()
print(3, future.result(), '------')
-----------------------输出结果------------------------
1 <Task pending name='Task-1' coro=<sleep() running at xxxx >> +++++
0 ~~~~
1 ~~~~
2 ~~~~
2 <Task finished name='Task-1' coro=<sleep() done, defined at xxx result=1000> +++++
3 1000 ------
- future和task
- 通过Future对象可以了解任务执行的状态数据。事件循环来监控Future对象是否完成
- Task类是Future的子类,它的作用就是把协程包装成一个Future对象
ensure_future(coro_or_future, *, loop=None)
:如果参数已经是future直接返回;如果是协程,则使用loop.create_task()创建task,并返回task
# ensure_future部分源码
def ensure_future(coro_or_future, *, loop=None):
"""Wrap a coroutine or an awaitable in a future.
If the argument is a Future, it is returned directly.
"""
if coroutines.iscoroutine(coro_or_future):
if loop is None:
loop = events.get_event_loop()
task = loop.create_task(coro_or_future)
if task._source_traceback:
del task._source_traceback[-1]
return task
...
...
# 例3: create_task
import asyncio
@asyncio.coroutine
def sleep(t):
for i in range(t):
print(i, '~~~~')
yield from asyncio.sleep(1)
loop = asyncio.get_event_loop()
task = loop.create_task(sleep(3))
loop.run_until_complete(task)
loop.close()
回调
- future对象都可以调用
add_done_callback(fn)
增加回调函数,回调函数是单参的,参数就是future对象 - run_until_complete函数的返回值就是其参数future对象的返回结果
import asyncio
import time
@asyncio.coroutine
def sleep(t):
for i in range(t):
print(i, '~~~~')
yield from asyncio.sleep(1)
return 1000
# 回调函数,参数必须是future
def cb(fut):
print('in callback:{}'.format(fut))
print(fut.result(), '========')
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(sleep(3))
print(1, future, '+++++')
future.add_done_callback(cb) # 增加回调
print(2, future, '-----')
ret = loop.run_until_complete(future)
print('-' * 30)
loop.close()
time.sleep(5)
print(3, future.result()) # 拿return值
print(4, ret)
其中的增加了回调的future Task finished 后立马调用回调函数并拿到返回结果
多个任务
import asyncio
@asyncio.coroutine
def a():
for i in range(3):
print(i, 'in func a()')
yield
return 'a() finished'
@asyncio.coroutine
def b():
for i in 'abcdefg':
print(i, 'in func b()')
yield
return 'b() finished'
t1 = a()
t2 = b()
tasks = [t1, t2]
loop = asyncio.get_event_loop()
# asyncio.wait 会迭代列表中的对象并封装成{f1,f2},返回一个协程对象f
# 循环执行f,它内部等价yield from {f1,f2}
ret = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print(ret)
- 使用回调
import asyncio
@asyncio.coroutine
def a():
for i in range(3):
print(i, 'in func a()')
yield
return 'a() finished'
@asyncio.coroutine
def b():
for i in 'abcdefg':
print(i, 'in func b()')
yield
return 'b() finished'
def cb(future):
print(future.result(), '~~~~~~')
loop = asyncio.get_event_loop()
fs = set()
for t in (a(), b()):
fut = asyncio.ensure_future(t)
fut.add_done_callback(cb)
fs.add(fut)
ret = loop.run_until_complete(asyncio.wait(fs))
loop.close()
print(ret)
asyncio新语法
上面的介绍只是用来理解协程,偏底层api,而且相关的语法每个版本都有更新和变动,而且一些api会在新的python版本中被弃用,因此建议使用新版本的api及更高层的api,避免使用底层的api
- async def
- 简单地调用一个协程并不会使其被调度执行
- asyncio.run()
- await 等待一个协程
- asyncio.create_task()
- await + awaitable object
- await后是可等待对象,有三种主要类型: 协程, 任务 和 Future
- a coroutine function / a coroutine object
- task:任务被用来“并行的”调度协程,当一个协程通过 asyncio.create_task() 等函数被封装为一个任务,该协程会被自动调度执行
- Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的最终结果;当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕
asyncio.create_task(coro, *, name=None)
:并发运行作为 asyncio 任务 的多个协程,将 coro 协程 封装为一个 Task 并调度其执行。返回 Task 对象asyncio.sleep(delay, result=None, *, loop=None)
:阻塞 delay 指定的秒数,sleep() 总是会挂起当前任务,以允许其他任务运行asyncio.gather(*aws, loop=None, return_exceptions=False)
:并发 运行 aws 序列中的 可等待对象asyncio.run(coro, *, debug=False)
:运行最高层级的入口点,此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次loop.run_in_executor(executor, func, *args)
:在指定的执行器中调用 func
# 例:改写上面的例子
import asyncio
async def a():
for i in range(3):
print(i, 'in func a()')
await asyncio.sleep(0.1)
return 'a() finished'
async def b():
for i in 'abcdefg':
print(i, 'in func b()')
await asyncio.sleep(0.1)
return 'b() finished'
def cb(future):
print(future.result(), '~~~~~~')
async def main():
fs = list()
for t in (a(), b()):
fut = asyncio.create_task(t)
fut.add_done_callback(cb)
fs.append(fut)
ret = await asyncio.gather(*fs)
return ret
ret = asyncio.run(main())
print(ret)
async def 函数中不能使用yield,如果需要做交替执行,可使用await
深入理解基于协程的异步⾼并发
- 准备工作:使用fastapi搭建本地服务来模拟请求页面耗时
uvicorn server:app --reload
# server.py
import time
from fastapi import FastAPI
app = FastAPI()
@app.get('/sleep/{num}')
def sleep(num: int):
time.sleep(num)
return {'success': True, 'msg': f'等待时间: {num}秒'}
串行执行
import time
import requests
def run_request():
resp1 = requests.get('http://127.0.0.1:8000/sleep/3').json()
print(resp1)
resp1 = requests.get('http://127.0.0.1:8000/sleep/5').json()
print(resp1)
resp1 = requests.get('http://127.0.0.1:8000/sleep/8').json()
print(resp1)
start = time.time()
run_request()
end = time.time()
print(f'请求3个页面,总共耗时:{end - start}')
---------------输出结果----------------
{'success': True, 'msg': '等待时间: 3秒'}
{'success': True, 'msg': '等待时间: 5秒'}
{'success': True, 'msg': '等待时间: 8秒'}
请求3个页面,总共耗时:16.050989151000977
asyncio调用顺序
import asyncio
async def i_am_async_func():
print('我是一个异步函数')
return 'msg'
async def i_will_call_you():
print('我来调用异步函数')
result = await i_am_async_func()
print('异步函数返回的值是:', result)
async def main():
print('我去调用那个调用异步函数的异步函数')
await i_will_call_you()
# def main():
# print('我去调用那个调用异步函数的异步函数')
# asyncio.run(i_will_call_you())
if __name__ == '__main__':
# main()
asyncio.run(main())
----------输出结果-----------
我去调用那个调用异步函数的异步函数
我来调用异步函数
我是一个异步函数
异步函数返回的值是: msg
asyncio.run()
函数用来运行最高层级的入口点await
用来调用协程函数,碰到await
,程序就会阻塞在这,进入被调用的协程函数,执行完毕返回后再继续,而这也是 await 的字面意思
并发的协程
# 例1:未并发的协程
import asyncio
import httpx
import time
async def request(num):
async with httpx.AsyncClient(timeout=20) as client:
response = await client.get(f'http://127.0.0.1:8000/sleep/{num}')
result = response.json()
print(result)
async def main():
start = time.time()
await request(3)
await request(5)
await request(8)
end = time.time()
print(f'使用协程请求三个网址,耗时:{end - start}')
asyncio.run(main())
-----------------输出结果-----------------------
{'success': True, 'msg': '等待时间: 3秒'}
{'success': True, 'msg': '等待时间: 5秒'}
{'success': True, 'msg': '等待时间: 8秒'}
使用协程请求三个网址,耗时:16.131494283676147
await 是同步调用,进入被调用的协程函数,执行完毕返回后再继续
上面的示例相当于我们用异步接口写了个同步代码
# 例2:使用asyncio.create_task来并发
import asyncio
import httpx
import time
async def do_request(sleep_time):
async with httpx.AsyncClient(timeout=20) as session:
resp = await session.get(f'http://127.0.0.1:8000/sleep/{sleep_time}')
print(resp.json())
async def main():
start = time.time()
# task1 = asyncio.create_task(do_request(3))
# task2 = asyncio.create_task(do_request(5))
# task3 = asyncio.create_task(do_request(8))
# await task3 # 用耗时最长的任务防止退出函数
# await task1
# await task2
# for i in range(10):
# await asyncio.sleep(1) # 如果不知道谁耗时长,也可以用asyncio.sleep
tasks = [asyncio.create_task(do_request(delay)) for delay in (3, 5, 8)]
for task in tasks:
await task
end = time.time()
print(f'使用协程请求三个网址,耗时:{end - start}')
asyncio.run(main())
-----------------输出结果------------------
{'success': True, 'msg': '等待时间: 3秒'}
{'success': True, 'msg': '等待时间: 5秒'}
{'success': True, 'msg': '等待时间: 8秒'}
使用协程请求三个网址,耗时:8.096357107162476
asyncio.create_task()
函数用来并发运行作为 asyncio 任务 的多个协程- 当一个协程通过
asyncio.create_task()
等函数被封装为一个 任务,该协程会被自动调度执行
# 例3:使用asyncio.gather() 改写如上例子
import asyncio
import httpx
import time
async def request(sleep_time):
async with httpx.AsyncClient(timeout=20) as session:
resp = await session.get(f'http://127.0.0.1:8000/sleep/{sleep_time}')
print(resp.json())
async def main():
start = time.time()
# task1 = request(3)
# task2 = request(5)
# task3 = request(8)
# task_list = [task1, task2, task3]
# await asyncio.gather(*task_list)
tasks = [asyncio.create_task(request(delay)) for delay in (3, 5, 8)]
await asyncio.gather(*tasks)
end = time.time()
print(f'使用协程请求三个网址,耗时:{end - start}')
asyncio.run(main())
---------------------输出结果-------------
{'success': True, 'msg': '等待时间: 3秒'}
{'success': True, 'msg': '等待时间: 5秒'}
{'success': True, 'msg': '等待时间: 8秒'}
使用协程请求三个网址,耗时:8.027670860290527
深入理解create_task
import asyncio
import httpx
async def req(delay):
async with httpx.AsyncClient(timeout=20) as client:
resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
data = resp.json()
print(data)
async def task1():
print('进入task1里面')
print(1, 'in task1')
print(2, 'in task1')
await req(3)
print(3, 'task1 end')
async def task2():
print('进入 task2')
print(1, 'in task2')
print(2, 'in task2')
await req(5)
print(3, 'task2 end')
async def task_without_await():
print(1, '没有await的task')
print(2, '没有await的task')
async def main():
print('===1111111===')
t1 = asyncio.create_task(task1())
print('===2222222===')
t = asyncio.create_task(task_without_await())
t2 = asyncio.create_task(task2())
# await t1
print('===3333333===')
# await t2
# await asyncio.sleep(6)
asyncio.run(main())
--------------输出结果----------
===1111111===
===2222222===
===3333333===
进入task1里面
1 in task1
2 in task1
1 没有await的task
2 没有await的task
进入 task2
1 in task2
2 in task2
放开await t1 、await t2 和 await asyncio.sleep(6)再看看结果
asyncio.run()
开启事件循环asyncio.create_task()
会将task加入到事件循环中,并进入事件循环等待运行- 同步代码并不会阻塞,按照加入事件循环的顺序顺序执行
- 遇到
await
则会发生切换,交出控制权,并发执行事件循环中未阻塞的协程
asyncio处理cpu密集型代码
- 计算斐波那契数列示例
# 例1:cpu密集型
import time
def fib(n):
if n in [1, 2]:
return 1
return fib(n - 1) + fib(n - 2)
start = time.time()
result = fib(36)
end = time.time()
print(f'计算斐波拉契数列第36项,耗时:{end - start}')
-----------------输出结果-----------------
计算斐波拉契数列第36项,耗时:3.686440944671631
- 强行把 CPU 密集型代码放到 async 中
# 例2:强行将cpu密集型代码放入async中
import asyncio
import httpx
import time
async def fib(n):
if n in [1, 2]:
return 1
return await fib(n - 1) + await fib(n - 2)
async def request(sleep_time):
async with httpx.AsyncClient(timeout=20) as session:
resp = await session.get(f'http://127.0.0.1:8000/sleep/{sleep_time}')
print(resp.json())
async def main():
start = time.time()
task1 = fib(36)
task2 = request(5)
await asyncio.gather(task1, task2)
end = time.time()
print(f'强行把 CPU 密集型代码放到 async 中,耗时:{end - start}')
asyncio.run(main())
----------输出结果:反而执行时间增长了----------------
{'success': True, 'msg': '等待时间: 5秒'}
强行把 CPU 密集型代码放到 async 中,耗时:13.791552066802979
强行使用协程执行CPU密集型代码,执行时间反而增长了
- 正确的处理cpu密集型任务
import time
import httpx
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def request(sleep_time):
async with httpx.AsyncClient(timeout=20) as client:
resp = await client.get(f'http://127.0.0.1:8000/sleep/{sleep_time}')
resp_json = resp.json()
print(resp_json)
def sync_calc_fib(n):
if n in [1, 2]:
return 1
return sync_calc_fib(n - 1) + sync_calc_fib(n - 2)
def calc_fib(n):
result = sync_calc_fib(n)
print(f'第 {n} 项计算完成,结果是:{result}')
return result
async def main():
start = time.perf_counter()
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=4) as executor:
tasks_list = [
loop.run_in_executor(executor, calc_fib, 36),
asyncio.create_task(request(5))
]
await asyncio.gather(*tasks_list)
end = time.perf_counter()
print(f'总计耗时:{end - start}')
asyncio.run(main())
---------------输出结果---------------
第 36 项计算完成,结果是:14930352
{'success': True, 'msg': '等待时间: 5秒'}
总计耗时:6.703363878
至于concurrent包可进一步参看concurrent
补充:asyncio.gather中的return_exceptions参数
import asyncio
async def task1():
await asyncio.sleep(1)
return 1
async def task2():
await asyncio.sleep(2)
return 2 / 0
async def task3():
await asyncio.sleep(3)
return 3
async def main():
task_1 = asyncio.create_task(task1())
task_2 = asyncio.create_task(task2())
task_3 = asyncio.create_task(task2())
tasks = [task_1, task_2, task_3]
await asyncio.sleep(2)
task_3.cancel()
ret = await asyncio.gather(*tasks, return_exceptions=True) # 注:设置return_exceptions=True
print(ret)
asyncio.run(main())
---------------输出结果------------
[1, ZeroDivisionError('division by zero'), CancelledError()]
协程应用场景
协程的主要应用场景是 IO 密集型任务,常见的使用场景:
- 网络请求,aiohttp、gevent
- 文件读取, aiofile
- web 框架, aiohttp、fastapi
- 数据库, aiomysql、asyncpg