Is this the expected behavior?

Hello everyone,

I have the following in my pipeline YAML file:

pipeline1:
  workers: 2
  delay: "5000"
  source:
    http:
      port: 3333
  sink:
    - pipeline:
        name: "pipeline2"
    - pipeline:
        name: "pipeline3"

pipeline2:
  source:
    pipeline:
      name: "pipeline1"
  processor:
    - substitute_string:
        entries:
          - source: "message"
            from: "word"
            to: "wOrD"
  sink:
    - stdout:

pipeline3:
  workers: 1
  source:
    pipeline:
      name: "pipeline1"
  processor:
    - grok:
        match:
          message: ['%{WORD:word1} %{WORD:word2} %{WORD:word3}']
  sink:
    - stdout:

When I send this input:
{"message": "words wordy woodword"}

I see the following output:

{"message":"wOrDs wOrDy woodwOrD","word1":"words","word3":"woodword","word2":"wordy"}
{"message":"wOrDs wOrDy woodwOrD","word1":"words","word3":"woodword","word2":"wordy"}

And when I send this input:

{"message": "firstword secondword thirdword"}

I see this output:

{"message":"firstwOrD secondwOrD thirdwOrD","word1":"firstword","word3":"thirdword"}
{"message":"firstwOrD secondwOrD thirdwOrD", "word1":"firstword","word3":"thirdword","word2":"secondword"}

Is this expected? It looks like both outputs are wrong.

If I use the “string converter” processor instead of the “string substitute” processor, the results look OK to me.

After investigating, I think there is a bug in data-prepper-plugins/mutate-string-processors/src/main/java/com/amazon/dataprepper/plugins/processor/mutatestring/AbstractStringProcessor.java. It modifies the Events in the input records in place instead of creating new Events. I see that ./data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/prepper/StringPrepper.java returns the modified records, but AbstractStringProcessor.java does not. After changing it to return modified records, I get the following output (as expected):
{"message":"firstwOrD secondwOrD thirdwOrD"}
{"message":"firstword secondword thirdword", "word1":"firstword","word3":"thirdword","word2":"secondword"}
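To illustrate why in-place modification causes this, here is a minimal sketch, assuming an Event can be modeled as a plain mutable map of fields. `SimpleEvent`, `InPlaceVsCopyDemo`, `substituteInPlace`, `substituteCopy`, and `messageSeenByPipeline3` are hypothetical names, not Data Prepper's API. When pipeline2 and pipeline3 hold a reference to the same object, a substitution done in place in pipeline2 is visible in pipeline3, which is consistent with the results above depending on which pipeline touches the shared Event first.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for an Event: just a mutable map of fields.
// This is NOT the real org.opensearch.dataprepper.model.event.Event API.
final class SimpleEvent {
    final Map<String, Object> fields = new HashMap<>();

    SimpleEvent(Map<String, Object> initial) {
        fields.putAll(initial);
    }

    // Shallow copy of the top-level fields (enough for flat string fields).
    SimpleEvent copy() {
        return new SimpleEvent(fields);
    }
}

final class InPlaceVsCopyDemo {
    // Mirrors the buggy behavior: mutates the event it was given.
    static SimpleEvent substituteInPlace(SimpleEvent event, String key, String from, String to) {
        Object value = event.fields.get(key);
        if (value instanceof String) {
            event.fields.put(key, ((String) value).replace(from, to));
        }
        return event;
    }

    // Mirrors the suggested fix: leaves the input untouched and returns a modified copy.
    static SimpleEvent substituteCopy(SimpleEvent event, String key, String from, String to) {
        return substituteInPlace(event.copy(), key, from, to);
    }

    // One source event is handed to pipeline2 and pipeline3 by reference.
    // Returns the message pipeline3 sees after pipeline2 has run its substitution.
    static Object messageSeenByPipeline3(boolean copyBeforeMutating) {
        SimpleEvent shared = new SimpleEvent(Map.of("message", "firstword secondword thirdword"));
        List<SimpleEvent> pipeline2Input = List.of(shared);
        List<SimpleEvent> pipeline3Input = List.of(shared); // same object, not a duplicate
        for (SimpleEvent e : pipeline2Input) {
            if (copyBeforeMutating) {
                // The copy would continue down pipeline2; the shared object is untouched.
                substituteCopy(e, "message", "word", "wOrD");
            } else {
                substituteInPlace(e, "message", "word", "wOrD");
            }
        }
        return pipeline3Input.get(0).fields.get("message");
    }

    public static void main(String[] args) {
        System.out.println(messageSeenByPipeline3(false)); // firstwOrD secondwOrD thirdwOrD
        System.out.println(messageSeenByPipeline3(true));  // firstword secondword thirdword
    }
}
```

Returning a new record avoids the leak in this sketch, but only for processors written that way; any other processor that mutates in place would still affect the other pipeline.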

@kkondaka,

Yes, I noticed this same issue recently. The problem is that when a sink sends an Event to multiple pipelines, the Event is not duplicated, so all of the downstream pipelines share the same object. The solution I think we want is to duplicate Events when they transfer from one pipeline to another.
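A rough sketch of that idea, assuming events are modeled as `Map<String, Object>` and each downstream pipeline's buffer is a `BlockingQueue`: the sink hands every downstream pipeline its own deep copy. `DuplicatingFanOutSink`, `output`, and `deepCopy` are hypothetical names for illustration only; this is not how Data Prepper's pipeline sink is actually implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: a sink that fans one event out to several downstream pipelines.
// Events are modeled as Map<String, Object>; the real Data Prepper Event API differs.
final class DuplicatingFanOutSink {
    private final List<BlockingQueue<Map<String, Object>>> downstream;

    DuplicatingFanOutSink(List<BlockingQueue<Map<String, Object>>> downstream) {
        this.downstream = downstream;
    }

    // Each downstream pipeline gets its own deep copy, so a processor that mutates
    // its input in place cannot change what another pipeline sees.
    void output(Map<String, Object> event) {
        for (BlockingQueue<Map<String, Object>> queue : downstream) {
            queue.add(deepCopy(event));
        }
    }

    static Map<String, Object> deepCopy(Map<String, Object> event) {
        Map<String, Object> copy = new HashMap<>();
        for (Map.Entry<String, Object> entry : event.entrySet()) {
            copy.put(entry.getKey(), deepCopyValue(entry.getValue()));
        }
        return copy;
    }

    @SuppressWarnings("unchecked")
    static Object deepCopyValue(Object value) {
        if (value instanceof Map) {
            return deepCopy((Map<String, Object>) value); // nested objects get copied too
        }
        if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            for (Object item : (List<Object>) value) {
                copy.add(deepCopyValue(item));
            }
            return copy;
        }
        return value; // Strings, numbers, and booleans are immutable and safe to share
    }

    public static void main(String[] args) {
        BlockingQueue<Map<String, Object>> pipeline2 = new LinkedBlockingQueue<>();
        BlockingQueue<Map<String, Object>> pipeline3 = new LinkedBlockingQueue<>();
        DuplicatingFanOutSink sink = new DuplicatingFanOutSink(List.of(pipeline2, pipeline3));

        Map<String, Object> event = new HashMap<>();
        event.put("message", "firstword secondword thirdword");
        sink.output(event);

        // pipeline2 mutates its copy in place, as the substitute_string processor did.
        Map<String, Object> p2 = pipeline2.poll();
        p2.put("message", ((String) p2.get("message")).replace("word", "wOrD"));

        System.out.println(p2.get("message"));               // firstwOrD secondwOrD thirdwOrD
        System.out.println(pipeline3.poll().get("message")); // firstword secondword thirdword
    }
}
```

Copying once per downstream pipeline costs an extra allocation per event and per pipeline, but it makes correctness independent of whether each processor mutates in place.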

@kkondaka, I created a GitHub issue for this: [BUG] Sending Events to multiple pipelines does not duplicate Events · Issue #1886 · opensearch-project/data-prepper · GitHub
