31

基于Bro的应用层数据包识别工具

 5 years ago
source link: http://www.freebuf.com/sectool/179757.html?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

拓扑介绍

应用识别系统将会通过bro识别应用并生成日志。ELK部署在远程端,用于收集,分析,存储和识别所有日志。BRO安装在IP为192.168.1.147的机器上,ELK安装在IP为192.168.1.142的机器上。

ens33              网络接口名称   
192.168.1.147      bro设备的网络接口ipv4地址    
192.168.1.142      ELK设备的网络接口ipv4地址

安装bro

由于metron-bro-plugin-kafka插件的编译安装依赖于bro源代码,因此我们使用bro源代码进行安装。

依赖安装

~/src$ sudo apt-get install -y tcpdump git vim cmake make gcc g++ flex bison libpcap-dev python-dev swig zlib1g-dev libssl1.0-dev dirmngr curl openjdk-8-jre zookeeperd

下载bro源代码并验证

~/src$ wget https://www.bro.org/downloads/bro-2.5.4.tar.gz 
~/src$ wget https://www.bro.org/downloads/bro-2.5.4.tar.gz.asc 
~/src$ gpg --recv-keys C68B494DF56ACC7E
~/src$ gpg -v bro-2.5.4.tar.gz.asc

安装

~/src$ tar -xvf bro-2.5.4.tar.gz 
~/src$ cd bro-2.5.4/ 
~/src/bro-2.5.4$ ./configure --enable-debug 
~/src/bro-2.5.4$ make 
~/src/bro-2.5.4$ sudo make install

设置env值

Bro安装的目录位置为/usr/local/bro/bin/,添加以下行至/etc/bash.bashrc文件中:

if [ -d /usr/local/bro/bin ];then
  PATH="/usr/local/bro/bin/:$PATH"
  export PATH
fi

将bro设置为systemd并启动bro服务

~# cat > /etc/systemd/system/bro.service << EOL
[Unit]
Description=Bro
After=network.target

[Service]
ExecStartPre=/usr/local/bro/bin/broctl cleanup
ExecStartPre=/usr/local/bro/bin/broctl check
ExecStartPre=/usr/local/bro/bin/broctl install
ExecStart=/usr/local/bro/bin/broctl start
ExecStop=/usr/local/bro/bin/broctl stop
RestartSec=10s
Type=oneshot
RemainAfterExit=yes
TimeoutStopSec=600

[Install]
WantedBy=multi-user.target

EOL

~# systemctl daemon-reload   
~# systemctl enable bro  
~# systemctl start bro

有关bro的详细安装和使用,请参阅文档 how-to-deploy-bro-app-identification-env.mkd

将本地解析配置文件添加到BRO

以下我将通过一个示例来说明,本地Bro解析配置文件为 bro_parse_jd.bro

在/usr/local/bro/share/bro/site路径下创建一个名为harbian的目录,并将bro_parse_jd.bro文件放置在该目录中。然后在harbian目录中创建一个名为__load__.bro的文件,并在 __load__.bro文件中添加以下行:

@load ./bro_parse_jd.bro.

__load__.bro文件仅用于在启动bro时自动加载当前目录中的bro配置文件。此外,我们还需要将以下行添加到文件/usr/local/bro/share/bro/site/local.bro中:

@load ./harbian

然后使用以下命令重启bro服务:

~# systemctl restart bro

安装Kafka

下载kafka源

~/src$ wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.12-1.0.0.tgz 
~/src$ wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.12-1.0.0.tgz.asc

验证

~/src$ gpg --recv-keys  3B417B9B 
~/src$ gpg -v kafka_2.12-1.0.0.tgz.asc

安装kafka并启动服务

~/src$ tar -xvf kafka_2.12-1.0.0.tgz
~/src$ sudo mv kafka_2.12-1.0.0 /opt/kafka
~/src$ sudo sed -i '/^log.dirs/{s/=.*//;}' /opt/kafka/config/server.properties
~/src$ sudo sed -i 's/^log.dirs/log.dirs=\/var\/lib\/kafka/' /opt/kafka/config/server.properties
~/src$ sudo sed -i '$alisteners=bro://192.168.1.147:9092' /opt/kafka/config/server.properties 

~/src# cat > /etc/systemd/system/kafka.service << EOF
[Unit]
Description=Kafka Service
Wants=network.target
After=network.target

[Service]
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecReload=on-failure
Restart=always
User=root
Group=root
StandardOutput=syslog
StandardError=syslog

[Install]
WantedBy=multi-user.target
EOF

启用kafka服务并启动

启动kafka服务时,必须确保zookeeper服务已启动。

~/src$ systemctl enable zookeeper 
~/src$ systemctl start zookeeper
~/src$ sudo systemctl daemon-reload
~/src$ sudo systemctl enable kafka 
~/src$ sudo systemctl start kafka

安装metron-bro-plugin-kafka插件

安装librdkafka

~/src$ curl -L https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz | tar xvz 
~/src$ cd librdkafka-0.9.4/ 
~/src/librdkafka-0.9.4$ ./configure --enable-sasl 
~/src/librdkafka-0.9.4$ make 
~/src/librdkafka-0.9.4$ sudo make install

安装插件

~/src$ git clone https://github.com/apache/metron-bro-plugin-kafka.git
~/src$ cd metron-bro-plugin-kafka
~/src/metron-bro-plugin-kafka$ ./configure --bro-dist=$HOME/src/bro-2.5.4/
~/src/metron-bro-plugin-kafka$ make 
~/src/metron-bro-plugin-kafka$ sudo make install

确认插件是否已被正确安装:

~/src# bro -N Apache::Kafka

如何设置bro日志写入到kafka

设置以下行到/usr/local/bro/share/bro/site/local.bro文件中:

@load /usr/local/bro/lib/bro/plugins/APACHE_KAFKA/scripts/Apache/Kafka/logs-to-kafka.bro
redef Kafka::topic_name = "";
redef Kafka::logs_to_send = set(Conn::LOG, HTTP::LOG, DNS::LOG, SMTP::LOG, SSL::LOG, Software::LOG, DHCP::LOG, FTP::LOG, IRC::LOG, Notice::LOG, X509::LOG, SSH::LOG, SNMP::LOG);
redef Kafka::kafka_conf = table(["metadata.broker.list"] = "192.168.1.147:9092");

确保bro日志已被写入到kafka中:

~/src# systemctl status kafka | grep "Active:.active"
   Active: active (running) since Tue 2018-07-24 03:25:10 CST; 23min ago

~/src# netstat -ntpl  | grep 9092
tcp6       0      0 192.168.1.147:9092      :::*                    LISTEN      30913/java 

~/src$ ls /var/lib/kafka/bro-0/00000000000000000000.log

安装ELK

依赖安装

~$ sudo apt-get install -y openjdk-8-jre curl wget libgeoip-dev

下载ELK deb包以及SHA512 (512-bit) checksums文件

~$ mkdir src; cd src 
~/src$ wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.0.deb 
~/src$ wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.0.deb.sha512 
~/src$ wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.0.deb 
~/src$ wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.0.deb.sha512 
~/src$ wget https://artifacts.elastic.co/downloads/kibana/kibana-6.3.0-amd64.deb 
~/src$ wget https://artifacts.elastic.co/downloads/kibana/kibana-6.3.0-amd64.deb.sha512

验证ELK deb包

~/src$ sha512sum -c elasticsearch-6.3.0.deb.sha512  
elasticsearch-6.3.0.deb: OK
~/src$ sha512sum -c logstash-6.3.0.deb.sha512 
logstash-6.3.0.deb: OK
~/src$ sha512sum -c kibana-6.3.0-amd64.deb.sha512 
kibana-6.3.0-amd64.deb: OK

安装ELK deb包

~/src$ sudo dpkg -i *.deb

Logstash配置

~/src$ echo config.reload.automatic: true |sudo tee -a /etc/logstash/logstash.yml
~/src$ echo config.reload.interval: 3s |sudo tee -a /etc/logstash/logstash.yml

创建新的logstash配置

为每种类型的日志生成conf文件。以下仅是软件的一个示例:

~/src# cat > /etc/logstash/conf.d/bro-software.conf << EOF 
input {
  kafka {
    topics => ["software"]
    group_id => "bro_logstash"
     		bootstrap_servers => "192.168.1.147:9092"
     		codec => json
     		auto_offset_reset => "earliest"
   	}
}

output {
  elasticsearch {
     		hosts => ["192.168.1.142:9200"]
    index => "bro-software"
    document_type => "software"
   	}
}

其他conf文件你可以在 logstash-conf 中找到。将这些配置文件放置在/etc/logstash/conf.d目录中。

Elasticsearch配置

将Elasticsearch绑定到localhost

将以下行添加到/etc/elasticsearch/elasticsearch.yml;

network.host: "192.168.1.142" 
http.port:9200

如果elasticsearch服务为远程端,请将绑定地址设置为指定的IP地址。

确保elasticsearch正常工作

sudo systemctl start elasticsearch 
curl http://192.168.1.142:9200
{
  "name" : "VZDjFmY",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "xql3xQSbSvinXDIYchwswQ",
  "version" : {
    "number" : "6.3.0",
    "build_flavor" : "default",
    "build_type" : "deb",
    "build_hash" : "424e937",
    "build_date" : "2018-06-11T23:38:03.357887Z",
    "build_snapshot" : false,
    "lucene_version" : "7.3.1",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

Kibana配置

在Kibana配置文件/etc/kibana/kibana.yml中添加以下行:

server.port: 5601
server.host: "192.168.1.142"
elasticsearch.url: "http://192.168.1.142:9200"

启动ELK服务

~/src$ sudo /bin/systemctl daemon-reload
~/src$ sudo /bin/systemctl enable elasticsearch.service logstash.service kibana.service
~/src$ sudo systemctl start elasticsearch.service kibana.service logstash.service

Kibana

打开Kibana

在浏览器地址栏中输入192.168.1.142:5601。打开如下所示页面:

z6RBZjV.jpg!web

创建索引模式

以下是使用软件日志创建模式的示例:

BBNvUv6.jpg!webz6R7jeZ.jpg!webrINvQvE.jpg!webzMVBJnm.jpg!webqEzMbaU.jpg!web

发现软件索引模式

yAr6R3q.jpg!web

创建可视化

nIRRraf.jpg!webyiUryi7.jpg!webumYbiav.jpg!webRNvqQvM.jpg!webJfUN7fi.jpg!webzyMZFjU.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK