4

(WebFlux)004、WebFilter踩坑记录 - 编号94530

 2 years ago
source link: https://www.cnblogs.com/lifacheng/p/16739743.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

使用SpringWebFlux的WebFilter时,由于不熟悉或一些思考疏忽,容易出现未知的异常。记录一下排查与解决方案,给大家分享一下。

2.1 问题描述

在测试接口方法时,出现的错误信息如下(对一些项目路径做了修改):

java.lang.IllegalStateException: COMPLETED
	at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:451)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ com.xxx.config.LoginWebFilter$$EnhancerBySpringCGLIB$$f3da6bdf [DefaultWebFilterChain]
	*__checkpoint ⇢ com.xxx.config.TraceIdFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ HTTP POST "/abc/test/testMethod" [ExceptionHandlingWebHandler]
Original Stack Trace:
		at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:451)
		at org.springframework.http.server.reactive.AbstractListenerReadPublisher.subscribe(AbstractListenerReadPublisher.java:105)

2.2 解决问题

通过查看错误信息描述,checkpoint点都在webfilter中,由于对webflux也不是特别熟,所以就只有一个个测试。

通过一系列操作, 把swagger移除,细读TraceIdFilter(内容不多),主要归功于原方案是正确的,修改后错误,最后才定位问题出现在LoginWebFilter。

说说插曲,原实现方式(有阻塞逻辑,没出现上述异常),代码如下:

@Configuration
@Slf4j
@Order(-10)
public class LoginWebFilter implements WebFilter {
    // 略...

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();

        if (!enableGateway) {
            String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN))
                    .orElse("");
            // 获取用户信息
            User user = getUser(token);
            if (user != null) {
                ServerHttpRequest mutateRequest = exchange.getRequest().mutate()
                        .build();
                exchange = exchange.mutate().request(mutateRequest).build();
            }
        }
        return chain.filter(exchange);
    }

    private User getUser(String token) {
        if (StringUtils.isNotBlank(token)) {
            return redisTemplate.opsForValue().get("xxx:tk:" + token)
                    .flatMap(str -> Mono.justOrEmpty(JsonUtils.toObj(str, User.class))).block();
        }
        return null;
    }
}

这样写,没有复杂的业务逻辑,从上到下,完全OJBK,但是调整后,就出现了上述异常。

改完后的问题代码如下:

// 错误
public class LoginWebFilter implements WebFilter {
	/...略
    @Autowired
    private ReactiveStringRedisTemplate redisTemplate;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

        if (!enableGateway) {
            ServerHttpRequest request = exchange.getRequest();
            String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN))
                    .orElse("");

            return getUser(token).flatMap(user -> {
                ServerHttpRequest mutateRequest = exchange.getRequest().mutate()
                        .header(UserUtils.MEMBER_ID, user.getMemId())
                        .header(UserUtils.MOBILE, user.getMobile())
                        .build();

                ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build();
                return chain.filter(newexchange);
               // 问题点 
            }).switchIfEmpty(chain.filter(exchange));
        }
        return chain.filter(exchange);
    }
	// 不在用block
    private Mono<User> getUser(String token) {
        if (StringUtils.isNotBlank(token)) {
            return redisTemplate.opsForValue().get("xxx:tk:" + token)
                    .flatMap(str -> Mono.justOrEmpty(JsonUtils.toObj(str, User.class)));
        }
        return Mono.empty();
    }
}

2.3 如何解决

对比改造前和改造后的代码,其实差异不大,那问题出现在哪呢?

由于对webflux也不是特别熟,那就只能一点点试(太蠢了)。 最后发现问题出现在了switchIfEmpty(chain.filter(exchange)),在去掉了switchIfEmpty(chain.filter(exchange)),就不会在出现上述异常。

修改后部分代码如下:

// 半正确
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

    if (!enableGateway) {

        ServerHttpRequest request = exchange.getRequest();
        String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN))
            .orElse(“”);

        return getUser(token).flatMap(user -> {
            ServerHttpRequest mutateRequest = exchange.getRequest().mutate()
                .header(UserUtils.MEMBER_ID, user.getMemId())
                .header(UserUtils.MOBILE, user.getMobile())
                .build();

            ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build();
            return chain.filter(newexchange);
        });
    }
    return chain.filter(exchange);
}

虽然现在不回在出现异常,但是去掉switchIfEmpty后,代码逻辑是不完整的,当获取不到User时,返回Mono.emtpy,那会直接结束流程,不在执行剩下的filter或其他逻辑。真是连环坑,一坑接一坑。所以对代码需要调整一番,调整后如下:

// 有点正确 但是不多
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

    if (!enableGateway) {

        ServerHttpRequest request = exchange.getRequest();
        String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN))
            .orElse(“”);

        return getUser(token).switchIfEmpty(Mono.error(() -> new BizException(ErrorCode.USER_IS_NULL_ERROR)))
            .flatMap(user -> {
                ServerHttpRequest mutateRequest = exchange.getRequest().mutate()
                    .header(UserUtils.MEMBER_ID, user.getMemId())
                    .header(UserUtils.MOBILE, user.getMobile())
                    .build();

                ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build();
                return chain.filter(newexchange);
            }).onErrorResume(e -> chain.filter(exchange));

    }
    return chain.filter(exchange);
}

当获取用户为空后,抛出异常,然后在兜底,当异常的时候执行chain.filter(exchange)(好蠢的方式.. 但是解决问题了)。

2.4 意外之喜

各位看官,就在我写完上完上面的代码修改方案之后,读了一下修改完后的代码,突然发现问题出在哪了,所以连夜修改了代码方式。现在我听我细细道来。

2.4.1 问题点

原因点chain.filter(exchange)重复执行

switchIfEmpty(chain.filter(exchange))这个点本意是想用在当getUser 方法为空时,执行其它WebFilter的逻辑,从而不影响主流程。

忽略了一点是:当chain.filter(newexchange)这个方法执行完后,返回的也是Mono<Void>,也是为空。所以无论如何,代码最后的逻辑都会走到switchIfEmpty(chain.filter(exchange))

但是当getUser获取到用户后,会重复执行chain.filter(exchange),如下

  • return chain.filter(newexchange)
  • switchIfEmpty(chain.filter(exchange))

由于第一次执行完chain.filter(exchange),request、response都已经关闭,所以出现了xx COMPLETE,那看来的确符合逻辑。

2.4.2 验证猜想

这个验证方式还是挺简单的,那就是分别传入正常的TOKEN和错误的TOKEN。

具体操作:.....(本人已完成)

结论:

当传入错误的token的时候,确实没有抛出异常,完美执行。但是当传入正确的token,出现了熟悉的异常。

2.4.3 代码调整

知道问题的原因,那就好调整代码了。修改后如下:

public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    if (!enableGateway) {
        ServerHttpRequest request = exchange.getRequest();
        String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN))
            .orElse(request.getHeaders().getFirst("suuid"));

        return getUser(token).map(user -> {
            ServerHttpRequest mutateRequest = exchange.getRequest().mutate()
                .header(UserUtils.MEMBER_ID, user.getMemId())
                .header(UserUtils.MOBILE, user.getMobile())
                .build();
            return exchange.mutate().request(mutateRequest).build();
            // 调整当getUser为空时,返回的内容
        }).switchIfEmpty(Mono.just(exchange)).flatMap(chain::filter);
        
    }
    return chain.filter(exchange);
}

至此,问题就完全解决拉!心里美滋滋!

1、遇到问题,还是要多看看呀,细细思考一下

2、多看代码,发现问题,实现完美的解决方案


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK