concurrent.futures

在之前的文章中介绍过多线程、多进程、进程池、GIL 和协程,这里先进行一下回顾。

回顾

  • 串行
import time

def calc(iterations=100_000_000):
    """Count up one step at a time and return the total.

    The loop is deliberately naive: it serves as a pure-Python, CPU-bound
    workload for timing comparisons, so it is kept as an explicit loop
    rather than being replaced by a closed-form result.

    Args:
        iterations: Number of increments to perform. The default keeps the
            original workload of 100,000,000 steps; zero or a negative
            value performs no iterations.

    Returns:
        The number of increments performed.
    """
    res = 0
    for _ in range(iterations):
        res += 1
    return res

if __name__ == '__main__':
    # Baseline: run the workload three times back to back in the main thread
    # and report the elapsed wall-clock time.
    began = time.time()
    for _ in range(3):
        calc()
    elapsed = time.time() - began
    print(f"耗时{elapsed}")
  • 多线程
import threading
import time

def calc(iterations=100_000_000):
    """Run a CPU-bound counting loop; used as the thread target.

    Args:
        iterations: Number of increments to perform. Defaults to the
            original workload of 100,000,000 steps, so it can still be
            passed as ``target=calc`` without arguments. Zero or a negative
            value performs no iterations.

    Returns:
        The number of increments performed.
    """
    res = 0
    for _ in range(iterations):
        res += 1
    return res

if __name__ == '__main__':
    began = time.time()

    # Build the three worker threads, then launch them.
    workers = [threading.Thread(target=calc) for _ in range(3)]
    for worker in workers:
        worker.start()

    # Wait for every worker to finish before stopping the clock.
    for worker in workers:
        worker.join()

    elapsed = time.time() - began
    print(f"耗时{elapsed}")
  • 多进程
import multiprocessing
import time

def calc(iterations=100_000_000):
    """Run a CPU-bound counting loop; used as the process target.

    Args:
        iterations: Number of increments to perform. Defaults to the
            original workload of 100,000,000 steps, so it can still be
            passed as ``target=calc`` without arguments. Zero or a negative
            value performs no iterations.

    Returns:
        The number of increments performed.
    """
    res = 0
    for _ in range(iterations):
        res += 1
    return res

if __name__ == '__main__':
    began = time.time()

    # Build the three child processes, then launch them.
    children = [multiprocessing.Process(target=calc) for _ in range(3)]
    for child in children:
        child.start()

    # Wait for every child to exit before stopping the clock.
    for child in children:
        child.join()

    elapsed = time.time() - began
    print(f"耗时{elapsed}")
  • 进程池
import logging
import multiprocessing
import time

# Configure logging at INFO level so the logging.info() calls below are emitted.
logging.basicConfig(level=logging.INFO)

def calc(iterations=100_000_000):
    """Run a CPU-bound counting loop; submitted to the pool as a task.

    Args:
        iterations: Number of increments to perform. Defaults to the
            original workload of 100,000,000 steps, so the function can
            still be submitted without arguments. Zero or a negative value
            performs no iterations.

    Returns:
        The number of increments performed.
    """
    res = 0
    for _ in range(iterations):
        res += 1
    return res

if __name__ == '__main__':
    start_time = time.time()

    pool = multiprocessing.Pool(3)
    ret_list = []
    for _ in range(3):
        # pool.apply(calc) would be synchronous: it blocks until the call
        # returns. pool.apply_async(calc) returns an AsyncResult right away;
        # the return value of calc is read later through AsyncResult.get(),
        # and is also handed to the optional callback once the call completes.
        ret = pool.apply_async(
            calc,
            callback=lambda value: logging.info('{} in callback'.format(value)),
        )
        logging.info(f'{ret}~~~~~~~~')
        ret_list.append(ret)

    # No further tasks will be submitted; wait for the workers to finish.
    pool.close()
    pool.join()
    # Fetch each task's return value through get().
    for ret in ret_list:
        logging.info(f"{ret.get()}++++++++++++")

    end_time = time.time()
    print(f"耗时{end_time - start_time}")
  • 协程
import asyncio
import time

async def calc(iterations=100_000_000):
    """Coroutine form of the CPU-bound counting loop.

    The body contains no ``await``, so once started the coroutine runs the
    whole loop without handing control back to the event loop.

    Args:
        iterations: Number of increments to perform. Defaults to the
            original workload of 100,000,000 steps, so ``calc()`` keeps
            working unchanged. Zero or a negative value performs no
            iterations.

    Returns:
        The number of increments performed.
    """
    res = 0
    for _ in range(iterations):
        res += 1
    return res

async def main():
    """Schedule three ``calc`` coroutines as tasks and wait for all of them."""
    pending = []
    for _ in range(3):
        pending.append(asyncio.create_task(calc()))
    await asyncio.gather(*pending)

if __name__ == '__main__':
    # Time one full run of the event loop driving main().
    began = time.time()
    asyncio.run(main())
    elapsed = time.time() - began
    print(f"耗时{elapsed}")

concurrent.futures

concurrent.futures 是异步并行任务编程模块,提供了高级的异步执行便利接口,内置 2 个池执行器:

  • ThreadPoolExecutor:异步调用的线程池的Executor

  • ProcessPoolExecutor:异步调用的进程池的Executor

  • 常用方法:

    • ThreadPoolExecutor(max_workers=None):创建一个线程池,池中最多使用max_workers个线程来异步执行调用,返回Executor实例
    • submit(fn, *args, **kwargs):提交执行的函数及其参数,返回Future类的实例
    • shutdown(wait=True):清理池并释放资源;wait为True时,会等到所有已提交的任务执行完毕后才返回
  • Future类常用方法

    • done():如果调用被成功的取消或者执行完成,返回True
    • result(timeout=None):取返回的结果,timeout为None,一直等待返回;timeout设置到期,抛出concurrent.futures.TimeoutError 异常
  • 例:线程池执行器

import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Log line layout: timestamp, then [process name:thread name, pid:thread id],
# then the message, so each record shows where it was emitted.
FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def work(n, seconds=5):
    """Simulate a blocking task: log the start, sleep, then log completion.

    Args:
        n: Identifier of the task; it appears in both log messages.
        seconds: How long to sleep between the two messages. Defaults to 5,
            the duration of the original example, so ``submit(work, i)``
            keeps working unchanged.

    Raises:
        ValueError: If ``seconds`` is negative (raised by ``time.sleep``).
    """
    logging.info('begin to work-{}'.format(n))
    time.sleep(seconds)
    logging.info('finished {}'.format(n))


if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=3)

    # Six tasks are submitted in two batches to a pool limited to three
    # workers, so more work is queued than can run at the same time.
    fs = [executor.submit(work, i) for i in range(3)]
    fs.extend(executor.submit(work, i) for i in range(3, 6))

    while True:
        time.sleep(2)
        logging.info(threading.enumerate())

        # Query each future once per poll so that the logged states and the
        # exit decision come from the same observation. Every state is
        # logged (no early exit) to keep the output easy to follow.
        states = [f.done() for f in fs]
        for state in states:
            logging.info(state)
        print('-' * 30)
        if all(states):
            executor.shutdown()
            logging.info(threading.enumerate())
            break
  • 进程池执行器,方法基本与上面的例子一致,将ThreadPoolExecutor换成ProcessPoolExecutor即可
import logging
import threading
import time
from concurrent.futures import ProcessPoolExecutor

# Log line layout: timestamp, then [process name:thread name, pid:thread id],
# then the message, so each record shows which process and thread emitted it.
FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def work(n, seconds=5):
    """Simulate a blocking task run by a pool worker.

    Logs a start message, sleeps, then logs a completion message.

    Args:
        n: Identifier of the task; it appears in both log messages.
        seconds: How long to sleep between the two messages. Defaults to 5,
            the duration of the original example, so ``submit(work, i)``
            keeps working unchanged.

    Raises:
        ValueError: If ``seconds`` is negative (raised by ``time.sleep``).
    """
    logging.info('begin to work-{}'.format(n))
    time.sleep(seconds)
    logging.info('finished {}'.format(n))


if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=3)

    # Six tasks are submitted in two batches to a pool limited to three
    # workers, so more work is queued than can run at the same time.
    fs = [executor.submit(work, i) for i in range(3)]
    fs.extend(executor.submit(work, i) for i in range(3, 6))

    while True:
        time.sleep(2)
        logging.info(threading.enumerate())

        # Query each future once per poll so that the logged states and the
        # exit decision come from the same observation. Every state is
        # logged (no early exit) to keep the output easy to follow.
        states = [f.done() for f in fs]
        for state in states:
            logging.info(state)
        print('-' * 30)
        if all(states):
            executor.shutdown()
            break
  • 支持上下文管理with
# 源码
def __enter__(self):
    """Return the executor itself as the object bound by ``with ... as``."""
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    """Call ``shutdown(wait=True)`` when the ``with`` block is left.

    Returns False, so an exception raised inside the block is not
    suppressed.
    """
    self.shutdown(wait=True)
    return False
# 使用上下文管理改写
import logging
import threading
import time
from concurrent.futures import ProcessPoolExecutor

# Log line layout: timestamp, then [process name:thread name, pid:thread id],
# then the message, so each record shows which process and thread emitted it.
FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def work(n, seconds=5):
    """Simulate a blocking task run by a pool worker.

    Logs a start message, sleeps, then logs a completion message.

    Args:
        n: Identifier of the task; it appears in both log messages.
        seconds: How long to sleep between the two messages. Defaults to 5,
            the duration of the original example, so ``submit(work, i)``
            keeps working unchanged.

    Raises:
        ValueError: If ``seconds`` is negative (raised by ``time.sleep``).
    """
    logging.info('begin to work-{}'.format(n))
    time.sleep(seconds)
    logging.info('finished {}'.format(n))


if __name__ == '__main__':
    # The with statement replaces the explicit create/shutdown pair: leaving
    # the block shuts the executor down, so no shutdown() call is needed.
    with ProcessPoolExecutor(max_workers=3) as executor:
        # Six tasks are submitted in two batches to a pool limited to three
        # workers, so more work is queued than can run at the same time.
        fs = [executor.submit(work, i) for i in range(3)]
        fs.extend(executor.submit(work, i) for i in range(3, 6))

        while True:
            time.sleep(2)
            logging.info(threading.enumerate())

            # Query each future once per poll so that the logged states and
            # the exit decision come from the same observation. Every state
            # is logged (no early exit) to keep the output easy to follow.
            states = [f.done() for f in fs]
            for state in states:
                logging.info(state)
            print('-' * 30)
            if all(states):
                break

参看及扩展阅读