Fork-Join框架使用介绍(一)

losetowin 发布于:2017-12-6 19:25 分类:Java  有 1185 人浏览,获得评论 0 条 标签: forkjoin 

本文地址:http://www.dutycode.com/fork_join_1.html
除非注明,文章均为 www.dutycode.com 原创,欢迎转载!转载请注明本文地址,谢谢。
介绍
  • JDK1.7之后引入的并发框架
  • ForkJoinPool是ExecutorService的补充,某些场景下性能比ExecutorSerivce更优。
  • 并行支持,分而治之思想
  • ForkJoin模型简化了多线程创建和使用
  • ForkJoin模型会自动使用多核处理器                                                                                                                                                                                                                                                                     

使用场景:
    在处理大任务的时候,单线程的方式运行时间会很长,所以一般会考虑使用多线程的方式来执行,多线程处理的时候,就涉及到任务拆解。很多情况下, 大任务是可以拆解成小任务的,并且小任务是可以并行执行的。 
fork-join模型会帮助拆解任务,并且将任务拆解成小任务,小任务如果还比较大,还可以继续拆解。 
    一个现实的场景例子,方便理解Fork-Join模型。
    假设公司印了一批宣传单,需要发放到客户手中,进行宣传。 于是公司找到几家地推公司,给每家地推公司一部分宣传单(任务的第一步拆解,拆解成小任务), 之后地推公司拿到宣传单之后,又将这部分宣传单分给他招募的兼职发单员手中(小任务继续拆解成更小的任务),之后由兼职发单员来进行传单派发工作(执行任务), 这些传单派单员是可以同时并行的去发传单了。 
    如果某个派单员发完传单了,他是可以从其他未发完的派单员手里面再拿一些传单继续发的(工作窃取算法)。

简单例子
    比较简单的一个计算例子,计算从0~n的数字之和。

最简单的方式, 使用for循环即可。 
但比如计算1-100, 和101-200的过程,完全可以分开来并行计算,最后就做统一的汇总即可了。 

使用Fork-join实现如下: 


/**
 * @author zhangzhonghua
 * @version 0.0.1
 * @date 2017/11/29
 */
public class SumTask extends RecursiveTask<Integer> {

    final static int THEHOLD = 100;

    Integer[] array;
    int start;
    int end;

    public SumTask(Integer[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }


    @Override
    protected Integer compute() {

        if((end - start) < THEHOLD){
            int sum = 0;
            for (int i = start; i < end; i++){
                sum +=array[i];
            }

            System.out.println(String.format("compute() start:%d,end:%d, sum=%d, ThreadName=%s", start, end, sum, Thread.currentThread().getName()));
            return sum;
        }

        //任务大小不符合要求, 拆分两个任务
        int middle = (end + start) / 2;
        System.out.println(String.format("split, %d-%d,--> %d-%d, %d-%d", start, end, start, middle, middle, end));
        SumTask task1 = new SumTask(array, start, middle);
        SumTask task2 = new SumTask(array, middle, end);

        invokeAll(task1, task2);
//        task1.fork();
//        task2.fork();

        int res1 = task1.join();
        int res2 = task2.join();

        return res1 + res2;
    }


    public static void main(String[] args){

        Integer[] arr = randomInteger(400);
        ForkJoinPool pool = new ForkJoinPool(4);//最大并发数
        ForkJoinTask<Integer> task = new SumTask(arr, 0, arr.length);
        Integer result = pool.invoke(task);

        System.out.println(String.format("final result %d", result));

    }



    private static Integer[] randomInteger(int size){
        Integer[] arr = new Integer[size];
        for (int i=0; i < arr.length; i++){
            arr[i] = i;
        }

        return arr;
    }

}



程序执行结果: 


split, 0-400,--> 0-200, 200-400
split, 200-400,--> 200-300, 300-400
split, 0-200,--> 0-100, 100-200
split, 200-300,--> 200-250, 250-300
split, 100-200,--> 100-150, 150-200
split, 0-100,--> 0-50, 50-100
split, 300-400,--> 300-350, 350-400
compute() start:200,end:250, sum=11225, ThreadName=ForkJoinPool-1-worker-2
compute() start:0,end:50, sum=1225, ThreadName=ForkJoinPool-1-worker-1
compute() start:300,end:350, sum=16225, ThreadName=ForkJoinPool-1-worker-0
compute() start:250,end:300, sum=13725, ThreadName=ForkJoinPool-1-worker-2
compute() start:50,end:100, sum=3725, ThreadName=ForkJoinPool-1-worker-1
compute() start:350,end:400, sum=18725, ThreadName=ForkJoinPool-1-worker-0
compute() start:100,end:150, sum=6225, ThreadName=ForkJoinPool-1-worker-3
compute() start:150,end:200, sum=8725, ThreadName=ForkJoinPool-1-worker-1
final result 79800
任务的关键在于任务拆分,也就是在compute的方法, 如果任务大小足够小了,则就直接计算, 如果还不够小,则继续拆分。 
Fork-join模型如下图,比如容易理解: 
27F18610-2461-4CAC-AA32-16952C14A4B9.png

RecursiveTask和RecursiveAction区别及使用场景
RecursiveTask和RecursiveAction均继承自ForkJoinTask。
RecursiveTask可以返回结果,二RecursiveAction则没有返回结果。
从两个类下的compute方法也可以看出, RecursiveTask的compute的返回值是泛型T,但RecursiveAction的compuite的返回值是void。 

A68CE0CF-AAA6-44E8-819E-90D90B2A322A.png


E21BC79B-4759-4869-9313-3D9C871D1CA4.png

所以很明显, RecursiveAction适合不要求有返回结果的任务,比如发送push消息,某些场景下我们不需要知道消息是否到达,而只需要push即可。 那这种就不需要返回结果了。 
RecursiveTask适合需要有返回结果的任务,比如,在用户身份验证的环节,可能有很多的身份校验规则, 那么如果有一个校验不通过,就认为用户有风险,这种情况下,我们需要汇总各个任务的执行结果,然后做统一分析。这种就适合用RecursiveTask,如下图: 
15046558-8D00-47AB-8B4D-2BF8FB05EC55.png

ForkJoinTask中invokeAll和fork的区别
ForkJoinTask的核心在compute上,在compute方法中,如开始的例子,我们使用的是invokeAll()方法,但有些网站上说的是使用fork方法。具体的区别如下
执行compute()的线程本身也是一个worker工作线程,当两个子任务执行fork()方法时,当前的worker线程会将任务分配到两个worker线程上,这样的后果是当前的worker线程会闲置,不再工作,因为已经将任务都交给两个子任务了。这样就会导致fork-join线程池中浪费了一个worker线程。
而invokeAll()方法中,会将N-1个任务去调用fork()方法,剩下1个留给自己,这样就可以保证当前worker线程在分配完任务之后可以继续执行任务,减少资源浪费。

具体可以看下fork()和invokeAll()方法的实现。 

//invokeAll的具体实现:N-1个任务调用fork,剩下一个自己消化掉,不再调用fork
public static void invokeAll(ForkJoinTask<?>... tasks) {
    Throwable ex = null;
    int last = tasks.length - 1;
    for (int i = last; i >= 0; --i) {
        ForkJoinTask<?> t = tasks[i];
        if (t == null) {
            if (ex == null)
                ex = new NullPointerException();
        }
        else if (i != 0)
            t.fork(); //除了第一个之外,剩下的调用fork方法
        else if (t.doInvoke() < NORMAL && ex == null) //剩下的一个自己消化掉
            ex = t.getException();
    }
    for (int i = 1; i <= last; ++i) {
        ForkJoinTask<?> t = tasks[i];
        if (t != null) {
            if (ex != null)
                t.cancel(false);
            else if (t.doJoin() < NORMAL)
                ex = t.getException();
        }
    }
    if (ex != null)
        rethrow(ex);
}

//fork的具体实现:将当前任务放到当前工作线程的队列中
public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}
从代码上可以看出来, 理论上使用fork()会导致线程数增多。具体可以做个测试。
依旧以开始的例子,将 invokeAll(task1, task2)  改成 task1.fork(); task2.fork()
执行效果如下: 
split, 0-400,--> 0-200, 200-400
split, 0-200,--> 0-100, 100-200
split, 200-400,--> 200-300, 300-400
split, 0-100,--> 0-50, 50-100
split, 200-300,--> 200-250, 250-300
compute() start:0,end:50, sum=1225, ThreadName=ForkJoinPool-1-worker-0
compute() start:50,end:100, sum=3725, ThreadName=ForkJoinPool-1-worker-4
compute() start:200,end:250, sum=11225, ThreadName=ForkJoinPool-1-worker-3
compute() start:250,end:300, sum=13725, ThreadName=ForkJoinPool-1-worker-3
split, 300-400,--> 300-350, 350-400
split, 100-200,--> 100-150, 150-200
compute() start:300,end:350, sum=16225, ThreadName=ForkJoinPool-1-worker-0
compute() start:100,end:150, sum=6225, ThreadName=ForkJoinPool-1-worker-4
compute() start:350,end:400, sum=18725, ThreadName=ForkJoinPool-1-worker-0
compute() start:150,end:200, sum=8725, ThreadName=ForkJoinPool-1-worker-4
final result 79800
可以看出, 改成fork之后,线程数比以前多了1个, ForkJoinPool-1-worker-4。
BTW:线程名可能会有变化, 需要看具体的compute的执行时间。 


参考文章:

http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/
https://www.javaworld.com/article/2078440/enterprise-java/java-tip-when-to-use-forkjoinpool-vs-executorservice.html?page=2
http://www.iteye.com/topic/1117483
http://www.jianshu.com/p/0120b3dd255f
http://www.importnew.com/2279.html

https://www.liaoxuefeng.com/article/001493522711597674607c7f4f346628a76145477e2ff82000


后续:

Forkjoin和executorService区别

工作窃取算法

工作窃取算法如何避免资源冲突

forkjoin为什么默认使用多核处理器


版权所有:《攀爬蜗牛》 => 《Fork-Join框架使用介绍(一)
本文地址:https://www.dutycode.com/fork_join_1.html
除非注明,文章均为 《攀爬蜗牛》 原创,欢迎转载!转载请注明本文地址,谢谢。