public class Phaser extends Object
CyclicBarrier
和CountDownLatch
,但支持更灵活的使用。
注册。 不像其他障碍的情况下,各方的号码进行注册 ,以在相位同步可以随时间变化。 任务可以在任何时间(使用的方法来注册register()
, bulkRegister(int)
,或构造建立各方的初始数的形式),和(使用任何抵达时任选注销arriveAndDeregister()
)。 与大多数基本同步结构一样,注册和注销仅影响内部计数; 他们没有建立任何进一步的内部簿记,所以任务无法查询是否注册。 (但是,您可以通过对此类进行子类化来介绍此类簿记。)
同步 像CyclicBarrier
, Phaser
可能会重复等待。 方法arriveAndAwaitAdvance()
有效果类似于CyclicBarrier.await
。 每一代移相器都具有相关的相位数。 相位数从零开始,当所有各方到达移相器时前进,在达到Integer.MAX_VALUE
之后Integer.MAX_VALUE
零。 通过使用阶段数字,可以通过两种可由任何注册方调用的方法,在到达移相器和等待其他人时独立控制动作:
arrive()
和arriveAndDeregister()
记录到达。 这些方法不会阻塞,而是返回相关的到达阶段数 ; 即到达应用的移相器的相位数。 当给定阶段的最后一方到达时,执行可选操作,并且阶段前进。 这些动作由触发相位提前的方执行,并且通过覆盖方法onAdvance(int, int)
进行排列,该方法也控制终止。 覆盖该方法是相似,但比更灵活,提供屏障作用到CyclicBarrier
。 awaitAdvance(int)
需要一个指示到达阶段数的参数,并且当相位器前进到(或已经处于)不同阶段时返回。 与使用CyclicBarrier
类似结构不同,方法awaitAdvance
继续等待即使等待的线程被中断。 中断和超时版本也是可用的,但任务等待中断或超时时遇到的异常不会改变移相器的状态。 如有必要,您可以在调用forceTermination
之后,在处理程序中执行任何关联的恢复。 在ForkJoinPool
中执行的任务也可以使用抖动器 ,当其他人被阻塞等待相位提前时,这将确保足够的并行执行任务。 终止。 移相器可能进入终止状态,可以使用方法isTerminated()
进行检查。 一旦终止,所有同步方法立即返回而不等待提前,如负值返回值所示。 同样,终止时注册的尝试也没有效果。 当调用onAdvance
返回true
。 如果注销已导致注册方的数量变为零,默认实现将返回true
。 如下所示,当相位器以固定次数的迭代控制动作时,当当前相位数达到阈值时,通常方便的是重写该方法以导致终止。 方法forceTermination()
也可用于突然释放等待线程并允许它们终止。
分层 移相器可以分层 (即,以树结构构造)以减少争用。 具有大量聚会的激动剂,否则将会遇到重度的同步竞争成本,可能会被设置为使得一些子相位器共享一个共同的父节点。 这可能会大大增加吞吐量,即使它产生更大的每操作开销。
在一个分层相位的树中,自动管理儿童相机与其父母的注册和注销。 每当儿童移动设备的注册方数量变得非零(如Phaser(Phaser,int)
构造函数中所确定的那样, register()
或bulkRegister(int)
),子移动设备就向其父母注册。 只要注册方的数量变为零的调用的结果arriveAndDeregister()
,孩子相位器从其父注销。
监控。 虽然同步方法只能由注册方调用,但是可以由任何呼叫者监视移相器的当前状态。 在任何时刻,共有getRegisteredParties()
个派对,其中getArrivedParties()
已到达现阶段( getPhase()
)。 剩下的( getUnarrivedParties()
)派对到达时,阶段进行。 这些方法返回的值可以反映瞬态状态,因此对于同步控制通常不是有用的。 方法toString()
以便于非正式监视的形式返回这些状态查询的快照。
示例用法:
可以使用A Phaser
而不是CountDownLatch
来控制为可变数目的方提供服务的一次性动作。 典型的成语是用于首先注册的方法设置,然后启动动作,然后注销,如:
void runTasks(List<Runnable> tasks) { final Phaser phaser = new Phaser(1); // "1" to register self // create and start threads for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { phaser.arriveAndAwaitAdvance(); // await all creation task.run(); } }.start(); } // allow threads to start and deregister self phaser.arriveAndDeregister(); }
导致一组线程重复执行给定次数迭代的一种方法是覆盖onAdvance
:
void startTasks(List<Runnable> tasks, final int iterations) { final Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int registeredParties) { return phase >= iterations || registeredParties == 0; } }; phaser.register(); for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { do { task.run(); phaser.arriveAndAwaitAdvance(); } while (!phaser.isTerminated()); } }.start(); } phaser.arriveAndDeregister(); // deregister self, don't wait }
如果主要任务必须等待终止,则可能会重新注册,然后执行类似的循环:
// ... phaser.register(); while (!phaser.isTerminated()) phaser.arriveAndAwaitAdvance();
可以使用相关的结构来等待上下文中的特定阶段数字,在这些上下文中,您确定阶段永远不会绕过Integer.MAX_VALUE
。 例如:
void awaitPhase(Phaser phaser, int phase) { int p = phaser.register(); // assumes caller not already registered while (p < phase) { if (phaser.isTerminated()) // ... deal with unexpected termination else p = phaser.arriveAndAwaitAdvance(); } phaser.arriveAndDeregister(); }
要创建一组n
采用移相器的一棵树的任务,你可以使用以下形式的代码,假设一个任务类构造函数接受Phaser
,它与在建注册。 在调用build(new Task[n], 0, n, new Phaser())
之后,可以启动这些任务,例如通过提交到池:
void build(Task[] tasks, int lo, int hi, Phaser ph) { if (hi - lo > TASKS_PER_PHASER) { for (int i = lo; i < hi; i += TASKS_PER_PHASER) { int j = Math.min(i + TASKS_PER_PHASER, hi); build(tasks, i, j, new Phaser(ph)); } } else { for (int i = lo; i < hi; ++i) tasks[i] = new Task(ph); // assumes new Task(ph) performs ph.register() } }
TASKS_PER_PHASER
的最佳价值主要取决于预期的同步率。
低至4的值可能适用于非常小的每阶段任务机构(因此高速率),或者对于极大的任务机构可能达到数百。
实施说明 :此实施限制了最大数量的65535.尝试注册附加方会导致IllegalStateException
。 但是,您可以并且应该创建分层相位器来适应任意大量的参与者。
Constructor and Description |
---|
Phaser()
创建一个新的移相器,没有最初的注册方,没有父级和初始阶段数0。
|
Phaser(int parties)
创建一个新的移相器与给定数量的注册无障碍方,没有父母和初始阶段0。
|
Phaser(Phaser parent)
相当于
Phaser(parent, 0) 。
|
Phaser(Phaser parent, int parties)
与给定的父母和注册的无礼方的数量创建一个新的移相器。
|
Modifier and Type | Method and Description |
---|---|
int |
arrive()
抵达这个移相器,而不用等待别人到达。
|
int |
arriveAndAwaitAdvance()
到达这个移相器,等待其他人。
|
int |
arriveAndDeregister()
到达这个移相器并从其中注销,而无需等待别人到达。
|
int |
awaitAdvance(int phase)
等待该相位器的相位从给定相位值前进,如果当前相位不等于给定相位值,则立即返回,或者该相位器被终止。
|
int |
awaitAdvanceInterruptibly(int phase)
等待该移相器的阶段从给定的相位值推进,如果在等待时
InterruptedException 则抛出
InterruptedException ,或者如果当前相位不等于给定的相位值或者该相位器被终止,则立即返回。
|
int |
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
等待该移相器的阶段从给定的相位值或给定的超时时间
InterruptedException 到等待时抛出
InterruptedException ,如果当前相位不等于给定的相位值,则立即返回,或者该相位器被终止。
|
int |
bulkRegister(int parties)
增加给定数量的新的有争议的派对到这个移相器。
|
void |
forceTermination()
强制此移相器进入终止状态。
|
int |
getArrivedParties()
返回在此移相器的当前阶段到达的已注册方的数量。
|
Phaser |
getParent()
返回此移相器的父级,如果没有,则返回
null 。
|
int |
getPhase()
返回当前相位数。
|
int |
getRegisteredParties()
返回在此移动设备上注册的各方数量。
|
Phaser |
getRoot()
返回此移相器的根祖先,如果它没有父代,则与该移相器相同。
|
int |
getUnarrivedParties()
返回尚未到达此移相器当前阶段的已注册方的数量。
|
boolean |
isTerminated()
返回
true 如果移相器已被终止。
|
protected boolean |
onAdvance(int phase, int registeredParties)
在即将进行的相位提前执行动作的可覆盖方法,并控制终止。
|
int |
register()
添加一个新的unririved party到这个移相器。
|
String |
toString()
返回一个标识此移相器的字符串及其状态。
|
public Phaser()
public Phaser(int parties)
parties
- 需要进入下一阶段的各方数量
IllegalArgumentException
- 如果当事方小于零或大于所支持的最大数目
public Phaser(Phaser parent)
Phaser(parent, 0)
。
parent
- 父移相器
public Phaser(Phaser parent, int parties)
parent
- 父移相器
parties
- 进入下一阶段所需的当事人数
IllegalArgumentException
- 如果缔约方小于零或大于所支持的最大数目
public int register()
onAdvance(int, int)
正在进行中,则此方法可能会在返回之前等待完成。
如果此移动设备有父母,而此移动设备以前没有注册方,则该儿童移动设备也向其父母注册。
如果此移相器被终止,则尝试注册不起作用,并返回负值。
IllegalStateException
- 如果尝试注册超过最多支持的派对数量
public int bulkRegister(int parties)
onAdvance(int, int)
正在进行中,则此方法可能会在返回之前等待完成。
如果此移动设备有父母,而且指定的当事人数量大于零,并且此移动设备以前没有注册方,则该子版移动设备也向其父母注册。
如果此移相器被终止,则尝试注册不起作用,并返回负值。
parties
- 需要进入下一阶段的附加方的数量
IllegalStateException
- 如果尝试注册超过最多支持的派对数量
IllegalArgumentException
- 如果
parties < 0
public int arrive()
这是非注册方调用此方法的使用错误。 但是,可能会导致这个错误IllegalStateException
只有在这个相位一些后续操作,如果有的话。
IllegalStateException
- 如果没有终止,而有争议的党的人数将变为负数
public int arriveAndDeregister()
这是非注册方调用此方法的使用错误。 但是,可能会导致这个错误IllegalStateException
只有在这个相位一些后续操作,如果有的话。
IllegalStateException
- 如果没有终止,登记的或没有任何一方的人数将变为负数
public int arriveAndAwaitAdvance()
awaitAdvance(arrive())
。
如果您需要等待中断或超时,您可以使用awaitAdvance
方法的其他形式之一使用类似的结构进行awaitAdvance
。
如果相反,您需要在抵达时awaitAdvance(arriveAndDeregister())
,请使用awaitAdvance(arriveAndDeregister())
。
这是非注册方调用此方法的使用错误。 但是,这个错误可能会导致IllegalStateException
只有在这个移相器的一些后续操作,如果有的话。
IllegalStateException
- 如果没有被终止,而有争议的党的人数将变为负数
public int awaitAdvance(int phase)
phase
- 抵达阶段数,如果终止则为负值;
此参数通常是前一次调用arrive
或arriveAndDeregister
返回的值。
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
InterruptedException
则抛出
InterruptedException
,或者如果当前相位不等于给定相位值,则立即返回,或者该相位器被终止。
phase
- 到达阶段号码,如果终止则为负值;
此参数通常是前一次调用arrive
或arriveAndDeregister
返回的值。
InterruptedException
- 如果线程在等待时中断
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
InterruptedException
到等待时抛出
InterruptedException
,如果当前相位不等于给定的相位值,则立即返回,或者该相位器被终止。
phase
- 到达阶段数,如果终止则为负值;
此参数通常是前一次调用arrive
或arriveAndDeregister
返回的值。
timeout
- 放弃之前等待多久,以
unit
为单位
unit
- a
TimeUnit
确定如何解释
timeout
参数
InterruptedException
- 如果线程在等待时中断
TimeoutException
- 如果在等待时超时
public void forceTermination()
public final int getPhase()
Integer.MAX_VALUE
,之后重新开始为零。
一旦终止,相数为负,在这种情况下,终止前的主要阶段可以通过getPhase() + Integer.MIN_VALUE
获得。
public int getRegisteredParties()
public int getArrivedParties()
public int getUnarrivedParties()
public Phaser getParent()
null
。
null
public Phaser getRoot()
public boolean isTerminated()
true
如果移相器已被终止。
true
如果此移相器已被终止
protected boolean onAdvance(int phase, int registeredParties)
true
,则该移相器将在提前设置为最终终止状态,后续调用isTerminated()
将返回true。
任何(未选中)通过调用此方法引发的异常或错误将传播给尝试推进此移相器的方,在这种情况下不会发生提前。
该方法的参数提供了当前转换占优势的状态。 在onAdvance
内对这个移相器引用到达,注册和等待方法的onAdvance
是未指定的,不应该依赖。
如果这个移相器是分层的相位器的成员,那么onAdvance
仅在每个onAdvance
的根onAdvance
器上被调用。
支持最常见的用例,此方法的默认实现返回true
时注册方的数量已变为零作为党调用的结果arriveAndDeregister
。 您可以禁用此行为,从而通过覆盖此方法始终返回false
:
Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int parties) { return false; } }
phase
- 在此移相器提前之前输入此方法的当前阶段编号
registeredParties
- 当前注册方的数量
true
如果这个移相器应该终止