在Spring Boot 3中构建WebFlux、R2DBC和Postgres响应式应用


在本文中,您将学习如何使用 Spring WebFlux、R2DBC 和 Postgres 数据库实现和测试响应式 Spring Boot 应用程序。我们将使用最新版本的 Spring Boot 3 创建两个用 Kotlin 编写的简单应用程序。我们的应用程序通过 HTTP 公开一些 REST 端点。为了测试它们之间的通信以及与 Postgres 数据库的集成,我们将使用 Testcontainers 和 Netty Mock Server。

GitHub 存储库:它包含两个应用程序employee-service和organization-service目录。

第一步,我们将添加几个与 Kotlin 相关的依赖项。除了标准库之外,我们还可以包含对 Jackson 的 Kotlin 支持(JSON 序列化/反序列化):

<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-stdlib</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.module</groupId>
  <artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-reflect</artifactId>
</dependency>

我们还需要包含两个 Spring Boot Starter。为了创建响应式 Spring @Controller,我们需要使用 Spring WebFlux 模块。借助 Spring Boot Data R2DBC Starter,我们可以以响应式方式使用 Spring Data Repositories。最后,我们必须包含 R2DBC 提供的 Postgres 驱动程序。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <scope>runtime</scope>
</dependency>

我们的项目中有几个测试依赖项。我们需要包括一个标准的 Spring Boot Test Starter、带有 JUnit 5、Postgres 和 R2DBC 支持的 Testcontainers,最后是用于模拟反应式 API 的 Mock Server Netty 模块。还值得添加该spring-boot-testcontainers模块以利用 Spring Boot 和 Testcontainers 之间的内置集成。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>r2dbc</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>junit-jupiter</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-testcontainers</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.mock-server</groupId>
  <artifactId>mockserver-netty</artifactId>
  <version>5.15.0</version>
  <scope>test</scope>
</dependency>

最后一个依赖项是可选的。我们可以在我们的应用程序中包含 Spring Boot Actuator。它将 R2DBC 连接状态添加到运行状况检查以及池状态的多个指标中。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>


业务逻辑代码:

@RestController
@RequestMapping("/employees")
class EmployeeController {

   @Autowired
   lateinit var repository : EmployeeRepository

   @GetMapping // (1)
   fun findAll() : Flux<Employee> = repository.findAll()

   @GetMapping("/{id}") // (2)
   fun findById(@PathVariable id: Int) : Mono<Employee> = 
      repository.findById(id)

   @GetMapping("/organization/{organizationId}") // (3)
   fun findByOrganizationId(@PathVariable organizationId: Int): 
      Flux<Employee> = repository.findByOrganizationId(organizationId)

   @PostMapping // (4)
   fun add(@RequestBody employee: Employee) : Mono<Employee> = 
      repository.save(employee)

}

这是我们的实现@RestController。使用存储库 bean 以响应方式与数据库进行交互。我们的端点还返回由 ReactorMono和Flux. 有三个查找端点和一个 POST 端点:

  • 搜查所有员工(1)
  • 员工搜索id (2)
  • 按组织搜索所有员工id (3)
  • 添加新员工(4)

还需要在 Spring Boot application.yml中配置数据库连接设置:

spring:
  application:
    name: employee-service
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/spring
    username: spring
    password: spring123


主类:

@SpringBootApplication
class EmployeeApplication {

   @Bean
   fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer? {
      val initializer = ConnectionFactoryInitializer()
      initializer.setConnectionFactory(connectionFactory)
      initializer.setDatabasePopulator(
         ResourceDatabasePopulator(ClassPathResource("schema.sql")))
      return initializer
   }

}

fun main(args: Array<String>) {
   runApplication<EmployeeApplication>(*args)
}

然后只需将 schema.sql 文件放入该src/main/resources目录中即可。

CREATE TABLE employee (
  id SERIAL PRIMARY KEY, 
  name VARCHAR(255), 
  salary INT, 
  organization_id INT
);

更多点击标题,GitHub 存储库