Spring Boot如何详细整合Reactor实例?

2026-05-24 02:133阅读0评论SEO教程
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1771个文字,预计阅读时间需要8分钟。

Spring Boot如何详细整合Reactor实例?

目录+引言+1. 创建项目+2. 集成H2数据库+3. 创建测试类+3.1 user实体+3.2 UserRepository+3.3 UserService+3.4 UserController+3.5 SpringReactorApplication+添加注解支持+测试+总结+引言+Reactor是一个完整+非阻塞的框架

目录
  • 引言
  • 1 创建项目
  • 2 集成 H2 数据库
  • 3 创建测试类
    • 3.1 user 实体
    • 3.2 UserRepository
    • 3.3 UserService
    • 3.4 UserController
    • 3.5 SpringReactorApplication 添加注解支持
  • 测试
    • 总结

      引言

      Reactor 是一个完全非阻塞的 JVM响应式编程基础,有着高效的需求管理(背压的形式)。它直接整合 Java8 的函数式 API,尤其是CompletableFutureStream,还有Duration。提供了可组合的异步化序列 API — Flux (对于 [N] 个元素) and Mono (对于 [0|1] 元素) — 并广泛实现 响应式Stream 规范。

      这次带大家从零开始,使用 Spring Boot 框架建立一个 Reactor 响应式项目。

      1 创建项目

      使用;start.spring.io/创建项目。添加依赖项:H2、Lombok、Spring Web、JPA、JDBC

      然后导入Reactor

      <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency>

      2 集成 H2 数据库

      application.properties文件中添加H2数据连接信息。此外,端口使用 8081(随意,本地未被使用的端口即可)。

      server.port=8081 ################ H2 数据库 基础配置 ############## spring.datasource.driverClassName=org.h2.Driver spring.datasource.url=jdbc:h2:~/user spring.datasource.username=sa spring.datasource.password= spring.jpa.database=h2 spring.jpa.hibernate.ddl-auto=update spring.h2.console.path=/h2-console spring.h2.console.enable=true

      3 创建测试类

      3.1 user 实体

      建立简单数据操作实体 User。

      import lombok.Data; import lombok.NoArgsConstructor; import javax.persistence.*; /** * @Author: prepared * @Date: 2022/8/29 21:40 */ @Data @NoArgsConstructor @Table(name = "t_user") @Entity public class User { @Id @GeneratedValue(strategy = GenerationType.AUTO) private Long id; private String userName; private int age; private String sex; public User(String userName, int age, String sex) { this.userName = userName; this.age = age; this.sex = sex; } }

      3.2 UserRepository

      数据模型层使用JPA框架。

      import com.prepared.user.domain.User; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; /** * @Author: prepared * @Date: 2022/8/29 21:45 */ @Repository public interface UserRepository extends JpaRepository<User, Long> { }

      3.3 UserService

      service 增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。所有接口返回 Mono/Flux 对象。

      Spring Boot如何详细整合Reactor实例?

      最佳实践:所有的第三方接口、IO 耗时比较长的操作都可以放在 Mono 对象中。

      doOnError监控异常情况;

      doFinally监控整体执行情况,如:耗时、调用量监控等。

      import com.prepared.user.dao.UserRepository; import com.prepared.user.domain.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; import javax.annotation.Resource; import java.util.List; /** * @Author: prepared * @Date: 2022/8/29 21:45 */ @Service public class UserService { private Logger logger = LoggerFactory.getLogger(UserService.class); @Resource private UserRepository userRepository; public Mono<Boolean> save(User user) { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.save(user) != null; }) .doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("save.user.error, user={}, e", user, e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime); }); } public Mono<User> findById(Long id) { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.getReferenceById(id); }).doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("findById.user.error, id={}, e", id, e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("findById.user.time={}, id={}", id, System.currentTimeMillis() - startTime); }); } public Mono<List<User>> list() { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.findAll(); }).doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("list.user.error, e", e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime); }); } public Flux<User> listFlux() { long startTime = System.currentTimeMillis(); return Flux.fromIterable(userRepository.findAll()) .doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("list.user.error, e", e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime); }); } }

      3.4 UserController

      controller增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。

      list 方法还有另外一种写法,这就涉及到 Mono 和 Flux 的不同了。

      返回List可以使用Mono<List<User>>,也可以使用Flux<User>

      • Mono<T>是一个特定的Publisher<T>,最多可以发出一个元素
      • Flux<T>是一个标准的Publisher<T>,表示为发出 0 到 N 个元素的异步序列

      import com.prepared.user.domain.User; import com.prepared.user.service.UserService; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; /** * @Author: prepared * @Date: 2022/8/29 21:47 */ @RestController public class UserController { @Resource private UserService userService; @RequestMapping("/add") public Mono<Boolean> add() { User user = new User("xiaoming", 10, "F"); return userService.save(user) ; } @RequestMapping("/list") public Mono<List<User>> list() { return userService.list(); } } @RequestMapping("/listFlux") public Flux<User> listFlux() { return userService.listFlux(); }

      3.5 SpringReactorApplication 添加注解支持

      Application 启动类添加注解@EnableJpaRepositories

      import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.data.jpa.repository.config.EnableJpaRepositories; /** * Hello world! */ @SpringBootApplication @EnableJpaRepositories public class SpringReactorApplication { public static void main(String[] args) { SpringApplication.run(SpringReactorApplication.class, args); } }

      测试

      启动项目,访问localhost:8081/add,正常返回 true。

      查询所有数据,访问localhost:8081/list,可以看到插入的数据,已经查询出来了。PS:我这里执行了多次 add,所以有多条记录。

      后台日志:

      2022-09-05 20:13:17.385 INFO 15696 --- [nio-8082-exec-2] com.prepared.user.service.UserService : list.user.time=181,

      执行了UserService list()方法的doFinnally代码块,打印耗时日志。

      总结

      响应式编程的优势是不会阻塞。那么正常我们的代码中有哪些阻塞的操作呢?

      • Futureget()方法;
      • Reactor中的block()方法,subcribe()方法,所以在使用 Reactor 的时候,除非编写测试代码,否则不要直接调用以上两个方法;
      • 同步方法调用,所以高并发情况下,会使用异步调用(如Future)来提升响应速度

      以上就是Spring Boot 整合 Reactor实例详解的详细内容,更多关于Spring Boot 整合 Reactor的资料请关注自由互联其它相关文章!

      本文共计1771个文字,预计阅读时间需要8分钟。

      Spring Boot如何详细整合Reactor实例?

      目录+引言+1. 创建项目+2. 集成H2数据库+3. 创建测试类+3.1 user实体+3.2 UserRepository+3.3 UserService+3.4 UserController+3.5 SpringReactorApplication+添加注解支持+测试+总结+引言+Reactor是一个完整+非阻塞的框架

      目录
      • 引言
      • 1 创建项目
      • 2 集成 H2 数据库
      • 3 创建测试类
        • 3.1 user 实体
        • 3.2 UserRepository
        • 3.3 UserService
        • 3.4 UserController
        • 3.5 SpringReactorApplication 添加注解支持
      • 测试
        • 总结

          引言

          Reactor 是一个完全非阻塞的 JVM响应式编程基础,有着高效的需求管理(背压的形式)。它直接整合 Java8 的函数式 API,尤其是CompletableFutureStream,还有Duration。提供了可组合的异步化序列 API — Flux (对于 [N] 个元素) and Mono (对于 [0|1] 元素) — 并广泛实现 响应式Stream 规范。

          这次带大家从零开始,使用 Spring Boot 框架建立一个 Reactor 响应式项目。

          1 创建项目

          使用;start.spring.io/创建项目。添加依赖项:H2、Lombok、Spring Web、JPA、JDBC

          然后导入Reactor

          <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency>

          2 集成 H2 数据库

          application.properties文件中添加H2数据连接信息。此外,端口使用 8081(随意,本地未被使用的端口即可)。

          server.port=8081 ################ H2 数据库 基础配置 ############## spring.datasource.driverClassName=org.h2.Driver spring.datasource.url=jdbc:h2:~/user spring.datasource.username=sa spring.datasource.password= spring.jpa.database=h2 spring.jpa.hibernate.ddl-auto=update spring.h2.console.path=/h2-console spring.h2.console.enable=true

          3 创建测试类

          3.1 user 实体

          建立简单数据操作实体 User。

          import lombok.Data; import lombok.NoArgsConstructor; import javax.persistence.*; /** * @Author: prepared * @Date: 2022/8/29 21:40 */ @Data @NoArgsConstructor @Table(name = "t_user") @Entity public class User { @Id @GeneratedValue(strategy = GenerationType.AUTO) private Long id; private String userName; private int age; private String sex; public User(String userName, int age, String sex) { this.userName = userName; this.age = age; this.sex = sex; } }

          3.2 UserRepository

          数据模型层使用JPA框架。

          import com.prepared.user.domain.User; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; /** * @Author: prepared * @Date: 2022/8/29 21:45 */ @Repository public interface UserRepository extends JpaRepository<User, Long> { }

          3.3 UserService

          service 增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。所有接口返回 Mono/Flux 对象。

          Spring Boot如何详细整合Reactor实例?

          最佳实践:所有的第三方接口、IO 耗时比较长的操作都可以放在 Mono 对象中。

          doOnError监控异常情况;

          doFinally监控整体执行情况,如:耗时、调用量监控等。

          import com.prepared.user.dao.UserRepository; import com.prepared.user.domain.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; import javax.annotation.Resource; import java.util.List; /** * @Author: prepared * @Date: 2022/8/29 21:45 */ @Service public class UserService { private Logger logger = LoggerFactory.getLogger(UserService.class); @Resource private UserRepository userRepository; public Mono<Boolean> save(User user) { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.save(user) != null; }) .doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("save.user.error, user={}, e", user, e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime); }); } public Mono<User> findById(Long id) { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.getReferenceById(id); }).doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("findById.user.error, id={}, e", id, e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("findById.user.time={}, id={}", id, System.currentTimeMillis() - startTime); }); } public Mono<List<User>> list() { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.findAll(); }).doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("list.user.error, e", e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime); }); } public Flux<User> listFlux() { long startTime = System.currentTimeMillis(); return Flux.fromIterable(userRepository.findAll()) .doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("list.user.error, e", e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime); }); } }

          3.4 UserController

          controller增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。

          list 方法还有另外一种写法,这就涉及到 Mono 和 Flux 的不同了。

          返回List可以使用Mono<List<User>>,也可以使用Flux<User>

          • Mono<T>是一个特定的Publisher<T>,最多可以发出一个元素
          • Flux<T>是一个标准的Publisher<T>,表示为发出 0 到 N 个元素的异步序列

          import com.prepared.user.domain.User; import com.prepared.user.service.UserService; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; /** * @Author: prepared * @Date: 2022/8/29 21:47 */ @RestController public class UserController { @Resource private UserService userService; @RequestMapping("/add") public Mono<Boolean> add() { User user = new User("xiaoming", 10, "F"); return userService.save(user) ; } @RequestMapping("/list") public Mono<List<User>> list() { return userService.list(); } } @RequestMapping("/listFlux") public Flux<User> listFlux() { return userService.listFlux(); }

          3.5 SpringReactorApplication 添加注解支持

          Application 启动类添加注解@EnableJpaRepositories

          import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.data.jpa.repository.config.EnableJpaRepositories; /** * Hello world! */ @SpringBootApplication @EnableJpaRepositories public class SpringReactorApplication { public static void main(String[] args) { SpringApplication.run(SpringReactorApplication.class, args); } }

          测试

          启动项目,访问localhost:8081/add,正常返回 true。

          查询所有数据,访问localhost:8081/list,可以看到插入的数据,已经查询出来了。PS:我这里执行了多次 add,所以有多条记录。

          后台日志:

          2022-09-05 20:13:17.385 INFO 15696 --- [nio-8082-exec-2] com.prepared.user.service.UserService : list.user.time=181,

          执行了UserService list()方法的doFinnally代码块,打印耗时日志。

          总结

          响应式编程的优势是不会阻塞。那么正常我们的代码中有哪些阻塞的操作呢?

          • Futureget()方法;
          • Reactor中的block()方法,subcribe()方法,所以在使用 Reactor 的时候,除非编写测试代码,否则不要直接调用以上两个方法;
          • 同步方法调用,所以高并发情况下,会使用异步调用(如Future)来提升响应速度

          以上就是Spring Boot 整合 Reactor实例详解的详细内容,更多关于Spring Boot 整合 Reactor的资料请关注自由互联其它相关文章!