如何将回调地狱中的层层嵌套改写为简洁流畅的长尾?

2026-04-11 11:561阅读0评论SEO资源
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2058个文字,预计阅读时间需要9分钟。

如何将回调地狱中的层层嵌套改写为简洁流畅的长尾?

原文:本探讨的是在Tomcat服务端接口编程中,异步Servlet场景下,参考我另一篇文章,用RxJava来改造接口为全流程异步方式,好处不言而喻,无需使用Tomcat的worker线程利用率大峰值提升,接口的并发能力提升。

改写后:探讨在Tomcat服务器端编程中,针对异步Servlet的使用场景,借鉴相关文章,运用RxJava实现接口的全异步流程优化。此方法优点显著,无需依赖Tomcat的worker线程,有效提高线程利用率及接口并发处理能力。

本文探讨的是在tomcat服务端接口编程中, 异步servlet场景下( 参考我另外一个文章),用rxjava来改造接口为全流程异步方式

好处不用说

  • tomcat的worker线程利用率大幅提高,接口的并发能力提升
  • 全流程无阻塞等待式(非例如像Future.get这种伪异步)
  • 业务逻辑处理上多个操作上无依赖的可以并发处理,接口性能大幅提高

但是缺点也没法逃避

  • 编码复杂度增加
  • 回调地狱,原来同步几十行代码可能要变成几百行代码
  • 难以调试,大部分代码都是以链式表达式的形式出现,出错了问题定位难

解决这些缺点,在其他语言上有

  • csharp/js 的 async await
  • go的 goroutine channel

实现上有的是语法层面,有的是语法糖(编译成状态机),抛开机制不同,他们都是为了解决了一个关键问题:

  • 它帮你去做复杂的线程切换
  • 让你像写同步代码一样去写异步代码

那么java咋办,作为同时jvm语言的kotlin的Coroutine(协程)可以帮到我们!

回到刚开头说的探讨场景,可能有人会觉得奇怪,如果用kotlin的话,有kotlin方式的服务端异步编程框架啊,比如ktor。或者spring webflux + kotlin suspend等 没错,建议都采用这种方式最好! 那在源头上就是非上面的,我们又如何利用kotlin的协程,是今天主要讨论的话题!

设定一个业务场景
image

这里举例下分销订单接口, 不同的分销商都得call一次,call完后还要根据结果来做别的操作(A和B)。 假设有5个分销商 因为每个分销商之间没有依赖,所以优化方式自然想到用rxjava来改造!

要想在tomcat容器里实现全流程异步, 那肯定是用异步servlet的方式,如上图所示,tomcat的nio线程调用业务接口返回ListenableFuture, 会调用addListener设定一个callback,在callback里面进行异步上下文的提交

//异步servlet标准式操作 finalAsyncContextasyncContext=request.startAsync(); finalListenableFuture<?>responseFuture=distributorsOrder();//业务方法 responseFuture.addListener(()->{ try{ //略 }catch(Throwableex){ _logger.error("Executeasynccontexterror!",t); }finally{ asyncContext.complete(); } },executorService);

用rxjava的实现方式(示意伪代码)

privateSingle<Optional<List<String>>>createByAsync(DetailorderItem){ List<Single<Optional<List<String>>>>singleOptList=newArrayList<>(); for(List<Distributor>distributor:distributorList){ Single<Optional<List<String>>>orderId=distributor .createOrderAsync(orderItem); singleOptList.add(orderId); } returnSingle.zip(singleOptList,objects->{ //回调处理略 returnOptional.of(result); }); } Single<Optional<List<String>>>createDistributorOrderSingle=createByAsync(orderItem); createDistributorOrderSingle.flatMap((Function<Optional<List<String>>,SingleSource<List<ResultEntity>>>)objects->{ Single<Optional<List<ActionAResult>>>actionASingle=getActionABySoaAsync(objects); Single<Optional<List<ActionBResult>>>actionASingle=getActionBBySoaAsync(objects); returnSingle.zip(actionASingle,actionASingle,(actionATypes,actionBTypes)->{ //回调处理略 returnresultEntity; }); });

可能你第一次写完,尽管看起来很复杂,但是一看95线明显降低,是不是觉得还有点成就感呢, 后面业务变得复杂,继续叠加callback, 排查报错,一堆函数式链路,是不是觉得很难受。 好吧,这个项目重构代价太大了,那么后面你在写一个新业务的时候,你会还想要这么写吗? 有没有别的刚好的方式呢?

kotlin协程

一般我们都微服务化,基本上调用都是通过微服务框架方式调用,微服务框架层一般会提供代理类来封装。 那么我们就可以通过包装代理类来实现kotlin的协程调用方式(灵感来自retrofit)

在设计这个功能的时候,我首先会想,暴露出来的使用方式怎么样是友好的,包括写单元测试。 那就是面向接口封装

interfaceSoaClientInterface{ suspendfunsoaMethod1(request:GetMethod1RequestType):GetMethod1ResponseType } @RunWith(SpringRunner::class) @SpringBootTest(webEnvironment=SpringBootTest.WebEnvironment.NONE) classSoaClientTest{ @SoaClass privatelateinitvarsoaClients:SoaClientInterface @Test funtest()=runBlocking{ valresaponse=soaClients.soaMethod1(request) } }

如上,我要调用的微服务方法 soaMethod1 (suspend方法) 我把他定义到一个interface里面,然后我在使用的时候只需要打上一个注解@SoaClass 在使用的时候就直接用就可以了。

这样一来, soaMethod1 原本是返回ListenableFuture 被我包装成一个代理类,代理类返回的是Coroutine 借助suspend语法糖,内部会帮我们自动切换上下文。

实现思路 @SoaClass注解

是我自定义的spring BeanPostProcessor 处理标识, 在spring容器的流程中,会发掘打了这个注解的field并注入我自定义的接口实现类!

SoaClientFactory

我的接口实现类的目的是为了包装ListenableFuture为suspend的Coroutine方式调用

这里用jdk的proxy功能创建代理类,当调用代理类的任何方法,都会走到这里

public<T>Tcreate(finalClass<T>service,ISoaFactorysoaFactory)throwsException{ validateServiceInterface(service,soaFactory); return(T)Proxy.newProxyInstance(service.getClassLoader(),newClass<?>[]{service},newInvocationHandler(){ privatefinalObject[]emptyArgs=newObject[0]; @Override public@NullableObjectinvoke(Objectproxy,Methodmethod,@NullableObject[]args)throwsThrowable{ //IfthemethodisamethodfromObjectthendefertonormalinvocation. if(method.getDeclaringClass()==Object.class){ returnmethod.invoke(this,args); } args=args!=null?args:emptyArgs; returnmethod.isDefault()? invokeDefaultMethod(method,service,proxy,args): loadServiceMethod(method,soaFactory).invoke(args); } }); }

代理接口定义的每个方法都会解析成一个SoaServiceMethod<?>,缓存起来下次调用

SoaServiceMethod<?>loadServiceMethod(Methodmethod,ISoaFactorysoaFactory)throwsException{ SoaServiceMethod<?>result=serviceMethodCache.get(method); if(result!=null){ returnresult; } synchronized(serviceMethodCache){ result=serviceMethodCache.get(method); if(result==null){ result=SoaServiceMethod.parseAnnotations(method,soaFactory); serviceMethodCache.put(method,result); } } returnresult; }

每个方法需要去解析且拿到以下信息

如何将回调地狱中的层层嵌套改写为简洁流畅的长尾?

  • 原本的调用的方法名称
  • 请求类型
  • 返回类型
  • 是否是kotlin的suspend方式

SoaRequestFactorybuild(){ intparameterCount=parameterAnnotationsArray.length; if(parameterCount>2||parameterCount<1){ thrownewIllegalArgumentException("MethodrequestparameterCountinvalid" +"\nformethod" +method.getDeclaringClass().getSimpleName() +"." +method.getName()); } try{ if(TypeUtils.getRawType(parameterTypes[parameterTypes.length-1])==Continuation.class){ isKotlinSuspendFunction=true; } }catch(NoClassDefFoundErrorignored){ //Ignored } if(!isKotlinSuspendFunction&&parameterCount>1){ thrownewIllegalArgumentException("MethodrequestparameterCountinvalid" +"\nformethod" +method.getDeclaringClass().getSimpleName() +"." +method.getName()); } TypereturnType=method.getGenericReturnType(); if(hasUnresolvableType(returnType)){ thrownewIllegalArgumentException(String.format("Methodreturntypemustnotincludeatypevariableorwildcard:%s",returnType) +"\nformethod" +method.getDeclaringClass().getSimpleName() +"." +method.getName()); } if(returnType==void.class){ thrownewIllegalArgumentException("Servicemethodscannotreturnvoid." +"\nformethod" +method.getDeclaringClass().getSimpleName() +"." +method.getName()); } //返回类型 TypeadapterType; if(isKotlinSuspendFunction){ adapterType= TypeUtils.getParameterLowerBound( 0,(ParameterizedType)parameterTypes[parameterTypes.length-1]); if(TypeUtils.getRawType(adapterType)==AsyncResult.class&&adapterTypeinstanceofParameterizedType){ adapterType=TypeUtils.getParameterUpperBound(0,(ParameterizedType)adapterType); continuationWantsResponse=true; } continuationIsUnit=isUnit(adapterType); }else{ adapterType=returnType; } this.requestType=method.getParameterTypes()[0]; this.responseType=(Class<?>)adapterType; this.methodName=method.getName(); returnnewSoaRequestFactory(this); }

如果是kotlin的suspend方式 那么需要在java里面直接调用kotlin写的扩展方法

@Override Objectinvoke(Object[]args){ Continuation<ResponseT>continuation=(Continuation<ResponseT>)args[args.length-1]; try{ returnSoaExtendKotlinKt.await(soaClient,args[0],continuation); }catch(Exceptione){ returnSoaExtendKotlinKt.suspendAndThrow(e,continuation); } }

这里是最核心的实现方式 ListenableFuture -> suspend func

suspendfun<T:Any,K:Any>SoaClient<T,K>.await(request:T):K?{ returnsuspendCancellableCoroutine{continuation-> continuation.invokeOnCancellation{ this.cancel() } Futures.addCallback( this.handleAsync(request), CatAsync.wrap(object:FutureCallback<K>{ overridefunonSuccess(result:K?){ continuation.resume(result) } overridefunonFailure(t:Throwable){ continuation.resumeWithException(t) } }),ThreadPool.INSTANCE ) } }

只要思路定下来,技术细节实现就很简单了。 那么这么一包装,用的时候的好处怎么体现出来呢?我们把上面用rxjava的实现的伪代码换成kotlin方式的伪代码

interfaceSoaClientInterface{ suspendfuncreateOrderAsync(request:CreateOrderRequestType):CreateOrderResponseType } @SoaClass privatelateinitvarsoaClients:SoaClientInterface suspendfunccreateDistributorsOrder(request:createRequestType)=coroutineScope{ valchannel=Channel<List<User>>() for(distributorindistributorList){ launch{ //并发调用 valusers=soaClients.createOrderAsync(CreateOrderRequestType().also{ it.orderItem=request.orderItem it.distributorId=distributor.id }) .also{log(repo,it)} .bodyList() channel.send(users) } } repeat(distributorList.size){ valrt=channel.receive() //处理其他suspend } }

采用了协程Coroutine的方式解决了异步回调,如果有报错也非常清楚(归功于kotlin的Coroutine的功能强大) 其中最难的是依赖对方提供的方法返回的是ListenableFuture 如何包装成 suspend func 来达到整体的suspend一路到底的全链路异步方式~!


我是正东,追求高效率编程~


如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!欢迎各位转载,转载文章之后须在文章页面明显位置给出作者和原文连接,谢谢。
标签:Tomcat

本文共计2058个文字,预计阅读时间需要9分钟。

如何将回调地狱中的层层嵌套改写为简洁流畅的长尾?

原文:本探讨的是在Tomcat服务端接口编程中,异步Servlet场景下,参考我另一篇文章,用RxJava来改造接口为全流程异步方式,好处不言而喻,无需使用Tomcat的worker线程利用率大峰值提升,接口的并发能力提升。

改写后:探讨在Tomcat服务器端编程中,针对异步Servlet的使用场景,借鉴相关文章,运用RxJava实现接口的全异步流程优化。此方法优点显著,无需依赖Tomcat的worker线程,有效提高线程利用率及接口并发处理能力。

本文探讨的是在tomcat服务端接口编程中, 异步servlet场景下( 参考我另外一个文章),用rxjava来改造接口为全流程异步方式

好处不用说

  • tomcat的worker线程利用率大幅提高,接口的并发能力提升
  • 全流程无阻塞等待式(非例如像Future.get这种伪异步)
  • 业务逻辑处理上多个操作上无依赖的可以并发处理,接口性能大幅提高

但是缺点也没法逃避

  • 编码复杂度增加
  • 回调地狱,原来同步几十行代码可能要变成几百行代码
  • 难以调试,大部分代码都是以链式表达式的形式出现,出错了问题定位难

解决这些缺点,在其他语言上有

  • csharp/js 的 async await
  • go的 goroutine channel

实现上有的是语法层面,有的是语法糖(编译成状态机),抛开机制不同,他们都是为了解决了一个关键问题:

  • 它帮你去做复杂的线程切换
  • 让你像写同步代码一样去写异步代码

那么java咋办,作为同时jvm语言的kotlin的Coroutine(协程)可以帮到我们!

回到刚开头说的探讨场景,可能有人会觉得奇怪,如果用kotlin的话,有kotlin方式的服务端异步编程框架啊,比如ktor。或者spring webflux + kotlin suspend等 没错,建议都采用这种方式最好! 那在源头上就是非上面的,我们又如何利用kotlin的协程,是今天主要讨论的话题!

设定一个业务场景
image

这里举例下分销订单接口, 不同的分销商都得call一次,call完后还要根据结果来做别的操作(A和B)。 假设有5个分销商 因为每个分销商之间没有依赖,所以优化方式自然想到用rxjava来改造!

要想在tomcat容器里实现全流程异步, 那肯定是用异步servlet的方式,如上图所示,tomcat的nio线程调用业务接口返回ListenableFuture, 会调用addListener设定一个callback,在callback里面进行异步上下文的提交

//异步servlet标准式操作 finalAsyncContextasyncContext=request.startAsync(); finalListenableFuture<?>responseFuture=distributorsOrder();//业务方法 responseFuture.addListener(()->{ try{ //略 }catch(Throwableex){ _logger.error("Executeasynccontexterror!",t); }finally{ asyncContext.complete(); } },executorService);

用rxjava的实现方式(示意伪代码)

privateSingle<Optional<List<String>>>createByAsync(DetailorderItem){ List<Single<Optional<List<String>>>>singleOptList=newArrayList<>(); for(List<Distributor>distributor:distributorList){ Single<Optional<List<String>>>orderId=distributor .createOrderAsync(orderItem); singleOptList.add(orderId); } returnSingle.zip(singleOptList,objects->{ //回调处理略 returnOptional.of(result); }); } Single<Optional<List<String>>>createDistributorOrderSingle=createByAsync(orderItem); createDistributorOrderSingle.flatMap((Function<Optional<List<String>>,SingleSource<List<ResultEntity>>>)objects->{ Single<Optional<List<ActionAResult>>>actionASingle=getActionABySoaAsync(objects); Single<Optional<List<ActionBResult>>>actionASingle=getActionBBySoaAsync(objects); returnSingle.zip(actionASingle,actionASingle,(actionATypes,actionBTypes)->{ //回调处理略 returnresultEntity; }); });

可能你第一次写完,尽管看起来很复杂,但是一看95线明显降低,是不是觉得还有点成就感呢, 后面业务变得复杂,继续叠加callback, 排查报错,一堆函数式链路,是不是觉得很难受。 好吧,这个项目重构代价太大了,那么后面你在写一个新业务的时候,你会还想要这么写吗? 有没有别的刚好的方式呢?

kotlin协程

一般我们都微服务化,基本上调用都是通过微服务框架方式调用,微服务框架层一般会提供代理类来封装。 那么我们就可以通过包装代理类来实现kotlin的协程调用方式(灵感来自retrofit)

在设计这个功能的时候,我首先会想,暴露出来的使用方式怎么样是友好的,包括写单元测试。 那就是面向接口封装

interfaceSoaClientInterface{ suspendfunsoaMethod1(request:GetMethod1RequestType):GetMethod1ResponseType } @RunWith(SpringRunner::class) @SpringBootTest(webEnvironment=SpringBootTest.WebEnvironment.NONE) classSoaClientTest{ @SoaClass privatelateinitvarsoaClients:SoaClientInterface @Test funtest()=runBlocking{ valresaponse=soaClients.soaMethod1(request) } }

如上,我要调用的微服务方法 soaMethod1 (suspend方法) 我把他定义到一个interface里面,然后我在使用的时候只需要打上一个注解@SoaClass 在使用的时候就直接用就可以了。

这样一来, soaMethod1 原本是返回ListenableFuture 被我包装成一个代理类,代理类返回的是Coroutine 借助suspend语法糖,内部会帮我们自动切换上下文。

实现思路 @SoaClass注解

是我自定义的spring BeanPostProcessor 处理标识, 在spring容器的流程中,会发掘打了这个注解的field并注入我自定义的接口实现类!

SoaClientFactory

我的接口实现类的目的是为了包装ListenableFuture为suspend的Coroutine方式调用

这里用jdk的proxy功能创建代理类,当调用代理类的任何方法,都会走到这里

public<T>Tcreate(finalClass<T>service,ISoaFactorysoaFactory)throwsException{ validateServiceInterface(service,soaFactory); return(T)Proxy.newProxyInstance(service.getClassLoader(),newClass<?>[]{service},newInvocationHandler(){ privatefinalObject[]emptyArgs=newObject[0]; @Override public@NullableObjectinvoke(Objectproxy,Methodmethod,@NullableObject[]args)throwsThrowable{ //IfthemethodisamethodfromObjectthendefertonormalinvocation. if(method.getDeclaringClass()==Object.class){ returnmethod.invoke(this,args); } args=args!=null?args:emptyArgs; returnmethod.isDefault()? invokeDefaultMethod(method,service,proxy,args): loadServiceMethod(method,soaFactory).invoke(args); } }); }

代理接口定义的每个方法都会解析成一个SoaServiceMethod<?>,缓存起来下次调用

SoaServiceMethod<?>loadServiceMethod(Methodmethod,ISoaFactorysoaFactory)throwsException{ SoaServiceMethod<?>result=serviceMethodCache.get(method); if(result!=null){ returnresult; } synchronized(serviceMethodCache){ result=serviceMethodCache.get(method); if(result==null){ result=SoaServiceMethod.parseAnnotations(method,soaFactory); serviceMethodCache.put(method,result); } } returnresult; }

每个方法需要去解析且拿到以下信息

如何将回调地狱中的层层嵌套改写为简洁流畅的长尾?

  • 原本的调用的方法名称
  • 请求类型
  • 返回类型
  • 是否是kotlin的suspend方式

SoaRequestFactorybuild(){ intparameterCount=parameterAnnotationsArray.length; if(parameterCount>2||parameterCount<1){ thrownewIllegalArgumentException("MethodrequestparameterCountinvalid" +"\nformethod" +method.getDeclaringClass().getSimpleName() +"." +method.getName()); } try{ if(TypeUtils.getRawType(parameterTypes[parameterTypes.length-1])==Continuation.class){ isKotlinSuspendFunction=true; } }catch(NoClassDefFoundErrorignored){ //Ignored } if(!isKotlinSuspendFunction&&parameterCount>1){ thrownewIllegalArgumentException("MethodrequestparameterCountinvalid" +"\nformethod" +method.getDeclaringClass().getSimpleName() +"." +method.getName()); } TypereturnType=method.getGenericReturnType(); if(hasUnresolvableType(returnType)){ thrownewIllegalArgumentException(String.format("Methodreturntypemustnotincludeatypevariableorwildcard:%s",returnType) +"\nformethod" +method.getDeclaringClass().getSimpleName() +"." +method.getName()); } if(returnType==void.class){ thrownewIllegalArgumentException("Servicemethodscannotreturnvoid." +"\nformethod" +method.getDeclaringClass().getSimpleName() +"." +method.getName()); } //返回类型 TypeadapterType; if(isKotlinSuspendFunction){ adapterType= TypeUtils.getParameterLowerBound( 0,(ParameterizedType)parameterTypes[parameterTypes.length-1]); if(TypeUtils.getRawType(adapterType)==AsyncResult.class&&adapterTypeinstanceofParameterizedType){ adapterType=TypeUtils.getParameterUpperBound(0,(ParameterizedType)adapterType); continuationWantsResponse=true; } continuationIsUnit=isUnit(adapterType); }else{ adapterType=returnType; } this.requestType=method.getParameterTypes()[0]; this.responseType=(Class<?>)adapterType; this.methodName=method.getName(); returnnewSoaRequestFactory(this); }

如果是kotlin的suspend方式 那么需要在java里面直接调用kotlin写的扩展方法

@Override Objectinvoke(Object[]args){ Continuation<ResponseT>continuation=(Continuation<ResponseT>)args[args.length-1]; try{ returnSoaExtendKotlinKt.await(soaClient,args[0],continuation); }catch(Exceptione){ returnSoaExtendKotlinKt.suspendAndThrow(e,continuation); } }

这里是最核心的实现方式 ListenableFuture -> suspend func

suspendfun<T:Any,K:Any>SoaClient<T,K>.await(request:T):K?{ returnsuspendCancellableCoroutine{continuation-> continuation.invokeOnCancellation{ this.cancel() } Futures.addCallback( this.handleAsync(request), CatAsync.wrap(object:FutureCallback<K>{ overridefunonSuccess(result:K?){ continuation.resume(result) } overridefunonFailure(t:Throwable){ continuation.resumeWithException(t) } }),ThreadPool.INSTANCE ) } }

只要思路定下来,技术细节实现就很简单了。 那么这么一包装,用的时候的好处怎么体现出来呢?我们把上面用rxjava的实现的伪代码换成kotlin方式的伪代码

interfaceSoaClientInterface{ suspendfuncreateOrderAsync(request:CreateOrderRequestType):CreateOrderResponseType } @SoaClass privatelateinitvarsoaClients:SoaClientInterface suspendfunccreateDistributorsOrder(request:createRequestType)=coroutineScope{ valchannel=Channel<List<User>>() for(distributorindistributorList){ launch{ //并发调用 valusers=soaClients.createOrderAsync(CreateOrderRequestType().also{ it.orderItem=request.orderItem it.distributorId=distributor.id }) .also{log(repo,it)} .bodyList() channel.send(users) } } repeat(distributorList.size){ valrt=channel.receive() //处理其他suspend } }

采用了协程Coroutine的方式解决了异步回调,如果有报错也非常清楚(归功于kotlin的Coroutine的功能强大) 其中最难的是依赖对方提供的方法返回的是ListenableFuture 如何包装成 suspend func 来达到整体的suspend一路到底的全链路异步方式~!


我是正东,追求高效率编程~


如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!欢迎各位转载,转载文章之后须在文章页面明显位置给出作者和原文连接,谢谢。
标签:Tomcat