Configuring Peer forwarder

Data Prepper 2.9
OpenSearch 2.6.0
OpenTelemetry 0.110
Rocky Linux 8.5 / 64 CPU / 32 RAM

Hi there,
I'm trying to get two Data Prepper nodes to work together to process OpenTelemetry traces, but I'm not sure my settings are correct. The nodes seem to be connected, but the metrics show that the load is uneven. For example, the raw_pipeline_recordsProcessed_total metric for dataprepper1 shows about 20,000 records per 15-second interval, while dataprepper2 shows only about 500 records for the same period.

I'm using a Kafka buffer in the entry pipeline on the dataprepper1 node. The OpenTelemetry Collector is connected to the dataprepper1 node.
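To put the imbalance in numbers, here is a small Python sketch (not part of Data Prepper; the function name `load_summary` is hypothetical) that turns the two reported counts, about 20,000 and 500 records per 15 seconds, into per-second rates and shares of the total.

```python
def load_summary(records, interval_seconds):
    """Return {node: (records_per_second, share_of_total)} for each node."""
    total = sum(records.values())
    return {
        node: (count / interval_seconds, count / total)
        for node, count in records.items()
    }


# Approximate counts reported for one 15-second window.
observed = {"dataprepper1": 20_000, "dataprepper2": 500}
for node, (rate, share) in load_summary(observed, 15).items():
    print(f"{node}: {rate:.1f} records/s, {share:.1%} of total")
```

This prints about 1333.3 records/s (97.6% of the total) for dataprepper1 and 33.3 records/s (2.4%) for dataprepper2.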

Configuration:
data-prepper-config.yaml (identical on both nodes):

ssl: false
peer_forwarder:
  ssl: false
  buffer_size: 5000000
  discovery_mode: static
  static_endpoints: ["dataprepper1", "dataprepper2"]
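With `discovery_mode: static`, both nodes read the same `static_endpoints` list, and the peer forwarder uses a hash-ring approach so that all spans of a given trace are handled by one node. The Python sketch below is a simplified model of that idea, not Data Prepper's code: the `HashRing` class, the SHA-256 hash, and the 128 virtual nodes per endpoint are assumptions chosen for illustration.

```python
import bisect
import hashlib
from collections import Counter


def _hash(key: str) -> int:
    # Assumption: first 8 bytes of SHA-256; Data Prepper's real hash function may differ.
    return int.from_bytes(hashlib.sha256(key.encode("utf-8")).digest()[:8], "big")


class HashRing:
    """Simplified consistent-hash ring that maps a trace ID to one peer endpoint."""

    def __init__(self, endpoints, virtual_nodes=128):
        # Each endpoint is placed on the ring many times to smooth the distribution.
        self._ring = sorted(
            (_hash(f"{endpoint}#{i}"), endpoint)
            for endpoint in endpoints
            for i in range(virtual_nodes)
        )
        self._positions = [position for position, _ in self._ring]

    def peer_for(self, trace_id: str) -> str:
        # Walk clockwise to the next ring position, wrapping around at the end.
        index = bisect.bisect(self._positions, _hash(trace_id)) % len(self._positions)
        return self._ring[index][1]


# Both nodes use the same static_endpoints list, so both build the same ring.
ring = HashRing(["dataprepper1", "dataprepper2"])
trace_ids = [f"{i:032x}" for i in range(10_000)]  # synthetic 32-hex-digit trace IDs
print(Counter(ring.peer_for(t) for t in trace_ids))
```

In this model, a large set of distinct trace IDs is split between the two endpoints in broadly similar proportions, which is why a split as lopsided as 20,000 vs. 500 is unexpected if every record were actually forwarded according to the ring.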

pipelines.yaml - dataprepper1

entry-pipeline:
  source:
    otel_trace_source:
      ssl: false
  buffer:
    kafka:
      bootstrap_servers:
        - kafka01:9092
      topics:
        - name: otel_spans
          group_id: otel_spans
          max_partition_fetch_bytes: 50mb
          consumer_max_poll_records: 20
          workers: 128
      encryption:
        type: none
  processor:
    - trace_peer_forwarder:
  sink:
    - pipeline:
        name: "raw-pipeline"
    - pipeline:
        name: "service-map-pipeline"

raw-pipeline:
  workers: 128
  source:
    pipeline:
      name: "entry-pipeline"
  buffer:
    bounded_blocking:
      buffer_size: 102400
      batch_size: 100
  processor:
    - otel_traces:
  sink:
    - opensearch:
        hosts:
          - "https://coordinator001:9200"
          - "https://coordinator002:9200"
        insecure: true
        username: "opentelemetry"
        password: "secret"
        index_type: trace-analytics-raw
        dlq_file: /data/dlq/raw_pipeline_dlq.log
        max_retries: 1

service-map-pipeline:
  workers: 128
  source:
    pipeline:
      name: "entry-pipeline"
  buffer:
    bounded_blocking:
      buffer_size: 102400
      batch_size: 100
  processor:
    - service_map:
  sink:
    - opensearch:
        hosts:
          - "https://coordinator001:9200"
          - "https://coordinator002:9200"
        insecure: true
        username: "opentelemetry"
        password: "secret"
        index_type: trace-analytics-service-map
        dlq_file: /data/dlq/service_map_pipeline_dlq.log
        max_retries: 1

pipelines.yaml - dataprepper2

entry-pipeline:
  source:
    otel_trace_source:
      ssl: false
  buffer:
    bounded_blocking:
      buffer_size: 51200
      batch_size: 200
  processor:
    - trace_peer_forwarder:
  sink:
    - pipeline:
        name: "raw-pipeline"
    - pipeline:
        name: "service-map-pipeline"

raw-pipeline:
  workers: 128
  source:
    pipeline:
      name: "entry-pipeline"
  buffer:
    bounded_blocking:
      buffer_size: 102400
      batch_size: 100
  processor:
    - otel_traces:
  sink:
    - opensearch:
        hosts:
          - "https://coordinator001:9200"
          - "https://coordinator002:9200"
        insecure: true
        username: "opentelemetry"
        password: "secret"
        index_type: trace-analytics-raw
        dlq_file: /data/dlq/raw_pipeline_dlq.log
        max_retries: 1

service-map-pipeline:
  workers: 128
  source:
    pipeline:
      name: "entry-pipeline"
  buffer:
    bounded_blocking:
      buffer_size: 102400
      batch_size: 100
  processor:
    - service_map:
  sink:
    - opensearch:
        hosts:
          - "https://coordinator001:9200"
          - "https://coordinator002:9200"
        insecure: true
        username: "opentelemetry"
        password: "secret"
        index_type: trace-analytics-service-map
        dlq_file: /data/dlq/service_map_pipeline_dlq.log
        max_retries: 1

Relevant Logs or Screenshots:
In the dataprepper1 log I see the following warning:

2024-10-11T10:58:54,249 [entry-pipeline-processor-worker-1-thread-1] WARN org.opensearch.dataprepper.peerforwarder.RemotePeerForwarder - Failed to add 642 records to the batching queue, processing locally.

dataprepper2 has no warnings or errors in its logs.
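Read literally, the warning says that when dataprepper1 cannot add records to its batching queue for forwarding, it processes them itself, which would also shift load toward dataprepper1. The Python sketch below is a toy model of that behaviour inferred only from the log text; the `ForwardingModel` class, the queue depth, and the use of `queue.Queue` are assumptions, not Data Prepper internals.

```python
import queue


class ForwardingModel:
    """Toy model: queue batches for a remote peer, fall back to local processing when full."""

    def __init__(self, queue_depth):
        # Assumption: a fixed-capacity queue stands in for the peer forwarder's batching queue.
        self.remote_queue = queue.Queue(maxsize=queue_depth)
        self.processed_locally = 0

    def forward(self, batch):
        try:
            self.remote_queue.put_nowait(batch)
            return True
        except queue.Full:
            # Mirrors the log message: the batch is handled on this node instead.
            self.processed_locally += len(batch)
            return False


model = ForwardingModel(queue_depth=2)
for _ in range(5):
    model.forward(["span"] * 642)  # nothing drains the queue in this toy run
print(model.processed_locally)  # 1926
```

In this model, once the queue is full, every further batch stays on the local node, so the more often the warning appears, the more of the forwarded share ends up on dataprepper1.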

Any suggestions on what I should change in my configuration so that the load is spread across both nodes?