Ingesting Kafka data into OpenSearch

Versions (relevant - OpenSearch/Dashboard/Server OS/Browser):
OpenSearch Docker image: opensearchproject/opensearch:latest

Describe the issue:
I have a log that is ingested into Kafka in JSON format; here is a sample of the log:

{
  "Action": "publishEventLog",
  "MachineID": "12345",
  "machineInstalledUTC": "2020-12-08T12:42:54.67Z",
  "MachineType": "Desktop",
  "Message": {
    "channel": "Security",
    "createdAt": "2024-02-13T09:26:45.3428555Z",
    "duplications": 0,
    "eventID": 4624,
    "facility": 16,
    "hostname": "DESKTOP-JOE",
    "message": "An account was successfully logged on.Subject:Security ID:S-1-5-18Account Name:DESKTOP-JOE$Account Domain:WORKGROUPLogon ID:0x3E7Logon Information:Logon Type:5Restricted Admin Mode:-Virtual Account:NoElevated Token:YesImpersonation Level:ImpersonationNew Logon:Security ID:S-1-5-18Account Name:SYSTEMAccount Domain:NT AUTHORITYLogon ID:0x3E7Linked Logon ID:0x0Network Account Name:-Network Account Domain:-Logon GUID:{00000000-0000-0000-0000-000000000000}Process Information:Process ID:0x2f4Process Name:C:\\Windows\\System32\\services.exeNetwork Information:Workstation Name:-Source Network Address:-Source Port:-Detailed Authentication Information:Logon Process:Advapi Authentication Package:NegotiateTransited Services:-Package Name (NTLM only):-Key Length:0This event is generated when a logon session is created. It is generated on the computer that was accessed.The subject fields indicate the account on the local system which requested the logon. This is most commonly a service such as the Server service, or a local process such as Winlogon.exe or Services.exe.The logon type field indicates the kind of logon that occurred. The most common types are 2 (interactive) and 3 (network).The New Logon fields indicate the account for whom the new logon was created, i.e. the account that was logged on.The network fields indicate where a remote logon request originated. Workstation name is not always available and may be left blank in some cases.The impersonation level field indicates the extent to which a process in the logon session can impersonate.The authentication information fields provide detailed information about this specific logon request.- Logon GUID is a unique identifier that can be used to correlate this event with a KDC event.- Transited services indicate which intermediate services have participated in this logon request.- Package name indicates which sub-protocol was used among the NTLM protocols.- Key length indicates the length of the generated session key. This will be 0 if no session key was requested.",
    "severity": 5,
    "source": "Microsoft-Windows-Security-Auditing",
    "xmlView": ""
  },
  "OSName": "Windows 10 Pro",
  "OSType": "windows",
  "OSVersion": "10.0.18362",
  "RemoteAddr": "",
  "Topic": "windows-event-log",
  "TransactionID": "342334544534"
}

I now want to use Data Prepper to push this into a Windows event log index whose fields are mapped to the ECS schema.

I tried to extract the fields using different processors (grok, etc.) but could not get it to work, and a quick Google search did not turn up concrete examples of how to use the various processors.

Can someone please help me create a pipeline.yml file to achieve this?
Thanks in advance.


Relevant Logs or Screenshots:

Personally, I use another program called Logstash.
With Logstash, you need to create a pipeline, and at a minimum that pipeline will have/use:

input {
  kafka {
    bootstrap_servers => "myhost:9092"
    codec => json
  }
}

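Filled out with a filter and an output stage, a minimal end-to-end pipeline might look like the sketch below. The hosts, credentials, and the example ECS rename are assumptions you will need to adapt; the topic name comes from the sample log, and the opensearch output requires the logstash-output-opensearch plugin.

```
input {
  kafka {
    bootstrap_servers => "myhost:9092"
    topics => ["windows-event-log"]
    codec => json
  }
}

filter {
  # Move fields toward ECS here; this rename is just an illustration.
  mutate {
    rename => { "[Message][hostname]" => "[host][name]" }
  }
}

output {
  opensearch {
    hosts => ["https://localhost:9200"]
    index => "windows-event-log"
    user => "admin"
    password => "admin"
    # For a local test cluster with self-signed certs only:
    ssl_certificate_verification => false
  }
}
```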
Hopefully that points you in a helpful direction.
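Since the original question asked about Data Prepper, here is an untested sketch of what a pipeline.yaml using its kafka source and opensearch sink might look like. The endpoints, credentials, consumer group, and the example rename are all assumptions; check the Data Prepper documentation for the exact option names supported by your version.

```
kafka-to-opensearch-pipeline:
  source:
    kafka:
      bootstrap_servers: ["myhost:9092"]
      topics:
        - name: windows-event-log
          group_id: data-prepper
          serde_format: json
  processor:
    # Illustrative ECS-style rename; add more entries as needed.
    - rename_keys:
        entries:
          - from_key: "/Message/hostname"
            to_key: "/host/name"
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        username: admin
        password: admin
        insecure: true          # local test cluster only
        index: windows-event-log
```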

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.