22

dubbo(四):服务路由的实现

 3 years ago
source link: http://www.cnblogs.com/yougewe/p/12814068.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

上一篇中,我们介绍了dubbo的负载均衡实现,见识了几种常用的负载均衡算法。就单个功能而言,似乎dubbo并没有太多的突出之处。事实上,一个成功的产品不必每个地方都要打破常规。更重要的是其全局优化的架构设计,以及如何使用现有的优秀解决方案为己服务。

本篇将介绍另一种集群环境中的高可用实现:路由服务的实现。它将从另一个角度补充dubbo的集群功能完整性。

1. 路由出现的时机?

服务路由是什么时候派上用场的呢?实际上,它是消费者调用提供者时的第一步操作。集群的几个策略的先后顺序为: 服务路由 -> 负载均衡 -> 集群容错(重试);

其调用入口框架是在 org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker 中的:

    /**
     * Invokes the call with failover: when an attempt fails with a non-business error, another
     * invoker is selected and the call is retried.
     * <p>
     * The number of attempts is the method-level {@code RETRIES_KEY} parameter (falling back to
     * {@code DEFAULT_RETRIES}) plus one, and is forced to 1 when that sum is not positive.
     *
     * @param invocation  the call being made
     * @param invokers    candidate invokers used for the first attempt
     * @param loadbalance strategy handed to {@code select} to pick one invoker per attempt
     * @return the result of the first attempt that completes without throwing
     * @throws RpcException a business exception rethrown unchanged, or a new exception that
     *                      summarises the tried providers once every attempt has failed
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                // Service-routing entry point: list() is provided by the parent class
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            // Load-balancing entry point
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                // A previous attempt failed but this one succeeded: warn about the failed providers.
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                // Cluster fault tolerance: remember the failure and retry on the next iteration
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                // Record the provider address of every attempt, successful or not.
                providers.add(invoker.getUrl().getAddress());
            }
        }
        // Every attempt failed; the loop ran at least once, so `le` has been assigned.
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
    // org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#list
    /**
     * Returns the invokers available for the given invocation by delegating to the
     * directory's {@code list()} method.
     */
    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        final List<Invoker<T>> listed = directory.list(invocation);
        return listed;
    }
    // org.apache.dubbo.rpc.cluster.directory.AbstractDirectory#list
    /**
     * Lists the invokers for the invocation via {@code doList}, refusing to do so once this
     * directory has been destroyed.
     *
     * @throws RpcException if the directory is already destroyed
     */
    @Override
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (!destroyed) {
            return doList(invocation);
        }
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    // org.apache.dubbo.registry.integration.RegistryDirectory#doList
    /**
     * Produces the invoker list for an invocation.
     * <p>
     * Fails when the directory is forbidden, returns the cached invokers untouched in
     * multi-group mode, and otherwise returns whatever the router chain yields. A router
     * failure is logged and results in an empty list rather than an exception.
     */
    @Override
    public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) {
            // 1. No service provider 2. Service providers are disabled
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
                    getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
                    NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
                    ", please check status of providers(disabled, not registered or in blacklist).");
        }

        if (multiGroup) {
            if (this.invokers == null) {
                return Collections.emptyList();
            }
            return this.invokers;
        }

        List<Invoker<T>> routed = null;
        try {
            // Get invokers from cache, only runtime routers will be executed.
            routed = routerChain.route(getConsumerUrl(), invocation);
        } catch (Throwable t) {
            logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
        }

        if (routed == null) {
            return Collections.emptyList();
        }
        return routed;
    }
    
    // org.apache.dubbo.rpc.cluster.RouterChain#route
    /**
     * Runs the chain's invoker list through every registered router in order; the output of
     * one router is the input of the next, and the last output is returned.
     */
    public List<Invoker<T>> route(URL url, Invocation invocation) {
        List<Invoker<T>> remaining = invokers;
        for (Router each : routers) {
            remaining = each.route(remaining, url, invocation);
        }
        return remaining;
    }

2. dubbo提供了哪些路由策略?

Dubbo 目前提供了三种服务路由实现,分别为条件路由 ConditionRouter、脚本路由 ScriptRouter 和标签路由 TagRouter。

router 的创建时机:每次url发生变更后(如后台修改),都会触发一次路由信息重建。

    // org.apache.dubbo.registry.integration.RegistryDirectory#notify
    /**
     * Handles a batch of notified URLs: groups them by category, refreshes the configurators,
     * adds any routers built from the router URLs, lets the activated {@code AddressListener}
     * extensions rewrite the provider URLs, and finally refreshes overrides and invokers.
     *
     * @param urls the notified URLs; null entries and entries rejected by the category /
     *             compatibility filters are ignored
     */
    @Override
    public synchronized void notify(List<URL> urls) {
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(this::judgeCategory));

        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        // Keep the current configurators when the conversion yields nothing.
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        // Build routers from the router URLs and, if any were produced, hand them to addRouters
        toRouters(routerURLs).ifPresent(this::addRouters);

        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
        /**
         * 3.x added for extend URL address
         */
        ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
        List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
        if (supportedListeners != null && !supportedListeners.isEmpty()) {
            // Each listener receives the previous listener's output and may replace the provider list.
            for (AddressListener addressListener : supportedListeners) {
                providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
            }
        }
        refreshOverrideAndInvoker(providerURLs);
    }
    // org.apache.dubbo.registry.integration.RegistryDirectory#toRouters
    /**
     * Converts router URLs into router instances.
     *
     * @param urls router URLs; may be null or empty
     * @return an empty {@code Optional} when {@code urls} is null or empty (nothing to do);
     *         otherwise an {@code Optional} holding the distinct routers that could be built,
     *         which may itself be an empty list if every URL was skipped or failed
     */
    private Optional<List<Router>> toRouters(List<URL> urls) {
        if (urls == null || urls.isEmpty()) {
            return Optional.empty();
        }

        List<Router> routers = new ArrayList<>();
        for (URL candidate : urls) {
            if (EMPTY_PROTOCOL.equals(candidate.getProtocol())) {
                continue;
            }
            // When a router type is given, it replaces the URL protocol.
            String routerType = candidate.getParameter(ROUTER_KEY);
            URL routerUrl = (routerType == null || routerType.length() == 0)
                    ? candidate
                    : candidate.setProtocol(routerType);
            try {
                // The router is created by a RouterFactory; the factory is resolved through the SPI
                // mechanism. Registered factory names noted alongside this code:
                // file=org.apache.dubbo.rpc.cluster.router.file.FileRouterFactory
                // script=org.apache.dubbo.rpc.cluster.router.script.ScriptRouterFactory
                // condition=org.apache.dubbo.rpc.cluster.router.condition.ConditionRouterFactory
                // service=org.apache.dubbo.rpc.cluster.router.condition.config.ServiceRouterFactory
                // app=org.apache.dubbo.rpc.cluster.router.condition.config.AppRouterFactory
                // tag=org.apache.dubbo.rpc.cluster.router.tag.TagRouterFactory
                // mock=org.apache.dubbo.rpc.cluster.router.mock.MockRouterFactory
                Router created = ROUTER_FACTORY.getRouter(routerUrl);
                boolean alreadyPresent = routers.contains(created);
                if (!alreadyPresent) {
                    routers.add(created);
                }
            } catch (Throwable t) {
                // A URL that cannot be converted is logged and skipped; the rest are still processed.
                logger.error("convert router url to router error, url: " + routerUrl, t);
            }
        }

        return Optional.of(routers);
    }

所以,整体上整个router的创建,依赖于url中的router参数,用该参数找到对应的router工厂类,然后调用其 getRouter()方法生成具体的router. 我们简单看看router的工厂类一般是什么样的?

2.1. 路由工厂类的构建

/**
 * Application level router factory.
 * <p>
 * Holds at most one router per factory instance: the router is created on the first call to
 * {@link #getRouter(URL)} and returned on every later call.
 */
@Activate(order = 200)
public class AppRouterFactory implements RouterFactory {
    public static final String NAME = "app";

    private volatile Router router;

    /**
     * Returns this factory's router, creating it from {@code url} on first use.
     * Creation happens inside a block synchronized on the factory, after re-checking the field.
     */
    @Override
    public Router getRouter(URL url) {
        Router current = router;
        if (current == null) {
            synchronized (this) {
                current = router;
                if (current == null) {
                    current = createRouter(url);
                    router = current;
                }
            }
        }
        return current;
    }

    /** Builds the {@code AppRouter} for the given URL. */
    private Router createRouter(URL url) {
        return new AppRouter(url);
    }
}

// Cacheable router factory
/**
 * Service level router factory.
 * <p>
 * Extends {@code CacheableRouterFactory} and only supplies the router-creation step.
 */
@Activate(order = 300)
public class ServiceRouterFactory extends CacheableRouterFactory {

    public static final String NAME = "service";

    /** Builds a {@code ServiceRouter} for the given URL. */
    @Override
    protected Router createRouter(URL url) {
        final Router serviceRouter = new ServiceRouter(url);
        return serviceRouter;
    }

}

/**
 * Condition router factory.
 * <p>
 * Creates a new {@code ConditionRouter} on every call; nothing is cached.
 */
public class ConditionRouterFactory implements RouterFactory {

    public static final String NAME = "condition";

    /** Builds a fresh {@code ConditionRouter} from the given URL. */
    @Override
    public Router getRouter(URL url) {
        final Router conditionRouter = new ConditionRouter(url);
        return conditionRouter;
    }

}

/**
 * File router factory.
 * <p>
 * It is not a standalone factory: it reads the rule from the file the URL points to, rebuilds
 * the URL with that rule embedded, and delegates router creation to another, injected
 * {@code RouterFactory}.
 */
public class FileRouterFactory implements RouterFactory {

    public static final String NAME = "file";

    private RouterFactory routerFactory;

    /** Injects the factory that builds the actual router from the rewritten URL. */
    public void setRouterFactory(RouterFactory routerFactory) {
        this.routerFactory = routerFactory;
    }

    /**
     * Loads the rule file referenced by {@code url} and returns the router built from it.
     *
     * @param url URL whose absolute path locates the rule file
     * @return the router created by the injected factory for the rewritten URL
     * @throws IllegalStateException if the rule file cannot be read or closed
     */
    @Override
    public Router getRouter(URL url) {
        try {
            // Transform File URL into Script Route URL, and Load
            // file:///d:/path/to/route.js?router=script ==> script:///d:/path/to/route.js?type=js&rule=<file-content>
            String protocol = url.getParameter(ROUTER_KEY, ScriptRouterFactory.NAME); // Replace original protocol (maybe 'file') with 'script'
            String type = null; // Use file suffix to config script type, e.g., js, groovy ...
            String path = url.getPath();
            if (path != null) {
                int i = path.lastIndexOf('.');
                if (i > 0) {
                    type = path.substring(i + 1);
                }
            }

            // try-with-resources releases the file handle whether or not reading succeeds.
            String rule;
            try (FileReader reader = new FileReader(new File(url.getAbsolutePath()))) {
                rule = IOUtils.read(reader);
            }

            // FIXME: this code looks useless
            boolean runtime = url.getParameter(RUNTIME_KEY, false);
            URL script = URLBuilder.from(url)
                    .setProtocol(protocol)
                    .addParameter(TYPE_KEY, type)
                    .addParameter(RUNTIME_KEY, runtime)
                    .addParameterAndEncoded(RULE_KEY, rule)
                    .build();
            // Hand the reassembled URL to the injected router factory
            return routerFactory.getRouter(script);
        } catch (IOException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

}
    
/**
 * Mock router factory.
 * <p>
 * Returns a new {@code MockInvokersSelector} on every call; the URL is not used.
 */
@Activate
public class MockRouterFactory implements RouterFactory {
    public static final String NAME = "mock";

    /** Builds a fresh {@code MockInvokersSelector}. */
    @Override
    public Router getRouter(URL url) {
        final Router mockSelector = new MockInvokersSelector();
        return mockSelector;
    }

}
// Script router factory
/**
 * ScriptRouterFactory
 * <p>
 * Example URLS used by Script Router Factory:
 * <ol>
 * <li> script://registryAddress?type=js&rule=xxxx
 * <li> script:///path/to/routerfile.js?type=js&rule=xxxx
 * <li> script://D:\path\to\routerfile.js?type=js&rule=xxxx
 * <li> script://C:/path/to/routerfile.js?type=js&rule=xxxx
 * </ol>
 * The host value in URL points out the address of the source content of the Script Router,Registry、File etc
 * <p>
 * A new {@code ScriptRouter} is created on every call; nothing is cached.
 */
public class ScriptRouterFactory implements RouterFactory {

    public static final String NAME = "script";

    /** Builds a fresh {@code ScriptRouter} from the given URL. */
    @Override
    public Router getRouter(URL url) {
        final Router scriptRouter = new ScriptRouter(url);
        return scriptRouter;
    }

}

/**
 * Tag router factory.
 * <p>
 * Extends {@code CacheableRouterFactory}: {@code getRouter()} is inherited from the parent,
 * and this class only implements {@code createRouter()}.
 */
@Activate(order = 100)
public class TagRouterFactory extends CacheableRouterFactory {

    public static final String NAME = "tag";

    /** Builds a {@code TagRouter} for the given URL. */
    @Override
    protected Router createRouter(URL url) {
        final Router tagRouter = new TagRouter(url);
        return tagRouter;
    }
}

/**
 * If you want to provide a router implementation based on design of v2.7.0, please extend from this abstract class.
 * For 2.6.x style router, please implement and use RouterFactory directly.
 * <p>
 * Routers are cached in a concurrent map keyed by the URL's service key, so
 * {@link #createRouter(URL)} is invoked only when no router is stored for that key yet.
 */
public abstract class CacheableRouterFactory implements RouterFactory {
    private ConcurrentMap<String, Router> routerMap = new ConcurrentHashMap<>();

    /** Returns the cached router for the URL's service key, creating and storing it if absent. */
    @Override
    public Router getRouter(URL url) {
        final String serviceKey = url.getServiceKey();
        return routerMap.computeIfAbsent(serviceKey, absentKey -> createRouter(url));
    }

    /** Creates the concrete router for the given URL; implemented by subclasses. */
    protected abstract Router createRouter(URL url);
}

可以看出这些工厂类,基本都是使用new的方式就返回了对应的路由实例类。那么是否有必要都在这些类外面包一个工厂类进行创建呢?直接创建不好吗?事实上,这只是一种工厂模式的最佳实践,是为了更好地隐藏创建逻辑。

2.2. 条件路由 ConditionRouter 详解

路由功能的实现,主要分为规则解析和规则应用两个部分!

    /**
     * Builds a condition router from the settings carried by the URL and parses its rule.
     *
     * @param url router URL; its priority, force, enabled and rule parameters are read here
     */
    public ConditionRouter(URL url) {
        this.url = url;
        // priority parameter (0 is passed as the fallback value)
        this.priority = url.getParameter(PRIORITY_KEY, 0);
        // force parameter (false is passed as the fallback value)
        this.force = url.getParameter(FORCE_KEY, false);
        // enabled parameter (true is passed as the fallback value)
        this.enabled = url.getParameter(ENABLED_KEY, true);
        // rule parameter, decoded
        // init() parses the rule into the when/then conditions
        init(url.getParameterAndDecoded(RULE_KEY));
    }
    /**
     * Parses a condition rule such as {@code host = 10.20.153.10 => host = 10.20.153.11} and
     * stores the resulting when/then conditions.
     * <p>
     * The text before {@code =>} is the when-part and the text after it the then-part. Without
     * {@code =>} the whole rule is the then-part and the when-part is empty. A blank or
     * {@code "true"} when-part yields an empty condition map; a blank or {@code "false"}
     * then-part yields {@code null}.
     *
     * @param rule the rule text
     * @throws IllegalArgumentException if the rule is null or blank
     * @throws IllegalStateException    if the rule cannot be parsed
     */
    public void init(String rule) {
        try {
            if (rule == null || rule.trim().length() == 0) {
                throw new IllegalArgumentException("Illegal route rule!");
            }
            // The "consumer." / "provider." prefixes are dropped before parsing.
            final String normalized = rule.replace("consumer.", "").replace("provider.", "");
            final int arrowIndex = normalized.indexOf("=>");

            final String whenRule;
            final String thenRule;
            if (arrowIndex < 0) {
                whenRule = null;
                thenRule = normalized.trim();
            } else {
                whenRule = normalized.substring(0, arrowIndex).trim();
                thenRule = normalized.substring(arrowIndex + 2).trim();
            }

            final Map<String, MatchPair> when;
            if (StringUtils.isBlank(whenRule) || "true".equals(whenRule)) {
                when = new HashMap<String, MatchPair>();
            } else {
                when = parseRule(whenRule);
            }

            final Map<String, MatchPair> then;
            if (StringUtils.isBlank(thenRule) || "false".equals(thenRule)) {
                then = null;
            } else {
                then = parseRule(thenRule);
            }

            // NOTE: It should be determined on the business level whether the `When condition` can be empty or not.
            // Both parts are assigned only after both have parsed successfully.
            this.whenCondition = when;
            this.thenCondition = then;
        } catch (ParseException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
    /**
     * Parses one side of a condition rule into a map from key to its match / mismatch values.
     * <p>
     * The rule is scanned with {@code ROUTE_PATTERN}; each hit is a (separator, content) pair.
     * An empty separator or {@code &} starts (or resumes) a key, {@code =} and {@code !=} add
     * the content to the current key's matches or mismatches, and {@code ,} appends another
     * value to whichever set was written last.
     *
     * @param rule the rule text; a blank rule produces an empty map
     * @return the parsed conditions
     * @throws ParseException if a separator appears where it is not allowed or is not recognised
     */
    private static Map<String, MatchPair> parseRule(String rule)
            throws ParseException {
        Map<String, MatchPair> condition = new HashMap<String, MatchPair>();
        if (StringUtils.isBlank(rule)) {
            return condition;
        }
        // Key-Value pair, stores both match and mismatch conditions
        MatchPair pair = null;
        // Multiple values
        Set<String> values = null;
        // ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)");
        final Matcher matcher = ROUTE_PATTERN.matcher(rule);
        while (matcher.find()) { // Try to match one by one
            String separator = matcher.group(1);
            String content = matcher.group(2);
            // Start part of the condition expression.
            if (StringUtils.isEmpty(separator)) {
                pair = new MatchPair();
                condition.put(content, pair);
            }
            // The KV part of the condition expression
            // &host=xxx
            else if ("&".equals(separator)) {
                // Reuse the existing pair when the same key appears again.
                if (condition.get(content) == null) {
                    pair = new MatchPair();
                    condition.put(content, pair);
                } else {
                    pair = condition.get(content);
                }
            }
            // The Value in the KV part.
            else if ("=".equals(separator)) {
                // A value without a preceding key is illegal.
                if (pair == null) {
                    throw new ParseException("Illegal route rule \""
                            + rule + "\", The error char '" + separator
                            + "' at index " + matcher.start() + " before \""
                            + content + "\".", matcher.start());
                }

                values = pair.matches;
                values.add(content);
            }
            // The Value in the KV part.
            else if ("!=".equals(separator)) {
                // A value without a preceding key is illegal.
                if (pair == null) {
                    throw new ParseException("Illegal route rule \""
                            + rule + "\", The error char '" + separator
                            + "' at index " + matcher.start() + " before \""
                            + content + "\".", matcher.start());
                }

                values = pair.mismatches;
                values.add(content);
            }
            // The Value in the KV part, if Value have more than one items.
            else if (",".equals(separator)) { // Should be separated by ','
                // A ',' is only valid after at least one value has been added.
                if (values == null || values.isEmpty()) {
                    throw new ParseException("Illegal route rule \""
                            + rule + "\", The error char '" + separator
                            + "' at index " + matcher.start() + " before \""
                            + content + "\".", matcher.start());
                }
                values.add(content);
            } else {
                throw new ParseException("Illegal route rule \"" + rule
                        + "\", The error char '" + separator + "' at index "
                        + matcher.start() + " before \"" + content + "\".", matcher.start());
            }
        }
        return condition;
    }

2. 接下来是如何使用这些配置好的规则

路由服务由routerChain进行统一调用:

    // org.apache.dubbo.rpc.cluster.RouterChain#route
    /**
     * Runs the chain's invoker list through every registered router in order; the output of
     * one router is the input of the next.
     *
     * @param url        passed unchanged to every router
     * @param invocation passed unchanged to every router
     * @return the invoker list produced by the last router, or the chain's own list when no
     *         router is registered
     */
    public List<Invoker<T>> route(URL url, Invocation invocation) {
        List<Invoker<T>> remaining = invokers;
        for (Router each : routers) {
            remaining = each.route(remaining, url, invocation);
        }
        return remaining;
    }
    // The condition router's route() implementation follows:
    /**
     * Filters the invokers according to the parsed when/then conditions.
     * <p>
     * The input list is returned unchanged when the router is disabled, the list is empty, the
     * when-condition does not match, the filtered result is empty without {@code force}, or an
     * error occurs while matching. A null then-condition yields an empty list.
     *
     * @param invokers   candidate invokers
     * @param url        URL checked against the when-condition and passed as the parameter
     *                   source when matching the then-condition
     * @param invocation the current invocation
     * @return the routed invokers
     */
    @Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
            throws RpcException {
        if (!enabled || CollectionUtils.isEmpty(invokers)) {
            return invokers;
        }
        try {
            // The when-condition does not apply: leave the invokers untouched.
            if (!matchWhen(url, invocation)) {
                return invokers;
            }
            List<Invoker<T>> matched = new ArrayList<Invoker<T>>();
            if (thenCondition == null) {
                logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
                return matched;
            }
            // Keep only the invokers whose URL satisfies the then-condition.
            for (Invoker<T> candidate : invokers) {
                if (matchThen(candidate.getUrl(), url)) {
                    matched.add(candidate);
                }
            }
            if (!matched.isEmpty()) {
                return matched;
            }
            if (force) {
                logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(RULE_KEY));
                return matched;
            }
        } catch (Throwable t) {
            logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
        }
        return invokers;
    }
    /**
     * Checks whether the given URL and invocation satisfy the when-condition.
     * An empty when-condition matches everything.
     */
    boolean matchWhen(URL url, Invocation invocation) {
        if (CollectionUtils.isEmptyMap(whenCondition)) {
            return true;
        }
        return matchCondition(whenCondition, url, null, invocation);
    }
    /**
     * Checks whether the given URL satisfies the then-condition, using the same matching
     * routine as the when-condition. An empty or missing then-condition never matches.
     */
    private boolean matchThen(URL url, URL param) {
        if (!CollectionUtils.isNotEmptyMap(thenCondition)) {
            return false;
        }
        return matchCondition(thenCondition, url, param, null);
    }
    
    /**
     * Tests a URL (and optionally an invocation) against a set of parsed conditions.
     * <p>
     * For each condition key a sample value is taken from the invocation's method name, the
     * URL's address, the URL's host, or the URL's parameter map. Every key must pass: a present
     * sample has to satisfy the key's {@code MatchPair}, and an absent sample is accepted only
     * when the key lists no required matches.
     *
     * @param condition  parsed conditions, keyed by name
     * @param url        URL supplying the sample values
     * @param param      URL passed through to {@code MatchPair.isMatch}
     * @param invocation current invocation, or null; when present it supplies the method name
     * @return true if at least one condition was evaluated and none failed
     */
    private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param, Invocation invocation) {
        Map<String, String> sample = url.toMap();
        boolean result = false;
        for (Map.Entry<String, MatchPair> matchPair : condition.entrySet()) {
            String key = matchPair.getKey();
            String sampleValue;
            //get real invoked method name from invocation
            if (invocation != null && (METHOD_KEY.equals(key) || METHODS_KEY.equals(key))) {
                sampleValue = invocation.getMethodName();
            } else if (ADDRESS_KEY.equals(key)) {
                sampleValue = url.getAddress();
            } else if (HOST_KEY.equals(key)) {
                sampleValue = url.getHost();
            } else {
                // One lookup is enough: repeating the same get on the same map with the same key
                // cannot return a different value.
                sampleValue = sample.get(key);
            }
            if (sampleValue != null) {
                // A single failed check means the conditions are not met.
                if (!matchPair.getValue().isMatch(sampleValue, param)) {
                    return false;
                } else {
                    result = true;
                }
            } else {
                //not pass the condition
                if (!matchPair.getValue().matches.isEmpty()) {
                    return false;
                } else {
                    result = true;
                }
            }
        }
        return result;
    }
        /**
         * Decides, inside {@code MatchPair}, whether a value satisfies this pair's patterns.
         * <p>
         * Mismatch patterns are checked first and win: any hit rejects the value. With only
         * mismatch patterns, a value that hits none of them is accepted. Otherwise the value
         * must hit at least one match pattern. A pair with no patterns at all rejects
         * everything.
         *
         * @param value the value under test
         * @param param URL passed through to the glob matcher
         * @return true if the value is accepted
         */
        private boolean isMatch(String value, URL param) {
            final boolean hasMatches = !matches.isEmpty();
            final boolean hasMismatches = !mismatches.isEmpty();

            if (hasMismatches) {
                //when both mismatches and matches contain the same value, then using mismatches first
                for (String excluded : mismatches) {
                    if (UrlUtils.isMatchGlobPattern(excluded, value, param)) {
                        return false;
                    }
                }
                // Only exclusions were configured and none applied.
                if (!hasMatches) {
                    return true;
                }
            }

            if (hasMatches) {
                // Glob check against each accepted pattern.
                for (String accepted : matches) {
                    if (UrlUtils.isMatchGlobPattern(accepted, value, param)) {
                        return true;
                    }
                }
            }
            return false;
        }
    }

2.3. 脚本路由的实现 ScriptRouter

构造方法中主要解析一些必要参数,以及根据类型获取对应的脚本解析引擎(如 Groovy、Nashorn),非常重要。

    /**
     * Builds a script router: reads the priority, obtains the script engine and the rule for
     * the URL, and compiles the rule.
     * <p>
     * A rule that fails to compile is logged and ignored; {@code function} is then not assigned
     * by this constructor.
     *
     * @param url router URL carrying the priority, engine type and rule
     */
    public ScriptRouter(URL url) {
        this.url = url;
        this.priority = url.getParameter(PRIORITY_KEY, SCRIPT_ROUTER_DEFAULT_PRIORITY);
        // Obtain the script engine for this URL (presumably chosen by its type parameter, e.g. type=javascript)
        engine = getEngine(url);
        // Obtain the rule text (the rule=xxxx parameter)
        rule = getRule(url);
        try {
            // Engine implementations include GroovyScriptEngineImpl and NashornScriptEngine
            // NOTE(review): the cast assumes the engine implements Compilable
            Compilable compilable = (Compilable) engine;
            function = compilable.compile(rule);
        } catch (ScriptException e) {
            logger.error("route error, rule has been ignored. rule: " + rule +
                    ", url: " + RpcContext.getContext().getUrl(), e);
        }
    }

而实际路由的方法,也是直接调用脚本引擎进行脚本解析而得:

    // org.apache.dubbo.rpc.cluster.router.script.ScriptRouter#route
    /**
     * Routes by evaluating the compiled rule script against the invokers.
     * <p>
     * The invokers, the invocation and the RPC context are exposed to the script as bindings;
     * the script's result is converted back to an invoker list by {@code getRoutedInvokers}.
     * When there is no compiled rule, or the script fails, the input list is returned unchanged.
     *
     * @param invokers   candidate invokers
     * @param url        not used by this implementation
     * @param invocation the current invocation, exposed to the script
     * @return the invokers chosen by the script, or {@code invokers} as a fallback
     */
    @Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        // Nothing to evaluate (e.g. the rule failed to compile): return before building the
        // bindings, which would copy the whole invoker list for no purpose.
        if (function == null) {
            return invokers;
        }
        try {
            // Wrap the inputs as Bindings and pass them to the script engine
            Bindings bindings = createBindings(invokers, invocation);
            // Evaluate the rule script and convert its result to List<Invoker<T>>
            return getRoutedInvokers(function.eval(bindings));
        } catch (ScriptException e) {
            logger.error("route error, rule has been ignored. rule: " + rule + ", method:" +
                    invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e);
            return invokers;
        }
    }
    /**
     * Builds the variables visible to the rule script: {@code invokers} (a fresh copy of the
     * candidate list), {@code invocation}, and {@code context} (the current RPC context).
     */
    private <T> Bindings createBindings(List<Invoker<T>> invokers, Invocation invocation) {
        final Bindings scriptScope = engine.createBindings();
        // The script receives its own list, so it cannot modify the caller's list.
        final List<Invoker<T>> invokersCopy = new ArrayList<>(invokers);
        scriptScope.put("invokers", invokersCopy);
        scriptScope.put("invocation", invocation);
        scriptScope.put("context", RpcContext.getContext());
        return scriptScope;
    }

上面的实现看起来还是有点抽象。我们拿出一个dubbo中的单元测试样例,看一下脚本路由的使用方式:

    
    /**
     * Verifies that a script rule which keeps only available invokers filters out the one
     * unavailable invoker and preserves the order of the remaining two.
     */
    @Test
    public void testRoutePickInvokers() {
        // The rule is written in JavaScript but calls Java methods, so it can read the arguments
        // passed in from Java and hand back a Java result.
        // Presumably this script cannot run outside a Java-hosted engine, since it relies on java.util.ArrayList.
        String rule = "var result = new java.util.ArrayList(invokers.size());" +
                "for (i=0;i<invokers.size(); i++){ " +
                // Use isAvailable() to decide whether the invoker joins the candidate list
                "if (invokers.get(i).isAvailable()) {" +
                "result.add(invokers.get(i)) ;" +
                "}" +
                "} ; " +
                "return result;";
        // Define a route function and call it immediately, so that the script evaluates to its result
        String script = "function route(invokers,invocation,context){" + rule + "} route(invokers,invocation,context)";
        Router router = new ScriptRouterFactory().getRouter(getRouteUrl(script));

        List<Invoker<String>> invokers = new ArrayList<Invoker<String>>();
        // Simulate an unavailable invoker
        Invoker<String> invoker1 = new MockInvoker<String>(false);
        Invoker<String> invoker2 = new MockInvoker<String>(true);
        Invoker<String> invoker3 = new MockInvoker<String>(true);
        invokers.add(invoker1);
        invokers.add(invoker2);
        invokers.add(invoker3);
        List<Invoker<String>> filteredInvokers = router.route(invokers, invokers.get(0).getUrl(), new RpcInvocation());
        Assertions.assertEquals(2, filteredInvokers.size());
        Assertions.assertEquals(invoker2, filteredInvokers.get(0));
        Assertions.assertEquals(invoker3, filteredInvokers.get(1));
    }

所以,其实脚本路由可以写得非常灵活多变,但是维护成本有点高,它不像条件路由那样简洁明了。需要进行反复自测试后才可配置在正式环境中。

2.4. 标签路由 TagRouter

大概就是根据tag=xxx 选择相应的路由地址。该router还未正式发布,不过可以看一下其大概实现:

    /**
     * Filters invokers by tag.
     * <p>
     * Without a usable dynamic rule (null, invalid or disabled) the decision is delegated to
     * {@code filterUsingStaticTag}. Otherwise the requested tag is looked up and the invokers
     * are filtered first by the rule's tag-to-address groups and then by the invokers' own
     * tag parameter, as detailed in the inline comments.
     *
     * @param invokers   candidate invokers
     * @param url        URL whose tag parameter is used when the invocation carries no tag
     * @param invocation the current invocation; its tag attachment takes precedence
     * @return the invokers that remain after tag filtering
     */
    @Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        if (CollectionUtils.isEmpty(invokers)) {
            return invokers;
        }

        // since the rule can be changed by config center, we should copy one to use.
        final TagRouterRule tagRouterRuleCopy = tagRouterRule;
        if (tagRouterRuleCopy == null || !tagRouterRuleCopy.isValid() || !tagRouterRuleCopy.isEnabled()) {
            return filterUsingStaticTag(invokers, url, invocation);
        }

        List<Invoker<T>> result = invokers;
        // Prefer the tag carried by the invocation attachment; fall back to the URL parameter when it is empty
        String tag = StringUtils.isEmpty(invocation.getAttachment(TAG_KEY)) ? url.getParameter(TAG_KEY) :
                invocation.getAttachment(TAG_KEY);

        // if we are requesting for a Provider with a specific tag
        if (StringUtils.isNotEmpty(tag)) {
            List<String> addresses = tagRouterRuleCopy.getTagnameToAddresses().get(tag);
            // filter by dynamic tag group first
            if (CollectionUtils.isNotEmpty(addresses)) {
                result = filterInvoker(invokers, invoker -> addressMatches(invoker.getUrl(), addresses));
                // if result is not null OR it's null but force=true, return result directly
                if (CollectionUtils.isNotEmpty(result) || tagRouterRuleCopy.isForce()) {
                    return result;
                }
            } else {
                // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
                // dynamic tag group but force=false. check static tag
                result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(TAG_KEY)));
            }
            // If there's no tagged providers that can match the current tagged request. force.tag is set by default
            // to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
            if (CollectionUtils.isNotEmpty(result) || isForceUseTag(invocation)) {
                return result;
            }
            // FAILOVER: return all Providers without any tags.
            else {
                List<Invoker<T>> tmp = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(),
                        tagRouterRuleCopy.getAddresses()));
                return filterInvoker(tmp, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(TAG_KEY)));
            }
        } else {
            // List<String> addresses = tagRouterRule.filter(providerApp);
            // return all addresses in dynamic tag group.
            List<String> addresses = tagRouterRuleCopy.getAddresses();
            if (CollectionUtils.isNotEmpty(addresses)) {
                result = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), addresses));
                // 1. all addresses are in dynamic tag group, return empty list.
                if (CollectionUtils.isEmpty(result)) {
                    return result;
                }
                // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the
                // static tag group.
            }
            // Keep invokers that have no tag of their own, or whose tag is not one of the rule's tag names.
            return filterInvoker(result, invoker -> {
                String localTag = invoker.getUrl().getParameter(TAG_KEY);
                return StringUtils.isEmpty(localTag) || !tagRouterRuleCopy.getTagNames().contains(localTag);
            });
        }
    }

其配置格式大致如下:

        // Sample tag-router rule, written as YAML text inside a Java string literal.
        // Top-level keys in this sample: force, runtime, enabled, priority and key ("demo-provider").
        // "tags" holds a list of entries, each pairing a tag name with the provider
        // addresses (host:port) assigned to that tag.
        // NOTE(review): "enabled" is false here — presumably the rule is inactive as written; confirm.
        String serviceStr = "---\n" +
                "force: false\n" +
                "runtime: true\n" +
                "enabled: false\n" +
                "priority: 1\n" +
                "key: demo-provider\n" +
                "tags:\n" +
                "  - name: tag1\n" +
                "    addresses: [\"30.5.120.37:20881\"]\n" +
                "  - name: tag2\n" +
                "    addresses: [\"30.5.120.37:20880\"]\n" +
                "...";

2.5. AppRouter + ServiceRouter

这两个路由实际上并不是独立的路由实现,它们都只是对 ConditionRouter 做了一层包装,用来完成各自特定的业务逻辑。

    // org.apache.dubbo.rpc.cluster.router.condition.config.AppRouter#AppRouter
    /**
     * Creates a router whose rule key is read from the given URL.
     *
     * @param url URL whose {@code CommonConstants.APPLICATION_KEY} parameter supplies the rule key
     */
    public AppRouter(URL url) {
        // Use the value of the application=xxx parameter as the rule key.
        super(url, url.getParameter(CommonConstants.APPLICATION_KEY));
        this.priority = APP_ROUTER_DEFAULT_PRIORITY;
    }
    // org.apache.dubbo.rpc.cluster.router.condition.config.ListenableRouter#ListenableRouter
    /**
     * Creates the router, turns {@code force} off and runs {@code init} for the given rule key.
     *
     * @param url     URL forwarded to the superclass constructor
     * @param ruleKey rule key handed to {@code init}
     */
    public ListenableRouter(URL url, String ruleKey) {
        super(url);
        this.force = false;
        // Initialize the routing rule for this key.
        this.init(ruleKey);
    }
    /**
     * Registers this router as a listener for its rule and applies any rule already stored.
     *
     * <p>Returns immediately when {@code ruleKey} is empty. Otherwise the listened key is
     * {@code ruleKey + RULE_SUFFIX}; if the repository already holds a non-empty rule under
     * that key, it is fed through {@link #process(ConfigChangedEvent)}.
     *
     * @param ruleKey rule key without the suffix
     */
    private synchronized void init(String ruleKey) {
        if (StringUtils.isEmpty(ruleKey)) {
            return;
        }

        // Rule key plus the suffix (".condition-router" per the original note).
        final String listenKey = ruleKey + RULE_SUFFIX;
        ruleRepository.addListener(listenKey, this);

        final String storedRule = ruleRepository.getRule(listenKey, DynamicConfiguration.DEFAULT_GROUP);
        if (StringUtils.isNotEmpty(storedRule)) {
            // Replay the stored rule through the regular change-notification path.
            ConfigChangedEvent initialEvent =
                    new ConfigChangedEvent(listenKey, DynamicConfiguration.DEFAULT_GROUP, storedRule);
            process(initialEvent);
        }
    }
    
    /**
     * Handles a change notification for the condition rule.
     *
     * <p>A {@code DELETED} event clears the parsed rule and replaces the condition routers with
     * an empty list. Any other event has its content parsed and the condition routers
     * regenerated; an exception raised while doing so is logged and not rethrown.
     *
     * @param event change notification carrying the change type and the raw rule content
     */
    @Override
    public synchronized void process(ConfigChangedEvent event) {
        if (logger.isInfoEnabled()) {
            logger.info("Notification of condition rule, change type is: " + event.getChangeType() +
                    ", raw rule is:\n " + event.getContent());
        }

        // Deletion: drop the parsed rule together with the routers derived from it.
        if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
            routerRule = null;
            conditionRouters = Collections.emptyList();
            return;
        }

        try {
            routerRule = ConditionRuleParser.parse(event.getContent());
            generateConditions(routerRule);
        } catch (Exception parseFailure) {
            // Report the bad rule instead of propagating the failure to the notifier.
            logger.error("Failed to parse the raw condition rule and it will not take effect, please check " +
                    "if the condition rule matches with the template, the raw rule is:\n " + event.getContent(),
                    parseFailure);
        }
    }
    /**
     * Routes by delegating to every router in {@code conditionRouters}, in order.
     *
     * <p>Each router receives the list returned by the previous one. When {@code invokers} is
     * empty, or there are no condition routers, the input list is returned unchanged.
     *
     * @param invokers   candidate invokers
     * @param url        URL passed through to each router
     * @param invocation invocation passed through to each router
     * @return the list produced by the last router, or {@code invokers} itself when nothing applies
     * @throws RpcException declared by the routing contract
     */
    @Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        boolean nothingToRoute = CollectionUtils.isEmpty(invokers) || conditionRouters.isEmpty();
        if (nothingToRoute) {
            return invokers;
        }

        // The enabled status is checked inside each router, not here.
        List<Invoker<T>> remaining = invokers;
        for (Router conditionRouter : conditionRouters) {
            remaining = conditionRouter.route(remaining, url, invocation);
        }
        return remaining;
    }

ServiceRouter 的实现也大致一样,只是取的 routerKey 不同而已。

    // org.apache.dubbo.rpc.cluster.router.condition.config.ServiceRouter#ServiceRouter
    /**
     * Creates a router whose rule key is derived from the given URL.
     *
     * @param url URL passed to {@code DynamicConfiguration.getRuleKey} to obtain the rule key
     */
    public ServiceRouter(URL url) {
        // Differs from AppRouter only in where the rule key comes from.
        super(url, DynamicConfiguration.getRuleKey(url));
        this.priority = SERVICE_ROUTER_DEFAULT_PRIORITY;
    }
    // org.apache.dubbo.common.config.configcenter.DynamicConfiguration#getRuleKey
    /**
     * Returns the rule key for a URL by delegating to {@code URL#getColonSeparatedKey()}.
     *
     * <p>The format is '{interfaceName}:[version]:[group]'
     *
     * @param url URL to derive the key from
     * @return the value of {@code url.getColonSeparatedKey()}
     */
    static String getRuleKey(URL url) {
        return url.getColonSeparatedKey();
    }
    // org.apache.dubbo.common.URL#getColonSeparatedKey
    /**
     * Builds the colon-separated service key.
     *
     * <p>The format is "{interface}:[version]:[group]"
     *
     * @return the service interface name followed by whatever the {@code append} helper adds
     *         for {@code VERSION_KEY} and then {@code GROUP_KEY}
     */
    public String getColonSeparatedKey() {
        StringBuilder key = new StringBuilder().append(getServiceInterface());
        // Version is appended before group, matching the documented format.
        append(key, VERSION_KEY, false);
        append(key, GROUP_KEY, false);
        return key.toString();
    }
    

服务路由的出发点,是为了让用户能够更灵活地配置一些特殊的调用场景,如跨机房调用,或者应对一些异常情况,比如不希望某个实例再被调用。总之,应用场景总是有的,否则就是在玩自嗨。

了解其运行原理,让我们更清楚,我们到底在路由什么!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK