1

CompletableFuture组合式异步编程

 2 years ago
source link: https://perkins4j2.github.io/posts/13999100/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Callable与Runnable

java.lang.Runnable吧,它是一个接口,在它里面只声明了一个run()方法,由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call(),这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

Future 接口

要使用Future,通常你只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService。如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成,返回相应的结果。

  • 将两个异步计算合并为一个
  • 等待Future集合中的所有任务都完成
  • 仅等待Future集合中最快结束的任务完成
  • 通过编程方式完成一个Future任务的执行
  • 应对Future的完成事件

同步API,使调用方和被调用方在不同的线程中运行,调用方还是需要等待被调用方结束运行,这就是阻塞式调用。

异步API会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交给另一个线程去做,该线程和调用方是异步的——这就是非阻塞式调用的由来。

FutureTask

为避免发生客户端被阻塞的风险,使用FutureTask执行完毕可以发送一个通知,仅在计算结果可用时执行一个由Lambda表达式或者方法引用定义的回调函数。

Task task = new Task();// 新建异步任务
FutureTask<Integer> future = new FutureTask<Integer>(task) {
// 异步任务执行完成,回调
@Override
protected void done() {
try {
System.out.println("future.done():" + get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
};
// 创建线程池(使用了预定义的配置)
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(future);

parallelStream并行流

public List<String> findPrices(String product) { 
return shops.parallelStream()
.map(shop -> String.format("%s price is %.2f",
shop.getName(), shop.getPrice(product)))
.collect(toList());
}

CompletableFuture异步

public List<String> findPrices(String product) { 
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getName() + " price is " +
shop.getPrice(product)))
.collect(Collectors.toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
  • 如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的。
  • 如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程。
  • 如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,可以设定需要使用的线程数。
  • 涉及等待I/O的操作不使用并行流的另一个原因是,处理流的
    流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。

CompletableFuture内的异常

public Future<Double> getPriceAsync(String product) { 
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread( () -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch (Exception ex) {
futurePrice.completeExceptionally(ex);
}
}).start();
return futurePrice;
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK