Coroutine synchronization: asyncio.Semaphore

Overview

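The earlier article introducing asyncio coroutines covered the basics. Following the usual pattern, the example below is easy to write (note: it again uses the local server).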

import time
import asyncio
import httpx

async def req(delay):
    # The local server's /sleep/{delay} endpoint is expected to reply after `delay` seconds
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
        result = resp.json()
        print(result)

async def main():
    start = time.time()
    delay_list = [1, 3, 5, 9, 8, 4, 2, 6, 7, 3, 8, 6]
    # Every request is scheduled immediately, so all 12 run concurrently
    tasks = [asyncio.create_task(req(i)) for i in delay_list]
    await asyncio.gather(*tasks)
    end = time.time()
    print(f"Total time: {end - start}")

asyncio.run(main())
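Running the example needs the local server from the earlier article. As a rough, server-free sketch, the version below swaps the HTTP call for `asyncio.sleep`, assuming that `/sleep/{delay}` simply waits `delay` seconds before answering. The delays are shrunk by a hypothetical `SCALE` factor, and the hypothetical `fake_req` coroutine records how many calls are in flight at once.

```python
import asyncio
import time

SCALE = 0.01  # hypothetical: shrink each delay so the demo finishes quickly


async def fake_req(delay, stats):
    """Hypothetical stand-in for req(): assumes /sleep/{delay} just waits `delay` seconds."""
    stats["running"] += 1
    stats["peak"] = max(stats["peak"], stats["running"])
    await asyncio.sleep(delay * SCALE)
    stats["running"] -= 1
    return delay


async def main(delays):
    stats = {"running": 0, "peak": 0}
    start = time.perf_counter()
    # Same pattern as the example: every request is scheduled at once
    tasks = [asyncio.create_task(fake_req(d, stats)) for d in delays]
    results = await asyncio.gather(*tasks)
    return results, stats["peak"], time.perf_counter() - start


delays = [1, 3, 5, 9, 8, 4, 2, 6, 7, 3, 8, 6]
results, peak, elapsed = asyncio.run(main(delays))
print(f"peak concurrency: {peak}, elapsed: {elapsed:.2f}s")
```

If the assumption about the endpoint holds, the total time tracks the longest delay (about 0.09 s at this scale) rather than the sum (0.62 s), and the peak is 12: every request reaches the server at the same moment, which is what the next section sets out to limit.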

Limiting the number of concurrent tasks

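If this were a crawler, many websites have anti-crawling measures, for example allowing at most three simultaneous requests. How can that be handled?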

Approach 1: manage a task pool by hand

import time
import asyncio
import httpx

async def req(delay):
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
        result = resp.json()
        print(result)

async def main():
    start = time.time()
    delay_list = [1, 3, 5, 9, 8, 4, 2, 6, 7, 3, 8, 6]
    task_list = []

    while delay_list or task_list:
        # Top up the pool until 3 requests are in flight or no work is left
        while len(task_list) < 3 and delay_list:
            delay = delay_list.pop()
            task_list.append(asyncio.create_task(req(delay)))
        # Suspend until at least one request finishes (no busy polling),
        # then keep only the unfinished tasks
        done, pending = await asyncio.wait(task_list, return_when=asyncio.FIRST_COMPLETED)
        task_list = list(pending)
    end = time.time()
    print(f"Total time: {end - start}")

asyncio.run(main())
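The pool logic can be checked without the server. This sketch keeps the same top-up-then-wait loop but uses hypothetical names (`fake_req`, `run_pool`, `stats`), again assumes the endpoint just sleeps for the (scaled-down) delay, and records the highest number of requests running at once.

```python
import asyncio

SCALE = 0.01  # hypothetical: shrink each delay so the demo runs quickly
LIMIT = 3


async def fake_req(delay, stats):
    """Hypothetical stand-in for req(): assumes the endpoint just sleeps."""
    stats["running"] += 1
    stats["peak"] = max(stats["peak"], stats["running"])
    await asyncio.sleep(delay * SCALE)
    stats["running"] -= 1
    return delay


async def run_pool(delays, limit):
    stats = {"running": 0, "peak": 0}
    todo = list(delays)
    running = set()
    finished = []
    while todo or running:
        # Top up the pool until `limit` tasks are in flight or nothing is left
        while len(running) < limit and todo:
            running.add(asyncio.create_task(fake_req(todo.pop(), stats)))
        # Wake up as soon as any task completes; `running` keeps the pending ones
        done, running = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
        finished.extend(task.result() for task in done)
    return finished, stats["peak"]


delays = [1, 3, 5, 9, 8, 4, 2, 6, 7, 3, 8, 6]
finished, peak = asyncio.run(run_pool(delays, LIMIT))
print(f"completed {len(finished)} requests, peak concurrency {peak}")
```

`asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)` returns a `(done, pending)` pair of sets as soon as any task finishes, so the loop wakes only when a slot actually frees up instead of spinning on a tiny `asyncio.sleep`.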

Approach 2: asyncio.Semaphore()

import time
import asyncio
import httpx

async def req(delay, sem):
    # At most 3 coroutines can be inside this block at once; the others wait here
    async with sem:
        async with httpx.AsyncClient(timeout=20) as client:
            resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
            result = resp.json()
            print(result)

async def main():
    start = time.time()
    delay_list = [1, 3, 5, 9, 8, 4, 2, 6, 7, 3, 8, 6]

    # One shared semaphore with 3 slots caps the number of concurrent requests
    sem = asyncio.Semaphore(3)
    tasks = [asyncio.create_task(req(delay, sem)) for delay in delay_list]
    await asyncio.gather(*tasks)

    end = time.time()
    print(f"Total time: {end - start}")

asyncio.run(main())
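`async with sem` acquires the semaphore on entry and releases it on exit, so at most 3 coroutines are inside the block at any moment while the rest wait at the `async with` line. The sketch below checks this without the server; `fake_req`, `SCALE` and the `stats` dict are hypothetical, and the HTTP call is again assumed to behave like a plain sleep.

```python
import asyncio

SCALE = 0.01  # hypothetical: shrink each delay so the demo runs quickly


async def fake_req(delay, sem, stats):
    async with sem:  # waits here while all 3 slots are taken
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(delay * SCALE)  # assumed stand-in for the HTTP call
        stats["running"] -= 1
    return delay


async def main(delays, limit=3):
    sem = asyncio.Semaphore(limit)  # created inside the running event loop
    stats = {"running": 0, "peak": 0}
    tasks = [asyncio.create_task(fake_req(d, sem, stats)) for d in delays]
    results = await asyncio.gather(*tasks)
    return results, stats["peak"]


delays = [1, 3, 5, 9, 8, 4, 2, 6, 7, 3, 8, 6]
results, peak = asyncio.run(main(delays))
print(f"results: {results}, peak concurrency: {peak}")
```

`gather` returns results in the order the tasks were passed in, not the order they finished. The semaphore is created inside the coroutine, as in Approach 2; on Python versions before 3.10, a semaphore created at module level before `asyncio.run()` could end up bound to a different event loop.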

Limiting how many coroutines run within a time window

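Add an asyncio.sleep to each coroutine while it still holds the semaphore. For example, to allow at most 3 requests per minute in the example above, change req to:

async def req(delay, sem):
    async with sem:
        async with httpx.AsyncClient(timeout=20) as client:
            resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
            result = resp.json()
            print(result)
        # Keep holding the slot for 60 s; sleeping after leaving `async with sem`
        # would release the slot immediately and limit nothing
        await asyncio.sleep(60)

Each of the 3 slots is then occupied for the request time plus 60 seconds, so no more than 3 requests can start in any 60-second window.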
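Holding the slot through the sleep can be checked on a smaller time scale. In this sketch the 60-second window is replaced by a hypothetical `WINDOW` of 0.1 s, the HTTP call by a scaled `asyncio.sleep` (an assumption about what the endpoint does), and the hypothetical helper `max_starts_in_window` counts the most request starts that fall inside any single window.

```python
import asyncio

SCALE = 0.01   # hypothetical: shrink each request delay
WINDOW = 0.1   # hypothetical: stands in for the 60-second window
LIMIT = 3


async def fake_req(delay, sem, starts):
    loop = asyncio.get_running_loop()
    async with sem:
        starts.append(loop.time())          # when this "request" begins
        await asyncio.sleep(delay * SCALE)  # assumed stand-in for the HTTP call
        await asyncio.sleep(WINDOW)         # keep holding the slot for one window


async def main(delays):
    sem = asyncio.Semaphore(LIMIT)
    starts = []
    tasks = [asyncio.create_task(fake_req(d, sem, starts)) for d in delays]
    await asyncio.gather(*tasks)
    return sorted(starts)


def max_starts_in_window(starts, window):
    """Largest number of sorted start times inside any interval [t, t + window)."""
    best = 0
    for i, t in enumerate(starts):
        best = max(best, sum(1 for s in starts[i:] if s < t + window))
    return best


delays = [1, 3, 5, 9, 8, 4, 2, 6, 7, 3, 8, 6]
starts = asyncio.run(main(delays))
print(f"{len(starts)} requests, at most {max_starts_in_window(starts, WINDOW)} per window")
```

Because each slot stays busy for the request time plus the window, the real rate ends up somewhat below 3 per window; the semaphore gives an upper bound, not an exact rate.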

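If different parts of a program need different limits, for example concurrency a for some parts and b for others, create one Semaphore object per limit and pass each to the corresponding coroutines.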
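A minimal sketch of two independently limited groups: the names `sem_api` and `sem_pages`, the limits a = 2 and b = 5, and the `limited_job` coroutine are all hypothetical, and `asyncio.sleep(0.01)` stands in for real I/O.

```python
import asyncio


async def limited_job(name, sem, stats):
    async with sem:  # only counts against the semaphore passed in
        stats[name]["running"] += 1
        stats[name]["peak"] = max(stats[name]["peak"], stats[name]["running"])
        await asyncio.sleep(0.01)  # placeholder for real I/O
        stats[name]["running"] -= 1


async def main():
    sem_api = asyncio.Semaphore(2)    # hypothetical part A: concurrency a = 2
    sem_pages = asyncio.Semaphore(5)  # hypothetical part B: concurrency b = 5
    stats = {"api": {"running": 0, "peak": 0}, "pages": {"running": 0, "peak": 0}}
    jobs = [limited_job("api", sem_api, stats) for _ in range(10)]
    jobs += [limited_job("pages", sem_pages, stats) for _ in range(10)]
    await asyncio.gather(*jobs)
    return stats


stats = asyncio.run(main())
print({name: s["peak"] for name, s in stats.items()})
```

Each semaphore only counts the coroutines that use it, so the two groups never block each other.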

References and further reading