Java8增强的Future:CompletableFuture

CompletableFuture是Java8新增的一个超大型工具类,为什么说她大呢?因为一方面它实现了Future接口,更重要的是,它实现了CompletionStage接口.这个接口也是Java8新增加的,而CompletionStage拥有多达约40种方法,

通过CompletableFuture提供进一步封装,我们很容易实现Future模式那样的异步调用,例如:

public static Integer cale(Integer para) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return para * para;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> cale(50));
System.out.println(future.get());
}

上述代码中CompletableFuture.supplyAsync()方法构造了一个CompletableFuture实例,在supplyAsync()函数中,他会在一个新的线程中,执行传入的参数.在这里,他会执行calc()方法,而calc()方法的执行可能是比较慢的,但是不影响CompletableFuture实例的构造速度,因此supplyAsync()会立即返回,他返回的CompletableFuture对象实例就可以作为这次调用的契约,在将来任何场合,用于获得最终的计算结果.在CompletableFuture中,类似的工厂方法有以下几个:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

其中supplyAsync()方法用于那些需要返回值的场景,比如计算某个数据,而runAsync()方法用于没有返回值的场景,比如,仅仅是简单地执行某一个异步动作.

首先说明一下已Async结尾的方法都是可以异步执行的,如果指定了线程池,会在指定的线程池中执行,如果没有指定,默认会在ForkJoinPool.commonPool()中执行

在这两对方法中,都有一个方法可以接手一个Executor参数,这使我们可以让Supplier<U>或者Runnable在指定的线程池工作,如果不指定,则在默认的系统公共的ForkJoinPool.common线程池中执行.

流式调用

在前文中我已经简单的提到CompletionStage的约40个接口为函数式编程做准备的,在这里,就让我们看一下,如果使用这些接口进行函数式的流式API调用

public static Integer cale(Integer para) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return para * para;
}
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> cale(50))
.thenApply(i -> Integer.toString(i))
.thenApply(str -> "\"" + str + "\"")
.thenAccept(System.out::println);
future.get();

上述代码中,使用supplyAsync()函数执行了一个异步任务,接着连续使用流式调用对任务处理结果进行在加工,直到最后的结果输出:

CompletableFuture中的异常处理

public static Integer cale(Integer para) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return para * para;
}
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> cale(50))
.exceptionally(ex -> {
System.out.println("ex.toString() = " + ex.toString());
return 0;
})
.thenApply(i -> Integer.toString(i))
.thenApply(str -> "\"" + str + "\"")
.thenAccept(System.out::println);
future.get()

组合多个CompletableFuture

CompletableFuture还允许你将多个CompletableFuture进行组合,一种方法是使用thenCompose(),它的方法签名如下:

public <U>CompletableFuture<U>thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)

CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> cale(50))
.thenCompose(i -> CompletableFuture
.supplyAsync(() -> cale(i)))
.thenApply(i -> Integer.toString(i))
.thenApply(str -> "\"" + str + "\"")
.thenAccept(System.out::println);
future.get();

另外一种组和多个CompletableFuture的方法是thenCombine()它的签名如下:

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
/*方法thenCombine()首先完成当前CompletableFuture和other的执行,
接着,将这两者的执行结果传递给BiFunction(该接口接受两个参数,并有一个返回值),
并返回代表BiFuntion实例的CompletableFuture对象:*/
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> cale(50));
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> cale(25));
CompletableFuture<Void> fu = future1.thenCombine(future2, (i, j) -> (i + j))
.thenApply(str -> "\"" + str + "\"")
.thenAccept(System.out::println);
fu.get();

实现异步API

public double getPrice(String product) {
return calculatePrice(product);
}
/**
* 同步计算商品价格的方法
*
* @param product 商品名称
* @return 价格
*/
private double calculatePrice(String product) {
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
/**
* 模拟计算,查询数据库等耗时
*/
public static void delay() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}


/**
* 异步计算商品的价格.
*
* @param product 商品名称
* @return 价格
*/
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
double price = calculatePrice(product);
futurePrice.complete(price);
}).start();
return futurePrice;
}

将同步方法装换为异步方法

/**
* 异步计算商品的价格.
*
* @param product 商品名称
* @return 价格
*/
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
double price = calculatePrice(product);
futurePrice.complete(price);
}).start();
return futurePrice;
}

使用异步API 模拟客户端
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long incocationTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("执行时间:" + incocationTime + " msecs");
try {
Double price = futurePrice.get();
System.out.printf("Price is %.2f%n", price);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
long retrievalTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("retrievalTime:" + retrievalTime + " msecs");

//>执行时间:37 msecs
//>Price is 125.79
//>retrievalTime:1055 msecs

错误处理

上述代码,如果没有意外,可以正常工作,但是如果价格计算过程中生产了错误会怎样呢?非常不幸,这种情况下你会得到一个相当糟糕的结果:

用于提示错误的异常会限制在视图计算商品的价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法放回结果的客户端永久的被阻塞,而这会导致等待get方法放回结果的客户端永久的被阻塞, 为了让客户端能了解商店无法提供请求商品价格的原因.我们对代码优化,!

/**
* 异步计算商品的价格.
*
* @param product 商品名称
* @return 价格
*/
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch (Exception e) {
//否则就抛出异常,完成这次future操作
futurePrice.completeExceptionally(e);
}
}).start();
return futurePrice;
}

使用工厂方法supplyAsync创建CompletableFuture

/**
* 异步计算商品的价格.
*
* @param product 商品名称
* @return 价格
*/
public Future<Double> getPriceAsync(String product) {
/* CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch (Exception e) {
//否则就抛出异常,完成这次future操作
futurePrice.completeExceptionally(e);
}
}).start();
return futurePrice;*/
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

让代码免受阻塞之苦

//案例:最佳价格查询器
private static List<Shop> shops = Arrays.asList(
new Shop("BestPrice"),
new Shop(":LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll"));

/**
* 最佳价格查询器
*
* @param product 商品
* @return
*/
public static List<String> findprices(String product) {
return shops
.stream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(Collectors.toList());
}

//验证findprices的正确性和执行性能
long start = System.nanoTime();
System.out.println(findprices("myPhones27s"));
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Done in " + duration+" msecs");
/*
[BestPrice price is 197.76, :
LetsSaveBig price is 155.39,
MyFavoriteShop price is 124.21,
BuyItAll price is 139.23]
Done in 4071 msecs
*/

使用平行流对请求进行并行操作

/**
* 最佳价格查询器(并行流)
*
* @param product 商品
* @return
*/
public static List<String> parallelFindprices(String product) {
return shops
.parallelStream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(Collectors.toList());
}
/*
[BestPrice price is 201.41, :
LetsSaveBig price is 153.64,
MyFavoriteShop price is 224.65,
BuyItAll price is 211.83]
Done in 1064 msecs
*/

相当不错,看起来这是个简单有效的主意,对4个不同商店的查询实现了并行.所有完成操作的总耗时只有1秒多一点,让我们尝试使用CompletableFuture,将findprices方法中对不同商店的同步调用替换为异步调用.

使用CompletableFuture发起异步请求

/**
* 最佳价格查询器(异步调用实现)
* @param product 商品
* @return
*/
public static List<String> asyncFindprices(String product) {
//使用这种方式,你会得到一个List<CompletableFuture<String>>,
//列表中的每一个CompletableFuture对象在计算完成后都包含商店的String类型的名称.
//但是,由于你用CompletableFuture实现了asyncFindprices方法要求返回一个List<String>.
//你需要等待所有的future执行完毕,将其包含的值抽取出来,填充到列表中才能返回
List<CompletableFuture<String>> priceFuture = shops
.stream()
.map(shop ->CompletableFuture.supplyAsync(() ->
String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))))
.collect(Collectors.toList());
//为了实现这个效果,我门可以向最初的List<CompletableFuture<String>>
//施加第二个map操作,对list中的每一个future对象执行join操作,一个接一个地等待他们允许结束,join和get方法
//有相同的含义,不同的在于join不会抛出任何检测到的异常
return priceFuture
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
[BestPrice price is 187.24, :
LetsSaveBig price is 158.26,
MyFavoriteShop price is 169.78,
BuyItAll price is 170.59]
Done in 1061 msecs

结果让我们失望了.我们采用异步调用新版方法,和并行差不多

寻找更好的方案

经过我增加商店数量,然后使用三种方式反复的测试,发现了一个问题,并行流和异步调用的性能不分伯仲,究其原因都一样,它们内部采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime.availableProcessors()反回值,然而,CompletableFuture具有一定的优势,因为它允许你对执行器进行配置,尤其是线程池的大小,让它以适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的.

private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100));
/**
* 最佳价格查询器(异步调用实现,自定义执行器)
*
* @param product 商品
* @return
*/
public static List<String> asyncFindpricesThread(String product) {
List<CompletableFuture<String>> priceFuture = shops
.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getName() + " price is " + shop.getPrice(product), executor))
.collect(Collectors.toList());
return priceFuture
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}

经过测试处理5个商店 是1秒多,处理9个商店也是1秒多

并行–使用流还是CompletableFutures?

目前为止,我们已经知道对集合进行并行计算有两种方式,要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作,后者提供了更多的灵活性,你可以调整线程池大小,二者能帮助你确保整体计算机不会因为线程都在等待I/O而发生阻塞 我们使用这些API的建议如下:

  1. 如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的
  2. 反之,如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待).那么使用CompletableFuture是灵活性更好,你可以像前面讨论的那样,依据等待/计算,或者W/C的比率设定需要使用的线程数,

博主主页

可以加博主微信一起交流:twobixiaoxin