How to configure otel_logs_source as the source for my Data Prepper pipeline and access fields inside it

Versions (relevant - OpenSearch/Dashboard/Server OS/Browser):
Describe the issue:

I’ve configured OpenSearch and Data Prepper locally, and set up the pipeline to test the otel_logs_source plugin. Although the setup appears to be running without any errors, and I’m sending data to it, I’m not seeing any logs ingested into the OpenSearch index.

Configuration:

OpenSearch docker-compose.yaml:

services:
  opensearch-node1: 
    image: opensearchproject/opensearch:latest 
    container_name: opensearch-node1
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node1 
      - discovery.seed_hosts=opensearch-node1,opensearch-node2
      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=Aravinth@31
    ulimits:
      memlock:
        soft: -1 
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-data1:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9600:9600
    networks:
      - opensearch-net

  opensearch-node2:
    image: opensearchproject/opensearch:latest
    container_name: opensearch-node2
    ports:
      - 9201:9200
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node2
      - discovery.seed_hosts=opensearch-node1,opensearch-node2
      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=Aravinth@31
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-data2:/usr/share/opensearch/data
    networks:
      - opensearch-net

  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:latest
    container_name: opensearch-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]'
    networks:
      - opensearch-net

volumes:
  opensearch-data1:
  opensearch-data2:

networks:
  opensearch-net:
    external: true

data-prepper docker-compose.yaml:

services:
  data-prepper:
    image: opensearchproject/data-prepper:latest
    container_name: data-prepper
    platform: linux/amd64
    ports:
      - "2021:2021"
      - "21892:21892"
      - "4900:4900"
    volumes:
      - ./pipelines.yaml:/usr/share/data-prepper/pipelines/pipelines.yaml
      - ./data-prepper-config.yaml:/usr/share/data-prepper/config/data-prepper-config.yaml
    networks:
      - opensearch-net

volumes:
  data-prepper:

networks:
  opensearch-net:
    external: true
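
Both compose files join the external opensearch-net network, so I create it first and then bring each stack up, something like this (run from each project directory):

# the network is declared external, so it has to exist before either stack starts
docker network create opensearch-net
# then, in the OpenSearch directory and the Data Prepper directory:
docker compose up -d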

data-prepper-config.yaml

ssl: false
serverPort: 4900
authentication:
  http_basic:
    username: admin
    password: Aravinth@31

pipelines.yaml

version: "2"
log-pipeline:
  source:
    otel_logs_source:
      path: /events/ingest
      ssl: false

    processor:
      - add_entries:
          entries:
            - metadata_key: "doc_id"
              value_expression: "/log.attributes.EventUuId"
      - rename_keys:
          entries:
            - from_key: "body"
              to_key: "eventName"

            - from_key: "log.attributes.ExternalIp"
              to_key: "resource.attributes.ExternalIp"

            - from_key: "log.attributes.EndpointId"
              to_key: "resource.attributes.EndpointId"

            - from_key: "log.attributes.LocalIp"
              to_key: "resource.attributes.LocalIp"

            - from_key: "log.attributes.TenantId"
              to_key: "resource.attributes.TenantId"

            - from_key: "log.attributes.DeviceName"
              to_key: "resource.attributes.DeviceName"

            - from_key: "log.attributes.AgentVersion"
              to_key: "instrumentationScope.version"

      - convert_entry_type:
          key: "eventName"
          type: "integer"
      - grok:
          match:
            log.attributes.Hashes:
              - "SHA1=%{NOTSPACE:log.attributes.HashSHA1},MD5=%{NOTSPACE:log.attributes.HashMD5},SHA256=%{NOTSPACE:log.attributes.HashSHA256},IMPHASH=%{NOTSPACE:log.attributes.ImpHash}"
      - delete_entries:
          with_keys: ["log.attributes.EventUuId", log.attributes.Hashes]

  sink:
    - opensearch:
        hosts:
          - "https://opensearch-node1:9200"
        index: "data-prepper-0001"
        username: admin
        password: Aravinth@31
        document_id: ${getMetadata("doc_id")}
        insecure: true

Relevant Logs or Screenshots:

2025-09-02 16:16:46 Reading pipelines and data-prepper configuration files from Data Prepper home directory.
2025-09-02 16:16:46 /usr/bin/java
2025-09-02 16:16:46 Found openjdk version  of 17.0
2025-09-02 16:16:49 2025-09-02T10:46:49,826 [main] INFO  org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformer - No transformation needed
2025-09-02 16:16:51 2025-09-02T10:46:51,473 [main] INFO  org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigExtension - Applying Kafka Cluster Config Extension.
2025-09-02 16:16:52 2025-09-02T10:46:52,362 [main] WARN  org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSource - Creating otel-logs-source without authentication. This is not secure.
2025-09-02 16:16:52 2025-09-02T10:46:52,362 [main] WARN  org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSource - In order to set up Http Basic authentication for the otel-logs-source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-logs-source#authentication-configurations
2025-09-02 16:16:52 2025-09-02T10:46:52,727 [log-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Initializing OpenSearch sink
2025-09-02 16:16:52 2025-09-02T10:46:52,734 [log-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.sink.opensearch.ConnectionConfiguration - Using the username provided in the config.
2025-09-02 16:16:52 2025-09-02T10:46:52,752 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.HttpServerProvider - Creating Data Prepper server without TLS. This is not secure.
2025-09-02 16:16:52 2025-09-02T10:46:52,752 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.HttpServerProvider - In order to set up TLS for the Data Prepper server, go here: https://github.com/opensearch-project/data-prepper/blob/main/docs/configuration.md#server-configuration
2025-09-02 16:16:53 2025-09-02T10:46:53,032 [log-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.sink.opensearch.ConnectionConfiguration - Using the trust all strategy
2025-09-02 16:16:53 2025-09-02T10:46:53,534 [log-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Initialized OpenSearch sink
2025-09-02 16:16:54 2025-09-02T10:46:54,226 [log-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.server.CreateServer - Adding service with path: /events/ingest
2025-09-02 16:16:54 2025-09-02T10:46:54,644 [log-pipeline-sink-worker-2-thread-1] WARN  org.opensearch.dataprepper.plugins.server.CreateServer - Creating otel_logs_source without SSL/TLS. This is not secure.
2025-09-02 16:16:54 2025-09-02T10:46:54,645 [log-pipeline-sink-worker-2-thread-1] WARN  org.opensearch.dataprepper.plugins.server.CreateServer - In order to set up TLS for the otel_logs_source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-trace-source#ssl
2025-09-02 16:16:54 2025-09-02T10:46:54,885 [log-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSource - Started otel_logs_source...

There are no logs in the pipeline either.

curl --request POST \
  --url http://localhost:21892/events/ingest \
  --header 'Content-Type: application/json' \
  --data '{
  "resourceLogs": [
    {
      "resource": {},
      "scopeLogs": [
        {
          "scope": {},
          "logRecords": [
            {
              "timeUnixNano": "1756457718527272600",
              "body": {
                "stringValue": "0"
              },
              "attributes": [
                {
                  "key": "DeviceName",
                  "value": {
                    "stringValue": "mitsdevice"
                  }
                },
                {
                  "key": "TenantId",
                  "value": {
                    "stringValue": "-99"
                  }
                },
                {
                  "key": "EndpointId",
                  "value": {
                    "stringValue": "htut89"
                  }
                },
                {
                  "key": "AgentVersion",
                  "value": {
                    "stringValue": "1.0.0.0"
                  }
                },
                {
                  "key": "ExternalIp",
                  "value": {
                    "stringValue": "100.0.0.1"
                  }
                },
                {
                  "key": "LocalIp",
                  "value": {
                    "stringValue": "100.0.0.1"
                  }
                },
                {
                  "key": "Image",
                  "value": {
                    "stringValue": "C:/Program Files/PostgreSQL/17/bin/postgres.exe"
                  }
                },
                {
                  "key": "ParentProcessId",
                  "value": {
                    "intValue": "6148"
                  }
                },
                {
                  "key": "CommandLine",
                  "value": {
                    "stringValue": "\"C:/Program Files/PostgreSQL/17/bin/postgres.exe\" --forkchild=\"autovacuum worker\" 1564"
                  }
                },
                {
                  "key": "ParentImage",
                  "value": {
                    "stringValue": "C:\\Program Files\\PostgreSQL\\17\\bin\\postgres.exe"
                  }
                },
                {
                  "key": "ParentCommandLine",
                  "value": {
                    "stringValue": "\"C:\\Program Files\\PostgreSQL\\17\\bin\\postgres.exe\" -D \"C:\\Program Files\\PostgreSQL\\17\\data\" "
                  }
                },
                {
                  "key": "IntegrityLevel",
                  "value": {
                    "stringValue": ""
                  }
                },
                {
                  "key": "UserName",
                  "value": {
                    "stringValue": ""
                  }
                },
                {
                  "key": "Hashes",
                  "value": {
                    "stringValue": "1B17FE58BDAAB5BC62A2410771CFBF61"
                  }
                },
                {
                  "key": "FileVersion",
                  "value": {
                    "stringValue": "17.2"
                  }
                },
                {
                  "key": "Description",
                  "value": {
                    "stringValue": "PostgreSQL Server"
                  }
                },
                {
                  "key": "Product",
                  "value": {
                    "stringValue": "PostgreSQL"
                  }
                },
                {
                  "key": "Company",
                  "value": {
                    "stringValue": "PostgreSQL Global Development Group"
                  }
                },
                {
                  "key": "ProviderName",
                  "value": {
                    "stringValue": "Windows Kernel"
                  }
                },
                {
                  "key": "ParentUser",
                  "value": {
                    "stringValue": "NT AUTHORITY\\NETWORK SERVICE"
                  }
                },
                {
                  "key": "CurrentDirectory",
                  "value": {}
                },
                {
                  "key": "OriginalFileName",
                  "value": {
                    "stringValue": "postgres.exe"
                  }
                },
                {
                  "key": "LogonId",
                  "value": {
                    "stringValue": ""
                  }
                },
                {
                  "key": "ThreadId",
                  "value": {
                    "intValue": "0"
                  }
                },
                {
                  "key": "EventUuId",
                  "value": {
                    "stringValue": "b0749eac-290b-473d-bcca-9bd6401453e3"
                  }
                },
                {
                  "key": "traceId",
                  "value": {
                    "stringValue": "4fd0c4f0e3f64b7d9b8c2c4a9d2a1fcd"
                  }
                },
                {
                  "key": "spanId",
                  "value": {
                    "stringValue": "6a7c12345de34c92"
                  }
                }
              ],
              "traceId": "4fd0c4f0e3f64b7d9b8c2c4a9d2a1fcd",
              "spanId": "6a7c12345de34c92"
            }
          ]
        }
      ]
    }
  ]
}'

For the curl request with Content-Type: application/json I am getting "Missing or invalid Content-Type header". If I try the same request with Content-Type: application/grpc, I get a 200, but I still can't see any logs in the pipeline or any data in the index.
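
I wonder if this is because the source only speaks gRPC (protobuf-framed requests) by default, and plain HTTP/JSON has to be enabled explicitly. A rough sketch of what I assume the source config would need (I haven't confirmed that otel_logs_source supports unframed_requests the way the other OTel sources document it):

log-pipeline:
  source:
    otel_logs_source:
      path: /events/ingest       # path used for unframed HTTP requests
      unframed_requests: true    # assumption: accept plain HTTP/JSON POSTs instead of gRPC framing
      ssl: false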

I also tried with grpcurl:

grpcurl -plaintext \
  -proto opentelemetry/proto/collector/logs/v1/logs_service.proto \
  -proto opentelemetry/proto/logs/v1/logs.proto \
  -proto opentelemetry/proto/common/v1/common.proto \
  -proto opentelemetry/proto/resource/v1/resource.proto \
  -d '{
    "resourceLogs": [{
      "resource": {
        "attributes": [{
          "key": "service.name",
          "value": { "stringValue": "test-service" }
        }]
      },
      "scopeLogs": [{
        "logRecords": [{
          "timeUnixNano": "1756457718527272600",
          "body": { "stringValue": "Hello from grpcurl!" },
          "attributes": [{
            "key": "DeviceName",
            "value": { "stringValue": "mitsdevice" }
          }]
        }]
      }]
    }]
  }' \
  localhost:21892 opentelemetry.proto.collector.logs.v1.LogsService/Export

ERROR:
  Code: Unimplemented
  Message: unexpected HTTP status code received from server: 404 (Not Found); transport: received unexpected content-type "text/plain; charset=utf-8"

I'm using the otel_logs_source plugin to ingest logs via an AWS pipeline, where I'm able to send data in JSON format. Now I want to replicate that setup locally and need to confirm whether JSON-formatted logs can be sent locally as well. Additionally, I need to understand the structure of the incoming JSON once it's received by the otel_logs_source plugin, as I'm having trouble configuring processors due to a lack of clarity on the schema. Could someone share an example or documentation showing how the incoming JSON looks after ingestion via otel_logs_source, or guide me on how to inspect it?

@Aravinth You should be able to see the logs from data prepper using docker logs <data-prepper-container>

The logs should outline the problems with the records.
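With your compose file that would be something like:

docker logs -f data-prepper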

Additionally, I would recommend outputting the result to stdout using the sink below:

sink:
    - stdout:
        format: json
    - opensearch:

This will demonstrate how the data looks before Data Prepper tries to send it to OpenSearch.
Have a look at the "doc_id" value in the printed log; it's possible it is set to "null".

@Aravinth see my configuration and commands below, this might answer some questions:

payload.json

{
  "resourceLogs": [{
    "resource": {
      "attributes": [{
        "key": "service.name",
        "value": { "stringValue": "dp-grpc-test" }
      }]
    },
    "scopeLogs": [{
      "scope": { "name": "manual-test", "version": "1.0.0" },
      "logRecords": [{
        "timeUnixNano": "1756457718527272600",
        "body": { "stringValue": "1111" },
        "attributes": [
          { "key": "EventUuId",    "value": { "stringValue": "abc-123" } },
          { "key": "ExternalIp",   "value": { "stringValue": "1.2.3.4" } },
          { "key": "EndpointId",   "value": { "stringValue": "endpoint-4444" } },
          { "key": "LocalIp",      "value": { "stringValue": "10.0.0.5" } },
          { "key": "TenantId",     "value": { "stringValue": "tenant-99" } },
          { "key": "DeviceName",   "value": { "stringValue": "host-01" } },
          { "key": "AgentVersion", "value": { "stringValue": "1.0.0" } },
          { "key": "Hashes",       "value": { "stringValue": "SHA1=a1b2c3,MD5=ff11aa22,SHA256=deadbeef,IMPHASH=123hash" } }
        ],
        "traceId": "4fd0c4f0e3f64b7d9b8c2c4a9d2a1fcd",
        "spanId":  "6a7c12345de34c92"
      }]
    }]
  }]
}

pipeline.yaml

version: "2"

log-pipeline:
  source:
    otel_logs_source:
      port: 21892          # default OTLP/gRPC port
      ssl: false
      output_format: otel  # events look like your stdout sample (nested OTel shape)
      health_check_service: true

  # Optional processors; keep it simple to start
  sink:
    - stdout:
        format: json
    - opensearch:
        hosts: ["https://opensearch-node2:9200"]
        index: "data-prepper-0001"
        username: "admin"
        password: "admin"
        # _id pulled directly from the OTel attributes object you send
        document_id: '${/attributes/EventUuId}'
        insecure: true

data-prepper-config.yml

ssl: false
serverPort: 4900
authentication:
  http_basic:
    username: admin
    password: admin

Command:

grpcurl -plaintext \
  -import-path "$PROTO_DIR" \
  -proto opentelemetry/proto/collector/logs/v1/logs_service.proto \
  -proto opentelemetry/proto/logs/v1/logs.proto \
  -proto opentelemetry/proto/common/v1/common.proto \
  -proto opentelemetry/proto/resource/v1/resource.proto \
  -d @ \
  localhost:21892 \
  opentelemetry.proto.collector.logs.v1.LogsService/Export < payload.json
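
Here $PROTO_DIR is a local checkout of the OpenTelemetry proto definitions, something like:

# clone the OTLP proto definitions used by the -proto flags above
git clone https://github.com/open-telemetry/opentelemetry-proto.git
PROTO_DIR=./opentelemetry-proto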

Data-prepper logs:

{"traceId":"e1f7747387f47b77fae1beddf5bf1cd9ce1af5dd9ad5f71d","instrumentationScope":{"name":"manual-test","droppedAttributesCount":0,"version":"1.0.0"},"resource":{"droppedAttributesCount":0,"attributes":{"service.name":"dp-grpc-test"},"schemaUrl":""},"flags":0,"severityNumber":0,"body":"1111","schemaUrl":"","spanId":"e9aedcd76df8e5d7b7e1cf76","severityText":"","attributes":{"TenantId":"tenant-99","ExternalIp":"1.2.3.4","AgentVersion":"1.0.0","Hashes":"SHA1=a1b2c3,MD5=ff11aa22,SHA256=deadbeef,IMPHASH=123hash","LocalIp":"10.0.0.5","EventUuId":"abc-123","EndpointId":"endpoint-4444","DeviceName":"host-01"},"time":"2025-08-29T08:55:18.527272600Z","droppedAttributesCount":0,"observedTimestamp":"1970-01-01T00:00:00Z"}

Document in the index:

curl -k -u admin:admin "https://localhost:9200/data-prepper-0001/_search?pretty"
{
  "took" : 700,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "data-prepper-0001",
        "_id" : "abc-123",
        "_score" : 1.0,
        "_source" : {
          "traceId" : "e1f7747387f47b77fae1beddf5bf1cd9ce1af5dd9ad5f71d",
          "instrumentationScope" : {
            "name" : "manual-test",
            "droppedAttributesCount" : 0,
            "version" : "1.0.0"
          },
          "resource" : {
            "droppedAttributesCount" : 0,
            "attributes" : {
              "service.name" : "dp-grpc-test"
            },
            "schemaUrl" : ""
          },
          "flags" : 0,
          "severityNumber" : 0,
          "body" : "1111",
          "schemaUrl" : "",
          "spanId" : "e9aedcd76df8e5d7b7e1cf76",
          "severityText" : "",
          "attributes" : {
            "TenantId" : "tenant-99",
            "ExternalIp" : "1.2.3.4",
            "AgentVersion" : "1.0.0",
            "Hashes" : "SHA1=a1b2c3,MD5=ff11aa22,SHA256=deadbeef,IMPHASH=123hash",
            "LocalIp" : "10.0.0.5",
            "EventUuId" : "abc-123",
            "EndpointId" : "endpoint-4444",
            "DeviceName" : "host-01"
          },
          "time" : "2025-08-29T08:55:18.527272600Z",
          "droppedAttributesCount" : 0,
          "observedTimestamp" : "1970-01-01T00:00:00Z"
        }
      }
    ]
  }
}

Let me know if you have any questions

@Anthony Thanks for sharing the updated configurations. I tested them, and now I can see that the data is being successfully sent to OpenSearch. However, I’m having trouble understanding the difference between the configuration I initially shared and the one you provided. They appear quite similar. If possible, could you explain what the issue was and what I might have done wrong in my original configuration?

Although the data is being successfully sent to OpenSearch, I have two additional concerns.

First, I need help accessing the incoming JSON fields within the processors section. I'm currently using the pipeline configuration below, which includes processors, but none of them seem to be working. My main challenge is referencing the fields of the incoming OTel-formatted JSON: the otel_logs_source plugin is emitting events with output_format: opensearch, and while the stdout sink prints the output after the processor stage, I'm unsure how to reference the fields correctly within the processors themselves. I'd appreciate guidance on how to modify the configuration so that I can process the incoming OpenSearch-formatted OTel JSON with processors.

Second, the otel_logs_source includes trace_id and span_id fields. I want to send custom UUIDs for these fields. Initially, I tried sending UUIDs with hyphens, but received 500 errors, likely because OTEL expects a 16-byte array. I then generated UUIDs without hyphens and sent them, but the values stored in the index differ from what I sent. Could you please help me understand how to correctly send UUIDs as trace_id and span_id so that they appear as expected in the index?
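
I suspect part of the mismatch is that grpcurl's protobuf-JSON mapping treats traceId/spanId as bytes fields, so it expects base64-encoded raw bytes rather than a hex string; a 32-character hex string decoded as base64 gives 24 bytes, which would explain the 48-character value in the index. If that's right, something like this (hypothetical) should produce the value to put in the payload (spanId is only 8 bytes, so it would need a shorter value of its own):

# hypothetical: convert the 16 hex bytes into the base64 form grpcurl expects for bytes fields
echo -n "4fd0c4f0e3f64b7d9b8c2c4a9d2a1fcd" | xxd -r -p | base64
# -> T9DE8OP2S32bjCxKnSofzQ==   (use this as "traceId" in the grpcurl payload)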

pipeline.yaml

version: "2"

log-pipeline:
  source:
    otel_logs_source:
      port: 21892 # default OTLP/gRPC port
      ssl: false
      output_format: opensearch # events look like your stdout sample (nested OTel shape)
      health_check_service: true

    processor:
      - add_entries:
          entries:
            - metadata_key: "doc_id"
              value_expression: "/log.attributes.EventUuId"
      - copy_values:
          entries:
            - from_key: "body"
              to_key: "eventName"
            - from_key: "attributes"
              to_key: "test_att"
            - from_key: "resource"
              to_key: "test_res"
            - from_key: "/log/attributes/EventUuId"
              to_value: "eventId"
      - delete_entries:
          with_keys: ["body"]

  # Optional processors; keep it simple to start
  sink:
    - stdout:
        format: json
    - opensearch:
        hosts: ["https://opensearch-node2:9200"]
        index: "data-prepper-0001"
        username: "admin"
        password: "Aravinth@31"
        # _id pulled directly from the OTel attributes object you send
        # document_id: ${getMetadata("doc_id")}
        insecure: true

document in index:

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "data-prepper-0001",
        "_id": "_oXnC5kB89JbYR9dnUoF",
        "_score": 1,
        "_source": {
          "traceId": "e1f7747387f47b77fae1beddf5bf1cd9ce1af5dd9ad5f71d",
          "spanId": "e1f7747387f47b77fae1beddf5bf1cd9ce1af5dd9ad5f71d",
          "severityText": "",
          "flags": 0,
          "time": "2025-08-29T08:55:18.527272600Z",
          "severityNumber": 0,
          "droppedAttributesCount": 0,
          "serviceName": "dp-grpc-test",
          "body": "1111",
          "observedTimestamp": "1970-01-01T00:00:00Z",
          "schemaUrl": "",
          "log.attributes.DeviceName": "host-01",
          "instrumentationScope.name": "manual-test",
          "log.attributes.Hashes": "SHA1=a1b2c3,MD5=ff11aa22,SHA256=deadbeef,IMPHASH=123hash",
          "log.attributes.AgentVersion": "1.0.0",
          "log.attributes.TenantId": "tenant-99",
          "log.attributes.EventUuId": "abc-123",
          "log.attributes.EndpointId": "endpoint-4444",
          "resource.attributes.service@name": "dp-grpc-test",
          "log.attributes.LocalIp": "10.0.0.5",
          "log.attributes.ExternalIp": "1.2.3.4",
          "instrumentationScope.version": "1.0.0"
        }
      }
    ]
  }
}

Additional Info: I have used 4fd0c4f0e3f64b7d9b8c2c4a9d2a1fcd as both trace id and span id.

In the pipeline configuration above, I tried multiple processors to access the incoming fields in different ways. I also commented out # document_id: ${getMetadata("doc_id")} because I was getting an error caused by a wrong mapping.

@Avatar1461 Can you provide the output of stdout before it goes to OpenSearch, please?

@Anthony

2025-09-03 19:31:05 2025-09-03T14:01:05,995 [log-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSource - Started otel_logs_source...
2025-09-03 19:31:11 {"traceId":"e1f7747387f47b77fae1beddf5bf1cd9ce1af5dd9ad5f71d","spanId":"e1f7747387f47b77fae1beddf5bf1cd9ce1af5dd9ad5f71d","severityText":"","flags":0,"time":"2025-08-29T08:55:18.527272600Z","severityNumber":0,"droppedAttributesCount":0,"serviceName":"dp-grpc-test","body":"1111","observedTimestamp":"1970-01-01T00:00:00Z","schemaUrl":"","log.attributes.DeviceName":"host-01","instrumentationScope.name":"manual-test","log.attributes.Hashes":"SHA1=a1b2c3,MD5=ff11aa22,SHA256=deadbeef,IMPHASH=123hash","log.attributes.AgentVersion":"1.0.0","log.attributes.TenantId":"tenant-99","log.attributes.EventUuId":"abc-123","log.attributes.EndpointId":"endpoint-4444","resource.attributes.service@name":"dp-grpc-test","log.attributes.LocalIp":"10.0.0.5","log.attributes.ExternalIp":"1.2.3.4","instrumentationScope.version":"1.0.0"}

This was the output I received from the stdout sink in the Data Prepper pipeline, before the events reached the OpenSearch sink.

@Aravinth when you use:

output_format: opensearch 

Instead of:

output_format: otel

You flatten the values, resulting in log.attributes.EventUuId instead of log{attributes{EventUuId}}

There is already a bug reported where these fail to validate.

Is there a reason you can't use otel instead and access the values using /log/attributes/EventUuId?
The same can be used in the document_id section, but wrapped in ${}:

document_id: '${/attributes/EventUuId}'
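
The processors can use the same slash-style pointers; a rough sketch of what I'd expect add_entries to look like with the otel output format (based on the stdout sample earlier in the thread):

processor:
  - add_entries:
      entries:
        - metadata_key: "doc_id"
          value_expression: "/attributes/EventUuId"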

As a side note, it's probably a copy/paste issue, but your indentation on processor is off.

@Anthony Thanks for the solution. It took me a while to understand the issue, but I finally got it. When using output_format: opensearch, the values are being flattened and appear as log.attributes.EventUuId inside the attributes key. That’s why I couldn’t access them using /log.attributes.EventUuId or /log/attributes/EventUuId. Eventually, I discovered that the correct way to access the keys is via /attributes/log.attributes.EventUuId.
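
For anyone hitting the same thing with output_format: opensearch, the expression that works for me looks roughly like this (assuming the flattened log.attributes.* keys sit under attributes, as described above):

processor:
  - add_entries:
      entries:
        - metadata_key: "doc_id"
          value_expression: "/attributes/log.attributes.EventUuId"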
