concurrent.futures
在之前的文章介绍过多线程、多进程、进程池、GIL、协程,先进行一下回顾
回顾
- 串行
import time
def calc(n=100_000_000):
    """Increment a counter ``n`` times and return it.

    The loop is intentionally naive: it serves only as a CPU-bound workload
    for the timing comparison, so it must not be collapsed into ``return n``.

    Args:
        n: Number of increments to perform. Defaults to 100,000,000.

    Returns:
        The final counter value: ``n`` for non-negative ``n``, otherwise 0
        because the loop body never runs.
    """
    res = 0
    for _ in range(n):
        res += 1
    return res
if __name__ == '__main__':
    # Baseline: run the workload three times back to back on the main thread.
    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments the way time.time() can.
    start_time = time.perf_counter()
    for _ in range(3):
        calc()
    end_time = time.perf_counter()
    print(f"耗时{end_time - start_time}")
- 多线程
import threading
import time
def calc(n=100_000_000):
    """Increment a counter ``n`` times and return it.

    Used as the thread target below. The loop is a deliberately naive
    CPU-bound workload and must not be collapsed into ``return n``.

    Args:
        n: Number of increments to perform. Defaults to 100,000,000.

    Returns:
        The final counter value: ``n`` for non-negative ``n``, otherwise 0.
    """
    res = 0
    for _ in range(n):
        res += 1
    return res
if __name__ == '__main__':
    # Run the workload on three threads and time the whole batch.
    # NOTE: on a standard (GIL) CPython build CPU-bound threads cannot execute
    # bytecode in parallel, so expect little or no gain over the serial run.
    start_time = time.perf_counter()
    ts = [threading.Thread(target=calc) for _ in range(3)]
    # Start every thread before joining any of them; joining inside the start
    # loop would run the threads one after another.
    for t in ts:
        t.start()
    for t in ts:
        t.join()
    end_time = time.perf_counter()
    print(f"耗时{end_time-start_time}")
- 多进程
import multiprocessing
import time
def calc(n=100_000_000):
    """Increment a counter ``n`` times and return it.

    Used as the process target below. The loop is a deliberately naive
    CPU-bound workload and must not be collapsed into ``return n``.

    Args:
        n: Number of increments to perform. Defaults to 100,000,000.

    Returns:
        The final counter value: ``n`` for non-negative ``n``, otherwise 0.
    """
    res = 0
    for _ in range(n):
        res += 1
    return res
if __name__ == '__main__':
    # Run the workload in three separate processes and time the whole batch.
    # The __main__ guard matters here: with the "spawn" start method each
    # child re-imports this module and must not launch processes of its own.
    start_time = time.perf_counter()
    ps = [multiprocessing.Process(target=calc) for _ in range(3)]
    # Start every process before joining any, so they run concurrently.
    for p in ps:
        p.start()
    for p in ps:
        p.join()
    end_time = time.perf_counter()
    print(f"耗时{end_time - start_time}")
- 进程池
import logging
import multiprocessing
import time
# Lower the root logger threshold to INFO so the logging.info() calls in this
# script are actually emitted.
logging.basicConfig(level=logging.INFO)
def calc(n=100_000_000):
    """Increment a counter ``n`` times and return it.

    Submitted to the pool below. The loop is a deliberately naive CPU-bound
    workload and must not be collapsed into ``return n``.

    Args:
        n: Number of increments to perform. Defaults to 100,000,000.

    Returns:
        The final counter value: ``n`` for non-negative ``n``, otherwise 0.
    """
    res = 0
    for _ in range(n):
        res += 1
    return res
if __name__ == '__main__':
    start_time = time.perf_counter()
    # The with-block guarantees the pool is torn down even if something below
    # raises. Pool.__exit__ calls terminate(), so close() + join() are still
    # needed inside the block to let the submitted tasks finish first.
    with multiprocessing.Pool(3) as pool:
        ret_list = []
        for _ in range(3):
            # Alternatives to the call below:
            #   pool.apply(calc)        - synchronous; blocks until the task
            #                             has finished.
            #   pool.apply_async(calc)  - asynchronous; returns an AsyncResult
            #                             immediately, and the value can still
            #                             be fetched later with .get().
            # Here a callback is added as well; it receives calc's return
            # value as soon as the task completes.
            ret = pool.apply_async(
                calc,
                callback=lambda value: logging.info(
                    '{} in callback'.format(value)),
            )
            logging.info(f'{ret}~~~~~~~~')
            ret_list.append(ret)
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for the workers to drain the queue
        # Collect the return values through the AsyncResult handles.
        for ret in ret_list:
            logging.info(f"{ret.get()}++++++++++++")
    end_time = time.perf_counter()
    print(f"耗时{end_time - start_time}")
- 协程
import asyncio
import time
async def calc(n=100_000_000):
    """Increment a counter ``n`` times and return it.

    The body contains no ``await``, so once this coroutine starts running it
    does not yield to the event loop until it returns. The loop is a
    deliberately naive CPU-bound workload and must not be collapsed into
    ``return n``.

    Args:
        n: Number of increments to perform. Defaults to 100,000,000.

    Returns:
        The final counter value: ``n`` for non-negative ``n``, otherwise 0.
    """
    res = 0
    for _ in range(n):
        res += 1
    return res
async def main():
    """Schedule three ``calc`` coroutines as tasks and wait for all of them.

    Returns:
        None; the results gathered from the tasks are discarded.
    """
    tasks = [asyncio.create_task(calc()) for _ in range(3)]
    await asyncio.gather(*tasks)
if __name__ == '__main__':
    # Time one full event-loop run of main(). perf_counter() is monotonic and
    # therefore a safer choice than time.time() for measuring an interval.
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    print(f"耗时{end_time - start_time}")
concurrent.futures
异步并行任务编程模块,提供一个高级的异步可执行的便利接口,提供了2个池执行器
- ThreadPoolExecutor:异步调用的线程池的Executor
- ProcessPoolExecutor:异步调用的进程池的Executor
- 常用方法:
  - ThreadPoolExecutor(max_workers=None):创建一个至多包含max_workers个线程的池来异步执行任务,返回Executor实例
  - submit(fn, *args, **kwargs):提交要执行的函数及其参数,返回Future类的实例
  - shutdown(wait=True):清理池
- Future类常用方法
  - done():如果调用被成功取消或者执行完成,返回True
  - result(timeout=None):取返回的结果;timeout为None时一直等待返回;超过timeout仍未完成则抛出concurrent.futures.TimeoutError异常
- 例:线程池执行器
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor
# Log layout: timestamp, a tab, then [process name:thread name, pid:thread id]
# followed by the message, so each record shows which worker emitted it.
FORMAT = (
    '%(asctime)-15s\t '
    '[%(processName)s:%(threadName)s, %(process)d:%(thread)8d] '
    '%(message)s'
)
logging.basicConfig(format=FORMAT, level=logging.INFO)
def work(n, delay=5):
    """Simulate a slow task: log a start line, sleep, then log a finish line.

    Args:
        n: Task identifier; it only appears in the two log messages.
        delay: Seconds to sleep between the two messages. Defaults to 5.

    Returns:
        None.
    """
    logging.info('begin to work-{}'.format(n))
    time.sleep(delay)
    logging.info('finished {}'.format(n))
if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=3)
    # Six tasks for three workers: at most three run at once, the remainder
    # wait inside the executor until a worker becomes free.
    fs = [executor.submit(work, i) for i in range(6)]
    while True:
        time.sleep(2)
        logging.info(threading.enumerate())
        flag = True
        for f in fs:
            # Query each future exactly once so the logged value and the value
            # folded into `flag` can never disagree. There is deliberately no
            # early break: logging every future keeps the output easy to read.
            done = f.done()
            logging.info(done)
            flag = flag and done
        print('-' * 30)
        if flag:
            # Every task has finished: release the workers, then show that the
            # pool threads are gone.
            executor.shutdown()
            logging.info(threading.enumerate())
            break
- 进程池执行器,方法基本与上面的例子一致,将ThreadPoolExecutor换成ProcessPoolExecutor即可
import logging
import threading
import time
from concurrent.futures import ProcessPoolExecutor
# Log layout: timestamp, a tab, then [process name:thread name, pid:thread id]
# followed by the message, so each record shows which process emitted it.
FORMAT = (
    '%(asctime)-15s\t '
    '[%(processName)s:%(threadName)s, %(process)d:%(thread)8d] '
    '%(message)s'
)
logging.basicConfig(format=FORMAT, level=logging.INFO)
def work(n, delay=5):
    """Simulate a slow task: log a start line, sleep, then log a finish line.

    Args:
        n: Task identifier; it only appears in the two log messages.
        delay: Seconds to sleep between the two messages. Defaults to 5.

    Returns:
        None.
    """
    logging.info('begin to work-{}'.format(n))
    time.sleep(delay)
    logging.info('finished {}'.format(n))
if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=3)
    # Six tasks for three worker processes: at most three run at once, the
    # remainder wait until a worker becomes free.
    fs = [executor.submit(work, i) for i in range(6)]
    while True:
        time.sleep(2)
        # Lists the threads of this (parent) process only.
        logging.info(threading.enumerate())
        flag = True
        for f in fs:
            # Query each future exactly once so the logged value and the value
            # folded into `flag` can never disagree. There is deliberately no
            # early break: logging every future keeps the output easy to read.
            done = f.done()
            logging.info(done)
            flag = flag and done
        print('-' * 30)
        if flag:
            # Every task has finished: release the worker processes.
            executor.shutdown()
            break
- 支持上下文管理with
# 源码
def __enter__(self):
    """Enter the ``with`` block.

    Returns:
        ``self``, so that ``with ... as executor`` binds this very object.
    """
    return self
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the ``with`` block, shutting the executor down first.

    Args:
        exc_type: Exception class raised inside the block, or None.
        exc_val: Exception instance raised inside the block, or None.
        exc_tb: Traceback of that exception, or None.

    Returns:
        False, so an exception raised inside the block is not suppressed.
    """
    # wait=True: block until shutdown() returns before leaving the with-block.
    self.shutdown(wait=True)
    return False
# 使用上下文管理改写
import logging
import threading
import time
from concurrent.futures import ProcessPoolExecutor
# Log layout: timestamp, a tab, then [process name:thread name, pid:thread id]
# followed by the message, so each record shows which process emitted it.
FORMAT = (
    '%(asctime)-15s\t '
    '[%(processName)s:%(threadName)s, %(process)d:%(thread)8d] '
    '%(message)s'
)
logging.basicConfig(format=FORMAT, level=logging.INFO)
def work(n, delay=5):
    """Simulate a slow task: log a start line, sleep, then log a finish line.

    Args:
        n: Task identifier; it only appears in the two log messages.
        delay: Seconds to sleep between the two messages. Defaults to 5.

    Returns:
        None.
    """
    logging.info('begin to work-{}'.format(n))
    time.sleep(delay)
    logging.info('finished {}'.format(n))
if __name__ == '__main__':
    # Leaving the with-block runs the executor's __exit__, which calls
    # shutdown(wait=True); no explicit executor.shutdown() is needed.
    with ProcessPoolExecutor(max_workers=3) as executor:
        # Six tasks for three worker processes: at most three run at once,
        # the remainder wait until a worker becomes free.
        fs = [executor.submit(work, i) for i in range(6)]
        while True:
            time.sleep(2)
            # Lists the threads of this (parent) process only.
            logging.info(threading.enumerate())
            flag = True
            for f in fs:
                # Query each future exactly once so the logged value and the
                # value folded into `flag` can never disagree. There is
                # deliberately no early break: logging every future keeps the
                # output easy to read.
                done = f.done()
                logging.info(done)
                flag = flag and done
            print('-' * 30)
            if flag:
                break