Event、Timer
Lock、RLock
Condition、Semaphore/BoundedSemaphore
线程同步
线程同步,即线程间协同:让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作
Event(*)
- Event事件,是线程间通信机制中最简单的实现
- 使用一个内部的标记flag,通过flag(internal flag)的True或False的变化来进行操作
- 这个flag初始化值为False
Event类相关方法
- is_set():标记是否为True
- set():设置标记为True
- clear():(清除标记),设置(reset)标记为False
- wait(timeout=None):阻塞(Block),直到标记为True或超时
- 如果本身当前标记为True,直接返回,返回值为True
- 如果当前标记为False,则一直阻塞直到标记被set()为True,返回值为True;或者未等到标记变为True,到timeout超时后,返回值为False
- timeout为等待时间,默认为None,为无限等待
例:老板等待工人生产完10件商品
# Answer 1: coordinate through a global variable instead of an Event
import threading
import time
import logging

flag = False  # module-level completion flag, polled by boss()


def worker(count=10):
    """Produce goods until at least `count` exist, then raise the global `flag`.

    The loop body always runs at least once (do-while shape).
    """
    global flag
    produced = 0
    done = False
    while not done:
        time.sleep(0.5)
        logging.warning('i am working~~~~')
        produced += 1
        done = produced >= count
    logging.warning('finished~~~~')
    flag = True  # signal completion to boss() through the global


def boss():
    """Poll the global `flag` once per second until the worker is done."""
    while not flag:
        logging.warning('i am waiting~~~~')
        time.sleep(1)  # look at the flag again one second later
    logging.warning('good job, worker!!!~~~~')


w = threading.Thread(target=worker, name='worker', args=(10,))
b = threading.Thread(target=boss, name='boss')
w.start()
b.start()
- 使用event示例
# Answer 2: coordinate through a threading.Event
import threading
import time
import logging

event = threading.Event()


def worker(count=10):
    """Make goods until `count` is reached, then set the module-level event."""
    made = 0
    while True:
        time.sleep(0.5)
        logging.warning('i am working~~~~')
        made += 1
        if made < count:
            continue
        event.set()  # releases every thread blocked in event.wait()
        break
    logging.warning('finished~~~~')


def boss():
    """Block (without polling) until the worker sets the event."""
    event.wait()
    logging.warning('good job, worker!!!~~~~')


w = threading.Thread(target=worker, name='worker', args=(10,))
b = threading.Thread(target=boss, name='boss')
w.start()
b.start()
使用Event无需轮询,能够节省CPU时间,比例1中的轮询方式更好
- 改写如上例子(建议使用的方式,event通过参数传入)
import threading
import time
import logging

event = threading.Event()


def worker(e: threading.Event, count=10):
    """Make goods (at least one) until `count` is reached, then set `e`."""
    made = 0
    finished = False
    while not finished:
        time.sleep(0.5)
        logging.warning('i am working~~~~')
        made += 1
        finished = made >= count
    e.set()
    logging.warning('finished~~~~')


def boss(e: threading.Event):
    """Announce that we are waiting, then block until `e` is set."""
    logging.warning("i'm waiting for you...")
    is_done = e.wait()  # no timeout: returns True once e has been set
    if is_done:
        logging.warning('good job, worker!!!~~~~')


w = threading.Thread(target=worker, name='worker', args=(event, 10))
b = threading.Thread(target=boss, name='boss', args=(event, ))
w.start()
b.start()
- 补充例子
import threading
import time
import logging

e = threading.Event()


def worker(e: threading.Event, timeout=1):
    """Log once per `timeout` seconds until the event is set.

    e.wait(timeout) doubles as the sleep. It returns False when the timeout
    expires, or True as soon as the event is set, which ends the loop at once.
    """
    while True:
        if e.wait(timeout):
            break
        logging.warning('i am working')


w = threading.Thread(target=worker, name='worker', args=(e, 1))
# With args=(e, 8): the event is set after 10s and the worker's pending wait
# returns immediately at that moment; it does not wait the full 16s (8 + 8).
w.start()
e.wait(10)  # block the main thread for up to 10 seconds
e.set()
print('end~~~~~~~')
Timer
Timer实例是能够延时执行目标函数的线程
# Partial source of threading.Timer (only the constructor is shown)
class Timer(Thread):
    def __init__(self, interval, function, args=None, kwargs=None):
        Thread.__init__(self)
        self.interval = interval  # delay in seconds before `function` is called
        self.function = function
        # Fresh containers per instance, so no mutable default is shared.
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        # Set by cancel() and at the end of run(); run() checks it to skip the call.
        self.finished = Event()
    ......
- Timer是线程Thread的子类,就是线程类,具有线程的能力和特征
- Timer实例内部提供了一个finished属性,该属性是Event对象
import threading
import time
import logging


def worker():
    logging.warning('i am working~~~~')


t = threading.Timer(3, worker)  # run worker after a 3-second delay
# Timer is a Thread subclass, so the usual thread attributes are available.
# Thread.setName()/setDaemon() are deprecated since Python 3.10: assign the
# `name` / `daemon` attributes instead (daemon must be set before start()).
t.name = 'worker'
# t.daemon = True
t.daemon = False
# t.cancel()  # cancel works before or after t.start(), as long as worker has not run yet
t.start()
# t.cancel()  # cancel works before or after t.start(), as long as worker has not run yet
time.sleep(4)  # can we still cancel after 4s? No: worker already ran at ~3s
t.cancel()  # too late here, so this has no effect
print('end~~~~~~~')
- cancel()的本质
# Source of threading.Timer's cancellation machinery
self.finished = Event()  # created in Timer.__init__; acts as the "cancelled/finished" flag

def cancel(self):
    """Stop the timer if it hasn't finished yet."""
    self.finished.set()

def run(self):
    # Sleep for up to `interval` seconds; returns early if cancel() sets the event.
    self.finished.wait(self.interval)
    # If nobody set the event during the wait, call the target; otherwise skip it.
    if not self.finished.is_set():
        self.function(*self.args, **self.kwargs)
    self.finished.set()  # mark the timer as finished whether or not it ran
能够cancel的本质:
- 函数执行前对finished属性(Event) set方法操作,从而跳过函数执行,达到了取消的效果
- cancel方法本质使用Event类实现。这并不是说线程提供了取消的方法
- 在真正执行目标函数之前,都可以cancel它
Lock(*)
锁,一旦线程获得锁,其他线程试图获得锁将被阻塞。应用在存在共享资源争抢的场景,从而保障只有一个使用者可以完全使用该资源
Lock类相关方法
- acquire(blocking=True, timeout=-1)
- 默认阻塞blocking=True
- 阻塞时可以设置超时时间timeout(单位为秒),默认-1表示无限等待
- 非阻塞时禁止设置timeout,否则抛出:
ValueError: can't specify a timeout for a non-blocking call
- 成功获取锁返回True,否则返回False
- release()
- 释放锁,可以从任何线程中释放
- 已上锁的锁,release后会被重置为unlocked
- 未上锁的锁调用release,抛RuntimeError异常,
RuntimeError: release unlocked lock
from threading import Lock

lock = Lock()
# lock.release()  # RuntimeError: release unlocked lock
# lock.acquire(blocking=False, timeout=2)  # ValueError: can't specify a timeout for a non-blocking call
lock.acquire()  # hold the lock so that the next two attempts fail

# Non-blocking attempt: returns False immediately because the lock is held.
got_nonblocking = lock.acquire(blocking=False)
print(1, got_nonblocking)
# Timed attempt: gives up and returns False after 2 seconds.
got_timed = lock.acquire(timeout=2)
print(2, got_timed)

lock.release()
print(3, lock.acquire())  # blocking acquire on a free lock -> True
# lock.acquire()  # acquiring again without a release would block forever
-----------输出结果----------------
1 False
2 False
3 True
- 应用示例
# Ten workers produce 1000 goods -- deliberately WITHOUT a lock
import logging
import threading
import time

count = []


def worker(goods_num):
    """Append goods to the shared list until it appears to hold `goods_num`.

    The length check and the append are not atomic as a pair, so several
    threads can pass the check together and overshoot `goods_num`.
    """
    while True:
        if len(count) >= goods_num:
            break
        time.sleep(0.0001)
        count.append(1)
    logging.warning(f'goods num is {len(count)}')


for i in range(1, 11):
    t = threading.Thread(target=worker, name=f'worker-{i}', args=(1000,))
    t.start()
很明显,如果不加锁,则会多生产:多个线程可能同时通过len(count) < goods_num的检查后各自append,导致最终数量超过1000
- 改写如上例子
import logging
from threading import Thread, Lock
import threading
import time

count = []
lock = Lock()


def worker(goods_num):
    """Append goods to the shared `count` list until it holds `goods_num` items.

    The length check and the append both happen while holding `lock`, so the
    total never exceeds `goods_num`. Every worker exits once the target is
    reached (a `>=` guard, so no thread can miss an exact-equality check).
    """
    logging.warning('i am working ~~~~')
    while True:
        # `with` releases the lock even if the body raises or we break out.
        with lock:
            if len(count) >= goods_num:
                break
            time.sleep(0.001)  # simulated work, done while holding the lock
            count.append(1)


for i in range(1, 11):
    t = Thread(target=worker, name=f'worker-{i}', args=(1000,))
    t.start()

while True:
    time.sleep(1)
    if threading.active_count() == 1:
        # Braces were missing here, so the call was printed as literal text.
        print(f'当前线程是{threading.current_thread()}')
        print(f'goods num is {len(count)}')
        break
    else:
        print(f'当前alive的线程有{threading.active_count()}个')
也可通过Thread的参数将lock传入
保证锁的释放
- try...finally
- with上下文管理,锁对象支持上下文管理(`__enter__`和`__exit__`)
import threading
import time
from threading import Thread, Lock
class Counter:
    """An integer counter whose reads and updates are serialized by a Lock."""

    def __init__(self):
        self._value = 0
        self._lock = Lock()

    @property
    def value(self):
        """Current count, read while holding the lock."""
        with self._lock:
            return self._value

    def inc(self):
        """Add 1 to the counter (explicit acquire/try/finally form)."""
        # Acquire BEFORE entering `try`. If acquire() itself were interrupted
        # inside the try, `finally` would release a lock this thread never
        # held and raise RuntimeError. (`with self._lock:` is the simpler form.)
        self._lock.acquire()
        try:
            self._value += 1
        finally:
            self._lock.release()

    def dec(self):
        """Subtract 1 from the counter using the lock's context manager."""
        with self._lock:
            self._value -= 1
def run(c: Counter, count=1000):
    """Apply `count` rounds of 50 dec() calls followed by 50 inc() calls to `c`.

    Every round is net zero, so a correctly locked counter ends where it began.
    """
    for _ in range(count):
        for _ in range(50):  # first half of the round: decrements
            c.dec()
        for _ in range(50):  # second half of the round: increments
            c.inc()
c = Counter()
c1 = 10  # number of threads
c2 = 1000  # rounds of dec/inc per thread
threads = [Thread(target=run, args=(c, c2)) for _ in range(c1)]
for t in threads:
    t.start()
# Wait for exactly the threads started here. Polling
# threading.active_count() == 1 never finishes if any unrelated thread is
# alive, and it adds up to a second of needless latency.
for t in threads:
    t.join()
print(c.value)  # expected 0: each round is net zero and the counter is locked
锁的使用场景及原则
- 锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候
- 少用锁,必要时用锁
- 使用了锁,多线程访问被锁的资源时,就成了串行执行,效率会降低
- 加锁时间越短越好,不需要就立即释放锁
- 一定要避免死锁
可重入锁RLock
- 与线程相关,可在一个线程中获取锁,并可继续在同一线程中不阻塞多次获取锁
- 当锁未释放完,其它线程获取锁就会阻塞,直到当前持有锁的线程释放完锁
- 锁都应该使用完后释放。可重入锁也是锁,应该acquire多少次,就release多少次
- 支持上下文管理,具体应用示例,这里就不详细展开
Condition
- 构造方法Condition(lock=None),可以传入一个Lock或RLock对象,默认是RLock
- 示例:一个工人生产1000个杯子,有2个老板等到他生产完为止
# Event-based version
import logging
import threading
import time

# The field was misspelled as `%s(thread)`; LogRecord fields use `%(name)<conv>`.
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
e = threading.Event()
cups = []


def boss(c: threading.Event):
    """Wait on the event `c`, then log praise for the worker."""
    current_thread = threading.current_thread()
    logging.info("{} is waiting~~~~".format(current_thread.name))
    c.wait()  # use the event passed in, not the module-level global
    logging.info('{} say Good Job'.format(current_thread.name))


def worker(c: threading.Event, count=1000):
    """Make `count` cups, then set the event `c` to release every boss."""
    print("{} is working~~~~".format(threading.current_thread().name))
    while len(cups) < count:
        time.sleep(0.0001)
        cups.append(1)
    c.set()
    print("{} finished.cups is {}".format(threading.current_thread().name, len(cups)))


threading.Thread(target=worker, args=(e, 1000), name='worker').start()
for i in range(2):
    threading.Thread(target=boss, args=(e,), name=f'boss-{i + 1}').start()
print('~~~~~~~~~')
Condition类相关方法
- acquire(*args):获取底层锁
- wait(timeout=None):释放底层锁并等待,直到被notify唤醒或超时,返回前重新获取锁
- wait_for(predicate, timeout=None):反复wait,直到predicate()返回True或超时
- notify(n=1):唤醒至多n个等待的线程,没有等待的线程就没有任何操作
- notify_all():唤醒所有等待的线程
上例改写成Condition实现
# Condition-based version
import logging
import threading
import time

# The field was misspelled as `%s(thread)`; LogRecord fields use `%(name)<conv>`.
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
cups = []
cond = threading.Condition()


def boss(c: threading.Condition, count=1000):
    """Wait on condition `c` until `count` cups exist, then log praise.

    wait_for() tests the predicate before blocking. A boss that only gets
    here after the worker's notify_all() therefore returns at once, instead
    of blocking forever on a notification it already missed (as a bare
    wait() would).
    """
    current_thread = threading.current_thread()
    logging.info("{} is waiting~~~~".format(current_thread.name))
    with c:
        c.wait_for(lambda: len(cups) >= count)
    logging.info('{} say Good Job'.format(current_thread.name))


def worker(c: threading.Condition, count=1000):
    """Make `count` cups while holding `c`, then wake every waiting boss."""
    print("{} is working~~~~".format(threading.current_thread().name))
    with c:
        while len(cups) < count:
            time.sleep(0.001)
            cups.append(1)
        print("{} finished.cups is {}".format(threading.current_thread().name, len(cups)))
        # c.notify(1)  # would wake only one boss
        c.notify_all()  # broadcast: wake every thread waiting on c


for i in range(2):
    threading.Thread(target=boss, args=(cond,), name=f'boss-{i + 1}').start()  # consumers start first
threading.Thread(target=worker, args=(cond, 1000), name='worker').start()
print('~~~~~~~~~')
- 用于生产者、消费者模型,为了解决生产者消费者速度匹配问题
- 采用了通知机制,非常有效率。生产者生产好消息,对消费者发通知,可以使用notify或者notify_all方法
- 使用Condition,必须先acquire,用完了要release,因为内部使用了锁,默认使用RLock锁,最好的方式是使用with上下文。消费者wait,等待通知;wait前应先检查条件(或直接使用wait_for(predicate)),否则若通知在wait之前就已发出,消费者会一直阻塞
- 可以看到,Condition通过notify_all实现了消息的一对多通知,即广播模式
更多示例可参看文档
Semaphore信号量
- 和Lock很像,信号量对象内部维护一个倒计数器,每一次acquire都会减1
- 当acquire方法发现计数为0就阻塞请求的线程,直到其它线程对信号量release后,计数大于0,恢复阻塞的线程
Semaphore类相关方法
- Semaphore(value=1):构造方法。value小于0,抛ValueError异常
- acquire(blocking=True, timeout=None):获取信号量,计数器减1,获取成功返回True
- release():释放信号量,计数器加1
# Semaphore.__init__ from the threading module source
class Semaphore:
    def __init__(self, value=1):
        # A negative starting count is rejected up front.
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        # Condition built on a plain Lock; presumably used by acquire()/release()
        # (not shown here) to block and wake threads while the count is 0.
        self._cond = Condition(Lock())
        self._value = value  # the internal counter: acquire() -1, release() +1
BoundedSemaphore类
- 有界的信号量
- 不允许使用release超出初始值的范围,否则,抛出ValueError异常
import threading

# sema = threading.BoundedSemaphore(2)  # ValueError: Semaphore released too many times
sema = threading.Semaphore(2)


def _show_value():
    """Print the semaphore's internal counter (private attribute, demo only)."""
    print(sema._value)


sema.release()  # a plain Semaphore may be released above its initial value: 2 -> 3
_show_value()
print(sema.acquire())  # 3 -> 2
print(sema.acquire())  # 2 -> 1
_show_value()
sema.release()  # 1 -> 2
_show_value()
print(sema.acquire())  # 2 -> 1
_show_value()
# sema.acquire()  # would block the calling thread only if the counter were 0
_show_value()
for _ in range(3):
    sema.release()  # 1 -> 4 after three releases
_show_value()
信号量和锁的区别
- 信号量:允许多个线程同时访问共享资源,但资源数量有限。计数为0时请求的线程被阻塞;当有线程对信号量release、计数大于0后,被阻塞的线程恢复,才能使用资源
- 锁:可以看做特殊的信号量,即信号量计数器初值为1,同一时间只允许一个线程独占资源
补充示例:连接池(在资源有限的情况下,开启一个连接成本高,所以使用连接池)
# Simulated connection pool: a semaphore handles the limited-resource problem
'''
如果池中有资源,请求者获取资源时信号量减1,拿走资源。当请求超过资源数,请求者只能等待。当使用者用完归还资源后信号量加1,等待线程就可以被唤醒拿走资源
'''
import random
import threading
import logging
import time

# Log line: timestamp, thread name, thread id (left-aligned, width 8), message.
FORMAT = ' '.join(('%(asctime)s', '%(threadName)s', '%(thread)-8d', '%(message)s'))
logging.basicConfig(format=FORMAT, level=logging.INFO)
class Conn:
    """A stand-in for an expensive connection, identified only by its name."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        # The pool's worker logs the Conn object itself. The default repr
        # (<Conn object at 0x...>) would hide which connection was handed out.
        return f'{type(self).__name__}({self.name!r})'
class Pool:
    """A fixed-size pool of Conn objects guarded by a counting Semaphore.

    The semaphore starts at `count`, the number of connections created, so
    get_conn() blocks while no connection is free and return_conn() lets one
    waiter through.
    """

    def __init__(self, count: int):
        self.count = count
        self.pool = []
        for idx in range(self.count):
            self.pool.append(self._connect('conn-{}'.format(idx)))
        # Lock-based alternative: self._lock = threading.Lock()
        # threading.BoundedSemaphore(count) would additionally reject extra releases.
        self.sema = threading.Semaphore(count)

    def _connect(self, conn_name):
        """Create one connection object named `conn_name`."""
        return Conn(conn_name)

    def get_conn(self):
        """Take a connection from the pool, blocking until one is free."""
        # Lock-based alternative (never blocks; returns None when the pool is empty):
        #     with self._lock:
        #         if len(self.pool) > 0:
        #             return self.pool.pop()
        self.sema.acquire()
        return self.pool.pop()

    def return_conn(self, conn: Conn):
        """Put `conn` back into the pool and release one semaphore slot."""
        # Lock-based alternative:
        #     with self._lock:
        #         if len(self.pool) < self.count:
        #             self.pool.append(conn)
        self.pool.append(conn)  # make the connection available first ...
        self.sema.release()  # ... then let a waiting get_conn() proceed
pool = Pool(3)


def worker(pool: Pool):
    """Borrow a connection, hold it for 1-5 seconds, then return it to `pool`."""
    conn = pool.get_conn()
    try:
        logging.info(conn)
        time.sleep(random.randint(1, 5))  # simulate work with the connection
    finally:
        # Always hand the connection back. Otherwise an exception above would
        # leak it and permanently shrink the pool (its semaphore slot is never
        # released).
        pool.return_conn(conn)


for i in range(6):
    threading.Thread(target=worker, name='worker-{}'.format(i), args=(pool,)).start()
参考
- magedu