【Soul源码阅读-09】数据同步之nacos
source link: https://segmentfault.com/a/1190000039072073
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
目标
- soulnacos 方式数据同步原理及源码分析
上一篇我们对 Soul
网关的 http长轮询
数据同步方式做了简单的分析,了解了一下 http长轮询
同步的基本流程。接下来我们看一下 Soul
网关的 nacos
数据同步方式。
Soul
网关开启 nacos
同步:
-
soul-bootstrap
新增如下依赖:<!--soul data sync start use nacos--> <dependency> <groupId>org.dromara</groupId> <artifactId>soul-spring-boot-starter-sync-data-nacos</artifactId> <version>2.2.1</version> </dependency>
-
application.yml
添加相关配置soul : sync: nacos: url: localhost:8848 namespace: 1c10d748-af86-43b9-8265-75f487d20c6c acm: enabled: false endpoint: acm.aliyun.com namespace: accessKey: secretKey: #url: 配置成你的nacos地址,集群环境请使用(,)分隔。 # 其他参数配置,请参考naocs官网。
soul-admin
配置,或在 soul-admin 启动参数中设置 --soul.sync.zookeeper=''
,然后重启服务
soul : sync: nacos: url: localhost:8848 namespace: 1c10d748-af86-43b9-8265-75f487d20c6c acm: enabled: false endpoint: acm.aliyun.com namespace: accessKey: secretKey:
源码分析
soul-admin 数据同步
soul-admin 的数据变更通知,Soul 网关的四种数据同步方式 webscoket、zookeeper、http长轮询、nacos
原理都是一样的,只是不同的数据同步配置对应的事件处理器不一样,之前 zookeeper
数据同步已做了分析,这里就不在赘述。。
-
nacos
监听器源码分析
同之前的分析的同步方式类似, NacosDataChangedListener
类为 DataChangedListener
接口的具体实现,以修改 Selector
为例:
public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) { //getConfig 通过 configService 获取配置信息 updateSelectorMap(getConfig(SELECTOR_DATA_ID)); switch (eventType) { case DELETE: ... break; case REFRESH: case MYSELF: ... break; default: changed.forEach(selector -> { List<SelectorData> ls = SELECTOR_MAP .getOrDefault(selector.getPluginName(), new ArrayList<>()) .stream() .filter(s -> !s.getId().equals(selector.getId())) .sorted(SELECTOR_DATA_COMPARATOR) .collect(Collectors.toList()); ls.add(selector); //替换成最新的选择器信息 SELECTOR_MAP.put(selector.getPluginName(), ls); }); break; } //发布数据 publishConfig(SELECTOR_DATA_ID, SELECTOR_MAP); } private void updateSelectorMap(final String configInfo) { JsonObject jo = GsonUtils.getInstance().fromJson(configInfo, JsonObject.class); //当前 SELECTOR_MAP 所有 key Set<String> set = new HashSet<>(SELECTOR_MAP.keySet()); for (Entry<String, JsonElement> e : jo.entrySet()) { set.remove(e.getKey()); List<SelectorData> ls = new ArrayList<>(); e.getValue().getAsJsonArray().forEach(je -> ls.add(GsonUtils.getInstance().fromJson(je, SelectorData.class))); //将获取的配置信息放入SELECTOR_MAP SELECTOR_MAP.put(e.getKey(), ls); } //为什么还要再remove?set已经在for循环中remove为空,并发考虑吗? SELECTOR_MAP.keySet().removeAll(set); }
至此, soul-admin
已经完成了数据发送。
soul-bootstrap 网关数据同步
开启 nacos
同步,需要在 soul-bootstrap
中引入 soul-spring-boot-starter-sync-data-nacos
,在项目中找到对应的自定义spring-boot-starter,发现了 NacosSyncDataService
配置类。
@Configuration @ConditionalOnClass(NacosSyncDataService.class) @ConditionalOnProperty(prefix = "soul.sync.nacos", name = "url") @Slf4j public class NacosSyncDataConfiguration { /** * Nacos sync data service. * * @param configService the config service * @param pluginSubscriber the plugin subscriber * @param metaSubscribers the meta subscribers * @param authSubscribers the auth subscribers * @return the sync data service */ @Bean public SyncDataService nacosSyncDataService(final ObjectProvider<ConfigService> configService, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) { log.info("you use nacos sync soul data......."); return new NacosSyncDataService(configService.getIfAvailable(), pluginSubscriber.getIfAvailable(), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList)); } /** * Nacos config service config service. * * @param nacosConfig the nacos config * @return the config service * @throws Exception the exception */ @Bean public ConfigService nacosConfigService(final NacosConfig nacosConfig) throws Exception { Properties properties = new Properties(); if (nacosConfig.getAcm() != null && nacosConfig.getAcm().isEnabled()) { properties.put(PropertyKeyConst.ENDPOINT, nacosConfig.getAcm().getEndpoint()); properties.put(PropertyKeyConst.NAMESPACE, nacosConfig.getAcm().getNamespace()); properties.put(PropertyKeyConst.ACCESS_KEY, nacosConfig.getAcm().getAccessKey()); properties.put(PropertyKeyConst.SECRET_KEY, nacosConfig.getAcm().getSecretKey()); } else { properties.put(PropertyKeyConst.SERVER_ADDR, nacosConfig.getUrl()); properties.put(PropertyKeyConst.NAMESPACE, nacosConfig.getNamespace()); } return NacosFactory.createConfigService(properties); } /** * Http config http config. * * @return the http config */ @Bean @ConfigurationProperties(prefix = "soul.sync.nacos") public NacosConfig nacosConfig() { return new NacosConfig(); } }
以 Selector
为例,看一下 NacosSyncDataService
类监听 Selector
数据变化的逻辑:
protected void updateSelectorMap(final String configInfo) { /*if(configInfo == null){ return; }*/ try { //configInfo 为空,导致 json 反序列化失败 List<SelectorData> selectorDataList = GsonUtils.getInstance().toObjectMapList(configInfo, SelectorData.class).values().stream().flatMap(Collection::stream).collect(Collectors.toList()); selectorDataList.forEach(selectorData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> { //移除缓存数据 subscriber.unSelectorSubscribe(selectorData); //保存缓存数据 subscriber.onSelectorSubscribe(selectorData); })); } catch (JsonParseException e) { log.error("sync selector data have error:", e); } }
上面 unSelectorSubscribe(selectorData)、onSelectorSubscribe(selectorData)
为更新缓存数据的方法,具体的实现类为 CommonPluginDataSubscriber
,这和上一篇 webscoket、zookeeper
更新缓存数据的调用是一样的。
nacos
更新数据方式和 webscoket、zookeeper
最大不同: nacos
每次都是全量更新,而 webscoket、zookeeper
只有在启动的时候进行一次全量更新,其他时候都是增量更新。
问题
soul-bootstrap
启动 NPE
错误导致启动失败
Caused by: java.lang.NullPointerException: null at org.dromara.soul.sync.data.nacos.handler.NacosCacheHandler.updatePluginMap(NacosCacheHandler.java:90) ~[classes/:na] at org.dromara.soul.sync.data.nacos.handler.NacosCacheHandler.watcherData(NacosCacheHandler.java:167) ~[classes/:na] at org.dromara.soul.sync.data.nacos.NacosSyncDataService.start(NacosSyncDataService.java:56) ~[classes/:na] at org.dromara.soul.sync.data.nacos.NacosSyncDataService.<init>(NacosSyncDataService.java:49) ~[classes/:na] at org.dromara.soul.springboot.starter.sync.data.nacos.NacosSyncDataConfiguration.nacosSyncDataService(NacosSyncDataConfiguration.java:66)
通过错误信息,定位到代码
protected void updateSelectorMap(final String configInfo) { /*if(configInfo == null){ return; }*/ try { //configInfo 为空,导致 json 反序列化失败 List<SelectorData> selectorDataList = GsonUtils.getInstance().toObjectMapList(configInfo, SelectorData.class).values().stream().flatMap(Collection::stream).collect(Collectors.toList()); selectorDataList.forEach(selectorData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> { //移除缓存数据 subscriber.unSelectorSubscribe(selectorData); //保存缓存数据 subscriber.onSelectorSubscribe(selectorData); })); } catch (JsonParseException e) { log.error("sync selector data have error:", e); } }
在 soul-admin
后台更改插件、选择器和规则配置,再次启动,异常消失。怀疑与 nacos
的处理机制有关,每次更新是全量更新,第一次初始化的时候 nacos
没有数据,但更新过数据后就会触发 publishConfig
发布数据。
在 soul
的 issues
中已经有人提了 issue
,应该很快会修复的。
至此, nacos
数据同步源码分析完成。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK