⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 juejin.cn/post/7140244126679138312 「小蚂蚁技术」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

问题背景

问题:当查询接口较复杂时候,数据的获取都需要[远程调用],必然需要花费更多的时间。假如查询文章详情页面,需要如下标注的时间才能完成,比如如下场景:

1. 查询文章详情 0.5s
2. 查询文章博主个人信息 0.5s
3. 查询文章评论 1s
4. 查询博主相关文章分类 1s
5. 相关推荐文章 1s
上面的描述只是举个例子不要在意这里的查询描述,看实际情况使用,有些相关的查询我们可以拆分接口实现,上面的描述只是为了举例子。

那么,用户需要4s后才能统计的数据。很显然是不能接受的。如果有多个线程同时完成这4步操作,也许只需要1s左右即可完成响应。

CompletableFuture介绍

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。CompletableFuture和FutureTask同属于Future接口的实现类,都可以获取线程的执行结果。

CompletableFuture介绍

具体场景

我们先从单个任务开始

runAsync:无返回值

/**
* runAsync无返回值
*/
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}, executor);

supplyAsync:有返回值

「whenComplete:能感知异常,能感知结果,但没办法给返回值 exceptionally:能感知异常,不能感知结果,能给返回值。相当于,如果出现异常就返回这个值」

/**
* supplyAsync有返回值
* whenComplete能感知异常,能感知结果,但没办法给返回值
* exceptionally能感知异常,不能感知结果,能给返回值。相当于,如果出现异常就返回这个值
*/
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).whenComplete((res,excption)->{
//whenComplete虽然能得到异常信息,但是没办法修改返回值
System.out.println("异步任务成功完成...结果是:"+res+";异常是:"+excption);
}).exceptionally(throwable -> {
//exceptionally能感知异常,而且能返回一个默认值,相当于,如果出现异常就返回这个值
return 10;
});

supplyAsync:有返回值

「handle能拿到返回结果,也能得到异常信息,也能修改返回值」

/**
* supplyAsync有返回值
* handle能拿到返回结果,也能得到异常信息,也能修改返回值
*/
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("运行结果:" + i);
return i;
}, executor).handle((res,excption)->{
if(excption!=null){
return 0;
}else {
return res * 2;
}
});

两个任务编排

两任务组合(「线程串行化」) 可以是两任务的串行化,就是一个任务执行完了再执行下一个 也可以是多个任务的串行化,就是按照顺序一个个的执行

thenRunAsync:不能接收上一次的执行结果,也没返回值

/**
* thenRunXXX 不能接收上一次的执行结果,也没返回值
* .thenRunAsync(() -> {
* System.out.println("任务2启动了...");
* }, executor);
*/
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("运行结果:" + i);
return i;
}, executor).thenRunAsync(() -> {
System.out.println("任务2启动了...");
}, executor);

thenAcceptAsync:能接收上一次的执行结果,但没返回值

/**
* thenAcceptXXX 能接收上一次的执行结果,但没返回值
* .thenAcceptAsync(res->{
* System.out.println("任务2启动了..."+res);
* },executor);
*/
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("运行结果:" + i);
return i;
}, executor).thenAcceptAsync(res -> {
System.out.println("任务2启动了..." + res);
}, executor);

thenApplyAsync:能接收上一次的执行结果,又可以有返回值

/**
* thenApplyXXX 能接收上一次的执行结果,又可以有返回值
* .thenApplyAsync(res -> {
* System.out.println("任务2启动了..." + res);
* return "hello " + res;
* }, executor);
*/
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("运行结果:" + i);
return i;
}, executor).thenApplyAsync(res -> {
System.out.println("任务2启动了..." + res);
return "hello " + res;
}, executor);

三任务编排

先准备两个任务

CompletableFuture<Object> future01 =CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("任务1结束:");
return i;
}, executor);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程" + Thread.currentThread().getId());
try {
Thread.sleep(3000);
System.out.println("任务2结束:");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}, executor);

三任务组合,前两个任务都完成,才执行任务3

「runAfterBothAsync」「任务01 任务02都完成了,再开始执行任务3,不感知任务1、2的结果的,也没返回值」

CompletableFuture<Void> future = future01.runAfterBothAsync(future02, () -> {
System.out.println("任务3开始");
}, executor);

「thenAcceptBothAsync:任务01 任务02都完成了,再开始执行任务3,能感知到任务1、2的结果,但没返回值」

CompletableFuture<Void> future = future01.thenAcceptBothAsync(future02, (f1, f2) -> {
System.out.println("任务3开始...得到之前的结果:f1:" + f1 + ", f2:" + f2);
}, executor);

「thenCombineAsync」「任务01 任务02都完成了,再开始执行任务3,能感知到任务1、2的结果,而且自己可以带返回值」

CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
return f1+":"+f2+":哈哈";
}, executor);

三任务组合,前两个任务只要有一个完成,就执行任务3

「runAfterEitherAsync」「两个任务只要有一个完成,就执行任务3,不感知结果,自己没返回值」

CompletableFuture<Void> future = future01.runAfterEitherAsync(future02, () -> {
System.out.println("任务3开始...");
}, executor);

「acceptEitherAsync:两个任务只要有一个完成,就执行任务3,感知结果,自己没返回值」

CompletableFuture<Void> future = future01.acceptEitherAsync(future02, (res) -> {
System.out.println("任务3开始...之前的结果" + res);
}, executor);

「applyToEitherAsync:两个任务只要有一个完成,就执行任务3,感知结果,自己有返回值」

CompletableFuture<String> future = future01.applyToEitherAsync(future02, (res) -> {
System.out.println("任务3开始...之前的结果" + res);
return "任务3的结果...";
}, executor);

多任务的编排

/**
* 多任务组合
*/
CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品图片信息");
return "hello.jpg";
},executor);

CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品属性信息");
return "黑色+256G";
},executor);

CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
System.out.println("查询商品介绍信息");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "华为...";
},executor);

allOf:所有任务都执行完

/**
* allOf 所有任务都执行完
*/
CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
allOf.get();//等待所有结果完成

anyOf:其中有一个任务执行完就可以

/**
* anyOf 其中有一个任务执行完就可以
*/
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc);
anyOf.get();

一个实际的例子

public SkuItemVo item(Long skuId) {
SkuItemVo skuItemVo = new SkuItemVo();

//1、sku详细信息 sku_info
SkuInfoEntity skuInfo = getById(skuId);
skuItemVo.setInfo(skuInfo);

//2、sku 图片信息 sku_img
List<SkuImagesEntity> images = skuImagesService.getImagesBySkuId(skuId);
skuItemVo.setImages(images);

//3、spu 销售属性组合
List<SkuItemSaleAttrVo> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(skuInfo.getSpuId());
skuItemVo.setSaleAttr(saleAttr);

//4、spu 的介绍
SpuInfoDescEntity spuInfoDesc = spuInfoDescService.getById(skuInfo.getSpuId());
skuItemVo.setDesc(spuInfoDesc);

//5、spu 规格参数信息
List<SpuItemAttrGroupVo> groupAttrs = attrGroupService.getAttrGroupWithAttrsBySpuId(skuInfo.getSpuId(),skuInfo.getCatalogId());
skuItemVo.setGroupAttrs(groupAttrs);

return skuItemVo;
}

使用CompletableFuture异步编排后

private SkuItemVo item(Long skuId) {
SkuItemVo skuItemVo = new SkuItemVo();

/**
* 3、4、5需要依赖1的运行结果,需要返回skuInfo后从中获取spuId和catalogId
* 而2不需要依赖1的运行结果
*/

//1、sku详细信息 sku_info
CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
SkuInfoEntity skuInfo = getById(skuId);
skuItemVo.setInfo(skuInfo);
return skuInfo;
}, executor);

//2、sku 图片信息 sku_img 2不需要等待上边1的执行结果
CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
List<SkuImagesEntity> images = skuImagesService.getImagesBySkuId(skuId);
skuItemVo.setImages(images);
}, executor);

//下边的3、4、5都需要上边1的执行结果
//所以下边的3、4、5都是基于上边1的执行结果 infoFuture 开始的
//都是以infoFuture.thenAcceptAsync(skuInfo -> {})开始的
CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync(skuInfo -> {
//3、spu 销售属性组合 3
List<SkuItemSaleAttrVo> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(skuInfo.getSpuId());
skuItemVo.setSaleAttr(saleAttr);
System.out.println(saleAttr);
}, executor);

CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync(skuInfo -> {
//4、spu 的介绍
SpuInfoDescEntity spuInfoDesc = spuInfoDescService.getById(skuInfo.getSpuId());
skuItemVo.setDesc(spuInfoDesc);
}, executor);

CompletableFuture<Void> attrGroupFuture = infoFuture.thenAcceptAsync(skuInfo -> {
//5、spu 规格参数信息
List<SpuItemAttrGroupVo> groupAttrs = attrGroupService.getAttrGroupWithAttrsBySpuId(skuInfo.getSpuId(),skuInfo.getCatalogId());
System.out.println(groupAttrs);
skuItemVo.setGroupAttrs(groupAttrs);
}, executor);

//等待所有任务完成
try {
CompletableFuture.allOf(saleAttrFuture,descFuture,attrGroupFuture,imageFuture).get() ;
} catch (InterruptedException e) {
log.error("查询商品详情异步编排错误: ");
log.error(e.getMessage() );
} catch (ExecutionException e) {
log.error(e.getMessage() );
}
return skuItemVo;
}

文章目录
  1. 1. 问题背景
  2. 2. CompletableFuture介绍
  3. 3. 具体场景
    1. 3.1. runAsync:无返回值
    2. 3.2. supplyAsync:有返回值
    3. 3.3. supplyAsync:有返回值
  4. 4. 两个任务编排
    1. 4.1. thenRunAsync:不能接收上一次的执行结果,也没返回值
    2. 4.2. thenAcceptAsync:能接收上一次的执行结果,但没返回值
    3. 4.3. thenApplyAsync:能接收上一次的执行结果,又可以有返回值
  5. 5. 三任务编排
    1. 5.1. 三任务组合,前两个任务都完成,才执行任务3
    2. 5.2. 三任务组合,前两个任务只要有一个完成,就执行任务3
  6. 6. 多任务的编排
    1. 6.1. allOf:所有任务都执行完
    2. 6.2. anyOf:其中有一个任务执行完就可以
  7. 7. 一个实际的例子