Versions (relevant - OpenSearch/Dashboard/Server OS/Browser):
Data Prepper 2.8.0, OpenSearch 3.x.x
Describe the issue:
I am looking for an OpenSearch Data Prepper pipeline source YAML config for S3-compatible object storage (an IBM Cloud COS bucket). Gemini and Copilot make some suggestions, but the YAML is not correct. This example (s3 source - OpenSearch Documentation) is definitely missing how to specify the endpoint.
I’d like to know if someone has been able to configure an S3-compatible COS bucket as a source in Data Prepper.
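For reference, the closest I can get by following the scan-based example for the s3 source in the documentation is the sketch below. The pipeline name, region, and bucket name are placeholders, and I am assuming the COS HMAC keys can be supplied through the standard AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables. As far as I can tell, none of these documented options lets me point the client at the COS endpoint (e.g. s3.us-south.cloud-object-storage.appdomain.cloud), which is exactly the part I am missing:

cos-scan-sketch:
  source:
    s3:
      codec:
        newline:
      compression: none
      aws:
        # "us-south" is a placeholder COS region; I am assuming the COS HMAC keys
        # are picked up from the standard AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
        # environment variables, since no credential keys are documented here.
        region: us-south
      scan:
        scheduling:
          interval: PT30S
        buckets:
          - bucket:
              name: my-cos-bucket   # placeholder bucket name
  sink:
    - stdout: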
Configuration:
Recommendation from Gemini
# Data Prepper Pipeline Configuration for S3-compatible Cloud Object Storage
#
# Version: 2.x (compatible with Data Prepper 2.x and later)
#
# Key Customization Points:
# 1. S3 Source:
#    - aws.s3_endpoint: REQUIRED for S3-compatible services (e.g., IBM COS, MinIO, Ceph S3).
#      If using AWS S3 directly, you can omit this as it uses default AWS endpoints.
#    - aws.region: REQUIRED for S3-compatible services and AWS S3.
#    - bucket: Your bucket name.
#    - Credentials: Prefer environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY).
#      See comments for direct config (less secure) or IAM roles.
#    - scan: Configure interval, prefix, suffix, etc., for batch processing.
#    - codec: Choose based on your file format (json, csv, plaintext).
# 2. Processors: Tailor these to transform and enrich your data.
# 3. OpenSearch Sink:
#    - hosts: Your OpenSearch cluster endpoint.
#    - username/password: Your OpenSearch credentials.
#    - insecure_ssl: Set to 'false' and provide TLS certs for production.
#    - index: Your target OpenSearch index.
#
version: "2"
pipeline:
  # Name of your Data Prepper pipeline
  my-s3-compatible-cos-ingestion:
    # ----------------------------------------------------------------------
    # Source Configuration: S3-compatible Cloud Object Storage
    # ----------------------------------------------------------------------
    source:
      s3:
        # AWS SDK configuration. This section is used by Data Prepper's S3 source
        # to connect to S3-compatible services.
        aws:
          # REQUIRED for S3-compatible services.
          # Specify the custom S3 endpoint URL for your Cloud Object Storage.
          # Examples:
          #   - IBM Cloud Object Storage: https://s3.us-south.cloud-object-storage.appdomain.cloud
          #   - MinIO: http://your-minio-host:9000
          #   - Ceph S3: https://s3.ceph-cluster.yourdomain.com
          # If connecting to native AWS S3, you can usually omit this.
          s3_endpoint: "https://s3.YOUR_REGION.cloud-object-storage.appdomain.cloud" # <-- Customize this!
          # REQUIRED: The region associated with your bucket, even for S3-compatible services.
          # This might be an AWS-like region name (e.g., 'us-south', 'eu-gb')
          # or a custom identifier depending on your provider.
          region: "YOUR_COS_BUCKET_REGION" # <-- Customize this!
          # Optional: Specify a custom STS endpoint if your S3-compatible service
          # has a separate STS endpoint and you encounter authentication issues.
          # Example for IBM COS: https://sts.us-south.cloud-object-storage.appdomain.cloud
          # sts_endpoint: "https://sts.YOUR_REGION.cloud-object-storage.appdomain.cloud" # <-- Customize if needed!
          # --- Authentication Methods (Choose ONE, Environment variables recommended for prod) ---
          # Method 1: Direct Credentials (NOT RECOMMENDED for production)
          # access_key_id: "YOUR_ACCESS_KEY_ID" # <-- Customize this!
          # secret_access_key: "YOUR_SECRET_ACCESS_KEY" # <-- Customize this!
          # Method 2: Environment Variables (RECOMMENDED for production)
          #   Data Prepper will automatically pick up AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
          #   from its environment. Set these before running Data Prepper.
          #   For Kubernetes, use Secrets injected as environment variables.
          #   No configuration needed here, just ensure the env vars are set.
          # Method 3: IAM Role/Service Account (for AWS EKS/EC2, less common for generic S3-compat)
          #   If Data Prepper is running on an AWS EC2 instance or EKS pod with an
          #   associated IAM role, it can automatically assume that role.
          #   No specific config needed here if the role has s3:GetObject, s3:ListBucket.
        # The name of your Cloud Object Storage bucket.
        bucket: "your-cloud-object-storage-bucket-name" # <-- Customize this!
        # --- Ingestion Mode: Scan-based (Recommended for S3-compatible COS) ---
        # Data Prepper will periodically list and download new/modified objects.
        scan:
          # How often Data Prepper should scan the bucket for new objects (e.g., 30 seconds).
          interval: "30s"
          # Optional: Only process objects with a specific key prefix (folder path).
          # key_prefix: "logs/my_application/"
          # Optional: Only process objects with a specific key suffix (file extension).
          # key_suffix: ".json"
          # Optional: Only scan objects created/modified after this time (ISO 8601).
          # Useful for initial backfills.
          # start_time: "2024-01-01T00:00:00Z"
        # Optional: Specify the codec for parsing the file content.
        codec:
          # For JSON objects (each line is a JSON object, or the whole file is one JSON array/object)
          json:
            # Set to true to include JSON keys as attributes for better observability
            json_keys_for_attributes: true
          # For CSV files (uncomment and configure if your data is CSV)
          # csv:
          #   delimiter: ","
          #   header_destination: "header_field" # Optional: Store headers in a dedicated field
          #   # column_names: ["col1", "col2", "col3"] # Optional: If no header row
          #   # skip_header: true # Set to true if the first row is a header
          # For plain text log files (uncomment if your data is unstructured text)
          # plaintext: {}
          # Add other codecs like Avro, Parquet, etc., if needed and supported by Data Prepper.
        # Optional: S3 Events Notifications (for near real-time, less common with generic S3-compat)
        # This requires your S3-compatible service to send events to an SQS queue
        # that Data Prepper can consume. Most non-AWS S3 services do not natively
        # support this directly to an AWS SQS queue. You might need an intermediate
        # system (e.g., cloud functions reacting to COS events and pushing to Kafka).
        # If your COS supports SQS-compatible eventing, configure it here.
        # s3_event_notifications:
        #   sqs:
        #     queue_url: "https://sqs.YOUR_REGION.amazonaws.com/YOUR_AWS_ACCOUNT_ID/YOUR_SQS_QUEUE_NAME"
        #     # ... other SQS related configurations (e.g., kms_key_id, visibility_timeout)
    # ----------------------------------------------------------------------
    # Processors Configuration (Optional but highly recommended)
    # ----------------------------------------------------------------------
    processors:
      # Example 1: Add a field to all events
      - add_entries:
          message: "Data from S3-compatible storage"
          timestamp_received: "${current_time}" # Data Prepper expression for current time
      # Example 2: Parse logs with Grok (if your data is unstructured text)
      # - grok:
      #     match:
      #       message: ["%{COMMONAPACHELOG}", "%{SYSLOGBASE} %{GREEDYDATA:log_message}"]
      #     timeout_millis: 1000
      # Example 3: Date Processor to parse and standardize timestamps
      # - date:
      #     from_keys: ["timestamp_field_in_data"] # Replace with the actual field name from your data
      #     to_key: "@timestamp" # Standard timestamp field for OpenSearch
      #     from_formats: ["ISO8601", "MMM dd YYYY HH:mm:ss"] # List possible date formats
      # Example 4: Rename fields
      # - rename:
      #     source: "old_field_name"
      #     target: "new_field_name"
      # Example 5: Drop events based on a condition
      # - drop:
      #     when: "/status_code == 404"
      # Example 6: Flatten nested JSON objects (useful for some JSON structures)
      # - flatten: {}
      # Add more processors as needed for your specific data transformation requirements.
    # ----------------------------------------------------------------------
    # Sink Configuration: OpenSearch Cluster
    # ----------------------------------------------------------------------
    sink:
      opensearch:
        # List of your OpenSearch cluster host(s) and port(s).
        # For HTTPS, include 'https://'. Default OpenSearch port is 9200.
        hosts: ["https://your-opensearch-cluster-host:9200"] # <-- Customize this!
        # OpenSearch username and password. Ensure this user has permissions to write to indices.
        username: "your-opensearch-username" # <-- Customize this!
        password: "your-opensearch-password" # <-- Customize this!
        # Index configuration
        # Use a dynamic index name (e.g., with date) for time-series data.
        index: "my-s3-compatible-data-%{yyyy.MM.dd}" # <-- Customize index name!
        # SSL/TLS Configuration:
        # WARNING: insecure_ssl: true is for development ONLY.
        # For production, set to false and provide proper certificates.
        insecure_ssl: true # <-- Change to `false` for production!
        # TLS certificate configuration (uncomment for production)
        # tls:
        #   ssl_certificate_file: "/etc/certs/data-prepper-certificate.pem" # Path to client certificate
        #   ssl_key_file: "/etc/certs/data-prepper-private-key.pem" # Path to client private key
        #   ssl_ca_cert_file: "/etc/certs/root-ca-certificate.pem" # Path to CA certificate
        # Optional: Configure buffer and batching for performance
        # batch_size: 1000
        # dlq: # Dead-Letter Queue for failed events (highly recommended for production)
        #   s3:
        #     bucket: "your-dlq-s3-bucket"
        #     region: "your-dlq-s3-bucket-region"
        #     sts_endpoint: "..." # If DLQ bucket is on S3-compatible service
        #     s3_endpoint: "..." # If DLQ bucket is on S3-compatible service
        #     access_key_id: "..."
        #     secret_access_key: "..."
        #     key_prefix: "data_prepper_dlq/"
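For what it's worth, the Gemini output above also does not match the pipeline shape I see in the Data Prepper documentation, which is another reason I doubt it: none of the documented examples I can find use a version: key or a pipeline: wrapper (the pipeline name is the top-level key), they use processor: (singular) with a list of processor plugins, sink: is a list, and the scan interval seems to live under scan.scheduling rather than directly under scan. A skeleton of that shape, with placeholder host, credentials, and index (the date processor is only a filler example), would look roughly like the following. Even in this shape I still do not see where a custom S3 endpoint for IBM COS would go:

cos-pipeline:
  source:
    s3:
      # ... s3 source options as in the sketch further up ...
  processor:
    - date:
        from_time_received: true
        destination: "@timestamp"
  sink:
    - opensearch:
        hosts: ["https://my-opensearch-host:9200"]   # placeholder
        username: my-user                            # placeholder
        password: my-password                        # placeholder
        index: "cos-logs-%{yyyy.MM.dd}"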
Relevant Logs or Screenshots: