网站首页 > java教程 正文
引言
使用 Java 的任务管理框架执行任务过程中,当任务等待队列被填满时、又有新的任务提交后,饱和策略开始发挥作用。
ThreadPollExecutor 提供了四种饱和策略:
上一节已经看过它们的源码了,本节来验证一下它们的差异。
测试类准备
先定义一个 MyCommand 的任务,接收一个字符串信息:
public class MyCommand implements Runnable {
private String name;
public MyCommand(String name){
this.name = name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" ," +
"name: "+name+","+new Date());
try {
Thread.sleep(5000);
} catch (InterruptedException execption) {
execption.printStackTrace();
}
}
@Override
public String toString() {
return "MyCommand [name=" + name + "]";
}
}
编写统一的测试类,线程池初始化大小为 2,等待队列大小为 2,当提交任务大于 4 时,第 5 个任务会因饱和策略的不同,而得到不同的执行结果。后文将通过设置不同的饱和策略,来测试它们的行为差异。
public class RejectPolicyTest {
private final ThreadPoolExecutor exec ;
public RejectPolicyTest(){
exec = new ThreadPoolExecutor(2,2,0L,TimeUnit.MICROSECONDS,
new LinkedBlockingQueue<Runnable>(2));
//TODO 设置不同的饱和策略
//exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
}
public static void main(String[] args) {
MyCommand c1 = new MyCommand("c1");
MyCommand c2 = new MyCommand("c2");
MyCommand c3 = new MyCommand("c3");
MyCommand c4 = new MyCommand("c4");
MyCommand c5 = new MyCommand("c5");
RejectPolicyTest c = new RejectPolicyTest();
c.submit(c1);
c.submit(c2);
c.submit(c3);
c.submit(c4);
c.submit(c5);
}
public void submit(Runnable command){
System.out.println(Thread.currentThread().getName()+" submit tast..."+command);
try{
exec.submit(command);
}catch(Exception e){
System.out.println("Exception when submit task:"+e.getMessage());
}
}
}
策略一:通知模式抛弃
AbortPolicy 是默认的饱和策略,该策略会抛出未检查异常 RejectedExecutionException,调用者可以捕获这个异常,然后根据自己的需求编写代码。比如,捕获异常并尝试重新提交任务,该策略还算友好,至少抛弃之前会通知任务提交者。
由于是默认策略,直接运行第一部分准备的测试类,结果如下:
main submit tast...MyCommand [name=c1]
main submit tast...MyCommand [name=c2]
main submit tast...MyCommand [name=c3]
main submit tast...MyCommand [name=c4]
main submit tast...MyCommand [name=c5]
Exception when submit task:Task java.util.concurrent.FutureTask@33909752 rejected from java.util.concurrent.ThreadPoolExecutor@55f96302[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
pool-1-thread-2 ,name: c2,Sun Dec 22 19:10:56 CST 2019
pool-1-thread-1 ,name: c1,Sun Dec 22 19:10:56 CST 2019
pool-1-thread-2 ,name: c3,Sun Dec 22 19:11:01 CST 2019
pool-1-thread-1 ,name: c4,Sun Dec 22 19:11:01 CST 2019
测试结果:主线程提交了 4 个任务后,队列满了,此时提交第 5 个任务时,线程池抛出了 RejectedExecutionException异常,主线程能够捕获并处理该异常。
策略二:静音模式抛弃
DiscardPolicy,默默收下任务,但是啥都不做,连异常都不抛,调用者根本不知道任务的状况。显然,这不利于任务的控制,所以笔者不建议用这种策略。
修改测试类,调整策略:
exec.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
测试结果:提交了 5 个线程,只有 4 个任务执行了,最后提交的那个任务被无情抛弃了,而调用者浑然不觉。
main submit tast...MyCommand [name=c1]
main submit tast...MyCommand [name=c2]
main submit tast...MyCommand [name=c3]
main submit tast...MyCommand [name=c4]
main submit tast...MyCommand [name=c5]
pool-1-thread-2 ,name: c2,Sun Dec 22 19:12:38 CST 2019
pool-1-thread-1 ,name: c1,Sun Dec 22 19:12:38 CST 2019
pool-1-thread-2 ,name: c3,Sun Dec 22 19:12:43 CST 2019
pool-1-thread-1 ,name: c4,Sun Dec 22 19:12:43 CST 2019
策略三:抛弃等待最久的任务
DiscardOldestPolicy 策略抛弃掉等待队列中等待最久的任务,将其移除队列,然后执行当前任务,对等待最久的任务不利。
修改测试类的策略:
exec.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
测试结果:提交了 5 个线程,但是只有 4 个任务执行了,等待最久的任务 c3 被抛弃,调用者也无从知晓。
main submit tast...MyCommand [name=c1]
main submit tast...MyCommand [name=c2]
main submit tast...MyCommand [name=c3]
main submit tast...MyCommand [name=c4]
main submit tast...MyCommand [name=c5]
pool-1-thread-2 ,name: c2,Sun Dec 22 19:13:10 CST 2019
pool-1-thread-1 ,name: c1,Sun Dec 22 19:13:10 CST 2019
pool-1-thread-2 ,name: c4,Sun Dec 22 19:13:15 CST 2019
pool-1-thread-1 ,name: c5,Sun Dec 22 19:13:15 CST 2019
策略四:调用者执行
CallerRunsPolicy 策略提供了一种调节机制,它不抛弃任务,也不抛出异常,而是将任务的运行请求回退到任务调用者,由提交任务的线程去执行自己刚提交的任务。
exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
测试结果:提交了 5 个线程,但是只有 4 个由工作线程执行,第 5 个任务由调用者执行。
main submit tast...MyCommand [name=c1]
main submit tast...MyCommand [name=c2]
main submit tast...MyCommand [name=c3]
main submit tast...MyCommand [name=c4]
main submit tast...MyCommand [name=c5]
pool-1-thread-2 ,name: c2,Sun Dec 22 19:18:28 CST 2019
pool-1-thread-1 ,name: c1,Sun Dec 22 19:18:28 CST 2019
main ,name: c5,Sun Dec 22 19:18:28 CST 2019
pool-1-thread-1 ,name: c3,Sun Dec 22 19:18:33 CST 2019
pool-1-thread-2 ,name: c4,Sun Dec 22 19:18:33 CST 2019
结论:调用者运行的饱和策略实现了一种弹性调节机制,当工作队列被填满时,下一个待执行的任务会在提交任务的主线程中执行。
主线程执行任务该期间,线程资源被占用,将不能再提交任务。因此降低了任务的提交速率,为线程池争取了更多的时间来完成正在排队的任务。
策略五:调用者限制提交
前四种都是线程池自己的饱和策略,除此之外,还可以在任务提交方控制任务的提交速率,即限制任务的提交,避免产生任务饱和的情况。比如,借助信号量 Semaphore 来限制任务的到达率,这个同步工具类,可以控制同时访问某个特定资源的操作数量。
可以利用 Semaphore 的 acquire 获取一个虚拟许可,如果没有可用的许可,则阻塞该方法的调用线程直到有可用许可为止。如果线程池使用无界队列缓冲任务时,且未对任务数量做控制,容易导致内存耗尽。这时,可以和 Semaphore 搭配使用,设置信号量的上界,来控制任务的提交速率。
使用上一章的 MyCommand 任务,结合 Semaphore,实现一个调用者控制任务提交的示例:
/**
*
* @title :BoundedExecutor
* @description :使用Semaphore控制任务的提交速率
* @since :2019-12-22
*/
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec,int bound){
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command) throws InterruptedException{
try{
semaphore.acquire();
exec.execute(new Runnable(){
@Override
public void run() {
try{
command.run();
}finally{
System.out.println("执行完成 ,release...");
semaphore.release();
}
}
});
}catch(RejectedExecutionException e){
System.out.println("队列已满,拒绝执行");
semaphore.release();
}
}
public static void main(String[] args) {
//虽然线程池大小为4,但是Semaphore限制每次只能有两个任务被执行
Executor exec = Executors.newCachedThreadPool();
BoundedExecutor b = new BoundedExecutor(exec,2);
MyCommand c1 = new MyCommand("c1");
MyCommand c2 = new MyCommand("c2");
MyCommand c3 = new MyCommand("c3");
MyCommand c4 = new MyCommand("c4");
MyCommand c5 = new MyCommand("c5");
try {
b.submitTask(c1);
b.submitTask(c2);
b.submitTask(c3);
b.submitTask(c4);
b.submitTask(c5);
} catch (InterruptedException execption) {
execption.printStackTrace();
}
}
}
任务执行结果:
pool-1-thread-2 ,name: c2,Mon Dec 15 16:20:17 CST 2019
pool-1-thread-1 ,name: c1,Mon Dec 15 16:20:17 CST 2019
执行完成 ,release...
执行完成 ,release...
pool-1-thread-1 ,name: c4,Mon Dec 15 16:20:22 CST 2019
pool-1-thread-3 ,name: c3,Mon Dec 15 16:20:22 CST 2019
执行完成 ,release...
执行完成 ,release...
pool-1-thread-1 ,name: c5,Mon Dec 15 16:20:27 CST 2019
执行完成 ,release...
执行结果分析:使用 Semaphore 限制每次只能提交两个任务,任务完成后释放信号量许可,可以有效地控制任务的提交速率。
启示录
有界线程池的四种饱和策略,只有 AbortPolicy 和 CallerRunPolicy 对任务提交者是友好的,其他几种都会导致任务漏执行,对任务提交方是不利的。
折中的方案是,由调用者控制任务提交速率,自己根据线程池的配置大小,利用信号量控制任务的提交,这样就不会产生任务提交过量的情况了。
猜你喜欢
- 2024-12-12 Java 应用性能瓶颈剖析与多线程优化实战
- 2024-12-12 面试突击35:如何判断线程池已经执行完所有任务了?
- 2024-12-12 吞吐下降、RT增长、CPU飚高,都是 线程状态惹的祸?
- 2024-12-12 一文搞懂Java多线程
- 2024-12-12 Java线程池的关闭
- 2024-12-12 你能说出多线程中sleep、yield、join的用法及sleep与wait区别?
- 2024-12-12 「重磅开篇」形成完善的多线程世界观
- 2024-12-12 一文深入理解java中的线程
- 2024-12-12 全局视角看技术-Java多线程演进史
- 2024-12-12 Java线程间的通信方式
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- java反编译工具 (77)
- java反射 (57)
- java接口 (61)
- java随机数 (63)
- java7下载 (59)
- java数据结构 (61)
- java 三目运算符 (65)
- java对象转map (63)
- Java继承 (69)
- java字符串替换 (60)
- 快速排序java (59)
- java并发编程 (58)
- java api文档 (60)
- centos安装java (57)
- java调用webservice接口 (61)
- java深拷贝 (61)
- 工厂模式java (59)
- java代理模式 (59)
- java.lang (57)
- java连接mysql数据库 (67)
- java重载 (68)
- java 循环语句 (66)
- java反序列化 (58)
- java时间函数 (60)
- java是值传递还是引用传递 (62)
本文暂时没有评论,来添加一个吧(●'◡'●)