Spring中实现面向写入的批量和批处理API

实现标准 REST API 涵盖了大多数典型用例。但是,基于 REST 的架构风格在处理任何批量或批处理操作时存在一些限制。

在本教程中,我们将学习如何在微服务中应用批量和批处理操作。此外,我们还将实现一些自定义的面向写入的批量和批处理 API。

bBulk批量处理和 Batch批处理API 介绍/b
批量bBulk/b和批bBatch/b操作这两个术语经常被互换使用。不过,两者之间还是有明显区别的。

1、通常情况下,批量bBulk/b操作是指对b同一类型/b的多个条目执行相同的操作。为每个请求调用相同的应用程序接口来执行批量操作可能是一种平常的方法。这种方法可能太慢,而且会浪费资源。相反,我们可以在一次往返中处理多个条目。

我们可以通过在一次调用中对同一类型的多个条目应用相同的操作来实现批量操作。这种对条目集合进行操作的方式可以减少整体延迟,提高应用程序性能。要实施批量操作,我们可以重新使用用于单个条目的现有端点,或者为批量方法创建一个单独的路由。

2、批处理bBatch/b操作通常意味着在b多个资源类型/b上执行不同的操作。批处理bBatch/b API 是在一次调用中对资源执行各种操作的捆绑。这些资源操作可能没有任何连贯性。每个请求路由都可能独立于其他路由。

简而言之,"批处理batch "一词是指批处理处理不同的请求。

我们并没有很多定义明确的标准或规范来实现批量bBulk/b或批处理bBatch/b操作。此外,许多流行的框架(如 Spring)也不支持批量bBulk/b操作。

不过,在本教程中,我们将使用常见的 REST 结构自定义实现批量bBulk/b和批处理​​​​​​​bBatch/b处理操作。


bSpring中的示例应用程序/b
假设我们需要构建一个支持批量和批处理操作的简单微服务。

b1. Maven 依赖项/b
首先,让我们包含 spring-boot-starter-web和spring-boot-starter-validation依赖项:

code<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>3.1.5</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-validation</artifactId>
    <version>3.1.5</version>
</dependency>/code
通过上述spring-boot-starter-validation依赖项,我们在应用程序中启用了输入数据验证。我们将需要它来验证批量和批处理请求的大小。

b2. 实现第一个 Spring 服务/b
我们将实现在存储库中创建、更新和删除数据的服务。

首先,让我们对 Customer 类进行建模:

codepublic class Customer implements Serializable {
    private String id;
    private String name;
    private String email;
    private String address;
    // standard getters and setters
}/code
接下来,让我们使用createCustomers()方法实现CustomerService 类,以在内存存储库中存储多个Customer对象:

code@Service
public class CustomerService {
    private final Map<String, Customer> customerRepoMap = new HashMap<>();
    public List<Customer> createCustomers(List<Customers> customers) {
        return customers.stream()
          .map(this::createCustomer)
          .filter(Optional::isPresent)
          .map(Optional::get)
          .collect(toList());
    }
}/code
然后,我们将实现createCustomer()方法来创建单个Customer对象:

codepublic Optional<Customer> createCustomer(Customer customer) {
    if (!customerRepoMap.containsKey(customer.getEmail()) && customer.getId() == 0) {
        Customer customerToCreate = new Customer(customerRepoMap.size() + 1, 
          customer.getName(), customer.getEmail());
        customerToCreate.setAddress(customer.getAddress());
        customerRepoMap.put(customerToCreate.getEmail(), customerToCreate);  
        return Optional.of(customerToCreate);
    }
    return Optional.empty();
}/code
在上述方法中,如果存储库中不存在客户,我们才会创建客户,否则,我们返回一个空对象。

类似地,我们将实现一种方法来更新现有的客户详细信息:

codeprivate Optional<Customer> updateCustomer(Customer customer) {
    Customer customerToUpdate = customerRepoMap.get(customer.getEmail());
    if (customerToUpdate != null && customerToUpdate.getId() == customer.getId()) {
        customerToUpdate.setName(customer.getName());
        customerToUpdate.setAddress(customer.getAddress());
    }
    return Optional.ofNullable(customerToUpdate);
}/code
最后,我们将实现一个deleteCustomer()方法来从存储库中删除现有的客户:

codepublic Optional<Customer> deleteCustomer(Customer customer) {
    Customer customerToDelete = customerRepoMap.get(customer.getEmail());
    if (customerToDelete != null && customerToDelete.getId() == customer.getId()) {
        customerRepoMap.remove(customer.getEmail());
    }
   return Optional.ofNullable(customerToDelete);
}/code

b3. 实现第二个 Spring 服务/b
我们还来实现另一项在存储库中获取和创建地址数据的服务。

首先,我们定义Address 类:

codepublic class Address implements Serializable {
    private int id;
    private String street;
    private String city;
    //standard getters and setters
}/code
然后,让我们用createAddress()方法实现AddressService 类:

codepublic Address createAddress(Address address) {
    Address createdAddress = null;
    String addressUniqueKey = address.getStreet().concat(address.getCity());
    if (!addressRepoMap.containsKey(addressUniqueKey)) {
        createdAddress = new Address(addressRepoMap.size() + 1, 
          address.getStreet(), address.getCity());
        addressRepoMap.put(addressUniqueKey, createdAddress);
    }
    return createdAddress;
}/code

b使用现有端点实现批量 API/b
现在让我们创建一个 API 来支持批量和单个项目创建操作。

b1. 实现批量控制器/b
我们将实现一个带有端点的BulkController类,以便在一次调用中创建一个或多个客户。

首先,我们将以 JSON 格式定义批量请求:

code
    {
        "name": "<name>",
        "email": "<email>",
        "address": "<address>"
    }
/code
通过这种方法,我们可以使用自定义HTTP标头 - X-ActionType -处理批量操作,以区分批量或单项操作。

然后,我们将在BulkController类中实现bulkCreateCustomers()方法并使用上述CustomerService 的方法:

code@PostMapping(path = "/customers/bulk")
public ResponseEntity<List<Customer>> bulkCreateCustomers(
  @RequestHeader(value="X-ActionType") String actionType, 
  @RequestBody @Valid @Size(min = 1, max = 20) List<Customer> customers) {
    List<Customer> customerList = actionType.equals("bulk") ? 
      customerService.createCustomers(customers) :
      singletonList(customerService.createCustomer(customers.get(0)).orElse(null));
    return new ResponseEntity<>(customerList, HttpStatus.CREATED);
}/code
在上面的代码中,我们使用X-ActionType标头来接受任何批量请求。此外,我们还使用@ Size注释添加了输入请求大小验证。代码决定是将整个列表传递给 createCustomers()还是仅将元素 0传递给 createCustomer()。

不同的创建函数返回一个列表或一个单个的Optional,因此我们将后者转换为 列表,以便在两种情况下 HTTP 响应相同。

b2. 验证批量 API/b
我们将运行应用程序并通过执行上述端点来验证批量操作:

code$ curl -i --request POST 'http://localhost:8080/api/customers/bulk' \
--header 'X-ActionType: bulk' \
--header 'Content-Type: application/json' \
--data-raw '
    {
        "name": "test1",
        "email": "test1@email.com",
        "address": "address1"
    },
    {
        "name": "test2",
        "email": "test2@email.com",
        "address": "address2"
    }
'/code
当客户创建完成后,我们将收到以下成功响应:

codeHTTP/1.1 201 
{"id":1,"name":"test1","email":"test1@email.com","address":"address1"},
{"id":1,"name":"test2","email":"test2@email.com","address":"address2"},
.../code
接下来,我们将实现另一种批量操作方法。

b使用不同的端点实现批量 API/b
在批量 API 中对同一资源执行不同的操作并不常见。不过,让我们看看最灵活的方法,看看如何做到这一点。

我们可以实现原子批量操作,其中整个请求在单个事务中成功或失败。或者,我们可以允许成功的更新独立于失败的更新进行,并通过响应指示它是完全成功还是部分成功。我们将实现其中的第二种。

b1. 定义请求和响应模型/b
让我们考虑在一次调用中创建、更新和删除多个客户的用例。

我们将批量请求定义为 JSON 格式:

code
    {
        "bulkActionType": "<CREATE OR UPDATE OR DELETE>",
        "customers":
            {
                "name": "<name>",
                "email": "<email>",
                "address": "<address>"
            }
       
    }
/code
首先,我们将上述 JSON 格式建模到CustomerBulkRequest 类中:

codepublic class CustomerBulkRequest {
    private BulkActionType bulkActionType;
    private List<Customer> customers;
    //standard getters and setters
}/code
接下来,我们将实现BulkActionType 枚举:

codepublic enum BulkActionType {
    CREATE, UPDATE, DELETE
}/code
然后,我们将CustomerBulkResponse 类定义为 HTTP 响应对象:

codepublic class CustomerBulkResponse {
    private BulkActionType bulkActionType;
    private List<Customer> customers;
    private BulkStatus status;
    //standard getters and setters
}/code
最后,我们将定义BulkStatus枚举来指定每个操作的返回状态:

codepublic enum BulkStatus {
    PROCESSED, PARTIALLY_PROCESSED, NOT_PROCESSED
}/code

b2. 实现批量控制器/b
我们将实现一个批量 API,该 API 接受批量请求并根据 bulkActionType枚举 进行处理 ,然后一起返回批量状态和客户数据。

首先,我们将在BulkController类中创建一个EnumMap ,并将BulkActionType 枚举映射 到其自己的CustomerService 的Function:

code@RestController
@RequestMapping("/api")
@Validated
public class BulkController {
    private final CustomerService customerService;
    private final EnumMap<BulkActionType, Function<Customer, Optional<Customer>>> bulkActionFuncMap = 
      new EnumMap<>(BulkActionType.class);
    public BulkController(CustomerService customerService) {
        this.customerService = customerService;
        bulkActionFuncMap.put(BulkActionType.CREATE, customerService::createCustomer);
        bulkActionFuncMap.put(BulkActionType.UPDATE, customerService::updateCustomer);
        bulkActionFuncMap.put(BulkActionType.DELETE, customerService::deleteCustomer);
    }
}/code
此 EnumMap提供请求类型与我们需要满足的 CustomerService上的方法之间的绑定。它帮助我们避免冗长的switch或if 语句。

我们可以将 EnumMap返回的 针对操作类型 的函数传递给Customer 对象流上的 map()方法:

codeList<Customer> customers = customerBulkRequest.getCustomers().stream()
   .map(bulkActionFuncMap.get(customerBulkRequest.getBulkActionType()))
   .../code
由于我们所有的 Function对象都从 Customer映射到 Optional<Customer>,这本质上使用流中的 map()操作来执行批量请求,并将生成的Customer留在流中(如果可用)。

让我们在完整的控制器方法中将它们放在一起:

code@PostMapping(path = "/customers/bulk")
public ResponseEntity<List<CustomerBulkResponse>> bulkProcessCustomers(
  @RequestBody @Valid @Size(min = 1, max = 20) 
  List<CustomerBulkRequest> customerBulkRequests) {
    List<CustomerBulkResponse> customerBulkResponseList = new ArrayList<>();
    customerBulkRequests.forEach(customerBulkRequest -> {
        List<Customer> customers = customerBulkRequest.getCustomers().stream()
          .map(bulkActionFuncMap.get(customerBulkRequest.getBulkActionType()))
          .filter(Optional::isPresent)
          .map(Optional::get)
          .collect(toList());
        
        BulkStatus bulkStatus = getBulkStatus(customerBulkRequest.getCustomers(), 
          customers);     
        customerBulkResponseList.add(CustomerBulkResponse.getCustomerBulkResponse(customers, 
          customerbulkRequest.getBulkActionType(), bulkStatus));
    });
    return new ResponseEntity<>(customerBulkResponseList, HttpStatus.Multi_Status);
}/code
此外,我们将完成getBulkStatus方法,以根据创建的客户数量 返回特定的bulkStatus 枚举:

codeprivate BulkStatus getBulkStatus(List<Customer> customersInRequest, 
  List<Customer> customersProcessed) {
    if (!customersProcessed.isEmpty()) {
        return customersProcessed.size() == customersInRequest.size() ?
          BulkStatus.PROCESSED : 
          BulkStatus.PARTIALLY_PROCESSED;
    }
    return BulkStatus.NOT_PROCESSED;
}/code
我们应该注意,还可以考虑添加针对每个操作之间任何冲突的输入验证。

b3. 验证批量 API/b
我们将运行该应用程序并调用上述端点,即 /customers/bulk:

code$ curl -i --request POST 'http://localhost:8080/api/customers/bulk' \
--header 'Content-Type: application/json' \
--data-raw '
    {
        "bulkActionType": "CREATE",
        "customers":
            {
                "name": "test4",
                "email": "test4@email.com",
                ...
            }
       
    },
    {
        "bulkActionType": "UPDATE",
        "customers":
            ...
       
    },
    {
        "bulkActionType": "DELETE",
        "customers":
            ...
       
    }
'/code
现在让我们验证一下成功的响应:

codeHTTP/1.1 207 
{"customers":{"id":4,"name":"test4","email":"test4@email.com","address":"address4"},"status":"PROCESSED","bulkType":"CREATE"},
.../code
接下来,我们将实现一个批处理 API,它将客户和地址捆绑在一个批处理调用中。

b实现批batch处理 API/b
通常,批次 API 请求是具有其自己的方法、资源 URL 和有效负载的子请求的集合。

我们将实现一个批处理 API,用于创建和更新两种资源类型。当然,我们可以包含其他操作,例如删除操作。但为简单起见,我们仅考虑 POST和PATCH方法。

b1. 实现批请求模型/b
首先,我们将以 JSON 格式定义混合数据请求模型:

code
    {
        "method": "POST",
        "relativeUrl": "/address",
        "data": {
            "street": "<street>",
            "city": "<city>"
        }
    },
    {
        "method": "PATCH",
        "relativeUrl": "/customer",
        "data": {
            "id": "<id>",
            "name": "<name>",
            "email": "<email>",
            "address": "<address>"
        }
    }
/code
我们将把上述 JSON 结构实现为BatchRequest 类:

codepublic class BatchRequest {
    private HttpMethod method;
    private String relativeUrl;
    private JsonNode data;
    //standard getters and setters
}/code

b2. 实现批处理控制器/b
我们将实现一个批处理 API,以便在单个请求中创建地址并向客户更新其地址。为简单起见,我们将在同一个微服务中编写此 API。在另一种架构模式中,我们可能选择在并行调用各个端点的其他微服务中实现它。

使用上面的BatchRequest类,我们将遇到将JsonNode反序列化为特定类型类的问题。我们可以通过使用ObjectMapper 的convertValue方法将JsonNode转换为强类型对象来轻松解决这个问题。

对于批处理 API,我们将根据BatchRequest 类中请求的HttpMethod和relatedUrl参数调用AddressService或CustomerService方法。

 我们将在BatchController 类中实现批处理端点:

code@PostMapping(path = "/batch")
public String batchUpdateCustomerWithAddress(
  @RequestBody @Valid @Size(min = 1, max = 20) List<BatchRequest> batchRequests) {
    batchRequests.forEach(batchRequest -> {
        if (batchRequest.getMethod().equals(HttpMethod.POST) && 
          batchRequest.getRelativeUrl().equals("/address")) {
            addressService.createAddress(objectMapper.convertValue(batchRequest.getData(), 
              Address.class));
        } else if (batchRequest.getMethod().equals(HttpMethod.PATCH) && 
            batchRequest.getRelativeUrl().equals("/customer")) {
            customerService.updateCustomer(objectMapper.convertValue(batchRequest.getData(), 
              Customer.class));
        }
    });
    return "Batch update is processed";
}/code

b3. 验证批处理 API/b
我们将执行上述/batch端点:

code$ curl -i --request POST 'http://localhost:8080/api/batch' \
--header 'Content-Type: application/json' \
--data-raw '
    {
        "method": "POST",
        "relativeUrl": "/address",
        "data": {
            "street": "test1",
            "city": "test"
        }
    },
    {
        "method": "PATCH",
        "relativeUrl": "/customer",
        "data": {
            "id": "1",
            "name": "test1",
            "email": "test1@email.com",
            "address": "address2"
        }
    }
'/code
我们将验证以下回应:

codeHTTP/1.1 200
Batch update is processed/code

b结论/b
在本文中,我们学习了如何在 Spring 应用程序中应用批操作。我们还了解了它们的功能和区别。

对于批量bulk操作,我们在两个不同的 API 中实现了它,一个是重用现有的POST端点来创建多个资源,另一个是创建单独的端点以允许对同一类型的多个资源进行多个操作。

我们还实现了批batch处理 API,允许我们将不同的操作应用于不同的资源。批处理 API 使用 HttpMethod和relativeUrl以及有效负载组合不同的子请求。