Best practice: extracting fields from syslog messages

Versions:
opensearch 2.9.0
opensearch-dashboards 2.9.0
logstash (oss) 8.9.0

Describe the issue:
We are evaluating OpenSearch as a new platform to collect and analyze log data from devices. We deployed OpenSearch with OpenSearch Dashboards along with Logstash. Logstash receives the data via syslog and pushes it into OpenSearch. We configured a new index and an index pattern.

What is the best approach to extract fields from the syslog messages? For example: if a syslog message contains “filter” and “udp”, extract one set of fields; if it contains “filter” and “tcp”, extract a different set, and so on (log example below).

Do we have to do this in Logstash or within OpenSearch?

Configuration:
On Premise
OPNsense (Test Device) → (udp:514) logstash (opensearch out) → opensearch ← opensearch dashboards

Relevant Logs or Screenshots:
<134>1 2023-09-08T10:59:18+02:00 test.test.local filterlog 90052 - [meta sequenceId="2727721"] 104,fae559338f65e11c53669fc3642c93c2,lo0,match,pass,out,4,0x0,64,44042,0,none,17,udp,143,127.0.0.1,127.0.0.1,53,40013,123

You can extract the fields with either the grok filter in Logstash or the grok ingest processor in OpenSearch, but I think it’s better to do the extraction in Logstash. Extraction consumes CPU, so doing it in OpenSearch will impact indexing and search performance, whereas Logstash is stateless and can easily be scaled out.
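
To make the conditional part of the question concrete, here is a minimal test pipeline, offered as a sketch rather than a definitive setup. It feeds the sample line from the question through the generator input, splits off the syslog header with grok, and then branches on the protocol. The tag names filterlog_udp and filterlog_tcp are made up for illustration, the UDP pattern is derived only from that one sample line, and the TCP branch only tags the event because no TCP sample is shown here.

```
input {
  # One hard-coded event, so the filters can be tried without a firewall.
  generator {
    lines => ['<134>1 2023-09-08T10:59:18+02:00 test.test.local filterlog 90052 - [meta sequenceId="2727721"] 104,fae559338f65e11c53669fc3642c93c2,lo0,match,pass,out,4,0x0,64,44042,0,none,17,udp,143,127.0.0.1,127.0.0.1,53,40013,123']
    count => 1
  }
}
filter {
  # Stage 1: split the syslog header from the payload.
  grok {
    match => { "message" => "<%{POSINT}>%{POSINT} %{TIMESTAMP_ISO8601:syslog_timestamp} %{HOSTNAME:fw_name} %{WORD:syslog_program} %{POSINT:syslog_pid} - \[meta sequenceId=\"%{POSINT}\"\] %{GREEDYDATA:syslog_message}" }
  }
  # Stage 2: choose an extraction depending on what the payload contains.
  if [syslog_program] == "filterlog" {
    if [syslog_message] =~ /,udp,/ {
      grok {
        # Matches only the tail of the payload; assumption based on the single sample line.
        match => { "syslog_message" => ",%{WORD:protocol},%{NUMBER:length},%{IP:src_ip},%{IP:dst_ip},%{NUMBER:src_port},%{NUMBER:dst_port},%{NUMBER:datalen}$" }
        add_tag => [ "filterlog_udp" ]
      }
    } else if [syslog_message] =~ /,tcp,/ {
      # Placeholder: add a TCP-specific grok here once a real TCP line is available.
      mutate { add_tag => [ "filterlog_tcp" ] }
    }
  }
}
output {
  # Print the resulting event so the extracted fields can be inspected.
  stdout { codec => rubydebug }
}
```

Saving this to a file and starting Logstash with -f pointing at it prints the parsed event to the console, which makes it easy to adjust the patterns before touching the real pipeline.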

If your syslog daemon is rsyslog or syslog-ng (or another syslog daemon), you can do this in the syslog daemon itself VERY cheaply. You can also send data directly to OpenSearch and configure the daemon to buffer, retry, etc. Here’s a tutorial showing how to parse Apache logs read from files; it will be a bit simpler if the data is already syslog: Recipe: Apache Logs + rsyslog (parsing) + Elasticsearch - Sematext

Feel free to ping me if you decide to go that route and need any help.

Thanks @gaobinlong. I researched a bit and found a good Logstash approach for our proof of concept:

egrep -v "^#|^$|^-" /opt/logstash-8.9.0/config/logstash.conf
input {
    file {
        path => "/var/log/*log"
        exclude => "*.gz"
    }
    syslog {
        port => 514
        type => opnsense
        ecs_compatibility => "v1"
    }
}
filter {
  if [type] == "opnsense" {
    grok {
      #match => { "message" => "%{WORD:} %{SYSLOGTIMESTAMP:syslog_timestamp} %{HOSTNAME:fw_name} %{WORD:syslog_program} %{POSINT:syslog_pid} - \[%{WORD:}\] %{GREEDYDATA:syslog_message}" }
      match => { "message" => "<%{POSINT}>%{POSINT} %{TIMESTAMP_ISO8601:syslog_timestamp} %{HOSTNAME:fw_name} %{WORD:syslog_program} %{POSINT:syslog_pid} - \[meta sequenceId=\"%{POSINT}\"\] %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    if [syslog_program] == "filterlog" {
        grok {
          match => { "syslog_message" => "(%{WORD:rulenr}),,,(%{WORD:rid}),(%{WORD:interface}),(%{WORD:reason}),(%{WORD:action}),(%{WORD:dir}),(%{WORD:version}),(%{WORD:tos}),,(%{NUMBER:ttl}),(%{NUMBER:id}),(%{NUMBER:offset}),(%{WORD:ipflags}),(%{NUMBER:protonumber}),(%{WORD:protocol}),(%{NUMBER:length}),(%{IP:src_ip}),(%{IP:dst_ip}),(%{NUMBER:src_port}),(%{NUMBER:dst_port}),(%{NUMBER:datalen})" }
          add_field => [ "parsed", "filterlog" ]
        }
    }
    if [syslog_program] == "unbound" {
        grok {
          match => { "syslog_message" => "%{GREEDYDATA:syslog_message2}" }
          add_field => [ "parsed", "unbound" ]
        }
    }
    if [syslog_program] == "devd" {
        grok {
          match => { "syslog_message" => "%{GREEDYDATA:syslog_message2}" }
          add_field => [ "parsed", "devd" ]
        }
    }
    if [syslog_program] == "openvpn" {
        grok {
          match => { "syslog_message" => "%{GREEDYDATA:syslog_message2}" }
          add_field => [ "parsed", "openvpn" ]
        }
    }
    #match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
  }
}
output {
    # Keep a local copy of events that grok could not parse
    if "_grokparsefailure" in [tags] {
        file {
            path => "/tmp/var/log/parse_failures.log"
        }
    }
    # Connection details for the OpenSearch output
    opensearch {
        hosts => ["https://localhost:9200"]
        # SSL enabled
        ssl => true
        ssl_certificate_verification => false
        # The Logstash Username and Password created earlier
        user => ""
        password => ""
        # The name of the index
        index => "firewall"
    }
}
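
One possible follow-up, sketched here under the assumption that the field names from the configuration above are kept: grok stores every capture as a string unless told otherwise, and the event time stays at the time of receipt. The extra filter block below converts a few numeric fields and uses syslog_timestamp as the event timestamp. Which fields to convert is only a suggestion.

```
filter {
  # Only touch events that the filterlog grok above parsed successfully.
  if [parsed] == "filterlog" {
    mutate {
      convert => {
        "src_port" => "integer"
        "dst_port" => "integer"
        "length"   => "integer"
        "ttl"      => "integer"
      }
    }
  }
  # Use the time reported by the device as @timestamp when it was extracted.
  if [syslog_timestamp] {
    date {
      match => [ "syslog_timestamp", "ISO8601" ]
    }
  }
}
```

Fields that are already mapped in the firewall index keep their existing type, so the conversion mainly pays off for a fresh index. The same effect is available inside grok itself by writing a capture as %{NUMBER:src_port:int}.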

@radu.gheorghe
Most of the devices are network devices, which just send syslog. I’ll have a look at your suggestion for the couple of servers. Thank you as well.
