Daniel Dreier

Logstash configuration dissection


In this post I'll be taking my actual in-use Logstash configuration and stepping through it in an attempt to explain the purpose of each section. First things first, however. I am definitely not a Logstash expert. My configuration is cobbled together from whatever little pieces of information I could find at the time, with some tweaks.

Not using Logstash for centralized log collection? Check out my post on getting it set up.

The way I'm going to structure this post is to start with the most basic configuration possible, and build up from there explaining my rationale along the way. Ready? Let's get started!

Logstash Configuration Basics

First, give the Logstash documentation a once-over, if you haven't already. It's pretty good.

Logstash configurations are separated into three different sections: input, filter, and output. Our config is going to start with these three sections, each empty for now:

# Comments look like this
input {

}

filter {

}

output {

}

The input section, as you might have guessed, is where we tell Logstash how to listen for logs from our sources. The filter section is where all of the work happens: mutating and massaging logs into useful data. And in the output section, we tell Logstash where to send the data once it's done with it. For a list of all of the inputs, filters, and outputs check out the Logstash documentation (but you did that already, right?).
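
Before we start filling these in, here's the simplest complete pipeline I know of, handy for poking at filters interactively (assuming a stock Logstash install; run it with something like bin/logstash -f test.conf):

# Read events from the console, print them back out fully expanded
input {
    stdin { }
}

output {
    stdout { codec => rubydebug }
}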

Inputs

The environment I manage (at the time of this writing) and will be collecting logs from includes five Windows Server 2008 machines (two DCs, an IIS+MSSQL server) and a Sophos UTM firewall. To get Event Logs from the Windows boxes, I'm using nxlog-ce and (the first half of) a config that I found on the 'net somewhere.

Event Logs

The nxlog config is pretty straightforward, I think, so I won't cover it in depth (though there's a trimmed-down sketch of it after the breakdown below). But what it does, basically, is hook into the Windows Event Log stream, roughly format the logs in JSON, and send them via TCP to whatever host and port you desire. Since nxlog is going to be sending its data via TCP, we'll use a TCP input in Logstash. That'll look like this:

tcp {  
    type => "eventlog"
    port => 3515
    codec => json_lines
}

Let's break it down:

  • tcp is the name of the input plugin we're using
  • The type parameter sets the type field for all of the logs received through this input. This will come in handy in the filter section.
  • port is a little self-explanatory, I hope. Port 3515 doesn't have any significance, it's just what I chose when first setting everything up. Something to note: Logstash will require elevated privileges if you want to use a port below 1024.
  • codec tells Logstash what kind of data to expect on this input. From the docs: "[json_lines] will decode streamed JSON that is newline delimited."
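
Speaking of the nxlog side: here's a heavily trimmed sketch of the kind of config I'm talking about, just to make the shape of it clear. The hostname is a placeholder, and the real config linked above does more, so don't treat this as a drop-in:

<Extension json>
    Module      xm_json
</Extension>

<Input eventlog>
    # im_msvistalog reads the Event Log on Vista/Server 2008 and later
    Module      im_msvistalog
</Input>

<Output logstash>
    Module      om_tcp
    Host        logstash.example.com    # placeholder: your Logstash host
    Port        3515
    Exec        to_json();              # one JSON object per line
</Output>

<Route eventlog_to_logstash>
    Path        eventlog => logstash
</Route>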

IIS Access Logs

To get Access Logs from IIS we'll need an nxlog config which is a little bit different. Something to note is that you might need to change the log-file path (line 39) and you'll need to configure your IIS site to use the W3C log format, and enable all of the fields. Check out this MSDN page for instructions.

I also use a dedicated input for the IIS logs so that they're easy to separate out. Here's what that looks like:

tcp {  
    type => "iislog"
    port => 3516
    codec => json_lines
}

Sophos UTM Logs

The Sophos UTM (formerly Astaro) sends logs in a mostly-syslog format. Though, like most things I imagine, it doesn't follow the syslog format very strictly, which will cause some extra work later on. The most immediate consequence of this is that we can't use the "syslog" input, which only supports strictly RFC3164- or ISO8601-formatted logs. So we'll employ another "tcp" input:

tcp {  
    type => "syslog"
    port => 5000
}

I didn't specify a codec here because the default of "plain" works well enough.

Putting it all together

These three inputs are enough for my environment, so my entire input section looks like this:

input {  
    tcp {
        port => 5000
        type => "syslog"
    }
    tcp {
        type   => "eventlog"
        port   => 3515
        codec => json_lines
    }
    tcp {
        type => "iislog"
        port => 3516
        codec => json_lines
    }
}

Filters

Here's where all of the heavy-lifting and usefulness of Logstash actually comes into play. First we need a way to separate out the logs from the different sources, and we'll use the type field (which we set earlier) to do that. Applying certain filters to specific events is done using conditionals. The basic skeleton for the three types we defined will look like this:

filter {  
    if [type] == "syslog" {

    }
    if [type] == "eventlog" {

    }
    if [type] == "iislog" {

    }
}

Syslog

Note: Some of this section might be pretty specific to my particular firewall.
Here's an example of one of the more well-formatted syslog messages from my firewall:

<30>2014:05:27-22:23:03 nw-asg-fw0 ulogd[4520]: id="2001" severity="info" sys="SecureNet" sub="packetfilter" name="Packet dropped" action="drop" fwrule="60001" initf="eth3" srcmac="aa:aa:aa:aa:aa:aa" dstmac="bb:bb:bb:bb:bb:bb" srcip="a.a.a.a" dstip="b.b.b.b" proto="6" length="109" tos="0x00" prec="0x00" ttl="46" srcport="443" dstport="64091" tcpflags="ACK PSH"  

To parse this (and the other logs), we'll start with a grok filter:

grok {  
    match => { "message" => "<%{POSINT:syslog_pri}>%{DATA:syslog_timestamp} %{DATA:syslog_program}\[%{NUMBER:syslog_pid}\]\: %{GREEDYDATA:syslog_message}" }
    add_field => [ "received_at", "%{@timestamp}" ]
}

What this does is look for each of the named patterns, like POSINT, and put the matched value into the corresponding field, like syslog_pri.
So for my example log:

  • syslog_pri = 30
  • syslog_timestamp = "2014:05:27-22:23:03"
  • syslog_program = "ulogd"
  • syslog_pid = 4520
  • syslog_message = "id="2001" severity="info" sys="SecureNet" sub="packetfilter" name="Packet dropped" action="drop" fwrule="60001" initf="eth3" srcmac="aa:aa:aa:aa:aa:aa" dstmac="bb:bb:bb:bb:bb:bb" srcip="a.a.a.a" dstip="b.b.b.b" proto="6" length="109" tos="0x00" prec="0x00" ttl="46" srcport="443" dstport="64091" tcpflags="ACK PSH""

Next, we want to turn the Syslog PRI number into some useful data. This is done using the syslog_pri filter, which converts the numeric value into a "severity" and a "facility". It looks like:

syslog_pri { }  

By default, the syslog_pri filter will look in the syslog_pri field, which is why I put it after the initial grok filter.
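
To make that concrete: the PRI value encodes facility × 8 + severity, so the example log's <30> is facility 3, severity 6. Based on my reading of the filter's docs, it should add fields roughly like this:

syslog_pri { }
# For syslog_pri = 30 (3 * 8 + 6), expect something like:
#   syslog_facility      => "daemon"          (facility code 3)
#   syslog_severity      => "informational"   (severity code 6)
#   syslog_facility_code => 3
#   syslog_severity_code => 6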

Next, we use the string timestamp from the syslog message as the actual timestamp for the Logstash event. This is done with a date filter:

date {  
    match => [ "syslog_timestamp", "yyyy:MM:dd-HH:mm:ss" ]
}
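
One caveat I'll flag here: without an explicit timezone option, the date filter interprets zone-less timestamps like this one in the Logstash server's local timezone. If the firewall's clock lives in a different zone, pinning it explicitly avoids skewed timestamps. A sketch, with a timezone I picked purely for illustration:

date {
    match    => [ "syslog_timestamp", "yyyy:MM:dd-HH:mm:ss" ]
    # Illustrative assumption: the firewall stamps logs in US Eastern time.
    timezone => "America/New_York"
}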

Next up is something that I do, which is just a personal preference. The firewall doesn't include its FQDN in the syslog message, but I want to have the FQDN stored in the @source_host field (the default place for storing the source of the log). Also, I want to copy the syslog_message field into the @message field which is the default field displayed in Kibana. I do that using a mutate filter:

mutate {  
    replace => [ "@source_host", "firewall-name.ad.company.com" ]
    replace => [ "@message", "%{syslog_message}" ]
    remove  => [ "syslog_message", "syslog_timestamp" ]
}

"%{syslog_message}" is the syntax used to reference a field from within a string. See sprintf format and field references in the Logstash docs.
I remove the syslog_message and syslog_timestamp fields, using a mutate filter, because they now duplicate other fields.
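
A quick, purely hypothetical illustration of the two syntaxes side by side: square brackets are field references (used in conditionals), while %{...} interpolates a field's value inside a string:

# Hypothetical example, not part of my config:
if [syslog_program] == "ulogd" {
    mutate {
        add_field => { "summary" => "dropped by %{syslog_program} (pid %{syslog_pid})" }
    }
}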

Finally, I use the kv filter to make individual fields out of the key-value pairs that exist in most of the messages (and especially those packet filter violations). I simply point the kv filter at the @message field and it does all of the hard work:

kv {  
    source => "@message"
}

In this example, fields like srcip, dstip, srcport, and dstport are added with their corresponding values.

So, our final syslog section looks like this:

if [type] == "syslog" {  
    grok {
        match => { "message" => "<%{POSINT:syslog_pri}>%{DATA:syslog_timestamp} %{DATA:syslog_program}\[%{NUMBER:syslog_pid}\]\: %{GREEDYDATA:syslog_message}" }
        add_field => [ "received_at", "%{@timestamp}" ]
    }
    syslog_pri { }
    date {
        match => [ "syslog_timestamp", "yyyy:MM:dd-HH:mm:ss" ]
    }
    mutate {
        replace => [ "@source_host", "firewall-name.ad.company.com" ]
        replace => [ "@message", "%{syslog_message}" ]
        remove  => [ "syslog_message", "syslog_timestamp" ]
    }
    kv {
        source => "@message"
    }
}

Event Logs

No example this time, just going to dive right in (and move a little bit quicker).

First, a matter of personal preference:

mutate {  
    # Lowercase some values that are always in uppercase
    lowercase => [ "EventType", "FileName", "Hostname", "Severity" ]
}

This uses the mutate filter to convert the values of the listed fields to lowercase. The less my computers yell at me, the better.

Next, converting the hostname field to the Logstash standard and setting the timestamp:

mutate {  
    # Set source to what the message says
    rename => [ "Hostname", "@source_host" ]
}
date {  
    # Convert timestamp from integer in UTC
    match => [ "EventReceivedTime", "UNIX" ]
}

If you take a look back at the nxlog config, you can see where we set EventReceivedTime to a UNIX timestamp.
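
To make the conversion concrete (my own arithmetic, not captured output), here's the same date filter annotated:

date {
    # e.g. EventReceivedTime = 1401229383 (seconds since the epoch, UTC)
    # should become @timestamp = 2014-05-27T22:23:03Z.
    # If your shipper sent milliseconds, you'd use "UNIX_MS" instead.
    match => [ "EventReceivedTime", "UNIX" ]
}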

Some more renaming and redundancy removal:

mutate {  
    # Rename some fields into something more useful
    rename => [ "Message", "@message" ]
    rename => [ "Severity", "eventlog_severity" ]
    rename => [ "SeverityValue", "eventlog_severity_code" ]
    rename => [ "Channel", "eventlog_channel" ]
    rename => [ "SourceName", "eventlog_program" ]
    rename => [ "SourceModuleName", "nxlog_input" ]
    rename => [ "Category", "eventlog_category" ]
    rename => [ "EventID", "eventlog_id" ]
    rename => [ "RecordNumber", "eventlog_record_number" ]
    rename => [ "ProcessID", "eventlog_pid" ]
}
mutate {  
    # Remove redundant fields
    remove => [ "SourceModuleType", "EventTimeWritten", "EventTime", "EventReceivedTime", "EventType" ]
}

Now for one of my favorite things to do with Logstash. I use the mutate filter to add some tags to specific Event Log messages because they're ones I care about. In this case, events related to Active Directory logins.

if [eventlog_id] == 4624 {  
    mutate {
        add_tag => [ "ad-logon-success" ]
    }
}
if [eventlog_id] == 4634 {  
    mutate {
        add_tag => [ "ad-logoff-success" ]
    }
}
if [eventlog_id] == 4771 or [eventlog_id] == 4625 or [eventlog_id] == 4769 {  
    mutate {
        add_tag => [ "ad-logon-failure" ]
    }
}
if [eventlog_id] == 4723 {  
    mutate {
        add_tag => [ "ad-password-change" ]
    }
}
if [eventlog_id] == 4724 {  
    mutate {
        add_tag => [ "ad-password-reset" ]
    }
}

Another cool feature, metrics:

if "ad-logon-success" in [tags] {  
    metrics {
        add_tag => [ "drop", "metric", "ad-logon-success" ]
        meter => "ad-logon-success-metric"
    }
}
if "ad-logon-failure" in [tags] {  
    metrics {
        add_tag => [ "drop", "metric", "ad-logon-failure" ]
        meter => "ad-logon-failure-metric"
    }
}

The metrics filter keeps a count of each time the filter is invoked, and periodically generates events containing one minute, five minute, and 15 minute averages of the rate of occurrence of the relevant event. I add the "drop" tag because I don't want the generated events to be stored in Elasticsearch (more on that later), the "metric" tag because, well, it's a metric, and the "ad-logon-*" tag to indicate which events are counted in the metric. The metrics filter is one of the buggier filters at the moment and should be placed as close to the bottom of the filter section as possible.
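
If you want to eyeball the generated metric events before wiring up a real output for them, a throwaway stdout output does the trick (a debugging sketch, not part of my running config):

output {
    # Print only the generated metric events, fully expanded
    if "metric" in [tags] {
        stdout { codec => rubydebug }
    }
}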

Now, here's our completed "eventlog" section:

if [type] == "eventlog" {  
    # Incoming Windows Event logs from nxlog
    mutate {
        # Lowercase some values that are always in uppercase
        lowercase => [ "EventType", "FileName", "Hostname", "Severity" ]
    }
    mutate {
        # Set source to what the message says
        rename => [ "Hostname", "@source_host" ]
    }
    date {
        # Convert timestamp from integer in UTC
        match => [ "EventReceivedTime", "UNIX" ]
    }
    mutate {
        # Rename some fields into something more useful
        rename => [ "Message", "@message" ]
        rename => [ "Severity", "eventlog_severity" ]
        rename => [ "SeverityValue", "eventlog_severity_code" ]
        rename => [ "Channel", "eventlog_channel" ]
        rename => [ "SourceName", "eventlog_program" ]
        rename => [ "SourceModuleName", "nxlog_input" ]
        rename => [ "Category", "eventlog_category" ]
        rename => [ "EventID", "eventlog_id" ]
        rename => [ "RecordNumber", "eventlog_record_number" ]
        rename => [ "ProcessID", "eventlog_pid" ]
    }
    mutate {
        # Remove redundant fields
        remove => [ "SourceModuleType", "EventTimeWritten", "EventTime", "EventReceivedTime", "EventType" ]
    }
    if [eventlog_id] == 4624 {
        mutate {
            add_tag => [ "ad-logon-success" ]
        }
    }
    if [eventlog_id] == 4634 {
        mutate {
            add_tag => [ "ad-logoff-success" ]
        }
    }
    if [eventlog_id] == 4771 or [eventlog_id] == 4625 or [eventlog_id] == 4769 {
        mutate {
            add_tag => [ "ad-logon-failure" ]
        }
    }
    if [eventlog_id] == 4723 {
        mutate {
            add_tag => [ "ad-password-change" ]
        }
    }
    if [eventlog_id] == 4724 {
        mutate {
            add_tag => [ "ad-password-reset" ]
        }
    }
    if "ad-logon-success" in [tags] {
        metrics {
            add_tag => [ "drop", "metric", "ad-logon-success" ]
            meter => "ad-logon-success-metric"
        }
    }
    if "ad-logon-failure" in [tags] {
        metrics {
            add_tag => [ "drop", "metric", "ad-logon-failure" ]
            meter => "ad-logon-failure-metric"
        }
    }
}

IIS (Access) Logs

Here's what one of my IIS access logs looks like; it's in W3C format with all of the fields enabled:

2014-05-31 00:05:01 W3SVC6 SERVER-NAME a.a.a.a GET / - 443 - b.b.b.b HTTP/1.1 Mozilla/5.0+(Windows+NT+5.1)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/34.0.1847.131+Safari/537.36 - - www.company.com 200 0 0 11525 333 140  

Luckily for me, the nxlog config that I use to ship the IIS logs already turns all of the parts of the log into fields. So I don't have to do that parsing in Logstash.

First things first, some more personal preferences:

mutate {  
    replace => [ "@message", "%{hostname} %{verb} %{fqdn}%{request}%{querystring} %{httpversion} %{status} %{useragent}" ]
    replace => [ "@source_host", "server-name.ad.company.com" ]
    add_field => { "requesturl" => "%{fqdn}%{request}%{querystring}" }
}

Here I replace @message, which starts out as the entire log (like the example above), with just the information that I want at a glance. The second replace sets @source_host to the server's FQDN, like the Event Logs from the same server. (Quick Note: there's probably a better way to set this, i.e. not hard-coded, but we only have the one web server so this works well enough.) Finally, I add the requesturl field, which is the fqdn, request, and querystring fields combined. (Another Note: if querystring was empty in the actual request, like in the example, the field here will be set to "-". It doesn't bother me that much, so I haven't figured out how to fix it.)
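
That said, if it ever does start to bother me, I suspect something like the following would do it; note it would have to sit before the mutate above so that requesturl gets built from the cleaned value. An untested sketch:

# Untested: turn the "-" placeholder into an empty string so that
# requesturl comes out as just fqdn + request.
if [querystring] == "-" {
    mutate {
        replace => [ "querystring", "" ]
    }
}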

Next up:

useragent {  
    source => "useragent"
}

As you might have guessed, the useragent filter decodes the User Agent string from the request (which is stored in the useragent field). It adds fields for the OS (os, os_name) and the browser's name and major/minor versions. (Quick Note: it doesn't currently seem to report OS X properly.)
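
One option worth knowing about, though I don't use it myself: the filter's target setting nests all of the decoded fields under a single key instead of adding them at the top level of the event. A sketch:

useragent {
    source => "useragent"
    # Nest the decoded fields under "ua" (e.g. ua.name, ua.os)
    # instead of at the top level of the event.
    target => "ua"
}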

And, finally:

geoip {  
    source => "clientip"
}

This magic geoip filter looks up GeoIP data for the IP in the (in my case) clientip field using the MaxMind GeoLite database which ships with Logstash. It adds a bunch of fields about the IP's location, so I'm going to refer you to the filter's docs for the full list.
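
If that's more data than you want, the filter's fields option lets you whitelist just the pieces you care about. A sketch using field names from the docs (I just keep everything):

geoip {
    source => "clientip"
    # Keep only a subset of the GeoIP data.
    fields => [ "country_name", "city_name", "location" ]
}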

Now the "iislog" section is complete! Here's what it looks like:

if [type] == "iislog"  
{
    mutate {
        replace => [ "@message", "%{hostname} %{verb} %{fqdn}%{request}%{querystring} %{httpversion} %{status} %{useragent}" ]
        replace => [ "@source_host", "server-name.ad.company.com" ]
        add_field => { "requesturl" => "%{fqdn}%{request}%{querystring}" }
    }
    useragent {
        source => "useragent"
    }
    geoip {
        source => "clientip"
    }
}

...There's one more thing

The last part of the filter section is another metrics filter, this time for keeping track of the total number of events processed by Logstash. (Note the lack of a conditional; this metrics filter is triggered by all events.)

metrics {  
    meter => "events"
    add_tag => [ "drop", "metric", "events-metric" ]
}

And that's it! Here's the whole filter section:

filter {  
    if [type] == "syslog" {
        grok {
            match => { "message" => "<%{POSINT:syslog_pri}>%{DATA:syslog_timestamp} %{DATA:syslog_program}\[%{NUMBER:syslog_pid}\]\: %{GREEDYDATA:syslog_message}" }
            add_field => [ "received_at", "%{@timestamp}" ]
        }
        syslog_pri { }
        date {
            match => [ "syslog_timestamp", "yyyy:MM:dd-HH:mm:ss" ]
        }
        mutate {
            replace => [ "@source_host", "firewall-name.ad.company.com" ]
            replace => [ "@message", "%{syslog_message}" ]
            remove  => [ "syslog_message", "syslog_timestamp" ]
        }
        kv {
            source => "@message"
        }
    }

    if [type] == "eventlog" {
        # Incoming Windows Event logs from nxlog
        mutate {
            # Lowercase some values that are always in uppercase
            lowercase => [ "EventType", "FileName", "Hostname", "Severity" ]
        }
        mutate {
            # Set source to what the message says
            rename => [ "Hostname", "@source_host" ]
        }
        date {
            # Convert timestamp from integer in UTC
            match => [ "EventReceivedTime", "UNIX" ]
        }
        mutate {
            # Rename some fields into something more useful
            rename => [ "Message", "@message" ]
            rename => [ "Severity", "eventlog_severity" ]
            rename => [ "SeverityValue", "eventlog_severity_code" ]
            rename => [ "Channel", "eventlog_channel" ]
            rename => [ "SourceName", "eventlog_program" ]
            rename => [ "SourceModuleName", "nxlog_input" ]
            rename => [ "Category", "eventlog_category" ]
            rename => [ "EventID", "eventlog_id" ]
            rename => [ "RecordNumber", "eventlog_record_number" ]
            rename => [ "ProcessID", "eventlog_pid" ]
        }
        mutate {
            # Remove redundant fields
            remove => [ "SourceModuleType", "EventTimeWritten", "EventTime", "EventReceivedTime", "EventType" ]
        }
        if [eventlog_id] == 4624 {
            mutate {
                add_tag => [ "ad-logon-success" ]
            }
        }
        if [eventlog_id] == 4634 {
            mutate {
                add_tag => [ "ad-logoff-success" ]
            }
        }
        if [eventlog_id] == 4771 or [eventlog_id] == 4625 or [eventlog_id] == 4769 {
            mutate {
                add_tag => [ "ad-logon-failure" ]
            }
        }
        if [eventlog_id] == 4723 {
            mutate {
                add_tag => [ "ad-password-change" ]
            }
        }
        if [eventlog_id] == 4724 {
            mutate {
                add_tag => [ "ad-password-reset" ]
            }
        }
        if "ad-logon-success" in [tags] {
            metrics {
                add_tag => [ "drop", "metric", "ad-logon-success" ]
                meter => "ad-logon-success-metric"
            }
        }
        if "ad-logon-failure" in [tags] {
            metrics {
                add_tag => [ "drop", "metric", "ad-logon-failure" ]
                meter => "ad-logon-failure-metric"
            }
        }
    }

    if [type] == "iislog"
    {
        mutate {
            replace => [ "@message", "%{hostname} %{verb} %{fqdn}%{request}%{querystring} %{httpversion} %{status} %{useragent}" ]
            replace => [ "@source_host", "server-name.ad.company.com" ]
            add_field => { "requesturl" => "%{fqdn}%{request}%{querystring}" }
        }
        useragent {
            source => "useragent"
        }
        geoip {
            source => "clientip"
        }
    }

    metrics {
        meter => "events"
        add_tag => [ "drop", "metric", "events-metric" ]
    }
}

Outputs

Now we take all of that lovely log data and send it off to be stored permanently. It's pretty easy, so here's the whole thing up front:

output {  
    if "drop" not in [tags] {
        elasticsearch { }
    }
}

And that's actually it! Ok, not really. But for my pretty basic setup, that really is it.

Now, keen readers might remember that all of the metrics have a "drop" tag, which means that they aren't being stored in any way, and that's correct. Typically, you'd send the metrics from Logstash to something like Graphite, either directly or through something like Statsd. In my testing VM, Graphite and Statsd were always a bit of a pain-point, either not working at all, working only intermittently, or causing Logstash to crash randomly. So I haven't set these up on my production box. Also, it's non-trivial to do alerting off of data sent to Graphite. So I'm currently experimenting with using Zabbix to keep track of metrics and do alerting (along with Zabbix's usual functions).

All of that aside, and because I'm a nice guy, here's an example of sending the "events" metric to Graphite using Statsd and the statsd output:

statsd {  
    sender => "events"
    count => [ "rate.1m", "%{events.rate_1m}" ]
    count => [ "rate.5m", "%{events.rate_5m}" ]
    count => [ "rate.15m", "%{events.rate_15m}" ]
}

This will create three series in Graphite, events.rate.1m, events.rate.5m, and events.rate.15m.
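
One caveat worth flagging: as written, that statsd output would run for every event reaching the output section, and the %{events.rate_1m}-style references only resolve on the generated metric events. I'd wrap it in a conditional keyed on one of the tags we added:

output {
    if "events-metric" in [tags] {
        statsd {
            sender => "events"
            count => [ "rate.1m", "%{events.rate_1m}" ]
            count => [ "rate.5m", "%{events.rate_5m}" ]
            count => [ "rate.15m", "%{events.rate_15m}" ]
        }
    }
}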

And that really is it for this post. I'll try to keep this updated as my configuration evolves, and I'll probably eventually write a post about my Zabbix experiments.
