## inputs from Amazon Managed Kafka
input {
  kafka {
    bootstrap_servers => "$kafka_brokers"
    topics_pattern => ".*"
    codec => "json"
    decorate_events => true
  }
}

## filter to tidy up
filter {
  # add kafka fields from decoration
  mutate {
    add_field => {
      "[kafka][topic]" => "%{[@metadata][kafka][topic]}"
      "[kafka][consumer_group]" => "%{[@metadata][kafka][consumer_group]}"
      "[kafka][partition]" => "%{[@metadata][kafka][partition]}"
      "[kafka][offset]" => "%{[@metadata][kafka][offset]}"
      "[kafka][timestamp]" => "%{[@metadata][kafka][timestamp]}"
    }
  }
  # convert [kafka][timestamp] to a timestamp type
  date {
    timezone => "UTC"
    match => ["[kafka][timestamp]", "UNIX_MS"]
    target => "[kafka][timestamp]"
  }
  # copy @timestamp to [agent][timestamp]
  mutate {
    copy => { "[@timestamp]" => "[agent][timestamp]" }
  }
  # the log_topic field is no longer needed (the kafka topic is used instead)
  mutate {
    remove_field => [ "[fields][log_topic]" ]
  }
  # tidy up the apache logs
  if [kafka][topic] == "apachelog" {
    # grok for a common apache log
    grok {
      match => { "message" => "%{COMMONAPACHELOG}" }
    }
    # move the parsed fields under [doc]
    mutate {
      rename => {
        "[auth]" => "[doc][auth]"
        "[bytes]" => "[doc][bytes]"
        "[clientip]" => "[doc][clientip]"
        "[httpversion]" => "[doc][httpversion]"
        "[ident]" => "[doc][ident]"
        "[request]" => "[doc][request]"
        "[response]" => "[doc][response]"
        "[timestamp]" => "[doc][timestamp]"
        "[verb]" => "[doc][verb]"
      }
    }
    # drop the message field
    mutate {
      remove_field => [ "message" ]
    }
    # parse the apache timestamp into @timestamp
    date {
      match => [ "[doc][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    # convert bytes, response, and offset to integers
    mutate {
      convert => {
        "[doc][bytes]" => "integer"
        "[doc][response]" => "integer"
        "[log][offset]" => "integer"
      }
    }
  }
  # tidy up the app events
  if [kafka][topic] == "appevent" {
    # parse the json fields from the message into [doc]
    json {
      source => "message"
      target => "doc"
    }
    # parse the event timestamp into @timestamp
    date {
      match => [ "[doc][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    # convert field types
    mutate {
      convert => {
        "[doc][purchase]" => "boolean"
        "[doc][response]" => "integer"
        "[log][offset]" => "integer"
      }
    }
    # drop the message field
    mutate {
      remove_field => [ "message" ]
    }
  }
}

## output to Amazon Elasticsearch Service
output {
  amazon_es {
    hosts => [ "$es_endpoint" ]
    region => "$elkk_region"
    index => "elkk-%{[kafka][topic]}-%{+YYYY.MM.dd}"
    codec => "json"
  }
  ## output to s3
  s3 {
    region => "$elkk_region"
    bucket => "$s3_bucket"
    size_file => 2048
    time_file => 5
    codec => "json"
    prefix => "elkk-%{[kafka][topic]}/%{+YYYY}/%{+MM}/%{+dd}"
  }
}
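
## usage notes (a sketch; not part of the pipeline)
# The $kafka_brokers, $es_endpoint, $elkk_region, and $s3_bucket tokens are
# placeholders: Logstash does not expand them itself, so they are assumed to be
# substituted by a deploy step (e.g. envsubst or a templating script) before
# Logstash starts.
#
# Assuming the rendered file is saved as logstash.conf, the syntax can be
# validated without starting the pipeline:
#
#   bin/logstash -f logstash.conf --config.test_and_exit
#
# A hypothetical appevent message (illustrative only) that the json filter
# above would parse into [doc] fields:
#
#   {"timestamp": "04/Nov/2020:14:31:07 +0000", "purchase": true, "response": 200}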