Avro codec - Array of records

Hi,

I am trying to convert some json data from Kafka into parquet and then push to S3. It works fine with simple data types, complex objects and arrays of simple data types. But as soon as I try to define an array of complex objects, an exception is thrown. Not sure how to resolve.

Many thanks for any help

Versions (relevant - OpenSearch/Dashboard/Server OS/Browser): Data prepper 2.12.2

Describe the issue: a `class java.util.LinkedHashMap cannot be cast to class org.apache.avro.generic.IndexedRecord` exception is thrown

Configuration:

{
    "name": "Example",
    "type": [
        "null",
        {
            "type": "array",
            "items": {
                "type": "record",
                "name": "ExampleChild",
                "fields": [
                    {
                        "name": "Id",
                        "type": "int"
                    }
                ]
            }
        }
    ]
}

Relevant Logs or Screenshots:

@bailey857 have you tried using the following processor in the pipeline:

  # write_json presumably serializes the nested "line_items" array into a JSON
  # string stored under "line_items_json", so downstream codecs see a plain
  # string instead of an array of maps — confirm against the Data Prepper
  # write_json processor documentation.
  processor:
    - write_json:
        source: "line_items"
        target: "line_items_json"

with:

        # auto_schema derives the Parquet schema automatically from the
        # incoming events, so no hand-written Avro schema is needed —
        # NOTE(review): verify against the Data Prepper s3 sink codec docs.
        codec:
          parquet:
            auto_schema: true
I tested this locally with s3 as the sink, see my complete config below:

docker-compose.yml

# NOTE(review): the top-level "version" key is obsolete in Compose v2 and is
# ignored; kept for compatibility with older docker-compose binaries.
version: "3.8"

services:
  # Single-node ZooKeeper backing the Kafka broker below.
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    networks: [demo]

  # Single-broker Kafka; PLAINTEXT on 9092 (in-network) and 29092 (host access).
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    container_name: kafka
    depends_on: [zookeeper]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      # Single-broker setup: replication factors must be 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    ports:
      - "9092:9092"
      - "29092:29092"
    networks: [demo]

  # Confluent Schema Registry, backed by the broker above.
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    container_name: schema-registry
    depends_on: [kafka]
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    ports:
      - "8081:8081"
    networks: [demo]

  # Seeds the topic with Avro messages so the subject orders-avro-value exists
  avro-init:
    image: confluentinc/cp-schema-registry:7.6.0
    container_name: avro-init
    depends_on: [kafka, schema-registry]
    entrypoint: ["/bin/bash","-lc"]
    command: |
      set -e
      kafka-topics --bootstrap-server kafka:9092 --create --topic orders-avro --if-not-exists --replication-factor 1 --partitions 1
      cat > /tmp/order.avsc <<'EOF'
      {
        "type":"record",
        "name":"Order",
        "namespace":"demo",
        "fields":[
          {"name":"order_id","type":"string"},
          {"name":"amount","type":"double"},
          {"name":"customer","type":"string"}
        ]
      }
      EOF
      kafka-avro-console-producer \
        --broker-list kafka:9092 \
        --topic orders-avro \
        --property schema.registry.url=http://schema-registry:8081 \
        --property value.schema.file=/tmp/order.avsc <<'MSG'
      {"order_id":"o-1","amount":42.5,"customer":"alice"}
      {"order_id":"o-2","amount":12.0,"customer":"bob"}
      {"order_id":"o-3","amount":99.9,"customer":"carol"}
      MSG
      echo "Seeded Avro messages to orders-avro."
    # "no" must stay quoted — unquoted it parses as boolean false in YAML 1.1.
    restart: "no"
    networks: [demo]

  # Optional UI to inspect Kafka/Schema Registry
  kafka-ui:
    image: provectuslabs/kafka-ui:v0.7.2
    container_name: kafka-ui
    depends_on: [kafka, schema-registry]
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
    ports:
      - "8080:8080"
    networks: [demo]

  # Data Prepper
  data-prepper:
    image: opensearchproject/data-prepper:2.12.0
    platform: linux/amd64
    container_name: data-prepper
    environment:
      # Placeholder credentials — inject real values via an env file or use an
      # IAM role; never commit secrets to version control.
      AWS_ACCESS_KEY_ID: "<key>"
      AWS_SECRET_ACCESS_KEY: "secret"
      AWS_REGION: "eu-west-1"
    depends_on:
      # Wait until the seeding job has exited successfully before starting.
      avro-init:
        condition: service_completed_successfully
    volumes:
      - ./pipelines/pipelines.yml:/usr/share/data-prepper/pipelines/pipelines.yaml:ro
      - ./config/data-prepper-config.yaml:/usr/share/data-prepper/data-prepper-config.yaml:ro
    ports:
      - "2021:2021"
    networks: [demo]


networks:
  demo: {}

Pipeline.yaml

# Data Prepper pipeline: consume JSON events from Kafka and write them to S3
# as Parquet (with a stdout sink for local debugging).
json_to_s3_parquet_repro:
  workers: 2
  delay: 0

  source:
    kafka:
      bootstrap_servers: ["kafka:9092"]
      encryption:
        type: NONE
      topics:
        # orders-json is the topic seeded by the kafka-console-producer test
        # command, not the Avro topic created by avro-init.
        - name: orders-json
          group_id: dp-parquet-repro-v2
          auto_offset_reset: earliest

  # write_json presumably serializes the nested "line_items" array into a JSON
  # string under "line_items_json" — confirm against the Data Prepper
  # write_json processor documentation.
  processor:
    - write_json:
        source: "line_items"
        target: "line_items_json"

  sink:
    - s3:
        # Placeholder — replace with a real bucket name before running.
        bucket: "<bucket_name>"
        object_key:
          path_prefix: "dp-repros/orders/date=%{yyyy-MM-dd}/hour=%{HH}"
        codec:
          parquet:
            # Derive the Parquet schema from the incoming events —
            # NOTE(review): verify against the s3 sink codec docs.
            auto_schema: true
        # Flush when any threshold is hit: 200 events, 30s, or 10 MB.
        threshold:
          event_count: 200
          event_collect_timeout: 30s
          maximum_size: 10mb
        aws:
          region: eu-west-1
    - stdout:
        codec: json

Using command below to test:

docker compose exec kafka bash -lc '
# Seed orders-json with two sample orders whose line_items field is an array
# of objects (the case that triggers the reported cast exception).
cat <<EOF | kafka-console-producer --bootstrap-server kafka:9092 --topic orders-json
{"order_id":"o-200","customer":"eve","line_items":[{"sku":"a1","qty":1,"price":10.5},{"sku":"b2","qty":2,"price":5.0}]}
{"order_id":"o-201","customer":"mallory","line_items":[{"sku":"c3","qty":3,"price":1.25}]}
EOF
'