CompletableFuture中实现多个 REST 调用

创建软件功能时,日常活动是从不同来源检索数据并将其聚合到响应中。在微服务中,这些源通常是外部REST API。

在本教程中,我们将使用 Java 的CompletableFuture高效地并行地从多个外部 REST API 检索数据。

为什么在 REST 调用中使用并行性
让我们想象一个场景,我们需要更新对象中的各个字段,每个字段值都来自外部REST 调用。一种替代方法是按顺序调用每个 API 来更新每个字段。

然而,等待一个 REST 调用完成才能启动另一个 REST 调用会增加我们服务的响应时间。例如,如果我们调用两个 API,每个 API 需要 5 秒,则总时间将至少为 10 秒,因为第二个调用需要等待第一个调用完成。

相反,我们可以并行调用所有 API,这样总时间就是最慢的 REST 调用的时间。例如,一个呼叫需要 7 秒,另一个呼叫需要 5 秒。在这种情况下,我们将等待 7 秒,因为我们已经并行处理了所有内容,并且必须等待所有结果完成。

因此,并行性是减少服务响应时间、使其更具可扩展性并改善用户体验的绝佳替代方案。

使用CompletableFuture实现并行性
Java 中的CompletableFuture类是一个方便的工具,用于组合和运行不同的并行任务以及处理单个任务错误。

在以下部分中,我们将使用它为输入列表中的每个对象组合并运行三个 REST 调用。

1.创建演示应用程序
让我们首先定义更新的目标POJO :

public class Purchase {
    String orderDescription;
    String paymentDescription;
    String buyerName;
    String orderId;
    String paymentId;
    String userId;
    // all-arg constructor, getters and setters
}

Purchase类 具有三个应更新的字段,每个字段均由 ID 查询的不同 REST 调用进行更新。

我们首先创建一个类,定义一个 RestTemplate bean 和一个 用于 REST 调用的域URL :

@Component
public class PurchaseRestCallsAsyncExecutor {
    RestTemplate restTemplate;
    static final String BASE_URL = "https://internal-api.com";
   
// all-arg constructor
}

现在,让我们定义 / orders API 调用:

public String getOrderDescription(String orderId) {
    ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/orders/%s", BASE_URL, orderId),
        String.class);
    return result.getBody();
}

然后,让我们定义 / payment API 调用:

public String getPaymentDescription(String paymentId) {
    ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/payments/%s", BASE_URL, paymentId),
        String.class);
    return result.getBody();
}

最后,我们定义/users API 调用:

public String getUserName(String userId) {
    ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/users/%s", BASE_URL, userId),
        String.class);
    return result.getBody();
}

所有三个方法都使用getForEntity()方法进行 REST 调用并将结果包装在ResponseEntity对象中。

然后,我们调用getBody()从 REST 调用中获取响应正文。

2.使用CompletableFuture进行多个 REST 调用
现在,让我们创建构建并运行一组三个CompletableFuture的方法:

public void updatePurchase(Purchase purchase) {
    CompletableFuture.allOf(
      CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
        .thenAccept(purchase::setOrderDescription),
      CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
        .thenAccept(purchase::setPaymentDescription),
      CompletableFuture.supplyAsync(() -> getUserName(purchase.getUserId()))
        .thenAccept(purchase::setBuyerName)
    ).join();
}

我们使用 allOf()方法来构建CompletableFuture的步骤 。每个参数都是一个并行任务,其形式是使用 REST 调用及其结果构建的另一个CompletableFuture 。

为了构建每个并行任务,我们首先使用SupplyAsync()方法来提供 供应商,我们将从中检索数据。然后,我们使用thenAccept()来使用SupplyAsync()的结果 并将其设置到Purchasing类中的相应字段上。

在allOf()的末尾,我们刚刚构建了任务。没有采取任何行动。

最后,我们在最后调用join()来并行运行所有任务并收集它们的结果。由于join()是一个线程阻塞操作,因此我们只在最后调用它,而不是在每个任务步骤中调用它。这是为了通过减少线程块来优化应用程序性能。

由于我们没有为 SupplyAsync()方法提供自定义的ExecutorService,因此所有任务都在同一个执行器中运行。默认情况下,Java 使用ForkJoinPool.commonPool()。

一般来说,为 SupplyAsync()指定自定义ExecutorService是一个很好的做法,这样我们就可以更好地控制线程池参数。

3.对列表中的每个元素执行多个 REST 调用
要将updatePurchase()方法应用于集合,我们可以简单地在forEach()循环中调用它:

public void updatePurchases(List<Purchase> purchases) {
    purchases.forEach(this::updatePurchase);
}

我们的updatePurchase()方法接收Purchasing列表,并将之前创建的updatePurchase()方法应用于每个元素。

每次调用updatePurchases()都会运行CompletableFuture中定义的三个并行任务。因此,每次购买都有自己的 CompletableFuture对象来运行三个并行 REST 调用。

处理错误
在分布式系统中,服务不可用或网络故障是很常见的。这些故障可能发生在外部 REST API 中,而我们作为该 API 的客户端并不知道。例如,如果应用程序关闭,则通过线路发送的请求永远不会完成。

1.使用handle()优雅地处理错误
REST调用执行期间可能会出现异常。例如,如果 API 服务关闭或者我们输入无效参数,我们就会收到错误消息。

因此,我们可以使用handle()方法单独处理每个REST调用异常:

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

方法参数是一个BiFunction,包含前一个任务的结果和异常作为参数。

为了说明这一点,让我们将 handle()步骤添加到CompletableFuture的步骤之一中 :

public void updatePurchaseHandlingExceptions(Purchase purchase) {
    CompletableFuture.allOf(
        CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
          .thenAccept(purchase::setPaymentDescription)
          .handle((result, exception) -> {
              if (exception != null) {
                  // handle exception
                  return null;
              }
              return result;
          })
    ).join();
}

在示例中,  handle ()从thenAccept()调用的setOrderDescription()获取Void类型。

然后,它将thenAccept()操作中抛出的任何错误存储在异常中。因此,我们用它来检查错误并在if语句中正确处理它。

最后, 如果没有抛出异常, handle()将返回作为参数传递的值。否则,它返回 null。

2.处理 REST 调用超时
当我们使用 CompletableFuture时,我们可以指定一个类似于我们在 REST 调用中定义的任务超时。因此,如果任务未在指定时间内完成,Java 将通过 TimeoutException 结束任务执行。

为此,我们修改CompletableFuture 的一项任务来处理超时:

public void updatePurchaseHandlingExceptions(Purchase purchase) {
    CompletableFuture.allOf(
        CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
          .thenAccept(purchase::setOrderDescription)
          .orTimeout(5, TimeUnit.SECONDS)
          .handle((result, exception) -> {
              if (exception instanceof TimeoutException) {
                  // handle exception
                  return null;
              }
              return result;
          })
    ).join();
}

我们已将 orTimeout()行添加到 CompletableFuture构建器中,以便在5秒内未完成任务时突然停止任务执行。

我们还在handle()方法中 添加了一个if语句来单独处理 TimeoutException  。

向CompletableFuture添加超时可确保任务始终完成。这对于避免线程无限期挂起、等待可能永远不会完成的操作的结果非常重要。因此,它减少了长时间处于RUNNING状态的线程数量并提高了应用程序的运行状况。