5

Spring Cloud源码分析之Eureka第六章:服务注册

 2 years ago
source link: https://blog.51cto.com/zq2599/5081951
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Spring Cloud源码分析之Eureka第六章:服务注册

推荐 原创

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos

  1. 周期性更新服务列表;
  2. 周期性服务续约;
  3. 服务注册逻辑;
  • 本章学习的是服务注册逻辑的相关代码,对应用如何将自身信息注册到Eureka进行深入了解
  • 以下图片来自Netflix官方,图中显示Eureka Client会发起Register请求将自身注册到注册中心,这样其他Eureka client通过Get Registry请求就能获取到新注册应用的相关信息:
    Spring Cloud源码分析之Eureka第六章:服务注册_eureka

关于源码版本

  • 本次分析的Spring Cloud版本为Edgware.RELEASE,对应的eureka-client版本为1.7.0;
  1. 首先回顾com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法,Eureka client在启动的时侯都会执行此方法,如下方所示,已经略去了周期性更新服务列表相关的代码:
//来自EurekaClientConfigBean,默认为true
if (clientConfig.shouldRegisterWithEureka()) {
			//续租间隔
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            //周期性任务处理超时后,下一次执行时将超时事件翻倍,但是不可超过expBackOffBound的设定范围
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

            //指定时间后启动周期性续租的任务
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            //上报自身信息到Eureka server的操作委托给InstanceInfoReplicator实例发起,
            //如果有多个场景需要上报,都由InstanceInfoReplicator进行调度和安排,
            //并且还有限流逻辑,避免频繁先服务端请求
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
			
			//监听和响应应用状态变化,包括从停止状态恢复或者进入停止状态,
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    //将自身状态上报都Eureka server(有限流逻辑避免频繁上报)
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            	//注册状态变化的监听
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
			//更新信息并注册到Eureka server
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
  1. 从上述代码可以看出,主动更新和状态变化触发的更新,都委托给成员变量instanceInfoReplicator执行,InstanceInfoReplicator是个辅助类,在服务注册过程中主要负责并发控制、周期性执行等工作,有关此类的详细介绍请参考文章《Eureka的InstanceInfoReplicator类(服务注册辅助工具)》
  2. 本文聚焦服务注册,因此InstanceInfoReplicator类本身的细节就不在此展开,这里主要关注的是InstanceInfoReplicator的run方法中注册到Eureka server的代码,如下图红框,discoveryClient.register()实现了注册的功能:
    Spring Cloud源码分析之Eureka第六章:服务注册_spring cloud_02
  • 注意:由上图绿框中代码可见,注册完成后又会提交一个一次性的延时任务,这就相当于周期性的执行run方法了,这么一来岂不是会周期性注册?其实并不会,红框上面是有个判断条件的:if (dirtyTimestamp != null),只要成员变量instanceInfo的isDirtyWithTime方法返回为空,就不会执行注册;
  1. 先看代码discoveryClient.refreshInstanceInfo(),弄清楚即将上报到Eureka server的信息是如何更新的,如下代码所示,信息更新的操作是委托给ApplicationInfoManager实例来完成的:
void refreshInstanceInfo() {
        //更新数据
        applicationInfoManager.refreshDataCenterInfoIfRequired();
        //如果续租时间有变化就要及时更新
        applicationInfoManager.refreshLeaseInfoIfRequired();

        InstanceStatus status;
        try {
            status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
        } catch (Exception e) {
            logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
            //如果获取状态异常,就设置当前状态为DOWN
            status = InstanceStatus.DOWN;
        }

        if (null != status) {
            applicationInfoManager.setInstanceStatus(status);
        }
    }
  1. 接下来看看服务注册相关的代码,也就是DiscoveryClient类的register方法,如下所示,源码注释中说到是注册请求类型是Restful的,Eureka server的返回码如果是204表示注册成功,然而在前面的discoveryClient.register()方法内,其实并不关注这个返回值:
    /**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
            //注册操作
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }
  1. 继续展开注册操作的源码eurekaTransport.registrationClient.register(instanceInfo),多层调用一路展开,最终由JerseyApplicationClient类来完成注册操作,对应源码在父类AbstractJerseyEurekaHttpClient中,如下所示,主要工作是利用jersey库的Restful Api将自身的信息POST到Eureka server:
@Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
  • 至此,Eureka client向服务注册的源码就分析完毕了,过程相对简单,DiscoveryClient、InstanceInfoReplicator、ApplicationInfoManager、JerseyApplicationClient等实例各司其职将应用自身信息上报到Eureka server,由Eureka server保存,再被其他实例下载;

欢迎关注51CTO博客:程序员欣宸

 学习路上,你不孤单,欣宸原创一路相伴…

  • 收藏
  • 评论
  • 分享
  • 举报

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK