3

对dubbo的DubboReference.check的参数进行剖析 - eaglelihh

 1 year ago
source link: https://www.cnblogs.com/eaglelihh/p/17339915.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

对dubbo的DubboReference.check的参数进行剖析

在使用dubbo的时候,发现当消费者启动的时候,如果提供者没有启动,即使提供者后来启动了,消费者也调不通提供者提供的接口了。

注册中心使用都是nacos

dubbo版本是3.0.4

public interface DemoService {
    String sayHello();
}
@DubboService
public class DemoServiceImpl implements DemoService {
    @Override
    public String sayHello() {
        return "hello";
    }
}

@EnableDubbo
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class ReferenceCheckProviderStarter {
    public static void main(String[] args) {
        new SpringApplicationBuilder(ReferenceCheckProviderStarter.class)
                .web(WebApplicationType.NONE) // .REACTIVE, .SERVLET
                .run(args);
        System.out.println("dubbo service started");
    }
}
@EnableDubbo
@RestController
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class ReferenceCheckConsumerStarter {

    @DubboReference
    private DemoService demoService;

    @GetMapping("/dubbo/nacos/test")
    public Object test() {
        return demoService.sayHello();
    }

    public static void main(String[] args) {
        SpringApplication.run(ReferenceCheckConsumerStarter.class, args);
    }
}

1. 先启动provider,再启动consumer

a. 启动provider
1656633-20230422152300398-329578141.png
nacos出现provider的服务

b. 启动consumer
1656633-20230422151852830-457557778.png
nacos出现consumer的服务

访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回

hello

c. 终止provider
1656633-20230422152131848-655046797.png
nacos上provider的服务消失了

访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回

No provider available from registry

d. 重新启动provider
1656633-20230422151852830-457557778.png
nacos出现provider的服务

访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回

hello

可以看出:先启动provider,再启动consumer,整个过程是没问题。

2. 先启动consumer,再启动provider

a. 启动consumer
1656633-20230422152131848-655046797.png
1656633-20230422153037091-104517036.png
nacos出现consumer的服务,但立即又消失了

b. 启动provider
1656633-20230422152300398-329578141.png
nacos出现provider的服务

访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回

Directory already destroyed .

可以看出:当consumer先启动时,如果provider此时没有启动,consumer就再也访问不到provider的服务了。

3. 先启动consumer,再启动provider (check=false)

修改一下注解@DubboRefere的参数

@DubboReference(check = false)
private DemoService demoService;

a. 启动consumer
1656633-20230422152131848-655046797.png
nacos出现consumer的服务

访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回
No provider available from registry

b. 启动provider
1656633-20230422151852830-457557778.png
nacos出现provider的服务

访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回
hello

可以看出:即使是consumer先启动,当provider启动后,consumer还是能够访问到provider的服务的。

org.apache.dubbo.rpc.RpcException: No provider available from registry

public class RegistryDirectory<T> extends DynamicDirectory<T> {
@Override
    public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) {
            // 1. No service provider 2. Service providers are disabled
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
                    getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
                    NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
                    ", please check status of providers(disabled, not registered or in blacklist).");
        }

        // ......
    }
}
public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
    String EMPTY_PROTOCOL = "empty";

    private void refreshInvoker(List<URL> invokerUrls) {
        Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty url list to clear address.");
        this.originalUrls = invokerUrls;

        if (invokerUrls.size() == 0) {
            logger.info("Received empty url list...");
            this.forbidden = true; // Forbid to access // 这里
            this.invokers = Collections.emptyList();
            routerChain.setInvokers(this.invokers);
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow accessing // 这里
            if (CollectionUtils.isEmpty(invokerUrls)) {
                return;
            }

            // can't use local reference because this.urlInvokerMap might be accessed at isAvailable() by main thread concurrently.
            Map<String, Invoker<T>> oldUrlInvokerMap = null;
            if (this.urlInvokerMap != null) {
                // the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.
                oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + this.urlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
                this.urlInvokerMap.forEach(oldUrlInvokerMap::put);
            }
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map // 这里
            logger.info("Refreshed invoker size " + newUrlInvokerMap.size());

            if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
                logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")"));
                return;
            }
            List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
            // pre-route and build cache, notice that route cache should build on original Invoker list.
            // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
            routerChain.setInvokers(newInvokers);
            this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
            this.urlInvokerMap = newUrlInvokerMap; // 这里

            if (oldUrlInvokerMap != null) {
                try {
                    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
                } catch (Exception e) {
                    logger.warn("destroyUnusedInvokers error. ", e);
                }
            }
        }

        // notify invokers refreshed
        this.invokersChanged();
    }

    private synchronized void refreshOverrideAndInvoker(List<URL> instanceUrls) {
        // mock zookeeper://xxx?mock=return null
        if (enableConfigurationListen) {
            overrideDirectoryUrl();
        }
        refreshInvoker(instanceUrls); // 这里
    }
}
public class RegistryDirectory<T> extends DynamicDirectory<T> {
    @Override
    public synchronized void notify(List<URL> urls) {
        if (isDestroyed()) {
            return;
        }

        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(this::judgeCategory));

        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        toRouters(routerURLs).ifPresent(this::addRouters);

        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());

        // 3.x added for extend URL address
        ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);
        List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
        if (supportedListeners != null && !supportedListeners.isEmpty()) {
            for (AddressListener addressListener : supportedListeners) {
                providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
            }
        }

        refreshOverrideAndInvoker(providerURLs); // 这里
    }
}

public abstract class AbstractRegistry implements Registry {
    /**
     * Notify changes from the Provider side.
     *
     * @param url      consumer side url
     * @param listener listener
     * @param urls     provider latest urls
     */
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((CollectionUtils.isEmpty(urls))
            && !ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size());
        }
        // keep every provider's category.
        Map<String, List<URL>> result = new HashMap<>(); // 这里
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getCategory(DEFAULT_CATEGORY);
                List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); // 这里
                categoryList.add(u); // 这里
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            listener.notify(categoryList); // 这里
            // We will update our cache file after each notification.
            // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
            if (localCacheEnabled) {
                saveProperties(url);
            }
        }
    }
}
public class NacosRegistry extends FailbackRegistry {
    private void notifySubscriber(URL url, NotifyListener listener, Collection<Instance> instances) {
        List<Instance> enabledInstances = new LinkedList<>(instances);
        if (enabledInstances.size() > 0) {
            //  Instances
            filterEnabledInstances(enabledInstances);
        }
        List<URL> urls = toUrlWithEmpty(url, enabledInstances);
        NacosRegistry.this.notify(url, listener, urls); // 这里
    }

    String EMPTY_PROTOCOL = "empty";

    private List<URL> toUrlWithEmpty(URL consumerURL, Collection<Instance> instances) {
        List<URL> urls = buildURLs(consumerURL, instances);
        if (urls.size() == 0) { // 这里
            URL empty = URLBuilder.from(consumerURL)
                .setProtocol(EMPTY_PROTOCOL)
                .addParameter(CATEGORY_KEY, DEFAULT_CATEGORY)
                .build();
            urls.add(empty);
        }
        return urls;
    }
}

当没有可用的服务时,instances是空的

1656633-20230423150526324-775700039.png

当有可用的服务时,instances是不为空的

1656633-20230423150718508-1788576938.png

是怎么通知的

public class ServiceInfoHolder implements Closeable {
    public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
        String serviceKey = serviceInfo.getKey();
        if (serviceKey == null) {
            return null;
        }
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
        if (isEmptyOrErrorPush(serviceInfo)) {
            //empty or error push, just ignore
            return oldService;
        }
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        boolean changed = isChangedServiceInfo(oldService, serviceInfo);
        if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
            serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
        }
        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
        if (changed) { // 这里
            NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(serviceInfo.getHosts()));
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                    serviceInfo.getClusters(), serviceInfo.getHosts())); // 这里
            DiskCache.write(serviceInfo, cacheDir);
        }
        return serviceInfo;
    }
}

public class DefaultPublisher extends Thread implements EventPublisher {
    private BlockingQueue<Event> queue;

    @Override
    public void init(Class<? extends Event> type, int bufferSize) {
        setDaemon(true);
        setName("nacos.publisher-" + type.getName());
        this.eventType = type;
        this.queueMaxSize = bufferSize;
        this.queue = new ArrayBlockingQueue<>(bufferSize); // 这里
        start();
    }

    @Override
    public boolean publish(Event event) {
        checkIsStart();
        boolean success = this.queue.offer(event); // 这里
        if (!success) {
            LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
            receiveEvent(event);
            return true;
        }
        return true;
    }

    @Override
    public void run() {
        openEventHandler();
    }
    
    void openEventHandler() {
        try {
            
            // This variable is defined to resolve the problem which message overstock in the queue.
            int waitTimes = 60;
            // To ensure that messages are not lost, enable EventHandler when
            // waiting for the first Subscriber to register
            for (; ; ) {
                if (shutdown || hasSubscriber() || waitTimes <= 0) {
                    break;
                }
                ThreadUtils.sleep(1000L);
                waitTimes--;
            }
            
            for (; ; ) {
                if (shutdown) {
                    break;
                }
                final Event event = queue.take(); // 这里
                receiveEvent(event);  // 这里
                UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
            }
        } catch (Throwable ex) {
            LOGGER.error("Event listener exception : ", ex);
        }
    }

    void receiveEvent(Event event) {
        final long currentEventSequence = event.sequence();
        
        if (!hasSubscriber()) {
            LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.");
            return;
        }
        
        // Notification single event listener
        for (Subscriber subscriber : subscribers) {
            // Whether to ignore expiration events
            if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
                LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                        event.getClass());
                continue;
            }
            
            // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
            // Remove original judge part of codes.
            notifySubscriber(subscriber, event); // 这里
        }
    }

    @Override
    public void notifySubscriber(final Subscriber subscriber, final Event event) {
        
        LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
        
        final Runnable job = () -> subscriber.onEvent(event);
        final Executor executor = subscriber.executor(); 
        
        if (executor != null) {
            executor.execute(job); // 这里
        } else {
            try {
                job.run(); // 这里
            } catch (Throwable e) {
                LOGGER.error("Event callback exception: ", e);
            }
        }
    }
}

public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
    @Override
    public void onEvent(InstancesChangeEvent event) {
        String key = ServiceInfo
                .getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
        ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
        if (CollectionUtils.isEmpty(eventListeners)) {
            return;
        }
        for (final EventListener listener : eventListeners) {
            final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
            if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
                ((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent)); // 这里
            } else {
                listener.onEvent(namingEvent); // 这里
            }
        }
    }
}

public class NacosRegistry extends FailbackRegistry {
        @Override
        public void onEvent(Event event) {
            if (event instanceof NamingEvent) {
                NamingEvent e = (NamingEvent) event;
                notifier.notify(e.getInstances()); // 这里
            }
        }
}

public abstract class RegistryNotifier {
    public synchronized void notify(Object rawAddresses) {
        this.rawAddresses = rawAddresses;
        long notifyTime = System.currentTimeMillis();
        this.lastEventTime = notifyTime;

        long delta = (System.currentTimeMillis() - lastExecuteTime) - delayTime;

        // more than 10 calls && next execute time is in the future
        boolean delay = shouldDelay.get() && delta < 0;
        if (delay) {
            scheduler.schedule(new NotificationTask(this, notifyTime), -delta, TimeUnit.MILLISECONDS); // 这里
        } else {
            // check if more than 10 calls
            if (!shouldDelay.get() && executeTime.incrementAndGet() > DEFAULT_DELAY_EXECUTE_TIMES) {
                shouldDelay.set(true);
            }
            scheduler.submit(new NotificationTask(this, notifyTime)); // 这里
        }
    }

    public static class NotificationTask implements Runnable {
        private final RegistryNotifier listener;
        private final long time;

        public NotificationTask(RegistryNotifier listener, long time) {
            this.listener = listener;
            this.time = time;
        }

        @Override
        public void run() {
            try {
                if (this.time == listener.lastEventTime) {
                    listener.doNotify(listener.rawAddresses); // 这里
                    listener.lastExecuteTime = System.currentTimeMillis();
                    synchronized (listener) {
                        if (this.time == listener.lastEventTime) {
                            listener.rawAddresses = null;
                        }
                    }
                }
            } catch (Throwable t) {
                logger.error("Error occurred when notify directory. ", t);
            }
        }
    }}
}

public class NacosRegistry extends FailbackRegistry {

    private class RegistryChildListenerImpl implements EventListener {
        private RegistryNotifier notifier;

        public RegistryChildListenerImpl(String serviceName, URL consumerUrl, NotifyListener listener) {
            notifier = new RegistryNotifier(getUrl(), NacosRegistry.this.getDelay()) {
                @Override
                protected void doNotify(Object rawAddresses) {
                    List<Instance> instances = (List<Instance>) rawAddresses;
                    if (isServiceNamesWithCompatibleMode(consumerUrl)) {
                        /**
                         * Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
                         * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
                         */
                        NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
                        instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
                    }
                    NacosRegistry.this.notifySubscriber(consumerUrl, listener, instances); // 这里
                }
            };
        }
}

然后就调用了上面的👆🏻

什么时候添加监听器的?

public class NacosRegistry extends FailbackRegistry {

    private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
        throws NacosException {
        EventListener eventListener = new RegistryChildListenerImpl(serviceName, url, listener);  // 这里
        namingService.subscribe(serviceName,
            getUrl().getGroup(Constants.DEFAULT_GROUP),
            eventListener); // 这里
    }

    private void doSubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
        try {
            if (isServiceNamesWithCompatibleMode(url)) {
                List<Instance> allCorrespondingInstanceList = Lists.newArrayList();

                /**
                 * Get all instances with serviceNames to avoid instance overwrite and but with empty instance mentioned
                 * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
                 *
                 * namingService.getAllInstances with {@link org.apache.dubbo.registry.support.AbstractRegistry#registryUrl}
                 * default {@link DEFAULT_GROUP}
                 *
                 * in https://github.com/apache/dubbo/issues/5978
                 */
                for (String serviceName : serviceNames) {
                    List<Instance> instances = namingService.getAllInstances(serviceName,
                        getUrl().getGroup(Constants.DEFAULT_GROUP));
                    NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
                    allCorrespondingInstanceList.addAll(instances);
                }
                notifySubscriber(url, listener, allCorrespondingInstanceList); 
                for (String serviceName : serviceNames) {
                    subscribeEventListener(serviceName, url, listener); // 这里
                }
            } else {
                for (String serviceName : serviceNames) {
                    List<Instance> instances = new LinkedList<>();
                    instances.addAll(namingService.getAllInstances(serviceName
                        , getUrl().getGroup(Constants.DEFAULT_GROUP)));
                    String serviceInterface = serviceName;
                    String[] segments = serviceName.split(SERVICE_NAME_SEPARATOR, -1);
                    if (segments.length == 4) {
                        serviceInterface = segments[SERVICE_INTERFACE_INDEX];
                    }
                    URL subscriberURL = url.setPath(serviceInterface).addParameters(INTERFACE_KEY, serviceInterface,
                        CHECK_KEY, String.valueOf(false));
                    notifySubscriber(subscriberURL, listener, instances);
                    subscribeEventListener(serviceName, subscriberURL, listener);
                }
            }
        } catch (Throwable cause) {
            throw new RpcException("Failed to subscribe " + url + " to nacos " + getUrl() + ", cause: " + cause.getMessage(), cause);
        }
    }
}

org.apache.dubbo.rpc.RpcException: Directory already destroyed

public abstract class AbstractDirectory<T> implements Directory<T> {
    @Override
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }

        return doList(invocation);
    }

    @Override
    public void destroy() {
        destroyed = true; // 这里
    }
}
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {

    private void checkInvokerAvailable() throws IllegalStateException {
        if (shouldCheck() && !invoker.isAvailable()) {
            invoker.destroy(); // 这里
            throw new IllegalStateException("Should has at least one way to know which services this interface belongs to," +
                " subscription url: " + invoker.getUrl());
        }
    }

    protected synchronized void init() {
        // ......

        checkInvokerAvailable(); // 这里
    }

}
public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
    public boolean shouldCheck() {
        checkDefault();
        Boolean shouldCheck = isCheck(); // 这里
        if (shouldCheck == null && getConsumer() != null) {
            shouldCheck = getConsumer().isCheck(); 
        }
        if (shouldCheck == null) {
            // default true // 这里
            shouldCheck = true;
        }
        return shouldCheck;
    }
}
public class RegistryDirectory<T> extends DynamicDirectory<T> {
    @Override
    public boolean isAvailable() {
        if (isDestroyed() || this.forbidden) { // 这里
            return false;
        }
        Map<URL, Invoker<T>> localUrlInvokerMap = urlInvokerMap; // 这里
        return CollectionUtils.isNotEmptyMap(localUrlInvokerMap)
                && localUrlInvokerMap.values().stream().anyMatch(Invoker::isAvailable);
    }
}

如果没有设置check字段,那么就会在启动的时候检查提供方是否可用,如果不可用,就销毁了。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK