https://github.com/nbs-system/naxsi/wiki/naxsilogs
Collect the logs produced by naxsi into ELK so that they are easier to analyze and aggregate.
Create a new folder inside the logstash directory and write the grok patterns used for filtering:
logstash> mkdir pattern
logstash> cd pattern
logstash> vim naxsi

DA1 \d{4}/\d{2}/\d{2}
TM1 \d{2}:\d{2}:\d{2}
LEVEL (\w+)
NUM1 \d+(?:#0: \*)
NUM2 \d+
EXLOG NAXSI_EXLOG
FMT NAXSI_FMT
ID1 (\d+)
ZONE \w+
VAR1 (.*)
CONTENT (.*)
T3 \w+
T4 HTTP/1\.1", host: "(.*)", referrer: "
HOST (.*)
NAXSI %{DA1:date}\s%{TM1:time}\s\[%{LEVEL:level}\]\s%{NUM1:num1}%{NUM2:request_id}\s(?<logtype>NAXSI_EXLOG):\s\w+=%{HOST:client_ip}&server=%{HOST:hostname}&uri=%{PROG:filepath}&id=%{ID1:id}&zone=%{ZONE:zone}&var_name=%{VAR1:var}&content=%{CONTENT:content},\sclient\:\s%{HOST:ip3},\sserver:\s(.*)\srequest:\s"%{T3:method}\s%{HOST:uri}\sHTTP/1\.1",\shost:\s"%{HOST:host22}"
NAXSI2 %{DA1:date}\s%{TM1:time}\s\[%{LEVEL:level}\]\s%{NUM1:num1}%{NUM2:request_id}\s(?<logtype>NAXSI_EXLOG):\s\w+=%{HOST:client_ip}&server=%{HOST:hostname}&uri=%{PROG:filepath}&id=%{ID1:id}&zone=%{ZONE:zone}&var_name=%{VAR1:var}&content=%{CONTENT:content},\sclient\:\s%{HOST:ip3},\sserver:\s(.*)\srequest:\s"%{T3:method}\s%{HOST:uri}\sHTTP/1\.1",\shost:\s"%{HOST:host22}",\sreferrer:\s"(?(.*))
NAXSI3 %{DA1:date}\s%{TM1:time}\s\[%{LEVEL:level}\]\s(\d+(?:#0:\s\*))%{NUM2:request_id}\s(?<logtype>NAXSI_EXLOG)(.*)&var_name=%{VAR1:var}&content=%{CONTENT:content},\sclient\:\s(.*),\sserver:\s(.*)\srequest:\s"%{T3:method}\s%{HOST:uri}
NAXSI4 %{DA1:date}\s%{TM1:time}\s\[%{LEVEL:level}\]\s(\d+(?:#0:\s\*))%{NUM2:request_id}\s(?<logtype>NAXSI_EXLOG)(.*)&var_name=%{VAR1:var}&content=%{CONTENT:content},\sclient\:\s(.*),\sserver:\s(.*)\srequest:\s"%{T3:method}\s%{HOST:uri}\sHTTP/1\.1",\shost:\s"%{HOST:host}",\sreferrer:\s"(?(.*))
FMT %{DA1:date}\s%{TM1:time}\s\[%{LEVEL:level}\]\s%{NUM1:num1}%{NUM2:request_id}\s(?<logtype>NAXSI_FMT):\sip=%{HOST:client_ip}&server=%{HOST:server_ip}&uri=%{UNIXPATH:uri}&learning=%{HOST:learing}&vers=%{HOST:vers}&total_processed=%{HOST:toal_processed}&total_blocked=%{HOST:total_blocked}&block=%{HOST:block}&cscore0=%{HOST:attack}&score0=%{HOST:score0}&cscore1=%{HOST:xss}&score1=%{HOST:score}&zone0=%{WORD:args}&id0=%{NUMBER:id}&var_name0=%{HOST:varname},\sclient:\s%{HOST:ip3},\sserver:\s(.*)\srequest:\s"%{T3:method}\s%{HOST:uri}\sHTTP/1\.1",\shost:\s"%{HOST:host22}"
FMT_R %{DA1:date}\s%{TM1:time}\s\[%{LEVEL:level}\]\s%{NUM1:num1}%{NUM2:request_id}\s(?<logtype>NAXSI_FMT):\sip=%{HOST:client_ip}&server=%{HOST:server_ip}&uri=%{UNIXPATH:uri}&learning=%{HOST:learing}&vers=%{HOST:vers}&total_processed=%{HOST:toal_processed}&total_blocked=%{HOST:total_blocked}&block=%{HOST:block}&cscore0=%{HOST:attack},\sclient:\s(.*)
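To get a feel for what these patterns capture, the common prefix (date, time, level, worker id, request id, log type) can be expanded by hand into a plain Ruby regex, since Ruby and grok both accept Oniguruma-style named groups. This is only a sketch: the sample log line is made up for illustration, loosely modelled on the format in the naxsi wiki, and `PREFIX`, `SAMPLE` and `parse_prefix` are hypothetical names that are not part of logstash. The log-type group is named `logtype` here because that is the field the ruby filter later reads.

```ruby
# Hand-expanded version of the shared prefix (DA1, TM1, LEVEL, NUM2, log type).
PREFIX = %r{
  (?<date>\d{4}/\d{2}/\d{2})\s       # DA1
  (?<time>\d{2}:\d{2}:\d{2})\s       # TM1
  \[(?<level>\w+)\]\s                # LEVEL
  \d+\#0:\s\*                        # nginx worker pid followed by the literal marker
  (?<request_id>\d+)\s               # NUM2
  (?<logtype>NAXSI_EXLOG|NAXSI_FMT)  # which kind of naxsi line this is
}x

# Illustrative log line; every value in it is invented.
SAMPLE = '2017/08/10 15:21:07 [error] 1234#0: *5932 NAXSI_FMT: ip=192.0.2.10&server=example.com&uri=/index.php&learning=0&vers=0.55&total_processed=10&total_blocked=2&block=1&cscore0=$XSS&score0=8&zone0=ARGS&id0=1302&var_name0=q, client: 192.0.2.10, server: example.com, request: "GET /index.php?q=1 HTTP/1.1", host: "example.com"'

# Returns a hash of field name => captured text, or nil when the line does not match.
def parse_prefix(line)
  m = PREFIX.match(line)
  m && m.names.zip(m.captures).to_h
end

puts parse_prefix(SAMPLE).inspect
```

Trying a pattern against a few real lines this way (or in the online grok debugger) before wiring it into logstash makes it easier to see which field a mismatch comes from.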
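The patterns above are the regexes that parse the log lines described at the top. Only NAXSI3, NAXSI4 and FMT_R are actually used; the others were for debugging. Next, add the plugins to logstash: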
bin/logstash-plugin install logstash-filter-grok
bin/logstash-plugin install logstash-filter-ruby
Then configure the /etc/logstash.conf file:
input {
  file {
    path => "/usr/local/nginx/logs/naxsi.err"
    type => "naxsi-error"
    start_position => "beginning"
  }
}

filter {
  if [type] == "naxsi-error" {
    grok {
      patterns_dir => "/opt/logstash-5.5.1/pattern"
      match => [ "message", "%{NAXSI4}",
                 "message", "%{NAXSI3}",
                 "message", "%{FMT_R}" ]
    }
    ruby {
      code => "require 'digest/md5'; event.set('computed_id', Digest::MD5.hexdigest(event.get('request_id')+event.get('time')) + '_' + event.get('logtype'))"
    }
  }
}

output {
  if [type] == "naxsi-error" {
    elasticsearch {
      hosts => ["localhost"]
      index => "nxapi"
      document_id => "%{computed_id}"
    }
    stdout { codec => rubydebug }
  }
}
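The grok `match` list is tried in order, and as far as I understand grok's default `break_on_match => true` behaviour, the first pattern that matches wins. That would explain why the more specific NAXSI4 (with host and referrer) is listed before the shorter NAXSI3. A rough Ruby sketch of that fallback, using deliberately simplified stand-in regexes rather than the real patterns (`CANDIDATES`, `first_match` and the three sample lines are all hypothetical):

```ruby
# Simplified stand-ins for the three patterns, in the same order as the config.
CANDIDATES = [
  ['NAXSI4', /NAXSI_EXLOG.*request: "(?<method>\w+) (?<uri>\S+) HTTP\/1\.1", host: "(?<host>[^"]*)", referrer: "/],
  ['NAXSI3', /NAXSI_EXLOG.*request: "(?<method>\w+) (?<uri>\S+)/],
  ['FMT_R',  /NAXSI_FMT: ip=(?<client_ip>[^&]*)&server=(?<server_ip>[^&]*)/]
].freeze

# Invented sample lines, trimmed to the parts the stand-in regexes look at.
WITH_REFERRER = 'NAXSI_EXLOG: ip=192.0.2.10&var_name=q&content=x, client: 192.0.2.10, server: example.com, request: "GET /a?q=x HTTP/1.1", host: "example.com", referrer: "http://example.com/"'
NO_REFERRER   = 'NAXSI_EXLOG: ip=192.0.2.10&var_name=q&content=x, client: 192.0.2.10, server: example.com, request: "GET /a?q=x HTTP/1.1", host: "example.com"'
FMT_LINE      = 'NAXSI_FMT: ip=192.0.2.10&server=example.com&uri=/a&learning=0'

# Returns [pattern_name, fields] for the first candidate that matches.
def first_match(line)
  CANDIDATES.each do |name, regex|
    m = regex.match(line)
    return [name, m.names.zip(m.captures).to_h] if m
  end
  [nil, {}] # grok would tag such an event with _grokparsefailure
end

[WITH_REFERRER, NO_REFERRER, FMT_LINE].each do |line|
  name, fields = first_match(line)
  puts "#{name}: #{fields.inspect}"
end
```

If NAXSI3 were listed first, it would also match lines that carry a referrer, and the host field from NAXSI4 would never be extracted.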
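Once this is configured, start logstash. Each blocked request then produces two documents in ELK. The document_id of each starts with a hash computed from the request id and the time, followed by the log type, like this: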
e0737c661e5e3457fbc3d847f75817fa_NAXSI_FMT
e0737c661e5e3457fbc3d847f75817fa_NAXSI_EXLOG
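The shared prefix follows from the ruby filter: both log lines of one blocked request carry the same request id and timestamp, so the MD5 is identical and only the log-type suffix differs. Below is a standalone sketch of the same computation. The `computed_id` helper and the sample values are hypothetical, and returning nil when a field is missing is an added guard that the one-liner in the config does not have.

```ruby
require 'digest/md5'

# Mirrors the ruby filter: MD5(request_id + time) + '_' + logtype.
# Returns nil instead of raising when grok did not extract a field (added guard).
def computed_id(request_id, time, logtype)
  return nil if [request_id, time, logtype].any?(&:nil?)
  Digest::MD5.hexdigest(request_id + time) + '_' + logtype
end

# Invented values standing in for the fields grok would extract.
fmt_id   = computed_id('5932', '15:21:07', 'NAXSI_FMT')
exlog_id = computed_id('5932', '15:21:07', 'NAXSI_EXLOG')

puts fmt_id
puts exlog_id
```

Because only the request id and the time of day feed the hash, ids could in principle repeat across days or nginx restarts. Including the date field in the hash input would be one way to reduce that risk.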
Online grok pattern debugger: https://grokdebug.herokuapp.com