logstash在filter段对日志进行解析的时候, 可以直接筛选出我们想要的日志内容, 如果日志内容里不包括某些字段, 我们可以把整条日志直接扔掉, 下面是配置.
input {
kafka {
bootstrap_servers => k1.zhukun.net:6687 k2.zhukun.net:6687
topics => ["com.prod.feedengine","com.prod.feedgateway"]
# 如果收取多个kafaka topic里的消息也可以用下面的写法
# topics_pattern => "zhukun.net.log.rms-api.*"
group_id => logstash-mp-ops
consumer_threads => 10
decorate_events => true
auto_offset_reset => "latest"
}
}
filter {
# 如果message里不以2019/2020/2021开头, 则直接丢弃整条日志
if [message] !~ /^[2020|2021|2019]/ {
drop { }
}
# 直接打印出来原始日志看看
#ruby {
# code => 'puts event(message)'
#}
# grop正则匹配
grok {
match => { message => '%{TIMESTAMP_ISO8601:time_local}\s*\[%{DATA:service}\]\s*%{LOGLEVEL:loglevel}\s*%{DATA:message}$' }
overwrite => [message]
tag_on_failure => ["_invalid_log_format"] # 如果解析失败则加上这个tag
}
# 如果日志解析成功,那么
if !("_invalid_log_format" in [tags]) {
mutate {
# 如果把整条日志都解析出来以后(已经解析到各个tag之中), 原始日志应该也没什么用了, 可以考虑直接扔掉原始日志
remove_field => [ "message" ]
# 将kafka topic的名字作为oootype字段
add_field => { "oootype" => "%{[@metadata][kafka][topic]}" }
gsub => [
"logInfo", "\t\t", ""
]
}
# 日期处理
date {
# 将time_local赋给@timestamp字段, 右侧是time_local的实际格式, 例如2019-03-18 08:12:45.006
match => ["time_local", "yyyy-MM-dd HH:mm:ss.SSS"]
# match => [ "logTime", "ISO8601" ]
# timezone => "Asia/Shanghai"
target => "@timestamp" # 默认target就是@timestamp
tag_on_failure => [ "_dateparsefailure" ]
# remove_field => [ "time_local" ]
}
}
}
output {
elasticsearch {
hosts => [10.18.4.24:9200,10.18.4.25:9200,10.18.4.77:9200,10.18.4.78:9200, 10.11.149.69:9200,10.16.22.149:9200]
index => zhukun.net_console.log-%{+yyyy.MM.dd}
}
#stdout {
# codec => rubydebug {
# metadata => true
# }
#}
}
补充:
logstash可以使用条件判断来控制filter的执行。官方说明见Accessing Event Data and Fields in the Configuration。支持的运算符包括:
相等: ==, !=, <, >, <=, >=
正则: =~(匹配正则), !~(不匹配正则)
包含: in(包含), not in(不包含)
布尔操作: and(与), or(或), nand(非与), xor(非或)
一元运算: !(取反), ()(复合表达式), !()(对复合表达式结果取反)
参考文档
Drop filter plugin
Logstash Grep and Drop
Missing grep filter in logstash
logstash挺好用的