Data Prepper extract record field and partition on it

Versions (relevant - OpenSearch/Dashboard/Server OS/Browser):
2.8

Describe the issue:
I’d like to migrate data from OpenSearch to an S3 sink and hive-partition the data on a record field. The record’s fields (Avro definition):

    {
      "type" : "record",
      "namespace" : "org.opensearch.dataprepper.examples",
      "name" : "Data",
      "fields" : [
        { "name" : "created", "type" : {"type" : "string", "logicalType" : "timestamp-micros"}},
        { "name" : "resource", "type": ["null", "string"]},
        { "name" : "response_payload", "type" : ["null", "string"]}
      ]
    }

The partition should happen on the `created` field. For example, the record below:

{
  "created": "2023-01-05T12:35:24.139Z",
  "resource": "asdfsdafsdfa",
  "response_payload": "asdf"
}

should go to the S3 prefix “test/year=2023/month=1/day=5/”. And this payload:

{
  "created": "2024-05-12T13:13:13.000Z",
  "resource": "asdfsdafsdfa",
  "response_payload": "asdf"
}

should go to the S3 prefix “test/year=2024/month=5/day=12/”.
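Outside of Data Prepper, the intended mapping from `created` to a hive partition prefix can be sketched in plain Python. This is illustrative only; `hive_partition_prefix` is a hypothetical helper, not part of Data Prepper or its expression syntax:

```python
from datetime import datetime

def hive_partition_prefix(record: dict, base: str = "test") -> str:
    """Build a hive-style partition prefix from the record's 'created' field.

    Assumes 'created' is an ISO-8601 UTC timestamp with milliseconds,
    matching the example records above.
    """
    created = datetime.strptime(record["created"], "%Y-%m-%dT%H:%M:%S.%fZ")
    # Non-zero-padded month/day, matching the desired prefixes above.
    return f"{base}/year={created.year}/month={created.month}/day={created.day}/"

print(hive_partition_prefix({"created": "2023-01-05T12:35:24.139Z"}))
# → test/year=2023/month=1/day=5/
```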

Here is a link to a GitHub issue that was resolved. However, the documentation lacks a clear guide on how to properly extract a record’s field and partition data on it.

Configuration:

version: "2"
opensearch-migration-pipeline:
  source:
    opensearch:
      acknowledgments: true
      # Provide an OpenSearch or Elasticsearch cluster endpoint.
      hosts: <host>
      indices:
        include:
          - index_name_regex: <regex>
      aws:
        region: <region>
        sts_role_arn: <role>
        serverless: false
  sink:
    - s3:
        aws:
          region: <region>
          sts_role_arn: <role>
        bucket: <bucket>
        object_key:
          path_prefix: test/year=${date_time_format(/created, "yyyy")}/month=${date_time_format(/created, "MM")}/day=${date_time_format(/created, "dd")}/
        codec:
          parquet:
            schema: >
              {
                "type" : "record",
                "namespace" : "org.opensearch.dataprepper.examples",
                "name" : "Data",
                "fields" : [
                  { "name" : "created", "type" : {"type" : "string", "logicalType" : "timestamp-micros"}},
                  { "name" : "resource", "type": ["null", "string"]},
                  { "name" : "response_payload", "type" : ["null", "string"]}
                ]
              }
        threshold:
          maximum_size: 10mb
          event_collect_timeout: PT1M
        compression: snappy

Relevant Logs or Screenshots:
The configuration above fails.

Never mind. This feature is not yet supported, as per this GitHub issue ticket.

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.