17

基于Prometheus网关的监控完整实现参考

 3 years ago
source link: http://www.cnblogs.com/yougewe/p/13698833.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

prometheus 是一个非常好的监控组件,尤其是其与grafana配合之后,更是如虎添翼。而prometheus的监控有两种实现方式。1. server端主动拉取应用监控数据;2. 主动推送监控数据到prometheus网关。这两种方式各有优劣,server端主动拉取实现可以让应用专心做自己的事,根本无需关心外部监控问题,但有一个最大的麻烦就是server端需要主动发现应用的存在,这个问题也并不简单(虽然现在的基于k8s的部署方式可以实现自动发现)。而基本prometheus网关推送的实现,则需要应用主动发送相应数据到网关,即应用可以根据需要发送监控数据,可控性更强,但也同时带来一个问题就是需要应用去实现这个上报过程,实现得不好往往会给应用带来些不必要的麻烦,而且基于网关的实现,还需要考虑网关的性能问题,如果应用无休止地发送数据给网关,很可能将网关冲跨,这就得不偿失了。

而prometheus的sdk实现也非常多,我们可以任意选择其中一个来做业务埋点。如: dropwizard, simpleclient ...  也并无好坏之分,主要看自己的业务需要罢了。比如 dropwizard 操作简单功能丰富,但只支持单值的监控。而 simpleclient 支持多子标签的的监控,可以用于丰富的图表展现,但也需要更麻烦的操作等等。

由于我们也许更倾向于多子标签的支持问题,今天我们就基于 simpleclient 来实现一个完整地网关推送的组件吧。给大家提供一些思路和一定的解决方案。

1. pom 依赖引入

如果我们想简单化监控以及如果需要一些tps方面的数据,则可以使用 dropwizard 的依赖:

        <!-- jmx 埋点依赖 -->
        <dependency>
            <groupId>io.dropwizard.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.dropwizard.metrics</groupId>
            <artifactId>metrics-jmx</artifactId>
            <version>4.0.0</version>
        </dependency>

当然,以上不是我们本文的基础,我们基于simpleclient 依赖实现:

        <!-- https://mvnrepository.com/artifact/io.prometheus/simpleclient_pushgateway -->
        <dependency>
            <groupId>io.prometheus</groupId>
            <artifactId>simpleclient_pushgateway</artifactId>
            <version>0.9.0</version>
        </dependency>

看起来simpleclient的依赖更简单些呢!但实际上因为我们需要使用另外组件将dropwizard的数据暴露原因,不过这无关紧要。

2. metrics 埋点简单使用

dropwizard 的使用样例如下:

public class PrometheusMetricManager {
    // 监控数据写入容器
    private static final MetricRegistry metricsContainer = new MetricRegistry();

    static {
        // 使用 jmx_exporter 将埋点数据暴露出去
        JmxReporter jmxReporter = JmxReporter.forRegistry(metricsContainer).build();
        jmxReporter.start();
    }

    // 测试使用
    public static void main(String[] args) {
        // tps 类数据监控
        Meter meter = metricsContainer.meter("tps_meter");
        meter.mark();
        Map<String, Object> queue = new HashMap<>();
        queue.put("sss", 1);
        // 监控数组大小
        metricsContainer.register("custom_metric", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return queue.size();
            }
        });
    }
}

simpleclient 使用样例如下:

public class PrometheusMetricManager {

    /**
     * prometheus 注册实例
     */
    private static final CollectorRegistry registry = new CollectorRegistry();


    /**
     * prometheus 网关实例
     */
    private static volatile PushGateway pushGatewayIns = new PushGateway("172.30.12.167:9091");

    public static void main(String[] args) {
        try{
            // 测试 gauge, counter
            Gauge guage = Gauge.build("my_custom_metric", "This is my custom metric.")
                                .labelNames("a").create().register(registry);
            Counter counter = Counter.build("my_counter", "counter help")
                                .labelNames("a", "b").create().register(registry);

            guage.labels("1").set(23.12);

            counter.labels("1", "2").inc();
            counter.labels("1", "3").inc();

            Map<String, String> groupingKey = new HashMap<String, String>();
            groupingKey.put("instance", "my_instance");
            // 推送网关数据
            pushGatewayIns.pushAdd(registry, "my_job", groupingKey);
        } catch (Exception e){
            e.printStackTrace();
        }
    }

}

以上就是简单快速使用prometheus的sdk进行埋点数据监控了,使用都非常简单,即注册实例、业务埋点、暴露数据;

但要做好管理埋点也许并不是很简单,因为你可能需要做到易用性、可管理性、及性能。

3. 一个基于pushgateway 的管理metrics完整实现

从上节,我们知道要做埋点很简单,但要做到好的管理不简单。比如如何做到易用?如何做到可管理性强?

解决问题会有很多方法,我这边给到方案是,要想易用,那么我就封装一些必要的接口给到应用层,比如应用层只做数据量统计,那么我就只暴露一个counter的增加方法,其他一概隐藏,应用层想要使用埋点时也不用管什么底层推送,数据结构之类,只需调用一个工厂方法即可得到操作简单的实例。想要做可管理,那么就必须要依赖于外部的配置系统,只需从外部配置系统一调整,应用立马可以感知到,从而做出相应的改变,比如推送频率、推送开关、网关地址。。。

下面一个完整的实现样例:

import com.my.mvc.app.common.util.ArraysUtil;
import com.my.mvc.app.common.util.IpUtil;
import com.my.mvc.app.component.metrics.types.*;
import io.prometheus.client.*;
import io.prometheus.client.exporter.PushGateway;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

/**
 * 功能描述: prometheus指标埋点 操作类
 *
 */
@Slf4j
public class PrometheusMetricManager {

    /**
     * prometheus 注册实例
     */
    private static final CollectorRegistry registry = new CollectorRegistry();

    /**
     * 指标统一容器
     *
     *      counter: 计数器类
     *      gauge: 仪表盘类
     *      histogram: 直方图类
     *      summary: 摘要类
     */
    private static final Map<String, CounterMetric> counterMetricContainer = new ConcurrentHashMap<>();
    private static final Map<String, TimerMetric> timerMetricContainer = new ConcurrentHashMap<>();
    private static final Map<String, CustomValueMetricCollector> customMetricContainer = new ConcurrentHashMap<>();
    private static final Map<String, Histogram> histogramMetricContainer = new ConcurrentHashMap<>();
    private static final Map<String, Summary> summaryMetricContainer = new ConcurrentHashMap<>();


    /**
     * prometheus 网关实例
     */
    private static volatile PushGateway pushGatewayIns;

    /**
     * prometheus gateway api 地址
     */
    private static volatile String gatewayApiCurrent;

    /**
     * 项目埋点统一前缀
     */
    private static final String METRICS_PREFIX = "sys_xxx_";

    /**
     * 指标的子标签key名, 统一定义
     */
    private static final String METRIC_LABEL_NAME_SERVER_HOST = "server_host";

    /**
     * 推送gateway线程池
     */
    private static final ScheduledExecutorService executorService =
            Executors.newScheduledThreadPool(1,
                    r -> new Thread(r, "Prometheus-push"));

    static {
        // 自动进行gateway数据上报
        startPrometheusThread();
    }

    private PrometheusMetricManager() {

    }

    /**
     * 注册一个prometheus的监控指标, 并返回指标实例
     *
     * @param metricName 指标名称(只管按业务命名即可: 数字+下划线)
     * @param labelNames 所要监控的子指标名称,会按此进行分组统计
     * @return 注册好的counter实例
     */
    public static CounterMetric registerCounter(String metricName, String... labelNames) {
        CounterMetric counter = counterMetricContainer.get(metricName);
        if(counter == null) {
            synchronized (counterMetricContainer) {
                counter = counterMetricContainer.get(metricName);
                if(counter == null) {
                    String[] labelNameWithServerHost = ArraysUtil.addFirstValueIfAbsent(
                                        METRIC_LABEL_NAME_SERVER_HOST, labelNames);
                    Counter counterProme = Counter.build()
                            .name(PrometheusMetricManager.METRICS_PREFIX + metricName)
                            .labelNames(labelNameWithServerHost)
                            .help(metricName + " counter")
                            .register(registry);
                    counter = new PrometheusCounterAdapter(counterProme,
                            labelNameWithServerHost != labelNames);
                    counterMetricContainer.put(metricName, counter);
                }
            }
        }
        return counter;
    }

    /**
     * 注册一个仪表盘指标实例
     *
     * @param metricName 指标名称
     * @param labelNames 子标签名列表
     * @return 仪表实例
     */
    public static TimerMetric registerTimer(String metricName, String... labelNames) {
        TimerMetric timerMetric = timerMetricContainer.get(metricName);
        if(timerMetric == null) {
            synchronized (timerMetricContainer) {
                timerMetric = timerMetricContainer.get(metricName);
                if(timerMetric == null) {
                    String[] labelNameWithServerHost = ArraysUtil.addFirstValueIfAbsent(
                                    METRIC_LABEL_NAME_SERVER_HOST, labelNames);
                    Gauge gauge = Gauge.build()
                            .name(METRICS_PREFIX + metricName)
                            .labelNames(labelNameWithServerHost)
                            .help(metricName + " gauge")
                            .register(registry);
                    timerMetric = new PrometheusTimerAdapter(gauge,
                            labelNameWithServerHost != labelNames);
                    timerMetricContainer.put(metricName, timerMetric);
                }
            }
        }
        return timerMetric;
    }

    /**
     * 注册一个仪表盘指标实例
     *
     * @param metricName 指标名称
     * @param valueSupplier 用户自定义实现的单值提供实现
     */
    public static void registerSingleValueMetric(String metricName,
                                                 CustomMetricValueSupplier<? extends Number> valueSupplier) {
        CustomValueMetricCollector customMetric = customMetricContainer.get(metricName);
        if(customMetric == null) {
            synchronized (customMetricContainer) {
                customMetric = customMetricContainer.get(metricName);
                if(customMetric == null) {
                    String[] labelNameWithServerHost = {
                                METRIC_LABEL_NAME_SERVER_HOST };
                    CustomValueMetricCollector customCollector
                            = CustomValueMetricCollector.build()
                            .name(METRICS_PREFIX + metricName)
                            .labelNames(labelNameWithServerHost)
                            .valueSupplier(valueSupplier)
                            .help(metricName + " custom value metric")
                            .register(registry);
                    // 主动触发固定参数的value计数
                    customCollector.labels(IpUtil.getLocalIp());
                    customMetricContainer.put(metricName, customCollector);
                }
            }
        }
    }

    /**
     * 定时推送指标到PushGateway
     */
    private static void pushMetric() throws IOException {
        refreshPushGatewayIfNecessary();
        pushGatewayIns.pushAdd(registry, "my_job");
    }

    /**
     * 保证pushGateway 为最新版本
     */
    private static void refreshPushGatewayIfNecessary() {
        // PushGateway地址
        com.ctrip.framework.apollo.Config config = com.ctrip.framework.apollo.ConfigService.getAppConfig();
        String gatewayApi = config.getProperty("prometheus_gateway_api", "10.1.20.121:9091");
        if(pushGatewayIns == null) {
            pushGatewayIns = new PushGateway(gatewayApi);
            return;
        }
        if(!gatewayApi.equals(gatewayApiCurrent)) {
            gatewayApiCurrent = gatewayApi;
            pushGatewayIns = new PushGateway(gatewayApi);
        }
    }

    /**
     * 开启推送 gateway 线程
     *
     * @see #useCustomMainLoopPushGateway()
     * @see #useJdkSchedulerPushGateway()
     */
    private static void startPrometheusThread() {
        useCustomMainLoopPushGateway();
    }

    /**
     * 使用自定义纯种循环处理推送网关数据
     */
    private static void useCustomMainLoopPushGateway() {
        executorService.submit(() -> {
            while (isPrometheusMetricsPushSwitchOn()) {
                try {
                    pushMetric();
                }
                catch (IOException e) {
                    log.error("【prometheus】推送gateway失败:" + e.getMessage(), e);
                }
                finally {
                    sleep(getPrometheusPushInterval());
                }
            }

            // 针对关闭埋点采集后,延时检测是否重新开启了, 以便重新恢复埋点上报
            executorService.schedule(PrometheusMetricManager::startPrometheusThread,
                    30, TimeUnit.SECONDS);
        });
    }

    /**
     * 休眠指定时间(毫秒)
     *
     * @param millis 指定时间(毫秒)
     */
    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException e) {
            log.error("sleep异常", e);
        }
    }

    /**
     * 获取prometheus推送网关频率(单位:s)
     *
     * @return 频率如: 60(s)
     */
    private static Integer getPrometheusPushInterval() {
        com.ctrip.framework.apollo.Config config = com.ctrip.framework.apollo.ConfigService.getAppConfig();
        return config.getIntProperty("prometheus_metrics_push_gateway_interval", 10) * 1000;
    }

    /**
     * 检测apollo是否开启推送网关数据开关
     *
     * @return true:已开启, false:已关闭(不得推送指标数据)
     */
    private static boolean isPrometheusMetricsPushSwitchOn() {
        com.ctrip.framework.apollo.Config config = com.ctrip.framework.apollo.ConfigService.getAppConfig();
        return "1".equals(config.getProperty("prometheus_metrics_push_switch", "1"));
    }

    /**
     * 使用 scheduler 进行推送采集指标数据
     */
    private static void useJdkSchedulerPushGateway() {
        executorService.scheduleAtFixedRate(() -> {
            if(!isPrometheusMetricsPushSwitchOn()) {
                return;
            }
            try {
                PrometheusMetricManager.pushMetric();
            }
            catch (Exception e) {
                log.error("【prometheus】推送gateway失败:" + e.getMessage(), e);
            }
        }, 1, getPrometheusPushInterval(), TimeUnit.SECONDS);
    }

    // 测试功能
    public static void main(String[] args) throws Exception {
        // 测试counter
        CounterMetric myCounter1 = PrometheusMetricManager.registerCounter(
                "hello_counter", "topic", "type");
        myCounter1.incWithLabelValues("my-spec-topic", "t1");

        // 测试 timer
        TimerMetric timerMetric = PrometheusMetricManager.registerTimer("hello_timer", "sub_label1");
        timerMetric.startWithLabelValues("key1");
        Thread.sleep(1000);
        timerMetric.stop();

        Map<String, Object> queue = new HashMap<>();
        queue.put("a", 11);
        queue.put("b", 1);
        queue.put("c", 222);

        // 测试队列大小监控,自定义监控实现
        PrometheusMetricManager.registerSingleValueMetric(
                "custom_value_supplier", queue::size);

        // 队列值发生变化
        Thread.sleep(60000);
        queue.put("d", 22);

        // 等待发送线程推送数据
        System.in.read();
    }
}

以上,就是咱们整个推送网关的管理框架了,遵循前面说的原则,只暴露几个注册接口,返回的实例按自定义实现,只保留必要的功能。使用时只管注册,及使用有限功能即可。(注意:这不是写一个通用框架,而仅是为某类业务服务的管理组件)

实际也是比较简单的,如果按照这些原则来做的话,应该会为你的埋点监控工作带来些许的方便。另外,有些细节的东西我们稍后再说。

4. 自定义监控的实现

上面我们看到,我们有封装 CounterMetric, TimerMetric 以减少不必要的操作。这些主要是做了一下prometheus的一些代理工作,本身是非常简单的,我们可以简单看看。

/**
 * 功能描述: 计数器型埋点指标接口定义
 *
 *           <p>简化不必要的操作方法暴露</p>
 *
 */
public interface CounterMetric {

    /**
     * 计数器 +1, 作别名使用 (仅对无多余labelNames 情况), 默认无需实现该方法
     */
    default void inc() {
        incWithLabelValues();
    }

    /**
     * 带子标签类型填充的计数器 +1
     *
     * @param labelValues 子标签值(与最初设置时顺序个数一致)
     */
    void incWithLabelValues(String... labelValues);

}
 
// -------------- 以下是实现类 ---------------
import com.my.common.util.ArraysUtil;
import com.my.common.util.IPAddressUtil;
import io.prometheus.client.Counter;

/**
 * 功能描述: prometheus counter 适配器实现
 *
 */
public class PrometheusCounterAdapter implements CounterMetric {

    private Counter counter;

    /**
     * 是否在头部添加 主机名
     */
    private boolean appendServerHost;

    public PrometheusCounterAdapter(Counter counter, boolean appendServerHost) {
        this.counter = counter;
        this.appendServerHost = appendServerHost;
    }

    @Override
    public void incWithLabelValues(String... labelValues) {
        if(appendServerHost) {
            labelValues = ArraysUtil.addFirstValue(IPAddressUtil.getLocalIp(), labelValues);
        }
        counter.labels(labelValues).inc();
    }

}

不复杂,看业务需要实现某些功能即可。供参考,其他类似功能可自行实现。

我们主要来看一下自定义监控值的实现,主要场景如队列大小监控。Prometheus 的 simpleclient 中给我们提供了几种监控类型 Counter, Gauge, Histogram, Summary, 可能都不能很好的支持到我们这种自定义的实现。所以,需要自己干下这件事。其与 Gauge 的实现是非常相似的,值都是可大可小可任意,所以我们可以参考Gauge的实现做出我们的自定义值监控。具体实现如下:

import io.prometheus.client.Collector;
import io.prometheus.client.GaugeMetricFamily;
import io.prometheus.client.SimpleCollector;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * 功能描述: prometheus 自定义单值监控工具(如:元素大小监控)
 *
 */
public class CustomValueMetricCollector
        extends SimpleCollector<CustomValueMetricCollector.Child>
        implements Collector.Describable {

    private CustomMetricValueSupplier<? extends Number> valueSupplier;

    private CustomValueMetricCollector(Builder b) {
        super(b);
        this.valueSupplier = b.valueSupplier;
        if(valueSupplier == null) {
            throw new IllegalArgumentException("unknown value supplier");
        }
    }

    /**
     *  Return a Builder to allow configuration of a new Gauge.
     */
    public static Builder build() {
        return new Builder();
    }

    @Override
    protected Child newChild() {
        return new Child(valueSupplier);
    }

    @Override
    public List<MetricFamilySamples> describe() {
        return Collections.<MetricFamilySamples>singletonList(
                    new GaugeMetricFamily(fullname, help, labelNames));
    }

    @Override
    public List<MetricFamilySamples> collect() {
        List<MetricFamilySamples.Sample> samples = new ArrayList<>(children.size());
        for(Map.Entry<List<String>, Child> c: children.entrySet()) {
            samples.add(new MetricFamilySamples.Sample(
                            fullname, labelNames, c.getKey(), c.getValue().get()));
        }
        return familySamplesList(Type.GAUGE, samples);
    }

    public static class Builder extends SimpleCollector.Builder
            <CustomValueMetricCollector.Builder, CustomValueMetricCollector> {
        private CustomMetricValueSupplier<? extends Number> valueSupplier;

        @Override
        public CustomValueMetricCollector create() {
            return new CustomValueMetricCollector(this);
        }

        /**
         * 自定义值提供者
         *
         * @param valueSupplier 提供者用户实现实现
         * @param <T> 用户返回的数值类型
         */
        public <T extends Number> Builder valueSupplier(
                        CustomMetricValueSupplier<T> valueSupplier) {
            this.valueSupplier = valueSupplier;
            return this;
        }
    }


    /**
     * 多标签时使用的子项描述类
     *
     *      实际上并不支持多标签配置,除了一些统一标签如 IP
     */
    public static class Child {

        private CustomMetricValueSupplier<? extends Number> valueSupplier;

        Child(CustomMetricValueSupplier<? extends Number> valueSupplier) {
            this.valueSupplier = valueSupplier;
        }

        /**
         * Get the value of the gauge.
         */
        public double get() {
            return Double.valueOf(valueSupplier.getValue().toString());
        }
    }
}

之所以要使用到 Child, 是因为我们需要支持多子标签的操作,所以稍微绕了一点。不过总体也不复杂。而且对于单值提供者的实现,也只有一个 getValue 方法,这会很好地让我们利用 Lamda 表达式,写出极其简单的提供者实现。接口定义如下:

/**
 * 功能描述:  单值型度量 提供者(用户自定义实现)
 *
 * @param <T> 返回的数据类型,一定是数值型哟
 */
public interface CustomMetricValueSupplier<T extends Number> {

    /**
     * 用户实现的提供度量值方法
     */
    T getValue();
}

具体使用时就非常简单了:

    // 测试队列大小监控,自定义监控实现
    PrometheusMetricManager.registerSingleValueMetric(
            "custom_value_supplier", queue::size);

如此,一个完整的监控数据上报功能就完成了。你要做的仅是找到需要监控的业务点,然后使用仅有api调用就可以了,至于后续是使用jmx上报,主动上报,网关推送。。。 你都不需要关心了,而且还可以根据情况随时做出调整。

至于后续的监控如何做,可以参考我另一篇文章(grafana方案): 快速构建业务监控体系,观grafana监控的艺术


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK