Python threading实现多线程 提高篇 线程同步,以及各种锁 本文主要讲多线程的线程之间的资源共享怎么保持同步。 多线程基础篇见,木头人:Python threading实现多线程 基础篇 Python的多线程,只有用于I/O密集型程序时效率才会有明显的提高,如文件/输入输出/socket网络通信/http通讯等待。对于计算密集型程序一般采用多进程,这里不多讲。 一、多线程的同步问题 一般在多线程代码中,总会有一些特定的函数或代码块不想被多个线程同时执行,如:修改数据库、更新文件或其他会产生程序冲突的类似情况。 如果两个线程的运行顺序不同,他有可能产生不同的结果,或者造成执行的轨迹或行为不相同,这时我们就需要使用到多线程的同步。 当任意数量的线程可以访问临界区的代码,当在同一时刻只能有一个线程可以通过时,就需要使用同步。我们可以选择合适的同步原语,也可以让线程控制机制来执行同步。同步有不同的类型,python也支持多种同步类型,你可以选择最合适的来运行程序。 最常用的同理原语有:锁/互斥,以及信号量。锁是最简单最低级的机制。信号量用于多线程竞争有限资源的情况。 二、Lock 同步锁(原语锁) 2.1 同步锁的使用 我们一般使用获得锁(加锁)和释放锁(解锁)函数来控制锁的两种状态“锁定”和“未锁定”。一般只要在公共操作前加上加锁和解锁的操作即可。 示例:加锁 与 解锁 当我们通过 lock.acquire() 获得锁后线程程将一直执行不会中断,直到该线程 lock.release( )释放锁后线程才有可能被释放(注意:锁被释放后线程不一定会释放)。 示例:锁的运用 注意:上面代码先将lock.acquire()和lock.release()行注释掉表示不使用锁,取消lock.acquire()和lock.release()行的注释表示使用锁。 运行结果:不使用锁程序运行输出为 99;使用锁程序运行结果为0 为什么会有差异?这就是有锁和无锁的差别。
注意,第10行,这里增加了一个sleep()操作,当在没有锁的情况下线程将在这里被释放出来,让给下一线程运行,而我们的num值还没有被修改,所以后面线程的num1的取值都是100。 Lock 与GIL(全局解释器锁)存在区别。 我们需要知道 Lock 锁的目的,它是为了保护共享的数据,同时刻只能有一个线程来修改共享的数据,而保护不同的数据需要使用不同的锁。 GIL用于限制一个进程中同一时刻只有一个线程被CPU调度,GIL的级别比Lock高,GIL是解释器级别。 GIL与Lock同时存在,程序执行如下:1. 同时存在两个线程:线程A,线程B2. 线程A 抢占到GIL,进入CPU执行,并加了Lock,但为执行完毕,线程被释放3. 线程B 抢占到GIL,进入CPU执行,执行时发现数据被线程A Lock,于是线程B被阻塞4. 线程B的GIL被夺走,有可能线程A拿到GIL,执行完操作、解锁,并释放GIL5. 线程B再次拿到GIL,才可以正常执行 通过上述应该能看到,Lock 通过牺牲执行的效率换数据安全。 2.2 死锁 多线程最怕的是遇到死锁,两个或两个以上的线程在执行时,因争夺资源被相互锁住而相互等待。 示例:互锁造成死锁 如上,如果两个锁同时被多个线程运行,就有可能出现死锁,如果没出现死锁,就多运行几遍就会出现死锁现象。上面程序死锁为偶现性,这种bug也是最难找的。 2.3 重入锁(递归锁) threading.RLock() 为了支持同一个线程中多次请求同一资源,Python 提供了可重入锁(RLock)。这个RLock内部维护着一个锁(Lock)和一个计数器(counter)变量,counter 记录了acquire 的次数,从而使得资源可以被多次acquire。直到一个线程所有 acquire都被release(计数器counter变为0),其他的线程才能获得资源。 示例: 注意观察程序的运行,当运行到程序B时,即使B休眠了3秒也不会切换线程。 使用重入锁时,counter 没有变为0(所有的acquire没有被释放掉),即使遇到长时间的io操作也不会切换线程。 三、信号量(Semaphore) 信号量是一个内部数据,它有一个内置的计数器,它标明当前的共享资源可以有多少线程同时读取。 示例:定义一个只能同时执行5个线程的信号量。 当线程需要读取关联信号量的共享资源时,需调用acquire(),这时信号量的计数器会-1。 在线程不需要共享资源时,需释放信号release(),这时信号量的计数器会+1,在信号量等待队列中排在最前面的线程会拿到共享资源的权限。。 信号量控制规则:当计数器大于0时,那么可以为线程分配资源权限;当计数器小于0时,未获得权限的线程会被挂起,直到其他线程释放资源。 示例1: 信号量运行3个线程并行 注意观察程序运行,开始只有3个线程获得了资源的权限,后面当释放几个资源时就有几个获得资源权限。 示例2:运用信号量进行线程同步 信号量被初始化为0,目的是同步两个或多个线程。线程必须并行运行,所以需要信号量同步。这种运用场景有时会用到,比较难理解,多运行示例仔细观察打印结果。 拓展: 信号量的一个特殊用法是互斥量。互斥量是初始值为1的信号量,可以实现数据、资源的互斥访问。 信号量在在多线程的编程语言中应用很广,他也有可能造成死锁的情况。例如,有一个线程t1,先等待信号量s1,然后等待信号量s2,而线程t2会先等待信号量s2,然后再等待信号量s1,这样就会发生死锁,导致t1等待s2,但是t2在等待s1。 四、Condition 条件变量 Condition 条件变量通常与一个锁相关联。需要在多个Condition 条件中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则他将自己产生一个RLock实例。 定义条件变量锁实例 比较难理解,先看看Condition()下的方法。acquire() 获得锁(线程锁)release() 释放锁wait(timeout) 挂起线程timeout秒(为None时时间无限),直到收到notify通知或者超时才会被唤醒继续运行。必须在获得Lock下运行。notify(n=1) 通知挂起的线程开始运行,默认通知正在等待该condition的线程,可同时唤醒n个。必须在获得Lock下运行。notifyAll() 通知所有被挂起的线程开始运行。必须在获得Lock下运行。 示例1:生产与消费,线程produce生产产品当产品生产成功后通知线程B使用产品,线程consume使用完产品后通知线程produce继续生产产品。 示例2:生产商品数量达到一定条件后被消费 两个示例差不多,只是实现方式不同,注意观察输出,理解程序运行规则。 release() 和 wait() 都有释放锁的作用,不同在于wait() 后,该子线程就在那里挂起等待,要继续执行,就需要接收到 notify() 或者 notifyAll() 来唤醒线程,而 release() 该线程还能继续执行。 五、Event 事件锁对象 用于线程间通信,即程序中的其一个线程需要通过判断某个线程的状态来确定自己下一步的操作,就用到了event()对象。event()对象有个状态值,他的默认值为 Flase,即遇到 event() 对象就阻塞线程的执行。 定义Event 事件锁实例 Event对象的方法wait(timeout=None) 挂起线程timeout秒(None时间无限),直到超时或收到event()信号开关为True时才唤醒程序。set() Even状态值设为Trueclear() Even状态值设为 FalseisSet() 返回Even对象的状态值。 示例: func等待connect启动服务 输出结果:等待服务响应…成功启动服务连接到服务器 观察结果会发现,t1线程运行的func函数需要等到connect函数运行event.set()后才继续执行之后的操作。 六、Barrie 障碍锁 也可以叫屏障或者栅栏,可以想象成路障、道闸。当线程达到设定的数值时放开道闸允许继续运行。 创建Barrier障碍锁 参数:parties 参与线程的数量action 全部线程被释放时可被其中一条线程调用的可调用对象timeout 调用wait方法时未指定时超时的默认值 Barrier实例的方法wait(timeout=None) 等待通过栅栏,返回值是一个0到parties-1之间的整数, 每个线程返回不同。如果wait方法设置了超时,并超时发送,栅栏将处于broken状态。reset() 重置障碍,返回默认的 False 状态,即重新开始阻塞线程。abort() 将障碍置为断开状态,这将导致已调用wait()和之后调用wait()引发BrokenBarrierError异常。 Barrier实例的属性partier 通过障碍的线程数n_waiting 当前在屏障中等待的线程数broken 布尔值,表明barrier是否broken。 示例1: 注意:当我们设置Barrier的放行的线程数时,我们启动的线程只能是他的倍数,不然会有线程一直陷入等待中。 怎么解决这个问题,我们需要在wait加入timeout参数,如下示例。 示例2: 解决线程不是Barrier的倍数问题 七、threading 使用 Queue 保持线程同步 Queue 模块可以实现多生产者与多消费者队列,它可以实现多个线程之间的信息安全交换。 它有几种队列模式:FIFO队列,先进先出 LIFO队列,后进先出,如同栈 Priority队列,对着中的数据始终保持排序,优先检索最低值。 通常使用 (优先序号, 数据)形式存储数据。不带需要默认对其值进行排序。注意:Priority队列因为是顺序的,存储的数据必须是要能排序,即相同类型数据 注意: 创建队列时会指定队列中最多存储项目的个数,就是设定一个非常大的值也比无限制好。 队列的公共方法:qsize() 队列大大致大小,非准确值empty() 当前是否为空full() 当前是否已满put(item, block=True, timeout=None) 将item放入队列。 block=True, timeout=None 在必要时阻塞,直到有空位可用,timeout 为阻止的时间,超时抛出Full异常。block=False 立即将item放入队列,队列已满引发Full异常。 put_nowait(item) 立即放入队列,同put(item,False)get(block=True, timeout=None) 从队列中删除并返回一个item。block=True, timeout=None 在必要时阻塞,直到有可用数据为止,timeout 为阻止的时间,超时抛出Empty异常。block=False 立即队列中的可用数据,否则抛出Empty异常。get_nowait() 立即队列中的数据,同get(False)。task_done() 向已完成的队列任务发送一个信号。一般是告诉join() 我以完成任务。join() 阻塞线程,直到队列为空才放行。 注意:join() 与 task_done() 是套组合拳,有使用 join() 必须在任务结束后执行 task_done() 通知队列。 示例:生产 消费模式
2024最新激活全家桶教程,稳定运行到2099年,请移步至置顶文章:https://sigusoft.com/99576.html
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。 文章由激活谷谷主-小谷整理,转载请注明出处:https://sigusoft.com/72808.html