线程同步技术_同步计数器和异步计数器的区别

线程同步技术_同步计数器和异步计数器的区别Java并发包中线程同步器一、CountDownLatch场景:主线程需要等待所有子线程执行完毕后再进行汇总CountDownLatch实现比较简单,继承AQS实现了一个不可重入共享锁Sync1.不可重入共享锁Syncprivate static final

Java并发包中线程同步器   一、CountDownLatch   场景:主线程需要等待所有子线程执行完毕后再进行汇总   CountDownLatch实现比较简单,继承AQS实现了一个不可重入共享锁Sync   
线程同步技术_同步计数器和异步计数器的区别   1.不可重入共享锁Sync   private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = L; Sync(int count) { setState(count); } int getCount() { return getState(); }   //尝试锁 仅state==0时才能成功 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }   //尝试释放锁 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }   2.方法   1)void await()   public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1);//尝试锁,不忽略中断引起的返回 }   2)boolean await(long timeout, TimeUnit unit)   public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));//尝试一定时间内锁 }   3)void countDown()   public void countDown() { sync.releaseShared(1); }   3.实例   public class CountDownLatchTest { //定义CountDownLatch 实际创建共享锁 且锁已被两个线程持有 state == 2 private static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); pool.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println(“childThreadOne over”); } catch (InterruptedException e) { e.printStackTrace(); } finally { //线程1释放共享锁,state– countDownLatch.countDown(); } } }); pool.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println(“childThreadTwo over”); } catch (InterruptedException e) { e.printStackTrace(); } finally { //线程2释放共享锁,state– countDownLatch.countDown(); } } }); System.out.println(“wait all child thread over”); //主线程阻塞, 实际尝试共享锁 ,仅state == 0时成功或被中断打断引起异常 countDownLatch.await(); System.out.println(“all child thread over”); pool.shutdown(); } }   二、CyclicBarrier回环屏障   和CountDownLatch场景一样,但是CountDownLatch是一次性的,CyclicBarrier可重复使用;实现方式不同,所以使用方式不同,范围更大,见后面实例   CyclicBarrier采用独占锁ReentranLock及条件变量trip(阻塞到达屏障的线程)实现   设置一道屏障,①当线程数小于屏障规定的线程数时,线程入trip条件阻塞队列,线程阻塞;②当线程数等于屏障规定的线程数时,唤醒trip中所有的线程,并重置计数器状态(越过屏障)   另外CyclicBarrier也不忽略中断引起的返回,会抛出异常,屏障会失效,抛错genetation.barrier = true   1)变量与构造方法   / 独占锁 */ private final ReentrantLock lock = new ReentrantLock(); / 条件变量 */ private final Condition trip = lock.newCondition(); / 屏障阻塞的线程个数 */ private final int parties; /* 突破屏障后执行的任务 默认为空 */ private final Runnable barrierCommand; / 默认false,当前屏障被中断打破后,设置为true,继续使用屏障会抛出异常BrokenBarrierException */ private Generation generation = new Generation(); / * 实际计数器 count == 0时突破屏障 */ private int count; public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }   2.方法   1)int dowait(boolean timed, long nanos)   / * 主要代码 */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) //中断引起的跨过屏障,后续await屏障都会抛错 throw new BrokenBarrierException(); if (Thread.interrupted()) { //当前线程被中断,唤醒trip的所有阻塞线程,设置g.broken == true,抛出异常 breakBarrier(); throw new InterruptedException(); } //调用一次数据器-1 int index = –count; //当计数器 == 0时,达到屏蔽的线程数,越过屏障 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) //先执行屏障任务 command.run(); ranAction = true; //唤醒条件变量中所有线程trip.signalAll(); //重置计数器count = parties; //重置版本generation = new Generation(); nextGeneration(); return 0; } finally { if (!ranAction) //执行屏障任务抛错时, //依然唤醒所有阻塞线程, //但设置g.barrier == true,后续屏障都会抛错 breakBarrier(); } } // 当计数器 != 0 时,当前线程入条件阻塞队列 for (;;) { try { if (!timed) //无限阻塞 trip.await(); else if (nanos > 0L) //超时阻塞 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }   2)int await()   public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }   3) int await(long timeout, TimeUnit unit)   public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }   3.实例   public class CyclicBarrierTest { //设置屏障线程数为2 state = 2 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2); public static void main(String[] args){ ExecutorService pool = Executors.newFixedThreadPool(2); pool.submit(new Runnable(){ @Override public void run() { try { System.out.println(“thread1 step1”); //线程1入trip阻塞队列,state– cyclicBarrier.await(); System.out.println(“thread1 step2”); cyclicBarrier.await(); System.out.println(“thread1 step3”); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); pool.submit(new Runnable(){ @Override public void run() { try { System.out.println(“thread2 step1”); //线程2入trip阻塞队列,state– //与线程1的step1一起导致state == 0,越过屏障唤醒两个线程,state重新设置为2后续逻辑一致 cyclicBarrier.await(); System.out.println(“thread2 step2”); cyclicBarrier.await(); System.out.println(“thread2 step3”); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); pool.shutdown(); } }   三、Semaphore   场景:与CountDownLatch一样   信号量同步器设计类似于CountDownLatch,不同的是计数器是递增的   Semaphore不仅实现了公平锁,还实现了非公平锁   
线程同步技术_同步计数器和异步计数器的区别    1.共享锁Sync   abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available – acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error(“Maximum permit count exceeded”); if (compareAndSetState(current, next)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current – reductions; if (next > current) // underflow throw new Error(“Permit count underflow”); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } / * 非公平锁 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } / * 公平锁 */ static final class FairSync extends Sync { private static final long serialVersionUID = L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available – acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }   2实例   public class SemaphoreTest { //信号量 private static Semaphore semaphore = new Semaphore(0); public static void main(String[] args) throws InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); pool.submit(new Runnable() { @Override public void run() { try { System.out.println(“thread1 over”); //释放+1 semaphore.release(); } catch (Exception e) { e.printStackTrace(); } } }); pool.submit(new Runnable() { @Override public void run() { try { System.out.println(“thread2 over”); //释放+1 semaphore.release(); } catch (Exception e) { e.printStackTrace(); } } }); //同步 semaphore.acquire(2); System.out.println(“all child thread over”); pool.shutdown(); } }   四、总结   1.线程同步的设计类似《操作系统原理》中的进程同步,信号量机制,PV操作   2.CountDownLatch实现线程同步(计数器自减),是一次性的,仅支持公平锁,线程FIFO;   CyclicBarrier实现线程同步(计数器自减),是可复用的(计数器还原),使用独占锁ReentranLock的条件变量trip的阻塞队列实现。   Semaphore实现线程同步(计数器自增),也是可以复用的(计数器归0),提供公平锁与非公平锁实现。

2024最新激活全家桶教程,稳定运行到2099年,请移步至置顶文章:https://sigusoft.com/99576.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。 文章由激活谷谷主-小谷整理,转载请注明出处:https://sigusoft.com/78183.html

(0)
上一篇 2024年 8月 3日
下一篇 2024年 8月 3日

相关推荐

关注微信