线程锁
问题现象: 多线程情况下,CPU遇到阻塞会进行线程的切换,所以导致执行了tmp-=1的值还未赋值给num=tmp,另一个线程2又开始了tmp -=1,所以导致最后的值重复赋值给了num,所以出现了final num非0 的情况。[time.sleep(0.000.) 休息的时间越短,最后的值越小]
import timeimport threadingdef addNum(): global num #在每个线程中都获取这个全局变量 temp=num print('--get num:',num ) time.sleep(0.00000001) temp -= 1 num = tempnum = 100 #设定一个共享变量thread_list = []lock=threading.Lock()for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t)for t in thread_list: #等待所有线程执行完毕 t.join()print('final num:', num )
问题解决:同步锁
锁的分类:
同步锁:确保每一时刻有一个线程使用共享资源,避免脏数据的产生
死 锁 :线程相互之间等待释放
递归锁:有内部计时器和锁组成,可以重复利用,每利用一次count+1,释放一次-1
同步锁:
lock.acquire() # 获得锁
lock.release() # 释放锁
import threadingdef addNum(): global num #在每个线程中都获取这个全局变量 # num-=1 lock.acquire() # 获得锁 temp=num print('--get num:',num ) #time.sleep(0.1) num =temp-1 #对此公共变量进行-1操作 lock.release() # 释放锁num = 100 #设定一个共享变量thread_list = []lock=threading.Lock()for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t)for t in thread_list: #等待所有线程执行完毕 t.join()print('final num:', num )
[线程分析---图片来自网络]
死锁: AB同时锁住了,等待对方释放锁
解决办法:使用递归锁
import timeimport threadingclass MyThread(threading.Thread): def doA(self): lockA.acquire() print(self.name, "gotlockA", time.ctime()) time.sleep(3) lockB.acquire() print(self.name, "gotlockB", time.ctime()) lockB.release() lockA.release() def doB(self): lockB.acquire() print(self.name,"gotlockB",time.ctime()) time.sleep(2) lockA.acquire() print(self.name,"gotlockA",time.ctime()) lockA.release() lockB.release() def run(self): self.doA() self.doB()if __name__ == '__main__': lockA = threading.Lock() lockB = threading.Lock() threads = [] for i in range(5): threads.append(MyThread()) for i in threads: i.start() for i in threads: i.join()
递归锁: 可以重复利用的锁 [计时器 + 锁]
import timeimport threadingclass MyThread(threading.Thread): def doA(self): lock.acquire() # 执行一个操作,用一个锁锁住线程 print(self.name, "gotlockA", time.ctime()) time.sleep(3) lock.acquire() # 执行一个操作,用一个锁锁住线程 print(self.name, "gotlockB", time.ctime()) lock.release() lock.release() def doB(self): lock.acquire() print(self.name,"gotlockB",time.ctime()) time.sleep(2) lock.acquire() print(self.name,"gotlockA",time.ctime()) lock.release() lock.release() def run(self): self.doA() self.doB()if __name__ == '__main__': lock = threading.RLock() threads = [] for i in range(5): threads.append(MyThread()) for i in threads: i.start() for i in threads: i.join()
问:为什么RLock里面还有一个Rlock?
答:1.防止其他的函数调用数据操作的函数,造成2个数据在进行写操作,产生脏数据
2.减少了其他函数中为了避免产生脏数据而做重复的锁操作
import timeimport threadingclass Account: def __init__(self, money, id): self.account = id self.balance = money self.r = threading.RLock() # 这里应该是每个都有自己的锁 def add(self, num): # 2 可以在方法上添加一个锁来解决其他函数调用方法造成脏数据的问题 self.r.acquire() self.balance += num self.r.release() def adwithdrd(self, num): self.r.acquire() self.balance -= num self.r.release() # def diy(self, num): # 3也是类中的重复调用,这也就是为什么会有Rlock的重复调用了 # self.r.acquire() # self.balance -= num # self.adwithdrd(num) # self.r.release() def __str__(self): print(self.balance, self.account)a1 = Account(500, 'A')b1 = Account(300, 'B')# def user_trans(A, B, num):# A.adwithdrd(num)# B.add(num) # 1如果有线程操作这个函数,也会执行add方法,会影响到最后的数据,最好的解决方法是在类中添加一个锁def trans(A, B, num): r = threading.RLock() r.acquire() A.adwithdrd(num) B.add(num) r.release()t1 = threading.Thread(target=trans, args=(a1, b1, 100))t3 = threading.Thread(target=trans, args=(a1, b1, 100))t2 = threading.Thread(target=trans, args=(b1, a1, 200))t1.start()t2.start()t3.start()a1.__str__()b1.__str__()
信号量:
信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。
计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)
BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常
应用:数据库连接池
import threading,timeclass myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(5) semaphore.release()if __name__=="__main__": semaphore=threading.Semaphore(5) thrs=[] for i in range(100): thrs.append(myThread()) for t in thrs: t.start()
线程池
【更多参考】
条件变量同步1223
有一类线程需要满足条件之后才能够继续执行,Python提供了threading.Condition 对象用于条件变量线程的支持,它除了能提供RLock()或Lock()的方法外,还提供了 wait()、notify()、notifyAll()方法。
lock_con=threading.Condition([Lock/Rlock]): 锁是可选选项,不传入锁,对象自动创建一个RLock()。
应用:用于线程之间的信息交流
wait():条件不满足时调用,线程会释放锁并进入等待阻塞;
notify():条件创造后调用,通知等待池激活一个线程;
notifyAll():条件创造后调用,通知等待池激活所有线程
import threading,timefrom random import randintclass Producer(threading.Thread): def run(self): global L while True: val=randint(0,100) print('生产者',self.name,":Append"+str(val),L) if lock_con.acquire(): L.append(val) lock_con.notify() # notify并没有释放锁的能力,所以需要手动释放 lock_con.release() # 手动释放锁 time.sleep(3)class Consumer(threading.Thread): def run(self): global L while True: lock_con.acquire() # 重新获得锁 if len(L)==0: lock_con.wait() # wait进入等待并且释放锁 print('消费者',self.name,":Delete"+str(L[0]),L) del L[0] lock_con.release() time.sleep(0.25)if __name__=="__main__": L=[] lock_con=threading.Condition() threads=[] for i in range(5): threads.append(Producer()) threads.append(Consumer()) for t in threads: t.start() for t in threads: t.join()