Opensearch_api source doesn't work with kafka buffer

Versions (relevant - OpenSearch/Dashboard/Server OS/Browser): Latest

Describe the issue:

  1. opensearch_api is not covered in the documentation, although a blog post from about two years ago mentions it as an available feature.
  2. Traffic doesn't flow when the kafka buffer is used with opensearch_api. It works fine with the bounded blocking buffer.

Configuration: (see snippet below, refer to screenshot as well for formatted yaml)

    version: "2"
    dual-write-pipeline:
      source:
        opensearch_api:
          port: 9200
      buffer:
        kafka:
          bootstrap_servers: ["kafka:9092"]
          authentication:
            sasl:
              plaintext:
                username: "kafka-user"
                password: "kafka-pass"
          encryption:
            type: ssl
          topics:
            - name: "opensearch-writes"
              group_id: "dp-dual-write"
              create_topic: true
      sink:
        - opensearch:
            hosts: ["https://cluster-a-direct:9200"]
            username: "admin"
            password: "admin"
            index: '${getMetadata("opensearch_index")}'
            action: '${getMetadata("opensearch_action")}'
            document_id: '${getMetadata("opensearch_id")}'
            routing: '${getMetadata("opensearch_routing")}'
        - opensearch:
            hosts: ["https://cluster-b:9200"]
            username: "admin"
            password: "admin"
            index: '${getMetadata("opensearch_index")}'
            action: '${getMetadata("opensearch_action")}'
            document_id: '${getMetadata("opensearch_id")}'
            document_version_type: "external"
            document_version: '${getMetadata("opensearch_document_version")}'
            routing: '${getMetadata("opensearch_routing")}'

Relevant Logs or Screenshots:

@dpsmails Welcome to the forum!

As I understand it, when the Kafka buffer is used, isByteBuffer() = true tells the source to hand off raw bytes and let the buffer's consumer decode them on the other side. This round trip only works if the source's decoder (JsonDecoder) can reconstruct the original events from those bytes. It can't, because JsonDecoder expects {"records":[...]} while opensearch_api hands off NDJSON bulk format.
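The format mismatch can be illustrated with a small Python sketch. This is not Data Prepper's code: `decode_records_envelope` is a hypothetical stand-in for a decoder that only accepts a `{"records":[...]}` envelope, and both sample payloads are made up for illustration.

```python
import json


def decode_records_envelope(raw: bytes) -> list:
    """Hypothetical decoder: accepts one JSON object with a "records" array."""
    doc = json.loads(raw.decode("utf-8"))
    if not isinstance(doc, dict) or "records" not in doc:
        raise ValueError('expected a {"records": [...]} envelope')
    return doc["records"]


def try_decode(raw: bytes):
    """Return the decoded records, or a short failure message."""
    try:
        return decode_records_envelope(raw)
    except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
        return f"decode failed: {exc}"


# Payload in the envelope shape the decoder expects (made-up sample).
envelope = b'{"records": [{"message": "hello"}]}'

# Payload in NDJSON bulk shape: an action line followed by a document line.
bulk = (
    b'{"index": {"_index": "logs", "_id": "1"}}\n'
    b'{"message": "hello"}\n'
)

print(try_decode(envelope))  # the records list
print(try_decode(bulk))      # a "decode failed: ..." message
```

The second call fails because the payload is two JSON documents separated by a newline rather than a single object, so a decoder written for the envelope shape has nothing it can parse as a whole.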

The bounded blocking buffer never needs a decoder at all. It stores Record<Event> objects directly in a Java BlockingQueue, so nothing is ever serialized or re-parsed.
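As a rough analogy in Python (not Data Prepper's Java implementation), the difference between the two buffer styles looks like this. The event contents and the `encode`/`decode` helpers are hypothetical, and a second `queue.Queue` stands in for a Kafka topic.

```python
import json
import queue

event = {"message": "hello", "index": "logs"}  # made-up event

# In-memory path: the consumer receives the very same object,
# so no serialization format is involved.
in_memory = queue.Queue(maxsize=8)
in_memory.put(event)
from_memory = in_memory.get()


# Byte path: the producer encodes and the consumer decodes,
# so both sides must agree on the wire format.
def encode(ev: dict) -> bytes:
    return json.dumps(ev).encode("utf-8")


def decode(raw: bytes) -> dict:
    return json.loads(raw.decode("utf-8"))


byte_buffer = queue.Queue(maxsize=8)  # stand-in for a Kafka topic
byte_buffer.put(encode(event))
from_bytes = decode(byte_buffer.get())

print(from_memory is event)  # True: same object, nothing was re-parsed
print(from_bytes == event)   # True: equal only because encode/decode match
print(from_bytes is event)   # False: a new object rebuilt from bytes
```

The byte path round-trips here only because `encode` and `decode` were written as a matching pair; if the consumer's decoder expected a different shape, the second half would fail while the in-memory path would be unaffected.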

This seems like a good candidate for a feature request, which can be submitted here.

@Anthony I think this is a bug: a newly added source isn't working with an existing buffer, and nothing in the documentation for this source says that the Kafka buffer is unsupported.

So I went ahead and created this issue. I will also contribute my fix for it, which I already have: [BUG] opensearch_api source does not work with Kafka buffer - no ByteDecoder registered · Issue #6876 · opensearch-project/data-prepper · GitHub