Any on-premises solution for a Kafka sink connector to OpenSearch?

Versions (relevant - OpenSearch/Dashboard/Server OS/Browser):
Version 2.2.0

Describe the issue:
My current OpenSearch version is 2.2.0, and I need a way to push data from Kafka topics to OpenSearch.
I was using the Elasticsearch sink connector before, but it seems that as of version 2.x, OpenSearch has removed mapping types (the _type parameter). This causes the sink connector to fail with the error below:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed: {"root_cause":[{"type":"illegal_argument_exception","reason":"Action/metadata line [1] contains an unknown parameter [_type]"}],"type":"illegal_argument_exception","reason":"Action/metadata line [1] contains an unknown parameter [_type]"}
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.handleMalformedDoc(BulkProcessor.java:479)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:433)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:389)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:375)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkProcessorThread.run(BulkProcessor.java:370)
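For context, the error says the bulk request's action/metadata line carries a _type parameter, which OpenSearch 2.x no longer accepts. The Python sketch below shows what an acceptable NDJSON _bulk body looks like, with only _index and _id in the action line. It is illustrative only: the helper name build_bulk_body and the index name my-topic are hypothetical, and it does not patch the connector itself.

```python
import json


def build_bulk_body(index, docs):
    """Build an NDJSON _bulk body; `docs` is a list of (doc_id, source) pairs.

    Hypothetical helper for illustration only.
    """
    lines = []
    for doc_id, source in docs:
        # OpenSearch 2.x rejects "_type" in the action/metadata line,
        # so only "_index" and "_id" are set here.
        action = {"index": {"_index": index, "_id": doc_id}}
        lines.append(json.dumps(action))
        lines.append(json.dumps(source))
    # The bulk API expects the body to end with a trailing newline.
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    body = build_bulk_body("my-topic", [("1", {"field": "value"})])
    print(body, end="")
```

Compare this with the failing request: the Elasticsearch sink connector still emits "_type" in that first line, which is exactly what the "unknown parameter [_type]" message refers to.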

After searching around, there seems to be no open-source OpenSearch sink connector. Even Aiven's OpenSearch® Connector for Apache Kafka® (GitHub: Aiven-Open/opensearch-connector-for-apache-kafka) does not seem to be really free. Has anybody run into the same situation? OpenSearch is used worldwide, so I am pretty sure others have similar use cases. Any recommendation is welcome.


Hi @jasmine,

Aiven's OpenSearch® Connector for Apache Kafka® (GitHub: aiven/opensearch-connector-for-apache-kafka) is a fully open-source, ASFv2-licensed OpenSearch connector for Apache Kafka, compatible with OpenSearch 1.x and 2.x.

Best Regards,
Andriy Redko

Hey @reta,

Based on the documentation, here is what I found:

To setup an OpenSearch sink connector, you need an Aiven for Apache Kafka service with Kafka Connect enabled or a dedicated Aiven for Apache Kafka Connect cluster.

An Aiven for Apache Kafka service with Kafka Connect enabled requires a Business plan or above, which is not free, right? Also, I need an on-premises solution rather than managing the cluster from the Aiven console.

Thanks,
Jasmine


Hey @jasmine,

The connector itself does not depend on Aiven or any other managed-service provider: as long as you have Kafka and OpenSearch instances, you should be good to go. The documentation you are referring to is specific to Aiven's managed offering of connectors for Apache Kafka, where OpenSearch is just one of many. Thanks.
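As a rough illustration of running the connector on a self-managed Kafka Connect cluster, the sketch below registers it through the standard Kafka Connect REST API (POST /connectors). The connector class name and the configuration keys connection.url, connection.username, connection.password and key.ignore are assumptions and should be checked against the README of the connector version you install. The worker URL http://localhost:8083 and the names opensearch-sink and my-topic are hypothetical.

```python
import json
import urllib.request


def build_connector_payload(name, topics, url, username, password):
    """Build a Kafka Connect REST payload for an OpenSearch sink.

    Connector-specific keys below are assumptions; verify them against the README.
    """
    return {
        "name": name,
        "config": {
            # Assumed class name for the Aiven OpenSearch sink connector.
            "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
            # "topics" and "tasks.max" are standard Kafka Connect sink settings.
            "topics": ",".join(topics),
            "tasks.max": "1",
            # Assumed connection settings (basic auth over HTTP/HTTPS).
            "connection.url": url,
            "connection.username": username,
            "connection.password": password,
            # Assumed option: ignore record keys when deriving document IDs.
            "key.ignore": "true",
        },
    }


def register_connector(connect_url, payload):
    """POST the payload to a Kafka Connect worker's /connectors endpoint."""
    req = urllib.request.Request(
        connect_url.rstrip("/") + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


if __name__ == "__main__":
    payload = build_connector_payload(
        "opensearch-sink", ["my-topic"], "https://opensearch:9200", "admin", "admin"
    )
    print(json.dumps(payload, indent=2))
    # Requires a running Connect worker with the connector plugin installed:
    # register_connector("http://localhost:8083", payload)
```

Only the Kafka Connect worker and an OpenSearch endpoint are involved here; nothing in the request refers to Aiven's managed platform.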

Best Regards,
Andriy Redko

Hi @reta, I have tried the connector in question, and it fails with OpenSearch. When you write "you should be good to go", do you have any GitHub examples or tests that you can share?

Hi @erick.audet_2022, the repository contains a number of tests [1]. If you don't mind, please open an issue on the repository with more details on what is failing. Thank you.

[1] aiven/opensearch-connector-for-apache-kafka on GitHub (main branch): src/integration-test/java/io/aiven/kafka/connect/opensearch

Best Regards,
Andriy Redko


@reta The tests ran without any errors. I have installed and configured the plugin in Kafka Connect without issue (though a bit more documentation would help). I do, however, have a serialization issue.

@erick.audet_2022 Awesome, thank you for the update. Please open issues on the project's GitHub page for documentation improvements; that would be much appreciated. Thank you.

@reta I am trying the connector, but apart from username and password I don't see any SSL options. Does this connector support the following configuration?

"elastic.security.protocol": "SSL",
"elastic.https.ssl.keystore.location": "xxx",
"elastic.https.ssl.keystore.password": "xxx",
"elastic.https.ssl.key.password": "xxx",
"elastic.https.ssl.keystore.type": "JKS",
"elastic.https.ssl.truststore.location": "xxx",
"elastic.https.ssl.truststore.password": "xxx",
"elastic.https.ssl.truststore.type": "JKS",
"elastic.https.ssl.protocol": "TLS"

@jasmine It seems not; see issue #106, "Options for SSL", in the aiven/opensearch-connector-for-apache-kafka repository on GitHub.