1、什么是Fork/Join框架
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
2、Fork/Join背景
多核处理器已广泛应用要提高应用程序在多核处理器上的执行效率,只能想办法提高应用程序的本身的并行能力。常规的做法就是使用多线程,让更多的任务同时处理,或者让一部分操作异步执行,这种简单的多线程处理方式在处理器核心数比较少的情况下能够有效地利用处理资源,因为在处理器核心比较少的情况下,让不多的几个任务并行执行即可。但是当处理器核心数发展很大的数目,上百上千的时候,这种按任务的并发处理方法也不能充分利用处理资源,因为一般的应用程序没有那么多的并发处理任务(服务器程序是个例外)。所以,只能考虑把一个任务拆分为多个单元,每个单元分别得执行最后合并每个单元的结果,而fork/join框架就是为这而生的,java7中才认识到了这个问题。
3、工作原理
工作窃取算法
那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
ForkJoinPool:这个类实现了ExecutorService接口和工作窃取算法(Work-Stealing Algorithm)。它管理工作者线程,并提供任务的状态信息,以及任务的执行信息。
ForkJoinTask:这个类是一个将在ForkJoinPool中执行的任务的基类。
Fork/Join框架提供了在一个任务里执行fork()和join()操作的机制和控制任务状态的方法。通常,为了实现Fork/Join任务,需要实现一个以下两个类之一的子类。
RecursiveAction:用于任务没有返回结果的场景。
RecursiveTask:用于任务有返回结果的场景。
当我们调用ForkJoinTask的fork方法时,程序会把任务放在ForkJoinWorkerThread的pushTask的workQueue中,异步地执行这个任务,然后立即返回结果,代码如下:
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
然后通过join方法阻塞获取线程返回的结果
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
其中doJoin有常见的几种状态:
static final int NORMAL = 0xf0000000; // 完成
static final int CANCELLED = 0xc0000000; // 取消
static final int EXCEPTIONAL = 0x80000000; // 异常
static final int SIGNAL = 0x00010000; // 信号
4、示例代码
任务代码:
public class NumRecursiveTask extends RecursiveTask<Integer> {
private int beginPosition;
private int endPosition;
public NumRecursiveTask(int beginPosition, int endPosition) {
super();
this.beginPosition = beginPosition;
this.endPosition = endPosition;
System.out.println("# " + (beginPosition + " " + endPosition));
}
protected Integer compute() {
System.out.println(Thread.currentThread().getName()
+ "----------------");
System.out.println("compute=" + beginPosition + " " + endPosition);
if ((endPosition - beginPosition) != 0) {
System.out.println("!=0");
int middleNum = (endPosition + beginPosition) / 2;
System.out.println("left 传入的值:"
+ (beginPosition + " " + middleNum));
NumRecursiveTask leftTask = new NumRecursiveTask(beginPosition,
middleNum);
System.out.println("right 传入的值:"
+ ((middleNum + 1) + " " + endPosition));
NumRecursiveTask rightTask = new NumRecursiveTask(middleNum + 1,
endPosition);
this.invokeAll(leftTask, rightTask);
Integer leftValue = leftTask.join();
Integer rightValue = rightTask.join();
return leftValue + rightValue;
} else {
return endPosition;
}
}
}
main执行代码:
public static void main(String[] args) throws InterruptedException {
try {
NumRecursiveTask task = new NumRecursiveTask(1, 10);
ForkJoinPool pool = new ForkJoinPool();
pool.submit(task);
System.out.println("结果值为:" + task.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
本文暂时没有评论,来添加一个吧(●'◡'●)