专业的JAVA编程教程与资源

网站首页 > java教程 正文

java同步工具:CyclicBarrier核心源码分析

temp10 2025-02-18 13:26:47 java教程 10 ℃ 0 评论

大神:线程同步方式有哪些?

小王:冷静思考中,说有生产-消费、信号量、互斥量,在项目中我经常使用。

java同步工具:CyclicBarrier核心源码分析

大神:恩,这些都是用于双线程的处理。线程组同步机制了解吗?

小王:我X,没有使用过,请大神赐教!!!

屏障

我们先理解一下什么是屏障,意思就是有N个线程它们正在计算,但是还没有计算完毕。过一会,第一个线程完成了所有需要在第一阶段进行的计算。它接着被挂起等待。又过一会,第N-2个和第N-3也完成了第一个阶段计算,也接着被挂起等待。当最后一个线程N到达屏障时,所有线程就一起被释放。

CyclicBarrier循环屏障

它是java.util.concurrent包下的同步辅助工具,它的作用允许N个线程全部等待彼此完成后在继续下一个任务;

关键点该类使ReentrantLock重入锁对象lock()、unlock()方法处理多线程之间互斥,Condition条件对象await()、signalAll()方法处理线程的挂起睡眠与唤醒;

private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();

该类提供两个创建对象的构造方法,可以指定循环数量、动作。

CyclicBarrier(int parties)
CyclicBarrier(int parties, Runnable barrierAction)

await等待方法的实现是调用类的私有dowait()方法。

await(){return dowait(false, 0L);}
await(long timeout, TimeUnit unit){return dowait(true, unit.toNanos(timeout));}

dowait核心方法是基于重入锁、条件来实现多线程互斥、挂起、唤醒。它是如何处理多线程到达屏障点的呢?看如下代码块:

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock; 
        lock.lock(); // 获得锁,如果当前线程没有获得锁那么将被禁用休眠等待线程调度
        try {
            final Generation g = generation; // 代表屏障

            if (g.broken) // broken为真说明屏障已遭破坏
                throw new BrokenBarrierException();

            if (Thread.interrupted()) { // 线程中断,屏障将会破坏
                breakBarrier(); // 
                throw new InterruptedException();
            }

            int index = --count; 
            if (index == 0) {  // 所有线程都到达屏障点
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand; 
                    if (command != null)
                        command.run(); // 执行barrierCommand任务
                    ranAction = true;
                    nextGeneration(); // 唤醒所有线程
                    return 0;
                } finally {
                    if (!ranAction) // 如果barrierCommand任务失败,屏障将会破坏
                        breakBarrier();
                }
            }

            // 自旋
            for (;;) {
                try {
                    // Condition 条件变量使线程阻塞
                    if (!timed)
                        trip.await(); // 线程将被挂起睡眠
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos); // 指定时间的线程将被挂起睡眠
                } catch (InterruptedException ie) { // 线程中断异常,屏障破坏
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken) // 屏障破坏
                    throw new BrokenBarrierException();

                if (g != generation) // 是否换代
                    return index;

                if (timed && nanos <= 0L) { // 超时,屏障破坏
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock(); // 释放锁
        }
    }

总结

通过以上代码分析我们已经知道了循环屏障的处理逻辑。线程组通过重入锁,条件达到挂起睡眠、唤醒。如何使用呢?通过构造方法指定线程数量,以及都达到屏障之后要执行的方法,线程执行调用await()方法,当最后一个线程调用await()方法后,就会唤醒所有线程,然后执行barrierCommand,并生成下一代。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表