Apollo 客户端启动过程分析
source link: https://nicksxs.me/2022/09/18/Apollo-%E5%AE%A2%E6%88%B7%E7%AB%AF%E5%90%AF%E5%8A%A8%E8%BF%87%E7%A8%8B%E5%88%86%E6%9E%90/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
Apollo 客户端启动过程分析
入口是可以在 springboot 的启动类上打上EnableApolloConfig
注解
@Import(ApolloConfigRegistrar.class)
public @interface EnableApolloConfig {
这个 import 实现了
public class ApolloConfigRegistrar implements ImportBeanDefinitionRegistrar {
private ApolloConfigRegistrarHelper helper = ServiceBootstrap.loadPrimary(ApolloConfigRegistrarHelper.class);
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
helper.registerBeanDefinitions(importingClassMetadata, registry);
}
}
然后就调用了
com.ctrip.framework.apollo.spring.spi.DefaultApolloConfigRegistrarHelper#registerBeanDefinitions
接着是注册了这个 bean,com.ctrip.framework.apollo.spring.config.PropertySourcesProcessor
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
AnnotationAttributes attributes = AnnotationAttributes
.fromMap(importingClassMetadata.getAnnotationAttributes(EnableApolloConfig.class.getName()));
String[] namespaces = attributes.getStringArray("value");
int order = attributes.getNumber("order");
PropertySourcesProcessor.addNamespaces(Lists.newArrayList(namespaces), order);
Map<String, Object> propertySourcesPlaceholderPropertyValues = new HashMap<>();
// to make sure the default PropertySourcesPlaceholderConfigurer's priority is higher than PropertyPlaceholderConfigurer
propertySourcesPlaceholderPropertyValues.put("order", 0);
BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, PropertySourcesPlaceholderConfigurer.class.getName(),
PropertySourcesPlaceholderConfigurer.class, propertySourcesPlaceholderPropertyValues);
// 注册了这个 bean
BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, PropertySourcesProcessor.class.getName(),
PropertySourcesProcessor.class);
而com.ctrip.framework.apollo.spring.config.PropertySourcesProcessor 实现了 org.springframework.beans.factory.config.BeanFactoryPostProcessor
它里面的 com.ctrip.framework.apollo.spring.config.PropertySourcesProcessor#postProcessBeanFactory 方法就会被 spring 调用,
private void initializePropertySources() {
if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_PROPERTY_SOURCE_NAME)) {
//already initialized
return;
}
CompositePropertySource composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_PROPERTY_SOURCE_NAME);
//sort by order asc
ImmutableSortedSet<Integer> orders = ImmutableSortedSet.copyOf(NAMESPACE_NAMES.keySet());
Iterator<Integer> iterator = orders.iterator();
while (iterator.hasNext()) {
int order = iterator.next();
for (String namespace : NAMESPACE_NAMES.get(order)) {
// 这里获取每个 namespace 的配置
Config config = ConfigService.getConfig(namespace);
composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
}
}
然后是 com.ctrip.framework.apollo.ConfigService#getConfig
接着就是它
com.ctrip.framework.apollo.internals.DefaultConfigManager#getConfig
@Override
public Config getConfig(String namespace) {
Config config = m_configs.get(namespace);
if (config == null) {
synchronized (this) {
config = m_configs.get(namespace);
if (config == null) {
ConfigFactory factory = m_factoryManager.getFactory(namespace);
// 通过 factory 来创建配置获取
config = factory.create(namespace);
m_configs.put(namespace, config);
}
}
}
com.ctrip.framework.apollo.spi.DefaultConfigFactory#create
@Override
public Config create(String namespace) {
ConfigFileFormat format = determineFileFormat(namespace);
if (ConfigFileFormat.isPropertiesCompatible(format)) {
return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
}
// 调用到这
return new DefaultConfig(namespace, createLocalConfigRepository(namespace));
}
LocalFileConfigRepository createLocalConfigRepository(String namespace) {
if (m_configUtil.isInLocalMode()) {
logger.warn(
"==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
namespace);
return new LocalFileConfigRepository(namespace);
}
// 正常会走这个,因为要从配置中心获取
return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
}
然后是创建远程配置仓库
com.ctrip.framework.apollo.spi.DefaultConfigFactory#createRemoteConfigRepository
RemoteConfigRepository createRemoteConfigRepository(String namespace) {
return new RemoteConfigRepository(namespace);
}
继续对当前的 namespace 创建远程配置仓库
com.ctrip.framework.apollo.internals.RemoteConfigRepository#RemoteConfigRepository
public RemoteConfigRepository(String namespace) {
m_namespace = namespace;
m_configCache = new AtomicReference<>();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>();
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
gson = new Gson();
// 尝试同步
this.trySync();
this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh();
}
然后是同步配置,下面的日志异常经常可以看到,比如配置拉取地址不通
com.ctrip.framework.apollo.internals.AbstractConfigRepository#trySync
protected boolean trySync() {
try {
sync();
return true;
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
logger
.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
.getDetailMessage(ex));
}
return false;
}
实际的同步方法,加了synchronized
锁,
com.ctrip.framework.apollo.internals.RemoteConfigRepository#sync
@Override
protected synchronized void sync() {
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
try {
// 获取本地配置
ApolloConfig previous = m_configCache.get();
// 获取配置
ApolloConfig current = loadApolloConfig();
//reference equals means HTTP 304
if (previous != current) {
logger.debug("Remote Config refreshed!");
m_configCache.set(current);
this.fireRepositoryChange(m_namespace, this.getConfig());
}
if (current != null) {
Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
current.getReleaseKey());
}
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
然后走到这
com.ctrip.framework.apollo.internals.RemoteConfigRepository#loadApolloConfig
private ApolloConfig loadApolloConfig() {
if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
String appId = m_configUtil.getAppId();
String cluster = m_configUtil.getCluster();
String dataCenter = m_configUtil.getDataCenter();
Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
long onErrorSleepTime = 0; // 0 means no sleep
Throwable exception = null;
// 获取配置
List<ServiceDTO> configServices = getConfigServices();
String url = null;
for (int i = 0; i < maxRetries; i++) {
List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
Collections.shuffle(randomConfigServices);
//Access the server which notifies the client first
if (m_longPollServiceDto.get() != null) {
randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
}
for (ServiceDTO configService : randomConfigServices) {
if (onErrorSleepTime > 0) {
logger.warn(
"Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);
try {
m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
} catch (InterruptedException e) {
//ignore
}
}
url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
dataCenter, m_remoteMessages.get(), m_configCache.get());
logger.debug("Loading config from {}", url);
HttpRequest request = new HttpRequest(url);
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
transaction.addData("Url", url);
try {
HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class);
m_configNeedForceRefresh.set(false);
m_loadConfigFailSchedulePolicy.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS);
if (response.getStatusCode() == 304) {
logger.debug("Config server responds with 304 HTTP status code.");
return m_configCache.get();
}
ApolloConfig result = response.getBody();
logger.debug("Loaded config for {}: {}", m_namespace, result);
return result;
} catch (ApolloConfigStatusCodeException ex) {
ApolloConfigStatusCodeException statusCodeException = ex;
//config not found
if (ex.getStatusCode() == 404) {
String message = String.format(
"Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
"please check whether the configs are released in Apollo!",
appId, cluster, m_namespace);
statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),
message);
}
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(statusCodeException));
transaction.setStatus(statusCodeException);
exception = statusCodeException;
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
transaction.setStatus(ex);
exception = ex;
} finally {
transaction.complete();
}
// if force refresh, do normal sleep, if normal config load, do exponential sleep
onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
m_loadConfigFailSchedulePolicy.fail();
}
}
String message = String.format(
"Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s",
appId, cluster, m_namespace, url);
throw new ApolloConfigException(message, exception);
}
com.ctrip.framework.apollo.internals.RemoteConfigRepository#getConfigServices
private List<ServiceDTO> getConfigServices() {
List<ServiceDTO> services = m_serviceLocator.getConfigServices();
if (services.size() == 0) {
throw new ApolloConfigException("No available config service");
}
return services;
}
com.ctrip.framework.apollo.internals.ConfigServiceLocator#getConfigServices
public List<ServiceDTO> getConfigServices() {
if (m_configServices.get().isEmpty()) {
updateConfigServices();
}
return m_configServices.get();
}
更新配置服务
com.ctrip.framework.apollo.internals.ConfigServiceLocator#updateConfigServices
private synchronized void updateConfigServices() {
String url = assembleMetaServiceUrl();
HttpRequest request = new HttpRequest(url);
int maxRetries = 2;
Throwable exception = null;
for (int i = 0; i < maxRetries; i++) {
Transaction transaction = Tracer.newTransaction("Apollo.MetaService", "getConfigService");
transaction.addData("Url", url);
try {
// 发起 http 请求获取配置
HttpResponse<List<ServiceDTO>> response = m_httpUtil.doGet(request, m_responseType);
transaction.setStatus(Transaction.SUCCESS);
List<ServiceDTO> services = response.getBody();
if (services == null || services.isEmpty()) {
logConfigService("Empty response!");
continue;
}
setConfigServices(services);
return;
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
transaction.setStatus(ex);
exception = ex;
} finally {
transaction.complete();
}
try {
m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(m_configUtil.getOnErrorRetryInterval());
} catch (InterruptedException ex) {
//ignore
}
}
throw new ApolloConfigException(
String.format("Get config services failed from %s", url), exception);
}
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK