What specific type of reactor are you referring to in your inquiry?

2026-05-24 04:112阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1690个文字,预计阅读时间需要7分钟。

What specific type of reactor are you referring to in your inquiry?

目录

1.场景

2.创建service

2.1 创建基本接口和实体类 2.2 创建service实现

3.实现主方法

4.实现异步

4.1 使用subcribeOn实现异步 4.2 使用CompletableFuture实现异步

5.实现异步1

5.1 场景 5.2 调用多个级别服务,按服务优先级调用

目录
  • 1 场景
  • 2 创建 service
    • 2.1 创建基本接口和实体类
    • 2.2 创建 service 实现
  • 3 主体方法
    • 4 实现异步
      • 4.1 subcribeOn 实现异步
      • 4.2 CompletableFuture 实现异步

    1 场景

    调用多个平级服务,按照服务优先级返回第一个有效数据。

    具体case:一个页面可能有很多的弹窗,弹窗之间又有优先级。每次只需要返回第一个有数据的弹窗。但是又希望所有弹窗之间的数据获取是异步的。这种场景使用 Reactor 怎么实现呢?

    2 创建 service

    2.1 创建基本接口和实体类

    public interface TestServiceI { Mono request(); }

    提供一个 request 方法,返回一个 Mono 对象。

    @Data @ToString @AllArgsConstructor @NoArgsConstructor public class TestUser { private String name; }

    2.2 创建 service 实现

    @Slf4j public class TestServiceImpl1 implements TestServiceI { @Override public Mono request() { log.info("execute.test.service1"); return Mono.fromSupplier(() -> { try { System.out.println("service1.threadName=" + Thread.currentThread().getName()); Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } return ""; }) .map(name -> { return new TestUser(name); }); } }

    第一个 service 执行耗时 500ms。返回空对象;

    创建第二个 service 执行耗时 1000ms。返回空对象;代码如上,改一下sleep时间即可。

    继续创建第三个 service 执行耗时 1000ms。返回 name3。代码如上,改一下 sleep 时间,以及返回为 name3。

    3 主体方法

    public static void main(String[] args) { long startTime = System.currentTimeMillis(); TestServiceI testServiceImpl4 = new TestServiceImpl4(); TestServiceI testServiceImpl5 = new TestServiceImpl5(); TestServiceI testServiceImpl6 = new TestServiceImpl6(); List<TestServiceI> serviceIList = new ArrayList<>(); serviceIList.add(testServiceImpl4); serviceIList.add(testServiceImpl5); serviceIList.add(testServiceImpl6); // 执行 service 列表,这样有多少个 service 都可以 Flux<Mono<TestUser>> monoFlux = Flux.fromIterable(serviceIList) .map(service -> { return service.request(); }); // flatMap(或者flatMapSequential) + map 实现异常继续下一个执行 Flux flux = monoFlux.flatMapSequential(mono -> { return mono.map(user -> { TestUser testUser = JsonUtil.parseJson(JsonUtil.toJson(user), TestUser.class); if (Objects.nonNull(testUser) && StringUtils.isNotBlank(testUser.getName())) { return testUser; } // null 在 reactor 中是异常数据。 return null; }) .onErrorContinue((err, i) -> { log.info("onErrorContinue={}", i); }); }); Mono mono = flux.elementAt(0, Mono.just("")); Object block = mono.block(); System.out.println(block + "blockFirst 执行耗时ms:" + (System.currentTimeMillis() - startTime)); }

    • 1、Flux.fromIterable 执行 service 列表,可以随意增删 service 服务。
    • 2、flatMap(或者flatMapSequential) + map + onErrorContinue 实现异常继续下一个执行。具体参考:Reactor中的onErrorContinue和onErrorResume
    • 3、Mono mono = flux.elementAt(0, Mono.just("")); 返回第一个正常数据。

    执行输出:

    20:54:26.512 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
    20:54:26.553 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
    service1.threadName=main
    20:54:27.237 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
    20:54:27.237 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
    service5.threadName=main
    20:54:28.246 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
    20:54:28.246 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
    service6.threadName=main
    TestUser(name=name3)blockFirst 执行耗时ms:2895

    • 1、service1 和 service2 因为返回空,所以继续下一个,最终返回 name3。
    • 2、查看总耗时:2895ms。service1 耗时 500,service2 耗时1000,service3 耗时 1000。发现耗时基本上等于 service1 + service2 + service3 。这是怎么回事呢?查看返回执行的线程,都是 main。

    总结:这样实现按照顺序返回第一个正常数据。但是执行并没有异步。下一步:如何实现异步呢?

    4 实现异步

    4.1 subcribeOn 实现异步

    修改 service 实现。增加.subscribeOn(Schedulers.boundedElastic())

    如下:

    @Slf4j public class TestServiceImpl1 implements TestServiceI { @Override public Mono request() { log.info("execute.test.service1"); return Mono.fromSupplier(() -> { try { System.out.println("service1.threadName=" + Thread.currentThread().getName()); Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } return ""; }) //增加subscribeOn .subscribeOn(Schedulers.boundedElastic()) .map(name -> { return new TestUser(name); }); } }

    再次执行输出如下:

    21:02:04.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
    21:02:04.265 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
    service4.threadName=boundedElastic-1
    21:02:04.300 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
    21:02:04.302 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
    service2.threadName=boundedElastic-2
    service3.threadName=boundedElastic-3
    21:02:04.987 [boundedElastic-1] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
    21:02:05.307 [boundedElastic-2] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
    TestUser(name=name6)blockFirst 执行耗时ms:1242

    • 1、发现具体实现 sleep 的线程都不是 main 线程,而是boundedElastic
    • 2、最终执行耗时 1242ms,只比执行时间最长的 service2 和 service3 耗时 1000ms,多一些。证明是异步了。

    4.2 CompletableFuture 实现异步

    修改 service 实现,使用 CompletableFuture 执行耗时操作(这里是sleep,具体到项目中可能是外部接口调用,DB 操作等);然后使用 Mono.fromFuture 返回 Mono 对象。

    @Slf4j public class TestServiceImpl1 implements TestServiceI{ @Override public Mono request() { log.info("execute.test.service1"); CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> { try { System.out.println("service1.threadName=" + Thread.currentThread().getName()); Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } return "testname1"; }); return Mono.fromFuture(uCompletableFuture).map(name -> { return new TestUser(name); }); } }

    执行返回如下:

    21:09:59.465 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
    21:09:59.510 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
    service2.threadName=ForkJoinPool.commonPool-worker-1
    21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
    service3.threadName=ForkJoinPool.commonPool-worker-2
    21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
    service1.threadName=ForkJoinPool.commonPool-worker-3
    21:10:00.526 [ForkJoinPool.commonPool-worker-1] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
    21:10:00.538 [ForkJoinPool.commonPool-worker-2] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
    TestUser(name=testname1)blockFirst 执行耗时ms:1238

    What specific type of reactor are you referring to in your inquiry?

    • 1、耗时操作都是使用 ForkJoinPool 线程池中的线程执行。
    • 2、最终耗时和方法1基本差不多。

    到此这篇关于Reactor 多任务并发执行且结果按顺序返回第一个的文章就介绍到这了,更多相关Reactor 多任务执行内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!

    本文共计1690个文字,预计阅读时间需要7分钟。

    What specific type of reactor are you referring to in your inquiry?

    目录

    1.场景

    2.创建service

    2.1 创建基本接口和实体类 2.2 创建service实现

    3.实现主方法

    4.实现异步

    4.1 使用subcribeOn实现异步 4.2 使用CompletableFuture实现异步

    5.实现异步1

    5.1 场景 5.2 调用多个级别服务,按服务优先级调用

    目录
    • 1 场景
    • 2 创建 service
      • 2.1 创建基本接口和实体类
      • 2.2 创建 service 实现
    • 3 主体方法
      • 4 实现异步
        • 4.1 subcribeOn 实现异步
        • 4.2 CompletableFuture 实现异步

      1 场景

      调用多个平级服务,按照服务优先级返回第一个有效数据。

      具体case:一个页面可能有很多的弹窗,弹窗之间又有优先级。每次只需要返回第一个有数据的弹窗。但是又希望所有弹窗之间的数据获取是异步的。这种场景使用 Reactor 怎么实现呢?

      2 创建 service

      2.1 创建基本接口和实体类

      public interface TestServiceI { Mono request(); }

      提供一个 request 方法,返回一个 Mono 对象。

      @Data @ToString @AllArgsConstructor @NoArgsConstructor public class TestUser { private String name; }

      2.2 创建 service 实现

      @Slf4j public class TestServiceImpl1 implements TestServiceI { @Override public Mono request() { log.info("execute.test.service1"); return Mono.fromSupplier(() -> { try { System.out.println("service1.threadName=" + Thread.currentThread().getName()); Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } return ""; }) .map(name -> { return new TestUser(name); }); } }

      第一个 service 执行耗时 500ms。返回空对象;

      创建第二个 service 执行耗时 1000ms。返回空对象;代码如上,改一下sleep时间即可。

      继续创建第三个 service 执行耗时 1000ms。返回 name3。代码如上,改一下 sleep 时间,以及返回为 name3。

      3 主体方法

      public static void main(String[] args) { long startTime = System.currentTimeMillis(); TestServiceI testServiceImpl4 = new TestServiceImpl4(); TestServiceI testServiceImpl5 = new TestServiceImpl5(); TestServiceI testServiceImpl6 = new TestServiceImpl6(); List<TestServiceI> serviceIList = new ArrayList<>(); serviceIList.add(testServiceImpl4); serviceIList.add(testServiceImpl5); serviceIList.add(testServiceImpl6); // 执行 service 列表,这样有多少个 service 都可以 Flux<Mono<TestUser>> monoFlux = Flux.fromIterable(serviceIList) .map(service -> { return service.request(); }); // flatMap(或者flatMapSequential) + map 实现异常继续下一个执行 Flux flux = monoFlux.flatMapSequential(mono -> { return mono.map(user -> { TestUser testUser = JsonUtil.parseJson(JsonUtil.toJson(user), TestUser.class); if (Objects.nonNull(testUser) && StringUtils.isNotBlank(testUser.getName())) { return testUser; } // null 在 reactor 中是异常数据。 return null; }) .onErrorContinue((err, i) -> { log.info("onErrorContinue={}", i); }); }); Mono mono = flux.elementAt(0, Mono.just("")); Object block = mono.block(); System.out.println(block + "blockFirst 执行耗时ms:" + (System.currentTimeMillis() - startTime)); }

      • 1、Flux.fromIterable 执行 service 列表,可以随意增删 service 服务。
      • 2、flatMap(或者flatMapSequential) + map + onErrorContinue 实现异常继续下一个执行。具体参考:Reactor中的onErrorContinue和onErrorResume
      • 3、Mono mono = flux.elementAt(0, Mono.just("")); 返回第一个正常数据。

      执行输出:

      20:54:26.512 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
      20:54:26.553 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
      service1.threadName=main
      20:54:27.237 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
      20:54:27.237 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
      service5.threadName=main
      20:54:28.246 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
      20:54:28.246 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
      service6.threadName=main
      TestUser(name=name3)blockFirst 执行耗时ms:2895

      • 1、service1 和 service2 因为返回空,所以继续下一个,最终返回 name3。
      • 2、查看总耗时:2895ms。service1 耗时 500,service2 耗时1000,service3 耗时 1000。发现耗时基本上等于 service1 + service2 + service3 。这是怎么回事呢?查看返回执行的线程,都是 main。

      总结:这样实现按照顺序返回第一个正常数据。但是执行并没有异步。下一步:如何实现异步呢?

      4 实现异步

      4.1 subcribeOn 实现异步

      修改 service 实现。增加.subscribeOn(Schedulers.boundedElastic())

      如下:

      @Slf4j public class TestServiceImpl1 implements TestServiceI { @Override public Mono request() { log.info("execute.test.service1"); return Mono.fromSupplier(() -> { try { System.out.println("service1.threadName=" + Thread.currentThread().getName()); Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } return ""; }) //增加subscribeOn .subscribeOn(Schedulers.boundedElastic()) .map(name -> { return new TestUser(name); }); } }

      再次执行输出如下:

      21:02:04.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
      21:02:04.265 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
      service4.threadName=boundedElastic-1
      21:02:04.300 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
      21:02:04.302 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
      service2.threadName=boundedElastic-2
      service3.threadName=boundedElastic-3
      21:02:04.987 [boundedElastic-1] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
      21:02:05.307 [boundedElastic-2] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
      TestUser(name=name6)blockFirst 执行耗时ms:1242

      • 1、发现具体实现 sleep 的线程都不是 main 线程,而是boundedElastic
      • 2、最终执行耗时 1242ms,只比执行时间最长的 service2 和 service3 耗时 1000ms,多一些。证明是异步了。

      4.2 CompletableFuture 实现异步

      修改 service 实现,使用 CompletableFuture 执行耗时操作(这里是sleep,具体到项目中可能是外部接口调用,DB 操作等);然后使用 Mono.fromFuture 返回 Mono 对象。

      @Slf4j public class TestServiceImpl1 implements TestServiceI{ @Override public Mono request() { log.info("execute.test.service1"); CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> { try { System.out.println("service1.threadName=" + Thread.currentThread().getName()); Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } return "testname1"; }); return Mono.fromFuture(uCompletableFuture).map(name -> { return new TestUser(name); }); } }

      执行返回如下:

      21:09:59.465 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
      21:09:59.510 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
      service2.threadName=ForkJoinPool.commonPool-worker-1
      21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
      service3.threadName=ForkJoinPool.commonPool-worker-2
      21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
      service1.threadName=ForkJoinPool.commonPool-worker-3
      21:10:00.526 [ForkJoinPool.commonPool-worker-1] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
      21:10:00.538 [ForkJoinPool.commonPool-worker-2] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
      TestUser(name=testname1)blockFirst 执行耗时ms:1238

      What specific type of reactor are you referring to in your inquiry?

      • 1、耗时操作都是使用 ForkJoinPool 线程池中的线程执行。
      • 2、最终耗时和方法1基本差不多。

      到此这篇关于Reactor 多任务并发执行且结果按顺序返回第一个的文章就介绍到这了,更多相关Reactor 多任务执行内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!