Hi Moma,
This might be a steep learning curve…
I use Fluentd as a universal log aggregator and pipe the data into OpenSearch for storage and analysis.
In the classic ELK stack that would have been the role of Logstash. (I prefer Fluentd because it seems much more lightweight than Logstash to me.)
Fluentd can also ingest Elastic Beats, so you don’t have to worry about version checks on the Beats side of things.
I run Fluentd on the same machine as the OpenSearch ingest node; in fact they’re in the same Docker network. When using the Docker image of Fluentd, you have to add some input/output plugins:
# Fluentd Dockerfile
FROM fluent/fluentd:edge
USER root
RUN gem install fluent-plugin-opensearch --no-document \
 && gem install fluent-plugin-beats --no-document \
 && gem install fluent-plugin-rename-key --no-document \
 && gem install fluent-plugin-dict-map --no-document \
 && gem sources --clear-all \
 && rm -rf /tmp/* /var/tmp/* /usr/lib/ruby/gems/*/cache/*.gem
USER fluent
This will install the OpenSearch output, the Beats input and some field modification plugins you will learn to appreciate.
The default syslog port is 514, but most applications let you specify an alternate port.
If you didn’t already learn it the hard way: syslog implementations seem to differ in more than just the RFC they follow. That’s why I create multiple UDP listeners, doing my own syslog parsing:
# fluentd.conf
<system>
  log_level warn
</system>
<source>
  @type beats
  port 5044
  bind 0.0.0.0
  tag winlogbeat
</source>
<source>
  @type syslog
  port 5044
  bind 0.0.0.0
  source_address_key host.ip
  severity_key log.syslog.severity.name
  facility_key log.syslog.facility.name
  <parse>
    message_format auto
  </parse>
  tag system-log
</source>
<source>
  @type udp
  port 5140
  bind 0.0.0.0
  source_address_key host.ip
  <parse>
    @type regexp
    expression ^<(?<log.syslog.priority>[0-9]+)>([0-9TZ:.-]* )?(?<host.name>[a-zA-Z0-9_.-]+) +(?<log.logger>[^:\[]*)?(\[(?<log.pid>[0-9]+)\])?\]?:? *(?<message>.*)
  </parse>
  tag linux-syslog
</source>
<filter *-syslog.**>
  @type record_transformer
  enable_ruby true
  <record>
    log.syslog.facility.code ${record["log.syslog.priority"].to_i / 8}
    log.syslog.severity.code ${record["log.syslog.priority"].to_i % 8}
  </record>
</filter>
<filter *-syslog.**>
  @type dict_map
  key_name log.syslog.severity.code
  destination_key_name log.syslog.severity.name
  dictionary { "0":"Emergency", "1":"Alert", "2":"Critical", "3":"Error", "4":"Warning", "5":"Notice", "6":"Informational", "7":"Debug" }
</filter>
<filter *-syslog.**>
  @type dict_map
  key_name log.syslog.facility.code
  destination_key_name log.syslog.facility.name
  dictionary {"0":"kernel","1":"user","2":"mail","3":"system","4":"security/authorization","5":"syslogd","6":"lpr","7":"news","8":"UUCP","9":"clock","10":"security/authorization","11":"FTP","12":"NTP","13":"audit","14":"alert","15":"clock","16":"local0","17":"local1","18":"local2","19":"local3","20":"local4","21":"local5","22":"local6","23":"local7"}
</filter>
<match {winlogbeat.**,*-syslog.**}>
  @type opensearch_data_stream
  data_stream_name ${tag[0]}
  data_stream_template_name ${tag[0]}
  hosts opensearch:9200
  scheme https
  ssl_verify false
  user fluentd
  password very_secret
  request_timeout 30s
  remove_keys _hash, _id
  validate_client_version true
  log_os_400_reason true
  <buffer tag>
    @type memory # or file
    total_limit_size 1024MB
    chunk_limit_size 16MB
    flush_mode interval
    flush_interval 30s
    flush_thread_count 4
  </buffer>
</match>
<match *.**>
  @type stdout
</match>
You will have to create an empty index template in OpenSearch for each tag that you define in Fluentd. The storage in OpenSearch will be a data stream using that template.
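As a rough sketch of what such a template could look like, the Python snippet below builds (but does not send) one PUT request per tag against the index template API. The base URL is taken from the hosts/scheme settings in the config above; the function name build_template_request and the priority value 100 are my own assumptions for illustration, so adjust them to your setup and add authentication before actually sending anything.

```python
import json
import urllib.request


def build_template_request(tag, base_url="https://opensearch:9200"):
    """Build a PUT request for a minimal data stream index template.

    The template name and index pattern both equal the Fluentd tag,
    mirroring data_stream_name / data_stream_template_name ${tag[0]}.
    """
    body = {
        "index_patterns": [tag],
        "data_stream": {},   # marks matching indices as a data stream
        "priority": 100,     # assumption: any value above competing templates
    }
    return urllib.request.Request(
        url=f"{base_url}/_index_template/{tag}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


if __name__ == "__main__":
    # Only prints the requests; sending them (with credentials) is left out.
    for tag in ("winlogbeat", "linux-syslog"):
        req = build_template_request(tag)
        print(req.get_method(), req.full_url, req.data.decode("utf-8"))
```

The same body can of course be pasted into Dev Tools as a PUT _index_template/<tag> request instead.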
If you want to see what the JSON looks like on stdout, just comment out the first match directive entirely. (The conf file is processed from top to bottom, so the events then fall through to the stdout match.)
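If you want to preview the fields before even starting Fluentd, you can try the expression from the linux-syslog source against a sample line. Below is a small sketch in Python: the pattern is the same, but Python needs (?P<name>…) syntax and does not allow dots in group names, so the short group names (priority, host_name, …) and the sample line are my own stand-ins, not what Fluentd will emit.

```python
import re

# Python port of the expression used in the linux-syslog <parse> section.
# Group names are shortened because Python group names cannot contain dots.
SYSLOG_RE = re.compile(
    r"^<(?P<priority>[0-9]+)>"        # <PRI>
    r"([0-9TZ:.-]* )?"                # optional ISO-style timestamp
    r"(?P<host_name>[a-zA-Z0-9_.-]+) +"
    r"(?P<logger>[^:\[]*)?"           # program name
    r"(\[(?P<pid>[0-9]+)\])?"         # optional [pid]
    r"\]?:? *"
    r"(?P<message>.*)"
)


def parse_line(line):
    """Return the named fields as a dict, or None if the line does not match."""
    match = SYSLOG_RE.match(line)
    return match.groupdict() if match else None


if __name__ == "__main__":
    sample = "<34>2024-01-15T10:00:00Z myhost sshd[1234]: Failed password for root"
    print(parse_line(sample))
```

Note that the optional timestamp group only covers ISO-style timestamps; for devices sending other formats, a separate listener with its own expression is the approach I use.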
Once you’re happy to see the first log lines in OpenSearch, you might iterate and change the ingestion pipeline to better match a common list of attributes (e.g. ECS).
While this can be done using ingest pipelines in OpenSearch, I prefer normalization in Fluentd. As an example I included my mapping of the syslog priority to log.syslog.severity.name and log.syslog.facility.name.
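To make the arithmetic behind that mapping explicit: the syslog priority is facility * 8 + severity, so integer division and modulo by 8 recover the two codes. Here is a minimal Python sketch of the same logic that the record_transformer and dict_map filters perform; the function name decode_priority and the flat output keys are just for illustration, and the name tables are copied from the dictionaries in the config.

```python
SEVERITIES = [
    "Emergency", "Alert", "Critical", "Error",
    "Warning", "Notice", "Informational", "Debug",
]

FACILITIES = [
    "kernel", "user", "mail", "system", "security/authorization", "syslogd",
    "lpr", "news", "UUCP", "clock", "security/authorization", "FTP", "NTP",
    "audit", "alert", "clock", "local0", "local1", "local2", "local3",
    "local4", "local5", "local6", "local7",
]


def decode_priority(priority):
    """Split a syslog priority into facility/severity codes and names."""
    facility_code, severity_code = divmod(int(priority), 8)
    # Facility codes above 23 are not defined; leave the name empty then.
    facility_name = (
        FACILITIES[facility_code] if facility_code < len(FACILITIES) else None
    )
    return {
        "log.syslog.facility.code": facility_code,
        "log.syslog.facility.name": facility_name,
        "log.syslog.severity.code": severity_code,
        "log.syslog.severity.name": SEVERITIES[severity_code],
    }


if __name__ == "__main__":
    print(decode_priority("34"))   # 34 = 4 * 8 + 2
    print(decode_priority(165))    # 165 = 20 * 8 + 5
```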
Alex