Decrease the log count with the Logstash aggregate plugin

Vakhtang Matskeplishvili
Feb 5, 2023

Let’s talk a little bit about the Logstash aggregate filter plugin.
Documentation for this plugin can be found in the Logstash guide:
https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html

Use case: sometimes we need to aggregate our input events to reduce noise and improve visibility.

What does aggregation mean?
Aggregation means creating a structured summary event that contains the merged result of several events.

For example:
We have a huge number of logs, and because they are so similar, we don’t need all of them.
To improve visibility and readability, we could instead store a single log that represents all of them, based on one or more key fields.
Our original logs look like this:

{"@timestamp":"2023-01-23T03:21:40.881Z","source":{"host":"linx0001"},"field1":"This is the first example field","field2":"This is the second example field","destintion":{"port":53,"host":"8.8.8.8"}}
{"@timestamp":"2023-01-23T03:21:42.881Z","source":{"host":"linx0001"},"field1":"This is the first example field","field2":"This is the second example field","destintion":{"port":53,"host":"8.8.8.8"}}
{"@timestamp":"2023-01-23T03:21:43.881Z","source":{"host":"linx0001"},"field1":"This is the first example field","field2":"This is the second example field","destintion":{"port":53,"host":"8.8.8.8"}}
{"@timestamp":"2023-01-23T03:21:44.881Z","source":{"host":"linx0001"},"field1":"This is the first example field","field2":"This is the second example field","destintion":{"port":53,"host":"8.8.8.8"}}

As we can see, these logs have the same fields and differ only in the timestamp.
There is no reason to store them in the database separately.
It would be sufficient to store a single sample log together with the count of matching events for the last minute.
We can do this in Logstash with the following configuration:

input {
  stdin {
    codec => json
  }
}
filter {
  ### Build a UID for aggregation, based on the fields source.host, destination.host and destination.port.
  ### We will use it as the key for our aggregation: based on this key, the aggregate plugin will merge the data.
  fingerprint {
    source => ["[source][host]", "[destination][host]", "[destination][port]"]
    target => "task_id"
    # Fingerprint the concatenation of all source fields instead of each field separately
    concatenate_sources => true
  }

  ### Aggregate the data
  aggregate {
    task_id => "%{task_id}"
    code => "
      # Copy every field of the incoming event into the map,
      # keeping the first value seen for each key
      event.to_hash.each do |key, value|
        map[key] ||= value
      end

      # Increase the events count
      map['events_count'] ||= 0
      map['events_count'] += 1

      # Drop the original event
      event.cancel
    "

    # Push the map as a new event when the one-minute timeout expires
    push_map_as_event_on_timeout => true
    timeout => 60
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
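
One thing to keep in mind: the aggregate filter keeps its maps in memory per worker thread, so it only works reliably when Logstash runs with a single pipeline worker. As a minimal way to try the configuration, assuming it is saved as aggregate.conf and the sample logs above are in a file named logs.json (both file names are just examples), you can pipe the logs into stdin:

bin/logstash -f aggregate.conf --pipeline.workers 1 < logs.json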

And this is the result that we will receive after one minute:

{
    "field1" => "This is the first example field",
    "field2" => "This is the second example field",
    "@version" => "1",
    "task_id" => "1b69a0f7974692cbd38c3561e5de8151ebc06373",
    "source" => {
        "host" => "linx0001"
    },
    "destination" => {
        "port" => 53,
        "host" => "8.8.8.8"
    },
    "events_count" => 4,
    "@timestamp" => 2023-01-23T03:21:40.881Z
}
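
In a real pipeline, you would store the aggregated event in the database instead of printing it. As a minimal sketch, assuming a local Elasticsearch cluster and a hypothetical index name, the stdout output could be replaced with an elasticsearch output:

output {
  elasticsearch {
    # Assumed local cluster address; adjust to your environment
    hosts => ["http://localhost:9200"]
    # Hypothetical index name for the aggregated events
    index => "aggregated-logs"
  }
}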

I hope this small trick will help you.

Try my open-source applications for Elasticsearch on my site: https://dbeast.co
