@bailey857 Have you tried using the following processor in the pipeline?
# Serialize the nested `line_items` field into a JSON string stored in
# `line_items_json`.
processor:
  - write_json:
      source: "line_items"
      target: "line_items_json"
together with the following codec on the s3 sink:
# Belongs under the `s3` sink entry (see the complete pipeline further down).
codec:
  parquet:
    auto_schema: true
I tested this locally with S3 as the sink; see my complete config below:
docker-compose.yml
# Local repro stack: ZooKeeper, Kafka, Schema Registry, a one-shot Avro
# seeder, Kafka UI and Data Prepper, all attached to the `demo` network.
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    networks: [demo]
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    container_name: kafka
    depends_on: [zookeeper]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: PLAINTEXT is advertised as kafka:9092 (the address the
      # other services in this file use); PLAINTEXT_HOST is advertised as
      # localhost:29092, which is published below.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      # Single-broker settings: internal topics use replication factor 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    ports:
      - "9092:9092"
      - "29092:29092"
    networks: [demo]
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    container_name: schema-registry
    depends_on: [kafka]
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    ports:
      - "8081:8081"
    networks: [demo]
  # Seeds the topic with Avro messages so the subject orders-avro-value exists
  avro-init:
    image: confluentinc/cp-schema-registry:7.6.0
    container_name: avro-init
    depends_on: [kafka, schema-registry]
    entrypoint: ["/bin/bash","-lc"]
    # The script creates the `orders-avro` topic, writes the Order schema to
    # /tmp/order.avsc and produces three Avro records against it.
    # The heredoc terminators (EOF, MSG) must stay at the block's base indent
    # so that they land at column 0 of the script after YAML strips it.
    # NOTE(review): Compose may word-split a plain-string `command`, in which
    # case `bash -lc` would receive only the first word. Confirm the seed
    # script really runs; passing it as a one-item list avoids the question.
    command: |
      set -e
      kafka-topics --bootstrap-server kafka:9092 --create --topic orders-avro --if-not-exists --replication-factor 1 --partitions 1
      cat > /tmp/order.avsc <<'EOF'
      {
        "type":"record",
        "name":"Order",
        "namespace":"demo",
        "fields":[
          {"name":"order_id","type":"string"},
          {"name":"amount","type":"double"},
          {"name":"customer","type":"string"}
        ]
      }
      EOF
      kafka-avro-console-producer \
        --broker-list kafka:9092 \
        --topic orders-avro \
        --property schema.registry.url=http://schema-registry:8081 \
        --property value.schema.file=/tmp/order.avsc <<'MSG'
      {"order_id":"o-1","amount":42.5,"customer":"alice"}
      {"order_id":"o-2","amount":12.0,"customer":"bob"}
      {"order_id":"o-3","amount":99.9,"customer":"carol"}
      MSG
      echo "Seeded Avro messages to orders-avro."
    restart: "no"
    networks: [demo]
  # Optional UI to inspect Kafka/Schema Registry
  kafka-ui:
    image: provectuslabs/kafka-ui:v0.7.2
    container_name: kafka-ui
    depends_on: [kafka, schema-registry]
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
    ports:
      - "8080:8080"
    networks: [demo]
  # Data Prepper
  data-prepper:
    image: opensearchproject/data-prepper:2.12.0
    platform: linux/amd64
    container_name: data-prepper
    environment:
      # Placeholder values; replace with credentials that can write to the
      # bucket configured in the pipeline's s3 sink.
      AWS_ACCESS_KEY_ID: "<key>"
      AWS_SECRET_ACCESS_KEY: "secret"
      AWS_REGION: "eu-west-1"
    # Start only after the avro-init one-shot container has exited with 0.
    depends_on:
      avro-init:
        condition: service_completed_successfully
    volumes:
      # Host pipelines.yml is mounted (read-only) as pipelines.yaml.
      - ./pipelines/pipelines.yml:/usr/share/data-prepper/pipelines/pipelines.yaml:ro
      - ./config/data-prepper-config.yaml:/usr/share/data-prepper/data-prepper-config.yaml:ro
    ports:
      - "2021:2021"
    networks: [demo]
networks:
  demo: {}
Pipeline.yaml (saved as ./pipelines/pipelines.yml, the path mounted in docker-compose.yml)
# Repro pipeline: read JSON events from the Kafka topic `orders-json`,
# serialize `line_items` to a JSON string, then write Parquet objects to S3
# and echo every event to stdout.
json_to_s3_parquet_repro:
  workers: 2
  delay: 0
  source:
    kafka:
      bootstrap_servers: ["kafka:9092"]
      encryption:
        type: NONE
      topics:
        - name: orders-json
          group_id: dp-parquet-repro-v2
          auto_offset_reset: earliest
  processor:
    # Store the nested `line_items` value as a JSON string in
    # `line_items_json`.
    - write_json:
        source: "line_items"
        target: "line_items_json"
  sink:
    - s3:
        bucket: "<bucket_name>"
        object_key:
          path_prefix: "dp-repros/orders/date=%{yyyy-MM-dd}/hour=%{HH}"
        codec:
          parquet:
            auto_schema: true
        # Thresholds that control when the sink writes an object.
        threshold:
          event_count: 200
          event_collect_timeout: 30s
          maximum_size: 10mb
        aws:
          region: eu-west-1
    - stdout:
        codec: json
I used the command below to test:
# Inside the `kafka` container, pipe two JSON order records (each with a
# nested `line_items` array) into kafka-console-producer for the
# `orders-json` topic.
docker compose exec kafka bash -lc '
cat <<EOF | kafka-console-producer --bootstrap-server kafka:9092 --topic orders-json
{"order_id":"o-200","customer":"eve","line_items":[{"sku":"a1","qty":1,"price":10.5},{"sku":"b2","qty":2,"price":5.0}]}
{"order_id":"o-201","customer":"mallory","line_items":[{"sku":"c3","qty":3,"price":1.25}]}
EOF
'