3

【Soul源码阅读-09】数据同步之nacos

 3 years ago
source link: https://segmentfault.com/a/1190000039072073
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

目标

  • soulnacos 方式数据同步原理及源码分析

上一篇我们对 Soul 网关的 http长轮询 数据同步方式做了简单的分析,了解了一下 http长轮询 同步的基本流程。接下来我们看一下 Soul 网关的 nacos 数据同步方式。

Soul 网关开启 nacos 同步:

  • soul-bootstrap 新增如下依赖:

    <!--soul data sync start use nacos-->
        <dependency>
              <groupId>org.dromara</groupId>
               <artifactId>soul-spring-boot-starter-sync-data-nacos</artifactId>
               <version>2.2.1</version>
         </dependency>
  • application.yml 添加相关配置

    soul :
         sync:
            nacos:
                 url: localhost:8848
                 namespace: 1c10d748-af86-43b9-8265-75f487d20c6c
                 acm:
                   enabled: false
                   endpoint: acm.aliyun.com
                   namespace:
                   accessKey:
                   secretKey:
     #url: 配置成你的nacos地址,集群环境请使用(,)分隔。
     # 其他参数配置,请参考naocs官网。

soul-admin 配置,或在 soul-admin 启动参数中设置 --soul.sync.zookeeper='' ,然后重启服务

soul :
      sync:
         nacos:
              url: localhost:8848
              namespace: 1c10d748-af86-43b9-8265-75f487d20c6c
              acm:
                enabled: false
                endpoint: acm.aliyun.com
                namespace:
                accessKey:
                secretKey:

源码分析

soul-admin 数据同步

soul-admin 的数据变更通知,Soul 网关的四种数据同步方式 webscoket、zookeeper、http长轮询、nacos 原理都是一样的,只是不同的数据同步配置对应的事件处理器不一样,之前 zookeeper 数据同步已做了分析,这里就不在赘述。。

  • nacos 监听器源码分析

同之前的分析的同步方式类似, NacosDataChangedListener 类为 DataChangedListener 接口的具体实现,以修改 Selector 为例:

public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
        //getConfig 通过 configService 获取配置信息
        updateSelectorMap(getConfig(SELECTOR_DATA_ID));
        switch (eventType) {
            case DELETE:
                ...
                break;
            case REFRESH:
            case MYSELF:
                ...
                break;
            default:
                changed.forEach(selector -> {
                    List<SelectorData> ls = SELECTOR_MAP
                            .getOrDefault(selector.getPluginName(), new ArrayList<>())
                            .stream()
                            .filter(s -> !s.getId().equals(selector.getId()))
                            .sorted(SELECTOR_DATA_COMPARATOR)
                            .collect(Collectors.toList());
                    ls.add(selector);
                     //替换成最新的选择器信息
                    SELECTOR_MAP.put(selector.getPluginName(), ls);
                });
                break;
        }
        //发布数据
        publishConfig(SELECTOR_DATA_ID, SELECTOR_MAP);
    }

 private void updateSelectorMap(final String configInfo) {
        JsonObject jo = GsonUtils.getInstance().fromJson(configInfo, JsonObject.class);
        //当前 SELECTOR_MAP 所有 key
        Set<String> set = new HashSet<>(SELECTOR_MAP.keySet());
        for (Entry<String, JsonElement> e : jo.entrySet()) {
            set.remove(e.getKey());
            List<SelectorData> ls = new ArrayList<>();
            e.getValue().getAsJsonArray().forEach(je -> ls.add(GsonUtils.getInstance().fromJson(je, SelectorData.class)));
            //将获取的配置信息放入SELECTOR_MAP
            SELECTOR_MAP.put(e.getKey(), ls);
        }
        //为什么还要再remove?set已经在for循环中remove为空,并发考虑吗?
        SELECTOR_MAP.keySet().removeAll(set);
    }

至此, soul-admin 已经完成了数据发送。

soul-bootstrap 网关数据同步

开启 nacos 同步,需要在 soul-bootstrap 中引入 soul-spring-boot-starter-sync-data-nacos ,在项目中找到对应的自定义spring-boot-starter,发现了 NacosSyncDataService 配置类。

@Configuration
@ConditionalOnClass(NacosSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.nacos", name = "url")
@Slf4j
public class NacosSyncDataConfiguration {

    /**
     * Nacos sync data service.
     *
     * @param configService     the config service
     * @param pluginSubscriber the plugin subscriber
     * @param metaSubscribers   the meta subscribers
     * @param authSubscribers   the auth subscribers
     * @return the sync data service
     */
    @Bean
    public SyncDataService nacosSyncDataService(final ObjectProvider<ConfigService> configService, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use nacos sync soul data.......");
        return new NacosSyncDataService(configService.getIfAvailable(), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

    /**
     * Nacos config service config service.
     *
     * @param nacosConfig the nacos config
     * @return the config service
     * @throws Exception the exception
     */
    @Bean
    public ConfigService nacosConfigService(final NacosConfig nacosConfig) throws Exception {
        Properties properties = new Properties();
        if (nacosConfig.getAcm() != null && nacosConfig.getAcm().isEnabled()) {
            properties.put(PropertyKeyConst.ENDPOINT, nacosConfig.getAcm().getEndpoint());
            properties.put(PropertyKeyConst.NAMESPACE, nacosConfig.getAcm().getNamespace());
            properties.put(PropertyKeyConst.ACCESS_KEY, nacosConfig.getAcm().getAccessKey());
            properties.put(PropertyKeyConst.SECRET_KEY, nacosConfig.getAcm().getSecretKey());
        } else {
            properties.put(PropertyKeyConst.SERVER_ADDR, nacosConfig.getUrl());
            properties.put(PropertyKeyConst.NAMESPACE, nacosConfig.getNamespace());
        }
        return NacosFactory.createConfigService(properties);
    }

    /**
     * Http config http config.
     *
     * @return the http config
     */
    @Bean
    @ConfigurationProperties(prefix = "soul.sync.nacos")
    public NacosConfig nacosConfig() {
        return new NacosConfig();
    }
}

Selector 为例,看一下 NacosSyncDataService 类监听 Selector 数据变化的逻辑:

protected void updateSelectorMap(final String configInfo) {
        /*if(configInfo == null){
            return;
        }*/
        try {
            //configInfo 为空,导致 json 反序列化失败
            List<SelectorData> selectorDataList = GsonUtils.getInstance().toObjectMapList(configInfo, SelectorData.class).values().stream().flatMap(Collection::stream).collect(Collectors.toList());
            selectorDataList.forEach(selectorData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
                //移除缓存数据
                subscriber.unSelectorSubscribe(selectorData);
                //保存缓存数据
                subscriber.onSelectorSubscribe(selectorData);
            }));
        } catch (JsonParseException e) {
            log.error("sync selector data have error:", e);
        }
    }

上面 unSelectorSubscribe(selectorData)、onSelectorSubscribe(selectorData) 为更新缓存数据的方法,具体的实现类为 CommonPluginDataSubscriber ,这和上一篇 webscoket、zookeeper 更新缓存数据的调用是一样的。

nacos 更新数据方式和 webscoket、zookeeper 最大不同: nacos 每次都是全量更新,而 webscoket、zookeeper 只有在启动的时候进行一次全量更新,其他时候都是增量更新。

问题

soul-bootstrap 启动 NPE 错误导致启动失败

Caused by: java.lang.NullPointerException: null
    at org.dromara.soul.sync.data.nacos.handler.NacosCacheHandler.updatePluginMap(NacosCacheHandler.java:90) ~[classes/:na]
    at org.dromara.soul.sync.data.nacos.handler.NacosCacheHandler.watcherData(NacosCacheHandler.java:167) ~[classes/:na]
    at org.dromara.soul.sync.data.nacos.NacosSyncDataService.start(NacosSyncDataService.java:56) ~[classes/:na]
    at org.dromara.soul.sync.data.nacos.NacosSyncDataService.<init>(NacosSyncDataService.java:49) ~[classes/:na]
    at org.dromara.soul.springboot.starter.sync.data.nacos.NacosSyncDataConfiguration.nacosSyncDataService(NacosSyncDataConfiguration.java:66)

通过错误信息,定位到代码

protected void updateSelectorMap(final String configInfo) {
        /*if(configInfo == null){
            return;
        }*/
        try {
            //configInfo 为空,导致 json 反序列化失败
            List<SelectorData> selectorDataList = GsonUtils.getInstance().toObjectMapList(configInfo, SelectorData.class).values().stream().flatMap(Collection::stream).collect(Collectors.toList());
            selectorDataList.forEach(selectorData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
                //移除缓存数据
                subscriber.unSelectorSubscribe(selectorData);
                //保存缓存数据
                subscriber.onSelectorSubscribe(selectorData);
            }));
        } catch (JsonParseException e) {
            log.error("sync selector data have error:", e);
        }
    }

soul-admin 后台更改插件、选择器和规则配置,再次启动,异常消失。怀疑与 nacos 的处理机制有关,每次更新是全量更新,第一次初始化的时候 nacos 没有数据,但更新过数据后就会触发 publishConfig 发布数据。

soulissues 中已经有人提了 issue ,应该很快会修复的。

至此, nacos 数据同步源码分析完成。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK