1

搭建ELK技术栈

 2 years ago
source link: https://imnisen.github.io/deploy-elk-stack.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1 搭建与基本使用

elk_01.png

1.1 基于Docker 搭建

ELK可以在官网分别下载 软件包运行,软件依赖jvm。或者可以使用docker来方便地部署。

我采用的是github:docker-elk 这里的docker配置, 该项目有几个分支,除了单纯elk还有配备searchguard 权限加密系统的elk、配备x-pack的elk 和结合vagrant的elk。

我使用了searchguard版的,可以看下其项目结构,还是挺清楚的,主要结构如下:

➜  tree
.
├── docker-compose.yml                    # docker compose 文件
├── elasticsearch                         # elasticsearch相关目录
│   ├── Dockerfile
│   ├── bin
│   │   └── init_sg.sh
│   └── config
│       ├── elasticsearch.yml
│       └── sg
│           ├── kirk-keystore.jks
│           ├── node-0-keystore.jks
│           ├── sg_action_groups.yml
│           ├── sg_config.yml
│           ├── sg_internal_users.yml
│           ├── sg_roles.yml
│           ├── sg_roles_mapping.yml
│           └── truststore.jks
├── extensions                            # 额外扩展
│   ├── README.md
│   └── logspout
│       ├── Dockerfile
│       ├── README.md
│       ├── build.sh
│       ├── logspout-compose.yml
│       └── modules.go
├── kibana                                # kibana相关目录
│   ├── Dockerfile
│   └── config
│       └── kibana.yml
└── logstash                              # logstash相关目录
    ├── Dockerfile
    ├── config
    │   └── logstash.yml
    └── pipeline
        └── logstash.conf

docker-compose.yaml 内容如下:

version: '2'

services:

  elasticsearch:
    build:
      context: elasticsearch/
    volumes:
      - ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml:ro
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      ES_JAVA_OPTS: "-Xmx256m -Xms256m"
    networks:
      - elk

  logstash:
    build:
      context: logstash/
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./logstash/pipeline:/usr/share/logstash/pipeline:ro
    ports:
      - "5000:5000"
    environment:
      LS_JAVA_OPTS: "-Xmx256m -Xms256m"
    networks:
      - elk
    depends_on:
      - elasticsearch

  kibana:
    build:
      context: kibana/
    volumes:
      - ./kibana/config/:/usr/share/kibana/config:ro
    ports:
      - "5601:5601"
    networks:
      - elk
    depends_on:
      - elasticsearch

networks:

  elk:
    driver: bridge


可以看到由3部分组成,每部分分别有Dockerfile构建而成,elasticseach和kibana的Dockerfile都是基于官方镜像,然后安装了searchguard插件,logstash仅是基于官方镜像。

1.2 Elasticsearch

Elasticsearch的目录结构

➜  tree
.
├── Dockerfile
├── bin
│   └── init_sg.sh
└── config
    ├── elasticsearch.yml           # elasticsearch 配置文件
    └── sg                          # seachguard配置目录
        ├── kirk-keystore.jks
        ├── node-0-keystore.jks
        ├── sg_action_groups.yml
        ├── sg_config.yml
        ├── sg_internal_users.yml   # 配置seachguard用户
        ├── sg_roles.yml            # 配置seachguard角色
        ├── sg_roles_mapping.yml
        └── truststore.jks

elasticseach支持集群部署,目前这里简单起见采用的单节点部署。

为了持久化数据存储存储,可以将数据卷挂载到容器默认的 /usr/share/elasticsearch/data ,当然也可以在 elasticsearch.yaml 额外配置

1.2.1 seach-guard users

searchguard 相关配置在 config/sg 下,其中 sg_internal_users.yaml 文件配置权限系统中的用户:

文件内容类似如下这样的,

admin:
  readonly: true
  hash: $2a$12$VcCDgh2NDk07JGN0rjGbM.Ad41qVR/YFJcgHp0UGns5JDymv..TOG
  roles:
    - admin


这是一个用户的配置,用户名 admin, 密码加密后是hash字段那一长串,该用户还配置了只读,角色是 admin.

密码加密的字段我是通过 htpasswd 生成的, 采用bcrypt加密:

htpasswd -bnB your_username your_password

1.2.2 seach-guard roles

该项目使用了searchguard默认的示例用户和角色,可以参考这里

sg_roles.yaml 文件配置角色内容,如下这样

# For logstash and beats
sg_logstash:
  readonly: true
  cluster:
    - CLUSTER_MONITOR
    - CLUSTER_COMPOSITE_OPS
    - indices:admin/template/get
    - indices:admin/template/put
  indices:
    'logstash-*':
      '*':
        - CRUD
        - CREATE_INDEX
    '*beat*':
      '*':
        - CRUD
        - CREATE_INDEX


该配置配置了 logstash 这个角色对 logstash- 开头的index和包含 beat 的index的相关权限,cluster配置参考文档

1.2.3 其它

整个项目以 docker-compose up 启动后,需要执行初始化命令来执行 elasticseach/bin/init_sg.sh , 来初始化searchhuard

$ docker-compose exec -T elasticsearch bin/init_sg.sh

1.3 Logstash

Logstash的目录结构

➜  tree
.
├── Dockerfile
├── config
│   └── logstash.yml
└── pipeline
    └── logstash.conf

logstash 的配置文件在 logstash/config/logstash.yaml

实际使用的时候需要使用多个pipline, 就可以在 logstash/pipline 下配置,该目录下所有配置文件会被当成配置文件

logstash配置了数据从哪里来,然后经过一些中间处理,然后输出到哪里,结构如下:

input {}

filter {}

output {}


1.3.1 Input

Input支持很多类型,详细可以参考input plugins,常见的有

  1. file input

    从一个文件输入:

    file {
      path => ["/var/log/postgresql/postgresql-9.6-main.log"]
      start_position => "beginning"
      codec => multiline {
        pattern => "^%{TIMESTAMP_ISO8601} "
        negate => true
        what => "previous"
      }
    }
    
    

    该配置指定了监听文件 /var/log/postgresql/postgresql-9.6-main.log ,而且是从头开始。

    codec 插件是用来实现:如果这行日志开头不是ISO8601的格式,那么认为这行日志属于上一行。因为日志中有时候为了更好地可读性,会将同一个事件输出到多行,比如异常traceback等。

  2. rabbitmq input

    从rabbitmq输入:

    rabbitmq {
        user => 'elk'
        password => 'changeme'
        durable => true
        host => 'changeme'
        port => 5672
        ssl => false
        queue => 'changeme'
        vhost => '/'
        metadata_enabled => true
        metadata_enabled => true
    }
    
    

    该配置指定了监听rabbitmq的'changeme'队列

1.3.2 Filter

filter下面支持很多插件,甚至还支持"if else" 逻辑。

  1. grok 和 mutate

    常用的有grok插件, 用来从日志中提取提取结构,假设有这么样的日志:

    2018-03-12 06:25:21 CST [11195-74894] ning@learning LOG:  statement: SET SESSION AUTHORIZATION DEFAULT
    2018-03-12 06:25:21 CST [11195-74895] ning@learning LOG:  statement: SHOW default_transaction_isolation
    2018-03-12 06:25:21 CST [11195-74896] ning@learning LOG:  statement: SET default_transaction_isolation TO 'read uncommitted'
    2018-03-12 06:25:21 CST [11195-74897] ning@learning LOG:  statement: BEGIN
    2018-03-12 06:25:21 CST [11195-74898] ning@learning LOG:  statement: SELECT 1 FROM ir_module_module WHERE name='base' AND latest_version='8.0.1.3'
    2018-03-12 06:25:21 CST [11195-74899] ning@learning LOG:  statement: SELECT * FROM ir_cron
                                          WHERE numbercall != 0
                                              AND active AND nextcall <= now()
                                          ORDER BY priority08:10.056 * 1 changes in 900 seconds. Saving...
    

    这是一个postgresql日志, 为了便于之后分析,需要将每条日志切成结构化的信息,类似下面这样配置:

    grok {
          match => {"message" => ["%{TIMESTAMP_ISO8601:timestamp} %{TZ:tz} \[%{NUMBER:process_id}\-%{DATA:line_number:int}\] (?:%{DATA:db_owner}@%{DATA:db_name} )?%{WORD:log_level}:  %{GREEDYDATA:log_data}"]
         }
    }
    

    会将整体信息匹配到 message, tz, process_id, line_number... 等字段上,然后elasticsearch就可以结构化存储。

    其中可以看到 [] 需要转义,每个匹配格式如 %{pattern:filed} 这样,会按照 pattern 来匹配成 filed

    空格数量敏感,不计数量空格也可以用 %{SPACE} 来表示,grok插件已经包括的pattern和常用的可以参考这里这里

    另外,这里 有一个辅助工具帮助调试grok的匹配时是否能正常工作,非常方便!

    使用下载包安装的时候,在 logstash/config/patterns 里可以添加自定义的pattern, 但换成docker部署后,还没有找到配置的地方,留待以后研究(TODO)

    mutate插件用来对提取的结构做一些动作,比如转化字段类型或者添加删减字段等,

    filter {
        mutate {
          add_field => {
            "index_field" => "test"
            }
        }
    }
    
    

    上面的例子是添加一个字段 index_field,值是"test"

  2. 复杂case1

    有时候需要区分不同的input来应用不同的grok的模式:

    input {
        file {
          path => ["/var/log/rabbitmq/[email protected]"]
          type => "fromfile1"
        }
        file {
          path => ["/var/log/rabbitmq/[email protected]"]
          type => "fromfile2"
        }
    
    }
    
    filter {
        if [type] == "fromfile1" {
          grok {
            match => {"message" => ["%{TIMESTAMP_ISO8601:timestamp} %{TZ:tz} \[%{NUMBER:process_id}\-%{DATA:line_number:int}\] (?:%{DATA:db_owner}@%{DATA:db_name} )?%{WORD:log_level}:  %{GREEDYDATA:log_data}"]
            }
          }
        } else if [type] == "fromfilr2" {
          grok {
            match => {"message" => ["\[%{POSINT:pid}\] %{MONTHDAY} %{MONTH} %{TIME} %{DATA:level} %{GREEDYDATA:log_data}"]
            }
          }
        }
    }
    
    

    这里,根据来源不同的文件来源应用不同的匹配模式。

  3. 复杂case2

    有时候同一个日志文件里有多种日志格式:

    filter {
    
        if "grokked" not in [tags] {
            grok {
              match => {"message" => ["%{DATA:docker_prefix}\[%{TIMESTAMP_ISO8601:log_datetime} \+0800\] - \(%{DATA:logger}\) \[%{NUMBER:process}\] \[%{LOGLEVEL:level}\] : \"Request\" => PATH\:%{URIPATH:uri_path}, METHOD\:%{DATA:method}, ARGS\:%{DATA:args}, BODY\:%{GREEDYDATA:body}"]}
              add_tag => ["grokked"]
              tag_on_failure => []
            }
          }
    
         if "grokked" not in [tags] {
            grok {
              match => {"message" => ["%{DATA:docker_prefix}\[%{TIMESTAMP_ISO8601:log_datetime} \+0800\] - \(%{DATA:logger}\) \[%{NUMBER:process}\] \[%{LOGLEVEL:level}\] : \"Response\" <= STATUS:%{NUMBER:status}, BODY\:%{GREEDYDATA:body}"]}
              add_tag => ["grokked"]
              tag_on_failure => []
            }
          }
    
          if "grokked" not in [tags] {
            grok {
              match => {"message" => ["%{DATA:docker_prefix}\[%{TIMESTAMP_ISO8601:log_datetime} \+0800\] - \(%{DATA:logger}\)\[%{WORD:level}\]\[%{DATA:access_host}\]\: %{WORD:method} %{URI:log_message}  %{NUMBER:status} %{NUMBER:bytes}"]}
              add_tag => ["grokked"]
            }
          }
    
    }
    
    

    这里一个日志文件里可能有3个模式,所以人为在匹配成功后的文档里添加tag grokked ,这样如果第一个匹配成功了就不会应用到接下里的匹配中,另外因为默认匹配失败的doc会添加上 _grokparsefailure 标签,所以在前两个我们将失败时添加的标签设置成空。

  4. 复杂case3

    有时候如果是从rabbimq获取输入,需要根据元信息来做不同的事情,比如:

    filter {
        if [@metadata][rabbitmq_headers][index_header] {
           mutate {
            add_field => {
              "index_field" => "%{[@metadata][rabbitmq_headers][index_header]}"
            }
           }
        } else {
          mutate {
            add_field => {
              "index_field" => "unknown"
            }
          }
        }
    }
    

    上面例子根据rabbitmq每条消息的headers里判断有没有 index_header 来给新字段的值添加不同的值

  5. 复杂case4

    有时候需要根据不同的字段采取不同的模式匹配

    filter {
        if [fields][logtype] == "pglog" {
          grok {
            match => {"message" => ["%{TIMESTAMP_ISO8601:timestamp} %{TZ:tz} \[%{NUMBER:process_id}\-%{DATA:line_number:int}\] (?:%{DATA:db_owner}@%{DATA:db_name} )?%{WORD:log_level}:  %{GREEDYDATA:log_data}"]
            }
          }
        } else if [fields][logtype] == "redislog" {
          grok {
            match => {"message" => ["\[%{POSINT:pid}\] %{MONTHDAY} %{MONTH} %{TIME} %{DATA:level} %{GREEDYDATA:log_data}"]
            }
          }
        }
    
    }
    

1.3.3 Output

Output用来配置logstash将信息发往何处,可支持插件参考文档 ,常用的有elasticseach和stdout.

output{
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logstash-%{[@metadata][beat]}-%{[fields][env]}-%{[fields][logtype]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
    user => logstash
    password => logstash

  }
}

上述配置将信息发往elasticseach, 并且index和type是根据每条信息动态设置的。

stdout { codec => rubydebug { metadata => true } }

上述配置将信息输出到标准输出,用于调试目的

1.4 Kibana

Kibana的目录结构

➜  tree
.
├── Dockerfile
└── config
    └── kibana.yml

kibana的配置文件在 kibana/config/kibana.yml

server.name: kibana
server.host: "0"
elasticsearch.url: http://elasticsearch:9200

## Custom configuration
#
elasticsearch.username: "kibanaserver"
elasticsearch.password: "kibanaserver"

searchguard.cookie.password: "123567818187654rwrwfsfshdhdhtegdhfzftdhncn"

主要用来配置连接到的elasticseach地址和用户名密码

kibana web界面的使用博大精深,看其他资源吧;)

1.5 Filebeat

Filebeat是elastic的beats产品家族的一员,用来收集文件日志,相比于logstash的file input插件,它非常轻量,消耗地资源很少。通常可以用来和elk搭配使用,在需要收集的机器上使用beats,然后发送到logstash或者直接发到elasticsearch上,有时考虑到稳定性等因素,也可以发送到消息队列里,filebeat不支持rabbitmq输出,支持kafa,github上对此事有一些讨论。

示例简单配置文件

filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /mnt/postgresql-9.6-main.log
  fields:
    logtype: pglog
    env: dev
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'   # 配置多行输入
  multiline.negate: true
  multiline.match: after

- type: log
  enabled: true
  paths:
    - /mnt/redis-server.log
  fields:
    logtype: redislog
    env: dev

output.logstash:         # 直接输出到logstash
  # The Logstash hosts
  hosts: ["10.47.52.122:5044"]

# logstash 的 beats 输入插件
input {
    beats {
        port => 5044
    }
}


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK