Spring Boot Reactor如何与Resilience4j高效整合?

2026-05-24 04:092阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2192个文字,预计阅读时间需要9分钟。

Spring Boot Reactor如何与Resilience4j高效整合?

目录 + 1 + 引入 pom 包 + 2 + 配置说明 + 2.1 + 限流 + RateLimiter + 2.2 + 重试 + Retry + 2.3 + 超时 + TimeLimiter + 2.4 + 断路器 + CircuitBreaker + 2.5 + 仓库 + Bulkhead + 2.5.1 + SemaphoreBulkhead + 2.5.2 + FixedThreadPoolBulkhead + 3 + 使用 + 3.1 + 配置 + 3.2 + 使用注解

目录
  • 1 引入 pom 包
  • 2 配置说明
    • 2.1 限流 ratelimiter
    • 2.2 重试 retry
    • 2.3 超时 TimeLimiter
    • 2.4 断路器 circuitbreaker
    • 2.5 壁仓 bulkhead
      • 2.5.1 SemaphoreBulkhead
      • 2.5.2 FixedThreadPoolBulkhead
  • 3 使用
    • 3.1 配置
      • 3.2 使用注解实现

      1 引入 pom 包

      <dependency> <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-all</artifactId> </dependency> <dependency> <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-spring-boot2</artifactId> </dependency>

      2 配置说明

      2.1 限流 ratelimiter

      两个限流配置:backendA 1s 中最多允许 10 次请求;

      backendB 每 500ms 最多允许 6 次请求。

      resilience4j.ratelimiter: instances: backendA: limitForPeriod: 10 limitRefreshPeriod: 1s timeoutDuration: 10ms registerHealthIndicator: true eventConsumerBufferSize: 100 backendB: limitForPeriod: 6 limitRefreshPeriod: 500ms timeoutDuration: 3s 配置属性默认值描述timeoutDuration5一个线程等待许可的默认等待时间limitRefreshPeriod500限制刷新的周期。在每个周期之后,速率限制器将其权限计数设置回 limitForPeriod 值limitForPeriod50一个 limitRefreshPeriod (周期)允许访问的数量(许可数量)

      2.2 重试 retry

      注意指定需要重试的异常,不是所有的异常重试都有效。比如 DB 相关校验异常,如唯一约束等,重试也不会成功的。

      重试配置:

      resilience4j.retry: instances: backendA: maxAttempts: 3 waitDuration: 10s enableExponentialBackoff: true exponentialBackoffMultiplier: 2 retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException backendB: maxAttempts: 3 waitDuration: 10s retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException 配置属性默认值描述maxAttempts3最大重试次数(包括第一次)waitDuration500两次重试之间的等待间隔intervalFunctionnumOfAttempts -> waitDuration修改失败后等待间隔的函数。默认情况下,等待时间是个常量。retryOnResultPredicateresult->false配置一个判断结果是否应该重试的 predicate 函数。如果结果应该重试,Predicate 必须返回 true,否则它必须返回 false。retryExceptionPredicatethrowable -> true和 retryOnResultPredicate 类似,如果要重试,Predicate 必须返回true,否则返回 false。retryExceptions空需要重试的异常类型列表ignoreExceptions空不需要重试的异常类型列表failAfterMaxAttemptsfalse当重试达到配置的 maxAttempts 并且结果仍未通过 retryOnResultPredicate 时启用或禁用抛出 MaxRetriesExceededException 的布尔值intervalBiFunction(numOfAttempts, Either<throwable, result>) -> waitDuration根据 maxAttempts 和结果或异常修改失败后等待间隔时间的函数。与 intervalFunction 一起使用时会抛出 IllegalStateException。

      2.3 超时 TimeLimiter

      超时配置:

      resilience4j.timelimiter: instances: backendA: timeoutDuration: 2s cancelRunningFuture: true backendB: timeoutDuration: 1s cancelRunningFuture: false

      超时配置比较简单,主要是配置 timeoutDuration 也就是超时的时间。

      cancelRunningFuture 的意思是:是否应该在运行的 Future 调用 cancel 去掉调用。

      2.4 断路器 circuitbreaker

      断路器有几种状态:关闭、打开、半开。注意:打开,意味着不能访问,会迅速失败。

      CircuitBreaker 使用滑动窗口来存储和汇总调用结果。您可以在基于计数的滑动窗口和基于时间的滑动窗口之间进行选择。基于计数的滑动窗口聚合最后 N 次调用的结果。基于时间的滑动窗口聚合了最后 N 秒的调用结果。

      断路器配置:

      resilience4j.circuitbreaker: instances: backendA: // 健康指标参数,非断路器属性 registerHealthIndicator: true slidingWindowSize: 100 配置属性默认值描述slidingWindowSize100记录断路器关闭状态下(可以访问的情况下)的调用的滑动窗口大小failureRateThreshold50(百分比)当失败比例超过 failureRateThreshold 的时候,断路器会打开,并开始短路呼叫slowCallDurationThreshold60000请求被定义为慢请求的阈值slowCallRateThreshold100(百分比)慢请求百分比大于等于该值时,打开断路器开关permittedNumberOfCalls10半开状态下允许通过的请求数maxWaitDurationInHalfOpenState0配置最大等待持续时间,该持续时间控制断路器在切换到打开之前可以保持在半开状态的最长时间。

      值 0 表示断路器将在 HalfOpen 状态下无限等待,直到所有允许的调用都已完成。

      2.5 壁仓 bulkhead

      resilience4j 提供了两种实现壁仓的方法:

      • SemaphoreBulkhead使用 Semaphore 实现
      • FixedThreadPoolBulkhead使用有界队列和固定线程池实现

      resilience4j.bulkhead: instances: backendA: maxConcurrentCalls: 10 backendB: maxWaitDuration: 10ms maxConcurrentCalls: 20 resilience4j.thread-pool-bulkhead: instances: backendC: maxThreadPoolSize: 1 coreThreadPoolSize: 1 queueCapacity: 1

      2.5.1 SemaphoreBulkhead

      配置属性默认值描述maxConcurrentCalls25允许的并发执行的数量maxWaitDuration0尝试进入饱和隔板时线程应被阻止的最长时间

      Spring Boot Reactor如何与Resilience4j高效整合?

      2.5.2 FixedThreadPoolBulkhead

      配置属性默认值描述maxThreadPoolSizeRuntime.getRuntime().availableProcessors()线程池最大线程个数coreThreadPoolSizeRuntime.getRuntime().availableProcessors()-1线程池核心线程个数queueCapacity100线程池队列容量keepAliveDuration20线程数超过核心线程数之后,空余线程在终止之前等待的最长时间

      3 使用

      3.1 配置

      在 application.yml 文件中添加以下 resilience4j 配置:

      resilience4j.circuitbreaker: instances: backendA: registerHealthIndicator: true slidingWindowSize: 100 resilience4j.retry: instances: backendA: maxAttempts: 3 waitDuration: 10s enableExponentialBackoff: true exponentialBackoffMultiplier: 2 retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException backendB: maxAttempts: 3 waitDuration: 10s retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException resilience4j.bulkhead: instances: backendA: maxConcurrentCalls: 10 backendB: maxWaitDuration: 10ms maxConcurrentCalls: 20 resilience4j.thread-pool-bulkhead: instances: backendC: maxThreadPoolSize: 1 coreThreadPoolSize: 1 queueCapacity: 1 resilience4j.ratelimiter: instances: backendA: limitForPeriod: 10 limitRefreshPeriod: 1s timeoutDuration: 10ms registerHealthIndicator: true eventConsumerBufferSize: 100 backendB: limitForPeriod: 6 limitRefreshPeriod: 500ms timeoutDuration: 3s resilience4j.timelimiter: instances: backendA: timeoutDuration: 2s cancelRunningFuture: true backendB: timeoutDuration: 1s cancelRunningFuture: false

      3.2 使用注解实现

      直接在需要限流的方法上增加注解@RateLimiter实现限流;增加注解@Retry实现重试;增加注解@CircuitBreaker熔断;增加注解@Bulkhead实现壁仓。name 属性中分别填写限流器、重试、熔断、壁仓组件的名字。

      @Bulkhead(name = "backendA") @CircuitBreaker(name = "backendA") @Retry(name = "backendA") @RateLimiter(name = "backendA") public Mono<List<User>> list() { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.findAll(); }).doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("list.user.error, e", e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime); }); } @Bulkhead(name = "backendA") @CircuitBreaker(name = "backendA")//最多支持10个并发量 @Retry(name = "backendA")//使用 backendA 重试器,如果抛出 IOException 会重试三次。 @RateLimiter(name = "backendA")// 限流 10 Qps public Mono<Boolean> save(User user) { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.save(user) != null; }) .doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("save.user.error, user={}, e", user, e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime); }); }

      注意:以上所有组件,都支持自定义。

      到此这篇关于Spring Boot Reactor 整合 Resilience4j详析的文章就介绍到这了,更多相关Spring Boot Reactor 内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!

      本文共计2192个文字,预计阅读时间需要9分钟。

      Spring Boot Reactor如何与Resilience4j高效整合?

      目录 + 1 + 引入 pom 包 + 2 + 配置说明 + 2.1 + 限流 + RateLimiter + 2.2 + 重试 + Retry + 2.3 + 超时 + TimeLimiter + 2.4 + 断路器 + CircuitBreaker + 2.5 + 仓库 + Bulkhead + 2.5.1 + SemaphoreBulkhead + 2.5.2 + FixedThreadPoolBulkhead + 3 + 使用 + 3.1 + 配置 + 3.2 + 使用注解

      目录
      • 1 引入 pom 包
      • 2 配置说明
        • 2.1 限流 ratelimiter
        • 2.2 重试 retry
        • 2.3 超时 TimeLimiter
        • 2.4 断路器 circuitbreaker
        • 2.5 壁仓 bulkhead
          • 2.5.1 SemaphoreBulkhead
          • 2.5.2 FixedThreadPoolBulkhead
      • 3 使用
        • 3.1 配置
          • 3.2 使用注解实现

          1 引入 pom 包

          <dependency> <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-all</artifactId> </dependency> <dependency> <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-spring-boot2</artifactId> </dependency>

          2 配置说明

          2.1 限流 ratelimiter

          两个限流配置:backendA 1s 中最多允许 10 次请求;

          backendB 每 500ms 最多允许 6 次请求。

          resilience4j.ratelimiter: instances: backendA: limitForPeriod: 10 limitRefreshPeriod: 1s timeoutDuration: 10ms registerHealthIndicator: true eventConsumerBufferSize: 100 backendB: limitForPeriod: 6 limitRefreshPeriod: 500ms timeoutDuration: 3s 配置属性默认值描述timeoutDuration5一个线程等待许可的默认等待时间limitRefreshPeriod500限制刷新的周期。在每个周期之后,速率限制器将其权限计数设置回 limitForPeriod 值limitForPeriod50一个 limitRefreshPeriod (周期)允许访问的数量(许可数量)

          2.2 重试 retry

          注意指定需要重试的异常,不是所有的异常重试都有效。比如 DB 相关校验异常,如唯一约束等,重试也不会成功的。

          重试配置:

          resilience4j.retry: instances: backendA: maxAttempts: 3 waitDuration: 10s enableExponentialBackoff: true exponentialBackoffMultiplier: 2 retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException backendB: maxAttempts: 3 waitDuration: 10s retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException 配置属性默认值描述maxAttempts3最大重试次数(包括第一次)waitDuration500两次重试之间的等待间隔intervalFunctionnumOfAttempts -> waitDuration修改失败后等待间隔的函数。默认情况下,等待时间是个常量。retryOnResultPredicateresult->false配置一个判断结果是否应该重试的 predicate 函数。如果结果应该重试,Predicate 必须返回 true,否则它必须返回 false。retryExceptionPredicatethrowable -> true和 retryOnResultPredicate 类似,如果要重试,Predicate 必须返回true,否则返回 false。retryExceptions空需要重试的异常类型列表ignoreExceptions空不需要重试的异常类型列表failAfterMaxAttemptsfalse当重试达到配置的 maxAttempts 并且结果仍未通过 retryOnResultPredicate 时启用或禁用抛出 MaxRetriesExceededException 的布尔值intervalBiFunction(numOfAttempts, Either<throwable, result>) -> waitDuration根据 maxAttempts 和结果或异常修改失败后等待间隔时间的函数。与 intervalFunction 一起使用时会抛出 IllegalStateException。

          2.3 超时 TimeLimiter

          超时配置:

          resilience4j.timelimiter: instances: backendA: timeoutDuration: 2s cancelRunningFuture: true backendB: timeoutDuration: 1s cancelRunningFuture: false

          超时配置比较简单,主要是配置 timeoutDuration 也就是超时的时间。

          cancelRunningFuture 的意思是:是否应该在运行的 Future 调用 cancel 去掉调用。

          2.4 断路器 circuitbreaker

          断路器有几种状态:关闭、打开、半开。注意:打开,意味着不能访问,会迅速失败。

          CircuitBreaker 使用滑动窗口来存储和汇总调用结果。您可以在基于计数的滑动窗口和基于时间的滑动窗口之间进行选择。基于计数的滑动窗口聚合最后 N 次调用的结果。基于时间的滑动窗口聚合了最后 N 秒的调用结果。

          断路器配置:

          resilience4j.circuitbreaker: instances: backendA: // 健康指标参数,非断路器属性 registerHealthIndicator: true slidingWindowSize: 100 配置属性默认值描述slidingWindowSize100记录断路器关闭状态下(可以访问的情况下)的调用的滑动窗口大小failureRateThreshold50(百分比)当失败比例超过 failureRateThreshold 的时候,断路器会打开,并开始短路呼叫slowCallDurationThreshold60000请求被定义为慢请求的阈值slowCallRateThreshold100(百分比)慢请求百分比大于等于该值时,打开断路器开关permittedNumberOfCalls10半开状态下允许通过的请求数maxWaitDurationInHalfOpenState0配置最大等待持续时间,该持续时间控制断路器在切换到打开之前可以保持在半开状态的最长时间。

          值 0 表示断路器将在 HalfOpen 状态下无限等待,直到所有允许的调用都已完成。

          2.5 壁仓 bulkhead

          resilience4j 提供了两种实现壁仓的方法:

          • SemaphoreBulkhead使用 Semaphore 实现
          • FixedThreadPoolBulkhead使用有界队列和固定线程池实现

          resilience4j.bulkhead: instances: backendA: maxConcurrentCalls: 10 backendB: maxWaitDuration: 10ms maxConcurrentCalls: 20 resilience4j.thread-pool-bulkhead: instances: backendC: maxThreadPoolSize: 1 coreThreadPoolSize: 1 queueCapacity: 1

          2.5.1 SemaphoreBulkhead

          配置属性默认值描述maxConcurrentCalls25允许的并发执行的数量maxWaitDuration0尝试进入饱和隔板时线程应被阻止的最长时间

          Spring Boot Reactor如何与Resilience4j高效整合?

          2.5.2 FixedThreadPoolBulkhead

          配置属性默认值描述maxThreadPoolSizeRuntime.getRuntime().availableProcessors()线程池最大线程个数coreThreadPoolSizeRuntime.getRuntime().availableProcessors()-1线程池核心线程个数queueCapacity100线程池队列容量keepAliveDuration20线程数超过核心线程数之后,空余线程在终止之前等待的最长时间

          3 使用

          3.1 配置

          在 application.yml 文件中添加以下 resilience4j 配置:

          resilience4j.circuitbreaker: instances: backendA: registerHealthIndicator: true slidingWindowSize: 100 resilience4j.retry: instances: backendA: maxAttempts: 3 waitDuration: 10s enableExponentialBackoff: true exponentialBackoffMultiplier: 2 retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException backendB: maxAttempts: 3 waitDuration: 10s retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException resilience4j.bulkhead: instances: backendA: maxConcurrentCalls: 10 backendB: maxWaitDuration: 10ms maxConcurrentCalls: 20 resilience4j.thread-pool-bulkhead: instances: backendC: maxThreadPoolSize: 1 coreThreadPoolSize: 1 queueCapacity: 1 resilience4j.ratelimiter: instances: backendA: limitForPeriod: 10 limitRefreshPeriod: 1s timeoutDuration: 10ms registerHealthIndicator: true eventConsumerBufferSize: 100 backendB: limitForPeriod: 6 limitRefreshPeriod: 500ms timeoutDuration: 3s resilience4j.timelimiter: instances: backendA: timeoutDuration: 2s cancelRunningFuture: true backendB: timeoutDuration: 1s cancelRunningFuture: false

          3.2 使用注解实现

          直接在需要限流的方法上增加注解@RateLimiter实现限流;增加注解@Retry实现重试;增加注解@CircuitBreaker熔断;增加注解@Bulkhead实现壁仓。name 属性中分别填写限流器、重试、熔断、壁仓组件的名字。

          @Bulkhead(name = "backendA") @CircuitBreaker(name = "backendA") @Retry(name = "backendA") @RateLimiter(name = "backendA") public Mono<List<User>> list() { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.findAll(); }).doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("list.user.error, e", e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime); }); } @Bulkhead(name = "backendA") @CircuitBreaker(name = "backendA")//最多支持10个并发量 @Retry(name = "backendA")//使用 backendA 重试器,如果抛出 IOException 会重试三次。 @RateLimiter(name = "backendA")// 限流 10 Qps public Mono<Boolean> save(User user) { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.save(user) != null; }) .doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("save.user.error, user={}, e", user, e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime); }); }

          注意:以上所有组件,都支持自定义。

          到此这篇关于Spring Boot Reactor 整合 Resilience4j详析的文章就介绍到这了,更多相关Spring Boot Reactor 内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!