Record<Event> objects written to the buffer by my custom source are not being consumed by the pipeline

Data Prepper 2.8 - JDK 11.0.9 - Red Hat Linux release 7

I have written a Data Prepper source that reads Kafka messages. It successfully reads the messages and writes them to the buffer, but nothing flows to the rest of the pipeline. With a stdout sink attached, the pipeline produces no output, even though logging in the source shows messages being processed. What am I missing here?

The basic code, edited for brevity:
@DataPrepperPlugin(name = "kafka", pluginType = Source.class)
public class KafkaSource implements Source<Record<Event>>
{
    public void start(final Buffer<Record<Event>> in_oBuffer)
    {
        // Poll Kafka and write each message to the pipeline buffer as an Event.
        ConsumerRecords<String, String> oConsumerRecords =
            getKafkaConsumer().poll(Duration.ofMillis(100L));
        for (ConsumerRecord<String, String> oConsumerRecord : oConsumerRecords)
        {
            Event oEvent = JacksonEvent.fromMessage(oConsumerRecord.value());
            Record<Event> oRecord = new Record<>(oEvent);
            in_oBuffer.write(oRecord, getWriteTimeout());
        }
    }
    // (remaining members omitted)
}
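
For comparison, this is the handoff I expect between the source and the pipeline workers, as a minimal sketch using only plain JDK types. It is not Data Prepper code: the class BufferHandoffSketch and its write and read methods are hypothetical names made up for illustration, and the bounded queue only stands in for the pipeline buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the source -> buffer -> worker handoff.
// None of these names are Data Prepper APIs.
public class BufferHandoffSketch {

    // Bounded queue playing the role of the pipeline buffer.
    private final BlockingQueue<String> buffer;

    public BufferHandoffSketch(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Source side: write one record, failing if no space frees up in time.
    public void write(String record, int timeoutMillis)
            throws TimeoutException, InterruptedException {
        if (!buffer.offer(record, timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("buffer full after " + timeoutMillis + " ms");
        }
    }

    // Worker side: take up to maxRecords of whatever is currently buffered.
    public List<String> read(int maxRecords) {
        List<String> batch = new ArrayList<>();
        buffer.drainTo(batch, maxRecords);
        return batch;
    }

    public static void main(String[] args) throws Exception {
        BufferHandoffSketch sketch = new BufferHandoffSketch(8);
        sketch.write("{\"message\":\"a\"}", 100);
        sketch.write("{\"message\":\"b\"}", 100);
        // A worker reading after the writes should see both records, in order.
        System.out.println(sketch.read(10));
    }
}
```

In this model, anything written on the source side becomes visible to the next read on the worker side, which is the behavior I am not seeing in my pipeline.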

Here is the pipeline (sensitive data removed):
simple-sample-pipeline:
  workers: 4
  delay: "100"
  source:
    kafka:
      bootstrap_servers:
      topic_name:
      consumer_group:
      max_poll_records:
      max_poll_Interval_Ms:

  sink:
    - stdout:

Here is the logging:
2023-06-21T21:23:55,602 [main] INFO org.opensearch.dataprepper.parser.PipelineParser - Building pipeline [simple-sample-pipeline] from provided configuration
2023-06-21T21:23:55,603 [main] INFO org.opensearch.dataprepper.parser.PipelineParser - Building [kafka] as source component for the pipeline [simple-sample-pipeline]
2023-06-21T21:23:55,677 [main] INFO org.opensearch.dataprepper.parser.PipelineParser - Building buffer for the pipeline [simple-sample-pipeline]
2023-06-21T21:23:55,681 [main] INFO org.opensearch.dataprepper.parser.PipelineParser - Building processors for the pipeline [simple-sample-pipeline]
2023-06-21T21:23:55,681 [main] INFO org.opensearch.dataprepper.parser.PipelineParser - Building sinks for the pipeline [simple-sample-pipeline]
2023-06-21T21:23:55,682 [main] INFO org.opensearch.dataprepper.parser.PipelineParser - Building [stdout] as sink component
2023-06-21T21:23:55,683 [main] INFO org.opensearch.dataprepper.parser.PipelineParser - Constructing MultiBufferDecorator with [0] secondary buffers for pipeline [simple-sample-pipeline]
2023-06-21T21:23:55,787 [main] DEBUG org.opensearch.dataprepper.parser.config.MetricsConfig - 6 Meter Binder beans registered.
2023-06-21T21:23:55,842 [main] INFO org.opensearch.dataprepper.pipeline.Pipeline - Pipeline [simple-sample-pipeline] - Initiating pipeline execution
2023-06-21T21:23:55,843 [main] INFO org.opensearch.dataprepper.plugins.source.KafkaSource - simple-sample-pipeline: KafkaSource has started.

I know this is not a lot to go on, but I am not at liberty to expose much of the code. I am just hoping there is something basic missing.
