Java线程池的实现原理及其在业务中的最佳实践
阿里妹导读
一、线程池简介
1.什么是线程池?
2.线程池有什么好处?
- 减少线程创建和销毁的开销,线程的创建和销毁需要消耗系统资源,线程池通过复用线程,避免了对资源的频繁操作,从而提高系统性能;
- 控制和优化系统资源利用,线程池通过控制线程的数量,可以尽可能地压榨机器性能,提高系统资源利用率;
提高响应速度,线程池可以预先创建线程且通过多线程并发处理任务,提升任务的响应速度及系统的并发性能;
二、Java线程池的实现原理
1.类继承关系
execute(Runnable r):没有返回值,仅仅是把一个任务提交给线程池处理
submit(Runnable r):返回值为Future类型,当任务处理完毕后,通过Future的get()方法获取返回值时候,得到的是null
submit(Runnable r,Objectresult):返回值为Future类型,当任务处理完毕后,通过Future的get()方法获取返回值时候,得到的是传入的第二个参数result
shutdown():关闭线程池,不接受新任务,但是等待队列中的任务处理完毕才能真正关闭
shutdownNow():立即关闭线程池,不接受新任务,也不再处理等待队列中的任务,同时中断正在执行的线程
setCorePoolSize(int corePoolSize):设置核心线程数
setKeepAliveTime(longtime, TimeUnit unit):设置线程的空闲时间
setMaximumPoolSize(int maximumPoolSize):设置最大线程数
setRejectedExecutionHandler(RejectedExecutionHandler rh):设置拒绝策略
setThreadFactory(ThreadFactory tf):设置线程工厂
beforeExecute(Thread t, Runnable r):任务执行之前的钩子函数,这是一个空函数,使用者可以继承ThreadPoolExecutor后重写这个方法,实现其中的逻辑
afterExecute(Runnable r, Throwable t):任务执行之后的钩子函数,这是一个空函数,使用者可以继承ThreadPoolExecutor后重写这个方法,实现其中的逻辑
2.线程池的状态
- RUNNING:线程池一旦被创建,就处于RUNNING状态,任务数为0,能够接收新任务,对已排队的任务进行处理。
- SHUTDOWN:不接收新任务,但能处理已排队的任务。当调用线程池的shutdown()方法时,线程池会由RUNNING转变为SHUTDOWN状态。
- STOP:不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。当调用线程池的shutdownNow()方法时,线程池会由RUNNING或SHUTDOWN转变为STOP状态。
- TIDYING:当线程池在SHUTDOWN状态下,任务队列为空且执行中任务为空,或者线程池在STOP状态下,线程池中执行中任务为空时,线程池会变为TIDYING状态,会执行terminated()方法。这个方法在线程池中是空实现,可以重写该方法进行相应的处理。
TERMINATED:线程池彻底终止。线程池在TIDYING状态执行完terminated()方法后,就会由TIDYING转变为TERMINATED状态。
3.线程池的执行流程
4.问题思考
线程池的核心线程可以回收吗?
线程池在提交任务前,可以提前创建线程吗?
三、源码分析
1.execute(Runnable command)
2.addWorker(Runnable firstTask, boolean core)
线程创建成功并添加到线程池后,会调用start()方法,启动线程,执行任务。
3.runWorker(Worker w)
4.getTask()
根据是否需要超时控制,提供两个阻塞方法获取阻塞队列中的任务。
5.processWorkerExit(w, completedAbruptly)
四、线程池在业务中的最佳实践
1.如何选择合适的线程池参数
publicThreadPoolExecutor(int corePoolSize,
intmaximumPoolSize,
longkeepAliveTime,
TimeUnitunit,
workQueue,
ThreadFactorythreadFactory,
RejectedExecutionHandlerhandler)
1.corePoolSize: 核心线程数
2.maximumPoolSize: 最大线程数
3.keepAliveTime: 线程的空闲时间
4.unit: 空闲时间的单位(秒、分、小时等等)
5.workQueue: 等待队列
6.threadFactory: 线程工厂
7.handler: 拒绝策略
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程直接处理该任务(可能为主线程Main),保证每个任务执行完毕
如何选择合适的线程池参数?
2.如何正确地创建线程池对象
- FixedThreadPool:具有固定线程数量的线程池,无界阻塞队列;
- CachedThreadPool:线程数量可以动态伸缩的线程池,最大线程数为Integer.MAX_VALUE
SingleThreadPool:单个线程的线程,核心线程数和最大线程数都是1,无界阻塞队列
publicclassTestThreadPool {
/**
* 线程池
*/
privatestatic ExecutorService executor = initDefaultExecutor();
/**
* 统一的获取线程池对象方法
*/
publicstatic ExecutorService getExecutor() {
return executor;
}
privatestatic final int DEFAULT_THREAD_SIZE = 16;
privatestatic final int DEFAULT_QUEUE_SIZE = 10240;
privatestatic ExecutorService initDefaultExecutor() {
returnnew ThreadPoolExecutor(DEFAULT_THREAD_SIZE, DEFAULT_THREAD_SIZE,
300, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE),
new DefaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
}
局部变量定义的线程池对象在方法结束后可以被垃圾回收吗?
publicstaticvoidmain(String[] args) {
test1();
test2();
}
publicstaticvoidtest1(){
Object obj = new Object();
System.out.println("方法一执行完成");
}
publicstaticvoidtest2(){
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new Runnable() {
@Override
publicvoidrun() {
System.out.println("方法二执行完成");
}
});
}
问题分析与解答
- 虚拟机栈(栈帧中的本地变量表)中引用的对象;
- 方法区中的类静态属性引用的对象;
- 方法区中常量引用的对象;
- 本地方法栈中JNI(即一般说的Native方法)引用的对象;
正在运行的线程;
privatefinalclassWorker
extendsAbstractQueuedSynchronizer
implementsRunnable
publicclassOuter {
private String name;
private Inner inner;
publicintouterMethod() {
return1;
}
/**
* 非静态内部类
*/
classInner {
privatevoidinnerMethod() {
//在非静态内部类中可以直接调用外部类的方法
outerMethod();
}
private String address;
}
}
classOuter$Inner{
private String address;
Outer$Inner(Outer var1) {
this.this$0 = var1;
}
private void innerMethod() {
this.this$0.outerMethod();
}
}
publicclassOuter {
private String name;
private Inner inner;
publicintouterMethod() {
return1;
}
/**
* 静态内部类
*/
staticclassInner {
private String address;
}
}
classOuter$Inner{
private String address;
Outer$Inner() {
}
}
这个问题带来两个启发:
3.相互依赖的子任务避免使用同一线程池
publicclassFartherAndSonTask{
publicstatic ExecutorService executor= TestThreadPool.getExecutor();
publicstaticvoidmain(String[] args)throws Exception {
FatherTask fatherTask = new FatherTask();
Future<String> future = executor.submit(fatherTask);
future.get();
}
/**
* 父任务,里面异步执行子任务
*/
staticclassFatherTaskimplementsCallable<String> {
public String call()throws Exception {
System.out.println("开始执行父任务");
SonTask sonTask = new SonTask();
Future<String> future = executor.submit(sonTask);
String s = future.get();
System.out.println("父任务已拿到子任务执行结果");
return s;
}
}
/**
* 子任务
*/
staticclassSonTaskimplementsCallable<String> {
public String call()throws Exception {
//处理一些业务逻辑
System.out.println("子任务执行完成");
returnnull;
}
}
}
- 使用不同的线程池隔离有相互依赖的任务;
调用future.get()方法设置超时时间,这样做可以避免线程阻塞,但是依然会出现大量的超时异常;
4.合理选择submit()和execute()方法
- execute(Runnable r):没有返回值,仅仅是把一个任务提交给线程池处理,轻量级方法,适用于处理不需要返回结果的任务;
submit(Runnable r):返回值为Future类型,future可以用来检查任务是否已经完成,获取任务的结果等,适用于需要处理返回结果的任务;
private void asyncSupplyPriceSync(List<Long> shidList, SupplyPriceSyncMsg msg) {
if (CollectionUtils.isEmpty(shidList)) {
return;
}
PlatformLogUtil.logInfo("异步推送酒店报价信息供给总数:", shidList.size());
final Map<String, Future >> futures = Maps.newLinkedHashMap();
//分批提交线程池处理
Lists.partition(shidList, SwitchConfig.HOTEL_PRICE_ASYNC_LIST_SIZE)
.forEach(subList -> {
try {
futures.put(UUID.randomUUID().toString(), executorService
.submit(() -> batchSupplyPriceSync(subList, msg)));
} catch (Exception e) {
PlatformLogUtil.logFail("异步推送报价信息线程池子任务执行异常", LogListUtil.newArrayList(subList), e);
}
});
//阻塞,等所有子任务都处理完,才返回结果
futures.forEach((uuid, future) -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
PlatformLogUtil.logFail("异步推送报价信息获取子任务执行结果异常", LogListUtil.newArrayList(e));
}
});
}
5.请捕获线程池中子任务的代码异常
publicclassExceptionTest {
publicstatic ExecutorService executor = TestThreadPool.getExecutor();
publicstaticvoidmain(String[] args) {
executor.execute(() -> test("正常"));
executor.execute(() -> test("正常"));
executor.execute(() -> test("任务执行异常"));
executor.execute(() -> test("正常"));
executor.shutdown();
}
publicstaticvoidtest(String str) {
String result = "当前ThreadName为" + Thread.currentThread().getName() + ":结果" + str;
if (str.equals("任务执行异常")) {
thrownew RuntimeException(result + "****执行异常");
} else {
System.out.println(result);
}
}
}
如果线程池中执行任务的线程异常,发生异常的线程会销毁吗?其他任务还能正常执行吗?
可以发现
在processWorkerExit(w, completedAbruptly)方法内,可以看到如果运行中的线程池有线程执行异常,会调用workers.remove()移除当前线程,并调用addWorker()重新创建新的线程。
所以在任务3销毁线程再重新创建线程,和任务4创建线程这两个动作会有时序问题,具体看下图:
那么控制打印的异常信息是怎么来的呢?
所以,在业务代码中,请捕获子任务中的异常,否则会导致线程池中的工作线程频繁销毁、创建,造成资源浪费,违背了线程复用的设计原则。
最新评论
推荐文章
作者最新文章
你可能感兴趣的文章
Copyright Disclaimer: The copyright of contents (including texts, images, videos and audios) posted above belong to the User who shared or the third-party website which the User shared from. If you found your copyright have been infringed, please send a DMCA takedown notice to [email protected]. For more detail of the source, please click on the button "Read Original Post" below. For other communications, please send to [email protected].
版权声明:以上内容为用户推荐收藏至CareerEngine平台,其内容(含文字、图片、视频、音频等)及知识版权均属用户或用户转发自的第三方网站,如涉嫌侵权,请通知[email protected]进行信息删除。如需查看信息来源,请点击“查看原文”。如需洽谈其它事宜,请联系[email protected]。
版权声明:以上内容为用户推荐收藏至CareerEngine平台,其内容(含文字、图片、视频、音频等)及知识版权均属用户或用户转发自的第三方网站,如涉嫌侵权,请通知[email protected]进行信息删除。如需查看信息来源,请点击“查看原文”。如需洽谈其它事宜,请联系[email protected]。