How to get data from an OpenSearch index in Logstash and add a field to a new event if the field doesn't exist

Versions (relevant - OpenSearch/Dashboard/Server OS/Browser):

OpenSearch 2.11.1
Logstash 7.16.2
OpenSearch Dashboards 2.11.1-2

Describe the issue:

We are receiving DB audit information in JSON format (the index mapping is shown below).

{
  "properties": {
    "@timestamp": {
      "type": "date"
    },
    "Host": {
      "type": "keyword"
    },
    "Port": {
      "type": "keyword"
    },
    "Type": {
      "type": "text"
    },
    "audit": {
      "type": "object",
      "properties": {
        "Audit_Type": {
          "type": "integer"
        },
        "DBID": {
          "type": "keyword"
        },
        "DB_User": {
          "type": "keyword"
        },
        "EntryId": {
          "type": "keyword"
        },
        "Ext_Name": {
          "type": "keyword"
        },
        "Extended_Timestamp": {
          "type": "date"
        },
        "Instance_Number": {
          "type": "integer"
        },
        "OSPrivilege": {
          "type": "keyword"
        },
        "OS_Process": {
          "type": "keyword"
        },
        "OS_User": {
          "type": "keyword"
        },
        "Returncode": {
          "type": "keyword"
        },
        "Session_Id": {
          "type": "keyword"
        },
        "Sql_Text": {
          "type": "keyword"
        },
        "UserIP": {
          "type": "keyword"
        },
        "UserPort": {
          "type": "keyword"
        },
        "Userhost": {
          "type": "keyword"
        },
        "content": {
          "type": "keyword"
        }
      }
    },
    "dbuniquename": {
      "type": "keyword"
    },
    "logstash_intake_timestamp": {
      "type": "date"
    }
  }
}

One DB session generates multiple audit events. Several fields (like UserIP) exist only on the LOGIN event.

We need to add these fields to all events generated by a particular session.
Logstash has a supported plugin for Elasticsearch, logstash-filter-elasticsearch, but it does not work with OpenSearch.

We installed logstash-filter-opensearch (not officially supported).

We tested several options (see the examples below) inside the filter block, but none of them work:

if ![audit][UserIP] {
  opensearch {
    index => "ora_aud-%{+YYYY.MM.dd}"
    hosts => ["http://host:9200"]
    user => "admin"
    password => "***"
    query => "(audit.Session_Id:%{Session_Id}) and (audit.Action:100)"
  }
}

We tested the following variants of the query:

and (audit.Session_Id:[audit][Session_Id])"
and (audit.Session_Id:%{audit.Session_Id})"
and (audit.Session_Id:%{[audit][Session_Id]})"

We got errors when trying to search the data.

We were only able to add a static field, not a field taken from the index.

What is the correct syntax to fetch the field from the previous event with the opensearch filter and add it to the other events with the same Session_Id when the field does not exist?
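
Based on the documentation of logstash-filter-elasticsearch (which the opensearch filter is a port of), we expected something along the following lines to work. This is only a sketch: the host is a placeholder, and we are not sure whether the port resolves sprintf field references in query the same way, or which notation its fields option expects for nested source fields.

filter {
  # Run the lookup only when the session id is present; if it is missing, the
  # unresolved %{...} reference goes to OpenSearch verbatim (as the literal
  # %{Session_Id} in the error log below) and the Lucene parser rejects it.
  if [audit][Session_Id] and ![audit][UserIP] {
    opensearch {
      hosts    => ["http://host:9200"]   # placeholder
      user     => "admin"
      password => "***"
      index    => "ora_aud-%{+YYYY.MM.dd}"
      # Lucene query string: operators must be uppercase (AND), and the event
      # field is referenced with sprintf bracket syntax
      query    => 'audit.Session_Id:"%{[audit][Session_Id]}" AND audit.Action:100'
      # Copy the values from the matched LOGIN document onto this event;
      # nested source fields may need dotted or bracketed keys depending on
      # the plugin version
      fields   => { "audit.UserIP"   => "[audit][UserIP]"
                    "audit.UserPort" => "[audit][UserPort]" }
    }
  }
}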

Do we have any alternatives to achieve this?
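
One alternative we are considering is the logstash-filter-aggregate plugin, which can carry values between events that share a task_id and would avoid the extra query entirely. It requires pipeline.workers set to 1 and the events of a session passing through in order, so we are not sure it fits our volume. A rough sketch (the timeout value is arbitrary):

filter {
  aggregate {
    # one aggregation map per DB session
    task_id => "%{[audit][Session_Id]}"
    code => "
      # remember UserIP/UserPort the first time they appear (LOGIN event)
      map['user_ip']   ||= event.get('[audit][UserIP]')
      map['user_port'] ||= event.get('[audit][UserPort]')
      # copy them onto later events of the same session when missing
      event.set('[audit][UserIP]',   map['user_ip'])   if map['user_ip']   && !event.get('[audit][UserIP]')
      event.set('[audit][UserPort]', map['user_port']) if map['user_port'] && !event.get('[audit][UserPort]')
    "
    timeout => 3600   # forget a session after an hour of inactivity
  }
}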

Configuration:

Data from DB audit files => Logstash => OpenSearch => OpenSearch Dashboards

Relevant Logs or Screenshots:

[2024-10-14T11:44:15,510][WARN ][logstash.filters.opensearch][man][751713dff07ea50fd357c549200c73f182fee876ee7452e1cbb9e26395ff3a69] Failed to query opensearch for previous event {:index=>"ora_aud-%{+YYYY.MM.dd}", :error=>"[400] {\"error\":{\"root_cause\":[{\"type\":\"parse_exception\",\"reason\":\"parse_exception: Encountered \\\" \\\"}\\\" \\\"} \\\"\\\" at line 1, column 53.\\nWas expecting:\\n    \\\"TO\\\" ...\\n    \"}],\"type\":\"search_phase_execution_exception\",\"reason\":\"all shards failed\",\"phase\":\"query\",\"grouped\":true,\"failed_shards\":[{\"shard\":3,\"index\":\"ora_aud-2024.10.14\",\"node\":\"iqswCrCIS96yJ5wRnu8eMg\",\"reason\":{\"type\":\"query_shard_exception\",\"reason\":\"Failed to parse query [(audit.Action:100) and (audit.Session_Id:%{Session_Id})]\",\"index\":\"ora_aud-2024.10.14\",\"index_uuid\":\"5nRWRZNGSuOMddMTZ162qg\",\"caused_by\":{\"type\":\"parse_exception\",\"reason\":\"parse_exception: Cannot parse '(audit.Action:100) and (audit.Session_Id:%{Session_Id})': Encountered \\\" \\\"}\\\" \\\"} \\\"\\\" at line 1, column 53.\\nWas expecting:\\n    \\\"TO\\\" ...\\n    \",\"caused_by\":{\"type\":\"parse_exception\",\"reason\":\"parse_exception: Encountered \\\" \\\"}\\\" \\\"} \\\"\\\" at line 1, column 53.\\nWas expecting:\\n    \\\"TO\\\" ...\\n    \"}}}}],\"caused_by\":{\"type\":\"parse_exception\",\"reason\":\"parse_exception: Encountered \\\" \\\"}\\\" \\\"} \\\"\\\" at line 1, column 53.\\nWas expecting:\\n    \\\"TO\\\" ...\\n    \"}},\"status\":400}"}

Hi @missionpossible ,

Which Logstash input plugin do you use? Can you please share the Logstash configuration for your pipeline?

Do you want to add these fields with an empty value?

Hi @Eugene7 ,

The input is coming from a syslog stream:

input {
  tcp {
    port => 5014
    type => "syslog"
  }
}

We parse it and extract the embedded XML audit record:

filter {
  grok {
    # extract the DB unique name and the embedded <AuditRecord> XML
    pattern_definitions => { "AUDITRECORD" => "%{GREEDYDATA}<.AuditRecord>" }
    match => { "message" => "%{GREEDYDATA:dbuniquename}ora%{GREEDYDATA}%{AUDITRECORD:message}" }
    overwrite => [ "message" ]
  }
  # drop anything that is not an audit record
  if "_grokparsefailure" in [tags] {
    drop {}
  }
  # parse the XML audit record into the [audit] field
  xml {
    store_xml => true
    source => "message"
    target => "audit"
  }

So we receive the audit data in XML format.

A single session generates multiple <AuditRecord>…</AuditRecord> entries; the UserIP and UserPort information exists only in the very first entry.

We need to take these fields and their values from the first event and add them to the other events of each session.
All events generated by a particular DB session share the same audit.Session_Id.

So we are trying to search for events by audit.Session_Id and add the UserIP/UserPort fields when they don't exist.

We are trying to use the opensearch plugin inside the filter block:

if [audit][Action] != "100" {
  opensearch {
    index => "ora_aud-%{+YYYY.MM.dd}"
    hosts => ["…"]
    user => "admin"
    password => "***"
    query => "(audit.Session_Id:%{[audit][Session_Id]} and audit.Action:\"100\")"
    fields => { "audit.UserIP" => "audit.UserIP" }
  }
}

It causes this error:

[2024-10-14T15:03:44,065][WARN ][logstash.filters.opensearch][man][137d7b93a8b011f723b0990da2f5112adb81b2bc00d757dcd8556c02a7c7c1d5] Failed to query opensearch for previous event {:index=>"ora_aud-%{+YYYY.MM.dd}", :error=>"[400] {"error":{"root_cause":[{"type":"parse_exception","reason":"parse_exception: Encountered \" \"]\" \"] \"\" at line 1, column 26.\nWas expecting:\n \"TO\" ...\n "}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":2,"index":"ora_aud-2024.10.14","node":"tn7I2vnCRIyT5BgwnGIz2A","reason":{"type":"query_shard_exception","reason":"Failed to parse query [(audit.Session_Id:%{[audit][Session_Id]} and audit.Action:\\\"100\\\")]","index":"ora_aud-2024.10.14","index_uuid":"5nRWRZNGSuOMddMTZ162qg","caused_by":{"type":"parse_exception","reason":"parse_exception: Cannot parse '(audit.Session_Id:%{[audit][Session_Id]} and audit.Action:\\\"100\\\")': Encountered \" \"]\" \"] \"\" at line 1, column 26.\nWas expecting:\n \"TO\" ...\n ","caused_by":{"type":"parse_exception","reason":"parse_exception: Encountered \" \"]\" \"] \"\" at line 1, column 26.\nWas expecting:\n \"TO\" ...\n "}}}}],"caused_by":{"type":"parse_exception","reason":"parse_exception: Encountered \" \"]\" \"] \"\" at line 1, column 26.\nWas expecting:\n \"TO\" ...\n "}},"status":400}"}
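
We also noticed the query_template option in the logstash-filter-elasticsearch documentation, which takes a file containing the query in DSL form instead of a Lucene query string. If the opensearch port kept this option and substitutes %{...} references inside the template, a file along these lines (the term structure is only our guess, since the mapping we posted does not show audit.Action) might avoid the escaping problems:

{
  "size": 1,
  "query": {
    "bool": {
      "filter": [
        { "term": { "audit.Session_Id": "%{[audit][Session_Id]}" } },
        { "term": { "audit.Action": "100" } }
      ]
    }
  }
}

It would be referenced from the filter with query_template => "/etc/logstash/session_login.json" (hypothetical path) in place of query. We have not tested this yet.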