1、背景
面试官问这个,主要想考察 为什么需要线程池? 进一步为什么要用多线程 什么是线程和进程等等。要想学问大,就要多读、多抄、多写。
2、解答
2.1、线程和进程
名称 | 描述 |
进程 | 进程是资源分配的最小单位。每个进程都有独立的代码和数据空间(进程上下文),进程间的切换会有较大的开销,一个进程包含1--n个线程。 |
线程 | 线程是CPU调度的最小单位。同一类线程共享代码和数据空间,每个线程有独立的运行栈和程序计数器(PC),线程切换开销小。 |
2.2、线程池作用
- 降低资源消耗
通过重复利用现有的线程来执行任务,避免多次创建和销毁线程。
- 提高响应速度
因为省去了创建线程这个步骤,- 当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性
线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。
2.3、线程池核心参数与参数协同工作
参数 | 参数名称 | 描述 |
corePoolSize | 线程池核心线程大小 | 核心线程会一直存活,即使没有任务需要执行。当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理。设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭 |
maximumPoolSize | 线程池最大线程数量 | 当线程数>=corePoolSize且maximumPoolSize>corePoolSize,且任务队列已满时,线程池会创建新线程来处理任务。当线程数=maxPoolSize,且任务队列已满时,线程池会根据拒绝策略做相应的处理 |
keepAliveTime | 空闲线程存活时间 | 一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定。 |
unit | 空闲线程存活时间单位 | keepAliveTime的计量单位 |
workQueue | 工作任务队列 | 新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。阻塞队列,设计模式是生产者与消费者模式 |
threadFactory | 线程工厂 | 创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等 |
handler | 拒绝策略 | 当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题的。 |
2.4、线程池拒绝策略
- CallerRunsPolicy:该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。
- AbortPolicy:该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。
- DiscardPolicy:该策略下,直接丢弃任务,什么都不做。
- DiscardOldestPolicy:该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列。
2.5、线程池生命周期
3、最佳实践
3.1、运行状态监控
ThreadPoolExecutor提供了多个get方法供开发者获取线程池运行时信息,开发者可以以打印日志的方式或者集成springboot Prometheus插件,使用Grafana面板查看线程池的各项指标。
拒绝时监控:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("thread-monitor-abort:%s"
+ ", poolSize: %d (activeCount: %d, corePoolSize: %d, maxPoolSize: %d, largestPoolSize: %d), taskCount: %d (completed: "
+ "%d, rejectCount:%d)" + ", Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s) !",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), count.incrementAndGet(),
e.isShutdown(), e.isTerminated(), e.isTerminating());
logger.debug(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}
实时监控:
继承ThreadPoolExecutor,重写beforeExecute(Thread t, Runnable r)和afterExecute(Runnable r, Throwable t) 方法,在方法前/后打印出上面的指标。
public class TraceThreadPool extends ThreadPoolExecutor {
private final ThreadLocalstartTime = new ThreadLocal ();
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
@Override
protected void beforeExecute(Thread t, Runnable r) {
......
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
.......
super.afterExecute(r, t);
}
注意:不同业务的线程一定要用名字去区分,在导出线程快照/内存dump分析问题的时候极其重要
3.2、动态参数调整
随着线上的业务量增长,线上的参数需要跟着调整,线程池参数也是其中的一部分。在ThreadPoolExecutor中针对corePoolSize和maximumPoolSize等核心参数提供了set方法,可以方便我们在运行时设置线程池参数,如在配置中心等系统修改。但是队列容量确没法调整。如果需要修改队列容量,则需要copy一份LinkedBlockingQueue代码稍作修改。
public class LinkedBlockingQueue
extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
...
private final int capacity;
}
所以我们需要重写这个队列。一般是复制一份LinkedBlockingQueue源码,修改capacity去掉final,加上set方法。
private volatile int capacity;
public int getCapacity() {
return capacity;
}
public void setCapacity(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
final int oldCapacity = this.capacity;
this.capacity = capacity;
final int size = count.get();
if (capacity > size && size >= oldCapacity) {
signalNotFull();
}
}
使用到capacity的判断需要改正。特别是 count.get() == capacity的地方。
在设置参数的时候需要先设置核心线程数,再设置最大线程数,代码如下:
ThreadPoolExecutor threadPoolExecutor = traceThreadPoolTaskExecutor.getThreadPoolExecutor();
threadPoolExecutor.setCorePoolSize(conf.getCorePoolSize());
threadPoolExecutor.setMaximumPoolSize(conf.getMaxPoolSize());
BlockingQueuequeue = threadPoolExecutor.getQueue();
if (queue instanceof ResizableCapacityLinkedBlockIngQueue) {
((ResizableCapacityLinkedBlockIngQueue>) queue).setCapacity(conf.getQueueCapacity());
}
3.3、配置参数推荐估算
合理的评估核心线程数和最大线程数,没有固定的公式
较为固定的公式: 计算密集 线程数 ≈ CPU核数。
初始估算公式: 线程数= CPU核心数/(1-阻塞系数) ,阻塞系数取值0.8-0.9
IO密集型没有固定的公式,只能大体取一个初值,然后根据压测和线程快照分析去实践一个合理的线程池数量。
压测的过程中需要检测服务器以及中间件的各项指标。同时根据情况不断的调整线程数量和队列大小,调节遵循以下原则:
1:最大线程数设置太小,工作队列设置偏小,导致服务接口大量出现
RejectedExecutionException
2:最大线程数设置太小,工作队列设置过大,任务堆积过多,接口响应时间变长。
3:最大线程数设置太大,线程调度开销增大,处理速度反而下降。
4:核心线程数设置过大,空闲线程太多,占用系统资源。
如果在调节线程数和队列数后,在测结果的过程中导出线程快照,连续导出3次,对比三次线程快照,如果发现大部分线程都处于运行状态,说明线程数量设置的还算合理,此时还可以稍微调大点。如果对比发现大部分线程都处于waiting状态,可能是线程数设置过多(够用),需要调小再试。
jstack -l pid > pid.log
Thread Name
appliance-svc-task-36
State
Waiting on condition
Java Stack
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Native Stack
No Native stack trace available
如果waiting on condition/monitor的是业务代码,那么可能意味着这片代码可能存在阻塞或者性能问题,如果是有性能问题则根据实际情况做优化后再压测。
队列大小估算:如果一个任务的执行时间在50~100ms,如果以上压测估算的核心线程数是50,假设此接口认为在1s内返回算合理,按照1s算,1s内50个线程可以处理(不考虑外部因素):1*1000ms / 75 ms * 50= 666,则在此范围浮动即可。这也算是个估算值,然后根据压测结果比如响应时间+QPS再结合上面说的监控拒绝数量调节一个实际的参数。
3.5、阿里开发手册建议
【强制】线程池不允许使用 Executors 去创建,而是通过ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
- FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
- CachedThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
本文暂时没有评论,来添加一个吧(●'◡'●)