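Coroutine Synchronization: asyncio.Semaphore
Overview
The earlier article introducing asyncio coroutines covered the basics. Following the usual pattern, it is easy to write the example below (note: it still uses the local server).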
import time
import asyncio
import httpx


async def req(delay):
    # Ask the local test server to sleep for `delay` seconds, then print its JSON reply.
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
        result = resp.json()
        print(result)


async def main():
    start = time.time()
    delay_list = [1, 3, 5, 9, 8, 4, 2, 6, 7, 3, 8, 6]
    # One task per delay; all twelve requests are in flight at the same time.
    tasks = [asyncio.create_task(req(i)) for i in delay_list]
    await asyncio.gather(*tasks)
    end = time.time()
    print(f"Total time: {end-start}")


asyncio.run(main())
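Limiting the number of concurrent coroutine tasks
Suppose this is a crawler project. Many websites have anti-crawling mechanisms, for example allowing at most three simultaneous requests from one client. How do we handle that?
Approach 1: a manual scheduling loop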
import time
import asyncio
import httpx


async def req(delay):
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
        result = resp.json()
        print(result)


async def main():
    start = time.time()
    delay_list = [1, 3, 5, 9, 8, 4, 2, 6, 7, 3, 8, 6]
    task_list = []
    while True:
        # Stop once nothing is left to schedule and nothing is still running.
        if not delay_list and not task_list:
            break
        # Top up to at most three running tasks.
        while len(task_list) < 3:
            if delay_list:
                delay = delay_list.pop()
                task = asyncio.create_task(req(delay))
                task_list.append(task)
            else:
                break
        # Drop finished tasks, then yield briefly so the running ones can make progress.
        task_list = [task for task in task_list if not task.done()]
        await asyncio.sleep(0.00001)
    end = time.time()
    print(f"Total time: {end-start}")


asyncio.run(main())
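The loop in Approach 1 wakes up every 0.00001 seconds just to check whether a task has finished. As a variation (not the code from this article), the same "at most three at a time" idea can be sketched with asyncio.wait and return_when=asyncio.FIRST_COMPLETED, which suspends until some task actually completes. The names fake_req and run_limited are made up for illustration, and asyncio.sleep stands in for the HTTP request so the sketch runs without the local server.

```python
import asyncio


async def fake_req(delay, state):
    # Hypothetical stand-in for req(): sleeps instead of calling the local server.
    state["running"] += 1
    state["peak"] = max(state["peak"], state["running"])
    await asyncio.sleep(delay)
    state["running"] -= 1
    return delay


async def run_limited(delays, limit=3):
    state = {"running": 0, "peak": 0}
    todo = list(delays)
    pending = set()
    results = []
    while todo or pending:
        # Top up to `limit` running tasks.
        while todo and len(pending) < limit:
            pending.add(asyncio.create_task(fake_req(todo.pop(), state)))
        # Suspend until at least one task finishes, instead of polling.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        results.extend(task.result() for task in done)
    return results, state["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(run_limited([0.01, 0.03, 0.05, 0.02, 0.04]))
    print(sorted(results), "peak concurrency:", peak)
```

The structure is the same as the polling loop: fill up to the limit, wait, repeat. The only change is that the wait is event-driven.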
Approach 2: asyncio.Semaphore()
import time
import asyncio
import httpx


async def req(delay, sem):
    # Only three coroutines can be inside this block at once; the rest wait here.
    async with sem:
        async with httpx.AsyncClient(timeout=20) as client:
            resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
            result = resp.json()
            print(result)


async def main():
    start = time.time()
    delay_list = [1, 3, 5, 9, 8, 4, 2, 6, 7, 3, 8, 6]
    sem = asyncio.Semaphore(3)
    # All tasks are created up front; the shared semaphore throttles them.
    tasks = [asyncio.create_task(req(delay, sem)) for delay in delay_list]
    await asyncio.gather(*tasks)
    end = time.time()
    print(f"Total time: {end-start}")


asyncio.run(main())
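To check that the semaphore really caps concurrency, here is a minimal sketch that counts how many coroutines are inside the async with sem block at the same moment. It assumes asyncio.sleep as a stand-in for the HTTP request; fake_req and measure_peak are illustrative names, not part of the original code.

```python
import asyncio


async def fake_req(delay, sem, state):
    async with sem:
        # Count coroutines currently holding the semaphore.
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(delay)  # stand-in for the real request
        state["running"] -= 1


async def measure_peak(delays, limit):
    sem = asyncio.Semaphore(limit)
    state = {"running": 0, "peak": 0}
    await asyncio.gather(*(fake_req(d, sem, state) for d in delays))
    return state["peak"]


if __name__ == "__main__":
    peak = asyncio.run(measure_peak([0.01] * 8, limit=3))
    print("peak concurrency:", peak)
```

With eight jobs and a limit of 3, the recorded peak should never exceed 3, even though all eight coroutines are scheduled at once.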
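Limiting the number of coroutines run within a time window
Add an asyncio.sleep inside the concurrent coroutine, while it still holds the semaphore. For example, to allow at most 3 coroutines per minute in the example above, change req to: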
import asyncio
import httpx


async def req(delay, sem):
    async with sem:
        async with httpx.AsyncClient(timeout=20) as client:
            resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
            result = resp.json()
            print(result)
        # Still holding the semaphore: keep the slot occupied for another 60 seconds.
        await asyncio.sleep(60)
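The effect of holding the slot can be checked with a scaled-down sketch. It assumes a 0.2 second window instead of 60 seconds and a limit of 2 instead of 3, and it replaces the HTTP request with asyncio.sleep(0); limited_job and run_windowed are hypothetical names. The sketch records when each job acquires the semaphore so the spacing between starts can be inspected.

```python
import asyncio
import time


async def limited_job(sem, window, starts):
    async with sem:
        starts.append(time.monotonic())  # moment this job obtained a slot
        await asyncio.sleep(0)           # stand-in for the real request
        # Keep the slot occupied so it cannot be reused within the window.
        await asyncio.sleep(window)


async def run_windowed(n_jobs, limit, window):
    sem = asyncio.Semaphore(limit)
    starts = []
    await asyncio.gather(
        *(limited_job(sem, window, starts) for _ in range(n_jobs))
    )
    return sorted(starts)


if __name__ == "__main__":
    starts = asyncio.run(run_windowed(n_jobs=6, limit=2, window=0.2))
    print([round(t - starts[0], 2) for t in starts])
```

Because every job keeps its slot for at least the window length, a new job can only start once an earlier one has held its slot that long, so no window contains more than `limit` starts. The trade-off is that the sleep is added on top of the request time, so the real rate ends up somewhat below the limit.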
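If different parts of a program need different limits, say a concurrent coroutines for some parts and b for others, create several Semaphore objects and pass each one to the coroutines it should govern.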
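A minimal sketch of that idea, assuming two groups of jobs with limits a=3 and b=5. The names worker and run_two_groups are illustrative, and asyncio.sleep again stands in for real work.

```python
import asyncio


async def worker(sem, counter, delay=0.01):
    async with sem:
        # Track how many workers of this group run at the same time.
        counter["running"] += 1
        counter["peak"] = max(counter["peak"], counter["running"])
        await asyncio.sleep(delay)  # stand-in for real work
        counter["running"] -= 1


async def run_two_groups(a=3, b=5, jobs_per_group=10):
    # One semaphore per group; each group is throttled independently.
    sem_a, sem_b = asyncio.Semaphore(a), asyncio.Semaphore(b)
    count_a = {"running": 0, "peak": 0}
    count_b = {"running": 0, "peak": 0}
    jobs = [worker(sem_a, count_a) for _ in range(jobs_per_group)]
    jobs += [worker(sem_b, count_b) for _ in range(jobs_per_group)]
    await asyncio.gather(*jobs)
    return count_a["peak"], count_b["peak"]


if __name__ == "__main__":
    print(asyncio.run(run_two_groups()))
```

Each group is bounded only by its own semaphore, so the two limits do not interfere with each other.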