Thanks for the detailed setup description. Below are a few clarifications, followed by a full working configuration that I tested locally.
Quick note: minor version upgrades don’t need this if you are using the operator
If you’re doing a minor version bump on an existing cluster managed by the opensearch-k8s-operator, you don’t need Data Prepper at all. Just update the version field in your OpenSearchCluster CR and the operator handles the rolling restart for you:
spec:
  general:
    version: 2.19.0 # change this to the new version
The suggested approach below is for cases where you genuinely need to migrate data between two separate clusters - different hardware, major version upgrade, cross-region move, or when you want a long validation window before cutting over.
Snapshots: A restored snapshot preserves the original mappings, settings, and shard count exactly as they were. If those mappings contain legacy field types or suboptimal analyzers, or the shard count no longer fits your data volume, you carry those problems over to the new cluster. Reindexing gives you the opportunity to update mappings and settings on the new cluster before any data is written. Snapshots also carry the Lucene segment format from the source cluster, whereas reindexing reads the document source and writes fresh segments, so there is no version ceiling. Reindexing may therefore be the better option, although it takes longer with large datasets.
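As a rough illustration of "update mappings and settings before any data is written", an index could be pre-created on the new cluster before the pipelines start writing to it. This is only a sketch: the index name, shard count, replica count, and the `name` field are hypothetical values, and the green-cluster host and `OS_PASSWORD` variable are taken from the configuration further down.

```shell
# Hypothetical settings and mappings for the new index; adjust to your data.
index_body() {
  cat <<'EOF'
{
  "settings": { "index": { "number_of_shards": 3, "number_of_replicas": 1 } },
  "mappings": { "properties": { "name": { "type": "keyword" } } }
}
EOF
}

# create_index <base_url> <index>: creates the index via the create-index API.
# -k skips TLS verification, mirroring "insecure: true" in the pipeline config.
create_index() {
  index_body | curl -sSk -u "admin:${OS_PASSWORD}" -X PUT "$1/$2" \
    -H 'Content-Type: application/json' --data-binary @-
}

# Only touches the network when invoked with --create.
if [ "${1:-}" = "--create" ]; then
  create_index "https://green-cluster:9200" "products"
fi
```

Because the index already exists when the first document arrives, writes land in it with the new shard count and mappings rather than in an auto-created index.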
Kafka as a buffer directly
When Kafka is configured as the buffer inside a pipeline, Data Prepper treats it as a raw byte store. The opensearch_api source detects this and takes a shortcut: instead of parsing the incoming bulk request into individual documents, it dumps the entire raw HTTP body straight into Kafka as a blob of bytes.
I would recommend two pipelines with Kafka between them
You can use Kafka as a sink/source pair rather than a buffer. This gives you Kafka’s durability guarantees without hitting the byte-buffer problem.
App
│
▼
Data Prepper (opensearch_api :9202)
│
├─ add_entries: stamp index + doc ID from metadata into event body
│
▼
Kafka topic: migration-docs ← durable, survives DP restart
│
├─ Kafka source (serde_format: json)
│
├── blue-cluster (existing)
└── green-cluster (new)
The full, locally tested configuration is below.
data-prepper-config.yaml
ssl: false
peer_forwarder:
  discovery_mode: local_node
pipelines.yaml
# Pipeline 1: receives HTTP bulk writes, stamps routing fields, publishes to Kafka
ingest-to-kafka:
  source:
    opensearch_api:
      port: 9202
  processor:
    - add_entries:
        entries:
          - key: "_dp_index"
            value_expression: 'getMetadata("opensearch_index")'
          - key: "_dp_id"
            value_expression: 'getMetadata("opensearch_id")'
  sink:
    - kafka:
        bootstrap_servers:
          - "kafka:9092"
        encryption:
          type: none
        topic:
          name: "migration-docs"
          create_topic: true
          number_of_partitions: 1
          replication_factor: 1
        partition_key: "${/_dp_index}"
        serde_format: json

# Pipeline 2: consumes from Kafka, dual-writes to both clusters
kafka-to-opensearch:
  source:
    kafka:
      bootstrap_servers:
        - "kafka:9092"
      encryption:
        type: none
      topics:
        - name: "migration-docs"
          group_id: "migration-consumer"
          auto_offset_reset: earliest
          serde_format: json # required - without this DP treats messages as plaintext
  sink:
    - opensearch:
        hosts: ["https://blue-cluster:9200"]
        username: "admin"
        password: "${OS_PASSWORD}"
        insecure: true
        index: "${/_dp_index}"
        document_id: "${/_dp_id}"
    - opensearch:
        hosts: ["https://green-cluster:9200"]
        username: "admin"
        password: "${OS_PASSWORD}"
        insecure: true
        index: "${/_dp_index}"
        document_id: "${/_dp_id}"
The add_entries processor is required because event metadata (index name, document ID) does not survive Kafka serialisation. The Kafka sink only serialises record.getData().getJsonNode(), i.e. the event body, and discards the metadata map. By stamping _dp_index and _dp_id into the event body before the Kafka sink, those values are preserved through the round-trip and available to the opensearch sinks in pipeline 2.
Two important gotchas:
1. Point your application at http://data-prepper:9202/_bulk - no index name in the URL path. Standard multi-index bulk format with _index in each action line works exactly as it does against a real OpenSearch cluster. Documents for different indices in the same request are routed independently. The only change your application needs to make is swapping the host from opensearch:9200 to data-prepper:9202. If you do include an index in the URL path (e.g. /products/_bulk), it overrides the per-action _index field and all documents in that request go to that single index - so avoid it unless you specifically want that behaviour.
2. serde_format: json is required on the source topic config in pipeline 2. Without it the Kafka source uses a plain string deserialiser and treats each message as a raw string, wrapping it under the Kafka partition key rather than expanding the JSON fields into the event. The ${/_dp_index} expression then resolves to an empty string and every write fails silently.
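To make the first gotcha concrete, a multi-index bulk request against Data Prepper might look like the sketch below. The index names, IDs, and document fields are hypothetical, and the `Content-Type: application/json` header is an assumption (OpenSearch itself accepts it for `_bulk`); only the host, port, and `/_bulk` path come from the setup described above.

```shell
# Builds a bulk body in NDJSON form: one action line, then one document line.
# Each action line carries its own _index, so no index appears in the URL.
build_bulk_body() {
  cat <<'EOF'
{"index":{"_index":"products","_id":"1"}}
{"name":"widget"}
{"index":{"_index":"orders","_id":"42"}}
{"total":19.99}
EOF
}

# Posts the body to the opensearch_api source; note the path is /_bulk,
# not /<index>/_bulk, so per-action routing is preserved.
send_bulk() {
  build_bulk_body | curl -sS -X POST "http://data-prepper:9202/_bulk" \
    -H 'Content-Type: application/json' --data-binary @-
}

# Only touches the network when invoked with --send.
if [ "${1:-}" = "--send" ]; then
  send_bulk
fi
```

With the two-pipeline configuration, the first document should end up in `products` and the second in `orders` on both clusters, because `_dp_index` is stamped per event before the Kafka sink.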
If you decide to go with a backfill pipeline, you can use something similar to the following:
backfill-pipelines.yaml
backfill-blue-to-green:
  source:
    opensearch:
      hosts: ["https://blue-cluster:9200"]
      username: "admin"
      password: "${OS_PASSWORD}"
      connection:
        insecure: true
      indices:
        include:
          - index_name_regex: ".*" # or lock this down to specific indices
  processor:
    - add_entries:
        entries:
          - key: "_backfill_id"
            value_expression: 'getMetadata("opensearch-document_id")'
          - key: "_backfill_index"
            value_expression: 'getMetadata("opensearch-index")'
  sink:
    - opensearch:
        hosts: ["https://green-cluster:9200"]
        username: "admin"
        password: "${OS_PASSWORD}"
        insecure: true
        action: create
        index: "${getMetadata(\"opensearch-index\")}"
        document_id: "${/_backfill_id}"
You can run it with its own isolated config, so it doesn’t interfere with the live pipelines:
docker run --rm \
  --network <your-network> \
  -v ./backfill-pipelines.yaml:/usr/share/data-prepper/pipelines/pipelines.yaml \
  -v ./data-prepper-config.yaml:/usr/share/data-prepper/config/data-prepper-config.yaml \
  opensearchproject/data-prepper:2
It exits on its own when the backfill is complete.
Alternatively, you can use snapshot and restore while the Kafka pipeline is running, as long as you make sure there is no delta between the two clusters once the restore has finished.
Cutover sequence (zero-downtime):
Once the backfill finishes and green’s doc counts match blue’s:
1. Flip reads: update your Kubernetes Service selector (or Ingress backend) to point at green. Reads land on complete, up-to-date data immediately.
2. Verify: run a few minutes of traffic against green and confirm latencies, error rates, and doc counts.
3. Flip writes: remove the blue sink from pipelines.yaml and do a rolling restart of the Data Prepper deployment. Kafka absorbs any in-flight writes during the restart - nothing should be lost.
4. Decommission blue: once you’re satisfied with green, bring blue down.
The Kubernetes read cutover is a single kubectl patch:
kubectl patch service opensearch \
  -p '{"spec":{"selector":{"app":"opensearch-green"}}}'
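The doc-count precondition for the cutover (green matching blue) could be checked with a small script along these lines before patching the Service. It is a sketch under a few assumptions: it uses the standard `_count` API, the index name `products` is hypothetical, and the hosts, `admin` user, and `OS_PASSWORD` variable are the ones from the pipeline configuration.

```shell
# Pulls the numeric "count" field out of a _count response without needing jq.
extract_count() {
  sed -n 's/.*"count"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p'
}

# count_docs <base_url> <index>: prints the document count for one index.
# -k skips TLS verification, mirroring "insecure: true" in the pipeline config.
count_docs() {
  curl -sSk -u "admin:${OS_PASSWORD}" "$1/$2/_count" | extract_count
}

# compare_counts <blue> <green>: succeeds only if both counts are present and equal.
compare_counts() {
  if [ -n "$1" ] && [ "$1" = "$2" ]; then
    echo "match: $1"
  else
    echo "mismatch: blue=${1:-?} green=${2:-?}"
    return 1
  fi
}

# Only touches the network when invoked with --run.
if [ "${1:-}" = "--run" ]; then
  compare_counts \
    "$(count_docs https://blue-cluster:9200 products)" \
    "$(count_docs https://green-cluster:9200 products)"
fi
```

Counting per index rather than across the whole cluster avoids false mismatches from system indices that differ between the two clusters. While live writes are still flowing, the two numbers can briefly differ, so it is worth repeating the check a few times.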
Hope this helps