public abstract class CountedCompleter<T> extends ForkJoinTask<T>
ForkJoinTask
,当触发时执行完成操作,并且没有剩余的待处理操作。
CountedCompleters通常比其他形式的ForkJoinTasks在子任务停顿和阻塞的情况下更加强大,但是不太直观的编程。
CountedCompleter的使用与其他基于完成的组件(例如CompletionHandler
)的使用类似,但可能需要多个未完成的完成才能触发完成操作onCompletion(CountedCompleter)
,而不仅仅是一个。
除非另有初始化, pending count开始于零,但也可以是(原子),使用方法改变setPendingCount(int)
, addToPendingCount(int)
和compareAndSetPendingCount(int, int)
。
在调用tryComplete()
时,如果待处理的行动计数不为零,则递减;
否则,执行完成操作,如果完成者本身具有完整性,则该过程将继续完成。
与相关同步组件(如Phaser
和Semaphore
)的情况一样 ,这些方法仅影响内部计数;
他们没有建立任何进一步的内部簿记。
特别地,未维护未决任务的身份。
如下所示,您可以创建在需要时记录一些或所有待处理任务或其结果的子类。
如下所示,还提供了支持定制完成遍历的实用程序方法。
然而,由于CountedCompleters仅提供基本的同步机制,因此创建进一步的抽象子类可能是有用的,这些子类保持适用于一组相关用法的链接,字段和其他支持方法。
具体的CountedCompleter类必须定义方法compute()
,在大多数情况下(如下所示),在tryComplete()
之前调用tryComplete()
一次。 该类还可以可选地覆盖方法onCompletion(CountedCompleter)
以在正常完成时执行动作,并且方法onExceptionalCompletion(Throwable, CountedCompleter)
对任何异常执行动作。
CountedCompleter通常不承担结果,在这种情况下,它们通常被声明为CountedCompleter<Void>
,并将始终返回null
作为结果值。 在其他情况下,你应该重写方法getRawResult()
提供从结果join(), invoke()
,以及相关方法。 一般来说,该方法应该返回在完成后保存结果的CountedCompleter对象的一个字段(或一个或多个字段的函数)的值。 默认方法setRawResult(T)
在CountedCompleters中不起作用。 可能但很少适用于覆盖此方法来维护其他对象或保存结果数据的字段。
一个CountedCompleter本身不具有一个完整的(即getCompleter()
返回null
)可以用作这个添加的功能的常规ForkJoinTask。 然而,任何完成者又具有另一个完成者仅用作其他计算的内部帮助器,因此其自己的任务状态(如方法如ForkJoinTask.isDone()
所报告)是任意的; 这种状况只有在明确调用改变complete(T)
, ForkJoinTask.cancel(boolean)
, ForkJoinTask.completeExceptionally(Throwable)
或方法的特殊结束后compute
。 在任何异常完成之后,如果有任何异常可能会被传递到任务的完成者(以及其完成者等),如果存在并且尚未完成。 同样地,取消一个内部的CountedCompleter只对该完成者有局部的影响,所以并不常用。
示例用法
并行递归分解。 CountedCompleters可能被安排在与RecursiveAction
经常使用的类似的树中,尽管与设置相关的结构通常有所不同。 这里,每个任务的完成者是其计算树中的父项。 即使它们需要更多的簿记,CountedCompleters可能是更好的选择,当应用可能耗时的操作(不能进一步细分)到数组或集合的每个元素; 特别是当操作对于一些元素的时间要比其他元素完成时要多得多,这是因为内在的变化(例如I / O)或诸如垃圾收集的辅助效应。 因为CountedCompleters提供自己的延续,其他线程不需要阻止等待执行它们。
例如,这是一个初始版本的类,它使用二分法递归分解将工作分成单个部分(叶子任务)。 即使将工作分解为单独的调用,基于树的技术通常比直接分支叶子任务更为可取,因为它们可以减少线程间通信并改善负载平衡。 在递归的情况下,要完成的每对子任务副本中的第二个触发器完成其父进程(因为没有执行结果组合,所以方法onCompletion
的默认无操作实现不被覆盖)。 静态实用程序方法设置基本任务并调用它(这里隐含地使用ForkJoinPool.commonPool()
)。
class MyOperation<E> { void apply(E e) { ... } } class ForEach<E> extends CountedCompleter<Void> { public static <E> void forEach(E[] array, MyOperation<E> op) { new ForEach<E>(null, array, op, 0, array.length).invoke(); } final E[] array; final MyOperation<E> op; final int lo, hi; ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) { super(p); this.array = array; this.op = op; this.lo = lo; this.hi = hi; } public void compute() { // version 1 if (hi - lo >= 2) { int mid = (lo + hi) >>> 1; setPendingCount(2); // must set pending count before fork new ForEach(this, array, op, mid, hi).fork(); // right child new ForEach(this, array, op, lo, mid).fork(); // left child } else if (hi > lo) op.apply(array[lo]); tryComplete(); } }
通过注意到在递归的情况下,该任务在分配正确的任务后无关,因此可以在返回之前直接调用其左任务,从而可以改善此设计。
(这是一个尾递归删除的模拟。)另外,因为任务在执行其左任务时返回(而不是通过调用tryComplete
),待处理的计数设置为1:
class ForEach<E> ... public void compute() { // version 2 if (hi - lo >= 2) { int mid = (lo + hi) >>> 1; setPendingCount(1); // only one pending new ForEach(this, array, op, mid, hi).fork(); // right child new ForEach(this, array, op, lo, mid).compute(); // direct invoke } else { if (hi > lo) op.apply(array[lo]); tryComplete(); } }
作为一个进一步的改进,注意左边的任务甚至不存在。
而不是创建一个新的,我们可以迭代使用原始任务,并为每个fork添加一个挂起的计数。
另外,因为这个树中没有任务实现onCompletion(CountedCompleter)
方法, tryComplete()
可以替换为propagateCompletion()
。
class ForEach<E> ... public void compute() { // version 3 int l = lo, h = hi; while (h - l >= 2) { int mid = (l + h) >>> 1; addToPendingCount(1); new ForEach(this, array, op, mid, h).fork(); // right child h = mid; } if (h > l) op.apply(array[l]); propagateCompletion(); }
这些类的额外改进可能需要预先计算待处理的计数,以便它们可以在构造函数中建立,专门用于叶子步骤的类,按每个重复的四个细分,而不是两个细分,并使用自适应阈值,而不是总是细分为单个元素。
搜索。 CountedCompleters的树可以在数据结构的不同部分搜索一个值或属性,一旦发现结果,就会在AtomicReference
中报告结果。 其他人可以轮询结果,以避免不必要的工作。 (您可以另外提供cancel其他任务,但通常只需让他们注意到结果被设置,并且如果是跳过进一步处理,则通常更简单和更有效)。再次使用完全分区(再次,在实践中,叶任务)几乎总是处理多个元素):
class Searcher<E> extends CountedCompleter<E> { final E[] array; final AtomicReference<E> result; final int lo, hi; Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) { super(p); this.array = array; this.result = result; this.lo = lo; this.hi = hi; } public E getRawResult() { return result.get(); } public void compute() { // similar to ForEach version 3 int l = lo, h = hi; while (result.get() == null && h >= l) { if (h - l >= 2) { int mid = (l + h) >>> 1; addToPendingCount(1); new Searcher(this, array, result, mid, h).fork(); h = mid; } else { E x = array[l]; if (matches(x) && result.compareAndSet(null, x)) quietlyCompleteRoot(); // root task is now joinable break; } } tryComplete(); // normally complete whether or not found } boolean matches(E e) { ... } // return true if found public static <E> E search(E[] array) { return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke(); } }
在此示例中,以及其他任务除了compareAndSet设置常见结果之外没有其他效果,tryComplete的后续无条件tryComplete
可以被设置为有条件的( if (result.get() == null) tryComplete();
),因为一旦根任务完成,就不需要进一步的簿记管理完成。
记录子任务 结合多个子任务的CountedCompleter任务通常需要在方法onCompletion(CountedCompleter)
中访问这些结果。 如下面的类所示(执行map-reduce的简化形式,其中映射和缩减都是类型为E
),分割和征服设计的一种方法是使每个子任务记录成为兄弟,以便它可以可以在方法onCompletion
访问。 这种技术适用于结合左和右结果的顺序无关紧要的减少; 有序减少需要明确的左/右指定。 上述示例中可以看到其他流程图的变体。
class MyMapper<E> { E apply(E v) { ... } } class MyReducer<E> { E apply(E x, E y) { ... } } class MapReducer<E> extends CountedCompleter<E> { final E[] array; final MyMapper<E> mapper; final MyReducer<E> reducer; final int lo, hi; MapReducer<E> sibling; E result; MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper, MyReducer<E> reducer, int lo, int hi) { super(p); this.array = array; this.mapper = mapper; this.reducer = reducer; this.lo = lo; this.hi = hi; } public void compute() { if (hi - lo >= 2) { int mid = (lo + hi) >>> 1; MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid); MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi); left.sibling = right; right.sibling = left; setPendingCount(1); // only right is pending right.fork(); left.compute(); // directly execute left } else { if (hi > lo) result = mapper.apply(array[lo]); tryComplete(); } } public void onCompletion(CountedCompleter<?> caller) { if (caller != this) { MapReducer<E> child = (MapReducer<E>)caller; MapReducer<E> sib = child.sibling; if (sib == null || sib.result == null) result = child.result; else result = reducer.apply(child.result, sib.result); } } public E getRawResult() { return result; } public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) { return new MapReducer<E>(null, array, mapper, reducer, 0, array.length).invoke(); } }
这里,方法onCompletion
采用了结合结果的许多完成设计共同的形式。
这种回调式方法在每个任务中触发一次,在挂起计数的两个不同上下文中的任一个中,或者当任务本身调用时其挂起的计数为零时变为零:(1),或者tryComplete
时的挂起计数为零,或(2)通过任何其子任务,当它们完成并将待处理的计数递减到零时。
caller
论证区分案例。
通常,当呼叫方为this
时,不需要采取任何行动。
否则,可以使用调用者参数(通常通过转换)来提供要组合的值(和/或链接到其他值)。
假设正确使用挂起计数,里面的动作onCompletion
发生(一次)一个任务,其子任务完成时。
在此方法中不需要额外的同步来确保对此任务或其他完成任务的字段的访问的线程安全性。
完成遍历 。 如果使用onCompletion
处理完成不适用或不方便,您可以使用方法firstComplete()
和nextComplete()
创建自定义遍历。 例如,要定义一个仅以第三个ForEach示例的形式分割右侧任务的MapReducer,完成必须按照未用尽的子任务链接合作减少,可以如下完成:
class MapReducer<E> extends CountedCompleter<E> { // version 2 final E[] array; final MyMapper<E> mapper; final MyReducer<E> reducer; final int lo, hi; MapReducer<E> forks, next; // record subtask forks in list E result; MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper, MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) { super(p); this.array = array; this.mapper = mapper; this.reducer = reducer; this.lo = lo; this.hi = hi; this.next = next; } public void compute() { int l = lo, h = hi; while (h - l >= 2) { int mid = (l + h) >>> 1; addToPendingCount(1); (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork(); h = mid; } if (h > l) result = mapper.apply(array[l]); // process completions by reducing along and advancing subtask links for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) { for (MapReducer t = (MapReducer)c, s = t.forks; s != null; s = t.forks = s.next) t.result = reducer.apply(t.result, s.result); } } public E getRawResult() { return result; } public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) { return new MapReducer<E>(null, array, mapper, reducer, 0, array.length, null).invoke(); } }
触发器 一些CountedCompleters本身从来没有分叉,而是作为其他设计中的一些管道; 包括完成一个或多个异步任务触发另一个异步任务。 例如:
class HeaderBuilder extends CountedCompleter<...> { ... } class BodyBuilder extends CountedCompleter<...> { ... } class PacketSender extends CountedCompleter<...> { PacketSender(...) { super(null, 1); ... } // trigger on second completion public void compute() { } // never called public void onCompletion(CountedCompleter<?> caller) { sendPacket(); } } // sample use: PacketSender p = new PacketSender(); new HeaderBuilder(p, ...).fork(); new BodyBuilder(p, ...).fork();
Modifier | Constructor and Description |
---|---|
protected |
CountedCompleter()
创建一个新的CountedCompleter,没有完成,初始挂起计数为零。
|
protected |
CountedCompleter(CountedCompleter<?> completer)
创建一个新的CountedCompleter与给定的完成者和初始挂起计数为零。
|
protected |
CountedCompleter(CountedCompleter<?> completer, int initialPendingCount)
创建一个新的CountedCompleter与给定的完成和初始挂起计数。
|
Modifier and Type | Method and Description |
---|---|
void |
addToPendingCount(int delta)
将给定值添加(原子地)给挂起的计数。
|
boolean |
compareAndSetPendingCount(int expected, int count)
只有当当前持有给定的预期值时,将挂起的计数设置为(原子地)给定计数。
|
void |
complete(T rawResult)
不管挂起计数,调用
onCompletion(CountedCompleter) ,将此任务标记为完成,并进一步触发
tryComplete() 对此任务的完成(如果存在)。
|
abstract void |
compute()
这个任务执行的主要计算。
|
int |
decrementPendingCountUnlessZero()
如果挂起的计数非零,(原子地)减少它。
|
protected boolean |
exec()
实现CountedCompleters的执行约定。
|
CountedCompleter<?> |
firstComplete()
如果此任务的挂起计数为零,则返回此任务;
否则递减其挂起的计数并返回 null 。
|
CountedCompleter<?> |
getCompleter()
返回成立于这项任务的构造函数中完成者,或
null 如果没有。
|
int |
getPendingCount()
返回当前挂起的计数。
|
T |
getRawResult()
返回计算结果。
|
CountedCompleter<?> |
getRoot()
返回当前计算的根;
即这个任务,如果它没有完成,否则它的完成者的根。
|
void |
helpComplete(int maxTasks)
如果此任务尚未完成,则尝试至少处理此任务在完成路径上的给定数量的其他未处理任务(如果有)。
|
CountedCompleter<?> |
nextComplete()
如果此任务没有完成,请调用
ForkJoinTask.quietlyComplete() 并返回
null 。
|
void |
onCompletion(CountedCompleter<?> caller)
当方法
tryComplete() 被调用并且挂起的计数为零时,或者当调用无条件方法
complete(T) 时,执行一个动作。
|
boolean |
onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller)
方法
ForkJoinTask.completeExceptionally(Throwable) 被调用或方法
compute() 引发异常,并且此任务尚未正常完成时执行操作。
|
void |
propagateCompletion()
相当于 tryComplete() 但并不在完成路径中调用onCompletion(CountedCompleter) :如果挂起的计数为非零,则递减计数;
否则,同样尝试完成此任务的完成,如果存在,否则将此任务标记为完成。
|
void |
quietlyCompleteRoot()
相当于
getRoot().quietlyComplete() 。
|
void |
setPendingCount(int count)
将待处理计数设置为给定值。
|
protected void |
setRawResult(T t)
带有CountedCompleters结果的方法可以可选地用于帮助维护结果数据。
|
void |
tryComplete()
如果挂起的计数不为零,则减去计数;
否则调用 onCompletion(CountedCompleter) ,然后类似地尝试完成此任务的完成,如果存在,否则将此任务标记为完成。
|
adapt, adapt, adapt, cancel, compareAndSetForkJoinTaskTag, completeExceptionally, fork, get, get, getException, getForkJoinTaskTag, getPool, getQueuedTaskCount, getSurplusQueuedTaskCount, helpQuiesce, inForkJoinPool, invoke, invokeAll, invokeAll, invokeAll, isCancelled, isCompletedAbnormally, isCompletedNormally, isDone, join, peekNextLocalTask, pollNextLocalTask, pollTask, quietlyComplete, quietlyInvoke, quietlyJoin, reinitialize, setForkJoinTaskTag, tryUnfork
protected CountedCompleter(CountedCompleter<?> completer, int initialPendingCount)
completer
-这个任务的完成者,或
null
如果没有
initialPendingCount
- 初始挂单数
protected CountedCompleter(CountedCompleter<?> completer)
completer
- 这个任务的完成者,或
null
如果没有
protected CountedCompleter()
public abstract void compute()
public void onCompletion(CountedCompleter<?> caller)
tryComplete()
被调用并且挂起的计数为零时,或当无条件方法complete(T)
被调用时执行一个动作。
默认情况下,此方法什么都不做。
您可以通过检查给定的呼叫者参数的身份来区分情况。
如果不等于this
,则它通常是可以包含结合(和/或其他结果的链接)的子任务。
caller
- 调用此方法的任务(可能是此任务本身)
public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller)
ForkJoinTask.completeExceptionally(Throwable)
被调用或方法compute()
引发异常,并且此任务尚未正常完成时执行操作。
进入这个方法,这个任务ForkJoinTask.isCompletedAbnormally()
。
该方法的返回值控制进一步的传播:如果true
并且此任务具有尚未完成的完成者,那么该完成器也完成异常,与完成者的异常相同。
该方法的默认实现除了返回true
之外什么都不true
。
ex
- 例外
caller
- 调用此方法的任务(可能是此任务本身)
true
如果这个异常应该被传播到这个任务的完成者(如果存在的话)
public final CountedCompleter<?> getCompleter()
null
如果没有。
public final int getPendingCount()
public final void setPendingCount(int count)
count
- 计数
public final void addToPendingCount(int delta)
delta
- 要添加的值
public final boolean compareAndSetPendingCount(int expected, int count)
expected
- 期望值
count
- 新的价值
true
如果成功
public final int decrementPendingCountUnlessZero()
public final CountedCompleter<?> getRoot()
public final void tryComplete()
onCompletion(CountedCompleter)
,然后同样尝试完成此任务的完成,如果存在,否则将此任务标记为完成。
public final void propagateCompletion()
tryComplete()
但不会在完成路径下调用onCompletion(CountedCompleter)
:如果挂起的计数为非零,则递减计数;
否则,同样尝试完成此任务的完成,如果存在,否则将此任务标记为完成。
这种方法可能是在情况下非常有用,其中onCompletion
不应该或不需要,调用在计算每完成者。
public void complete(T rawResult)
onCompletion(CountedCompleter)
,将此任务标记为完成,并进一步触发tryComplete()
此任务的完成(如果存在)。
给定rawResult作为参数传递给setRawResult(T)
调用之前onCompletion(CountedCompleter)
或纪念这一任务已完成;
其值仅对涵盖setRawResult的类setRawResult
。
此方法不会修改挂起的计数。
当获得几个子任务结果的任何一个(相对于所有)结果时,强制完成时,此方法可能是有用的。 然而,在setRawResult
不被覆盖的共同(和推荐)情况下,可以使用quietlyCompleteRoot();
更简单地获得该效果。
complete
在
ForkJoinTask<T>
rawResult
- 原始结果
public final CountedCompleter<?> firstComplete()
null
public final CountedCompleter<?> nextComplete()
ForkJoinTask.quietlyComplete()
并返回null
。
或者,如果完成者的挂起计数不为零,则减少待处理的计数并返回null
。
否则,返回完成者。
此方法可用作同构任务层次结构的完成遍历循环的一部分:
for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) { // ... process c ... }
null
如果没有)
public final void quietlyCompleteRoot()
getRoot().quietlyComplete()
。
public final void helpComplete(int maxTasks)
maxTasks
- 要处理的最大任务数。
如果小于或等于零,则不处理任务。
protected final boolean exec()
exec
在类别
ForkJoinTask<T>
true
如果此任务已知正常完成
public T getRawResult()
null
,这适用于Void
操作,但在其他情况下应该被覆盖,几乎总是返回一个在完成后保存结果的字段或函数。
getRawResult
在类别
ForkJoinTask<T>
protected void setRawResult(T t)
setRawResult
在类别
ForkJoinTask<T>
t
- 值