parallelStream与ForkJoinPool的二三事

浏览量3250

parallelStream与ForkJoinPool的二三事

前言: 在Java7中引入了ForkJoinPool并在Java8中的并行流中将他发扬光大

本文主要讲述了我在使用parallelStream执行redis操作和mysql等密集型操作以及异步编程的一些感悟, 并对ForkJoinPoolCompletableFuture的使用教程贴出了其他教程性博文

一、ForkJoinPool 分支合并框架

为了充分利用多个 CPU、多核CPU的性能优势,可以将一个任务拆分成多个小任务,把每个小任务放到多个处理器上并行执行

Fork/Join 框架: 在必要的情况下,将一个大任务,进行拆分(fork) 成若干个小任务(拆到不可再拆),再将一个一个的小任务运算的结果进行join汇总。

如果没用过ForkJoinPool可以参考这篇博文 https://blog.csdn.net/qq_31156277/article/details/80531543

ForkJoinPoll原理: https://www.cnblogs.com/bjlhx/p/11131197.html

二、CompetableFuture组合式异步编程

如果你的意图是实现并发,而非并行,或者你的主要目标是在同一个CPU上执行集合松耦合的任务,充分利用CPU的核,让其足够忙碌,从而最大化程序的吞吐量,那么你其实真正想做的是避免因为等待远程服务的返回等一些操作而阻塞线程的执行,浪费宝贵的计算资源,因为这种等待的时间可能很长,通过下文就会了解,Future接口,尤其是他的实现:CompletableFuture,是这种情况的处理利器

同样, 如果没用过CompetableFuture可以参考 https://yq.aliyun.com/articles/672802

三、ForkJoinPool的通用线程池

ForkJoinPool在Java8中新添加了java.util.concurrent.ForkJoinPool#common通用线程池, 他的介绍如下

Common (static) pool. Non-null for public use unless a static construction exception, but internal usages null-check on use to paranoically avoid potential initialization circularities as well as to simplify generated code.

翻译: 公共的(静态)池。公共使用时非null,除非静态构造异常,但内部使用null检查使用,以避免潜在的初始化循环,并简化生成的代码。

通用线程池主要是为了并行流来设计使用的, 在我们使用并行流时, 默认就会使用这个通用线程池, 他的默认线程数量是我们当前机器CPU核数, 这种线程分配对于计算密集型任务来说是刚刚好的, 并且ForkJoinPool的设计初衷也是对计算密集型任务的一种新的编程模型

初始化ForkJoinPool的代码: ForkJoinPool#makeCommonPool

/**
 * Creates and returns the common pool, respecting user settings
 * specified via system properties.
 */
private static ForkJoinPool makeCommonPool() {
    int parallelism = -1;
    ForkJoinWorkerThreadFactory factory = null;
    UncaughtExceptionHandler handler = null;
    try {  // ignore exceptions in accessing/parsing properties
        String pp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.parallelism");
        String fp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.threadFactory");
        String hp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
        if (pp != null)
            parallelism = Integer.parseInt(pp);
        if (fp != null)
            factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
        if (hp != null)
            handler = ((UncaughtExceptionHandler)ClassLoader.
                       getSystemClassLoader().loadClass(hp).newInstance());
    } catch (Exception ignore) {
    }
    if (factory == null) {
        if (System.getSecurityManager() == null)
            factory = defaultForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new InnocuousForkJoinWorkerThreadFactory();
    }
    if (parallelism < 0 && // default 1 less than #cores
        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-");
}

可以看到他是使用Runtime.getRuntime().availableProcessors() - 1来设置的初始线程数量, 并且最小值为1

而且我们可以通过在启动应用时通过加-Djava.util.concurrent.ForkJoinPool.common.parallelism={threadNum}来设置通用池的默认线程数量

四、CompletableFuture#runAsync默认线程池

通过分析CompletableFuture#runAsync的源码

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    e.execute(new AsyncRun(d, f));
    return d;
}

asyncPool来源

private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

// ThreadPerTaskExecutor
static final class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) { new Thread(r).start(); }
}
private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);

可以看到CompletableFuture#runAsync使用的线程池在ForkJoinPool默认的并行度其实也就是线程数为1时其实执行时并没有复用线程而是开辟一个新的线程来执行, 当然我们也可以在CompletableFuture执行任务时就指定一个线程池, 这也是推荐做的

具体操作CompletableFuture#runAsync(Runnable, Executor)

五、parallelStream 实际在串行执行 ? ? ?

在这里的服务器指1~2核CPU的小型服务器(比如我的学生机), 通过分析ForkJoinPool源码不难看出此时并行度为1

在只有1~2核的CPU上执行parallelStream时, 默认情况下执行计算密集型任务不会有什么不妥的地方(虽然此时使用并行流也没带来性能提升), 但是我在web应用开发时经常使用并行流来处理一些有网络io的操作比如从redis获取一组数据, 这样如果使用默认的线程数量是不能达到最大的性能要求的

而且此时CompletableFuture#runAsyn实际也是在每次都new一个线程来执行

解决方案

解决方案一

第一种解决方案为设置默认通用池的并行度, 在启动时加JVM参数-Djava.util.concurrent.ForkJoinPool.common.parallelism={threadNum}即可, 我们可以根据实际需要设置线程的数量

这里给出计算我们需要线程数量的一个计算公式:

线程数量 = cpu的数量 * cpu期望利用率 * (1 + 等待时间 / CPU执行时间)。

这种解决方案的优势是同时解决了parallelStreamCompletableFuture#runAsync的问题; 缺点是要在启动参数上做修改, 而且不符合ForkJoinPool的设计初衷

解决方案二

第二种解决方案是在执行并行流时指定我们设置的线程池

具体代码如下, 程序为计算1到1,000,000之和:

public static void main(String[] args) throws ExecutionException, InterruptedException{
	long firstNum = 1;
    long lastNum = 1_000_000;
 
    List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
      .collect(Collectors.toList());
 	// 定义线程数为4的ForkJoinPool
    ForkJoinPool customThreadPool = new ForkJoinPool(4);
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();
  	
    System.out.println(actualTotal);
    System.out.println((lastNum + firstNum) * lastNum / 2);       
}

但是这种解决方案不能同时解决CompletableFuture#runAsync的问题, 不过解决方案也很简单 -> CompletableFuture#runAsync(Runnable, Executor) 在执行时指定线程池就好了

这种方法的优点是灵活性足够高, 可以根据需求定制化线程池; 缺点主要是每次都需要指定线程池会麻烦些, 但是我认为这种操作是更正规的, 比较完美的一种解决方案

评论

添加一条评论