一、前言
前面介绍了使用CAS实现的非阻塞队列ConcurrentLinkedQueue,下面就来介绍下使用独占锁实现的阻塞队列LinkedBlockingQueue的实现
二、 LinkedBlockingQueue类图结构
如图LinkedBlockingQueue中也有两个Node分别用来存放首尾节点,并且里面有个初始值为0的原子变量count用来记录队列素个数,另外里面有两个ReentrantLock的独占锁,分别用来控制素入队和出队加锁,其中takeLock用来控制同时只有一个线程可以从队列获取素,其他线程必须等待,putLock控制同时只能有一个线程可以获取锁去添加素,其他线程必须等待。另外notEmpty和notFull用来实现入队和出队的同步。 另外由于出入队是两个非公平独占锁,所以可以同时又一个线程入队和一个线程出队,其实这个是个生产者-消费者模型。
如图默认队列容量为0x7fffffff;用户也可以自己指定容量。
三、必备基础
3.1 ReentrantLock
可以参考
https://www.atatech.org/articles/80539?flag_data_from=active
3.2 条件变量(Condition)
条件变量这里使用的是takeLock.newCondition()获取也就是说调用ReentrantLock的方法获取的,那么可预见Condition使用了ReentrantLock的state。上面的参考没有提到所以这里串串讲下
- 首先看下类图结构
如图ConditionObject中两个node分别用来存放条件队列的首尾节点,条件队列就是调用条件变量的await方法被阻塞后的节点组成的单向链表。另外ConditionObject还要依赖AQS的state,ConditionObject是AQS类的一个内部类。
- awaitNanos操作
首先如果当前线程中断标志被设置了,直接抛出异常。添加当前线程节点(状态为:-2)到条件队列。
然后尝试释放当前线程拥有的锁并保存当前计数,可知如果当前线程调用awaitNano前没有使用当前条件变量所在的Reetenlock变量调用lock或者lockInterruptibly获取到锁,会抛出
IllegalMonitorStateException异常。
然后调用park挂起当前线程直到超时或者其他线程调用了当前线程的unpark方法,或者调用了当前线程的interupt方法(这时候会抛异常)。
如果超时或者其他线程调用了当前线程的unpark方法,则当前线程从挂起变为激活,获取cpu资源后会继续执行,会重新获取锁。
- signal操作
首先看调用signal的线程是不是持有了独占锁,没有则抛出异常。
然后获取在条件队列里面待的时间最长的node,把它移动到线程持有的锁所在的AQS队列。
其中enq方法就是把当前节点放入了AQS队列,但是这时候该节点还是在条件队列里面那,那么什么时候从条件队列移除那?其实在await里面的unlinkCancelledWaiters方法。
总结: 无论是条件变量的await和singal都是需要先获取独占锁才能调用,因为条件变量使用的就是独占锁里面的state管理状态,否者会报异常。
四 、带超时时间的offer操作-生产者
在队尾添加素,如果队列满了,那么等待timeout时候,如果时间超时则返回false,如果在超时前队列有空余空间,则插入后返回true。
如果获取锁前面有线程调用了putLock. interrupt(),并且后面没有调用interrupted()重置中断标志,调用lockInterruptibly时候会抛出InterruptedException异常。
队列满的时候调用notFull.awaitNanos阻塞当前线程,当前线程会释放获取的锁,然后等待超时或者其他线程调用了notFull.signal()才会返回并重新获取锁,或者其他线程调用了该线程的interrupt方法设置了中断标志,这时候也会返回但是会抛出InterruptedException异常。
如果超时则直接返回false,如果超时前调用了notFull.signal()则会退出循环,执行(2)添加素到队列,然后执行(3),(3)的目的是为了激活其他入队等待线程。(4)的话c==0说明队列里面已经有一个素了,这时候就可以激活等待出队线程了。
另外signalNotEmpty函数是先获取独占锁,然后在调用的signal这也证明了3.2节的结论。
五、 带超时时间的poll操作-消费者
获取并移除队首素,在指定的时间内去轮询队列看有没有首素有则返回,否者超时后返回null
首先获取独占锁,然后进入循环当当前队列有素才会退出循环,或者超时了,直接返回null。
超时前退出循环后,就从队列移除素,然后计数器减去一,如果减去1前队列素大于1则说明当前移除后队列还有素,那么就发信号激活其他可能阻塞到当前条件信号的线程。
最后如果减去1前队列素个数=最大值,那么移除一个后会腾出一个空间来,这时候可以激活可能存在的入队阻塞线程。
六、put操作-生产者
与带超时时间的poll类似不同在于put时候如果当前队列满了它会一直等待其他线程调用notFull.signal才会被唤醒。
七、 take操作-消费者
与带超时时间的poll类似不同在于take时候如果当前队列空了它会一直等待其他线程调用notEmpty.signal()才会被唤醒。
八、 size操作
当前队列素个数,如代码直接使用原子变量count获取。
九、peek操作
获取但是不移除当前队列的头素,没有则返回null
十、 remove操作
删除队列里面的一个素,有则删除返回true,没有则返回false,在删除操作时候由于要遍历队列所以加了双重锁,也就是在删除过程中不允许入队也不允许出队操作
十一、开源框架中使用
tomcat中任务队列TaskQueue
11.1 类图结构
可知TaskQueue继承了LinkedBlockingQueue并且泛化类型固定了为Runnalbe.重写了offer,poll,take方法。
11.2 TaskQueue
tomcat中有个线程池ThreadPoolExecutor,在NIOEndPoint中当acceptor线程接受到请求后,会把任务放入队列,然后poller 线程从队列里面获取任务,然后就吧任务放入线程池执行。这个ThreadPoolExecutor中的的一个参数就是TaskQueue。
先看看ThreadPoolExecutor的参数如果是普通LinkedBlockingQueue是怎么样的执行逻辑:
当调用线程池方法 execute() 方法添加一个任务时:
- 如果当前运行的线程数量小于 corePoolSize,则创建新线程运行该任务
- 如果当前运行的线程数量大于或等于 corePoolSize,则将这个任务放入阻塞队列。
- 如果当前队列满了,并且当前运行的线程数量小于 maximumPoolSize,则创建新线程运行该任务;
- 如果当前队列满了,并且当前运行的线程数量大于或等于 maximumPoolSize,那么线程池将会抛出RejectedExecutionException异常。
- 如果线程执行完了当前任务,那么会去队列里面获取一个任务来执行,如果任务执行完了,并且当前线程数大于corePoolSize,那么会根据线程空闲时间keepAliveTime回收一些线程保持线程池corePoolSize个线程。
首先看下线程池中exectue添加任务时候的逻辑:
可知当当前工作线程个数为corePoolSize后,如果在来任务会把任务添加到队列,队列满了或者入队失败了则开启新线程。
然后看看TaskQueue中重写的offer方法的逻辑:
可知parent.getPoolSize()<parent.getMaximumPoolSize()普通队列会把当前任务放入队列,TAskQueue则是返回false,因为这会开启新线程执行任务,当然前提是当前线程个数没有达到最大值。
然后看下Worker线程中如果从队列里面获取任务执行的:
十二、总结
12.1 并发安全总结
仔细思考下阻塞队列是如何实现并发安全的维护队列链表的,先分析下简单的情况就是当队列里面有多个素时候,由于同时只有一个线程(通过独占锁putLock实现)入队素并且是操作last节点(,而同时只有一个出队线程(通过独占锁takeLock实现)操作head节点,所以不存在并发安全问题。
- 考虑当队列为空的时候队列状态为:
这时候假如一个线程调用了take方法,由于队列为空,所以count.get()==0所以当前线程会调用notEmpty.await()把自己挂起,并且放入notEmpty的条件队列,并且释放当前条件变量关联的通过
takeLock.lockInterruptibly()获取的独占锁。由于释放了锁,所以这时候其他线程调用take时候就会通过
takeLock.lockInterruptibly()获取独占锁,然后同样阻塞到notEmpty.await(),同样会被放入notEmpty的条件队列,也就说在队列为空的情况下可能会有多个线程因为调用take被放入了notEmpty的条件队列。
这时候如果有一个线程调用了put方法,那么就会调用enqueue操作,该操作会在last节点后面添加新素并且设置last为新节点。然后count.getAndIncrement()先获取当前队列个数为0保存到c,然后自增count为1,由于c==0所以调用signalNotEmpty激活notEmpty的条件队列里面的阻塞时间最长的线程,这时候take中调用notEmpty.await()的线程会被激活await内部会重新去获取独占锁获取成功则返回,否者被放入AQS的阻塞队列,如果获取成功,那么count.get() >0因为可能多个线程put了,所以调用dequeue从队列获取素(这时候一定可以获取到),然后调用c = count.getAndDecrement() 把当前计数返回后并减去1,如果c>1 说明当前队列还有其他素,那么就调用 notEmpty.signal()去激活 notEmpty的条件队列里面的其他阻塞线程。
- 考虑当队列满的时候:
当队列满的时候调用put方法时候,会由于notFull.await()当前线程被阻塞放入notFull管理的条件队列里面,同理可能会有多个调用put方法的线程都放到了notFull的条件队列里面。
这时候如果有一个线程调用了take方法,调用dequeue()出队一个素,c = count.getAndDecrement();count值减一;c==capacity;现在队列有一个空的位置,所以调用signalNotFull()激活notFull条件队列里面等待最久的一个线程。
12.2简单对比
LinkedBlockingQueue与ConcurrentLinkedQueue相比前者前者是阻塞队列使用可重入独占的非公平锁来实现通过使用put锁和take锁使得入队和出队解耦可以同时进行处理,但是同时只有一个线程可以入队或者出队,其他线程必须等待,另外引入了条件变量来进行入队和出队的同步,每个条件变量维护一个条件队列用来存放阻塞的线程,要注意这个队列和AQS的队列不是一个东东。LinkedBlockingQueue的size操作通过使用原子变量count获取能够比较精确的获取当前队列的素个数,另外remove方法使用双锁保证删除时候队列素保持不变,另外其实这个是个生产者-消费者模型。
而ConcurrentLinkedQueue则使用CAS非阻塞算法来实现,使用CAS原子操作保证链表构建的安全性,当多个线程并发时候CAS失败的线程不会被阻塞,而是使用cpu资源去轮询CAS直到成功,size方法先比LinkedBlockingQueue的获取的个数是不精确的,因为获取size的时候是通过遍历队列进行的,而遍历过程中可能进行增加删除操作,remove方法操作时候也没有对整个队列加锁,remove时候可能进行增加删除操作,这就可能删除了一个刚刚新增的素,而不是删除的想要位置的。
2024最新激活全家桶教程,稳定运行到2099年,请移步至置顶文章:https://sigusoft.com/99576.html
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。 文章由激活谷谷主-小谷整理,转载请注明出处:https://sigusoft.com/16509.html