👉 这是一个或许对你有用的社群
🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入芋道快速开发平台知识星球。下面是星球提供的部分资料:
👉这是一个或许对你有用的开源项目
国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。
功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号、CRM 等等功能:
  • Boot 仓库:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 仓库:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn
【国内首批】支持 JDK 21 + SpringBoot 3.2.2、JDK 8 + Spring Boot 2.7.18 双版本 

前言

大多数程序员在平时工作中,都是增删改查。这里我跟大家讲解如何利用CompletableFuture优化项目代码,使项目性能更佳!
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

为什么要用异步编程

举个例子:用户登录成功,需要返回前端用户角色,菜单权限,个人信息,用户余额,积分情况等。正常逻辑是依次查询不同表,得到对应的数据封装返回给前端,代码如下:
@Test
publicvoidlogin(Long userId)
{

    log.info(
"开始查询用户全部信息---串行!"
);

// 查询用户角色信息
    getUserRole(userId);

// 查询用户菜单信息
    getUserMenu(userId);

// 查询用户余额信息
    getUserAmount(userId);

// 查询用户积分信息
    getUserIntegral(userId);

    log.info(
"封装用户信息返回给前端!"
);

}

假如查询用户角色,用户菜单,用户余额,用户积分分别耗时500,200,200,100毫秒,则登录接口耗时为1秒。如果采用异步(多线程并行)形式,则登录接口耗时以单个查询最慢的任务为主,为查询用户角色信息500毫秒。相当于登录接口性能提升一倍!查询任务越多,则其性能提升越大!
代码演示(串行):
@Test
publicvoidlogin()throws InterruptedException 
{


long
 startTime = System.currentTimeMillis();

    log.info(
"开始查询用户全部信息!"
);


    log.info(
"开始查询用户角色信息!"
);

    Thread.sleep(
500
);

    String role = 
"管理员"
;

    log.info(
"开始查询用户菜单信息!"
);

    Thread.sleep(
200
);

    String menu = 
"首页,账户管理,积分管理"
;

    log.info(
"开始查询查询用户余额信息!"
);

    Thread.sleep(
200
);

    Integer amount = 
1999
;

    log.info(
"开始查询查询查询用户积分信息!"
);

    Thread.sleep(
100
);

    Integer integral = 
1015
;


    log.info(
"封装用户信息返回给前端!"
);

    log.info(
"查询用户全部信息总耗时:"
 + (System.currentTimeMillis() - startTime) / 
1000
 + 
"秒"
);

}

结果:
代码演示(异步):
@Test
publicvoidasyncLogin()
{

long
 startTime = System.currentTimeMillis();

    log.info(
"开始查询用户角色信息!"
);

    CompletableFuture<Map<String, Object>> roleFuture = CompletableFuture.supplyAsync(() -> {

try
 {

            Thread.sleep(
500
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

        Map<String, Object> roleMap = 
new
 HashMap<String, Object>();

        roleMap.put(
"role"
"管理员"
);

long
 endTime = System.currentTimeMillis();

        log.info(
"查询用户角色信息耗时:"
 + (endTime - startTime)  + 
"毫秒"
);

return
 roleMap;

    });


    log.info(
"开始查询用户菜单信息!"
);

    CompletableFuture<Map<String, Object>> menuFuture = CompletableFuture.supplyAsync(() -> {

try
 {

            Thread.sleep(
200
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

        Map<String, Object> menuMap = 
new
 HashMap<String, Object>();

        menuMap.put(
"menu"
"首页,账户管理,积分管理"
);

long
 endTime = System.currentTimeMillis();

        log.info(
"查询用户菜单信息耗时:"
 + (endTime - startTime)  + 
"毫秒"
);

return
 menuMap;

    });


    log.info(
"开始查询用户余额信息!"
);

    CompletableFuture<Map<String, Object>> amountFuture = CompletableFuture.supplyAsync(() -> {

try
 {

            Thread.sleep(
200
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

        Map<String, Object> amountMap = 
new
 HashMap<String, Object>();

        amountMap.put(
"amount"
1999
);

long
 endTime = System.currentTimeMillis();

        log.info(
"查询用户余额信息耗时:"
 + (endTime - startTime)  + 
"毫秒"
);

return
 amountMap;

    });


    log.info(
"开始查询用户积分信息!"
);

    CompletableFuture<Map<String, Object>> integralFuture = CompletableFuture.supplyAsync(() -> {

try
 {

            Thread.sleep(
200
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

        Map<String, Object> integralMap = 
new
 HashMap<String, Object>();

        integralMap.put(
"integral"
1015
);

long
 endTime = System.currentTimeMillis();

        log.info(
"查询用户积分信息耗时:"
 + (endTime - startTime)  + 
"毫秒"
);

return
 integralMap;

    });


    roleFuture.join();

    menuFuture.join();

    amountFuture.join();

    integralFuture.join();

    log.info(
"查询用户全部信息总耗时:"
 + (System.currentTimeMillis() - startTime)  + 
"毫秒"
);


}

结果:
直观的可以看出,异步执行的优势!
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

回顾Future

Future是什么?
  • Java 1.5中引入Callable解决多线程执行无返回值的问题。
  • Future是为了配合Callable/Runnable而产生的。简单来讲,我们可以通过future来对任务查询、取消、执行结果的获取,是调用方与异步执行方之间沟通的桥梁。
  • FutureTask实现了RunnableFuture接口,同时具有Runnable、Future的能力,即既可以作为Future得到Callable的返回值,又可以作为一个Runnable。
  • CompletableFuture实现了Futrue接口。
  • Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们可以将这个任务通过Future放到异步线程中去执行。主线程继续处理其他任务,处理完成后,再通过Future获取计算结果。
  • Future可以在连续流程中满足数据驱动的并发需求,既获得了并发执行的性能提升,又不失连续流程的简洁优雅。
代码演示(不使用自定义线程池):
@Test
publicvoidcallable()throws ExecutionException, InterruptedException 
{

long
 startTime = System.currentTimeMillis();


    Callable amountCall = 
new
 Callable() {

@Override
public Object call()throws Exception 
{

long
 startTime = System.currentTimeMillis();

            Thread.sleep(
6000
);

            Map<String, Object> amountMap = 
new
 HashMap<String, Object>();

            amountMap.put(
"amount"
99
);

long
 endTime = System.currentTimeMillis();

            log.info(
"查询金额信息耗时:"
 + (endTime - startTime) / 
1000
 + 
"秒"
);

return
 amountMap;

        }

    };


    FutureTask<Map> amountFuture = 
new
 FutureTask<>(amountCall);

new
 Thread(amountFuture).start();


    Callable roleCall = 
new
 Callable() {

@Override
public Object call()throws Exception 
{

long
 startTime = System.currentTimeMillis();

            Thread.sleep(
5000
);

            Map<String, String> roleMap = 
new
 HashMap<String, String>();

            roleMap.put(
"name"
"管理员"
);

long
 endTime = System.currentTimeMillis();

            log.info(
"查询角色信息耗时:"
 + (endTime - startTime) / 
1000
 + 
"秒"
);

return
 roleMap;

        }

    };


    FutureTask<Map> roleFuture = 
new
 FutureTask<>(roleCall);

new
 Thread(roleFuture).start();


    log.info(
"金额查询结果为:"
 + amountFuture.get());

    log.info(
"角色查询结果为:"
 + roleFuture.get());


long
 endTime = System.currentTimeMillis();

    log.info(
"总耗时:"
 + (endTime - startTime) / 
1000
 + 
"秒"
);


}

这里要注意:Future对于结果的获取,不是很友好,只能通过阻塞或者轮询的方式得到任务的结果。Future.get() 就是阻塞调用,在线程获取结果之前get方法会一直阻塞;Future提供了一个isDone方法,可以在程序中轮询这个方法查询执行结果。
这里的 amountFuture.get()如果放到如下图所示的位置,则amountFuture下面的线程将等amountFuture.get()完成后才能执行,没有执行完,则一直阻塞。
结果:
代码演示(使用自定义线程池):
@Test
publicvoidexecutor()throws ExecutionException, InterruptedException 
{

long
 startTime = System.currentTimeMillis();


    ExecutorService executor = Executors.newFixedThreadPool(
2
);


    Callable amountCall = 
new
 Callable() {

@Override
public Object call()throws Exception 
{

long
 startTime = System.currentTimeMillis();

            Thread.sleep(
6000
);

            Map<String, Object> amountMap = 
new
 HashMap<String, Object>();

            amountMap.put(
"amount"
99
);

long
 endTime = System.currentTimeMillis();

            log.info(
"查询金额信息耗时:"
 + (endTime - startTime) / 
1000
 + 
"秒"
);

return
 amountMap;

        }

    };


    Callable roleCall = 
new
 Callable() {

@Override
public Object call()throws Exception 
{

long
 startTime = System.currentTimeMillis();

            Thread.sleep(
5000
);

            Map<String, String> roleMap = 
new
 HashMap<String, String>();

            roleMap.put(
"name"
"管理员"
);

long
 endTime = System.currentTimeMillis();

            log.info(
"查询用户角色信息耗时:"
 + (endTime - startTime) / 
1000
 + 
"秒"
);

return
 roleMap;

        }

    };


    Future amountFuture = executor.submit(amountCall);

    Future roleFuture = executor.submit(roleCall);


    log.info(
"金额查询结果为:"
 + amountFuture.get());

    log.info(
"角色查询结果为:"
 + roleFuture.get());


long
 endTime = System.currentTimeMillis();

    log.info(
"总耗时:"
 + (endTime - startTime) / 
1000
 + 
"秒"
);


}

结果:

CompletableFuture使用场景

创建异步任务

CompletableFuture创建异步任务,一般有supplyAsync和runAsync两个方法:
  • supplyAsync执行CompletableFuture任务,支持返回值。
  • runAsync执行CompletableFuture任务,没有返回值。
supplyAsync方法
//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
publicstatic <U> 
CompletableFuture<U> 
supplyAsync(Supplier<U> supplier)
//自定义线程,根据supplier构建执行任务
publicstatic
 <U> CompletableFuture<U> 
supplyAsync(Supplier<U> supplier, Executor executor)
runAsync方法
//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
publicstatic
 CompletableFuture<Void> 
runAsync(Runnable runnable)
//自定义线程,根据runnable构建执行任务
publicstatic
 CompletableFuture<Void> 
runAsync(Runnable runnable,  Executor executor)
代码演示:
@Test
// supplyAsync执行CompletableFuture任务,支持返回值
publicvoiddefaultSupplyAsync()throws ExecutionException, InterruptedException 
{

long
 startTime = System.currentTimeMillis();

// 构建执行任务
        CompletableFuture<Map<String, Object>> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {

try
 {

                Thread.sleep(
6000
);

            } 
catch
 (InterruptedException e) {

                e.printStackTrace();

            }

            Map<String, Object> amountMap = 
new
 HashMap<String, Object>();

            amountMap.put(
"amount"
99
);

long
 endTime = System.currentTimeMillis();

            log.info(
"查询金额信息耗时:"
 + (endTime - startTime) / 
1000
 + 
"秒"
);

return
 amountMap;

        });

//        这行代码在这里 则会进行6秒的阻塞 下面代码其他线程无法创建
//        只能等这个线程6秒过后结束才能创建其他线程
//        Map<String, Object> userMap = userCompletableFuture.get();
        CompletableFuture<Map<String, Object>> roleCompletableFuture = CompletableFuture.supplyAsync(() -> {

try
 {

                Thread.sleep(
5000
);

            } 
catch
 (InterruptedException e) {

                e.printStackTrace();

            }

            Map<String, Object> roleMap = 
new
 HashMap<String, Object>();

            roleMap.put(
"name"
"管理员"
);

return
 roleMap;

        });


        log.info(
"金额查询结果为:"
 + amountCompletableFuture.join());

        log.info(
"角色查询结果为:"
 + roleCompletableFuture.join());


        log.info(
"总耗时:"
 + (System.currentTimeMillis() - startTime) / 
1000
 + 
"秒"
);

    }


@Test
// supplyAsync执行CompletableFuture任务,支持返回值
publicvoidcustomSupplyAsync()throws ExecutionException, InterruptedException 
{

// 自定义线程池
        ExecutorService executorService = Executors.newCachedThreadPool();

long
 startTime = System.currentTimeMillis();

        CompletableFuture<Map<String, Object>> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {

try
 {

                Thread.sleep(
6000
);

            } 
catch
 (InterruptedException e) {

                e.printStackTrace();

            }

            Map<String, Object> amountMap = 
new
 HashMap<String, Object>();

            amountMap.put(
"amount"
99
);

long
 endTime = System.currentTimeMillis();

            log.info(
"查询金额信息耗时:"
 + (endTime - startTime) / 
1000
 + 
"秒"
);

return
 amountMap;

        }, executorService);

//        这行代码在这里 则会进行6秒的阻塞 下面代码其他线程无法创建
//        只能等这个线程6秒过后结束才能创建其他线程
//        Map<String, Object> userMap = userCompletableFuture.get();
        CompletableFuture<Map<String, Object>> roleCompletableFuture = CompletableFuture.supplyAsync(() -> {

try
 {

                Thread.sleep(
5000
);

            } 
catch
 (InterruptedException e) {

                e.printStackTrace();

            }

            Map<String, Object> roleMap = 
new
 HashMap<String, Object>();

            roleMap.put(
"name"
"管理员"
);

return
 roleMap;

        }, executorService);

        log.info(
"金额查询结果为:"
 + amountCompletableFuture.join());

        log.info(
"角色查询结果为:"
 + roleCompletableFuture.join());

// 线程池需要关闭
        executorService.shutdown();

        log.info(
"总耗时:"
 + (System.currentTimeMillis() - startTime) / 
1000
 + 
"秒"
);

    }


@Test
// runAsync执行CompletableFuture任务,没有返回值
publicvoiddefaultRunAsync()
{


long
 lordStartTime = System.currentTimeMillis();


        CompletableFuture<Void> amountCompletableFuture = CompletableFuture.runAsync(() -> {

long
 startTime = System.currentTimeMillis();

try
 {

                Thread.sleep(
3000
);

            } 
catch
 (InterruptedException e) {

                e.printStackTrace();

            }

            log.info(
"执行金额增删改操作用时:"
 + (System.currentTimeMillis() - startTime) / 
1000
 + 
"秒"
);

        });


        CompletableFuture<Void> roleCompletableFuture = CompletableFuture.runAsync(() -> {

long
 startTime = System.currentTimeMillis();

try
 {

                Thread.sleep(
4000
);

            } 
catch
 (InterruptedException e) {

                e.printStackTrace();

            }

            log.info(
"执行角色增删改操作用时:"
 + (System.currentTimeMillis() - startTime) / 
1000
 + 
"秒"
);

        });

        log.info(
"金额查询结果为:"
 + amountCompletableFuture.join());

        log.info(
"角色查询结果为:"
 + roleCompletableFuture.join());


        log.info(
"总耗时:"
 + (System.currentTimeMillis() - lordStartTime) / 
1000
 + 
"秒"
);

    }


@Test
// runAsync执行CompletableFuture任务,没有返回值
publicvoidcustomRunAsync()
{


long
 lordStartTime = System.currentTimeMillis();

        ExecutorService executor = Executors.newCachedThreadPool();


        CompletableFuture<Void> amountCompletableFuture = CompletableFuture.runAsync(() -> {

long
 startTime = System.currentTimeMillis();

try
 {

                Thread.sleep(
2000
);

            } 
catch
 (InterruptedException e) {

                e.printStackTrace();

            }

            log.info(
"执行金额增删改操作用时:"
 + (System.currentTimeMillis() - startTime) / 
1000
 + 
"秒"
);

        }, executor);


        CompletableFuture<Void> roleCompletableFuture = CompletableFuture.runAsync(() -> {

long
 startTime = System.currentTimeMillis();

try
 {

                Thread.sleep(
1000
);

            } 
catch
 (InterruptedException e) {

                e.printStackTrace();

            }

            log.info(
"执行角色增删改操作用时:"
 + (System.currentTimeMillis() - startTime) / 
1000
 + 
"秒"
);

        }, executor);

        log.info(
"金额查询结果为:"
 + amountCompletableFuture.join());

        log.info(
"角色查询结果为:"
 + roleCompletableFuture.join());


// 关闭线程池
        executor.shutdown();

        log.info(
"总耗时:"
 + (System.currentTimeMillis() - lordStartTime) / 
1000
 + 
"秒"
);

    }

注意:这里的get()与join()都是获取任务线程的返回值。join()方法抛出的是uncheck异常(即RuntimeException),不会强制开发者抛出, 会将异常包装成CompletionException异常 /CancellationException异常,但是本质原因还是代码内存在的真正的异常;
get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)。

异步任务回调

thenRun / thenRunAsync
CompletableFuture的thenRun方法,通俗点讲就是,做完第一个任务后,再做第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值。
public CompletableFuture<Void> thenRun(Runnable action)
;

public CompletableFuture<Void> thenRunAsync(Runnable action)
;

thenRun / thenRunAsync的区别? 源码解释:
privatestaticfinal
 Executor asyncPool = useCommonPool ?

    ForkJoinPool.commonPool() : 
new
 ThreadPerTaskExecutor();


public CompletableFuture<Void> thenRun(Runnable action)
{

return
 uniRunStage(
null
, action);

}


public CompletableFuture<Void> thenRunAsync(Runnable action)
{

return
 uniRunStage(asyncPool, action);

}

如果你执行第一个任务的时候,传入了一个自定义线程池:
  • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
  • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。
后面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个哈!
代码演示:
@Test
// 执行第一个任务后 可以继续执行第二个任务 两个任务之间无传参 无返回值
publicvoiddefaultThenRun()throws ExecutionException, InterruptedException 
{

long
 lordStartTime = System.currentTimeMillis();

    CompletableFuture<Void> amountCompletableFuture = CompletableFuture.runAsync(() -> {

long
 startTime = System.currentTimeMillis();

try
 {

            Thread.sleep(
2000
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

        log.info(
"执行金额增删改操作用时:"
 + (System.currentTimeMillis() - startTime) / 
1000
 + 
"秒"
);

    });

    CompletableFuture<Void> thenCompletableFuture = amountCompletableFuture.thenRun(() -> {

long
 startTime = System.currentTimeMillis();

try
 {

            Thread.sleep(
1000
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

        log.info(
"执行角色增删改操作用时:"
 + (System.currentTimeMillis() - startTime) / 
1000
 + 
"秒"
);

    });

    thenCompletableFuture.get();

    log.info(
"总耗时:"
 + (System.currentTimeMillis() - lordStartTime) / 
1000
 + 
"秒"
);


}

结果:
thenAccept / thenAcceptAsync
CompletableFuture的thenAccept方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
;

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
;

代码演示:
@Test
// 执行第一个任务后 可以继续执行第二个任务 并携带第一个任务的返回值 第二个任务执行完没有返回值
publicvoiddefaultThenAccept()throws ExecutionException, InterruptedException 
{

long
 lordStartTime = System.currentTimeMillis();

    CompletableFuture<Map> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {

long
 startTime = System.currentTimeMillis();

try
 {

            Thread.sleep(
2000
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

        Map<String, Object> amountMap = 
new
 HashMap<String, Object>();

        amountMap.put(
"amount"
90
);

        log.info(
"执行金额查询操作用时:"
 + (System.currentTimeMillis() - startTime) / 
1000
 + 
"秒"
);

return
 amountMap;

    });

    CompletableFuture<Void> thenCompletableFuture = amountCompletableFuture.thenAccept((map) -> {

long
 startTime = System.currentTimeMillis();

if
 (Integer.parseInt(map.get(
"amount"
).toString()) > 
90
) {

try
 {

                Thread.sleep(
1000
);

            } 
catch
 (InterruptedException e) {

                e.printStackTrace();

            }

            log.info(
"金额充足,可以购买!:"
 + (System.currentTimeMillis() - startTime) / 
1000
 + 
"秒"
);

        } 
else
 {

            log.info(
"金额不足,无法购买!:"
 + (System.currentTimeMillis() - startTime) / 
1000
 + 
"秒"
);

        }

    });

    thenCompletableFuture.get();

    log.info(
"总耗时:"
 + (System.currentTimeMillis() - lordStartTime) / 
1000
 + 
"秒"
);


}

结果:
thenApply / thenApplyAsync
CompletableFuture的thenApply方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。
public
 <U> 
CompletableFuture<U> thenApplyAsync()
;

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
;

代码演示:
@Test
// 执行第一个任务后 可以继续执行第二个任务 并携带第一个任务的返回值 第二个任务执行完有返回值
publicvoiddefaultThenApply()throws ExecutionException, InterruptedException 
{

long
 lordStartTime = System.currentTimeMillis();

    CompletableFuture<Map> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {

long
 startTime = System.currentTimeMillis();

try
 {

            Thread.sleep(
2000
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

        Map<String, Object> amountMap = 
new
 HashMap<String, Object>();

        amountMap.put(
"amount"
90
);

        log.info(
"执行金额查询操作用时:"
 + (System.currentTimeMillis() - startTime) / 
1000
 + 
"秒"
);

return
 amountMap;

    });

    CompletableFuture<Integer> thenCompletableFuture = amountCompletableFuture.thenApply((map) -> {

int
 number = 
0
;

if
 (Integer.parseInt(map.get(
"amount"
).toString()) > 
3
) {

try
 {

                Thread.sleep(
1000
);

            } 
catch
 (InterruptedException e) {

                e.printStackTrace();

            }

// 可口可乐3元一瓶 看金额一共能购买多少瓶
            number = Integer.parseInt(map.get(
"amount"
).toString()) / 
3
;

        }

return
 number;

    });


    log.info(
"当前金额一共可以买"
 + thenCompletableFuture.get() + 
"瓶可口可乐!"
);

    Integer integer = thenCompletableFuture.get();

    log.info(
"总耗时:"
 + (System.currentTimeMillis() - lordStartTime) / 
1000
 + 
"秒"
);

}

结果:
exceptionally
CompletableFuture的exceptionally方法表示,某个任务执行异常时,执行的回调方法;并且有抛出异常作为参数,传递到回调方法。
public CompletableFuture<T> exceptionally
(

    Function<Throwable, ? extends T> fn)
{

return
 uniExceptionallyStage(fn);

}

代码演示:
@Test
// 某个任务执行异常时,执行的回调方法;并且有抛出异常作为参数,传递到回调方法。
publicvoidexceptionally()throws ExecutionException, InterruptedException 
{

long
 lordStartTime = System.currentTimeMillis();

    CompletableFuture<Map> amountCompletableFuture = CompletableFuture.supplyAsync(() -> {

long
 startTime = System.currentTimeMillis();

try
 {

            Thread.sleep(
2000
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

        Map<String, Object> amountMap = 
new
 HashMap<String, Object>();

        amountMap.put(
"amount"
90
);

        log.info(
"执行金额查询操作用时:"
 + (System.currentTimeMillis() - startTime) / 
1000
 + 
"秒"
);

return
 amountMap;

    });

    CompletableFuture<Integer> thenCompletableFuture = amountCompletableFuture.thenApply((map) -> {

int
 number = 
0
;

if
 (Integer.parseInt(map.get(
"amount"
).toString()) > 
3
) {

try
 {

                Thread.sleep(
1000
);

// 可口可乐3元一瓶 看金额一共能购买多少瓶
                number = Integer.parseInt(map.get(
"amount"
).toString()) / 
0
;

            } 
catch
 (ArithmeticException | InterruptedException e) {

                e.printStackTrace();

thrownew
 ArithmeticException(); 
// 这里一定要将异常抛除了,不然exceptionally无效
            }

        }

return
 number;

    });


    CompletableFuture<Integer> exceptionFuture = thenCompletableFuture.exceptionally((e) -> {

        log.error(
"除数为0,则默认商为0!"
);

return0
;

    });

    log.info(
"当前金额一共可以买"
 + thenCompletableFuture.get() + 
"瓶可口可乐!"
);

    exceptionFuture.get();

    log.info(
"总耗时:"
 + (System.currentTimeMillis() - lordStartTime) / 
1000
 + 
"秒"
);

}

注意:这里的异常一定要抛出来,不然exceptionally无效!
whenComplete
CompletableFuture的whenComplete方法表示,某个任务执行完成后,执行的回调方法,无返回值;并且whenComplete方法返回的CompletableFuture的result是上个任务的结果。
public CompletableFuture<T> whenComplete
(

    BiConsumer<? 
super
 T, ? 
super
 Throwable> action)
{

return
 uniWhenCompleteStage(
null
, action);

}

代码演示:
@Test
// 某个任务执行完成后,执行的回调方法,无返回值;并且whenComplete方法返回的CompletableFuture的result是上个任务的结果。
publicvoidwhenComplete()
{

    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {

return"周杰伦"
;

    });

    CompletableFuture<String> stringCompletableFuture1 = stringCompletableFuture.whenComplete((a, throwable) -> {

        log.info(
"周杰伦喜欢唱"
);

    });

    log.info(
"输出结果为第一个任务:"
 + stringCompletableFuture1.join());

}

结果:
handle
CompletableFuture的handle方法表示,某个QQ账号买号平台地图任务执行完成后,执行回调方法,并且是有返回值的;并且handle方法返回的CompletableFuture的result是回调方法执行的结果。
public
 <U> 
CompletableFuture<U> handle
(

    BiFunction<? 
super
 T, Throwable, ? extends U> fn)
{

return
 uniHandleStage(
null
, fn);

}

代码演示:
@Test
//    某个任务执行完成后,执行的回调方法,有返回值;并且handle方法返回的CompletableFuture的result是第二个任务的结果。
publicvoidhandle()
{

    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {

return"周杰伦"
;

    });

    CompletableFuture<String> stringCompletableFuture1 = stringCompletableFuture.handle((a, throwable) -> {

return"周杰伦喜欢唱歌!"
;

    });

    log.info(
"输出结果为第二个任务:"
 + stringCompletableFuture1.join());

}

结果:

多个任务组合处理

AND组合关系
thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务。
  • thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值。
  • thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值。
  • runAfterBoth 不会把执行结果当做方法入参,且没有返回值。
代码演示:
@Test
publicvoidthenCombine()
{

    CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {

return7
;

    });

    CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 
2
).thenCombine(first, Integer::sum);

    log.info(
"结果为:"
 + second.join());

}

结果为:
OR组合关系
applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。
  • applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值。
  • acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值。
  • runAfterEither:不会把执行结果当做方法入参,且没有返回值。
代码演示:
@Test
publicvoidapplyToEither1()
{

    log.info(
"魏凯下班准备回家。。。"
);

    log.info(
"魏凯等待2号,4号地铁。。。"
);

    CompletableFuture<String> busCF = CompletableFuture.supplyAsync(() -> {

        log.info(
"2号在路上。。。"
);

try
 {

            Thread.sleep(
3000
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

return"2"
;

    }).applyToEither(CompletableFuture.supplyAsync(() -> {

        log.info(
"4号地铁在路上。。。"
);

try
 {

            Thread.sleep(
4000
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

return"4"
;

    }), first -> first + 
"号"
);

    log.info(
"魏凯坐上"
 + busCF.join() + 
"地铁"
);

}


@Test
// OR
publicvoidapplyToEither()
{

    CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {

try
 {

            Thread.sleep(
2000L
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

return7
;

    });

    CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> {

try
 {

            Thread.sleep(
3000L
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

return7
;

    }).applyToEither(first, num -> num);

    log.info(
"最后结果为:"
 + second.join());

}

结果演示:
AllOf
所有任务都执行完成后,才执行 allOf返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常。
代码演示:
@Test
// 所有任务都执行完成后,才执行 allOf返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常。
// 这里第一次执行没有睡眠的话,是可以直接执行第三个任务的。如果有睡眠,则需要手动join启动。
publicvoidallOf()
{

    CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {

//            try {
//                Thread.sleep(2000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
        log.info(
"第一个任务执行完成!"
);

    });

    CompletableFuture<Void> second = CompletableFuture.runAsync(() -> {

//            try {
//                Thread.sleep(500);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
        log.info(
"第二个任务执行完成!"
);

    });

    CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(first, second).whenComplete((m, n) -> {

        log.info(
"第三个任务完成!"
);

    });

//        voidCompletableFuture.join();
}

结果:
注意:这里第一次启动执行没有睡眠的话,是可以直接执行第三个任务的,因为这两个任务都执行完成,启动的瞬间第三个也同时执行完。如果有睡眠,则需要手动join启动,等待最长睡眠任务时间过后,第三个任务完成!
AnyOf
任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常。
代码演示:
@Test
// 前提任务任意执行完一个,则目标任务执行。其他前提任务则不在执行。
// 任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常。
publicvoidanyOf()
{

    CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {

try
 {

            Thread.sleep(
2000
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

        log.info(
"第一个任务执行完成!"
);

    });

    CompletableFuture<Void> second = CompletableFuture.runAsync(() -> {

try
 {

            Thread.sleep(
500
);

        } 
catch
 (InterruptedException e) {

            e.printStackTrace();

        }

        log.info(
"第二个任务执行完成!"
);

    });

    CompletableFuture<Object> voidCompletableFuture = CompletableFuture.anyOf(first, second).whenComplete((m, n) -> {

        log.info(
"第三个任务完成!"
);

    });

    voidCompletableFuture.join();

}

结果:
thenCompose
thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例。
  • 如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例。
  • 如果该CompletableFuture实例为null,然后就执行这个新任务。
代码演示:
@Test
publicvoidthenCompose1()
{

    CompletableFuture<Integer> stringCompletableFuture = CompletableFuture.supplyAsync(() -> 
4
)

            .thenCompose(value -> CompletableFuture.supplyAsync(() -> {

// thenCompose方法返回一个新的CompletableFuture
if
 (Integer.valueOf(
4
).equals(value)) {

return66
;

                } 
else
 {

return99
;

                }

            }));

    log.info(
"结果:"
 + stringCompletableFuture.join());


}


@Test
publicvoidthenCompose()
{

    CompletableFuture<String> first = CompletableFuture.completedFuture(
"第一个任务"
);

    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> 
4
)

            .thenCompose((data) -> {

                log.info(
"data为:"
 + data);

return
 first;

            });

    log.info(
"结果:"
 + stringCompletableFuture.join());


}

结果:

CompletableFuture注意点

Future需要获取返回值,才能获取异常信息
@Test
publicvoidfutureTest()
{

        ExecutorService executor = Executors.newFixedThreadPool(
2
);


        CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {

int
 m = 
9
;

int
 n = 
0
;

return
 m / n;

        },executor);

//        integerCompletableFuture.join(); // 这行代码不加,则不会抛出异常
    }

Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。小伙伴们使用的时候,注意一下哈,考虑是否加try…catch…或者使用exceptionally方法。
CompletableFuture的get()方法是阻塞的
//反例
 CompletableFuture.get();

//正例
CompletableFuture.get(
9
, TimeUnit.SECONDS);

CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间
默认线程池的注意点
CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。
自定义线程池时,注意饱和策略
CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(3, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。
因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离!

欢迎加入我的知识星球,全面提升技术能力。
👉 加入方式,长按”或“扫描”下方二维码噢
星球的内容包括:项目实战、面试招聘、源码解析、学习路线。
文章有帮助的话,在看,转发吧。
谢谢支持哟 (*^__^*)
继续阅读
阅读原文