进程、多进程
进程同步
进程池

概述

在之前的文章全局解释器锁GIL中我们知道由于GIL全局解释器锁存在,多线程对于CPU密集型程序并没有任何优势
一种解决方案是可以使用多进程来绕开GIL全局解释器锁,这样多进程可以可以完全独立的进程环境中运行程序,较充分地利用多处理器。但是进程本身的隔离带来的数据不共享也是一个问题。而且线程比进程轻量级

多进程解决方案

Python提供multiprocessing包Process类,Process类遵循了Thread类的API,具体使用方式这里就不过多介绍,具体使用方式可对照api文档或源码

  • 针对CPU密集型程序多进程示例
import logging
import datetime
import multiprocessing

logging.basicConfig(level=logging.INFO, format="%(process)s %(processName)s %(thread)s %(threadName)s %(message)s")


# CPU密集型,大量的计算
def calc():
    res = 0
    for _ in range(100000000):
        res += 1
    logging.info(f'{res}~~~~~')


if __name__ == '__main__':
    start = datetime.datetime.now()
    ps = []
    for i in range(3):
        p = multiprocessing.Process(target=calc, name=f'calc-{i + 1}')
        ps.append(p)
        p.start()

    for p in ps:
        p.join()
        logging.info(f'{p.name, p.exitcode}')

    delta = (datetime.datetime.now() - start).total_seconds()
    logging.info(delta)
--------------输出结果--------------
94326 calc-2 4532121088 MainThread 100000000~~~~~
94327 calc-3 4532121088 MainThread 100000000~~~~~
94325 calc-1 4532121088 MainThread 100000000~~~~~
94323 MainProcess 4532121088 MainThread ('calc-1', 0)
94323 MainProcess 4532121088 MainThread ('calc-2', 0)
94323 MainProcess 4532121088 MainThread ('calc-3', 0)
94323 MainProcess 4532121088 MainThread 4.929923

对比之前的结果:
使用串行或多线程跑了15s左右
使用多进程仅5s左右,由此可见,多个进程都在使用CPU,这是“真并行”

  • 注意点:
  1. 多进程代码最好放在 __name__ == "__main__" 下面执行(类UNIX系统下由于操作系统本身支持fork()语句,win32系统由于本身不支持fork())
  2. 可进一步参考如下文章:

进程特有属性及方法

  • pid:进程id
  • exitcode:进程的退出状态码
  • terminate():终止指定的进程

进程间同步

  • 如果每个进程中没有实现多线程,GIL则没用
  • Python在进程间同步(进程间竞争资源)提供了和线程同步一样的类,如Lock, Event, Semaphore等。这些类和线程中的类使用的方法一样,使用的效果也类似,但是系统底层实现是不同的,只是Python屏蔽了这些不同之处,让用户使用起来和线程一样,更简单易用。使用方式可参看线程同步
  • 进程间代价要高于线程间

进程池mutiprocessing.Pool类相关方法

  • multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
# 源码
......
  if processes is None:
            processes = os.cpu_count() or 1
  if processes < 1:
            raise ValueError("Number of processes must be at least 1")
......

至于processes的值的选取规则,通过查看源码,默认为None,即不指定的情况下默认为系统cpu个数(核数)或1(cpu数获取不到时)
如果指定,也最好指定processes与cpu个数一致,保证cpu亲缘性,避免切换时不能充分利用原CPU缓存中的数据

  • apply(self, func, args=(), kwds={}):阻塞执行,导致主进程执行其他子进程就像一个
    个执行
  • apply_async(self, func, args=(), kwds={},callback=None, error_callback=None):与apply方法用法一致,非阻塞异步执行,得到结果后会执行回调
  • close():关闭池,池不能再接受新的任务,所有任务完成后退出进程
  • terminate():立即结束工作进程,不再处理未处理的任务
  • join():主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用

进程池示例

import logging
import datetime
import multiprocessing

logging.basicConfig(level=logging.INFO, format="%(process)d %(processName)s %(thread)d %(threadName)s %(message)s")

def calc(i):
    res = 0
    for _ in range(100000000):
        res += 1
    logging.info(res)
    return i, res  # 进程要return,才可以拿到这个结果

if __name__ == '__main__':
    start = datetime.datetime.now()
    pool = multiprocessing.Pool(3)
    for i in range(3):
        # ret = pool.apply(calc, args=(i,))  # 同步阻塞
        # ret = pool.apply_async(calc, args=(i,))  # 异步非阻塞,但这种方式不能拿到calc函数返回结果
        ret = pool.apply_async(calc, args=(i,),callback=lambda ret: logging.info('{} in callback'.format(ret)))  # 异步非阻塞并callback,拿到calc函数返回结果
        logging.info(f'{ret}~~~~~~~~')
    pool.close()
    pool.join()
    delta = (datetime.datetime.now() - start).total_seconds()
    print(delta)
    print('end~~~~~~')
----------同步输出结果-------------
90837 ForkPoolWorker-1 4561653248  MainThread 100000000
90834 MainProcess 4561653248  MainThread (0, 100000000)~~~~~~~~
90838 ForkPoolWorker-2 4561653248  MainThread 100000000
90834 MainProcess 4561653248  MainThread (1, 100000000)~~~~~~~~
90840 ForkPoolWorker-3 4561653248  MainThread 100000000
90834 MainProcess 4561653248  MainThread (2, 100000000)~~~~~~~~
14.434507
end~~~~~~
-----------异步callback输出结果-----------
90898 MainProcess 4477214208  MainThread <multiprocessing.pool.ApplyResult object at 0x100c45860>~~~~~~~~
90898 MainProcess 4477214208  MainThread <multiprocessing.pool.ApplyResult object at 0x100c45978>~~~~~~~~
90898 MainProcess 4477214208  MainThread <multiprocessing.pool.ApplyResult object at 0x100c45a58>~~~~~~~~
90901 ForkPoolWorker-3 4477214208  MainThread 100000000
90898 MainProcess 123145456320512  Thread-3 (2, 100000000) in callback
90900 ForkPoolWorker-2 4477214208  MainThread 100000000
90898 MainProcess 123145456320512  Thread-3 (1, 100000000) in callback
90899 ForkPoolWorker-1 4477214208  MainThread 100000000
90898 MainProcess 123145456320512  Thread-3 (0, 100000000) in callback
4.966565
end~~~~~~

参考

  • magedu