2

rxjava回调地狱-kotlin协程来帮忙 - 俞正东

 1 year ago
source link: https://www.cnblogs.com/yudongdong/p/16439318.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

rxjava回调地狱-kotlin协程来帮忙

472365-20220703100936908-1235624513.png

本文探讨的是在tomcat服务端接口编程中, 异步servlet场景下( 参考我另外一个文章),用rxjava来改造接口为全流程异步方式

好处不用说

  • tomcat的worker线程利用率大幅提高,接口的并发能力提升
  • 全流程无阻塞等待式(非例如像Future.get这种伪异步)
  • 业务逻辑处理上多个操作上无依赖的可以并发处理,接口性能大幅提高

但是缺点也没法逃避

  • 编码复杂度增加
  • 回调地狱,原来同步几十行代码可能要变成几百行代码
  • 难以调试,大部分代码都是以链式表达式的形式出现,出错了问题定位难

解决这些缺点,在其他语言上有

  • csharp/js 的 async await
  • go的 goroutine channel

实现上有的是语法层面,有的是语法糖(编译成状态机),抛开机制不同,他们都是为了解决了一个关键问题:

  • 它帮你去做复杂的线程切换
  • 让你像写同步代码一样去写异步代码

那么java咋办,作为同时jvm语言的kotlin的Coroutine(协程)可以帮到我们!

回到刚开头说的探讨场景,可能有人会觉得奇怪,如果用kotlin的话,有kotlin方式的服务端异步编程框架啊,比如ktor。或者spring webflux + kotlin suspend等 没错,建议都采用这种方式最好! 那在源头上就是非上面的,我们又如何利用kotlin的协程,是今天主要讨论的话题!

设定一个业务场景

image

image

这里举例下分销订单接口, 不同的分销商都得call一次,call完后还要根据结果来做别的操作(A和B)。 假设有5个分销商 因为每个分销商之间没有依赖,所以优化方式自然想到用rxjava来改造!

要想在tomcat容器里实现全流程异步, 那肯定是用异步servlet的方式,如上图所示,tomcat的nio线程调用业务接口返回ListenableFuture, 会调用addListener设定一个callback,在callback里面进行异步上下文的提交

//异步servlet标准式操作
final AsyncContext asyncContext = request.startAsync();
final ListenableFuture<?> responseFuture = distributorsOrder();//业务方法
responseFuture.addListener(() -> {
        try {
            // 略
        } catch (Throwable ex) {
            _logger.error("Execute async context error!", t);
        } finally {
            asyncContext.complete();
        }
    }, executorService);

用rxjava的实现方式(示意伪代码)

private Single<Optional<List<String>>> createByAsync(Detail orderItem) {
    List<Single<Optional<List<String>>>> singleOptList = new ArrayList<>();
    for (List<Distributor> distributor : distributorList) {
        Single<Optional<List<String>>> orderId = distributor
                .createOrderAsync(orderItem);
        singleOptList.add(orderId);
    }
    return Single.zip(singleOptList, objects -> {
        //回调处理略
        return Optional.of(result);
    });
}
    
Single<Optional<List<String>>> createDistributorOrderSingle = createByAsync(orderItem);
createDistributorOrderSingle.flatMap( (Function<Optional<List<String>>, SingleSource<List<ResultEntity>>>) objects -> { 
    Single<Optional<List<ActionAResult>>> actionASingle = getActionABySoaAsync(objects);
    Single<Optional<List<ActionBResult>>> actionASingle = getActionBBySoaAsync(objects);
    return Single.zip(actionASingle, actionASingle, (actionATypes, actionBTypes) -> {
        // 回调处理略
        return resultEntity;
    });
});

可能你第一次写完,尽管看起来很复杂,但是一看95线明显降低,是不是觉得还有点成就感呢, 后面业务变得复杂,继续叠加callback, 排查报错,一堆函数式链路,是不是觉得很难受。 好吧,这个项目重构代价太大了,那么后面你在写一个新业务的时候,你会还想要这么写吗? 有没有别的刚好的方式呢?

kotlin协程

一般我们都微服务化,基本上调用都是通过微服务框架方式调用,微服务框架层一般会提供代理类来封装。 那么我们就可以通过包装代理类来实现kotlin的协程调用方式(灵感来自retrofit)

在设计这个功能的时候,我首先会想,暴露出来的使用方式怎么样是友好的,包括写单元测试。 那就是面向接口封装


  interface SoaClientInterface {
        suspend fun soaMethod1(request: GetMethod1RequestType): GetMethod1ResponseType
  }

@RunWith(SpringRunner::class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
class SoaClientTest {
    @SoaClass
    private lateinit var soaClients: SoaClientInterface
    
    @Test
    fun test() = runBlocking {
        val resaponse = soaClients.soaMethod1(request)
    }
}

如上,我要调用的微服务方法 soaMethod1 (suspend方法) 我把他定义到一个interface里面,然后我在使用的时候只需要打上一个注解@SoaClass 在使用的时候就直接用就可以了。

这样一来, soaMethod1 原本是返回ListenableFuture 被我包装成一个代理类,代理类返回的是Coroutine 借助suspend语法糖,内部会帮我们自动切换上下文。

@SoaClass注解

是我自定义的spring BeanPostProcessor 处理标识, 在spring容器的流程中,会发掘打了这个注解的field并注入我自定义的接口实现类!

SoaClientFactory

我的接口实现类的目的是为了包装ListenableFuture为suspend的Coroutine方式调用

这里用jdk的proxy功能创建代理类,当调用代理类的任何方法,都会走到这里

public <T> T create(final Class<T> service, ISoaFactory soaFactory) throws Exception {
        validateServiceInterface(service, soaFactory);
        return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[]{service}, new InvocationHandler() {
            private final Object[] emptyArgs = new Object[0];

            @Override
            public @Nullable Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable {
                // If the method is a method from Object then defer to normal invocation.
                if (method.getDeclaringClass() == Object.class) {
                    return method.invoke(this, args);
                }
                args = args != null ? args : emptyArgs;
                return method.isDefault() ? 
                invokeDefaultMethod(method, service, proxy, args) :
                loadServiceMethod(method, soaFactory).invoke(args);
            }
        });
    }

代理接口定义的每个方法都会解析成一个SoaServiceMethod<?>,缓存起来下次调用

    
SoaServiceMethod<?> loadServiceMethod(Method method, ISoaFactory soaFactory) throws Exception {
        SoaServiceMethod<?> result = serviceMethodCache.get(method);
        if (result != null) {
            return result;
        }

        synchronized (serviceMethodCache) {
            result = serviceMethodCache.get(method);
            if (result == null) {
                result = SoaServiceMethod.parseAnnotations(method, soaFactory);
                serviceMethodCache.put(method, result);
            }
        }
        return result;
    }

每个方法需要去解析且拿到以下信息

  • 原本的调用的方法名称
  • 是否是kotlin的suspend方式

 SoaRequestFactory build() {
        int parameterCount = parameterAnnotationsArray.length;
        if (parameterCount > 2 || parameterCount < 1) {
            throw new IllegalArgumentException("Method request parameterCount invalid"
                    + "\n    for method "
                    + method.getDeclaringClass().getSimpleName()
                    + "."
                    + method.getName());
        }

        try {
            if (TypeUtils.getRawType(parameterTypes[parameterTypes.length - 1]) == Continuation.class) {
                isKotlinSuspendFunction = true;
            }
        } catch (NoClassDefFoundError ignored) {
            // Ignored
        }

        if (!isKotlinSuspendFunction && parameterCount > 1) {
            throw new IllegalArgumentException("Method request parameterCount invalid"
                    + "\n    for method "
                    + method.getDeclaringClass().getSimpleName()
                    + "."
                    + method.getName());
        }


        Type returnType = method.getGenericReturnType();
        if (hasUnresolvableType(returnType)) {
            throw new IllegalArgumentException(String.format("Method return type must not include a type variable or wildcard: %s", returnType)
                    + "\n    for method "
                    + method.getDeclaringClass().getSimpleName()
                    + "."
                    + method.getName());
        }
        if (returnType == void.class) {
            throw new IllegalArgumentException("Service methods cannot return void."
                    + "\n    for method "
                    + method.getDeclaringClass().getSimpleName()
                    + "."
                    + method.getName());
        }

        // 返回类型
        Type adapterType;
        if (isKotlinSuspendFunction) {

            adapterType =
                    TypeUtils.getParameterLowerBound(
                            0, (ParameterizedType) parameterTypes[parameterTypes.length - 1]);
            if (TypeUtils.getRawType(adapterType) == AsyncResult.class && adapterType instanceof ParameterizedType) {
                adapterType = TypeUtils.getParameterUpperBound(0, (ParameterizedType) adapterType);
                continuationWantsResponse = true;
            }

            continuationIsUnit = isUnit(adapterType);
        } else {
            adapterType = returnType;
        }
        this.requestType = method.getParameterTypes()[0];
        this.responseType = (Class<?>) adapterType;
        this.methodName = method.getName();
        return new SoaRequestFactory(this);
    }

如果是kotlin的suspend方式 那么需要在java里面直接调用kotlin写的扩展方法

@Override
Object invoke(Object[] args) {
    Continuation<ResponseT> continuation = (Continuation<ResponseT>) args[args.length - 1];
    try {
        return SoaExtendKotlinKt.await(soaClient, args[0], continuation);
    } catch (Exception e) {
        return SoaExtendKotlinKt.suspendAndThrow(e, continuation);
    }
}

这里是最核心的实现方式 ListenableFuture -> suspend func

suspend fun <T : Any, K : Any> SoaClient<T, K>.await(request: T): K? {
    return suspendCancellableCoroutine { continuation ->
        continuation.invokeOnCancellation {
            this.cancel()
        }
        Futures.addCallback(
            this.handleAsync(request),
            CatAsync.wrap(object : FutureCallback<K> {
                override fun onSuccess(result: K?) {
                    continuation.resume(result)
                }

                override fun onFailure(t: Throwable) {
                    continuation.resumeWithException(t)
                }
            }), ThreadPool.INSTANCE
        )
    }
}

只要思路定下来,技术细节实现就很简单了。 那么这么一包装,用的时候的好处怎么体现出来呢?我们把上面用rxjava的实现的伪代码换成kotlin方式的伪代码

interface SoaClientInterface {
    suspend fun createOrderAsync(request: CreateOrderRequestType): CreateOrderResponseType
}


@SoaClass
private lateinit var soaClients: SoaClientInterface

suspend func createDistributorsOrder(request:createRequestType)=coroutineScope{
    val channel = Channel<List<User>>()
    for (distributor in distributorList) {
        launch {
            // 并发调用
            
            val users = soaClients.createOrderAsync(CreateOrderRequestType().also{
                    it.orderItem = request.orderItem
                    it.distributorId = distributor.id
                })
                .also { log(repo, it) }
                .bodyList()
            channel.send(users)
        }
    }
    
    repeat(distributorList.size) {
        val rt = channel.receive()
        //处理其他 suspend 
    }
    
}

采用了协程Coroutine的方式解决了异步回调,如果有报错也非常清楚(归功于kotlin的Coroutine的功能强大) 其中最难的是依赖对方提供的方法返回的是ListenableFuture 如何包装成 suspend func 来达到整体的suspend一路到底的全链路异步方式~!

我是正东,追求高效率编程~

472365-20220703101003372-1973581605.png

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK