What are the best practices in Java Reactive Programming?

2026-05-19 12:391阅读0评论SEO教程
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计4549个文字,预计阅读时间需要19分钟。

What are the best practices in Java Reactive Programming?

Java 响应式编程在 Spring Boot 中,实现了响应式编程,提升了性能和内存使用效率。详情见:Spring:阻塞与非阻塞:R2DBC 对比 JDBC,WebFlux 对比 Web MVC,克服困难但各有特色。

Java Reactive Programming 响应式编程

在 Spring Boot 中,支持了响应式编程,带来了性能和内存使用方面的优化。

详见:

  • Spring: Blocking vs non-blocking: R2DBC vs JDBC and WebFlux vs Web MVC
困难

但是不同于 async/await 模式,响应式编程也给编码带来了一些困难,主要如下:

  • 一个代码块只能最多调用一个响应式 API。
  • null 处理不友好,甚至是灾难性的。
响应式编程的规则
  • 控制层,返回响应式对象,大多数情况下使用 Mono<T>

  • 服务层,使用@Transactional 的 API 必须返回响应式对象。

  • 数据访问层(R2DBC)返回响应式对象: Mono<T>, Flux<T>

  • 使用响应式方法的 API 尽量返回响应式对象。

  • 不要使用任何 block(), blockFirst(), share().block() 等 API,会引起严重的性能问题。

  • 在重载传统接口的情况下使用 subscribe()

  • 对于计数的 API,使用 Mono<Long> 作为返回对象。
    这是因为 Flux.count() 返回的是一个Mono<Long>。因此在其它的计数 API 中使用 Mono<Long> 作为返回对象,让我们可以保持一致。

  • Mono<Void>null

    • 响应式 API 不能返回 null 或者 Mono.just(null) 或者其等价方式。
      会引起下面的错误:

      Caused by: java.lang.NullPointerException: The mapper returned a null value.

    • Mono.empty() 不能调用随后的映射方法 map()flatMap()transform()等。
    • Mono.empty() 在 doOnSuccess() 等函数中获取值是 null
    • Flux 中使用元素 Mono<Void> 可以调用随后的映射方法 colllectList()等方法。
      Mono<Void> 不会被记入 count()colllectList()
    • 建议: 避免定义返回 Mono<Void>的方法。
      这种返回不能调用随后的映射方法 map()flatMap()transform()等方法。
      一个例外是,这个方法在控制层的最后被调用。
响应式编程模式

响应式编程是一种流编程,我把编程模式分为: 启动模式、映射模式、返回模式、异常模式。

空模式
  • 响应式 API 不能返回 null 或者 Mono.just(null) 或者其等价方式。
    会引起下面的错误:

    Caused by: java.lang.NullPointerException: The mapper returned a null value.

public static Mono<Void> monoNullTest() { return Mono.just(null); } monoNullTest().log().subscribe(); /* Exception in thread "main" java.lang.NullPointerException: value at java.base/java.util.Objects.requireNonNull(Objects.java:246) at reactor.core.publisher.MonoJust.<init>(MonoJust.java:35) at reactor.core.publisher.Mono.just(Mono.java:719) at demo.ReactiveVoidTest.monoNullTest(ReactiveVoidTest.java:22) at demo.ReactiveVoidTest.main(ReactiveVoidTest.java:16) */

  • Mono.empty() 不能调用随后的映射方法 map()flatMap()transform()等。

public static Mono<Integer> monoVoidTest() { logger.info("case: mono void test"); return Mono.empty().map(o -> { logger.info(MessageFormat.format("map: {0}", o)); return o; }).doOnSuccess(o -> { logger.info(MessageFormat.format("doOnSuccess: {0}", o)); }).thenReturn(1); } monoVoidTest().log().subscribe(); /* 10:52:06.993 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 10:52:07.023 [main] INFO reactor.Mono.IgnoreThen.1 - onSubscribe(MonoIgnoreThen.ThenIgnoreMain) 10:52:07.030 [main] INFO reactor.Mono.IgnoreThen.1 - request(unbounded) 10:52:07.036 [main] INFO demo.ReactiveVoidTest - doOnSuccess: null 10:52:07.038 [main] INFO reactor.Mono.IgnoreThen.1 - onNext(1) 10:52:07.042 [main] INFO reactor.Mono.IgnoreThen.1 - onComplete() */

  • Flux 中使用元素 Mono<Void> 可以调用随后的映射方法 colllectList()等方法。
    Mono<Void> 不会被记入 count()colllectList()

public static Mono<String> fluxVoidTest() { logger.info("case: flux void test"); return Flux.fromIterable(Arrays.asList(0, 1, 2)).flatMap(o -> { logger.info(MessageFormat.format("emit an empty: {0}", o)); return Mono.empty(); }).map(o -> { logger.info(MessageFormat.format("map: {0}", o)); return o; }).count().doOnSuccess(o -> { logger.info(MessageFormat.format("doOnSuccess: count is {0}", o)); }).map(o -> { return "abc"; }); } fluxVoidTest().log().subscribe(); /* 11:20:35.788 [main] INFO demo.ReactiveVoidTest - case: flux void test 11:20:35.986 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 11:20:36.121 [main] INFO reactor.Mono.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber) 11:20:36.127 [main] INFO reactor.Mono.MapFuseable.1 - | request(unbounded) 11:20:36.129 [main] INFO demo.ReactiveVoidTest - emit an empty: 0 11:20:36.130 [main] INFO demo.ReactiveVoidTest - emit an empty: 1 11:20:36.131 [main] INFO demo.ReactiveVoidTest - emit an empty: 2 11:20:36.133 [main] INFO demo.ReactiveVoidTest - doOnSuccess: count is 0 11:20:36.134 [main] INFO reactor.Mono.MapFuseable.1 - | onNext(abc) 11:20:36.138 [main] INFO reactor.Mono.MapFuseable.1 - | onComplete() */ 异常模式

响应式编程对于异常处理,建议使用下面的方法:

  • 抛出 RuntimeException
  • 要小心使用 Mono.error(t) 方法。
  • 在 Mono API 中返回 Mono.error(t)
    会被当成一个 MonoError 值被处理,
    可以在map, doOnNext, doOnSuccess处理。
    不会被 doOnError处理。

public static Mono<Integer> monoErrorTest() { logger.info("case: mono error test"); return Mono.just(0).onErrorStop().map(o -> { if (o < 2) { return Mono.error(new RuntimeException("test")); } return o; }).doOnError(e -> { logger.info(MessageFormat.format("doOnError: {0}", e.getMessage())); throw new RuntimeException(e); }).doOnNext(o -> { logger.info(MessageFormat.format("doOnNext: {0}", o)); }).doOnSuccess(o -> { logger.info(MessageFormat.format("doOnSuccess: {0}", o)); }).thenReturn(1); } monoErrorTest().log().subscribe(); /* 00:08:22.338 [main] INFO demo.ReactiveErrorDemo - case: mono error test 00:08:22.460 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 00:08:22.484 [main] INFO reactor.Mono.IgnoreThen.1 - onSubscribe(MonoIgnoreThen.ThenIgnoreMain) 00:08:22.488 [main] INFO reactor.Mono.IgnoreThen.1 - request(unbounded) 00:08:22.495 [main] INFO demo.ReactiveErrorDemo - doOnNext: MonoError 00:08:22.496 [main] INFO demo.ReactiveErrorDemo - doOnSuccess: MonoError 00:08:22.497 [main] INFO reactor.Mono.IgnoreThen.1 - onNext(1) 00:08:22.499 [main] INFO reactor.Mono.IgnoreThen.1 - onComplete() */

  • 在 Mono API 中抛出异常,会被doOnError截获,并且跳过 map, doOnSuccess

public static Mono<Integer> monoExceptionTest() { logger.info("case: mono error test"); return Mono.just(0).map(o -> { if (o < 2) { throw new RuntimeException("test"); } return o; }).map(o -> { return 2; }).doOnSuccess(o -> { logger.info(MessageFormat.format("doOnSuccess: {0}", o)); }).doOnError(e -> { logger.info(MessageFormat.format("doOnError: {0}", e.getMessage())); }); } monoExceptionTest().log().subscribe(); /* 00:08:22.499 [main] INFO demo.ReactiveErrorDemo - case: mono exception test 00:08:22.502 [main] INFO reactor.Mono.PeekTerminal.2 - | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber) 00:08:22.502 [main] INFO reactor.Mono.PeekTerminal.2 - | request(unbounded) 00:08:22.508 [main] INFO demo.ReactiveErrorDemo - doOnError: test 00:08:22.510 [main] ERROR reactor.Mono.PeekTerminal.2 - | onError(java.lang.RuntimeException: test) 00:08:22.515 [main] ERROR reactor.Mono.PeekTerminal.2 - java.lang.RuntimeException: test at demo.ReactiveErrorDemo.lambda$0(ReactiveErrorDemo.java:26) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:281) at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:121) at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263) at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) at reactor.core.publisher.Mono.subscribe(Mono.java:4400) at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515) at reactor.core.publisher.Mono.subscribe(Mono.java:4232) at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:18) 00:08:22.520 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: test Caused by: java.lang.RuntimeException: test at demo.ReactiveErrorDemo.lambda$0(ReactiveErrorDemo.java:26) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:281) at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:121) at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263) at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) at reactor.core.publisher.Mono.subscribe(Mono.java:4400) at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515) at reactor.core.publisher.Mono.subscribe(Mono.java:4232) at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:18) */

  • 在 Flux API 中返回 Mono.error(t)
    会被当成一个异常被处理,
    不会在map, doOnNext, doOnSuccess处理。
    会被 doOnError处理。

public static Mono<String> fluxErrorTest() { logger.info("case: flux error test"); return Flux.fromIterable(Arrays.asList(0, 1, 2)).flatMap(o -> { logger.info(MessageFormat.format("flatMap: {0}", o)); if (o == 1) { return Mono.error(new RuntimeException("test")); } return Mono.just(o); }).map(o -> { logger.info(MessageFormat.format("map: {0}", o)); return o; }).count().doOnSuccess(o -> { logger.info(MessageFormat.format("doOnSuccess: count is {0}", o)); }).doOnError(e -> { logger.info(MessageFormat.format("doOnError: {0}", e.getMessage())); }).map(o -> { return "abc"; }); } fluxErrorTest().log().subscribe(); /* 00:18:12.204 [main] INFO demo.ReactiveErrorDemo - case: flux error test 00:18:12.367 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 00:18:12.472 [main] INFO reactor.Mono.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber) 00:18:12.476 [main] INFO reactor.Mono.MapFuseable.1 - | request(unbounded) 00:18:12.478 [main] INFO demo.ReactiveErrorDemo - flatMap: 0 00:18:12.478 [main] INFO demo.ReactiveErrorDemo - map: 0 00:18:12.478 [main] INFO demo.ReactiveErrorDemo - flatMap: 1 00:18:12.484 [main] INFO demo.ReactiveErrorDemo - doOnError: test 00:18:12.486 [main] ERROR reactor.Mono.MapFuseable.1 - | onError(java.lang.RuntimeException: test) 00:18:12.491 [main] ERROR reactor.Mono.MapFuseable.1 - java.lang.RuntimeException: test at demo.ReactiveErrorDemo.lambda$13(ReactiveErrorDemo.java:82) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386) at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:272) at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) at reactor.core.publisher.Mono.subscribe(Mono.java:4400) at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515) at reactor.core.publisher.Mono.subscribe(Mono.java:4232) at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:19) 00:18:12.495 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: test Caused by: java.lang.RuntimeException: test at demo.ReactiveErrorDemo.lambda$13(ReactiveErrorDemo.java:82) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386) at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:272) at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) at reactor.core.publisher.Mono.subscribe(Mono.java:4400) at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515) at reactor.core.publisher.Mono.subscribe(Mono.java:4232) at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:19) */

  • 在 Flux API 中抛出异常,和返回 Mono.error() 一样
    会被当成一个异常被处理,
    不会在map, doOnNext, doOnSuccess处理。
    会被 doOnError处理。

public static Mono<String> fluxExceptionTest() { logger.info("case: flux error test"); return Flux.fromIterable(Arrays.asList(0, 1, 2)).flatMap(o -> { logger.info(MessageFormat.format("flatMap: {0}", o)); if (o == 1) { throw new RuntimeException("test"); } return Mono.just(o); }).map(o -> { logger.info(MessageFormat.format("map: {0}", o)); return o; }).count( ).doOnSuccess(o -> { logger.info(MessageFormat.format("doOnSuccess: count is {0}", o)); }).doOnError(e -> { logger.info(MessageFormat.format("doOnError: {0}", e.getMessage())); }).map(o -> { return "abc"; }); } fluxExceptionTest().log().subscribe(); /* 00:20:38.104 [main] INFO demo.ReactiveErrorDemo - case: flux error test 00:20:38.265 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 00:20:38.358 [main] INFO reactor.Mono.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber) 00:20:38.364 [main] INFO reactor.Mono.MapFuseable.1 - | request(unbounded) 00:20:38.365 [main] INFO demo.ReactiveErrorDemo - flatMap: 0 00:20:38.366 [main] INFO demo.ReactiveErrorDemo - map: 0 00:20:38.366 [main] INFO demo.ReactiveErrorDemo - flatMap: 1 00:20:38.373 [main] INFO demo.ReactiveErrorDemo - doOnError: test 00:20:38.376 [main] ERROR reactor.Mono.MapFuseable.1 - | onError(java.lang.RuntimeException: test) 00:20:38.381 [main] ERROR reactor.Mono.MapFuseable.1 - java.lang.RuntimeException: test at demo.ReactiveErrorDemo.lambda$8(ReactiveErrorDemo.java:61) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386) at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:272) at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) at reactor.core.publisher.Mono.subscribe(Mono.java:4400) at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515) at reactor.core.publisher.Mono.subscribe(Mono.java:4232) at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:20) 00:20:38.385 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: test Caused by: java.lang.RuntimeException: test at demo.ReactiveErrorDemo.lambda$8(ReactiveErrorDemo.java:61) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386) at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:272) at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) at reactor.core.publisher.Mono.subscribe(Mono.java:4400) at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515) at reactor.core.publisher.Mono.subscribe(Mono.java:4232) at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:20) */ 启动模式

  • 常用的启动模式

Mono.just(data); Mono.fromXXX(xxx); Flux.from(data);

  • 冷响应式

Mono.defer(() -> supplier);

冷响应式是指,在启动时,不会立即执行,而是在被订阅时才执行。
下面 IllegalArgumentException 会在 subscribe 后才会被调用。

// Sample code private Mono<Integer> monoAdd(Integer a, Integer b) { return Mono.defer(() -> { if (a == null) { throw new IllegalArgumentException("a is null"); } if (b == null) { throw new IllegalArgumentException("b is null"); } return Mono.just(a + b); }); } 映射模式

这里讨论的映射模式,大都是关于多个响应式 API 之间的协作。

平行模式(flat pattern)

主要是用 flatMap() 方法。代码成 flatMap().flatMap().flatMap() 形状。
用于后面的 API 只使用前面 API 输出结果的情况。

public static Mono<Integer> monoFlat(Integer a) { return Mono.defer(() -> { if (a == null) { throw new IllegalArgumentException("a is null"); } return Mono.just(a); }).flatMap(data -> Mono.just(data * 2)) .flatMap(data -> Mono.just(data + 100)); } monoFlat(1).log().subscribe(); /* 00:15:17.005 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 00:15:17.164 [main] INFO reactor.Mono.FlatMap.1 - | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain) 00:15:17.168 [main] INFO reactor.Mono.FlatMap.1 - | request(unbounded) 00:15:17.171 [main] INFO reactor.Mono.FlatMap.1 - | onNext(102) 00:15:17.173 [main] INFO reactor.Mono.FlatMap.1 - | onComplete() */ 嵌套模式(nested pattern)

对于后面的 API 需要使用多个前面 API 输出结果的情况,可以使用嵌套模式。
在嵌套模式中,后面的 API 可以直接使用前面 API 的结果。

What are the best practices in Java Reactive Programming?

public static Mono<Integer> monoNested(Integer a, Integer b) { // return a * 100 + b * 100 return Mono.defer(() -> { if (a == null) { throw new IllegalArgumentException("a is null"); } return Mono.just(a * 100).flatMap(o1 -> { if (b == null) { throw new IllegalArgumentException("a is null"); } return Mono.just(b * 100).map( // 在这里可以同时使用 o1 和 o2 o2 -> o1 + o2); }); }); } monoNested(1, 2).log().subscribe(); /* 00:22:43.816 [main] INFO reactor.Mono.Defer.2 - onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber) 00:22:43.817 [main] INFO reactor.Mono.Defer.2 - request(unbounded) 00:22:43.817 [main] INFO reactor.Mono.Defer.2 - onNext(300) 00:22:43.818 [main] INFO reactor.Mono.Defer.2 - onComplete() */ 拉链模式(zip pattern)

对于后面的 API 需要使用多个前面 API 输出结果的情况,可以使用拉链模式。
在拉链模式中,后面的 API 可以通过参数获取前面 API 的结果。

public static Mono<Integer> monoZip(Integer a, Integer b) { // return a * 100 + b * 100 return Mono.zip( Mono.defer(() -> { if (a == null) { throw new IllegalArgumentException("a is null"); } return Mono.just(a * 100); }), Mono.defer(() -> { if (b == null) { throw new IllegalArgumentException("b is null"); } return Mono.just(b * 100); }), (o1, o2) -> o1 + o2); } monoZip(1, 2).log().subscribe(); /* 00:32:22.326 [main] INFO reactor.Mono.Zip.3 - onSubscribe([Fuseable] MonoZip.ZipCoordinator) 00:32:22.326 [main] INFO reactor.Mono.Zip.3 - request(unbounded) 00:32:22.327 [main] INFO reactor.Mono.Zip.3 - onNext(300) 00:32:22.328 [main] INFO reactor.Mono.Zip.3 - onComplete() */ 原子模式(atomic pattern)

拉链模式和嵌套模式都不能处理 null 值,原子模式可以。
注意下面示例中的 return Mono.just(0) 可以确保不会忽略 null 值的情况。

public static Mono<Integer> monoAtomic(Integer a, Integer b) { AtomicReference<Integer> a100Ref = new AtomicReference<>(0); AtomicReference<Integer> b100Ref = new AtomicReference<>(0); // return a * 100 + b * 100 return Mono.defer(() -> { if (a == null) { a100Ref.set(null); } else { a100Ref.set(a * 100); } return Mono.just(0); }).flatMap(o -> { if (b == null) { b100Ref.set(null); } else { b100Ref.set(b * 100); } return Mono.just(0); }).map(o -> { if (a100Ref.get() == null || b100Ref.get() == null) { return 0; } return a100Ref.get() + b100Ref.get(); }); } monoAtomic(1, 2).log().subscribe(); /* 11:03:46.162 [main] INFO reactor.Mono.Map.4 - onSubscribe(FluxMap.MapSubscriber) 11:03:46.163 [main] INFO reactor.Mono.Map.4 - request(unbounded) 11:03:46.163 [main] INFO reactor.Mono.Map.4 - onNext(0) 11:03:46.164 [main] INFO reactor.Mono.Map.4 - onComplete() */ null 对象模式(null pattern)

我们还可以使用默认值来处理 null 值的情况。
在处理 null 值时,一个常见的需求是:

在一个 lambda 闭包中:

  • 可以知道这个值是 null 还是非 null。
  • 可以获取这个值。
  • 可以调用并返回一个新的响应式对象(发布者)

一个技巧是使用 .defaultIfEmpty() 方法来处理 null 值。
这个技巧对于数值或者 String 类型的值可能有效的,但是对于类实例就不好用了。
在这种情况下,可以考虑定义一个接口。

public interface Nullable { boolean isNone(); } package demo.reactive; public class Employee implements Nullable { private static final Employee none = new Employee(true); public static Employee none() { return none; } private Employee(boolean isNone) { this.isNone = isNone; } private Employee() { } private int id; private String name; private boolean isNone; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public boolean isNone() { return isNone; } public void setNull(boolean isNone) { this.isNone = isNone; } } public static Mono<Employee> monoNullable() { // return nullable object return Mono.defer(() -> { return Mono.<Employee>empty(); }).defaultIfEmpty(Employee.none()); } monoNullable().map(o -> { logger.info(MessageFormat.format("map.isNone: {0}", o.isNone())); return o; }).doOnSuccess(o -> { logger.info(MessageFormat.format("doOnSuccess: {0}", o)); }).log().subscribe(); /* 18:28:06.789 [main] INFO reactor.Mono.PeekTerminal.1 - | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber) 18:28:06.794 [main] INFO reactor.Mono.PeekTerminal.1 - | request(unbounded) 18:28:06.796 [main] INFO demo.ReactiveDemo - map.isNone: true 18:28:06.796 [main] INFO demo.ReactiveDemo - doOnSuccess: demo.reactive.Employee@120d6fe6 18:28:06.797 [main] INFO reactor.Mono.PeekTerminal.1 - | onNext(demo.reactive.Employee@120d6fe6) 18:28:06.799 [main] INFO reactor.Mono.PeekTerminal.1 - | onComplete() */ 返回模式

下面是常见的返回模式。

Mono.empty(); Mono.then(); Mono.then(mono); Mono.thenReturn(data); 参照

  • Spring: Blocking vs non-blocking: R2DBC vs JDBC and WebFlux vs Web MVC
  • Spring R2DBC
  • Reactor 文档
非常感谢阅读!如有不足之处,请留下您的评价和问题。
请“推荐”本文!

本文共计4549个文字,预计阅读时间需要19分钟。

What are the best practices in Java Reactive Programming?

Java 响应式编程在 Spring Boot 中,实现了响应式编程,提升了性能和内存使用效率。详情见:Spring:阻塞与非阻塞:R2DBC 对比 JDBC,WebFlux 对比 Web MVC,克服困难但各有特色。

Java Reactive Programming 响应式编程

在 Spring Boot 中,支持了响应式编程,带来了性能和内存使用方面的优化。

详见:

  • Spring: Blocking vs non-blocking: R2DBC vs JDBC and WebFlux vs Web MVC
困难

但是不同于 async/await 模式,响应式编程也给编码带来了一些困难,主要如下:

  • 一个代码块只能最多调用一个响应式 API。
  • null 处理不友好,甚至是灾难性的。
响应式编程的规则
  • 控制层,返回响应式对象,大多数情况下使用 Mono<T>

  • 服务层,使用@Transactional 的 API 必须返回响应式对象。

  • 数据访问层(R2DBC)返回响应式对象: Mono<T>, Flux<T>

  • 使用响应式方法的 API 尽量返回响应式对象。

  • 不要使用任何 block(), blockFirst(), share().block() 等 API,会引起严重的性能问题。

  • 在重载传统接口的情况下使用 subscribe()

  • 对于计数的 API,使用 Mono<Long> 作为返回对象。
    这是因为 Flux.count() 返回的是一个Mono<Long>。因此在其它的计数 API 中使用 Mono<Long> 作为返回对象,让我们可以保持一致。

  • Mono<Void>null

    • 响应式 API 不能返回 null 或者 Mono.just(null) 或者其等价方式。
      会引起下面的错误:

      Caused by: java.lang.NullPointerException: The mapper returned a null value.

    • Mono.empty() 不能调用随后的映射方法 map()flatMap()transform()等。
    • Mono.empty() 在 doOnSuccess() 等函数中获取值是 null
    • Flux 中使用元素 Mono<Void> 可以调用随后的映射方法 colllectList()等方法。
      Mono<Void> 不会被记入 count()colllectList()
    • 建议: 避免定义返回 Mono<Void>的方法。
      这种返回不能调用随后的映射方法 map()flatMap()transform()等方法。
      一个例外是,这个方法在控制层的最后被调用。
响应式编程模式

响应式编程是一种流编程,我把编程模式分为: 启动模式、映射模式、返回模式、异常模式。

空模式
  • 响应式 API 不能返回 null 或者 Mono.just(null) 或者其等价方式。
    会引起下面的错误:

    Caused by: java.lang.NullPointerException: The mapper returned a null value.

public static Mono<Void> monoNullTest() { return Mono.just(null); } monoNullTest().log().subscribe(); /* Exception in thread "main" java.lang.NullPointerException: value at java.base/java.util.Objects.requireNonNull(Objects.java:246) at reactor.core.publisher.MonoJust.<init>(MonoJust.java:35) at reactor.core.publisher.Mono.just(Mono.java:719) at demo.ReactiveVoidTest.monoNullTest(ReactiveVoidTest.java:22) at demo.ReactiveVoidTest.main(ReactiveVoidTest.java:16) */

  • Mono.empty() 不能调用随后的映射方法 map()flatMap()transform()等。

public static Mono<Integer> monoVoidTest() { logger.info("case: mono void test"); return Mono.empty().map(o -> { logger.info(MessageFormat.format("map: {0}", o)); return o; }).doOnSuccess(o -> { logger.info(MessageFormat.format("doOnSuccess: {0}", o)); }).thenReturn(1); } monoVoidTest().log().subscribe(); /* 10:52:06.993 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 10:52:07.023 [main] INFO reactor.Mono.IgnoreThen.1 - onSubscribe(MonoIgnoreThen.ThenIgnoreMain) 10:52:07.030 [main] INFO reactor.Mono.IgnoreThen.1 - request(unbounded) 10:52:07.036 [main] INFO demo.ReactiveVoidTest - doOnSuccess: null 10:52:07.038 [main] INFO reactor.Mono.IgnoreThen.1 - onNext(1) 10:52:07.042 [main] INFO reactor.Mono.IgnoreThen.1 - onComplete() */

  • Flux 中使用元素 Mono<Void> 可以调用随后的映射方法 colllectList()等方法。
    Mono<Void> 不会被记入 count()colllectList()

public static Mono<String> fluxVoidTest() { logger.info("case: flux void test"); return Flux.fromIterable(Arrays.asList(0, 1, 2)).flatMap(o -> { logger.info(MessageFormat.format("emit an empty: {0}", o)); return Mono.empty(); }).map(o -> { logger.info(MessageFormat.format("map: {0}", o)); return o; }).count().doOnSuccess(o -> { logger.info(MessageFormat.format("doOnSuccess: count is {0}", o)); }).map(o -> { return "abc"; }); } fluxVoidTest().log().subscribe(); /* 11:20:35.788 [main] INFO demo.ReactiveVoidTest - case: flux void test 11:20:35.986 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 11:20:36.121 [main] INFO reactor.Mono.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber) 11:20:36.127 [main] INFO reactor.Mono.MapFuseable.1 - | request(unbounded) 11:20:36.129 [main] INFO demo.ReactiveVoidTest - emit an empty: 0 11:20:36.130 [main] INFO demo.ReactiveVoidTest - emit an empty: 1 11:20:36.131 [main] INFO demo.ReactiveVoidTest - emit an empty: 2 11:20:36.133 [main] INFO demo.ReactiveVoidTest - doOnSuccess: count is 0 11:20:36.134 [main] INFO reactor.Mono.MapFuseable.1 - | onNext(abc) 11:20:36.138 [main] INFO reactor.Mono.MapFuseable.1 - | onComplete() */ 异常模式

响应式编程对于异常处理,建议使用下面的方法:

  • 抛出 RuntimeException
  • 要小心使用 Mono.error(t) 方法。
  • 在 Mono API 中返回 Mono.error(t)
    会被当成一个 MonoError 值被处理,
    可以在map, doOnNext, doOnSuccess处理。
    不会被 doOnError处理。

public static Mono<Integer> monoErrorTest() { logger.info("case: mono error test"); return Mono.just(0).onErrorStop().map(o -> { if (o < 2) { return Mono.error(new RuntimeException("test")); } return o; }).doOnError(e -> { logger.info(MessageFormat.format("doOnError: {0}", e.getMessage())); throw new RuntimeException(e); }).doOnNext(o -> { logger.info(MessageFormat.format("doOnNext: {0}", o)); }).doOnSuccess(o -> { logger.info(MessageFormat.format("doOnSuccess: {0}", o)); }).thenReturn(1); } monoErrorTest().log().subscribe(); /* 00:08:22.338 [main] INFO demo.ReactiveErrorDemo - case: mono error test 00:08:22.460 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 00:08:22.484 [main] INFO reactor.Mono.IgnoreThen.1 - onSubscribe(MonoIgnoreThen.ThenIgnoreMain) 00:08:22.488 [main] INFO reactor.Mono.IgnoreThen.1 - request(unbounded) 00:08:22.495 [main] INFO demo.ReactiveErrorDemo - doOnNext: MonoError 00:08:22.496 [main] INFO demo.ReactiveErrorDemo - doOnSuccess: MonoError 00:08:22.497 [main] INFO reactor.Mono.IgnoreThen.1 - onNext(1) 00:08:22.499 [main] INFO reactor.Mono.IgnoreThen.1 - onComplete() */

  • 在 Mono API 中抛出异常,会被doOnError截获,并且跳过 map, doOnSuccess

public static Mono<Integer> monoExceptionTest() { logger.info("case: mono error test"); return Mono.just(0).map(o -> { if (o < 2) { throw new RuntimeException("test"); } return o; }).map(o -> { return 2; }).doOnSuccess(o -> { logger.info(MessageFormat.format("doOnSuccess: {0}", o)); }).doOnError(e -> { logger.info(MessageFormat.format("doOnError: {0}", e.getMessage())); }); } monoExceptionTest().log().subscribe(); /* 00:08:22.499 [main] INFO demo.ReactiveErrorDemo - case: mono exception test 00:08:22.502 [main] INFO reactor.Mono.PeekTerminal.2 - | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber) 00:08:22.502 [main] INFO reactor.Mono.PeekTerminal.2 - | request(unbounded) 00:08:22.508 [main] INFO demo.ReactiveErrorDemo - doOnError: test 00:08:22.510 [main] ERROR reactor.Mono.PeekTerminal.2 - | onError(java.lang.RuntimeException: test) 00:08:22.515 [main] ERROR reactor.Mono.PeekTerminal.2 - java.lang.RuntimeException: test at demo.ReactiveErrorDemo.lambda$0(ReactiveErrorDemo.java:26) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:281) at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:121) at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263) at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) at reactor.core.publisher.Mono.subscribe(Mono.java:4400) at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515) at reactor.core.publisher.Mono.subscribe(Mono.java:4232) at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:18) 00:08:22.520 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: test Caused by: java.lang.RuntimeException: test at demo.ReactiveErrorDemo.lambda$0(ReactiveErrorDemo.java:26) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:281) at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:121) at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263) at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) at reactor.core.publisher.Mono.subscribe(Mono.java:4400) at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515) at reactor.core.publisher.Mono.subscribe(Mono.java:4232) at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:18) */

  • 在 Flux API 中返回 Mono.error(t)
    会被当成一个异常被处理,
    不会在map, doOnNext, doOnSuccess处理。
    会被 doOnError处理。

public static Mono<String> fluxErrorTest() { logger.info("case: flux error test"); return Flux.fromIterable(Arrays.asList(0, 1, 2)).flatMap(o -> { logger.info(MessageFormat.format("flatMap: {0}", o)); if (o == 1) { return Mono.error(new RuntimeException("test")); } return Mono.just(o); }).map(o -> { logger.info(MessageFormat.format("map: {0}", o)); return o; }).count().doOnSuccess(o -> { logger.info(MessageFormat.format("doOnSuccess: count is {0}", o)); }).doOnError(e -> { logger.info(MessageFormat.format("doOnError: {0}", e.getMessage())); }).map(o -> { return "abc"; }); } fluxErrorTest().log().subscribe(); /* 00:18:12.204 [main] INFO demo.ReactiveErrorDemo - case: flux error test 00:18:12.367 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 00:18:12.472 [main] INFO reactor.Mono.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber) 00:18:12.476 [main] INFO reactor.Mono.MapFuseable.1 - | request(unbounded) 00:18:12.478 [main] INFO demo.ReactiveErrorDemo - flatMap: 0 00:18:12.478 [main] INFO demo.ReactiveErrorDemo - map: 0 00:18:12.478 [main] INFO demo.ReactiveErrorDemo - flatMap: 1 00:18:12.484 [main] INFO demo.ReactiveErrorDemo - doOnError: test 00:18:12.486 [main] ERROR reactor.Mono.MapFuseable.1 - | onError(java.lang.RuntimeException: test) 00:18:12.491 [main] ERROR reactor.Mono.MapFuseable.1 - java.lang.RuntimeException: test at demo.ReactiveErrorDemo.lambda$13(ReactiveErrorDemo.java:82) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386) at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:272) at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) at reactor.core.publisher.Mono.subscribe(Mono.java:4400) at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515) at reactor.core.publisher.Mono.subscribe(Mono.java:4232) at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:19) 00:18:12.495 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: test Caused by: java.lang.RuntimeException: test at demo.ReactiveErrorDemo.lambda$13(ReactiveErrorDemo.java:82) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386) at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:272) at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) at reactor.core.publisher.Mono.subscribe(Mono.java:4400) at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515) at reactor.core.publisher.Mono.subscribe(Mono.java:4232) at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:19) */

  • 在 Flux API 中抛出异常,和返回 Mono.error() 一样
    会被当成一个异常被处理,
    不会在map, doOnNext, doOnSuccess处理。
    会被 doOnError处理。

public static Mono<String> fluxExceptionTest() { logger.info("case: flux error test"); return Flux.fromIterable(Arrays.asList(0, 1, 2)).flatMap(o -> { logger.info(MessageFormat.format("flatMap: {0}", o)); if (o == 1) { throw new RuntimeException("test"); } return Mono.just(o); }).map(o -> { logger.info(MessageFormat.format("map: {0}", o)); return o; }).count( ).doOnSuccess(o -> { logger.info(MessageFormat.format("doOnSuccess: count is {0}", o)); }).doOnError(e -> { logger.info(MessageFormat.format("doOnError: {0}", e.getMessage())); }).map(o -> { return "abc"; }); } fluxExceptionTest().log().subscribe(); /* 00:20:38.104 [main] INFO demo.ReactiveErrorDemo - case: flux error test 00:20:38.265 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 00:20:38.358 [main] INFO reactor.Mono.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber) 00:20:38.364 [main] INFO reactor.Mono.MapFuseable.1 - | request(unbounded) 00:20:38.365 [main] INFO demo.ReactiveErrorDemo - flatMap: 0 00:20:38.366 [main] INFO demo.ReactiveErrorDemo - map: 0 00:20:38.366 [main] INFO demo.ReactiveErrorDemo - flatMap: 1 00:20:38.373 [main] INFO demo.ReactiveErrorDemo - doOnError: test 00:20:38.376 [main] ERROR reactor.Mono.MapFuseable.1 - | onError(java.lang.RuntimeException: test) 00:20:38.381 [main] ERROR reactor.Mono.MapFuseable.1 - java.lang.RuntimeException: test at demo.ReactiveErrorDemo.lambda$8(ReactiveErrorDemo.java:61) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386) at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:272) at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) at reactor.core.publisher.Mono.subscribe(Mono.java:4400) at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515) at reactor.core.publisher.Mono.subscribe(Mono.java:4232) at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:20) 00:20:38.385 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: test Caused by: java.lang.RuntimeException: test at demo.ReactiveErrorDemo.lambda$8(ReactiveErrorDemo.java:61) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386) at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:272) at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) at reactor.core.publisher.Mono.subscribe(Mono.java:4400) at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515) at reactor.core.publisher.Mono.subscribe(Mono.java:4232) at demo.ReactiveErrorDemo.main(ReactiveErrorDemo.java:20) */ 启动模式

  • 常用的启动模式

Mono.just(data); Mono.fromXXX(xxx); Flux.from(data);

  • 冷响应式

Mono.defer(() -> supplier);

冷响应式是指,在启动时,不会立即执行,而是在被订阅时才执行。
下面 IllegalArgumentException 会在 subscribe 后才会被调用。

// Sample code private Mono<Integer> monoAdd(Integer a, Integer b) { return Mono.defer(() -> { if (a == null) { throw new IllegalArgumentException("a is null"); } if (b == null) { throw new IllegalArgumentException("b is null"); } return Mono.just(a + b); }); } 映射模式

这里讨论的映射模式,大都是关于多个响应式 API 之间的协作。

平行模式(flat pattern)

主要是用 flatMap() 方法。代码成 flatMap().flatMap().flatMap() 形状。
用于后面的 API 只使用前面 API 输出结果的情况。

public static Mono<Integer> monoFlat(Integer a) { return Mono.defer(() -> { if (a == null) { throw new IllegalArgumentException("a is null"); } return Mono.just(a); }).flatMap(data -> Mono.just(data * 2)) .flatMap(data -> Mono.just(data + 100)); } monoFlat(1).log().subscribe(); /* 00:15:17.005 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 00:15:17.164 [main] INFO reactor.Mono.FlatMap.1 - | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain) 00:15:17.168 [main] INFO reactor.Mono.FlatMap.1 - | request(unbounded) 00:15:17.171 [main] INFO reactor.Mono.FlatMap.1 - | onNext(102) 00:15:17.173 [main] INFO reactor.Mono.FlatMap.1 - | onComplete() */ 嵌套模式(nested pattern)

对于后面的 API 需要使用多个前面 API 输出结果的情况,可以使用嵌套模式。
在嵌套模式中,后面的 API 可以直接使用前面 API 的结果。

What are the best practices in Java Reactive Programming?

public static Mono<Integer> monoNested(Integer a, Integer b) { // return a * 100 + b * 100 return Mono.defer(() -> { if (a == null) { throw new IllegalArgumentException("a is null"); } return Mono.just(a * 100).flatMap(o1 -> { if (b == null) { throw new IllegalArgumentException("a is null"); } return Mono.just(b * 100).map( // 在这里可以同时使用 o1 和 o2 o2 -> o1 + o2); }); }); } monoNested(1, 2).log().subscribe(); /* 00:22:43.816 [main] INFO reactor.Mono.Defer.2 - onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber) 00:22:43.817 [main] INFO reactor.Mono.Defer.2 - request(unbounded) 00:22:43.817 [main] INFO reactor.Mono.Defer.2 - onNext(300) 00:22:43.818 [main] INFO reactor.Mono.Defer.2 - onComplete() */ 拉链模式(zip pattern)

对于后面的 API 需要使用多个前面 API 输出结果的情况,可以使用拉链模式。
在拉链模式中,后面的 API 可以通过参数获取前面 API 的结果。

public static Mono<Integer> monoZip(Integer a, Integer b) { // return a * 100 + b * 100 return Mono.zip( Mono.defer(() -> { if (a == null) { throw new IllegalArgumentException("a is null"); } return Mono.just(a * 100); }), Mono.defer(() -> { if (b == null) { throw new IllegalArgumentException("b is null"); } return Mono.just(b * 100); }), (o1, o2) -> o1 + o2); } monoZip(1, 2).log().subscribe(); /* 00:32:22.326 [main] INFO reactor.Mono.Zip.3 - onSubscribe([Fuseable] MonoZip.ZipCoordinator) 00:32:22.326 [main] INFO reactor.Mono.Zip.3 - request(unbounded) 00:32:22.327 [main] INFO reactor.Mono.Zip.3 - onNext(300) 00:32:22.328 [main] INFO reactor.Mono.Zip.3 - onComplete() */ 原子模式(atomic pattern)

拉链模式和嵌套模式都不能处理 null 值,原子模式可以。
注意下面示例中的 return Mono.just(0) 可以确保不会忽略 null 值的情况。

public static Mono<Integer> monoAtomic(Integer a, Integer b) { AtomicReference<Integer> a100Ref = new AtomicReference<>(0); AtomicReference<Integer> b100Ref = new AtomicReference<>(0); // return a * 100 + b * 100 return Mono.defer(() -> { if (a == null) { a100Ref.set(null); } else { a100Ref.set(a * 100); } return Mono.just(0); }).flatMap(o -> { if (b == null) { b100Ref.set(null); } else { b100Ref.set(b * 100); } return Mono.just(0); }).map(o -> { if (a100Ref.get() == null || b100Ref.get() == null) { return 0; } return a100Ref.get() + b100Ref.get(); }); } monoAtomic(1, 2).log().subscribe(); /* 11:03:46.162 [main] INFO reactor.Mono.Map.4 - onSubscribe(FluxMap.MapSubscriber) 11:03:46.163 [main] INFO reactor.Mono.Map.4 - request(unbounded) 11:03:46.163 [main] INFO reactor.Mono.Map.4 - onNext(0) 11:03:46.164 [main] INFO reactor.Mono.Map.4 - onComplete() */ null 对象模式(null pattern)

我们还可以使用默认值来处理 null 值的情况。
在处理 null 值时,一个常见的需求是:

在一个 lambda 闭包中:

  • 可以知道这个值是 null 还是非 null。
  • 可以获取这个值。
  • 可以调用并返回一个新的响应式对象(发布者)

一个技巧是使用 .defaultIfEmpty() 方法来处理 null 值。
这个技巧对于数值或者 String 类型的值可能有效的,但是对于类实例就不好用了。
在这种情况下,可以考虑定义一个接口。

public interface Nullable { boolean isNone(); } package demo.reactive; public class Employee implements Nullable { private static final Employee none = new Employee(true); public static Employee none() { return none; } private Employee(boolean isNone) { this.isNone = isNone; } private Employee() { } private int id; private String name; private boolean isNone; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public boolean isNone() { return isNone; } public void setNull(boolean isNone) { this.isNone = isNone; } } public static Mono<Employee> monoNullable() { // return nullable object return Mono.defer(() -> { return Mono.<Employee>empty(); }).defaultIfEmpty(Employee.none()); } monoNullable().map(o -> { logger.info(MessageFormat.format("map.isNone: {0}", o.isNone())); return o; }).doOnSuccess(o -> { logger.info(MessageFormat.format("doOnSuccess: {0}", o)); }).log().subscribe(); /* 18:28:06.789 [main] INFO reactor.Mono.PeekTerminal.1 - | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber) 18:28:06.794 [main] INFO reactor.Mono.PeekTerminal.1 - | request(unbounded) 18:28:06.796 [main] INFO demo.ReactiveDemo - map.isNone: true 18:28:06.796 [main] INFO demo.ReactiveDemo - doOnSuccess: demo.reactive.Employee@120d6fe6 18:28:06.797 [main] INFO reactor.Mono.PeekTerminal.1 - | onNext(demo.reactive.Employee@120d6fe6) 18:28:06.799 [main] INFO reactor.Mono.PeekTerminal.1 - | onComplete() */ 返回模式

下面是常见的返回模式。

Mono.empty(); Mono.then(); Mono.then(mono); Mono.thenReturn(data); 参照

  • Spring: Blocking vs non-blocking: R2DBC vs JDBC and WebFlux vs Web MVC
  • Spring R2DBC
  • Reactor 文档
非常感谢阅读!如有不足之处,请留下您的评价和问题。
请“推荐”本文!