Output with logstash-output-opensearch plugin should include AMQP metadata that was input from RabbitMQ

Hello Everyone,

In Logstash we are consuming AMQP messages from RabbitMQ. The messages reach RabbitMQ via SMTP through the rabbitmq-email plugin, so the AMQP headers contain some SMTP information we need.
We are then outputting to OpenSearch using the logstash-output-opensearch plugin.

By default, it seems that only the body of the email is indexed in OpenSearch.

We require the email subject and destination address from the AMQP metadata. We know they are there, because we can see them when consuming messages with a Python script. We can also see them in the Logstash container logs, e.g.:

[2024-02-09T08:57:13,918][WARN ][logstash.outputs.opensearch][main] Restored connection to OpenSearch instance {:url=>"https://admin:xxxxxx@opensearch:9200/"}
[2024-02-09T08:57:13,942][INFO ][logstash.outputs.opensearch][main] Cluster version determined (2.11.1) {:version=>2}
[2024-02-09T08:57:14,927][INFO ][logstash.outputs.opensearch][main] Using a default mapping template {:version=>2, :ecs_compatibility=>:v8}
{
       "@timestamp" => 2024-02-09T08:58:06.541860511Z,
        "@metadata" => {
        "rabbitmq_properties" => {
             "consumer-tag" => "amq.ctag-_36BYl7dfFbFo5s8kAOZEQ",
             "content-type" => "text/plain",
            "delivery-mode" => 2,
               "message-id" => "9054ce4cf059eadb9e4c809f3926d798",
              "routing-key" => "smtp_test@example.com",
                "timestamp" => 1707469086,
                 "exchange" => "email-in"
        },
           "rabbitmq_payload" => "Hello. Timestamp is 2024-02-09T16:58:10.721259 for email send.",
           "rabbitmq_headers" => {
            "Subject" => "SMTP MQ Test",
               "From" => "random_sender@invalid_domain"
        }
    },
       "email_dest" => "smtp_test@example.com",
         "@version" => "1",
          "message" => "Hello. Timestamp is 2024-02-09T16:58:10.721259 for email send.",
            "event" => {
        "original" => "Hello. Timestamp is 2024-02-09T16:58:10.721259 for email send."
    },
    "email_subject" => "SMTP MQ Test",
             "tags" => [
        [0] "test",
        [1] "RabbitMQ",
        [2] "smtp"
    ]
}
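
A check along the lines of the Python script mentioned above could look roughly like this. It is a minimal sketch rather than the actual script: it assumes the third-party pika client is installed, reuses the host, queue and credentials from the logstash.conf in this post, and the function names are made up for illustration.

```python
"""Sketch: pull one message from the queue and report the SMTP-derived fields.

Assumptions: the third-party `pika` client is installed, and the host, queue
and credentials match the logstash.conf in this post.
"""


def extract_email_fields(routing_key, headers):
    """Return the two values we want indexed, given AMQP delivery data."""
    headers = headers or {}  # AMQP headers may be absent entirely
    return {
        "email_dest": routing_key,
        "email_subject": headers.get("Subject"),
    }


def peek_one_message(host="rabbitmq", queue="queue_T",
                     user="consumer_t", password="BOGUS_PASSWORD"):
    """Fetch a single message without acking it, so it stays on the queue."""
    import pika  # imported lazily so the helper above works without pika

    credentials = pika.PlainCredentials(user, password)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=host, port=5672, credentials=credentials)
    )
    try:
        channel = connection.channel()
        method, properties, body = channel.basic_get(queue=queue, auto_ack=False)
        if method is None:
            return None  # queue was empty
        return extract_email_fields(method.routing_key, properties.headers)
    finally:
        connection.close()  # the unacked message is requeued on close
```

The routing key and the Subject header read here are the same two values that show up under rabbitmq_properties and rabbitmq_headers in the Logstash output above.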

Attempting to get this working has led me to the following logstash.conf file.
Note: the routing-key in the AMQP metadata is taken from the SMTP email destination (“To”) address.

input {
  rabbitmq {
    codec => "plain"
    tags => ["test","RabbitMQ","smtp"]
    host => "rabbitmq"
    port => 5672
    queue => "queue_T"
    durable => true
    passive => true
    user => "consumer_t"
    password => "BOGUS_PASSWORD"
    metadata_enabled => "extended"
  }
}
filter {
  mutate {
    add_field => {
      "email_dest" => "%{[@metadata][rabbitmq_properties][routing-key]}"
      "email_subject" => "%{[@metadata][rabbitmq_headers][Subject]}"
    }
  }
}
output {
  opensearch {
    hosts => ["opensearch:9200"]
    ssl => true
    ssl_certificate_verification => false
    user => "osearch_user"
    password => "BOGUS_PASSWORD"
    index => "test-rabbitmq-logstash-pipeline"
  }
  stdout { codec => rubydebug { metadata => true } }
}

But it seems that I may have missed something or made a mistake, as the required email info is not being output to OpenSearch, e.g.:

$ curl -k -s -u admin:admin -X GET "https://localhost:9200/test-rabbitmq-logstash-pipeline/_search?pretty=true"
{
  "took" : 898,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "test-rabbitmq-logstash-pipeline",
        "_id" : "_YUWjY0BGvTl8jIUnz3P",
        "_score" : 1.0,
        "_source" : {
          "tags" : [
            "test",
            "RabbitMQ",
            "smtp"
          ],
          "email_dest" : "smtp_test@example.com",
          "message" : "Hello. Timestamp is 2024-02-09T16:58:10.721259 for email send.",
          "email_subject" : "SMTP MQ Test",
          "@version" : "1",
          "@timestamp" : "2024-02-09T08:58:06.541860511Z",
          "event" : {
            "original" : "Hello. Timestamp is 2024-02-09T16:58:10.721259 for email send."
          }
        }
      }
    ]
  }
}

Anyone able to see what might be wrong?

Cheers,
Eddie.

Oops, my mistake, it is working!

Looks like it began to work after a “docker compose down” and “up” again.

That curl output from OpenSearch actually does have the email_dest and email_subject we are after, but I just did not see them.
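
Since the fields were easy to overlook in the raw JSON, a small check like the one below can make it explicit. This is only a sketch: the function name missing_fields is made up here, and the sample is a trimmed copy of the curl response from my first post.

```python
import json

REQUIRED_FIELDS = ("email_dest", "email_subject")


def missing_fields(search_response, required=REQUIRED_FIELDS):
    """Map each hit's _id to the required fields absent from its _source."""
    report = {}
    for hit in search_response.get("hits", {}).get("hits", []):
        source = hit.get("_source", {})
        absent = [name for name in required if name not in source]
        if absent:
            report[hit.get("_id")] = absent
    return report


# Trimmed copy of the _search response from the first post.
sample = json.loads("""
{
  "hits": {
    "hits": [
      {
        "_id": "_YUWjY0BGvTl8jIUnz3P",
        "_source": {
          "email_dest": "smtp_test@example.com",
          "email_subject": "SMTP MQ Test",
          "message": "Hello. Timestamp is 2024-02-09T16:58:10.721259 for email send."
        }
      }
    ]
  }
}
""")

print(missing_fields(sample))  # prints {} because both fields are present
```

An empty result means every hit carries both fields; any hit that lacks one would be listed by its _id.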

Sorry everyone, it was getting late.

Maybe a forum moderator can delete this topic?
I don’t think I am able to.

Cheers,
Eddie.