Data Prepper Route Pipeline Validation Issues

Versions (relevant - OpenSearch/Dashboard/Server OS/Browser):

Data Prepper: 2.11.0

Server OS: Ubuntu Linux

Java: OpenJDK 17.0.10+7

Issue:
I’m encountering persistent pipeline validation errors when configuring a pipeline in Data Prepper 2.11.0. Despite multiple configuration attempts based on the documentation, I continue to receive “expected source X for pipeline Y is missing” errors. The goal is to:

Receive logs via OpenTelemetry

Route to specialized pipelines based on log attributes

Aggregate all logs in a final pipeline

Send to OpenSearch

All pipelines validate correctly individually, but fail when connected in the fan-in/fan-out pattern. The validation fails during startup with errors about missing sources for the final aggregation pipeline.

Configuration:

yaml

Entry pipeline

log-routing-pipeline:
source:
otel_logs_source:
port: 21891
ssl: false
route:
- application_logs: ‘(/attributes/log.attributes.app_name == “xyz” or /attributes/log.attributes.app_name == “zyx”) and /attributes/log.attributes.log_type == “applicationlog”’
- error_logs: ‘/attributes/log.attributes.log_type == “errorlog”’
- syslog_logs: ‘/attributes/log.attributes.log_type == “syslog”’
- unknown_logs: ‘not (/attributes/log.attributes.app_name == “xyz” or /attributes/log.attributes.app_name == “zyx”)’
sink:
- pipeline:
name: “application-pipeline”
routes: [application_logs]
- pipeline:
name: “errorlog-pipeline”
routes: [error_logs]
- pipeline:
name: “syslog-pipeline”
routes: [syslog_logs]
- pipeline:
name: “unknown-pipeline”
routes: [unknown_logs]

Intermediate pipelines

application-pipeline:
source:
pipeline:
name: “log-routing-pipeline”
routes: [application_logs]
processor:
sink:
- pipeline:
name: “final-log-pipeline”

errorlog-pipeline:
source:
pipeline:
name: “log-routing-pipeline”
routes: [error_logs]
processor:
sink:
- pipeline:
name: “final-log-pipeline”

syslog-pipeline:
source:
pipeline:
name: “log-routing-pipeline”
routes: [syslog_logs]
processor:
sink:
- pipeline:
name: “final-log-pipeline”

unknown-pipeline:
source:
pipeline:
name: “log-routing-pipeline”
routes: [unknown_logs]
processor:
sink:
- pipeline:
name: “final-log-pipeline”

Final pipeline

final-log-pipeline:
source:
pipeline:
name:
- application-pipeline
- errorlog-pipeline
- syslog-pipeline
- unknown-pipeline
processor:
sink:
- opensearch:
hosts: [“https://search-xxxxxxx.us-east-1.es.amazonaws.com:443”]
username: “xxx”
password: “xxx”
index: “logs-${/index_name}-%{yyyy.MM.dd}”
Relevant Logs or Screenshots:

Error

Data Prepper now supports reading pipeline and data-prepper configuration files
from Data Prepper home directory automatically.
You can continue to specify paths to configuration files as command line arguments,
but that support will be dropped in a future release.

JAVA_HOME is set to /home/ubuntu/opensearch-data-prepper-jdk-2.11.0-linux-x64/openjdk/jdk-17.0.10+7
2025-06-24T00:13:45,200 [main] INFO org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformer - No transformation needed
2025-06-24T00:13:46,395 [main] INFO org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigExtension - Applying Kafka Cluster Config Extension.
2025-06-24T00:13:47,063 [main] ERROR org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationValidator - Invalid configuration, expected source application-pipeline for pipeline final-log-pipeline is missing
2025-06-24T00:13:47,065 [main] WARN org.springframework.context.support.AbstractApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name ‘dataPrepper’ defined in URL [jar:file:/home/ubuntu/opensearch-data-prepper-jdk-2.11.0-linux-x64/lib/data-prepper-core-2.11.0.jar!/org/opensearch/dataprepper/core/DataPrepper.class]: Bean instantiation via constructor failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.opensearch.dataprepper.core.DataPrepper]: Constructor threw exception; nested exception is java.lang.RuntimeException: Invalid configuration, expected source application-pipeline for pipeline final-log-pipeline is missing
Exception in thread “main” org.springframework.beans.factory.BeanCreationException: Error creating bean with name ‘dataPrepper’ defined in URL [jar:file:/home/ubuntu/opensearch-data-prepper-jdk-2.11.0-linux-x64/lib/data-prepper-core-2.11.0.jar!/org/opensearch/dataprepper/core/DataPrepper.class]: Bean instantiation via constructor failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.opensearch.dataprepper.core.DataPrepper]: Constructor threw exception; nested exception is java.lang.RuntimeException: Invalid configuration, expected source application-pipeline for pipeline final-log-pipeline is missing
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:306)
at org.springframework.beans.factory.support.ConstructorResolver.autowireConstructor(ConstructorResolver.java:287)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.autowireConstructor(AbstractAutowireCapableBeanFactory.java:1372)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1222)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:336)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:334)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:209)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:955)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:932)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:591)
at org.opensearch.dataprepper.AbstractContextManager.start(AbstractContextManager.java:64)
at org.opensearch.dataprepper.AbstractContextManager.getDataPrepperBean(AbstractContextManager.java:50)
at org.opensearch.dataprepper.DataPrepperExecute.main(DataPrepperExecute.java:40)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.opensearch.dataprepper.core.DataPrepper]: Constructor threw exception; nested exception is java.lang.RuntimeException: Invalid configuration, expected source application-pipeline for pipeline final-log-pipeline is missing
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:226)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:117)
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:302)
… 15 more
Caused by: java.lang.RuntimeException: Invalid configuration, expected source application-pipeline for pipeline final-log-pipeline is missing
at org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationValidator.validateSourceMapping(PipelineConfigurationValidator.java:128)
at org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationValidator.visitAndValidate(PipelineConfigurationValidator.java:97)
at org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationValidator.lambda$validateAndGetPipelineNames$0(PipelineConfigurationValidator.java:54)
at java.base/java.util.HashMap.forEach(HashMap.java:1421)
at org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationValidator.validateAndGetPipelineNames(PipelineConfigurationValidator.java:52)
at org.opensearch.dataprepper.core.parser.PipelineTransformer.transformConfiguration(PipelineTransformer.java:111)
at org.opensearch.dataprepper.core.DataPrepper.(DataPrepper.java:73)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:213)
… 17 more
ubuntu@ip-172-31-21-104:~/opensearch-data-prepper-jdk-2.11.0-linux-x64$

What I’ve Tried:

Verified all pipeline names match exactly

Added explicit routes to intermediate pipelines

Simplified the final pipeline source declaration

Tried different indentation formats

Added route declarations to the final pipeline (resulted in new errors)

Confirmed the configuration works with a single pipeline approach

Key Questions:

Is this approach supported in Data Prepper 2.11?

Are there special requirements for the aggregation pipeline?

Could this be a dependency ordering issue in pipeline validation?

Are there any working examples of multi-pipeline fan-in configurations?

What alternative patterns would achieve the same result?

Any guidance would be greatly appreciated, as I’ve exhausted all documentation and configuration options I could find.