Data Prepper: Alternative to OpenSearch Migration Assistant for Upgrades

Dear all, I would like your suggestions on a solution I am designing for upgrading a large OpenSearch store (potentially many TBs) using Data Prepper.

Versions (relevant - OpenSearch/Dashboard/Server OS/Browser): Upgrades between OpenSearch minor versions within the same major version, and also to the immediate next major version (where data transfer by regular snapshot restore is supported).

Describe the issue: OpenSearch Migration Assistant (OMA) is a nice tool for migration, but it has the following issues in our case:

  1. For a regular OpenSearch upgrade (not a migration, and considering rollback implications), it is complex tooling to integrate into an existing product.
  2. OMA is not yet fully productized on bare-metal K8s the way it is for AWS.
  3. The solution may need to avoid depending on AWS infrastructure.

Configuration:

This strategy uses a blue-green approach, similar to what OMA does.

  • Use Data Prepper pipelines with two sinks for traffic replication.
  • Use a normal snapshot restore to bring old data into the second cluster.
  • Data Prepper’s internal Kafka buffer helps with scaling and with avoiding data loss.

Eventually, when the green copy is in sync with the blue copy, we cut over to green.

Relevant Logs or Screenshots: NA

Any thoughts on this? Attn. @dlv

Thanks for the detailed setup description. Below are a few clarifications, followed by a full working configuration that I tested locally.

Quick note: minor version upgrades don’t need this if you are using the operator

If you’re doing a minor version bump on an existing cluster managed by the opensearch-k8s-operator, you don’t need Data Prepper at all. Just update the version field in your OpenSearchCluster CR and the operator handles the rolling restart for you:

spec:
  general:
    version: 2.19.0   # change this to the new version

The suggested approach below is for cases where you genuinely need to migrate data between two separate clusters - different hardware, major version upgrade, cross-region move, or when you want a long validation window before cutting over.

Snapshots: A restored snapshot preserves the original mappings, settings, and shard count exactly as they were. If those mappings have legacy field types, suboptimal analyzers, or a shard count that no longer fits your data volume, this might cause issues later. Reindexing gives you the opportunity to update mappings and settings on the new cluster before any data is written. Also snapshots carry the Lucene segment format from the source cluster, whereas reindexing reads the document source and writes fresh segments, so there is no version ceiling. Therefore reindexing might be a better option, but it does take longer with large datasets.

Kafka as a buffer directly

When Kafka is configured as the buffer inside a pipeline, Data Prepper treats it as a raw byte store. The opensearch_api source detects this and takes a shortcut: instead of parsing the incoming bulk request into individual documents, it dumps the entire raw HTTP body straight into Kafka as a blob of bytes.

I would recommend two pipelines with Kafka between them

You can use Kafka as a sink/source pair rather than a buffer. This gives you Kafka’s durability guarantees without hitting the byte-buffer problem.

App
 │
 ▼
Data Prepper (opensearch_api :9202)
 │
 ├─ add_entries: stamp index + doc ID from metadata into event body
 │
 ▼
Kafka topic: migration-docs ← durable, survives DP restart
 │
 ├─ Kafka source (serde_format: json)
 │
 ├──▶ blue-cluster (existing)
 └──▶ green-cluster (new)

The full, locally tested configuration follows.

data-prepper-config.yaml

ssl: false
peer_forwarder:
  discovery_mode: local_node

pipelines.yaml

# Pipeline 1: receives HTTP bulk writes, stamps routing fields, publishes to Kafka
ingest-to-kafka:
  source:
    opensearch_api:
      port: 9202
  processor:
    - add_entries:
        entries:
          - key: "_dp_index"
            value_expression: 'getMetadata("opensearch_index")'
          - key: "_dp_id"
            value_expression: 'getMetadata("opensearch_id")'
  sink:
    - kafka:
        bootstrap_servers:
          - "kafka:9092"
        encryption:
          type: none
        topic:
          name: "migration-docs"
          create_topic: true
          number_of_partitions: 1
          replication_factor: 1
        partition_key: "${/_dp_index}"
        serde_format: json

# Pipeline 2: consumes from Kafka, dual-writes to both clusters
kafka-to-opensearch:
  source:
    kafka:
      bootstrap_servers:
        - "kafka:9092"
      encryption:
        type: none
      topics:
        - name: "migration-docs"
          group_id: "migration-consumer"
          auto_offset_reset: earliest
          serde_format: json      # required - without this DP treats messages as plaintext
  sink:
    - opensearch:
        hosts: ["https://blue-cluster:9200"]
        username: "admin"
        password: "${OS_PASSWORD}"
        insecure: true
        index: "${/_dp_index}"
        document_id: "${/_dp_id}"
    - opensearch:
        hosts: ["https://green-cluster:9200"]
        username: "admin"
        password: "${OS_PASSWORD}"
        insecure: true
        index: "${/_dp_index}"
        document_id: "${/_dp_id}"

The add_entries processor is required because event metadata (index name, document ID) does not survive Kafka serialisation. The Kafka sink only serialises record.getData().getJsonNode() — the event body — and discards the metadata map. By stamping _dp_index and _dp_id into the event body before the Kafka sink, those values are preserved through the round-trip and available to the opensearch sinks in pipeline 2.

Two important gotchas:

Point your application at http://data-prepper:9202/_bulk - no index name in the URL path. Standard multi-index bulk format with _index in each action line works exactly as it does against a real OpenSearch cluster. Documents for different indices in the same request are routed independently. The only change your application needs to make is swapping the host from opensearch:9200 to data-prepper:9202. If you do include an index in the URL path (e.g. /products/_bulk), it overrides the per-action _index field and all documents in that request go to that single index - so avoid it unless you specifically want that behaviour.

serde_format: json is required on the source topic config in pipeline 2. Without it the Kafka source uses a plain string deserialiser and treats each message as a raw string, wrapping it under the Kafka partition key rather than expanding the JSON fields into the event. The ${/_dp_index} expression then resolves to an empty string and every write fails silently.
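
To illustrate the first gotcha, here is a minimal sketch of a multi-index bulk request sent to Data Prepper with no index in the URL path. The index names, IDs, and document fields are placeholders, `DP_URL` is an assumed variable, and the `Content-Type` header is carried over from how `_bulk` is normally called against OpenSearch; I have not verified which content types the opensearch_api source accepts.

```shell
#!/bin/sh
set -eu

# Assumption: Data Prepper is reachable at this URL (host and port from the thread).
DP_URL="${DP_URL:-http://data-prepper:9202}"

# Build a two-index NDJSON bulk body. Index names and IDs are placeholders.
# Each action line is followed by its document, and the body ends with a newline.
bulk_body() {
  printf '%s\n' \
    '{"index":{"_index":"products","_id":"1"}}' \
    '{"name":"widget"}' \
    '{"index":{"_index":"orders","_id":"42"}}' \
    '{"total":9.99}'
}

# POST the body to /_bulk with no index in the URL path, so the _index
# on each action line decides where that document is routed.
send_bulk() {
  bulk_body | curl -sS -X POST "${DP_URL}/_bulk" \
    -H 'Content-Type: application/json' \
    --data-binary @-
}
```

Calling `send_bulk` sends the request; `bulk_body` on its own just prints the payload so you can inspect it first.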

If you decide to go with a backfill pipeline, you can use something similar to the following:

backfill-pipelines.yaml

backfill-blue-to-green:
  source:
    opensearch:
      hosts: ["https://blue-cluster:9200"]
      username: "admin"
      password: "${OS_PASSWORD}"
      connection:
        insecure: true
      indices:
        include:
          - index_name_regex: ".*"    # or lock this down to specific indices
  processor:
    - add_entries:
        entries:
          - key: "_backfill_id"
            value_expression: 'getMetadata("opensearch-document_id")'
          - key: "_backfill_index"
            value_expression: 'getMetadata("opensearch-index")'
  sink:
    - opensearch:
        hosts: ["https://green-cluster:9200"]
        username: "admin"
        password: "${OS_PASSWORD}"
        insecure: true
        action: create
        index: "${getMetadata(\"opensearch-index\")}"
        document_id: "${/_backfill_id}"

You can run it with its own isolated config, so it doesn’t interfere with the live pipelines:

docker run --rm \
  --network <your-network> \
  -v ./backfill-pipelines.yaml:/usr/share/data-prepper/pipelines/pipelines.yaml \
  -v ./data-prepper-config.yaml:/usr/share/data-prepper/config/data-prepper-config.yaml \
  opensearchproject/data-prepper:2

It exits on its own when the backfill is complete.

Or you can of course use snapshot and restore while the Kafka pipeline is running, making sure there is no delta between the two clusters once the restore completes.
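
For the snapshot route, a rough sketch using the standard snapshot REST API might look like this. The repository name, snapshot name, and index pattern are hypothetical, and it assumes the repository is already registered on both clusters against the same storage location. It also assumes the restored index names are not already open on green, since a restore refuses to overwrite an open index.

```shell
#!/bin/sh
set -eu

# Cluster URLs are from the thread; REPO, SNAP and INDICES are hypothetical names.
BLUE="${BLUE:-https://blue-cluster:9200}"
GREEN="${GREEN:-https://green-cluster:9200}"
REPO="${REPO:-migration-repo}"
SNAP="${SNAP:-pre-upgrade-1}"
INDICES="${INDICES:-logs-*}"

# JSON body for the restore call: only the chosen indices, no cluster-wide state.
restore_body() {
  printf '{"indices":"%s","include_global_state":false}' "$INDICES"
}

# Take a snapshot on blue and block until it completes.
take_snapshot() {
  curl -sS -k -u "admin:${OS_PASSWORD}" -X PUT \
    "${BLUE}/_snapshot/${REPO}/${SNAP}?wait_for_completion=true"
}

# Restore that snapshot on green.
restore_snapshot() {
  restore_body | curl -sS -k -u "admin:${OS_PASSWORD}" -X POST \
    "${GREEN}/_snapshot/${REPO}/${SNAP}/_restore" \
    -H 'Content-Type: application/json' \
    --data-binary @-
}
```

Run `take_snapshot` and then `restore_snapshot` with `OS_PASSWORD` exported. The `-k` flag mirrors the `insecure: true` setting in the pipeline configs and should be dropped once proper certificates are in place.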

Cutover sequence (zero-downtime):

Once the backfill finishes and green’s doc count matches blue’s:

  1. Flip reads: update your Kubernetes Service selector (or Ingress backend) to point at green. Reads land on complete, up-to-date data immediately.
  2. Verify: run a few minutes of traffic against green and confirm latencies, error rates, and doc counts.
  3. Flip writes: remove the blue sink from pipelines.yaml and do a rolling restart of the Data Prepper deployment. Kafka absorbs any in-flight writes during the restart - nothing should be lost.
  4. Decommission blue: once you’re satisfied with green, shut blue down.

The Kubernetes read cutover is a single kubectl patch:

kubectl patch service opensearch \
  -p '{"spec":{"selector":{"app":"opensearch-green"}}}'
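
The doc-count check that gates the cutover could be scripted along these lines. This is only a sketch: it uses the `_cat/count` API, the cluster URLs and credentials follow the configs above, and the index name you pass in is yours to supply. While dual-writes are flowing, the two counts can differ briefly, so treat a single mismatch as a prompt to re-check rather than as a failure.

```shell
#!/bin/sh
set -eu

# Cluster URLs are from the thread; override them through the environment if needed.
BLUE="${BLUE:-https://blue-cluster:9200}"
GREEN="${GREEN:-https://green-cluster:9200}"

# Print the live document count of index $2 on cluster $1.
doc_count() {
  curl -sS -k -u "admin:${OS_PASSWORD}" "$1/_cat/count/$2?h=count" | tr -d '[:space:]'
}

# Succeed only when both arguments are equal integers.
counts_match() {
  [ "$1" -eq "$2" ]
}

# Compare one index across both clusters and report the result.
compare_index() {
  blue_count=$(doc_count "$BLUE" "$1")
  green_count=$(doc_count "$GREEN" "$1")
  if counts_match "$blue_count" "$green_count"; then
    echo "OK $1 blue=$blue_count green=$green_count"
  else
    echo "MISMATCH $1 blue=$blue_count green=$green_count"
    return 1
  fi
}
```

For example, `compare_index products` checks a single index; looping over your index list gives a simple pre-cutover report.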

Hope this helps

@Anthony thanks for the reply.

Yes, an operator-based upgrade will work in the happy-path scenario.

However, the main issue I am dealing with is the rollback scenario, with zero data loss and minimal service downtime.

A normal rolling upgrade might not suffice if a rollback is required.

I am also looking at your complete solution and will share my observations shortly.

Hi Anthony,

Thanks for sharing the elaborate solution. It gives me confidence that Data Prepper is already being used for migration in the community. However, there are a few differences between our use case (UC) and yours:

  1. It looks like your UC is more about migration (as you also noted), because you backfill by reindexing rather than by snapshot restore. We are targeting a huge data store (up to a few hundred TBs), so performance is critical.
  2. We restrict our upgrade use case to the upgrade paths where snapshot restore is supported (i.e. within a major version and between two consecutive major versions).
  3. About not sending the index in the URL: that is not possible in our case, since our system already uses the index name in the _bulk API, which we cannot change.
  4. Because of this, the complexity around the index name not being available doesn’t arise in our case.
  5. I am not sure about the byte-buffer problem. Is it related to DP not being able to read from the Kafka buffer? That was a bug for which I submitted a fix a few days ago (DP issue #6876), for the opensearch_api source, not for the opensearch source. Would you still need the same solution design (two DP pipelines with Kafka in between) after this fix? Edit: I think your solution was probably pull-based, so you used the opensearch source?
  6. With this fix we are able to use the Kafka buffer, which provides the expected advantages such as scalability and HA.

What is your view on my solution?

Thanks for the detail and submitting #6876. That’s actually the exact bug I hit while building out the test environment.

On your specific points:

Snapshot/restore for bulk transfer + DP for the write-gap. At a few hundred TBs, I think this is the right call, and it’s the same hybrid pattern I mentioned above. Reindexing that volume is slow and CPU-heavy on both ends, whereas snapshot/restore is a file-level copy and scales far better. DP dual-write only has to carry the delta written during the restore window, regardless of how large the base dataset is.

Index names in _bulk can’t be modified. I don’t think this is actually something you need to design around. The opensearch_api source resolves the index per-request from the URL path if present (/<index>/_bulk), falling back to the per-action _index in the body if the URL has none. So whatever index your clients already target, DP routes dynamically from that, and you don’t need to restrict to one index at a time or change how clients call _bulk. That resolves the “one index at a time” worry from earlier in this thread.

Kafka as a true buffer (post-#6879). Once that merges, your architecture is simpler than what I tested: one pipeline, opensearch_api source with a Kafka buffer, dual opensearch sinks for blue and green. You get Kafka’s durability, backpressure, and HA without the two-pipeline workaround.
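
As a rough, untested sketch of that single-pipeline shape, the helper below writes a candidate pipelines.yaml. The pipeline name, topic name, and consumer group are hypothetical, the sink settings are copied from the two-pipeline config above, and reading the index and document ID through `getMetadata` in the sinks assumes the opensearch_api metadata is still available after the Kafka buffer once the fix is in, which I have not confirmed.

```shell
#!/bin/sh
set -eu

# Write a single-pipeline config to the path given as $1.
# The quoted heredoc delimiter keeps ${...} literal for Data Prepper to resolve.
write_pipeline() {
  cat > "$1" <<'EOF'
# Hypothetical pipeline name; topic and group_id are placeholders too.
dual-write:
  source:
    opensearch_api:
      port: 9202
  buffer:
    kafka:
      bootstrap_servers:
        - "kafka:9092"
      encryption:
        type: none
      topics:
        - name: "migration-buffer"
          group_id: "migration-buffer-consumer"
          create_topic: true
  sink:
    # Assumption: source metadata is readable here after the Kafka buffer.
    - opensearch:
        hosts: ["https://blue-cluster:9200"]
        username: "admin"
        password: "${OS_PASSWORD}"
        insecure: true
        index: "${getMetadata(\"opensearch_index\")}"
        document_id: "${getMetadata(\"opensearch_id\")}"
    - opensearch:
        hosts: ["https://green-cluster:9200"]
        username: "admin"
        password: "${OS_PASSWORD}"
        insecure: true
        index: "${getMetadata(\"opensearch_index\")}"
        document_id: "${getMetadata(\"opensearch_id\")}"
EOF
}
```

Running `write_pipeline ./pipelines.yaml` produces a file you can mount the same way as in the docker command above. Compared with the two-pipeline version, the add_entries processor and the explicit Kafka sink/source pair disappear, which is the simplification being described.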

Once the fix lands, I would still recommend load-testing the opensearch_api source’s HTTP ingestion throughput against your peak bulk rate. It becomes your single ingestion chokepoint feeding Kafka, whereas today writes go straight to the cluster.