Since in ELK stack there are filebeats are available to ship the logs which are already mapped to Elastic Common schema and since the logs are mapped and stored using ecs how do I ship the logs to opensearch? so that the rules placed will start matching the traffic?
I dont know about mapping to match Opensearch but what I did was use Logstash and all my Beats ( winlogbeat,filebeat.Packetbeat,Auditbeat, etc…) go through there and Logstash sends them to Opensearch.
root@ansible:/opt/logstash-8.6.1/config# cat logstash.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
beats {
port => 5044
tags => [ 'beat' ]
}
}
input {
udp {
port => 5144
tags => ['syslog']
}
}
input {
http {
port => 12345
tags => ['fluent']
add_field => { "[@metadata][input-http]" => "" }
}
}
filter {
if [@metadata][input-http] {
date {
match => [ "date", "UNIX" ]
remove_field => [ "date" ]
}
mutate {
remove_field => ["headers","host"]
}
}
}
filter {
if "syslog" in [tags] {
grok {
match => ["message", "%{SYSLOG5424PRI}%{GREEDYDATA:message}"]
overwrite => [ "message" ]
}
kv {
source => "message"
value_split => "="
}
}
}
filter {
if "syslog" in [tags] {
mutate {
remove_field => [ "addr","appcat","craction","crlevel","crscore","devtype","dstdevtype","dstosname","dstserver","dstserver","fazlograte","freediskstorage","interface","log.syslog.priority","masterdstmac","mastersrcmac","osname","policytype","poluuid","setuprate","srchwvendor","srcserver","total","totalsession","used","user","vd"]
}
}
}
output {
if "beat" in [tags] {
opensearch {
hosts => ["https://opensearch.com:9200"]
auth_type => {
type => 'basic'
user => 'admin'
password => 'changeit'
}
ecs_compatibility => disabled
ssl => true
ssl_certificate_verification => false
cacert => "/opt/logstash-8.6.1/root-ca.pem"
#index => "winlogbeat-%{+YYYY.MM.dd}"
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
}
}
if "syslog" in [tags] {
opensearch {
hosts => ["https://opensearch.com:9200"]
auth_type => {
type => 'basic'
user => 'admin'
password => 'changeit'
}
ecs_compatibility => disabled
ssl => true
ssl_certificate_verification => false
cacert => "/opt/logstash-8.6.1/root-ca.pem"
index => "firewall-%{+YYYY.MM.dd}"
}
}
if "fluent" in [tags] {
opensearch {
hosts => ["https://opensearch.com:9200"]
auth_type => {
type => 'basic'
user => 'admin'
password => 'changeit'
}
ecs_compatibility => disabled
ssl => true
ssl_certificate_verification => false
cacert => "/opt/logstash-8.6.1/root-ca.pem"
index => "fluent-bit-%{+YYYY.MM.dd}"
}
}
}
I also made a service for logstash
root@ansible:/opt/logstash-8.6.1/config# cat /etc/systemd/system/logstash.service
[Unit]
Description=logstash
[Service]
Type=simple
User=root
Group=root
# Load env vars from /etc/default/ and /etc/sysconfig/ if they exist.
# Prefixing the path with '-' makes it try to load, but if the file doesn't
# exist, it continues onward.
EnvironmentFile=-/opt/logstash-8.6.1/
ExecStart=/opt/logstash-8.6.1/bin/logstash -f /opt/logstash-8.6.1/config/logstash.conf --config.reload.automatic
WorkingDirectory=/opt/logstash-8.6.1/
Restart=always
Nice=19
LimitNOFILE=16384
# When stopping, how long to wait before giving up and sending SIGKILL?
# Keep in mind that SIGKILL on a process can cause data loss.
TimeoutStopSec=infinity
[Install]
WantedBy=multi-user.target
root@ansible:/opt/logstash-8.6.1/config#
root@ansible:/opt/logstash-8.6.1/config# systemctl status logstash
● logstash.service - logstash
Loaded: loaded (/etc/systemd/system/logstash.service; enabled; vendor preset: enabled)
Active: active (running) since Wed 2023-08-02 18:47:58 CDT; 4h 34min ago
Main PID: 748 (java)
Tasks: 82 (limit: 4594)
Memory: 899.9M
CGroup: /system.slice/logstash.service
└─748 /opt/logstash-8.6.1/jdk/bin/java -Xms512m -Xmx1g -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djruby.compile.invokedynamic=true -XX:>