Fork-Join框架使用介绍(一)
losetowin 发布于:2017-12-6 19:25 分类:Java 有 2918 人浏览,获得评论 0 条 标签: forkjoin
本文地址:http://www.dutycode.com/fork_join_1.html
除非注明,文章均为 www.dutycode.com 原创,欢迎转载!转载请注明本文地址,谢谢。
除非注明,文章均为 www.dutycode.com 原创,欢迎转载!转载请注明本文地址,谢谢。
介绍
- JDK1.7之后引入的并发框架
- ForkJoinPool是ExecutorService的补充,某些场景下性能比ExecutorSerivce更优。
- 并行支持,分而治之思想
- ForkJoin模型简化了多线程创建和使用
- ForkJoin模型会自动使用多核处理器
使用场景:
在处理大任务的时候,单线程的方式运行时间会很长,所以一般会考虑使用多线程的方式来执行,多线程处理的时候,就涉及到任务拆解。很多情况下, 大任务是可以拆解成小任务的,并且小任务是可以并行执行的。
fork-join模型会帮助拆解任务,并且将任务拆解成小任务,小任务如果还比较大,还可以继续拆解。
一个现实的场景例子,方便理解Fork-Join模型。
假设公司印了一批宣传单,需要发放到客户手中,进行宣传。 于是公司找到几家地推公司,给每家地推公司一部分宣传单(任务的第一步拆解,拆解成小任务), 之后地推公司拿到宣传单之后,又将这部分宣传单分给他招募的兼职发单员手中(小任务继续拆解成更小的任务),之后由兼职发单员来进行传单派发工作(执行任务), 这些传单派单员是可以同时并行的去发传单了。
如果某个派单员发完传单了,他是可以从其他未发完的派单员手里面再拿一些传单继续发的(工作窃取算法)。
简单例子
比较简单的一个计算例子,计算从0~n的数字之和。
最简单的方式, 使用for循环即可。
但比如计算1-100, 和101-200的过程,完全可以分开来并行计算,最后就做统一的汇总即可了。
使用Fork-join实现如下:
/** * @author zhangzhonghua * @version 0.0.1 * @date 2017/11/29 */ public class SumTask extends RecursiveTask<Integer> { final static int THEHOLD = 100; Integer[] array; int start; int end; public SumTask(Integer[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } @Override protected Integer compute() { if((end - start) < THEHOLD){ int sum = 0; for (int i = start; i < end; i++){ sum +=array[i]; } System.out.println(String.format("compute() start:%d,end:%d, sum=%d, ThreadName=%s", start, end, sum, Thread.currentThread().getName())); return sum; } //任务大小不符合要求, 拆分两个任务 int middle = (end + start) / 2; System.out.println(String.format("split, %d-%d,--> %d-%d, %d-%d", start, end, start, middle, middle, end)); SumTask task1 = new SumTask(array, start, middle); SumTask task2 = new SumTask(array, middle, end); invokeAll(task1, task2); // task1.fork(); // task2.fork(); int res1 = task1.join(); int res2 = task2.join(); return res1 + res2; } public static void main(String[] args){ Integer[] arr = randomInteger(400); ForkJoinPool pool = new ForkJoinPool(4);//最大并发数 ForkJoinTask<Integer> task = new SumTask(arr, 0, arr.length); Integer result = pool.invoke(task); System.out.println(String.format("final result %d", result)); } private static Integer[] randomInteger(int size){ Integer[] arr = new Integer[size]; for (int i=0; i < arr.length; i++){ arr[i] = i; } return arr; } }
程序执行结果:
split, 0-400,--> 0-200, 200-400 split, 200-400,--> 200-300, 300-400 split, 0-200,--> 0-100, 100-200 split, 200-300,--> 200-250, 250-300 split, 100-200,--> 100-150, 150-200 split, 0-100,--> 0-50, 50-100 split, 300-400,--> 300-350, 350-400 compute() start:200,end:250, sum=11225, ThreadName=ForkJoinPool-1-worker-2 compute() start:0,end:50, sum=1225, ThreadName=ForkJoinPool-1-worker-1 compute() start:300,end:350, sum=16225, ThreadName=ForkJoinPool-1-worker-0 compute() start:250,end:300, sum=13725, ThreadName=ForkJoinPool-1-worker-2 compute() start:50,end:100, sum=3725, ThreadName=ForkJoinPool-1-worker-1 compute() start:350,end:400, sum=18725, ThreadName=ForkJoinPool-1-worker-0 compute() start:100,end:150, sum=6225, ThreadName=ForkJoinPool-1-worker-3 compute() start:150,end:200, sum=8725, ThreadName=ForkJoinPool-1-worker-1 final result 79800
任务的关键在于任务拆分,也就是在compute的方法, 如果任务大小足够小了,则就直接计算, 如果还不够小,则继续拆分。
Fork-join模型如下图,比如容易理解:

RecursiveTask和RecursiveAction区别及使用场景
RecursiveTask和RecursiveAction均继承自ForkJoinTask。
RecursiveTask可以返回结果,二RecursiveAction则没有返回结果。
从两个类下的compute方法也可以看出, RecursiveTask的compute的返回值是泛型T,但RecursiveAction的compuite的返回值是void。
所以很明显, RecursiveAction适合不要求有返回结果的任务,比如发送push消息,某些场景下我们不需要知道消息是否到达,而只需要push即可。 那这种就不需要返回结果了。
RecursiveTask适合需要有返回结果的任务,比如,在用户身份验证的环节,可能有很多的身份校验规则, 那么如果有一个校验不通过,就认为用户有风险,这种情况下,我们需要汇总各个任务的执行结果,然后做统一分析。这种就适合用RecursiveTask,如下图:

ForkJoinTask中invokeAll和fork的区别
ForkJoinTask的核心在compute上,在compute方法中,如开始的例子,我们使用的是invokeAll()方法,但有些网站上说的是使用fork方法。具体的区别如下
执行compute()的线程本身也是一个worker工作线程,当两个子任务执行fork()方法时,当前的worker线程会将任务分配到两个worker线程上,这样的后果是当前的worker线程会闲置,不再工作,因为已经将任务都交给两个子任务了。这样就会导致fork-join线程池中浪费了一个worker线程。
而invokeAll()方法中,会将N-1个任务去调用fork()方法,剩下1个留给自己,这样就可以保证当前worker线程在分配完任务之后可以继续执行任务,减少资源浪费。
具体可以看下fork()和invokeAll()方法的实现。
//invokeAll的具体实现:N-1个任务调用fork,剩下一个自己消化掉,不再调用fork public static void invokeAll(ForkJoinTask<?>... tasks) { Throwable ex = null; int last = tasks.length - 1; for (int i = last; i >= 0; --i) { ForkJoinTask<?> t = tasks[i]; if (t == null) { if (ex == null) ex = new NullPointerException(); } else if (i != 0) t.fork(); //除了第一个之外,剩下的调用fork方法 else if (t.doInvoke() < NORMAL && ex == null) //剩下的一个自己消化掉 ex = t.getException(); } for (int i = 1; i <= last; ++i) { ForkJoinTask<?> t = tasks[i]; if (t != null) { if (ex != null) t.cancel(false); else if (t.doJoin() < NORMAL) ex = t.getException(); } } if (ex != null) rethrow(ex); }
//fork的具体实现:将当前任务放到当前工作线程的队列中 public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }
从代码上可以看出来, 理论上使用fork()会导致线程数增多。具体可以做个测试。
依旧以开始的例子,将 invokeAll(task1, task2) 改成 task1.fork(); task2.fork()
执行效果如下:
split, 0-400,--> 0-200, 200-400 split, 0-200,--> 0-100, 100-200 split, 200-400,--> 200-300, 300-400 split, 0-100,--> 0-50, 50-100 split, 200-300,--> 200-250, 250-300 compute() start:0,end:50, sum=1225, ThreadName=ForkJoinPool-1-worker-0 compute() start:50,end:100, sum=3725, ThreadName=ForkJoinPool-1-worker-4 compute() start:200,end:250, sum=11225, ThreadName=ForkJoinPool-1-worker-3 compute() start:250,end:300, sum=13725, ThreadName=ForkJoinPool-1-worker-3 split, 300-400,--> 300-350, 350-400 split, 100-200,--> 100-150, 150-200 compute() start:300,end:350, sum=16225, ThreadName=ForkJoinPool-1-worker-0 compute() start:100,end:150, sum=6225, ThreadName=ForkJoinPool-1-worker-4 compute() start:350,end:400, sum=18725, ThreadName=ForkJoinPool-1-worker-0 compute() start:150,end:200, sum=8725, ThreadName=ForkJoinPool-1-worker-4 final result 79800
可以看出, 改成fork之后,线程数比以前多了1个, ForkJoinPool-1-worker-4。
BTW:线程名可能会有变化, 需要看具体的compute的执行时间。
参考文章:
https://www.liaoxuefeng.com/article/001493522711597674607c7f4f346628a76145477e2ff82000
后续:
Forkjoin和executorService区别
工作窃取算法
工作窃取算法如何避免资源冲突
forkjoin为什么默认使用多核处理器
版权所有:《攀爬蜗牛》 => 《Fork-Join框架使用介绍(一)》
本文地址:https://www.dutycode.com/fork_join_1.html
除非注明,文章均为 《攀爬蜗牛》 原创,欢迎转载!转载请注明本文地址,谢谢。