Once Logstash has parsed the message into fields, the filter-stage ruby plugin can be used for simple arithmetic, case conversion and similar operations (see the official documentation link below). The configuration used here is:
  input {
    kafka {
      # bootstrap_servers is a single comma-separated string, not an array
      bootstrap_servers => "www.hizy.net:6667,www.xpdo.net:6667,www.zhukun.net:6667"
      client_id => "logstash_www.xpdo.net"
      group_id => "www.zhukun.net"
      auto_offset_reset => "latest"
      consumer_threads => 10
      decorate_events => false
      topics => ["www.zhukun.net"]
    }
  }
  filter {
    mutate {
      # gsub takes one flat array of field/pattern/replacement triples, applied in order
      gsub => [
        "message", '\\"', '"',
        "message", '\\"', '\\\\"'
      ]
    }
    json {
      source => "message"
      target => "aduser"
    }
    # Map [aduser][action][info][timestamp] onto @timestamp.
    # Note that a UNIX timestamp may or may not carry milliseconds, so the format may be UNIX or UNIX_MS.
    date {
      match => [ "[aduser][action][info][timestamp]", "UNIX_MS" ]
      target => "@timestamp"
      locale => "cn"
    }
    # If both of these fields exist, add them together to form a new field; otherwise drop the event.
    if [aduser][action][param][vast][during_time] and [aduser][action][param][resource][during_time] {
      ruby {
        code => 'event.set("[aduser][action][param][vast_resource_during_time]", event.get("[aduser][action][param][vast][during_time]") + event.get("[aduser][action][param][resource][during_time]"))'
      }
    } else {
      drop { }
    }
    mutate {
      remove_field => [ "message" ]
    }
  }
  output {
    stdout {
      codec => rubydebug {
        # metadata => true
      }
    }
  }
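The ruby filter above relies on `+` doing arithmetic, which is only true when the json filter produced numbers; if the JSON carried the durations as quoted strings, `+` concatenates them. The following is an added example, not part of the original post, that runs the same field logic against a constructed stand-in for the Logstash Event API (the `FakeEvent` class, `add_during_time` and the sample data are assumptions for illustration) and shows the string trap beside a coerced version:

```ruby
# Minimal stand-in for the Logstash Event API (get/set with "[a][b]" field references),
# constructed here so the filter's Ruby code can be exercised outside Logstash.
class FakeEvent
  def initialize(data)
    @data = data
  end

  # Split "[aduser][action][param]" into ["aduser", "action", "param"]
  def path(ref)
    ref.scan(/\[([^\]]+)\]/).flatten
  end

  def get(ref)
    path(ref).reduce(@data) { |node, key| node.is_a?(Hash) ? node[key] : nil }
  end

  def set(ref, value)
    *parents, last = path(ref)
    node = parents.reduce(@data) { |n, key| n[key] ||= {} }
    node[last] = value
  end
end

VAST = "[aduser][action][param][vast][during_time]"
RES  = "[aduser][action][param][resource][during_time]"
SUM  = "[aduser][action][param][vast_resource_during_time]"

# Same computation as the page's ruby filter, with one addition: values are coerced
# to numbers first, because quoted JSON numbers ("12") arrive as strings and
# String#+ would yield "1230" instead of 42.
def add_during_time(event)
  vast = event.get(VAST)
  res  = event.get(RES)
  return nil if vast.nil? || res.nil?              # mirrors the config's `if` guard
  to_num = ->(v) { v.to_s.include?(".") ? Float(v) : Integer(v) }
  event.set(SUM, to_num.call(vast) + to_num.call(res))
  event.get(SUM)
end

# Constructed sample events: the same message decoded with numeric and with string values
numeric_event = FakeEvent.new("aduser" => { "action" => { "param" => {
  "vast" => { "during_time" => 12 }, "resource" => { "during_time" => 30 } } } })
string_event = FakeEvent.new("aduser" => { "action" => { "param" => {
  "vast" => { "during_time" => "12" }, "resource" => { "during_time" => "30" } } } })
naive_sum = string_event.get(VAST) + string_event.get(RES)   # what the unguarded filter does
```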
References:
Official documentation
Simple Math Functions with Ruby in Logstash 5.3
Math functions in Logstash