Event, Timer
Lock, RLock
Condition, Semaphore/BoundedSemaphore

Thread synchronization


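Thread synchronization means coordination between threads: while one thread is accessing some data, other threads cannot access that data until the first thread has finished operating on it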

Event(*)


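  • Event is the simplest mechanism for communication between threads
  • It holds an internal flag, and threads coordinate by watching that flag change between True and False
  • The flag is initialized to False

Event class methods

  • is_set(): returns whether the flag is True
  • set(): sets the flag to True
  • clear(): resets the flag to False
  • wait(timeout=None): blocks until the flag is True
    • If the flag is already True, it returns True immediately
    • If the flag is False, it blocks until another thread calls set() and then returns True; if the timeout expires first, it returns False
    • timeout is the maximum time to wait, in seconds; the default None waits indefinitely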
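
The return values listed above can be checked in a single thread. This is a minimal sketch; the variable names are only for illustration, and the 0.1s timeout is an arbitrary small value.

```python
import threading

event = threading.Event()

# The internal flag starts out False.
initial = event.is_set()

# With the flag False, wait() blocks until the timeout expires and returns False.
timed_out = event.wait(timeout=0.1)

# After set(), wait() returns True immediately, with or without a timeout.
event.set()
after_set = event.wait(timeout=0.1)

# clear() resets the flag to False.
event.clear()
after_clear = event.is_set()

print(initial, timed_out, after_set, after_clear)  # False False True False
```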

Example: a boss waits for a worker to finish producing 10 items

# Answer 1: using a global variable, without Event
import threading
import time
import logging

flag = False  # global variable

def worker(count=10):
    global flag
    goods_num = 0
    while True:
        time.sleep(0.5)
        logging.warning('i am working~~~~')
        goods_num += 1
        # print(goods_num)
        if goods_num >= count:
            break
    logging.warning('finished~~~~')
    flag = True  # set the global flag once production is finished

def boss():
    while not flag:
        logging.warning('i am waiting~~~~')
        time.sleep(1)  # check the flag once per second
    logging.warning('good job, worker!!!~~~~')

w = threading.Thread(target=worker, name='worker', args=(10,))
b = threading.Thread(target=boss, name='boss')
w.start()
b.start()
  • Example using Event
# Answer 2: using Event
import threading
import time
import logging

event = threading.Event()
# print(event.is_set())

def worker(count=10):
    goods_num = 0
    while True:
        time.sleep(0.5)
        logging.warning('i am working~~~~')
        goods_num += 1
        if goods_num >= count:
            event.set()
            break
    logging.warning('finished~~~~')

def boss():
    event.wait()  # block until the worker calls set()
    logging.warning('good job, worker!!!~~~~')

w = threading.Thread(target=worker, name='worker', args=(10,))
b = threading.Thread(target=boss, name='boss')
w.start()
b.start()

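Using an Event saves CPU time and is better than the polling approach in example 1: the boss thread sleeps inside wait() and is woken as soon as set() is called, instead of re-checking a flag once per second

  • The example above, rewritten (the recommended style: pass the event in as an argument)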
import threading
import time
import logging

event = threading.Event()

def worker(e: threading.Event, count=10):
    goods_num = 0
    while True:
        time.sleep(0.5)
        logging.warning('i am working~~~~')
        goods_num += 1
        if goods_num >= count:
            e.set()
            break
    logging.warning('finished~~~~')

def boss(e: threading.Event):
    logging.warning("i'm waiting for you...")
    if e.wait():
        logging.warning('good job, worker!!!~~~~')

w = threading.Thread(target=worker, name='worker', args=(event, 10))
b = threading.Thread(target=boss, name='boss', args=(event, ))

w.start()
b.start()
  • Additional example
import threading
import time
import logging

e = threading.Event()

def worker(e: threading.Event, timeout=1):
    while not e.wait(timeout):  # wake up every `timeout` seconds (1s here) until the event is set
    # while not e.is_set():
    #     time.sleep(1)
        logging.warning('i am working')

w = threading.Thread(target=worker, name='worker', args=(e, 1))
# w = threading.Thread(target=worker, name='worker', args=(e, 8))  # set() comes after 10s: how long does the worker block? wait() returns right at 10s, it does not sit out the full 16s (8 + 8)
w.start()

e.wait(10)  # block in the main thread for up to 10s
e.set()
print('end~~~~~~~')

Timer

A Timer instance is a thread that runs its target function after a delay

# Partial source of the Timer class
class Timer(Thread):
    def __init__(self, interval, function, args=None, kwargs=None):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = Event()
    ......
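  • Timer is a subclass of Thread, so it is a thread class with all the abilities and characteristics of a thread
  • Each Timer instance has a finished attribute, which is an Event object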
import threading
import time
import logging

def worker():
    logging.warning('i am working~~~~')

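t = threading.Timer(3, worker)  # run worker after a 3s delay
# Timer is a Thread subclass, so the usual Thread attributes apply
t.name = 'worker'  # setName() is deprecated since Python 3.10
# t.daemon = True
t.daemon = False  # setDaemon() is deprecated since Python 3.10
# t.cancel()  # cancel() can be called either before or after t.start()
t.start()
# t.cancel()  # cancel() can be called either before or after t.start()

time.sleep(4)  # can it still be cancelled after sleeping 4s? No: worker has already been called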
t.cancel()

print('end~~~~~~~')
  • How cancel() works
# Source code
self.finished = Event()  # Event()
def cancel(self):
    """Stop the timer if it hasn't finished yet."""
    self.finished.set()

def run(self):
    self.finished.wait(self.interval)
    if not self.finished.is_set():  # call the function only if the Event has not been set; otherwise skip it
        self.function(*self.args, **self.kwargs)
    self.finished.set()

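Why cancel() works:

  • Calling set() on the finished attribute (an Event) before the function runs makes run() skip the function call, which has the effect of cancelling it
  • cancel() is implemented entirely with an Event. It does not mean that threads themselves provide a way to be cancelled
  • The timer can be cancelled at any point before the target function actually starts executing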
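
The two cases above can be sketched as follows. The `job` function and the `calls` list are hypothetical names used only for this illustration, and the intervals are shortened so the script finishes quickly.

```python
import threading

calls = []

def job(label):
    # Record that the timer actually invoked its target function.
    calls.append(label)

# Case 1: cancel() before the interval elapses, so job is skipped.
t1 = threading.Timer(0.2, job, args=("cancelled",))
t1.start()
t1.cancel()
t1.join()  # run() returns as soon as finished is set

# Case 2: the interval elapses first, so job runs; a later cancel() changes nothing.
t2 = threading.Timer(0.05, job, args=("ran",))
t2.start()
t2.join()
t2.cancel()

print(calls)  # ['ran']
```

In both cases the thread itself still starts and ends normally; only the call to the target function is skipped.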

Lock(*)


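A lock. Once one thread has acquired the lock, any other thread that tries to acquire it blocks. Locks are used where threads compete for a shared resource, to guarantee that only one user has full use of the resource at a time

Lock class methods

  • acquire(blocking=True, timeout=-1)
    • Blocks by default (blocking=True)
    • A blocking call may set a timeout in seconds; the default -1 waits indefinitely
    • A non-blocking call must not set a timeout, otherwise: ValueError: can't specify a timeout for a non-blocking call
    • Returns True if the lock was acquired, otherwise False
  • release()
    • Releases the lock; it may be called from any thread, not only the one that acquired it
    • Releasing a locked lock resets it to unlocked
    • Calling release on an unlocked lock raises RuntimeError: release unlocked lock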
from threading import Lock

lock = Lock()
# lock.release()  # RuntimeError: release unlocked lock
# lock.acquire(blocking=False, timeout=2)  # ValueError: can't specify a timeout for a non-blocking call
lock.acquire()
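print(1, lock.acquire(blocking=False))  # non-blocking: True if acquired, otherwise False right away
print(2, lock.acquire(timeout=2))  # returns False if the lock cannot be acquired within 2s
lock.release()
print(3, lock.acquire())  # blocking acquire
# lock.acquire()  # acquiring again before a release can only wait, so this would block
----------- output ----------------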
1 False
2 False
3 True
  • Usage example
# Ten workers produce 1000 items
import logging
import threading
import time

count = []

def worker(goods_num):
    while len(count) < goods_num:
        time.sleep(0.0001)
        count.append(1)
    logging.warning(f'goods num is {len(count)}')

for i in range(1, 11):
    t = threading.Thread(target=worker, name=f'worker-{i}', args=(1000,))
    t.start()

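Clearly, without a lock the workers overproduce: several threads can pass the len(count) < goods_num check before any of them appends, so the final count exceeds 1000

  • The example above, rewritten with a lock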
import logging
from threading import Thread, Lock
import threading
import time

count = []
lock = Lock()

def worker(goods_num):
    flag = False
    logging.warning('i am working ~~~~')
    while True:
        lock.acquire()
        if len(count) < goods_num:
            time.sleep(0.001)
            count.append(1)
        if len(count) == goods_num:
            flag = True
        lock.release()
        if flag:
            break

for i in range(1, 11):
    t = Thread(target=worker, name=f'worker-{i}', args=(1000,))
    t.start()

while True:
    time.sleep(1)
    if threading.active_count() == 1:
        print(f'current thread is {threading.current_thread()}')
        print(f'goods num is {len(count)}')
        break
    else:
        print(f'{threading.active_count()} threads are still alive')

The lock can also be passed in through the Thread arguments
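
A minimal sketch of that variant, assuming the same ten-workers scenario. Here both the lock and the shared list are passed as arguments, and join() replaces the polling loop; these are choices made for this illustration only.

```python
import threading

def worker(lock: threading.Lock, count: list, goods_num: int):
    while True:
        # Check and append under the same lock, so no two workers
        # can both pass the check for the last item.
        with lock:
            if len(count) >= goods_num:
                return
            count.append(1)

count = []
lock = threading.Lock()

threads = [
    threading.Thread(target=worker, name=f'worker-{i}', args=(lock, count, 1000))
    for i in range(1, 11)
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for every worker instead of polling active_count()

print(f'goods num is {len(count)}')  # goods num is 1000
```

Passing the lock explicitly keeps the worker free of module-level state, which makes it easier to reuse and to test.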

Guaranteeing that the lock is released

  • try...finally
  • The with statement: lock objects support the context manager protocol (__enter__ / __exit__)
import threading
import time
from threading import Thread, Lock

class Counter:
    def __init__(self):
        self._value = 0
        self._lock = Lock()

    @property
    def value(self):
        with self._lock:
            return self._value

    def inc(self):
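        self._lock.acquire()  # acquire before try, so finally only releases a lock that is actually held
        try:  # a with block is the better choice
            self._value += 1
        finally:
            self._lock.release()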

    def dec(self):
        with self._lock:
            self._value -= 1

def run(c: Counter, count=1000):
    for _ in range(count):
        for i in range(-50, 50):
            if i < 0:
                c.dec()
            else:
                c.inc()

c = Counter()
c1 = 10  # number of threads
c2 = 1000
for i in range(c1):
    Thread(target=run, args=(c, c2)).start()

while True:
    time.sleep(1)
    if threading.active_count() == 1:
        # print(threading.enumerate())
        print(c.value)
        break
    else:
        # print(threading.enumerate())
        pass

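When and how to use locks

  • Locks are for accessing and modifying the same shared resource, that is, when several threads read and write the same resource
  • Use locks sparingly, only where necessary
  • Once a lock is used, threads accessing the locked resource run serially, which reduces efficiency
  • Hold a lock for as short a time as possible and release it as soon as it is no longer needed
  • Always avoid deadlock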
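
One common way to avoid deadlock when a thread needs two locks is to always acquire them in the same global order. The sketch below illustrates that idea only; the `Account` class and `transfer` function are hypothetical and not part of the original notes.

```python
import threading

class Account:
    def __init__(self, name, balance):
        self.name = name
        self.balance = balance
        self.lock = threading.Lock()

def transfer(src: Account, dst: Account, amount: int):
    # Lock both accounts in a fixed order (by name), regardless of direction.
    # If one thread locked a then b while another locked b then a,
    # each could end up holding one lock and waiting forever for the other.
    first, second = sorted((src, dst), key=lambda acc: acc.name)
    with first.lock:
        with second.lock:
            src.balance -= amount
            dst.balance += amount

def run(src, dst, times):
    for _ in range(times):
        transfer(src, dst, 1)

a = Account('a', 1000)
b = Account('b', 1000)

t1 = threading.Thread(target=run, args=(a, b, 500))  # a -> b, 500 times
t2 = threading.Thread(target=run, args=(b, a, 300))  # b -> a, 300 times
t1.start()
t2.start()
t1.join()
t2.join()

print(a.balance, b.balance)  # 800 1200
```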

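Reentrant lock RLock

  • It is tied to a thread: once a thread has acquired the lock, the same thread can acquire it again any number of times without blocking
  • Until the lock is fully released, other threads that try to acquire it block, until the holding thread has released it completely
  • Every lock should be released after use. A reentrant lock is still a lock: release it exactly as many times as it was acquired
  • Supports the context manager protocol (the with statement)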
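
A short sketch of these points. The function names `outer`, `inner` and `try_acquire` are made up for this example; the helper thread uses a non-blocking acquire so the script never hangs.

```python
import threading

rlock = threading.RLock()
log = []

def inner():
    with rlock:  # second acquisition by the same thread: does not block
        log.append('inner')

def outer():
    with rlock:
        log.append('outer')
        inner()

outer()

result = []

def try_acquire():
    # Non-blocking attempt from another thread.
    got = rlock.acquire(blocking=False)
    result.append(got)
    if got:
        rlock.release()

def attempt_from_other_thread():
    t = threading.Thread(target=try_acquire)
    t.start()
    t.join()

rlock.acquire()
rlock.acquire()              # held twice by the main thread
attempt_from_other_thread()  # fails: the lock is held
rlock.release()
attempt_from_other_thread()  # still fails: one release is outstanding
rlock.release()
attempt_from_other_thread()  # succeeds: fully released

print(log, result)  # ['outer', 'inner'] [False, False, True]
```

With a plain Lock, the nested acquisition in `inner` would block forever, since the same thread already holds the lock.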

Condition


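  • Constructor: Condition(lock=None). A Lock or RLock object can be passed in; the default is an RLock
  • Example: one worker produces 1000 cups, and 2 bosses wait until the worker has finished
# Implementation using Event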
import logging
import threading
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

e = threading.Event()
cups = []


def boss(e: threading.Event):
    current_thread = threading.current_thread()
    logging.info("{} is waiting~~~~".format(current_thread.name))
    e.wait()
    logging.info('{} say Good Job'.format(current_thread.name))


def worker(e: threading.Event, count=1000):
    print("{} is working~~~~".format(threading.current_thread().name))
    while len(cups) < count:
        time.sleep(0.0001)
        cups.append(1)
    e.set()
    print("{} finished.cups is {}".format(threading.current_thread().name, len(cups)))


threading.Thread(target=worker, args=(e, 1000), name='worker').start()

for i in range(2):
    threading.Thread(target=boss, args=(e,), name=f'boss-{i + 1}').start()

print('~~~~~~~~~')

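Condition class methods

  • acquire(*args): acquires the underlying lock

  • wait(timeout=None): releases the lock and blocks until notified or until the timeout expires

  • notify(n=1): wakes at most n waiting threads; does nothing if no thread is waiting

  • notify_all(): wakes all waiting threads

  • The example above, rewritten with Condition

# Implementation using Condition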
import logging
import threading
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

cups = []
cond = threading.Condition()

def boss(c: threading.Condition, count=1000):
    current_thread = threading.current_thread()
    logging.info("{} is waiting~~~~".format(current_thread.name))
    with c:
        # wait_for checks the predicate first, so a notification sent
        # before this thread starts waiting is not lost (a bare wait() would hang)
        c.wait_for(lambda: len(cups) >= count)
    logging.info('{} say Good Job'.format(current_thread.name))

def worker(c: threading.Condition, count=1000):
    print("{} is working~~~~".format(threading.current_thread().name))
    with c:
        while len(cups) < count:
            time.sleep(0.001)
            cups.append(1)
        print("{} finished.cups is {}".format(threading.current_thread().name, len(cups)))
        # c.notify(1)
        c.notify_all()

for i in range(2):
    threading.Thread(target=boss, args=(cond, 1000), name=f'boss-{i + 1}').start()  # consumers start first
threading.Thread(target=worker, args=(cond, 1000), name='worker').start()

print('~~~~~~~~~')
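  • Condition is used in the producer-consumer model, to deal with producers and consumers running at different speeds
  • It uses a notification mechanism, which is efficient. When the producer has data ready, it notifies the consumers with notify or notify_all
  • A Condition must be acquired before use and released afterwards, because it wraps a lock (an RLock by default); the best way is a with block. Consumers call wait and wait for the notification
  • As the example shows, Condition supports one-to-many messaging, that is, broadcast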
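
A small producer-consumer sketch along these lines. The shared `deque`, the `SENTINEL` end marker, and the function names are assumptions made for this illustration. The consumer uses wait_for with a predicate, so it does not matter whether it starts waiting before or after the producer notifies.

```python
import threading
from collections import deque

SENTINEL = None  # hypothetical end-of-stream marker

cond = threading.Condition()
queue = deque()
consumed = []

def consumer(c: threading.Condition, q: deque, out: list):
    while True:
        with c:
            # Sleeps without polling until the queue is non-empty.
            c.wait_for(lambda: len(q) > 0)
            item = q.popleft()
        if item is SENTINEL:
            return
        out.append(item)

def producer(c: threading.Condition, q: deque, n: int):
    for i in range(n):
        with c:
            q.append(i)
            c.notify()  # wake one waiting consumer
    with c:
        q.append(SENTINEL)
        c.notify()

ct = threading.Thread(target=consumer, args=(cond, queue, consumed), name='consumer')
pt = threading.Thread(target=producer, args=(cond, queue, 5), name='producer')
ct.start()
pt.start()
pt.join()
ct.join()

print(consumed)  # [0, 1, 2, 3, 4]
```

For real work queues, the standard library's queue.Queue wraps this pattern and is usually the simpler choice.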

See the documentation for more examples

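Semaphore


  • Similar to Lock. A semaphore object maintains an internal counter that counts down: every acquire decrements it by 1
  • When acquire finds the counter at 0, it blocks the calling thread until another thread calls release on the semaphore; once the counter is above 0, a blocked thread resumes

Semaphore class methods

  • Semaphore(value=1): constructor. A value below 0 raises ValueError
  • acquire(blocking=True, timeout=None): acquires the semaphore and decrements the counter by 1; returns True on success
  • release(): releases the semaphore and increments the counter by 1
# Source of Semaphore.__init__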
class Semaphore:
    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

BoundedSemaphore class

  • A bounded semaphore
  • release must not push the counter above its initial value; if it would, ValueError is raised
import threading

# sema = threading.BoundedSemaphore(2)  # ValueError: Semaphore released too many times
sema = threading.Semaphore(2)

sema.release()
print(sema._value)
print(sema.acquire())
print(sema.acquire())
print(sema._value)
sema.release()
print(sema._value)
print(sema.acquire())
print(sema._value)
# sema.acquire()  # the counter is 0, so this would block the calling thread
print(sema._value)
sema.release()
sema.release()
sema.release()
print(sema._value)
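
The difference between the two classes can also be shown without reading the private _value attribute. This is a sketch; `plain_permits` and `error` are names chosen for this example.

```python
import threading

# A plain Semaphore accepts an extra release and silently grows past its initial value.
plain = threading.Semaphore(2)
plain.release()

plain_permits = 0
while plain.acquire(blocking=False):  # drain the counter without blocking
    plain_permits += 1

# A BoundedSemaphore refuses to go above its initial value.
bounded = threading.BoundedSemaphore(2)
try:
    bounded.release()
    error = None
except ValueError as exc:
    error = exc

print(plain_permits)   # 3
print(repr(error))
```

An extra release is usually a bug in the calling code, so BoundedSemaphore tends to be the safer default for guarding a fixed number of resources.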

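Differences between a semaphore and a lock

  • A semaphore lets several threads access a shared resource, but the number of resources is limited (when the counter is 0, requesting threads block). Only when a resource is freed (a thread releases the semaphore and the counter rises above 0) can another thread use it (a blocked thread resumes)

  • A lock can be seen as a special semaphore whose counter starts at 1. It allows only one thread at a time to hold the resource exclusively

  • Additional example: a connection pool (resources are limited and opening a connection is expensive, so a connection pool is used)

# Connection pool simulation: using a semaphore to handle limited resources
'''
If the pool has resources, a requester decrements the semaphore by 1 and takes one. When requests exceed the number of resources, requesters have to wait. When a user returns a resource, the semaphore is incremented by 1 and a waiting thread can be woken to take it
'''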
import random
import threading
import logging
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)-8d %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

class Conn:
    def __init__(self, name):
        self.name = name

class Pool:
    def __init__(self, count: int):
        self.count = count
        self.pool = [self._connect('conn-{}'.format(i)) for i in range(self.count)]
        # self._lock = threading.Lock()
        self.sema = threading.Semaphore(count)  # self.sema = threading.BoundedSemaphore(count)

    def _connect(self, conn_name):
        return Conn(conn_name)

    def get_conn(self):
        # Lock-based solution
        # with self._lock:
        #     if len(self.pool) > 0:
        #         return self.pool.pop()
        # Semaphore-based solution
        self.sema.acquire()
        return self.pool.pop()

    def return_conn(self, conn: Conn):
        # Lock-based solution
        # with self._lock:
        #     if len(self.pool) < self.count:
        #         self.pool.append(conn)
        self.pool.append(conn)
        self.sema.release()

pool = Pool(3)

def worker(pool: Pool):
    conn = pool.get_conn()
    logging.info(conn.name)  # log the connection name rather than the default object repr
    time.sleep(random.randint(1, 5))
    pool.return_conn(conn)

for i in range(6):
    threading.Thread(target=worker, name='worker-{}'.format(i), args=(pool,)).start()
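
Semaphores also support the with statement, which pairs every acquire with a release. The sketch below assumes a limit of 3 concurrent workers and tracks the highest concurrency seen; the `active` and `peak` counters are illustration only.

```python
import threading
import time

sema = threading.BoundedSemaphore(3)  # at most 3 workers inside the guarded section
state_lock = threading.Lock()         # protects the two counters below
active = 0
peak = 0

def worker():
    global active, peak
    with sema:  # acquire on entry, release on exit, even if an exception is raised
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulate using the limited resource
        with state_lock:
            active -= 1

threads = [threading.Thread(target=worker, name=f'worker-{i}') for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f'peak concurrency: {peak}')  # never exceeds 3
```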

Reference


  • magedu