27

【一起学源码-微服务】Nexflix Eureka 源码七:通过单元测试来Debug Eureka注册过程

 4 years ago
source link: http://www.cnblogs.com/wang-meng/p/12116408.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

前言

上一讲eureka client是如何注册的,一直跟到源码发送http请求为止,当时看eureka client注册时如此费尽,光是找一个regiter的地方就找了半天,那么client端发送了http请求给server端,server端是如何处理的呢?

带着这么一个疑问 就开始今天源码的解读了。

如若转载 请标明来源:一枝花算不算浪漫

源码解读

从何读起?

上一讲我们知道,跟进client注册 一直到 AbstractJersey2EurekaHttpClient.register 方法,这里先看下其中的源码:

public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        Response response = null;
        try {
            // 发送请求,类似于:http://localhost:8080/v2/apps/ServiceA
            // 发送的是post请求,服务实例的对象被打成了一个json发送,包括自己的主机、ip、端口号
            // eureka server 就知道了这个ServiceA这个服务,有一个服务实例,比如是在192.168.31.109、host-01、8761端口
            Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
            addExtraProperties(resourceBuilder);
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .accept(MediaType.APPLICATION_JSON)
                    .acceptEncoding("gzip")
                    .post(Entity.json(info));
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

那这种情况我们肯定可以猜测,server端应该有个controller来接收此http请求,然后默默的去做一些注册的逻辑。

紧接着我们从 /apps/ 这个关键词入手,进行全局搜索:

ZZvAzqf.png!web

全局搜索结果如下,这里可以看到很多test 调用,这里框起来的一个是不是类似于我们controller接口的调用呢?直接点进去查看,然后一步步跟进。

源码分析

接着上面说的,跟进 ApplicationResource 这个类,可以找到如下方法:

@Path("{appId}")
public ApplicationResource getApplicationResource(
        @PathParam("version") String version,
        @PathParam("appId") String appId) {
    CurrentRequestVersion.set(Version.toEnum(version));
    return new ApplicationResource(appId, serverConfig, registry);
}

这个appId可以理解为我们之前传递的appName,紧接着这里是直接构造了一个 ApplicationResource 实例,接着跟进代码,进入 ApplicationResource 中我们可以看到很多 @GET@POST 等restful接口,还记得上面我们register方法中,发送的http请求用的就是POST方法,所以我们这里直接看 @POST 请求

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // validate that the instanceinfo contains all the necessary required fields
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getIPAddr())) {
        return Response.status(400).entity("Missing ip address").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }

    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }

    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}

由于代码不是很长,这里都给截取出来了。其实这里做的事情就很简单了。

InstanceInfo
registry.register(info, "true".equals(isReplication));
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold
                    // (1
                    // for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}

到了这里东西就有点多了,我们慢慢梳理。

  1. reda.lock() 这里使用的是读锁,方便多个服务实例同时来注册
  2. 这里关键信息是registry的数据结构,同时这也是保存注册实例的对象。
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

ConcurrentHashMap的key是appName

第二层Map的key是appId,所以数据结构格式类似于:

{
    “ServiceA”: {
        “001”: Lease<InstanceInfo>,
        “002”: Lease<InstanceInfo>,
        “003”: Lease<InstanceInfo>
    },
    “ServiceB”: {
        “001”: Lease<InstanceInfo>
    }
}
  1. 这里面还有两个队列 recentRegisteredQueuerecentlyChangedQueue ,其中registerQueue默认保存最近1000条注册的实例信息。
  2. 后面就是一些状态设置之类的操作

注册表使用场景

我们注册完成之后,打开eureka 后台配置页面,可以看到自己的实例已经在页面上了,那么这个东东是如何展示的呢?

我们都知道eureka-resources模块下有很多jsp信息,点开status.jsp查看一下:

7FriMbA.png!web

这里用到了 serverContext.getRegistry().getSortedApplications() , 然后在通过获取的 Applicaiton 去执行 app.getInstances() 等到了所有大的服务实例信息。

这里我们还需要回头看下 EurekaBootStrap 中的代码,看看Application是如何来的。

PeerAwareInstanceRegistryImpl.javagetSortedApplications() 一直跟到 AbstractInstanceRegistry.javagetApplicationsFromMultipleRegions() ,如下图所示:

N7VfAr2.png!web

看到这里是不是就真相大白了?

这里再总结一下:

在jsp代码中,拿到了EurekaServerContext,所以之前为什么要将这个东东放到一个Holder里面去,就是随时都要从这个里面去获取一些数据

然后会从EurekaServerContext,获取到注册表,PeerAwareInstanceRegistry,注册表,从里面获取所有的服务信息,从底层的map数据结构中,获取所有的服务注册的信息,遍历,封装到一个叫Application的东西里去,一个Application就代表了一个服务,里面包含很多个服务实例。

Eureka的服务注册流程图

z2Ab6zY.png!web

申明

本文章首发自本人博客: https://www.cnblogs.com/wang-meng 和公众号: 壹枝花算不算浪漫 ,如若转载请标明来源!

感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫

NfE7Zbm.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK