26

Dubbo(七):redis注册中心的应用

 4 years ago
source link: http://www.cnblogs.com/yougewe/p/12852106.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

上篇我们讲了Dubbo中有一个非常本质和重要的功能,那就是服务的自动注册与发现,而这个功能是通过注册中心来实现的。上篇中使用zookeeper实现了注册中心的功能,同时了提了dubbo中有其他许多的注册中心的实现。

今天我们就来看看另一个注册中心的实现吧: redis

1. dubbo在 Redis 中的服务分布

dubbo在zk中的服务体现是一个个的文件路径形式,如 /dubbo/xxx.xx.XxxService/providers/xxx 。 而在redis中,则体现是一个个的缓存key-value。具体分布如下:

/dubbo/xxx.xx.XxxService/providers: 以hash类型存放所有提供者列表, 每个hash的字段为 url -> expireTime

/dubbo/xxx.xx.XxxService/consumers: 以hash类型存放所有消费者列表, 每个hash的字段为 url -> expireTime

/dubbo/xxx.xx.XxxService/configurators: 存放配置信息

/dubbo/xxx.xx.XxxService/routers: 存放路由配置信息

如上,同样,redis也是以service为粒度进行存储划分的。

2. Redis 组件的接入

你可能需要先引入redis注册依赖包:

        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo-registry-redis</artifactId>
        </dependency>

在配置dubbo服务时,需要将注册中心换为 redis, 如下选合适的一个即可:

    <dubbo:registry address="redis://127.0.0.1:6379" cluster="failover" />
    <dubbo:registry address="redis://10.20.153.10:6379?backup=10.20.153.11:6379,10.20.153.12:6379" cluster="failover" />
    <dubbo:registry protocol="redis" address="127.0.0.1:6379" cluster="failover" />
    <dubbo:registry protocol="redis" address="10.20.153.10:6379,10.20.153.11:6379,10.20.153.12:6379" cluster="failover" />

cluster 设置 redis 集群策略,缺省为 failover:(这个配置不会和集群容错配置有误会么,尴尬)

failover: 失效转移策略。只写入和读取任意一台,失败时重试另一台,需要服务器端自行配置数据同步;

replicate: 复制模式策略。在客户端同时写入所有服务器,只读取单台,服务器端不需要同步,注册中心集群增大,性能压力也会更大;

redis作为注册中心与zk作为注册的前置操作都是一样的。都是一是作为服务提供者时会在 ServiceConfig#doExportUrlsFor1Protocol 中,进行远程服务暴露时会拉起。二是在消费者在进行远程调用时会 ReferenceConfig#createProxy 时拉取以便获取提供者列表。

只是在依赖注入 RegistryFactory 时,根据是 zookeeper/redis, 选择了不一样的 RegistryFactory, 所以创建了不同的注册中心实例。

redis 中根据SPI的配置创建, RedisRegistryFactory 工厂, 配置文件 META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory 的内容如下:

redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
    /**
     * Get an instance of registry based on the address of invoker
     *
     * @param originInvoker
     * @return
     */
    protected Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = getRegistryUrl(originInvoker);
        // RegistryFactory 又是通过 SPI 机制生成的    
        // 会根据具体的注册中心的类型创建调用具体实例,如此处为: redis, 所以会调用 RedisRegistryFactory.getRegistry()
        return registryFactory.getRegistry(registryUrl);
    }
    // 所有 RegistryFactory 都会被包装成 RegistryFactoryWrapper, 以便修饰
    // org.apache.dubbo.registry.RegistryFactoryWrapper#getRegistry
    @Override
    public Registry getRegistry(URL url) {
        // 对于zk, 会调用 RedisRegistryFactory
        return new ListenerRegistryWrapper(registryFactory.getRegistry(url),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class)
                        .getActivateExtension(url, "registry.listeners")));
    }
    // org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry(org.apache.dubbo.common.URL)
    @Override
    public Registry getRegistry(URL url) {
        if (destroyed.get()) {
            LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                    "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
            return DEFAULT_NOP_REGISTRY;
        }

        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        String key = createRegistryCacheKey(url);
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
            // 调用子类方法创建 registry 实例,此处为 RedisRegistryFactory.createRegistry
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
    // org.apache.dubbo.registry.redis.RedisRegistryFactory#createRegistry
    @Override
    protected Registry createRegistry(URL url) {
        // 最终将redis组件接入到应用中了,后续就可以使用redis提供的相应功能了
        return new RedisRegistry(url);
    }

至此,redis被接入了。我们先来看下 redis 注册中心构造方法实现:

    // org.apache.dubbo.registry.redis.RedisRegistry#RedisRegistry
    public RedisRegistry(URL url) {
        // RedisRegistry 与zk一样,同样继承了 FailbackRegistry
        // 所以,同样会创建retryTimer, 同样会创建缓存文件
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // 使用redis连接池处理事务
        // 设置各配置项
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
        config.setTestOnReturn(url.getParameter("test.on.return", false));
        config.setTestWhileIdle(url.getParameter("test.while.idle", false));
        if (url.getParameter("max.idle", 0) > 0) {
            config.setMaxIdle(url.getParameter("max.idle", 0));
        }
        if (url.getParameter("min.idle", 0) > 0) {
            config.setMinIdle(url.getParameter("min.idle", 0));
        }
        if (url.getParameter("max.active", 0) > 0) {
            config.setMaxTotal(url.getParameter("max.active", 0));
        }
        if (url.getParameter("max.total", 0) > 0) {
            config.setMaxTotal(url.getParameter("max.total", 0));
        }
        if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) {
            config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0)));
        }
        if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
            config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
        }
        if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
            config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
        }
        if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
            config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
        }
        // redis 复用了cluster配置项?
        String cluster = url.getParameter("cluster", "failover");
        if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
            throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
        }
        replicate = "replicate".equals(cluster);

        List<String> addresses = new ArrayList<>();
        addresses.add(url.getAddress());
        String[] backups = url.getParameter(RemotingConstants.BACKUP_KEY, new String[0]);
        if (ArrayUtils.isNotEmpty(backups)) {
            addresses.addAll(Arrays.asList(backups));
        }
        //获得Redis主节点名称
        String masterName = url.getParameter(REDIS_MASTER_NAME_KEY);
        if (StringUtils.isEmpty(masterName)) {
            //单机版redis
            for (String address : addresses) {
                int i = address.indexOf(':');
                String host;
                int port;
                if (i > 0) {
                    host = address.substring(0, i);
                    port = Integer.parseInt(address.substring(i + 1));
                } else {
                    host = address;
                    port = DEFAULT_REDIS_PORT;
                }
                this.jedisPools.put(address, new JedisPool(config, host, port,
                        url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
                        url.getParameter("db.index", 0)));
            }
        } else {
            //哨兵版redis
            Set<String> sentinelSet = new HashSet<>(addresses);
            int index = url.getParameter("db.index", 0);
            int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            String password = StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword();
            JedisSentinelPool pool = new JedisSentinelPool(masterName, sentinelSet, config, timeout, password, index);
            this.jedisPools.put(masterName, pool);
        }

        this.reconnectPeriod = url.getParameter(REGISTRY_RECONNECT_PERIOD_KEY, DEFAULT_REGISTRY_RECONNECT_PERIOD);
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        if (!group.endsWith(PATH_SEPARATOR)) {
            group = group + PATH_SEPARATOR;
        }
        this.root = group;
        // session=60000, 默认1分钟过期
        this.expirePeriod = url.getParameter(SESSION_TIMEOUT_KEY, DEFAULT_SESSION_TIMEOUT);
        // 使用定时任务刷新存活状态,相当于心跳维护线程,定时任务频率为 session有效其的1/2
        this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
            try {
                deferExpired(); // Extend the expiration time
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
            }
        }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
    }

RedisRegistry构造方法中,主要完成redis配置信息的转换接入,创建连接池,默认使用0号数据库。另外,每个客户端都是单例的RedisRegistry, 所以也就是说会开启一个过期扫描定时任务(可以称之为心跳任务)。

3. Redis 服务提供者注册

与ZK过程类似,服务注册主要就分两步:1. 获取registry实例(通过SPI机制); 2. 将服务的信息注册到注册中心。只是zk是路径,redis是kv.

    // org.apache.dubbo.registry.redis.RedisRegistry#doRegister
    @Override
    public void doRegister(URL url) {
        // 与zk一致,按服务组装key前缀
        String key = toCategoryPath(url);
        // 全服务路径作为value
        String value = url.toFullString();
        String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
        boolean success = false;
        RpcException exception = null;
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    // 使用hash存储提供者/消费者 标识,带过期时间(该时间需后续主动判定,redis并不维护该状态)
                    // 注册好自向标识后,pub一条消息,以便其他客户端可以sub感知到该服务
                    jedis.hset(key, value, expire);
                    jedis.publish(key, REGISTER);
                    success = true;
                    // 如果不是复制模式的redis 服务(即为failover模式),只需往一个redis写数据即可,
                    // 剩余redis自行同步实际上这里应该是存在数据一致性问题的
                    if (!replicate) {
                        break; //  If the server side has synchronized data, just write a single machine
                    }
                }
            } catch (Throwable t) {
                exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
            }
        }
        // 只要有一个成功,即算成功
        if (exception != null) {
            if (success) {
                logger.warn(exception.getMessage(), exception);
            } else {
                throw exception;
            }
        }
    }

以hash类型存放所有提供者列表, key为服务粒度的前缀信息: /dubbo/xxx.xx.XxxService/providers, hash中每个field->value表示,服务全路径信息->过期时间。

通过redis的 pub/sub 机制,通知其他客户端变化。注册时发布一条消息到提供者路径, publish <key> register 。

4. redis 消费者服务订阅

服务注册的目的,主要是让注册中心及其他应用端可以发现自己。而服务订阅则为了让自己可以发现别的系统的变化。如查找所有提供者列表,接收应用上下线通知,开启监听等等。

    // org.apache.dubbo.registry.redis.RedisRegistry#doSubscribe
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        String service = toServicePath(url);
        // 基于service开启订阅线程
        Notifier notifier = notifiers.get(service);
        if (notifier == null) {
            // 主动开启一个 notifier 线程,进行subscribe处理
            // 如果service很多,那就意味着有很多的此类线程,这并不是件好事
            Notifier newNotifier = new Notifier(service);
            notifiers.putIfAbsent(service, newNotifier);
            notifier = notifiers.get(service);
            if (notifier == newNotifier) {
                notifier.start();
            }
        }
        boolean success = false;
        RpcException exception = null;
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    if (service.endsWith(ANY_VALUE)) {
                        admin = true;
                        Set<String> keys = jedis.keys(service);
                        if (CollectionUtils.isNotEmpty(keys)) {
                            Map<String, Set<String>> serviceKeys = new HashMap<>();
                            for (String key : keys) {
                                String serviceKey = toServicePath(key);
                                Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
                                sk.add(key);
                            }
                            for (Set<String> sk : serviceKeys.values()) {
                                doNotify(jedis, sk, url, Collections.singletonList(listener));
                            }
                        }
                    } else {
                        // 首次订阅,使用 keys xx/* 将所有服务信息存储到本地
                        doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener));
                    }
                    success = true;
                    break; // Just read one server's data
                }
            } catch (Throwable t) { // Try the next server
                exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
            }
        }
        if (exception != null) {
            if (success) {
                logger.warn(exception.getMessage(), exception);
            } else {
                throw exception;
            }
        }
    }

与zk的直接调用zkClient.addChildListener()实现订阅不同,redis中使用了多个独立的订阅线程,使用pub/sub机制进行处理。(因redis的pub/sub是基于channel进行的长连接通信,所以每个service只能使用单独的线程,有点伤!)。 使用 doNotify() 将redis中的数据接入应用中。在做订阅的同时,也拉取了提供者服务列表达到初始化的作用。

5. Redis 服务下线处理

当应用要关闭,或者注册失败时,需要进行服务下线。当然,如果应用没有及时做下线处理,zk会通过其自身的临时节点过期机制,也会将该服务做下线处理。从而避免消费者或管理台看到无效的服务存在。

应用服务的主动下线操作是由 ShutdownHookCallbacks 和在判断服务不可用时进行的 invoker.destroy() 来实现优雅下线。

    // org.apache.dubbo.registry.integration.RegistryDirectory#destroy
    @Override
    public void destroy() {
        if (isDestroyed()) {
            return;
        }

        // unregister.
        try {
            if (getRegisteredConsumerUrl() != null && registry != null && registry.isAvailable()) {
                registry.unregister(getRegisteredConsumerUrl());
            }
        } catch (Throwable t) {
            logger.warn("unexpected error when unregister service " + serviceKey + "from registry" + registry.getUrl(), t);
        }
        // unsubscribe.
        try {
            if (getConsumerUrl() != null && registry != null && registry.isAvailable()) {
                registry.unsubscribe(getConsumerUrl(), this);
            }
            ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension()
                    .removeListener(ApplicationModel.getApplication(), CONSUMER_CONFIGURATION_LISTENER);
        } catch (Throwable t) {
            logger.warn("unexpected error when unsubscribe service " + serviceKey + "from registry" + registry.getUrl(), t);
        }
        super.destroy(); // must be executed after unsubscribing
        try {
            destroyAllInvokers();
        } catch (Throwable t) {
            logger.warn("Failed to destroy service " + serviceKey, t);
        }
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#unregister
    @Override
    public void unregister(URL url) {
        super.unregister(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // Sending a cancellation request to the server side
            doUnregister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedUnregistered(url);
        }
    }
    // org.apache.dubbo.registry.redis.RedisRegistry#doUnregister
    @Override
    public void doUnregister(URL url) {
        String key = toCategoryPath(url);
        String value = url.toFullString();
        RpcException exception = null;
        boolean success = false;
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    // 直接删除当前服务对应的 key-field 信息
                    // 然后发布一条 UNREGISTER 消息,通知其他客户端
                    jedis.hdel(key, value);
                    jedis.publish(key, UNREGISTER);
                    success = true;
                    // 如果redis 是复制模型,需要在每个redis上都做一次删除
                    // 此时各应用端将会重复收到消息,重复处理,看起来并不是件好事
                    if (!replicate) {
                        break; //  If the server side has synchronized data, just write a single machine
                    }
                }
            } catch (Throwable t) {
                exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
            }
        }
        if (exception != null) {
            if (success) {
                logger.warn(exception.getMessage(), exception);
            } else {
                throw exception;
            }
        }
    }

总结: 下线处理两步骤: 1. 删除对应的hash key-field; 2. publish 一个下线消息通知其他应用; 3. 针对redis的集群配置决定是删除1次或n次,且反复通知操作;

6. redis 服务解除事件订阅

事实上,redis的 doUnsubscribe, 已不再处理任何事件。

    @Override
    public void doUnsubscribe(URL url, NotifyListener listener) {
    }

那么,前面注册的多个 Notifier 监听线程就不管了吗?那肯定是不行的,它会在 destroy() 被调用时进行收尾处理。实际上,它是 unregister() 的后续工作。

    // org.apache.dubbo.registry.support.AbstractRegistryFactory#destroyAll
    /**
     * Close all created registries
     */
    public static void destroyAll() {
        if (!destroyed.compareAndSet(false, true)) {
            return;
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Close all registries " + getRegistries());
        }
        // Lock up the registry shutdown process
        LOCK.lock();
        try {
            for (Registry registry : getRegistries()) {
                try {
                    registry.destroy();
                } catch (Throwable e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            REGISTRIES.clear();
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
    // org.apache.dubbo.registry.redis.RedisRegistry#destroy
    @Override
    public void destroy() {
        // 该方法甚至可以去调用 unregister(), unsubscribe() 方法
        super.destroy();
        try {
            expireFuture.cancel(true);
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
        try {
            // 遍历所有 notifiers, 依次调用 shutdown, 即停止订阅工作
            for (Notifier notifier : notifiers.values()) {
                notifier.shutdown();
            }
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                jedisPool.destroy();
            } catch (Throwable t) {
                logger.warn("Failed to destroy the redis registry client. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
            }
        }
        // 最后优雅关闭过期扫描定时任务线程池,即 shutdown()..awaitTermination()的应用。
        ExecutorUtil.gracefulShutdown(expireExecutor, expirePeriod);
    }
        // 停止notifier
        // org.apache.dubbo.registry.redis.RedisRegistry.Notifier#shutdown
        public void shutdown() {
            try {
                // step1. 设置停止标识
                // step2. 断开redis连接,这不只是一断开的操作,它会停止psubscribe的调用,从而间接中止订阅线程工作
                running = false;
                jedis.disconnect();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    // 如下方法,即是其父类的 destroy(), 里面涵盖了未关闭的 地址信息,则会触发 unregister, unsubscribe
    // org.apache.dubbo.registry.support.AbstractRegistry#destroy
    @Override
    public void destroy() {
        if (logger.isInfoEnabled()) {
            logger.info("Destroy registry:" + getUrl());
        }
        Set<URL> destroyRegistered = new HashSet<>(getRegistered());
        // step1. unregister 未下线的服务
        if (!destroyRegistered.isEmpty()) {
            for (URL url : new HashSet<>(getRegistered())) {
                if (url.getParameter(DYNAMIC_KEY, true)) {
                    try {
                        unregister(url);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unregister url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        // step2. unsubscribe 未取消订阅的服务
        Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
        if (!destroySubscribed.isEmpty()) {
            for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    try {
                        unsubscribe(url, listener);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unsubscribe url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        // step3. 从已注册列表中删除当前实例
        AbstractRegistryFactory.removeDestroyedRegistry(this);
    }
    // org.apache.dubbo.registry.support.AbstractRegistryFactory#removeDestroyedRegistry
    public static void removeDestroyedRegistry(Registry toRm) {
        LOCK.lock();
        try {
            REGISTRIES.entrySet().removeIf(entry -> entry.getValue().equals(toRm));
        } finally {
            LOCK.unlock();
        }
    }

总结:此处讲了更多unregister,unsubscribe的前置操作。而 notifier.shutdown(); 才是关闭redis订阅相关工作的关键。它是通过设置停止循环标识,以及关闭redis连接实现的。事实上,这各取消订阅方式并没有很优雅。

7. 服务心跳的维护处理

redis本身只是一个缓存存储系统,心跳逻辑需要自行实现。实际上,我们也可以依赖于redis的自动过期机制,进行心跳续期。那么,redis注册中心是否也是这样实现的呢?好像并不是!

    // 在 RedisRegistry 的构造方法中,初始化了一个定时任务的调度
     this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
            try {
                deferExpired(); // Extend the expiration time
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
            }
        }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
    // org.apache.dubbo.registry.redis.RedisRegistry#deferExpired
    private void deferExpired() {
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    // 取出所有注册了的服务,进行心跳更新
                    for (URL url : new HashSet<>(getRegistered())) {
                        if (url.getParameter(DYNAMIC_KEY, true)) {
                            String key = toCategoryPath(url);
                            // 增加过期时间+expirePeriod, url -> expireAt
                            if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
                                // 如果是第一次新增该值,或者重新新增该值(可能由于原来的地址过期被删除),则触发一次regiter的消息发布,自会有相应订阅者处理该变更
                                jedis.publish(key, REGISTER);
                            }
                        }
                    }
                    // 如果是管理类配置,interface=*, 则会开启清理服务功能,注意此类操作会很重,将会消耗很大
                    // 该值会在subscribe()的时候置为 true
                    // 按文档说明该操作会在 监控中心执行,而非存在于应用端
                    if (admin) {
                        clean(jedis);
                    }
                    if (!replicate) {
                        break;//  If the server side has synchronized data, just write a single machine
                    }
                }
            } catch (Throwable t) {
                logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    // The monitoring center is responsible for deleting outdated dirty data
    private void clean(Jedis jedis) {
        // redis: keys * , 列举所有相关的key, 根据服务数量来定该值多少
        Set<String> keys = jedis.keys(root + ANY_VALUE);
        if (CollectionUtils.isNotEmpty(keys)) {
            for (String key : keys) {
                // redis: hgetall <key>
                Map<String, String> values = jedis.hgetAll(key);
                if (CollectionUtils.isNotEmptyMap(values)) {
                    boolean delete = false;
                    long now = System.currentTimeMillis();
                    for (Map.Entry<String, String> entry : values.entrySet()) {
                        URL url = URL.valueOf(entry.getKey());
                        // 根据hash中value 指定的时间,判定是否过期,如果过期则做删除操作
                        // redis: hdel <key> <field>
                        if (url.getParameter(DYNAMIC_KEY, true)) {
                            long expire = Long.parseLong(entry.getValue());
                            if (expire < now) {
                                jedis.hdel(key, entry.getKey());
                                delete = true;
                                if (logger.isWarnEnabled()) {
                                    logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
                                }
                            }
                        }
                    }
                    // 只要有一个服务被判定为过期,则订阅了该服务的客户端都应该被通知到
                    // 多个服务下线只会被通知一次
                    if (delete) {
                        jedis.publish(key, UNREGISTER);
                    }
                }
            }
        }
    }

deferExpired() 的作用,就是维护本实例的所有服务的有效性,做续期作用。两个重量级操作: 1. 依次延期某service下的所有url的过期时间;2. 做全量清理过期服务url;keys xx* 的操作,也对redis提出了一些要求,因为有些redis出于安全限制可能会禁用keys命令。

8. 服务信息变更通知处理notify

redis注册中心其实不会主动发现服务变更,只有应用自己发布regiter或unregister消息后,其他应用才能感知到变化。前面在 doRegister() 时,我看到,应用是通过hash添加字段注册自己,并同时发布 REGISTER 消息通知所有订阅者。在 doSubscribe() 时开启另一个服务线程处理subscribe();

    // org.apache.dubbo.registry.redis.RedisRegistry#doSubscribe
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        String service = toServicePath(url);
        // 订阅是基于服务处理的,每个服务一个订阅处理线程
        Notifier notifier = notifiers.get(service);
        if (notifier == null) {
            Notifier newNotifier = new Notifier(service);
            notifiers.putIfAbsent(service, newNotifier);
            notifier = notifiers.get(service);
            // 此处应为防止并发所做的努力
            if (notifier == newNotifier) {
                notifier.start();
            }
        }
        boolean success = false;
        RpcException exception = null;
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    // 使用 /dubbo/* 代表是管理服务,其需要做清理过期key的作用
                    if (service.endsWith(ANY_VALUE)) {
                        admin = true;
                        ...
                    } else {
                        // 使用 keys xxx/* 命令,列举出该服务下所有缓存key, 实际上就是 providers, consumers, configurators, routers
                        doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener));
                    }
                    success = true;
                    break; // Just read one server's data
                }
            } catch (Throwable t) { // Try the next server
                exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
            }
        }
        if (exception != null) {
            if (success) {
                logger.warn(exception.getMessage(), exception);
            } else {
                throw exception;
            }
        }
    }
    // 根据列如上得到redis-key信息,做服务信息变更
    private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
        if (keys == null || keys.isEmpty()
                || listeners == null || listeners.isEmpty()) {
            return;
        }
        long now = System.currentTimeMillis();
        List<URL> result = new ArrayList<>();
        List<String> categories = Arrays.asList(url.getParameter(CATEGORY_KEY, new String[0]));
        String consumerService = url.getServiceInterface();
        for (String key : keys) {
            if (!ANY_VALUE.equals(consumerService)) {
                // 截取出 service
                String providerService = toServiceName(key);
                if (!providerService.equals(consumerService)) {
                    continue;
                }
            }
            String category = toCategoryName(key);
            // consumers应用只会处理, providers,routers,configurators 的服务, 从而忽略 consumers 下的数据
            if (!categories.contains(ANY_VALUE) && !categories.contains(category)) {
                continue;
            }
            List<URL> urls = new ArrayList<>();
            // 获取所有hash值
            Map<String, String> values = jedis.hgetAll(key);
            if (CollectionUtils.isNotEmptyMap(values)) {
                for (Map.Entry<String, String> entry : values.entrySet()) {
                    URL u = URL.valueOf(entry.getKey());
                    // 判断服务是否过期,过期且存在的服务将不会被利用,但不会做更多处理
                    if (!u.getParameter(DYNAMIC_KEY, true)
                            || Long.parseLong(entry.getValue()) >= now) {
                        if (UrlUtils.isMatch(url, u)) {
                            urls.add(u);
                        }
                    }
                }
            }
            // 如果没有找到合适的可用服务,则添加一个 empty:// 的地址
            if (urls.isEmpty()) {
                urls.add(URLBuilder.from(url)
                        .setProtocol(EMPTY_PROTOCOL)
                        .setAddress(ANYHOST_VALUE)
                        .setPath(toServiceName(key))
                        .addParameter(CATEGORY_KEY, category)
                        .build());
            }
            result.addAll(urls);
            if (logger.isInfoEnabled()) {
                logger.info("redis notify: " + key + " = " + urls);
            }
        }
        if (CollectionUtils.isEmpty(result)) {
            return;
        }
        // 调用父类 FailbackRegistry.notify 方法,与zk调用一致了
        // 刷新提供者列表,路由,配置等本地缓存信息
        for (NotifyListener listener : listeners) {
            notify(url, listener, result);
        }
    }
    private String toServiceName(String categoryPath) {
        // 截取root+interfaceName
        // 截取 interfaceName
        String servicePath = toServicePath(categoryPath);
        return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath;
    }
    private String toServicePath(String categoryPath) {
        int i;
        // 排除root路径,找到第一个'/', 取出servicePath
        if (categoryPath.startsWith(root)) {
            i = categoryPath.indexOf(PATH_SEPARATOR, root.length());
        } else {
            i = categoryPath.indexOf(PATH_SEPARATOR);
        }
        return i > 0 ? categoryPath.substring(0, i) : categoryPath;
    }
    // 另外,对于某个服务发生变更时,需要遍历所有consumer, 确认是否需要刷新
    // 额,意义嘛,暂是没太明白
    private void doNotify(Jedis jedis, String key) {
        for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) {
            doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue()));
        }
    }

总结:

1. redis 做初次subscribe时,notify会通过redis-keys 命令获取所有需要的key, 然后依次将其提供者、路由、配置等信息都缓存起来。

2. 针对每个服务,都会开启相关的订阅线程Notifier处理订阅工作。

3. 最终的listener处理默认会由 RegistryDirectory 处理。

接下来,我们来看 Notifier 是如何处理订阅的?

        // org.apache.dubbo.registry.redis.RedisRegistry.Notifier#run
        @Override
        public void run() {
            // 每个订阅线程,死循环处理只是为了避免网络等其他异常情况出现,以便重新尝试连接redis 订阅channel
            while (running) {
                try {
                    // 额,这是个优化,我不懂的
                    if (!isSkip()) {
                        try {
                            for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
                                Pool<Jedis> jedisPool = entry.getValue();
                                try {
                                    if (jedisPool.isClosed()) {
                                        continue;
                                    }
                                    jedis = jedisPool.getResource();
                                    if (!jedis.isConnected()) {
                                        continue;
                                    }
                                    try {
                                        if (service.endsWith(ANY_VALUE)) {
                                            if (first) {
                                                first = false;
                                                Set<String> keys = jedis.keys(service);
                                                if (CollectionUtils.isNotEmpty(keys)) {
                                                    for (String s : keys) {
                                                        doNotify(jedis, s);
                                                    }
                                                }
                                                resetSkip();
                                            }
                                            jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
                                        } else {
                                            if (first) {
                                                // 首次处理,通知RegistryDirectory 按service刷新缓存
                                                first = false;
                                                doNotify(jedis, service);
                                                resetSkip();
                                            }
                                            // 使用 psubscribe channel 命令,阻塞监听channel信息
                                            // 当消息返回时,使用 NotifySub 进行业务处理,实际就是调用 doNotify() 的过程
                                            // 订阅的channel 为: /dubbo/xxx.xx.XxxService/*
                                            jedis.psubscribe(new NotifySub(jedisPool), service + PATH_SEPARATOR + ANY_VALUE); // blocking
                                        }
                                        break;
                                    } finally {
                                        jedis.close();
                                    }
                                } catch (Throwable t) { // Retry another server
                                    logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
                                    // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
                                    sleep(reconnectPeriod);
                                }
                            }
                        } catch (Throwable t) {
                            logger.error(t.getMessage(), t);
                            // 异常发生后,sleep片刻再重试
                            sleep(reconnectPeriod);
                        }
                    }
                } catch (Throwable t) {
                    logger.error(t.getMessage(), t);
                }
            }
        }
        // org.apache.dubbo.registry.redis.RedisRegistry.NotifySub#onMessage
        @Override
        public void onMessage(String key, String msg) {
            if (logger.isInfoEnabled()) {
                logger.info("redis event: " + key + " = " + msg);
            }
            // 只关注 REGISTER / UNREGISTER, 两个消息
            if (msg.equals(REGISTER)
                    || msg.equals(UNREGISTER)) {
                try {
                    Jedis jedis = jedisPool.getResource();
                    try {
                        // 复用 doNotify
                        doNotify(jedis, key);
                    } finally {
                        jedis.close();
                    }
                } catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
                    logger.error(t.getMessage(), t);
                }
            }
        }
        // 最后还是来看下 isSkip() 的小优化吧
        // 虽然不懂为什么,但是感觉很厉害的样子
        // org.apache.dubbo.registry.redis.RedisRegistry.Notifier#isSkip
        private boolean isSkip() {
            // connectSkip: 已经跳过连接的总次数, connectSkipped: 当前周期内已跳过连接的次数
            // step1. 在connectSkip < 10 情况下,直接用 connectSkipped 与其比较,connectSkipped<connectSkip, 则继续跳过本次,否则不跳过,进入连接逻辑connectSkipped, connectSkip次数增加
            // step2. connectSkip >= 10, 不可再用其作为判定跳过次数, 使用一个10-20间的随机值,作为跳过连接次数判定
            // step3. 如果本次判定为不跳过,则重置 connectSkipped已连接次数自增
            int skip = connectSkip.get(); // Growth of skipping times
            if (skip >= 10) { // If the number of skipping times increases by more than 10, take the random number
                if (connectRandom == 0) {
                    connectRandom = ThreadLocalRandom.current().nextInt(10);
                }
                skip = 10 + connectRandom;
            }
            if (connectSkipped.getAndIncrement() < skip) { // Check the number of skipping times
                return true;
            }
            connectSkip.incrementAndGet();
            connectSkipped.set(0);
            connectRandom = 0;
            return false;
        }

监听服务就做好一件事就行,调用 psubscribe命令订阅channel, 发生变化时调用 doNotify() 回调listener处理刷新。为避免异常情况下订阅功能仍然成立,使用外部的while循环包裹订阅逻辑重试。

注意其订阅的redis channel 为 /dubbo/xxx.xx.XxxService/*, 所以相当于其自身的变更也被包含在内了。而是否要处理该事件,则依赖于url中的categorys配置,如消费为:category=providers,configurators,routers, 即它会处理这三种类型的key变更。

9. 一点感想

dubbo用redis做注册中心,可以看作是一个简单的扩展实现。其核心是基于redis的 pub/sub 能力。

但和zk比起来,redis功能实现会相对困难些,甚至看起来有些蹩脚。因为它需要单独去维护一些心跳、过期类的事务。过多的服务会导致这类工作更加繁重。

但这也许不能成为大家拒绝应用的理由,毕竟,按官方说明阿里内部是基于数据库实现的注册中心,自然有其道理。


Recommend

  • 34
    • 微信 mp.weixin.qq.com 5 years ago
    • Cache

    记一次Dubbo服务注册的坑

  • 56
    • ilovey.live 5 years ago
    • Cache

    Dubbo(二):zookeeper 注册中心

    zookeeper 注册中心 Zookeeper 是 Apacahe Hadoop 的子项目,是一个树型的目录服务,支持变更推送,适合作为 Dubbo 服务的注册中心,工业强度较高,可用于生产环境,并推荐使用 [1]。 流程说明: 服务提...

  • 27
    • www.cnblogs.com 5 years ago
    • Cache

    Dubbo监控中心

    Dubbo-admin管理控制台目前还没有正式发布,但是源码已托管在github上,我们可以自行下载使用; 目前的管理控制台已经发布0.1版本,结构上采取了前后端分离的方式,前端使用Vue和Vuetify分别作为Javascript框架和UI框架,后端采用Sp...

  • 7

    作者 | java_keith 来源| 阿里巴巴云原生公众号 很久没有写技术分享博客,因为发现一个好的工具确实有点忍不住分享一下,毕竟独乐乐不如众乐乐。&g...

  • 8

    安装ZooKeeper 我这里使用zookeeper作为服务注册中心,版本3.4.9,下载地址: http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.12/ 下载后,...

  • 3

    最近看到一篇Apache Dubbo官方Blog中对dubbo网卡实现的代码解读,觉得讲的非常好所以特别分享一下,话不多说直接看原文吧。 研究 Dubbo 网卡地址注册时的一点思考1 如何选择合适的网卡地址可能相当一部分人还不知道我这篇文章到底要讲...

  • 3

    记一个容器中 dubbo 注册的小知识点在目前环境下使用容器部署Java应用还是挺普遍的,但是有一些问题也是随之而来需要解决的,比如容器中应用的dubbo注册,在比较早的版本的dubbo中,就是简单地获取网卡的ip地址。具体代码在这个方法...

  • 6

    一、Dubbo简介 Dubbo是一款典型的高扩展、高性能、高可用的RPC微服务框架,用于解决微服务架构下的服务治理与通信问题。其核心模块包含【RPC通信】和【服务治理】,其中服务治理又分为服务注册与发现、服务容错、负载均衡、流量调度等。今天...

  • 9

    当服务提供者启动时,服务提供者将自己提供的服务信息注册到注册中心,注册中心将这些信息记录下来。服务消费...

  • 3

    Dubbo 应用切换 ZooKeeper 注册中心实例,流量无损迁移 2023/08/30 Java 共 585 字,约 2 分钟

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK