Parse_json processor

Hey all,

I’m having some trouble using the parse_json processor with my OpenSearch ingestion pipeline on AWS. Here’s the setup: I upload a file to S3, which sends a notification to an SQS queue; the pipeline picks up that message and indexes the file’s contents in OpenSearch. Below is a sample of one line in my JSON file:

{"driver": "max verstappen", "log": "{\"level\":\"info\",\"ts\":1724484885.6000538}"}

My goal is to parse the log field, which seemed straightforward: I thought I could just set

source: log

on the parse_json processor. However, it’s not working as expected. I tried a few configurations, like the one below:

version: "2"
log-pipeline4-level-up:
  source:
    s3:
      notification_type: "sqs"
      codec:
        newline: null  # This specifies that each line in the S3 object is a separate log entry
      sqs:
        queue_url: "https://sqs.us-east-1.amazonaws.com/1111111111111/jsonlLogsQueue"
      compression: "none"
      aws:
        region: "us-east-1"
        # IAM role that the pipeline assumes to read data from the queue. This role must be the same as the pipeline role.
        sts_role_arn: "arn:aws:iam::11111111111:role/OpenSearchIngestionRole"
  processor:
    - parse_json:
        destination: new
  sink:
    - opensearch:
        hosts: 
          - "https://vpc-opensearch-workshop.us-west-2.es.amazonaws.com"
        index: "my-index"
        aws:
          region: "us-east-1"
          sts_role_arn: "arn:aws:iam::11111111111:role/OpenSearchIngestionRole"

As a result, I ended up with the following document in OpenSearch (shown here as a search response):

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "my-index",
        "_id": "1843ppEBN3hgemr_fuL9",
        "_score": 1,
        "_source": {
          "message": """{"driver": "max verstappen", "log": "{\"level\":\"info\",\"ts\":1724484885.6000538}"}""",
          "s3": {
            "bucket": "dank-jsonl-logs",
            "key": "driver.jsonl"
          },
          "new": {
            "driver": "max verstappen",
            "log": """{"level":"info","ts":1724484885.6000538}"""
          }
        }
      }
    ]
  }
}
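For clarity, what I’d like the indexed _source to contain is the log value as a real nested object rather than an escaped string, something like this (field names taken from my sample line above):

```json
{
  "driver": "max verstappen",
  "log": {
    "level": "info",
    "ts": 1724484885.6000538
  }
}
```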

Could anyone recommend what I should put in the configuration so that the log value comes out as a nested JSON object instead of a string? Thanks in advance!
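For what it’s worth, my best guess so far is to chain two parse_json processors, one parsing the raw line and a second one with source: log for the nested string, roughly like the sketch below. I haven’t confirmed that this is the intended pattern, or that destination can point back at the source field, so this is just a guess:

```yaml
processor:
  - parse_json:
      source: "message"      # parse the raw line into the event root
  - parse_json:
      source: "log"          # then parse the nested JSON string in "log"
      destination: "log"     # hoping this replaces the string with the parsed object
```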