41

收集各类安全设备、Nginx日志实现日志统一管理及告警

 5 years ago
source link: http://www.freebuf.com/articles/es/176953.html?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

*本文原创作者:hackerbaba,本文属FreeBuf原创奖励计划,未经许可禁止转载

一、日志收集及告警项目背景

近来安全测试项目较少,想着把安全设备、nginx日志收集起来并告警, 话不多说,直接说重点,搭建背景:  

 1. 日志源:安全设备日志(Imperva  WAF、绿盟WAF、paloalto防火墙)、nginx日志等;
 2. 日志分析开源软件:ELK,告警插件:Sentinl 或elastalert,告警方式:钉钉和邮件;
 3. 安全设备日志->logstash->es,nginx日志由于其他部门已有一份(flume->kafka)我们通过kafka->logstash->es再输出一份,其中logstash的正则过滤规则需要配置正确,不然比较消耗性能,建议写之前使用grokdebug先测试好再放入配置文件;
 4. 搭建系统:centos 7 , JDK 1.8, Python 2.7
 5. ELK统一版本为5.5.2

由于es和kibana的安装都比较简单,就不在下文中说明安装及配置方法了。相关软件的下载链接如下:

Es: https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.2.tar.gz

Kibana: https://artifacts.elastic.co/downloads/kibana/kibana-5.5.2-linux-x86_64.tar.gz

Logstash: https://artifacts.elastic.co/downloads/logstash/logstash-5.5.2.tar.gz

测试链接Grokdebug: http://grokdebug.herokuapp.com

二、安全设备日志收集

2.1  Imperva WAF配置

策略->操作集->新增日志告警规则

aAfAz2Q.jpg!webe6BVBrB.jpg!web

我这边没有用设备自身的一些日志规则,而是根据手册自定义了一些需要的日志字段,可在自定义策略的消息填入如下字段:

StartTime=$!{Alert.createTime}AlarmID=$!{Alert.dn} EventID=$!{Event.dn}
AggregationInfo=$!{Alert.aggregationInfo.occurrences}Alert_level=$!{Alert.severity}
 RuleName=$!{Alert.alertMetadata.alertName} Category=$!{Alert.alertType}
 Alert_description=$!{Alert.description}EventType=$!{Event.eventType}
PolicyName=$!{Rule.parent.displayName}SrcIP=$!{Event.sourceInfo.sourceIp}
SrcPort=$!{Event.sourceInfo.sourcePort}Proto=$!{Event.sourceInfo.ipProtocol}
DstIP=$!{Event.destInfo.serverIp}DstPort=$!{Event.destInfo.serverPort}
WebMethod=$!{Event.struct.httpRequest.url.method}Domain=$!{Alert.serverGroupName}
URL=$!{Event.struct.httpRequest.url.path}ResponseCode=$!{Event.struct.httpResponse.responseCode}Alert_key=$!{Event.struct.httpRequest.url.queryString}Action=$!{Alert.immediateAction}
ResponseTime=$!{Event.struct.responseTime}ResponseSize=$!{Event.struct.responseSize}
Headers_value=$!{Event.struct.httpRequest.headers.value}Parameters_value=$!{Event.struct.httpRequest.parameters.value}

参数说明:告警开始时间、告警ID、事件ID、事情数量、告警级别…. HTTP返回码、触发告警字符串、响应动作、响应时间、响应大小、http包头的值,中间省略的部分请自行查看手册。其实Imperva WAF的总日志字段数不少于一两百个,单从这一点可以看出确实好于国产WAF太多。

针对Imperva WAF的logstash配置如下:

input{
   syslog{
   type => "syslog"
   port => 514
    }
}
filter {
grok  {
match =>["message","%{GREEDYDATA:StartTime} AlarmID=%{NUMBER:AlarmID}
EventID=%{NUMBER:EventID}AggregationInfo=%{NUMBER:AggregationInfo}
Alert_level=%{DATA:Alert_level}RuleName=%{GREEDYDATA:RuleName}
Category=%{GREEDYDATA:Category}Alert_description=%{GREEDYDATA:Alert_description}
EventType=%{DATA:EventType}PolicyName=%{GREEDYDATA:PolicyName} SrcIP=%{IPV4:SrcIP}
SrcPort=%{NUMBER:SrcPort}Proto=%{DATA:Proto} DstIP=%{IPV4:DstIP}
DstPort=%{NUMBER:DstPort}WebMethod=%{GREEDYDATA:WebMethod}
Domain=%{DATA:Domain}URL=%{GREEDYDATA:URL}
ResponseCode=%{GREEDYDATA:ResponseCode}Alert_key=%{GREEDYDATA:Alert_key}
Action=%{DATA:Action}ResponseTime=%{GREEDYDATA:ResponseTime}
ResponseSize=%{GREEDYDATA:ResponseSize}Headers_value=%{GREEDYDATA:Headers_value}Parameters_value=%{GREEDYDATA:Parameters_value}"]  
# 实际复制粘贴可能会有点格式问题,注意参数之间空一个空格即可
  remove_field => ["message"]
}
geoip {
   source => "SrcIP"   #IP归属地解析插件
  }
}
output{
   elasticsearch{
     hosts => "es单机或集群地址:9200"   
     index => "impervasyslog"
   }  
}

说明:其中geoip为elk自带插件,可以解析ip归属地,比如ip归属的国家及城市,在仪表盘配置“地图炮”装X、查看攻击源地理位置的时候有点用,

2.2 绿盟WAF配置

日志报表->日志管理配置->Syslog配置&日志发生参数

uMZJBzb.jpg!webEjq2iaz.jpg!webvAbayav.jpg!web

针对绿盟WAF的logstash配置如下:

input和output参照imperva waf,贴出最要的grok部分,如下:

grok  { 
    match => ["message",%{DATA:syslog_flag}  site_id:%{NUMBER:site_id} 
protect_id:%{NUMBER:protect_id}  dst_ip:%{IPV4:dst_ip}  dst_port:%{NUMBER:dst_port} 
src_ip:%{IPV4:src_ip}src_port:%{NUMBER:src_port} method:%{DATA:method}
domain:%{DATA:domain}  uri:%{DATA:uri}  alertlevel:%{DATA:alert_level}
event_type:%{DATA:Attack_types} stat_time:%{GREEDYDATA:stat_time} 
policy_id:%{NUMBER:policy_id}rule_id:%{NUMBER:rule_id}  action:%{DATA:action} 
block:%{DATA:block} block_info:%{DATA:block_info}  http:%{GREEDYDATA:URL}

2.3 paloalto防火墙配置

(6.1版本,其他版本可能会有点差异)

新建syslog服务器->日志转发,具体看截图

YNRJFzv.jpg!webfemUn2e.jpg!webEn2qEju.jpg!webErmqEne.jpg!webzYnInqN.jpg!web

针对PA的logstash配置如下:

input和output参照imperva waf,贴出最要的grok部分,如下:

grok  { 
    match =>["message","%{DATA:PA_Name},%{GREEDYDATA:Time},%{NUMBER:Eventid},%{DATA:Category},%{DATA:Subcategory},%{NUMBER:NULL},%{GREEDYDATA:Generate_Time},%{IPV4:SourceIP},%{IPV4:DestinationIP},%{IPV4:NAT_SourceIP},%{IPV4:NAT_DestinationIP},%{DATA:RuleName},%{DATA:SourceUser},%{DATA:DestinationUser},%{DATA:Application},%{DATA:VMsys},%{DATA:SourceZone},%{DATA:DestinationZone},%{DATA:IN_interface},%{DATA:OUT_interface},%{DATA:Syslog},%{DATA:GREEDYDATA:TimeTwo},%{NUMBER:SessionID},%{NUMBER:Repeat},%{NUMBER:SourcePort},%{NUMBER:DestinationPort},%{NUMBER:NAT-SourcePort},%{NUMBER:NAT-DestinationPort},%{DATA:Flag},%{DATA:Proto},%{DATA:Action},%{DATA:NUll2},%{DATA:ThreatName},%{DATA:Category2},%{DATA:Priority},%{DATA:Direction},%{NUMBER:Serialnum},%{GREEDYDATA:NULL3}"]

贴两张最终的效果图:

M3YRbaA.jpg!webyeYVZn2.jpg!web

三、Nginx日志收集

由于nginx日志已经被其他大数据部门收集过一遍了,为避免重复读取,我们从其他部门的kafka拉取过来即可,这里说一下nginx收集的方式,flume->kafka 示例配置方式如下:

Flume配置如下

配置扫描日志文件

log_analysis_test.conf配置文件

 a1.sources=s1  #可以理解为输入端,定义名称为s1
 a1.channels=c1  #传输频道,类似队列,定义为c1,设置为内存模式 
 a1.sinks=k1 #可以理解为输出端,定义为sk1

#source配置

 a1.sources.s1.type=exec
 a1.sources.s1.command=tail -F/data/log/nginx/crf_crm.access.log
 a1.sources.s1.channels=c1

#channel配置

 a1.channels.c1.type=memory
 a1.channels.c1.capacity=1000
 a1.channels.c1.transactionCapacity=100
 a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink  设置Kafka接收器
 a1.sinks.k1.channel=c1
 a1.sinks.k1.topic=crm_nginx_log_topic  #设置Kafka的Topic
 a1.sinks.k1.brokerList=x.x.x.x:9092  #设置Kafka的broker地址和端口号
 a1.sinks.k1.requiredAcks=1
 a1.sinks.k1.batchSize=20

kafka配置

kafka下载

wgethttp://mirror.bit.edu.cn/apache/kafka/0.8.2.2/kafka_2.9.1-0.8.2.2.tgz

配置zookeeper, 根据机器状况更改jvm 内存设置

vimbin/zookeeper-server-start.sh

配置kafka

 vimbin/kafka-server-start.sh   根据机器状况更改jvm 内存设置

启动zookeeper

nohupbin/zookeeper-server-start.sh config/zookeeper.properties &

启动kafka

nohup bin/kafka-server-start.shconfig/server.properties &

#创建应用服务器的topic

kafka-topics.sh --create –zookeeper x.x.x.x:2181--replication-factor 2 --partitions 5 --topic crm_nginx_log_topic

分区及副本以自己的情况而定

查看Topic是否创建成功

./bin/kafka-topics.sh --list --zookeeperx.x.x.x:2181

#启动kafka生产者

bin/kafka-console-producer.sh --broker-listx.x.x.x:9092 --topic pay 

#启动kafka消费者

kafka-console-consumer.sh --zookeeperx.x.x.x:2181--topic crm_nginx_log_topic

logstash 配置文件

input{
   kafka {
       codec => "plain"
       group_id => "logstash1"
       auto_offset_reset => "smallest"
       reset_beginning => true
       topic_id => "crm_nginx_log_topic"
       zk_connect => "x.x.x.x:2181" # zookeeper的地址
   }
grok {省略}
}
output{
   elasticsearch{
     hosts => "es单机或集群地址:9200"   
      index => "impervasyslog"
   }

四、安全告警

安全告警可以是Sentinl或 elastalert,Sentinl是kibana插件,可以集成到kibana内图形化展示,但是写规则时需要对JS较熟悉,elastalert 是es的插件,不支持集成到kibana界面进行图形化展示。下面分别对这2个插件进行安装及配置说明。

4.1 Sentinl

安装如下:

kibana-plugin install https://github.comsirensolutionssentinlreleasesdownloadtag-

5.4sentinl-v5.4.1.zip

安装后,以IP请求频次告警设置为例:

每2分钟去es查询一次

NzmINnu.jpg!web

针对需要监控的IP字段

Input的body内容即es查询语法

ERvQVzE.jpg!web

编写过滤条件

{
  "script": {
    "script":"payload.json='超过阀值40';var match=false;var threshold=40;varfirst=payload.aggregations.code.buckets;for(var i=0;i<first.length;i++){varip_count = parseInt(first[i].doc_count);if (ip_count >= threshold){match=true;payload.json += '【ip】:' + first[i].key + '  【count】:' +first[i].doc_count + '\\n';}};match;"
  }
}

UJr6jeQ.jpg!web

如果想对状态码做监控,参考如下:

eIjumia.jpg!webueuINfB.jpg!web

Action里面添加钉钉群机器人的webhook

EzARN3n.jpg!web

钉钉报警如下

6zamIfb.jpg!web

钉钉的接口文档链接:

https://open-doc.dingtalk.com/docs/doc.htm?spm=a219a.7629140.0.0.karFPe&treeId=257&articleId=105735&docType=1

邮件告警设置如下:

AB36ryq.jpg!web

4.2 elastalert

前置条件:

 JDK 1.8
 python 2.7
 easy_install -U setuptools (最新setuptools-39.2.0)
 yum install gcc
 yum install python-devel
 yum install libffi-devel

安装

git clone https://github.com/Yelp/elastalert.git

cd elastalert

python setup.py install

pip install -r requirements.txt

cp config.yaml.example config.yaml

下载 https://github.com/xuyaoqiang/elastalert-dingtalk-plugin

elastalert-dingtalk-plugin中的elastalert_modules复制到elastalert下

创建索引

elastalert-create-index   

#输入 es的IP 及 es的端口,其余根据自己的环境写,一般默认即可

配置config.yaml,以下为关键配置信息:

rules_folder: example_rules  #指定rule的目录

run_every:

minutes: 1   #每一分钟去探测一次

buffer_time:

minutes: 15   #缓存15分钟

es_host: x.x.x.x

es_port: 9200

writeback_index: elastalert_status  #创建的告警索引

alert_time_limit:

days: 2   #失败重试的时间限制

下面先以钉钉告警为例:

在example_rules里面新建钉钉告警配置文件,内容如下

es_host: x.x.x.x

es_port: 9200

name: xxx安全告警

type: cardinality

index: nsfocuswaf_syslog

cardinality_field: src_ip

max_cardinality: 30

timeframe:

minutes: 5   #单IP 5分钟内访问超过30次就会告警

aggregation_key: src_ip

summary_table_fields:

– src_ip

– dst_ip

– dst_port

– Attack_types

– stat_time

alert:

-”elastalert_modules.dingtalk_alert.DingTalkAlerter”

dingtalk_webhook: “your webhook”

dingtalk_msgtype: text

群机器人的配置比较简单,自己搜索一下即可

注意如果告警匹配了N条,却只发出1条告警,修改elastalert.py代码,在856行后面增加如下代码:

ZFvi637.jpg!web

修改dingtalk_alert.py的代码,增加如下内容:

src_ip = matches[0]['src_ip']   #src_ip为grok过滤后自定义的字段,以下相同
dst_ip = matches[0]['dst_ip']
dst_port = matches[0]['dst_port']
attack_types = matches[0]['Attack_types']
start_time =  matches[0]['stat_time']
修改payload初的代码,如下:
payload = {
"msgtype": self.dingtalk_msgtype,
       "text": {
           "content": r"发现源IP地址:{}在5分钟内,针对服务器IP:{}的{}端口发动了3次攻击,攻击类型是:{},攻击时间:{},请及时处理,谢谢!".format(src_ip,dst_ip,dst_port,attack_types,start_time)
            },

如果没有过滤除自定义的字段,只有message字段的话,可以新增如下代码:

message = matches[0]['message']
src_ip_pattern =r"src_ip:(\d+\.\d+\.\d+\.\d+)"
dst_ip_pattern =r"dst_ip:(\d+\.\d+\.\d+\.\d+)"
dst_port_pattern =r"dst_port:(\d+)"
time_pattern =r"stat_time:(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})"
type_pattern =r"event_type:(\w+)"
 src_ip= re.findall(src_ip_pattern, message)[0]
 dst_ip= re.findall(dst_ip_pattern, message)[0]
 dst_port= re.findall(dst_port_pattern, message)[0]
 start_time= re.findall(time_pattern, message)[0]
 attack_types= re.findall(type_pattern, message)[0]

最终效果图如下:

7BRf2i6.jpg!web

邮件的rule配置文件如下:

es_host: x.x.x.x

es_port: 9200

name: 信息安全告警

type: cardinality

index: nsfocuswaf_syslog

cardinality_field: src_ip

max_cardinality: 5

timeframe:

minutes: 5

alert:

- "email"

smtp_host: mail.crfchina.com

smtp_port: 25

smtp_auth_file:/opt/elastalert/smtp_auth_file.yaml

email_reply_to: [email protected] #回复给谁

from_addr: [email protected] #发件人

email:

- [email protected] #收件人

alert_subject: "信息安全告警"

alert_text_type: alert_text_only

alert_text: |

XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

您好,

网站:{}正被人恶意攻击,请及时处理!!

当前匹配XX规则数:{}

发生时间:{}

攻击者的IP:{}

攻击类型:{}

请求的URL:{}

alert_text_args:

- domain

-num_hits

-stat_time

-src_ip

-Attack_types

-URL

邮件告警最终效果如下:

6JjUz2Q.jpg!web

五、总结

相关进程运行命令如下:

后台启动es

nohup su - elasticsearch -c'/opt/elasticsearch-5.5.2/bin/elasticsearch -d' &

后台启动logstash

nohup ./bin/logstash -f x.x.x.conf &

后台启动kibana

nohup ./kibana &

#启动flume

flume-ng agent --conf conf --conf-fileconf/log_analysis_test.conf --name a1 –
Dflume.root.logger=INFO,console

启动kafka

nohup bin/kafka-server-start.shconfig/server.properties &

后台启动elastalert

nohup python -m elastalert.elastalert--config ./config.yaml --verbose
--rule ./example_rules/DD_rule.yaml  &    

常见的告警策略除了来自安全设备的正则之外,大量的IP请求、错误状态码、nginx的request请求中包含的特征码也都是常见的告警规则。

参考链接:

http://www.freebuf.com/articles/others-articles/161905.html

http://www.freebuf.com/articles/web/160254.html

*本文原创作者:hackerbaba,本文属FreeBuf原创奖励计划,未经许可禁止转载


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK