What shippers should I use to collect the logs so that SIGMA rules will start matching?

Hi Team,

In the ELK stack, Beats shippers such as Filebeat are available to ship logs that are already mapped to the Elastic Common Schema (ECS), and the Sigma rules expect fields stored in ECS form. How do I ship logs to OpenSearch so that the rules I have in place will start matching the traffic?

TIA
Blason R

Hey @blason

I don't know about mapping to match OpenSearch, but what I did was use Logstash: all my Beats (Winlogbeat, Filebeat, Packetbeat, Auditbeat, etc…) go through it, and Logstash sends them on to OpenSearch.
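
On the Beats side, each shipper just points its Logstash output at that pipeline. A minimal sketch (the hostname here is a placeholder; 5044 matches the beats input in the config below):

# winlogbeat.yml -- same idea for filebeat.yml, packetbeat.yml, etc.
output.logstash:
  hosts: ["logstash.example.com:5044"]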

I see - so you are collecting logs using the Filebeat modules, sending them to Logstash, and then on to OpenSearch?

Would you mind sharing your Logstash pipelines, please?

Sure

root@ansible:/opt/logstash-8.6.1/config# cat logstash.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
  beats {
    port => 5044
    tags => [ 'beat' ]
  }
}
input {
  udp {
    port => 5144
    tags => ['syslog']
  }
}
input {
  http {
    port      => 12345
    tags => ['fluent']
    add_field => { "[@metadata][input-http]" => "" }
  }
}

filter {
  # Events from the http input carry a UNIX timestamp in "date";
  # promote it to @timestamp, then drop the noisy request metadata
  if [@metadata][input-http] {
    date {
      match => [ "date", "UNIX" ]
      remove_field => [ "date" ]
    }
    mutate {
      remove_field => ["headers","host"]
    }
  }
}


filter {
  if "syslog" in [tags] {
    # Strip the syslog priority header, keeping the rest as the message
    grok {
      match => ["message", "%{SYSLOG5424PRI}%{GREEDYDATA:message}"]
      overwrite => [ "message" ]
    }
    # Split the key=value pairs in the message into individual fields
    kv {
      source => "message"
      value_split => "="
    }
  }
}

filter {
  if "syslog" in [tags] {
    # Drop fields we don't need from the kv output
    mutate {
      remove_field => [ "addr","appcat","craction","crlevel","crscore","devtype","dstdevtype","dstosname","dstserver","fazlograte","freediskstorage","interface","log.syslog.priority","masterdstmac","mastersrcmac","osname","policytype","poluuid","setuprate","srchwvendor","srcserver","total","totalsession","used","user","vd" ]
    }
  }
}

output {
  if "beat" in [tags] {
    opensearch {
      hosts => ["https://opensearch.com:9200"]
      auth_type => {
        type => 'basic'
        user => 'admin'
        password => 'changeit'
      }
      ecs_compatibility => disabled
      ssl => true
      ssl_certificate_verification => false
      cacert => "/opt/logstash-8.6.1/root-ca.pem"
      #index => "winlogbeat-%{+YYYY.MM.dd}"
      index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    }
  }
  if "syslog" in [tags] {
    opensearch {
      hosts => ["https://opensearch.com:9200"]
      auth_type => {
        type => 'basic'
        user => 'admin'
        password => 'changeit'
      }
      ecs_compatibility => disabled
      ssl => true
      ssl_certificate_verification => false
      cacert => "/opt/logstash-8.6.1/root-ca.pem"
      index => "firewall-%{+YYYY.MM.dd}"
    }
  }
  if "fluent" in [tags] {
    opensearch {
      hosts => ["https://opensearch.com:9200"]
      auth_type => {
        type => 'basic'
        user => 'admin'
        password => 'changeit'
      }
      ecs_compatibility => disabled
      ssl => true
      ssl_certificate_verification => false
      cacert => "/opt/logstash-8.6.1/root-ca.pem"
      index => "fluent-bit-%{+YYYY.MM.dd}"
    }
  }
}
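
One thing worth noting: the opensearch output above is not bundled with the stock Logstash distribution; it comes from the logstash-output-opensearch plugin, so install it first if you haven't:

root@ansible:/opt/logstash-8.6.1# bin/logstash-plugin install logstash-output-opensearch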

I also made a systemd service for Logstash:

root@ansible:/opt/logstash-8.6.1/config# cat /etc/systemd/system/logstash.service
[Unit]
Description=logstash

[Service]
Type=simple
User=root
Group=root
# Load env vars from /etc/default/ and /etc/sysconfig/ if they exist.
# Prefixing the path with '-' makes it try to load, but if the file doesn't
# exist, it continues onward.
EnvironmentFile=-/opt/logstash-8.6.1/
ExecStart=/opt/logstash-8.6.1/bin/logstash  -f  /opt/logstash-8.6.1/config/logstash.conf --config.reload.automatic

WorkingDirectory=/opt/logstash-8.6.1/
Restart=always
Nice=19
LimitNOFILE=16384

# When stopping, how long to wait before giving up and sending SIGKILL?
# Keep in mind that SIGKILL on a process can cause data loss.
TimeoutStopSec=infinity

[Install]
WantedBy=multi-user.target
root@ansible:/opt/logstash-8.6.1/config#
root@ansible:/opt/logstash-8.6.1/config# systemctl status logstash
● logstash.service - logstash
     Loaded: loaded (/etc/systemd/system/logstash.service; enabled; vendor preset: enabled)
     Active: active (running) since Wed 2023-08-02 18:47:58 CDT; 4h 34min ago
   Main PID: 748 (java)
      Tasks: 82 (limit: 4594)
     Memory: 899.9M
     CGroup: /system.slice/logstash.service
             └─748 /opt/logstash-8.6.1/jdk/bin/java -Xms512m -Xmx1g -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djruby.compile.invokedynamic=true -XX:>
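
If you want it to start on boot, the usual systemd steps apply after dropping in the unit file:

root@ansible:/opt/logstash-8.6.1/config# systemctl daemon-reload
root@ansible:/opt/logstash-8.6.1/config# systemctl enable --now logstash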


I am actually using FluentD.
Input source: @type beats
Output: @type opensearch_data_stream
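
A rough sketch of that pipeline (the port, host, data stream name, and credentials here are placeholders, not my real values):

<source>
  @type beats                   # fluent-plugin-beats: accepts events from the Beats shippers
  port 5044
</source>

<match **>
  @type opensearch_data_stream  # fluent-plugin-opensearch: writes into an OpenSearch data stream
  data_stream_name logs-winlogbeat
  host opensearch.example.com
  port 9200
  scheme https
  user admin
  password changeit
</match>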

I exported the index template, ingest pipelines, and index pattern from Winlogbeat and imported them, with slight adjustments, into OpenSearch 2.8. The sed substitutions swap out field types OpenSearch 2.8 does not accept (flattened, constant_keyword, match_only_text, wildcard) for keyword/text, and jq strips the ILM settings and points the default pipeline at winlogbeat-routing:

./winlogbeat.exe export template | sed -E 's/winlogbeat-\*/winlogbeat*/g;s/flattened/keyword/g;s/constant_keyword/keyword/g;s/match_only_text/text/g;s/wildcard/keyword/g' | jq 'del( .template.settings.index.lifecycle ) | .template.settings.index.default_pipeline |= "winlogbeat-routing"' > index-template.json

./winlogbeat.exe export index-pattern | jq -c '.' | sed -E 's/winlogbeat-\*/winlogbeat*/g' > index-pattern.ndjson

./winlogbeat.exe export pipelines --es.version=7.10.2
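
To load the adjusted template into OpenSearch by hand, a PUT to the index template API should do it; something along these lines (host and credentials are placeholders):

curl -k -u admin:changeit -H 'Content-Type: application/json' \
  -X PUT 'https://opensearch.example.com:9200/_index_template/winlogbeat' \
  --data-binary @index-template.json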

Alex
