👉 这是一个或许对你有用的社群
🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入芋道快速开发平台知识星球。下面是星球提供的部分资料:
👉这是一个或许对你有用的开源项目
国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。
功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号等等功能:
  • Boot 地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 地址:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn

1 前言

说到线程池八股文背的很熟的肯定知道无非就这几个考点:
(1)线程池三大核心参数 corePoolSizemaximumPoolSizeworkQueue 的含义
(2)线程池核心线程数制定策略
(3)建议通过 ThreadPoolExecutor 的构造函数来声明,避免使用 Executors 创建线程池
以上考点作为线程池面试几乎必问的内容,大部分人应该都是如数家珍,张口就来,但是懂了面试八股文真的就不一定在实际运用中真的就会把线程池用好 。且看下面这次真实生产事故还原
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

2 事故还原

某次一位研发同事写出了下面类似的代码:
List<StationData> items = getFromDb();

List<CompletableFuture<AppMapStationData>> completableFutures = items.stream().map(item -> CompletableFuture.supplyAsync(() -> {

    AppMapStationData data = mapper.copy(item);

// 发起价格信息查询的RPC调用
    data.setPriceInfo(priceApi.getPriceInfoById(item.getId())) 

return
 data;

}, apiExecutor)).collect(Collectors.toList());


result = completableFutures.stream().map(e -> {

return
 e.get();  

}).filter(Objects::nonNull).collect(Collectors.toList());

上面的代码中,代码首先从数据库里面查出来一堆对象,然后对每一个对象进行模型转换,由于要获取每个对象的价格信息发起了一次RPC调用,由于RPC服务没有提供批量接口,所以代码里面用了线程池并发请求,以求得接口尽可能快的返回数据。
使用的是CompletableFuture 而且自定义了线程池,线程池指定了10个核心线程,20个最大线程,这段代码在上线后的一段时间确实没有任何问题,但是在灰度放量用户量多起来之后发现接口经常超时告警。
请问为什么上面的代码在用户量稍微大一点的时候就运行缓慢了呢?
实际代码问题出现在了这个get方法中,这个get方法没有指定超时时间,当getPriceInfoById这个接口响应变慢的时候,这个主线程的代码get又没有指定超时时间,这时候问题就来了。
由于某次业务查询查到了非常多的数据,每条数据就是个模型转换任务,这个任务就会在队列排队,get方法没有指定超时时间的情况下,其最终耗时就取决于整个线程池中执行最慢的那一个任务,所以当从DB中查出来的数据量越来越大的时候这个转换任务的最大耗时就会逐渐增加,进而引发接口超时。
所以这里改进上述问题需要做到两个点:
1、数据库中查出来的数据集合必须分页
2、get方法必须设置超时时间
此外需要知道get方法设置超时时间的计算方式也需要留意,考虑下面这种场景
提交两个任务 A 和 B 到线程池,A 任务耗时 3 秒,B 任务耗时 4 秒,Future 以 2 秒为超时时间获取任务结果
代码如下:
ExecutorService executorService = Executors.newFixedThreadPool(
2
);


Callable<String> taskA = () -> {

    sleep(
3
);

return"A"
;

};

Callable<String> taskB = () -> {

    sleep(
4
);

return"B"
;

};


List<Future<String>> futures = Stream.of(taskA, taskB)

        .map(executorService::submit)

        .collect(Collectors.toList());


for
 (Future<String> future : futures) {

try
 {

        String s = future.get(
2
, TimeUnit.SECONDS);

        System.out.println(s);

    } 
catch
 (Exception e) {

continue
;

    }

}

实际运行情况是第一个任务会超时但是第二个不会 ,看起来是不是还有点不可思议,耗时时间长的任务B反而没超时。原因就在于 Future.get(long timeout, TimeUnit unit) ,调用 get 时才开始计时,而非任务加入线程池的时间
从图上就可以看出来,在获取B的任务执行结果的时候B任务已经执行了两秒,所以在等待两秒的情况下可以获取到结果
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

3 线程池不当使用举例

(1)不区分业务一把梭哈,全用一个线程池
曾经有一个项目,对接多个租户,每个租户都有各自的任务需要执行,代码中不区分租户的将所有租户的任务全部丢到一个线程池中执行,结果一个租户的任务提交过多导致线程池执行缓慢,但是由于线程池是同一个,影响了所有租户接口的响应时间。如果说上面说的这个场景用一个线程池产生了租户互相影响的问题还不够严重,那么下面的这种场景就问题更大了。
曾经有一段这样的场景,因为共用线程池直接导致线程池任务永远完成不了,请看下面的这种情况:
首先向线程池中提交了一个任务,然后在这个任务的内部实现中又往同一个线程池中再次提交了一个任务,相当于父子任务在同一个线程池中执行,这时候极有可出现线程死锁也就是循环等待的情况
如上图所示,父任务全部处于执行状态,这时候子任务想要执行需要等父任务执行完成,但是父任务都执行不完,因为还有个子任务没完成,即父任务等待子任务执行完成,而子任务等待父任务释放线程池资源,这也就造成了 "死锁"
所以综上所述,在代码中应该避免各种任务都往一个线程池中投放,对每个线程池指定好线程名称,做好分类比较合适,这里在日常开发中比较推荐使用Guava的工具类,来指定线程名称前缀,这样使用jstack分析线程问题也方便排查。
ThreadFactory threadFactory = 
new
 ThreadFactoryBuilder()

      .setNameFormat(threadNamePrefix + 
"-%d"
)

      .setDaemon(
true
).build();

ExecutorService threadPool = 
new
 ThreadPoolExecutor(

      corePoolSize, 

      maximumPoolSize, 

      keepAliveTime, 

      TimeUnit.MINUTES, 

      workQueue, 

      threadFactory);

(2)@Async注解不自己定义线程池
@Async用在方法上标识这是一个异步方法,如果不自己指定线程池这个方法将直接新建一个线程执行,可以翻看spring实现源码知道这个点
@Async的实现其实非常简单就是利用AOP,容器启动的时候会扫描所有被打上@Async注解的方法,并代理这些方法的执行,在执行这个方法的时候,生成Callable任务丢到线程池中执行(核心代码位于org.springframework.aop.interceptor.AsyncExecutionInterceptor
@Override
@Nullable
public Object invoke(final MethodInvocation invocation)throws Throwable 
{

    Class<?> targetClass = (invocation.getThis() != 
null
 ? AopUtils.getTargetClass(invocation.getThis()) : 
null
);

    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);

final
 Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);


    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);

if
 (executor == 
null
) {

thrownew
 IllegalStateException(

"No executor specified and no default executor set on AsyncExecutionInterceptor either"
);

    }

// 将方法调用封装成  Callable 实例丢入线程池中执行
    Callable<Object> task = () -> {

try
 {

            Object result = invocation.proceed();

if
 (result 
instanceof
 Future) {

return
 ((Future<?>) result).get();

            }

        }

catch
 (ExecutionException ex) {

            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());

        }

catch
 (Throwable ex) {

            handleError(ex, userDeclaredMethod, invocation.getArguments());

        }

returnnull
;

    };


return
 doSubmit(task, executor, invocation.getMethod().getReturnType());

}

如果不指定线程池这里就会启用默认的线程池 SimpleAsyncTaskExecutor 然后我们看下这个类的注释
那这个问题就很严重了,假定你的方法执行速度慢,而且qps大,这时候线程数就会直接爆炸,所以建议写一个类继承 AsyncConfigurer接口并复写getAsyncExecutor方法,然后在使用注解的时候指定线程池的名称
//使用注解时指定线程池的Bean名称
@Async
(
"apiExecutor"
)

(3)线程池遇上ThreadLocal
线程池和 ThreadLocal 共用,可能会导致线程从 ThreadLocal 获取到的是旧值/脏数据。这是因为线程池会复用线程对象,与线程对象绑定的类的静态属性 ThreadLocal 变量也会被重用,这就导致一个线程可能获取到其他线程的 ThreadLocal 值。
比较常规的做法是在任务执行完毕之后的finally代码块里面做清理工作
Runnable runnable = () -> {

try
 {

        BizThreadLocal.set(
"xxxx"
);

// do sth
    } 
finally
 {

        BizThreadLocal.remove();

    }

};

但是其实finally的代码块其实也不是百分百一定执行,事实上Thread#stop() 方法打断线程执行的时候 finally代码块中的内容就不会执行,比较推荐的还是# TransmittableThreadLocal

4 再谈线程池,几个关键要点

(1)为什么默认线程池的队列长度不能动态调整?
曾经面对生产环境线程池的参数设定问题,我曾经想到一个方案,既然线程池的参数不好定,那咱们直接动态修改就行不行呢,线程池本身提供了很多的set方法可以做到参数修改,比如我们在springBoot项目往往去使用ThreadPoolTaskExecutor 作为线程池,从下图的set方法列表中可以看出存在很多修改线程池参数的方法
然后实际使用的时候发现核心线程数和最大线程数都能动态修改 但是队列长度却不能 ,为什么队列长度不能调用setQueueCapacity方法进行动态修改呢?
首先我们可以简单理解为spring的ThreadPoolTaskExecutor是Java原生ThreadPoolExecutor的封装,观察这个类的setMaxPoolSizesetQueueCapacity代码实现我们就能发现setQueueCapacity 实际就是一个赋值仅在第一次实例化线程池的使用到了这个参数。
publicvoidsetMaxPoolSize(int maxPoolSize)
{

synchronized
(
this
.poolSizeMonitor) {

if
 (
this
.threadPoolExecutor != 
null
) {

this
.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);

        }


this
.maxPoolSize = maxPoolSize;

    }

}


publicvoidsetQueueCapacity(int queueCapacity)
{

this
.queueCapacity = queueCapacity;

}



protected BlockingQueue<Runnable> createQueue(int queueCapacity)
{

return
 (BlockingQueue)(queueCapacity > 
0
 ? 
new
 LinkedBlockingQueue(queueCapacity) : 
new
 SynchronousQueue());

}

从上面的源码我们还可以看出spring的ThreadPoolTaskExecutor使用的队列是LinkedBlockingQueue ,那么为啥线程池ThreadPoolExecutor不支持修改队列长度呢?这个原因就很简单了因为这个队列的capacity是final类型的,自然不能修改。
那如果我一定要修改这个队列长度应该怎么处理?那完全就可以仿照美团的方式,自定义了一个叫做 ResizableCapacityLinkedBlockIngQueue 的队列(把LinkedBlockingQueue的capacity 字段的final关键字修饰给去掉了,让它变为可变的)是不是也是很简单。
(2)再谈核心线程数的参数设置
核心线程池的参数设置一般各种网络资料中比较推崇的是N+1和2N法,即:
CPU 密集型任务(N+1) :这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务(2N) :这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型 :简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。
IO 密集型 :涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
但是实际上比较科学的线程数计算方式是:
最佳线程数 = N(CPU 核心数)∗(1+WT(线程等待时间)/ST(线程计算时间))
WT(线程等待时间)= 线程运行总时间 - ST(线程计算时间)
线程等待时间所占比例越高,需要越多线程。线程计算时间所占比例越高,需要越少线程。(我们可以通过 JDK 自带的工具 VisualVM 来查看 WT/ST 比例)
CPU 密集型任务的 WT/ST 接近或者等于 0,因此, 线程数可以设置为 N(CPU 核心数)∗(1+0)= N,和我们上面说的 N(CPU 核心数)+1 差不多。IO 密集型任务下,几乎全是线程等待时间,从理论上来说,你就可以将线程数设置为 2N。
这里额外说一句早先我虽然知道线程池核心线程数应该和CPU核心线程数有关,但是悲剧的是我并不知道怎么查Linux系统的核心数,这里把查询命令贴出来供参考:
# 总核数 = 物理CPU个数 X 每颗物理CPU的核数 

# 查看物理CPU个数

cat /proc/cpuinfo| grep 
"physical id"
| sort| uniq| wc -l


# 查看每个物理CPU中core的个数(即核数)

cat /proc/cpuinfo| grep 
"cpu cores"
| uniq

(3)为什么不太推荐你用ParallelStream?
曾经某次代码评审中,有位同学写出了下面类似的代码
// 从数据库中查找学生对象
List<Student> students = searchDataFromDb();


// 使用并行流进行模型转换
List<StudentVo> res = 
new
 ArrayList();    

students.parallelStream().forEach(student -> {

    StudentVo vo = 
new
 StudentVo(student)

    res.add(student);

});

结果测试过程中返回给前端的数据总是莫名其妙的少很多和数据库中的真实数据条数对不上,相信大家都看出来了原因是List并不是线程安全的容器,所以导致了最后结果不对,其实这不能算是parallelStream的问题,但是很多人写代码时,以为并行流就快为了追求效率,不假思索就写了这样的代码,但是往往在线程池的环境下大家又仿佛绷紧了并发神经,又能考虑到了并发问题。
此外parallelStream的默认线程池遇上ThreadLocal同样也存在一些问题,其实如果不做额外线程池指定,代码中的 parallelStream 都是共用同一个线程池的,ParallelStream 底层使用了 ForkJoinPool,当 Stream 流中元素较多时,整个运行效率也会大大降低。

5 总结

本文通过一次生产事故,进一步总结了线程池在日常开发中需要注意的一些要点,希望对大家有所帮助。

欢迎加入我的知识星球,全面提升技术能力。
👉 加入方式,长按”或“扫描”下方二维码噢
星球的内容包括:项目实战、面试招聘、源码解析、学习路线。
文章有帮助的话,在看,转发吧。
谢谢支持哟 (*^__^*)
继续阅读
阅读原文