In my ELK setup there are currently two ways of receiving and centralizing logs from clients:
- A Logstash listener on udp/10514 receiving syslog logs from Linux machines (in json format)
  
- A Logstash listener for Filebeat on tcp/5044 receiving logs already parsed and json-formatted by the Filebeat daemon running on Linux machines
In a recent post I wrote about adding HAProxy logs into the ELK stack (see Handling different timezones of HAProxy logs in ELK stack). I found out that since I created the grok filter for HAProxy in /etc/logstash/conf.d/13-haproxy-filter.conf, I got _grokparsefailure tags on all syslog entries. But why? Let's take a look at the file again:
root@logstash:~# cat /etc/logstash/conf.d/13-haproxy-filter.conf 
  
filter {
  
  
    grok {
  
    match => [
  
    "message", "%{IP:client_ip}:%{NUMBER:client_port:int} \[%{NOTSPACE:haproxy_timestamp}] %{NOTSPACE:frontend_name} %{NOTSPACE:backend_name}/%{NOTSPACE:server_name} %{NUMBER:time_queue:int}/%{NUMBER:time_backend_connect:int}/%{NUMBER:time_duration:int} %{NUMBER:bytes_read:int} %{NOTSPACE:termination_state} %{NUMBER:actconn:int}/%{NUMBER:feconn:int}/%{NUMBER:beconn:int}/%{NUMBER:srvconn:int}/%{NUMBER:retries:int} %{NUMBER:srv_queue:int}/%{NUMBER:backend_queue:int}" ,
  
    "message" , "%{IP:client_ip}:%{NUMBER:client_port:int} \[%{NOTSPACE:haproxy_timestamp}\] %{NOTSPACE:frontend_name} %{NOTSPACE:backend_name}/%{NOTSPACE:server_name} %{NUMBER:time_client_req:int}/%{NUMBER:time_queue:int}/%{NUMBER:time_backend_connect:int}/%{NUMBER:time_server_response:int}/%{NUMBER:time_duration:int} %{NUMBER:status_code:int} %{NUMBER:bytes_read:int} %{NOTSPACE:captured_request_cookie} %{NOTSPACE:captured_response_cookie} %{NOTSPACE:termination_state_with_cookie_status} %{NUMBER:actconn:int}/%{NUMBER:feconn:int}/%{NUMBER:beconn:int}/%{NUMBER:srvconn:int}/%{NUMBER:retries:int} %{NUMBER:srv_queue:int}/%{NUMBER:backend_queue:int}?( \"%{GREEDYDATA:full_http_request}\")?( %{NOTSPACE:captured_response_headers})?"
  
    ]
  
    }
  
  
  if [type] == "rsyslog" {
  
    date {
  
      match => [ "haproxy_timestamp", "dd/MMM/yyyy:HH:mm:ss.SSS" ]
  
      timezone => "UTC"
  
    }
  
  }
  
  
  if [type] == "haproxy" {
  
    date {
  
      match => [ "haproxy_timestamp", "dd/MMM/yyyy:HH:mm:ss.SSS" ]
  
    }
  
  }
  
  
    grok {
  
    match => [
  
    "full_http_request", "%{WORD:http_verb} %{URIPATHPARAM:http_request}?( HTTP/%{NUMBER:http_version})" ,
  
    "full_http_request", "<%{WORD:http_request}>"
  
    ]
  
    remove_field => [ "full_http_request" ]
  
    }
  
  
}
After fiddling around with that filter I finally figured out what happens. This filter is applied on every entry arriving in Logstash, doesn't matter if it came through the syslog or the Filebeat listener. So every time a syslog message arrived, it was tagged with "_grokparsefailure" because it ran through this filter even though it does not match the line format.
The solution to this is to create a condition before applying the grok filter: It should only apply to logs clearly coming from HAProxy. That's easy for the ones coming from Filebeat, because they're already tagged on the Filebeat daemon (the sender) as "haproxy". But how do I tell Logstash to apply the filter on Syslog entries? This is what I came up with:
root@elk01:~$ cat /etc/logstash/conf.d/13-haproxy.conf 
  
filter {
  
  
  if [type] == "syslog" and [programname] == "haproxy" {
  
  
    grok {
  
      match => [
  
    "message", "%{IP:client_ip}:%{NUMBER:client_port:int} \[%{NOTSPACE:haproxy_timestamp}] %{NOTSPACE:frontend_name} %{NOTSPACE:backend_name}/%{NOTSPACE:server_name} %{NUMBER:time_queue:int}/%{NUMBER:time_backend_connect:int}/%{NUMBER:time_duration:int} %{NUMBER:bytes_read:int} %{NOTSPACE:termination_state} %{NUMBER:actconn:int}/%{NUMBER:feconn:int}/%{NUMBER:beconn:int}/%{NUMBER:srvconn:int}/%{NUMBER:retries:int} %{NUMBER:srv_queue:int}/%{NUMBER:backend_queue:int}" ,
  
    "message" , "%{IP:client_ip}:%{NUMBER:client_port:int} \[%{NOTSPACE:haproxy_timestamp}\] %{NOTSPACE:frontend_name} %{NOTSPACE:backend_name}/%{NOTSPACE:server_name} %{NUMBER:time_client_req:int}/%{NUMBER:time_queue:int}/%{NUMBER:time_backend_connect:int}/%{NUMBER:time_server_response:int}/%{NUMBER:time_duration:int} %{NUMBER:status_code:int} %{NUMBER:bytes_read:int} %{NOTSPACE:captured_request_cookie} %{NOTSPACE:captured_response_cookie} %{NOTSPACE:termination_state_with_cookie_status} %{NUMBER:actconn:int}/%{NUMBER:feconn:int}/%{NUMBER:beconn:int}/%{NUMBER:srvconn:int}/%{NUMBER:retries:int} %{NUMBER:srv_queue:int}/%{NUMBER:backend_queue:int}?( \"%{GREEDYDATA:full_http_request}\")?( %{NOTSPACE:captured_response_headers})?"
  
      ]
  
    }
  
  
    grok {
  
      match => [
  
      "full_http_request", "%{WORD:http_verb} %{URIPATHPARAM:http_request}?( HTTP/%{NUMBER:http_version})" ,
  
      "full_http_request", "<%{WORD:http_request}>"
  
      ]
  
      remove_field => [ "full_http_request" ]
  
    }
  
    date {
  
      match => [ "haproxy_timestamp", "dd/MMM/yyyy:HH:mm:ss.SSS" ]
  
      timezone => "UTC"
  
    }
  
  
  }
  
  
  
  if [type] == "haproxy" {
  
  
    grok {
  
      match => [
  
    "message", "%{IP:client_ip}:%{NUMBER:client_port:int} \[%{NOTSPACE:haproxy_timestamp}] %{NOTSPACE:frontend_name} %{NOTSPACE:backend_name}/%{NOTSPACE:server_name} %{NUMBER:time_queue:int}/%{NUMBER:time_backend_connect:int}/%{NUMBER:time_duration:int} %{NUMBER:bytes_read:int} %{NOTSPACE:termination_state} %{NUMBER:actconn:int}/%{NUMBER:feconn:int}/%{NUMBER:beconn:int}/%{NUMBER:srvconn:int}/%{NUMBER:retries:int} %{NUMBER:srv_queue:int}/%{NUMBER:backend_queue:int}" ,
  
    "message" , "%{IP:client_ip}:%{NUMBER:client_port:int} \[%{NOTSPACE:haproxy_timestamp}\] %{NOTSPACE:frontend_name} %{NOTSPACE:backend_name}/%{NOTSPACE:server_name} %{NUMBER:time_client_req:int}/%{NUMBER:time_queue:int}/%{NUMBER:time_backend_connect:int}/%{NUMBER:time_server_response:int}/%{NUMBER:time_duration:int} %{NUMBER:status_code:int} %{NUMBER:bytes_read:int} %{NOTSPACE:captured_request_cookie} %{NOTSPACE:captured_response_cookie} %{NOTSPACE:termination_state_with_cookie_status} %{NUMBER:actconn:int}/%{NUMBER:feconn:int}/%{NUMBER:beconn:int}/%{NUMBER:srvconn:int}/%{NUMBER:retries:int} %{NUMBER:srv_queue:int}/%{NUMBER:backend_queue:int}?( \"%{GREEDYDATA:full_http_request}\")?( %{NOTSPACE:captured_response_headers})?"
  
      ]
  
    }
  
  
    grok {
  
      match => [
  
      "full_http_request", "%{WORD:http_verb} %{URIPATHPARAM:http_request}?( HTTP/%{NUMBER:http_version})" ,
  
      "full_http_request", "<%{WORD:http_request}>"
  
      ]
  
      remove_field => [ "full_http_request" ]
  
    }
  
  }
  
  
}
  
The first if condition handles all entries coming in through the Syslog listener and therefore being tagged as "syslog" type. Because these entries have fields (like procid, programname, facility, etc) these fields can be used to specify a certain type of (application) log. Let's analyze a very simple Syslog entry arriving at Logstash:
{
  
  "_index": "logstash-2017.09.12",
  
  "_type": "syslog",
  
  "_id": "AV50py7qDSxp4hj7KGHi",
  
  "_version": 1,
  
  "_score": null,
  
  "_source": {
  
    "severity": "info",
  
    "@timestamp": "2017-09-12T05:54:27.151Z",
  
    "@version": "1",
  
    "programname": "sudo",
  
    "procid": "-",
  
    "host": "10.10.10.60",
  
    "sysloghost": "yetanotherlinux",
  
    "message": " pam_unix(sudo:session): session closed for user root",
  
    "type": "syslog",
  
    "facility": "authpriv"
  
  },
  
  "fields": {
  
    "@timestamp": [
  
      1505195667151
  
    ]
  
  },
  
  "sort": [
  
    1505195667151
  
  ]
  
}
That's a classical Syslog message, split up into the different (known) fields. As you can see, programname is one of them. Therefore this (and the others, too) field can be used to create a very detailed if condition.
The second if condition is explaining itself. These entries are being received by the listener for Filebeat logs. As I mentioned just before, the HAProxy log files are already being tagged as "type => haproxy" on the client side.
Note to self: If you fiddle around with Logstash grok filters, don't forget they're applied on every single entry arriving in Logstash (ergo most likely resulting in a grokparsefailure).
  
No comments yet.
 
AWS Android Ansible Apache Apple Atlassian BSD Backup Bash Bluecoat CMS Chef Cloud Coding Consul Containers CouchDB DB DNS Databases Docker ELK Elasticsearch Filebeat FreeBSD Galera Git GlusterFS Grafana Graphics HAProxy HTML Hacks Hardware Icinga Influx Internet Java KVM Kibana Kodi Kubernetes LVM LXC Linux Logstash Mac Macintosh Mail MariaDB Minio MongoDB Monitoring Multimedia MySQL NFS Nagios Network Nginx OSSEC OTRS Observability Office OpenSearch PHP Perl Personal PostgreSQL PowerDNS Proxmox Proxy Python Rancher Rant Redis Roundcube SSL Samba Seafile Security Shell SmartOS Solaris Surveillance Systemd TLS Tomcat Ubuntu Unix VMware Varnish Virtualization Windows Wireless Wordpress Wyse ZFS Zoneminder Linux