如何用Kotlin协程摆脱RxJava回调地狱的困扰?
- 内容介绍
- 文章标签
- 相关推荐
本文共计1989个文字,预计阅读时间需要8分钟。
在Tomcat服务端接口编程中,探讨的是在异步Servlet场景下,利用RxJava重构接口为全流程异步方式。这种做法有效提升了Tomcat的Worker线程利用率,极大增强了接口的并发处理能力。
本文探讨的是在tomcat服务端接口编程中, 异步servlet场景下( 参考我另外一个文章),用rxjava来改造接口为全流程异步方式
好处不用说
-
tomcat的worker线程利用率大幅提高,接口的并发能力提升 -
全流程无阻塞等待式(非例如像Future.get这种伪异步) -
业务逻辑处理上多个操作上无依赖的可以并发处理,接口性能大幅提高
但是缺点也没法逃避
-
编码复杂度增加 -
回调地狱,原来同步几十行代码可能要变成几百行代码 -
难以调试,大部分代码都是以链式表达式的形式出现,出错了问题定位难
解决这些缺点,在其他语言上有
-
csharp/js 的 async await -
go的 goroutine channel
实现上有的是语法层面,有的是语法糖(编译成状态机),抛开机制不同,他们都是为了解决了一个关键问题:
-
它帮你去做复杂的线程切换 -
让你像写同步代码一样去写异步代码
那么java咋办,作为同时jvm语言的kotlin的Coroutine(协程)可以帮到我们!
回到刚开头说的探讨场景,可能有人会觉得奇怪,如果用kotlin的话,有kotlin方式的服务端异步编程框架啊,比如ktor。或者spring webflux + kotlin suspend等 没错,建议都采用这种方式最好! 那在源头上就是非上面的,我们又如何利用kotlin的协程,是今天主要讨论的话题!
设定一个业务场景这里举例下分销订单接口, 不同的分销商都得call一次,call完后还要根据结果来做别的操作(A和B)。 假设有5个分销商 因为每个分销商之间没有依赖,所以优化方式自然想到用rxjava来改造!
要想在tomcat容器里实现全流程异步, 那肯定是用异步servlet的方式,如上图所示,tomcat的nio线程调用业务接口返回ListenableFuture, 会调用addListener设定一个callback,在callback里面进行异步上下文的提交
//异步servlet标准式操作
finalAsyncContextasyncContext=request.startAsync();
finalListenableFuture<?>responseFuture=distributorsOrder();//业务方法
responseFuture.addListener(()->{
try{
//略
}catch(Throwableex){
_logger.error("Executeasynccontexterror!",t);
}finally{
asyncContext.complete();
}
},executorService);
用rxjava的实现方式(示意伪代码)
privateSingle<Optional<List<String>>>createByAsync(DetailorderItem){
List<Single<Optional<List<String>>>>singleOptList=newArrayList<>();
for(List<Distributor>distributor:distributorList){
Single<Optional<List<String>>>orderId=distributor
.createOrderAsync(orderItem);
singleOptList.add(orderId);
}
returnSingle.zip(singleOptList,objects->{
//回调处理略
returnOptional.of(result);
});
}
Single<Optional<List<String>>>createDistributorOrderSingle=createByAsync(orderItem);
createDistributorOrderSingle.flatMap((Function<Optional<List<String>>,SingleSource<List<ResultEntity>>>)objects->{
Single<Optional<List<ActionAResult>>>actionASingle=getActionABySoaAsync(objects);
Single<Optional<List<ActionBResult>>>actionASingle=getActionBBySoaAsync(objects);
returnSingle.zip(actionASingle,actionASingle,(actionATypes,actionBTypes)->{
//回调处理略
returnresultEntity;
});
});
可能你第一次写完,尽管看起来很复杂,但是一看95线明显降低,是不是觉得还有点成就感呢, 后面业务变得复杂,继续叠加callback, 排查报错,一堆函数式链路,是不是觉得很难受。 好吧,这个项目重构代价太大了,那么后面你在写一个新业务的时候,你会还想要这么写吗? 有没有别的刚好的方式呢?
kotlin协程一般我们都微服务化,基本上调用都是通过微服务框架方式调用,微服务框架层一般会提供代理类来封装。 那么我们就可以通过包装代理类来实现kotlin的协程调用方式(灵感来自retrofit)
在设计这个功能的时候,我首先会想,暴露出来的使用方式怎么样是友好的,包括写单元测试。 那就是面向接口封装
interfaceSoaClientInterface{
suspendfunsoaMethod1(request:GetMethod1RequestType):GetMethod1ResponseType
}
@RunWith(SpringRunner::class)
@SpringBootTest(webEnvironment=SpringBootTest.WebEnvironment.NONE)
classSoaClientTest{
@SoaClass
privatelateinitvarsoaClients:SoaClientInterface
@Test
funtest()=runBlocking{
valresaponse=soaClients.soaMethod1(request)
}
}
如上,我要调用的微服务方法 soaMethod1 (suspend方法) 我把他定义到一个interface里面,然后我在使用的时候只需要打上一个注解@SoaClass 在使用的时候就直接用就可以了。
这样一来, soaMethod1 原本是返回ListenableFuture 被我包装成一个代理类,代理类返回的是Coroutine 借助suspend语法糖,内部会帮我们自动切换上下文。
实现思路 @SoaClass注解是我自定义的spring BeanPostProcessor 处理标识, 在spring容器的流程中,会发掘打了这个注解的field并注入我自定义的接口实现类!
SoaClientFactory我的接口实现类的目的是为了包装ListenableFuture为suspend的Coroutine方式调用
这里用jdk的proxy功能创建代理类,当调用代理类的任何方法,都会走到这里
public<T>Tcreate(finalClass<T>service,ISoaFactorysoaFactory)throwsException{
validateServiceInterface(service,soaFactory);
return(T)Proxy.newProxyInstance(service.getClassLoader(),newClass<?>[]{service},newInvocationHandler(){
privatefinalObject[]emptyArgs=newObject[0];
@Override
public@NullableObjectinvoke(Objectproxy,Methodmethod,@NullableObject[]args)throwsThrowable{
//IfthemethodisamethodfromObjectthendefertonormalinvocation.
if(method.getDeclaringClass()==Object.class){
returnmethod.invoke(this,args);
}
args=args!=null?args:emptyArgs;
returnmethod.isDefault()?
invokeDefaultMethod(method,service,proxy,args):
loadServiceMethod(method,soaFactory).invoke(args);
}
});
}
代理接口定义的每个方法都会解析成一个SoaServiceMethod<?>,缓存起来下次调用
SoaServiceMethod<?>loadServiceMethod(Methodmethod,ISoaFactorysoaFactory)throwsException{
SoaServiceMethod<?>result=serviceMethodCache.get(method);
if(result!=null){
returnresult;
}
synchronized(serviceMethodCache){
result=serviceMethodCache.get(method);
if(result==null){
result=SoaServiceMethod.parseAnnotations(method,soaFactory);
serviceMethodCache.put(method,result);
}
}
returnresult;
}
每个方法需要去解析且拿到以下信息
-
原本的调用的方法名称 -
请求类型 -
返回类型 -
是否是kotlin的suspend方式
SoaRequestFactorybuild(){
intparameterCount=parameterAnnotationsArray.length;
if(parameterCount>2||parameterCount<1){
thrownewIllegalArgumentException("MethodrequestparameterCountinvalid"
+"\nformethod"
+method.getDeclaringClass().getSimpleName()
+"."
+method.getName());
}
try{
if(TypeUtils.getRawType(parameterTypes[parameterTypes.length-1])==Continuation.class){
isKotlinSuspendFunction=true;
}
}catch(NoClassDefFoundErrorignored){
//Ignored
}
if(!isKotlinSuspendFunction&¶meterCount>1){
thrownewIllegalArgumentException("MethodrequestparameterCountinvalid"
+"\nformethod"
+method.getDeclaringClass().getSimpleName()
+"."
+method.getName());
}
TypereturnType=method.getGenericReturnType();
if(hasUnresolvableType(returnType)){
thrownewIllegalArgumentException(String.format("Methodreturntypemustnotincludeatypevariableorwildcard:%s",returnType)
+"\nformethod"
+method.getDeclaringClass().getSimpleName()
+"."
+method.getName());
}
if(returnType==void.class){
thrownewIllegalArgumentException("Servicemethodscannotreturnvoid."
+"\nformethod"
+method.getDeclaringClass().getSimpleName()
+"."
+method.getName());
}
//返回类型
TypeadapterType;
if(isKotlinSuspendFunction){
adapterType=
TypeUtils.getParameterLowerBound(
0,(ParameterizedType)parameterTypes[parameterTypes.length-1]);
if(TypeUtils.getRawType(adapterType)==AsyncResult.class&&adapterTypeinstanceofParameterizedType){
adapterType=TypeUtils.getParameterUpperBound(0,(ParameterizedType)adapterType);
continuationWantsResponse=true;
}
continuationIsUnit=isUnit(adapterType);
}else{
adapterType=returnType;
}
this.requestType=method.getParameterTypes()[0];
this.responseType=(Class<?>)adapterType;
this.methodName=method.getName();
returnnewSoaRequestFactory(this);
}
如果是kotlin的suspend方式 那么需要在java里面直接调用kotlin写的扩展方法
@Override
Objectinvoke(Object[]args){
Continuation<ResponseT>continuation=(Continuation<ResponseT>)args[args.length-1];
try{
returnSoaExtendKotlinKt.await(soaClient,args[0],continuation);
}catch(Exceptione){
returnSoaExtendKotlinKt.suspendAndThrow(e,continuation);
}
}
这里是最核心的实现方式 ListenableFuture -> suspend func
suspendfun<T:Any,K:Any>SoaClient<T,K>.await(request:T):K?{
returnsuspendCancellableCoroutine{continuation->
continuation.invokeOnCancellation{
this.cancel()
}
Futures.addCallback(
this.handleAsync(request),
CatAsync.wrap(object:FutureCallback<K>{
overridefunonSuccess(result:K?){
continuation.resume(result)
}
overridefunonFailure(t:Throwable){
continuation.resumeWithException(t)
}
}),ThreadPool.INSTANCE
)
}
}
只要思路定下来,技术细节实现就很简单了。 那么这么一包装,用的时候的好处怎么体现出来呢?我们把上面用rxjava的实现的伪代码换成kotlin方式的伪代码
interfaceSoaClientInterface{
suspendfuncreateOrderAsync(request:CreateOrderRequestType):CreateOrderResponseType
}
@SoaClass
privatelateinitvarsoaClients:SoaClientInterface
suspendfunccreateDistributorsOrder(request:createRequestType)=coroutineScope{
valchannel=Channel<List<User>>()
for(distributorindistributorList){
launch{
//并发调用
valusers=soaClients.createOrderAsync(CreateOrderRequestType().also{
it.orderItem=request.orderItem
it.distributorId=distributor.id
})
.also{log(repo,it)}
.bodyList()
channel.send(users)
}
}
repeat(distributorList.size){
valrt=channel.receive()
//处理其他suspend
}
}
采用了协程Coroutine的方式解决了异步回调,如果有报错也非常清楚(归功于kotlin的Coroutine的功能强大) 其中最难的是依赖对方提供的方法返回的是ListenableFuture 如何包装成 suspend func 来达到整体的suspend一路到底的全链路异步方式~!
我是正东,追求高效率编程~
如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!欢迎各位转载,转载文章之后须在文章页面明显位置给出作者和原文连接,谢谢。
本文共计1989个文字,预计阅读时间需要8分钟。
在Tomcat服务端接口编程中,探讨的是在异步Servlet场景下,利用RxJava重构接口为全流程异步方式。这种做法有效提升了Tomcat的Worker线程利用率,极大增强了接口的并发处理能力。
本文探讨的是在tomcat服务端接口编程中, 异步servlet场景下( 参考我另外一个文章),用rxjava来改造接口为全流程异步方式
好处不用说
-
tomcat的worker线程利用率大幅提高,接口的并发能力提升 -
全流程无阻塞等待式(非例如像Future.get这种伪异步) -
业务逻辑处理上多个操作上无依赖的可以并发处理,接口性能大幅提高
但是缺点也没法逃避
-
编码复杂度增加 -
回调地狱,原来同步几十行代码可能要变成几百行代码 -
难以调试,大部分代码都是以链式表达式的形式出现,出错了问题定位难
解决这些缺点,在其他语言上有
-
csharp/js 的 async await -
go的 goroutine channel
实现上有的是语法层面,有的是语法糖(编译成状态机),抛开机制不同,他们都是为了解决了一个关键问题:
-
它帮你去做复杂的线程切换 -
让你像写同步代码一样去写异步代码
那么java咋办,作为同时jvm语言的kotlin的Coroutine(协程)可以帮到我们!
回到刚开头说的探讨场景,可能有人会觉得奇怪,如果用kotlin的话,有kotlin方式的服务端异步编程框架啊,比如ktor。或者spring webflux + kotlin suspend等 没错,建议都采用这种方式最好! 那在源头上就是非上面的,我们又如何利用kotlin的协程,是今天主要讨论的话题!
设定一个业务场景这里举例下分销订单接口, 不同的分销商都得call一次,call完后还要根据结果来做别的操作(A和B)。 假设有5个分销商 因为每个分销商之间没有依赖,所以优化方式自然想到用rxjava来改造!
要想在tomcat容器里实现全流程异步, 那肯定是用异步servlet的方式,如上图所示,tomcat的nio线程调用业务接口返回ListenableFuture, 会调用addListener设定一个callback,在callback里面进行异步上下文的提交
//异步servlet标准式操作
finalAsyncContextasyncContext=request.startAsync();
finalListenableFuture<?>responseFuture=distributorsOrder();//业务方法
responseFuture.addListener(()->{
try{
//略
}catch(Throwableex){
_logger.error("Executeasynccontexterror!",t);
}finally{
asyncContext.complete();
}
},executorService);
用rxjava的实现方式(示意伪代码)
privateSingle<Optional<List<String>>>createByAsync(DetailorderItem){
List<Single<Optional<List<String>>>>singleOptList=newArrayList<>();
for(List<Distributor>distributor:distributorList){
Single<Optional<List<String>>>orderId=distributor
.createOrderAsync(orderItem);
singleOptList.add(orderId);
}
returnSingle.zip(singleOptList,objects->{
//回调处理略
returnOptional.of(result);
});
}
Single<Optional<List<String>>>createDistributorOrderSingle=createByAsync(orderItem);
createDistributorOrderSingle.flatMap((Function<Optional<List<String>>,SingleSource<List<ResultEntity>>>)objects->{
Single<Optional<List<ActionAResult>>>actionASingle=getActionABySoaAsync(objects);
Single<Optional<List<ActionBResult>>>actionASingle=getActionBBySoaAsync(objects);
returnSingle.zip(actionASingle,actionASingle,(actionATypes,actionBTypes)->{
//回调处理略
returnresultEntity;
});
});
可能你第一次写完,尽管看起来很复杂,但是一看95线明显降低,是不是觉得还有点成就感呢, 后面业务变得复杂,继续叠加callback, 排查报错,一堆函数式链路,是不是觉得很难受。 好吧,这个项目重构代价太大了,那么后面你在写一个新业务的时候,你会还想要这么写吗? 有没有别的刚好的方式呢?
kotlin协程一般我们都微服务化,基本上调用都是通过微服务框架方式调用,微服务框架层一般会提供代理类来封装。 那么我们就可以通过包装代理类来实现kotlin的协程调用方式(灵感来自retrofit)
在设计这个功能的时候,我首先会想,暴露出来的使用方式怎么样是友好的,包括写单元测试。 那就是面向接口封装
interfaceSoaClientInterface{
suspendfunsoaMethod1(request:GetMethod1RequestType):GetMethod1ResponseType
}
@RunWith(SpringRunner::class)
@SpringBootTest(webEnvironment=SpringBootTest.WebEnvironment.NONE)
classSoaClientTest{
@SoaClass
privatelateinitvarsoaClients:SoaClientInterface
@Test
funtest()=runBlocking{
valresaponse=soaClients.soaMethod1(request)
}
}
如上,我要调用的微服务方法 soaMethod1 (suspend方法) 我把他定义到一个interface里面,然后我在使用的时候只需要打上一个注解@SoaClass 在使用的时候就直接用就可以了。
这样一来, soaMethod1 原本是返回ListenableFuture 被我包装成一个代理类,代理类返回的是Coroutine 借助suspend语法糖,内部会帮我们自动切换上下文。
实现思路 @SoaClass注解是我自定义的spring BeanPostProcessor 处理标识, 在spring容器的流程中,会发掘打了这个注解的field并注入我自定义的接口实现类!
SoaClientFactory我的接口实现类的目的是为了包装ListenableFuture为suspend的Coroutine方式调用
这里用jdk的proxy功能创建代理类,当调用代理类的任何方法,都会走到这里
public<T>Tcreate(finalClass<T>service,ISoaFactorysoaFactory)throwsException{
validateServiceInterface(service,soaFactory);
return(T)Proxy.newProxyInstance(service.getClassLoader(),newClass<?>[]{service},newInvocationHandler(){
privatefinalObject[]emptyArgs=newObject[0];
@Override
public@NullableObjectinvoke(Objectproxy,Methodmethod,@NullableObject[]args)throwsThrowable{
//IfthemethodisamethodfromObjectthendefertonormalinvocation.
if(method.getDeclaringClass()==Object.class){
returnmethod.invoke(this,args);
}
args=args!=null?args:emptyArgs;
returnmethod.isDefault()?
invokeDefaultMethod(method,service,proxy,args):
loadServiceMethod(method,soaFactory).invoke(args);
}
});
}
代理接口定义的每个方法都会解析成一个SoaServiceMethod<?>,缓存起来下次调用
SoaServiceMethod<?>loadServiceMethod(Methodmethod,ISoaFactorysoaFactory)throwsException{
SoaServiceMethod<?>result=serviceMethodCache.get(method);
if(result!=null){
returnresult;
}
synchronized(serviceMethodCache){
result=serviceMethodCache.get(method);
if(result==null){
result=SoaServiceMethod.parseAnnotations(method,soaFactory);
serviceMethodCache.put(method,result);
}
}
returnresult;
}
每个方法需要去解析且拿到以下信息
-
原本的调用的方法名称 -
请求类型 -
返回类型 -
是否是kotlin的suspend方式
SoaRequestFactorybuild(){
intparameterCount=parameterAnnotationsArray.length;
if(parameterCount>2||parameterCount<1){
thrownewIllegalArgumentException("MethodrequestparameterCountinvalid"
+"\nformethod"
+method.getDeclaringClass().getSimpleName()
+"."
+method.getName());
}
try{
if(TypeUtils.getRawType(parameterTypes[parameterTypes.length-1])==Continuation.class){
isKotlinSuspendFunction=true;
}
}catch(NoClassDefFoundErrorignored){
//Ignored
}
if(!isKotlinSuspendFunction&¶meterCount>1){
thrownewIllegalArgumentException("MethodrequestparameterCountinvalid"
+"\nformethod"
+method.getDeclaringClass().getSimpleName()
+"."
+method.getName());
}
TypereturnType=method.getGenericReturnType();
if(hasUnresolvableType(returnType)){
thrownewIllegalArgumentException(String.format("Methodreturntypemustnotincludeatypevariableorwildcard:%s",returnType)
+"\nformethod"
+method.getDeclaringClass().getSimpleName()
+"."
+method.getName());
}
if(returnType==void.class){
thrownewIllegalArgumentException("Servicemethodscannotreturnvoid."
+"\nformethod"
+method.getDeclaringClass().getSimpleName()
+"."
+method.getName());
}
//返回类型
TypeadapterType;
if(isKotlinSuspendFunction){
adapterType=
TypeUtils.getParameterLowerBound(
0,(ParameterizedType)parameterTypes[parameterTypes.length-1]);
if(TypeUtils.getRawType(adapterType)==AsyncResult.class&&adapterTypeinstanceofParameterizedType){
adapterType=TypeUtils.getParameterUpperBound(0,(ParameterizedType)adapterType);
continuationWantsResponse=true;
}
continuationIsUnit=isUnit(adapterType);
}else{
adapterType=returnType;
}
this.requestType=method.getParameterTypes()[0];
this.responseType=(Class<?>)adapterType;
this.methodName=method.getName();
returnnewSoaRequestFactory(this);
}
如果是kotlin的suspend方式 那么需要在java里面直接调用kotlin写的扩展方法
@Override
Objectinvoke(Object[]args){
Continuation<ResponseT>continuation=(Continuation<ResponseT>)args[args.length-1];
try{
returnSoaExtendKotlinKt.await(soaClient,args[0],continuation);
}catch(Exceptione){
returnSoaExtendKotlinKt.suspendAndThrow(e,continuation);
}
}
这里是最核心的实现方式 ListenableFuture -> suspend func
suspendfun<T:Any,K:Any>SoaClient<T,K>.await(request:T):K?{
returnsuspendCancellableCoroutine{continuation->
continuation.invokeOnCancellation{
this.cancel()
}
Futures.addCallback(
this.handleAsync(request),
CatAsync.wrap(object:FutureCallback<K>{
overridefunonSuccess(result:K?){
continuation.resume(result)
}
overridefunonFailure(t:Throwable){
continuation.resumeWithException(t)
}
}),ThreadPool.INSTANCE
)
}
}
只要思路定下来,技术细节实现就很简单了。 那么这么一包装,用的时候的好处怎么体现出来呢?我们把上面用rxjava的实现的伪代码换成kotlin方式的伪代码
interfaceSoaClientInterface{
suspendfuncreateOrderAsync(request:CreateOrderRequestType):CreateOrderResponseType
}
@SoaClass
privatelateinitvarsoaClients:SoaClientInterface
suspendfunccreateDistributorsOrder(request:createRequestType)=coroutineScope{
valchannel=Channel<List<User>>()
for(distributorindistributorList){
launch{
//并发调用
valusers=soaClients.createOrderAsync(CreateOrderRequestType().also{
it.orderItem=request.orderItem
it.distributorId=distributor.id
})
.also{log(repo,it)}
.bodyList()
channel.send(users)
}
}
repeat(distributorList.size){
valrt=channel.receive()
//处理其他suspend
}
}
采用了协程Coroutine的方式解决了异步回调,如果有报错也非常清楚(归功于kotlin的Coroutine的功能强大) 其中最难的是依赖对方提供的方法返回的是ListenableFuture 如何包装成 suspend func 来达到整体的suspend一路到底的全链路异步方式~!
我是正东,追求高效率编程~
如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!欢迎各位转载,转载文章之后须在文章页面明显位置给出作者和原文连接,谢谢。

