public class ThreadPoolExecutor extends AbstractExecutorService
ExecutorService
,使用可能的几个合并的线程执行每个提交的任务,通常使用Executors
工厂方法配置。
线程池解决两个不同的问题:由于每个任务的调用开销减少,它们通常在执行大量异步任务时提供改进的性能,并且它们提供了一种限制和管理资源(包括执行一个任务。 每个ThreadPoolExecutor
还维护一些基本统计信息,例如已完成任务的数量。
为了在广泛的上下文中有用,此类提供了许多可调参数和可扩展性钩子。 然而,程序员被敦促使用更方便的Executors
工厂方法Executors.newCachedThreadPool()
(无限线程池,具有自动线程回收), Executors.newFixedThreadPool(int)
(固定大小的线程池)和Executors.newSingleThreadExecutor()
(单个后台线程),可以预先配置最常用的使用场景设置。 否则,手动配置和调优此类时,请使用以下指南:
ThreadPoolExecutor
将自动调整池大小(见getPoolSize()
根据corePoolSize(参见设定的界限) getCorePoolSize()
)和maximumPoolSize(参见getMaximumPoolSize()
)。
当方法execute(Runnable)
中提交了新任务,并且运行的corePoolSize线程少于一个,即使其他工作线程处于空闲状态,也会创建一个新的线程来处理该请求。
如果超过corePoolSize但小于maximumPoolSize线程运行,则仅当队列已满时才会创建一个新线程。
通过将corePoolSize和maximumPoolSize设置为相同,您将创建一个固定大小的线程池。
通过将maximumPoolSize设置为本质上无限制的值(如Integer.MAX_VALUE
,您可以允许池容纳任意数量的并发任务。
最典型的是,核心和最大池大小只能在构建时进行设置,但也可以使用setCorePoolSize(int)
和setMaximumPoolSize(int)
进行动态 更改 。
prestartCoreThread()
或prestartAllCoreThreads()
动态地覆盖 。
如果您使用非空队列构建池,则可能需要预先提供线程。
ThreadFactory
创建。
如果没有另外指定,则使用Executors.defaultThreadFactory()
,它创建所有线程与所有相同的ThreadGroup
并且具有相同的优先级和非守护进程状态NORM_PRIORITY
。
通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护进程状态等。如果ThreadFactory
在从newThread
返回null请求时无法创建线程,则执行程序将继续,但可能无法执行任务
线程应该拥有“modifyThread” RuntimePermission
。
如果使用池的工作线程或其他线程不具有此权限,则服务可能会降级:配置更改可能不会及时生效,并且关闭池可能仍处于可能终止但未完成的状态。
getKeepAliveTime(TimeUnit)
),则多余的线程将被终止。
这提供了当池未被主动使用时减少资源消耗的方法。
如果稍后池变得更加活跃,将构建新的线程。
此参数也可以使用方法setKeepAliveTime(long, TimeUnit)
动态更改 。
使用值Long.MAX_VALUE
TimeUnit.NANOSECONDS
有效地禁用空闲线程在关闭之前终止。
默认情况下,仅当存在多于corePoolSize线程时,保持活动策略才适用。
但是方法allowCoreThreadTimeOut(boolean)
也可以用于将这个超时策略应用于核心线程,只要keepAliveTime值不为零。
BlockingQueue
可用于传送和保留提交的任务。
这个队列的使用与池大小相互作用:
SynchronousQueue
,将任务交给线程,无需另外控制。 在这里,如果没有线程可以立即运行,那么尝试排队任务会失败,因此将构建一个新的线程。 处理可能具有内部依赖关系的请求集时,此策略可避免锁定。 直接切换通常需要无限制的maximumPoolSizes,以避免拒绝新提交的任务。 这反过来允许无限线程增长的可能性,当命令继续以平均速度比他们可以处理的速度更快地到达时。 LinkedBlockingQueue
没有预定容量)会导致新的任务,在队列中等待,当所有corePoolSize线程都很忙。 因此,不会再创建corePoolSize线程。 (因此,最大值大小的值没有任何影响。)每个任务完全独立于其他任务时,这可能是适当的,因此任务不会影响其他执行; 例如,在网页服务器中。 虽然这种排队风格可以有助于平滑瞬态突发的请求,但是当命令继续达到的平均速度比可以处理的速度更快时,它承认无界工作队列增长的可能性。 ArrayBlockingQueue
)有助于在使用有限maxPoolSizes时防止资源耗尽,但可能更难调整和控制。 队列大小和最大池大小可能彼此交易:使用大队列和小型池可以最大限度地减少CPU使用率,OS资源和上下文切换开销,但可能导致人为的低吞吐量。 如果任务频繁阻塞(例如,如果它们是I / O绑定),则系统可能能够安排比您允许的更多线程的时间。 使用小型队列通常需要较大的池大小,这样可以使CPU繁忙,但可能会遇到不可接受的调度开销,这也降低了吞吐量。 execute(Runnable)
中提交的新任务将在执行程序关闭时被拒绝 ,并且当执行程序对最大线程和工作队列容量使用有限边界并且饱和时。
在任一情况下, execute
方法调用RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)
其的方法RejectedExecutionHandler
。
提供了四个预定义的处理程序策略:
ThreadPoolExecutor.AbortPolicy
,处理程序会引发运行RejectedExecutionException
后排斥反应。 ThreadPoolExecutor.CallerRunsPolicy
中,调用execute
本身的线程运行任务。 这提供了一个简单的反馈控制机制,将降低新任务提交的速度。 ThreadPoolExecutor.DiscardPolicy
中 ,简单地删除无法执行的任务。 ThreadPoolExecutor.DiscardOldestPolicy
中 ,如果执行程序没有关闭,则工作队列头部的任务被删除,然后重试执行(可能会再次失败,导致重复)。 RejectedExecutionHandler
类。
这样做需要特别注意,特别是当策略被设计为仅在特定容量或排队策略下工作时。
protected
覆盖的beforeExecute(Thread, Runnable)
和afterExecute(Runnable, Throwable)
方法。
这些可以用来操纵执行环境;
例如,重新初始化ThreadLocals,收集统计信息或添加日志条目。
另外,方法terminated()
可以被覆盖,以执行执行程序完全终止后需要执行的任何特殊处理。
如果钩子或回调方法抛出异常,内部工作线程可能会失败并突然终止。
getQueue()
允许访问工作队列以进行监视和调试。
强烈不鼓励将此方法用于任何其他目的。
当提供大量排队任务被取消时,两种提供的方法remove(Runnable)
和purge()
可用于协助进行存储回收。
shutdown
自动。
如果您希望确保未引用的池被回收,即使用户忘记调用shutdown()
,则必须安排未使用的线程最终死机,通过设置适当的保持活动时间,使用零个核心线程的下限和/或设置allowCoreThreadTimeOut(boolean)
。
扩展示例 。 这个类的大部分扩展覆盖了一个或多个受保护的钩子方法。 例如,这里是一个添加一个简单的暂停/恢复功能的子类:
class PausableThreadPoolExecutor extends ThreadPoolExecutor { private boolean isPaused; private ReentrantLock pauseLock = new ReentrantLock(); private Condition unpaused = pauseLock.newCondition(); public PausableThreadPoolExecutor(...) { super(...); } protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); pauseLock.lock(); try { while (isPaused) unpaused.await(); } catch (InterruptedException ie) { t.interrupt(); } finally { pauseLock.unlock(); } } public void pause() { pauseLock.lock(); try { isPaused = true; } finally { pauseLock.unlock(); } } public void resume() { pauseLock.lock(); try { isPaused = false; unpaused.signalAll(); } finally { pauseLock.unlock(); } } }
Modifier and Type | Class and Description |
---|---|
static class |
ThreadPoolExecutor.AbortPolicy
被拒绝的任务的处理程序,抛出一个
RejectedExecutionException 。
|
static class |
ThreadPoolExecutor.CallerRunsPolicy
一个被拒绝的任务的处理程序,直接在
execute 方法的调用线程中运行被拒绝的任务,除非执行程序已经被关闭,否则这个任务被丢弃。
|
static class |
ThreadPoolExecutor.DiscardOldestPolicy
被拒绝的任务的处理程序,丢弃最旧的未处理请求,然后重试
execute ,除非执行程序关闭,在这种情况下,任务被丢弃。
|
static class |
ThreadPoolExecutor.DiscardPolicy
被拒绝的任务的处理程序静默地丢弃被拒绝的任务。
|
Constructor and Description |
---|
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
创建一个新的
ThreadPoolExecutor 与给定的初始参数和默认线程工厂和拒绝执行处理程序。
|
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
创建一个新的
ThreadPoolExecutor 与给定的初始参数和默认线程工厂。
|
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
创建一个新的
ThreadPoolExecutor 与给定的初始参数和默认拒绝执行处理程序。
|
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
创建一个新
ThreadPoolExecutor 给定的初始参数。
|
Modifier and Type | Method and Description |
---|---|
protected void |
afterExecute(Runnable r, Throwable t)
完成指定Runnable的执行后调用方法。
|
void |
allowCoreThreadTimeOut(boolean value)
设置策略是否核心线程可能会超时,如果任务没有在活着的时间内到达,则在新任务到达时被替换。
|
boolean |
allowsCoreThreadTimeOut()
如果此池允许核心线程超时并终止,如果没有任务在keepAlive时间内到达,则返回true,如果新任务到达时需要更换。
|
boolean |
awaitTermination(long timeout, TimeUnit unit)
阻止所有任务在关闭请求完成后执行,或发生超时,或当前线程中断,以先到者为准。
|
protected void |
beforeExecute(Thread t, Runnable r)
在给定的线程中执行给定的Runnable之前调用方法。
|
void |
execute(Runnable command)
在将来某个时候执行给定的任务。
|
protected void |
finalize()
当这个执行器不再被引用并且没有线程时,调用
shutdown 。
|
int |
getActiveCount()
返回正在执行任务的线程的大概数量。
|
long |
getCompletedTaskCount()
返回完成执行的任务的大致总数。
|
int |
getCorePoolSize()
返回核心线程数。
|
long |
getKeepAliveTime(TimeUnit unit)
返回线程保持活动时间,这是超过核心池大小的线程在终止之前可能保持空闲的时间量。
|
int |
getLargestPoolSize()
返回在池中同时进行的最大线程数。
|
int |
getMaximumPoolSize()
返回允许的最大线程数。
|
int |
getPoolSize()
返回池中当前的线程数。
|
BlockingQueue<Runnable> |
getQueue()
返回此执行程序使用的任务队列。
|
RejectedExecutionHandler |
getRejectedExecutionHandler()
返回不可执行任务的当前处理程序。
|
long |
getTaskCount()
返回计划执行的任务的大概总数。
|
ThreadFactory |
getThreadFactory()
返回用于创建新线程的线程工厂。
|
boolean |
isShutdown()
如果此执行者已关闭,则返回
true 。
|
boolean |
isTerminated()
如果所有任务在关闭后完成,则返回
true 。
|
boolean |
isTerminating()
|
int |
prestartAllCoreThreads()
启动所有核心线程,导致他们等待工作。
|
boolean |
prestartCoreThread()
启动核心线程,使其无法等待工作。
|
void |
purge()
尝试从工作队列中删除已取消的所有 Future 任务。
|
boolean |
remove(Runnable task)
如果此任务存在,则从执行程序的内部队列中删除此任务,从而导致该任务尚未运行。
|
void |
setCorePoolSize(int corePoolSize)
设置核心线程数。
|
void |
setKeepAliveTime(long time, TimeUnit unit)
设置线程在终止之前可能保持空闲的时间限制。
|
void |
setMaximumPoolSize(int maximumPoolSize)
设置允许的最大线程数。
|
void |
setRejectedExecutionHandler(RejectedExecutionHandler handler)
为不可执行的任务设置一个新的处理程序。
|
void |
setThreadFactory(ThreadFactory threadFactory)
设置用于创建新线程的线程工厂。
|
void |
shutdown()
启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务。
|
List<Runnable> |
shutdownNow()
尝试停止所有主动执行的任务,停止等待任务的处理,并返回正在等待执行的任务列表。
|
protected void |
terminated()
执行程序已终止时调用方法。
|
String |
toString()
返回标识此池的字符串及其状态,包括运行状态和估计的工作人员和任务计数的指示。
|
invokeAll, invokeAll, invokeAny, invokeAny, newTaskFor, newTaskFor, submit, submit, submit
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor
,并拒绝执行处理程序。
使用Executors
工厂方法之一可能更方便,而不是这种通用构造函数。
corePoolSize
- 即使空闲时仍保留在池中的线程数,除非设置
allowCoreThreadTimeOut
maximumPoolSize
- 池中允许的最大线程数
keepAliveTime
- 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间。
unit
-
keepAliveTime
参数的时间单位
workQueue
- 在执行任务之前用于保存任务的队列。
该队列将仅保存execute
方法提交的Runnable
任务。
IllegalArgumentException
- 如果以下某项成立:
corePoolSize < 0
keepAliveTime < 0
maximumPoolSize <= 0
maximumPoolSize < corePoolSize
NullPointerException
- 如果
workQueue
为空
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
ThreadPoolExecutor
与给定的初始参数和默认拒绝执行处理程序。
corePoolSize
- 要保留在池中的线程数,即使它们处于空闲状态,除非设置了
allowCoreThreadTimeOut
maximumPoolSize
- 池中允许的最大线程数
keepAliveTime
- 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间。
unit
-
keepAliveTime
参数的时间单位
workQueue
- 在执行任务之前用于保存任务的队列。
这个队列只会保存execute
方法提交的Runnable
任务。
threadFactory
- 执行程序创建新线程时使用的工厂
IllegalArgumentException
- 如果以下某项成立:
corePoolSize < 0
keepAliveTime < 0
maximumPoolSize <= 0
maximumPoolSize < corePoolSize
NullPointerException
- 如果
workQueue
或
threadFactory
为空
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
ThreadPoolExecutor
与给定的初始参数和默认线程工厂。
corePoolSize
- 要保留在池中的线程数,即使它们处于空闲状态,除非设置了
allowCoreThreadTimeOut
maximumPoolSize
- 池中允许的最大线程数
keepAliveTime
- 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间。
unit
-
keepAliveTime
参数的时间单位
workQueue
- 在执行任务之前用于保存任务的队列。
该队列将仅保存execute
方法提交的Runnable
任务。
handler
- 执行被阻止时使用的处理程序,因为达到线程限制和队列容量
IllegalArgumentException
- 如果以下之一成立:
corePoolSize < 0
keepAliveTime < 0
maximumPoolSize <= 0
maximumPoolSize < corePoolSize
NullPointerException
- 如果
workQueue
或
handler
为空
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
ThreadPoolExecutor
与给定的初始参数。
corePoolSize
- 即使空闲时仍保留在池中的线程数,除非设置
allowCoreThreadTimeOut
maximumPoolSize
- 池中允许的最大线程数
keepAliveTime
- 当线程数大于内核时,这是多余的空闲线程在终止前等待新任务的最大时间。
unit
-
keepAliveTime
参数的时间单位
workQueue
- 用于在执行任务之前使用的队列。
这个队列将仅保存execute
方法提交的Runnable
任务。
threadFactory
- 执行程序创建新线程时使用的工厂
handler
- 执行被阻止时使用的处理程序,因为达到线程限制和队列容量
IllegalArgumentException
- 如果以下某项成立:
corePoolSize < 0
keepAliveTime < 0
maximumPoolSize <= 0
maximumPoolSize < corePoolSize
NullPointerException
- 如果
workQueue
或
threadFactory
或
handler
为空
public void execute(Runnable command)
RejectedExecutionHandler
处理。
command
- 要执行的任务
RejectedExecutionException
- 由RejectedExecutionHandler自行
RejectedExecutionHandler
,如果任务不能被接受执行
NullPointerException
- 如果
command
为空
public void shutdown()
SecurityException
- 如果安全管理器存在并关闭,则ExecutorService可能会操纵调用者不允许修改的线程,因为它不保留RuntimePermission
("modifyThread")
或安全管理器的checkAccess
方法拒绝访问。
public List<Runnable> shutdownNow()
此方法不等待主动执行的任务终止。 使用awaitTermination
做到这一点。
除了努力尝试停止处理积极执行任务之外,没有任何保证。 此实现通过取消任务Thread.interrupt()
,让未能响应中断任何任务可能永远不会终止。
SecurityException
- 如果安全管理器存在并关闭,则ExecutorService可能会操纵调用者不允许修改的线程,因为它不保留RuntimePermission
("modifyThread")
或安全管理器的checkAccess
方法拒绝访问。
public boolean isShutdown()
ExecutorService
复制
true
。
true
如果这个执行者已被关闭
public boolean isTerminating()
shutdown()
或shutdownNow()
之后终止 ,但尚未完全终止,则返回true。
此方法可能对调试有用。
返回true
报告了关机后的足够的时间可能表明提交的任务已经忽略或抑制中断,导致执行者不能正常终止。
true
如果终止但尚未终止
public boolean isTerminated()
ExecutorService
复制
true
。
请注意, isTerminated
从不是true
,除非shutdown
或shutdownNow
被称为第一个。
true
如果所有任务已完成关闭
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
ExecutorService
复制
timeout
- 等待的最长时间
unit
- 超时参数的时间单位
true
如果这个执行者终止了
false
如果在终止之前超时了
InterruptedException
- 如果中断等待
protected void finalize()
shutdown
。
finalize
在类别
Object
WeakReference
, PhantomReference
public void setThreadFactory(ThreadFactory threadFactory)
threadFactory
- 新线工厂
NullPointerException
- 如果threadFactory为null
getThreadFactory()
public ThreadFactory getThreadFactory()
setThreadFactory(ThreadFactory)
public void setRejectedExecutionHandler(RejectedExecutionHandler handler)
handler
- 新的处理程序
NullPointerException
- 如果处理程序为空
getRejectedExecutionHandler()
public RejectedExecutionHandler getRejectedExecutionHandler()
setRejectedExecutionHandler(RejectedExecutionHandler)
public void setCorePoolSize(int corePoolSize)
corePoolSize
- 新的核心尺寸
IllegalArgumentException
- 如果
corePoolSize < 0
getCorePoolSize()
public int getCorePoolSize()
setCorePoolSize(int)
public boolean prestartCoreThread()
false
。
true
如果线程启动
public int prestartAllCoreThreads()
public boolean allowsCoreThreadTimeOut()
true
如果核心线程被允许超时,否则
false
public void allowCoreThreadTimeOut(boolean value)
true
时,保持活动时间必须大于零。
这个方法一般应该在池被主动使用之前调用。
value
-
true
如果要超时,否则
false
IllegalArgumentException
- 如果值为
true
,并且当前保持活动时间不大于零
public void setMaximumPoolSize(int maximumPoolSize)
maximumPoolSize
- 新的最大值
IllegalArgumentException
- 如果新的最大值小于或等于零,或小于
core pool size
getMaximumPoolSize()
public int getMaximumPoolSize()
setMaximumPoolSize(int)
public void setKeepAliveTime(long time, TimeUnit unit)
time
- 等待的时间
时间值为零将导致多余的线程在执行任务后立即终止。
unit
-
time
参数的时间单位
IllegalArgumentException
- 如果
time
小于零或
time
为零,
allowsCoreThreadTimeOut
getKeepAliveTime(TimeUnit)
public long getKeepAliveTime(TimeUnit unit)
unit
- 结果所需的时间单位
setKeepAliveTime(long, TimeUnit)
public BlockingQueue<Runnable> getQueue()
public boolean remove(Runnable task)
该方法作为取消方案的一部分可能是有用的。 在放入内部队列之前,可能无法删除已转换为其他表单的任务。 例如,使用submit
输入的任务可能会转换为维护Future
状态的表单。 然而,在这种情况下,可以使用方法purge()
去除已被取消的那些期货。
task
- 要删除的任务
true
如果任务被删除
public void purge()
Future
任务。
此方法可用作存储回收操作,对功能没有其他影响。
取消的任务永远不会执行,但可能会累积在工作队列中,直到工作线程可以主动删除它们。
现在调用此方法会尝试删除它。
但是,该方法可能无法在其他线程的干扰存在的情况下删除任务。
public int getPoolSize()
public int getActiveCount()
public int getLargestPoolSize()
public long getTaskCount()
public long getCompletedTaskCount()
public String toString()
protected void beforeExecute(Thread t, Runnable r)
r
的线程t
调用,并可用于重新初始化ThreadLocals或执行日志记录。
此实现不执行任何操作,但可以在子类中进行自定义。 注意:为了正确嵌套多个重复数据,子类super.beforeExecute
在此方法结束时调用super.beforeExecute
。
t
- 将运行任务
r
的线程
r
- 将执行的任务
protected void afterExecute(Runnable r, Throwable t)
RuntimeException
或Error
,导致执行突然终止。
此实现不执行任何操作,但可以在子类中进行自定义。 注意:为了正确嵌套多个覆盖,子类应该在此方法开始时调用super.afterExecute
。
注意:当操作被明确地包含在任务(如FutureTask
)中或通过诸如submit
之类的方法被submit
时,这些任务对象捕获和维护计算异常,因此它们不会引起突然终止,并且内部异常不会传递给该方法。 如果您希望在此方法中捕获两种故障,您可以进一步探测这种情况,如在此示例子类中,如果任务已中止,则会打印直接原因或底层异常:
class ExtendedExecutor extends ThreadPoolExecutor { // ... protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future<?>) { try { Object result = ((Future<?>) r).get(); } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (t != null) System.out.println(t); } }
r
- 已完成的runnable
t
- 导致终止的异常,如果执行正常完成,则为null
protected void terminated()
super.terminated
在此方法中调用super.terminated
。