Java并发-Fork Join框架
# 一、介绍
# 1. 什么是Fork/Join框架
Fork/Join框架是Java并发编程中的一个重要工具,它是在Java 7中引入的。
Fork/Join框架主要用于解决一类常见的并行问题,即将一个大任务拆分成多个小任务并行执行,然后将小任务的结果合并得到最终结果。
# 2. Fork/Join框架的特点
Fork/Join框架具有以下特点:
- 任务分治:Fork/Join框架通过将大任务划分为多个小任务,实现任务的分解与合并,充分利用多核处理器的性能。
- 工作窃取算法:Fork/Join框架使用工作窃取算法,即当某个线程执行完自己的任务后,会从其他线程的任务队列中窃取未执行的任务,以保证线程的负载均衡。
- 线程池支持:Fork/Join框架内部使用了线程池来管理线程的创建和销毁,可以通过配置线程池的参数来调整并发执行的效率。
Fork/Join框架是Java并发编程中一个强大的工具,可以用于解决各种需要并行计算的问题,提高程序的性能和效率。
# 二、基本概念
# 1. Fork/Join任务
Fork/Join任务是指可以被Fork/Join框架处理的可拆分任务,他的基类是ForkJoinTask
。一个Fork/Join任务通常继承自ForkJoinTask
的子类RecursiveTask
或RecursiveAction
类。RecursiveTask
用于有返回结果的任务,RecursiveAction
用于没有返回结果的任务。
在编写Fork/Join任务时,需要实现compute()
方法,在compute()
方法中定义任务的具体逻辑。如果任务可以进一步拆分为子任务,可以通过调用fork()
方法来拆分任务,并返回一个子任务。拆分任务后,可以通过调用join()
方法等待子任务的执行结果,并对子任务的结果进行合并得到最终结果。
# 2. Fork/Join池
Fork/Join池是Fork/Join框架的核心组件,用于管理和调度Fork/Join任务的执行。Fork/Join池内部包含了一组工作线程,这些线程用于执行任务并进行工作窃取。
Fork/Join池可以通过ForkJoinPool
类来创建和配置。在创建Fork/Join池时,可以指定线程数量、线程的优先级、线程的命名等参数。Fork/Join池还提供了一些方法来执行任务,如invoke()
用于执行一个Fork/Join任务并返回结果。
# 3. 工作窃取算法
工作窃取算法是Fork/Join框架中用于实现线程负载均衡的关键策略。在Fork/Join框架中,每个工作线程都有一个任务队列,用于存放待执行的任务。当一个线程执行完自己的任务后,会从其他线程的任务队列中窃取未执行的任务并执行。
通过工作窃取算法,Fork/Join框架可以实现线程的负载均衡。如果某个线程的任务执行速度较快,它可以从其他线程的任务队列中获取更多任务,以保证线程的利用率。这样可以充分利用多核处理器的性能,提高并发执行的效率。
在Fork/Join框架中,工作窃取算法是自动进行的,无需用户手动干预。框架会根据任务的拆分和合并情况,自动进行任务的调度和负载均衡。
# 三、示例代码
# 1. 创建Fork/Join任务
在Fork/Join框架中,我们需要自定义任务类来实现具体的任务逻辑。下面是一个示例代码,展示了如何创建一个Fork/Join任务:
import java.util.concurrent.RecursiveTask;
public class MyTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 10;
private int start;
private int end;
public MyTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
// 当任务足够小,直接计算结果
int sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
// 任务太大,需要拆分为子任务
int mid = (start + end) / 2;
MyTask leftTask = new MyTask(start, mid);
MyTask rightTask = new MyTask(mid + 1, end);
// 拆分任务并等待子任务的完成
leftTask.fork();
rightTask.fork();
// 合并子任务的结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
return leftResult + rightResult;
}
}
}
在这个示例代码中,我们定义了一个名为MyTask
的任务类,继承自RecursiveTask<Integer>
。任务类中重写了compute()
方法,该方法用于定义任务的具体逻辑。
当任务的大小小于等于阈值THRESHOLD
时,任务直接计算结果并返回。否则,将任务拆分为两个子任务,分别计算左半部分和右半部分的结果,并将子任务的结果合并得到最终结果。
# 2. 使用Fork/Join框架执行任务
在创建了Fork/Join任务之后,我们需要使用Fork/Join框架来执行任务。下面是一个使用示例代码:
import java.util.concurrent.ForkJoinPool;
public class Main {
public static void main(String[] args) {
// 创建Fork/Join池
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 创建任务实例
MyTask myTask = new MyTask(1, 100);
// 执行任务
int result = forkJoinPool.invoke(myTask);
System.out.println("计算结果:" + result);
}
}
在这个示例代码中,我们首先创建了一个Fork/Join池实例forkJoinPool
。然后,创建了一个任务实例myTask
,并通过调用forkJoinPool.invoke(myTask)
来执行任务。
# 使用默认的ForkJoinPool.commonPool()作为池
我们直接调用MyTask#invoke()
方法:
public class Main2 {
public static void main(String[] args) {
// 创建任务实例
MyTask myTask = new MyTask(1, 100);
// 执行任务
int result = myTask.invoke(myTask);
System.out.println("计算结果:" + result);
}
}
如果直接调用MyTask#invoke()
的话,会使用ForkJoinPool.commonPool()
作为池。看java.util.concurrent.ForkJoinTask#fork()
方法的源码:
public final ForkJoinTask<V> fork() {
Thread t; ForkJoinWorkerThread wt;
ForkJoinPool p; ForkJoinPool.WorkQueue q;
U.storeStoreFence(); // ensure safely publishable
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
p = (wt = (ForkJoinWorkerThread)t).pool;
q = wt.workQueue;
}
else
q = (p = ForkJoinPool.common).submissionQueue(false);
q.push(this, p, true);
return this;
}
这里的逻辑是,如果当前线程是ForkJoinWorkerThread
,则使用当前线程所属的ForkJoinPool
作为池;否则使用ForkJoinPool.commonPool()
作为池。
ForkJoinPool#common
的初始化逻辑在ForkJoinPool
的static
块中:
ForkJoinPool p = common = (System.getSecurityManager() == null) ?
new ForkJoinPool((byte)0) :
AccessController.doPrivileged(new PrivilegedAction<>() {
public ForkJoinPool run() {
return new ForkJoinPool((byte)0); }});
// 初始化的时候调用了ForkJoinPool(byte)构造方法
private ForkJoinPool(byte forCommonPoolOnly) {
ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory;
UncaughtExceptionHandler handler = null;
int maxSpares = DEFAULT_COMMON_MAX_SPARES;
int pc = 0, preset = 0; // nonzero if size set as property
try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
if (pp != null) {
pc = Math.max(0, Integer.parseInt(pp));
preset = PRESET_SIZE;
}
String ms = System.getProperty
("java.util.concurrent.ForkJoinPool.common.maximumSpares");
if (ms != null)
maxSpares = Math.max(0, Math.min(MAX_CAP, Integer.parseInt(ms)));
String sf = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String sh = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if (sf != null || sh != null) {
ClassLoader ldr = ClassLoader.getSystemClassLoader();
if (sf != null)
fac = (ForkJoinWorkerThreadFactory)
ldr.loadClass(sf).getConstructor().newInstance();
if (sh != null)
handler = (UncaughtExceptionHandler)
ldr.loadClass(sh).getConstructor().newInstance();
}
} catch (Exception ignore) {
}
if (preset == 0)
pc = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
int p = Math.min(pc, MAX_CAP);
int size = (p == 0) ? 1 : 1 << (33 - Integer.numberOfLeadingZeros(p-1));
this.parallelism = p;
...
}
可以看到代码中使用了一些环境变量,其中最重要的就是java.util.concurrent.ForkJoinPool.common.parallelism
,这个环境变量用来设置ForkJoinPool.commonPool()
的最大线程数。如果没有设置这个环境变量,则默认使用当前系统可用核心数-1
个,即Runtime.getRuntime().availableProcessors() - 1
。
# 四、Java 8 parallelStream中对于Fork/Join的使用
在Java 8中,引入了parallelStream
方法来支持并行流操作。并行流操作可以将一个集合或数组分成多个子任务,并使用Fork/Join框架来并行执行这些子任务,从而提高处理数据的效率。
使用parallelStream
方法非常简单,只需在集合或数组上调用该方法即可。下面是一个示例代码,展示了如何使用parallelStream
方法:
import java.util.Arrays;
import java.util.List;
public class Main {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
int sum = numbers.parallelStream()
.filter(n -> n % 2 == 0) // 过滤偶数
.mapToInt(n -> n) // 转换为int类型
.sum(); // 求和
System.out.println("计算结果:" + sum);
}
}
在这个示例代码中,我们首先创建了一个整数列表numbers
。然后,通过调用parallelStream
方法来创建一个并行流。接下来,我们使用流的一系列操作,如filter
过滤、mapToInt
转换为int类型、sum
求和等。
从Java源码角度分析parallelStream()
方法的实现,可以看到它是基于Fork/Join框架来实现并行处理的。
下面是parallelStream()
方法的源码实现:
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
可以看到,parallelStream()
方法内部调用了StreamSupport.stream()
方法,并将第二个参数设置为true。这里的true表示创建的Stream是并行的。
StreamSupport.stream()
方法的实现如下:
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
在stream()
方法中,创建了一个新的ReferencePipeline.Head
对象,并将传入的spliterator和parallel参数传递给它。ReferencePipeline.Head
是Stream API中的一个节点,它实现了Stream接口。
ReferencePipeline.Head
的构造方法如下:
ReferencePipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags);
if (source.hasCharacteristics(Spliterator.DISTINCT)
&& (source.hasCharacteristics(Spliterator.SORTED) || source.hasCharacteristics(Spliterator.ORDERED)))
throw new IllegalArgumentException("Source must not have the DISTINCT characteristic if terminal operation is ordered or sorted");
this.sourceStage = source;
this.sourceSpliterator = source;
this.depth = 0;
this.sourceOrOpFlags = sourceFlags;
this.parallel = parallel;
this.combinedFlags = StreamOpFlag.combineOpFlags(sourceFlags, StreamOpFlag.toStreamFlags(StreamOpFlag.INITIAL_OPS_VALUE));
}
在ReferencePipeline.Head
的构造方法中,将传入的spliterator和parallel参数保存到成员变量中。这里的parallel参数决定了创建的Stream是并行的。
我们以forEach方法举例,最终会调用java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
方法:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
在evaluate方法中,首先判断当前的Stream是否是并行的,如果是并行的,则调用terminalOp.evaluateParallel()
方法来执行并行操作。
这里terminalOp
的实现类是java.util.stream.ForEachOps.ForEachOp
,具体逻辑:
@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}
该方法的作用是并行地对一个数据流进行评估。方法接受两个参数:一个是类型为PipelineHelper<T>
的helper对象,另一个是类型为Spliterator<S>
的spliterator对象。
代码中首先进行了一个判断,如果ordered
为真,则创建一个ForEachOrderedTask
的实例,并调用其invoke
方法。否则,创建一个ForEachTask
的实例,并调用其invoke
方法。
ForEachOrderedTask
和ForEachTask
都实现了ForkJoinTask
。这里的invoke
方法是ForkJoinTask
的方法,用于启动任务的执行。
通过以上分析,我们可以看到,Java 8中的并行流底层操作是基于Fork/Join框架来实现的。
# 自定义parallelStream的线程池
要自定义parallelStream
的线程池,可以把并行流逻辑放到自定义的ForkJoinPool
中执行,示例代码如下:
List<Integer> integerList= IntStream.range(1,1000).boxed().collect(Collectors.toList());
ForkJoinPool customThreadPool = new ForkJoinPool(4);
Integer actualTotal = customThreadPool.submit(
() -> integerList.parallelStream().reduce(0, Integer::sum)).get();
log.info("{}",actualTotal);
以上面的ForEachTask
为例,他的invoke()
方法最终调用了compute()
:
public void compute() {
Spliterator<S> rightSplit = spliterator, leftSplit;
long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
if ((sizeThreshold = targetSize) == 0L)
targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
boolean forkRight = false;
Sink<S> taskSink = sink;
ForEachTask<S, T> task = this;
while (!isShortCircuit || !taskSink.cancellationRequested()) {
if (sizeEstimate <= sizeThreshold ||
(leftSplit = rightSplit.trySplit()) == null) {
task.helper.copyInto(taskSink, rightSplit);
break;
}
ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
task.addToPendingCount(1);
ForEachTask<S, T> taskToFork;
if (forkRight) {
forkRight = false;
rightSplit = leftSplit;
taskToFork = task;
task = leftTask;
}
else {
forkRight = true;
taskToFork = leftTask;
}
taskToFork.fork();
sizeEstimate = rightSplit.estimateSize();
}
task.spliterator = null;
task.propagateCompletion();
}
这里只需要关注taskToFork.fork()
这一行代码,它会将任务提交到ForkJoinPool
中执行。
而上面我们已经分析过:如果当前线程是ForkJoinWorkerThread
,则使用当前线程所属的ForkJoinPool
作为池;否则使用ForkJoinPool.commonPool()
作为池。
那么这里的fork()
方法就是在自定义线程池中执行的了。
# 五、总结
本文以Java并发的Fork/Join框架为主题,介绍了其基本概念、特点以及使用方法,并提供了示例代码加深对其的理解。
我们还通过部分源码对ForkJoinTask进行了分析,以及分析了Java 8中的并行流底层操作是基于Fork/Join框架来实现的。
希望通过学习本文,你能够对Fork/Join框架有一个更深入的理解。
祝你变得更强!