Versions (relevant - OpenSearch/Dashboard/Server OS/Browser):
v 2.12.0
Describe the issue:
I’m using the Aiven Kafka connector to OpenSearch (version 2.12.0) and ingesting data into different indices through data streams. It works well, but once in a while the Kafka connector tasks fail with the error “security_exception”. When I stop the connector and resume it, it starts working again until the same error comes back some days later. The error seems to affect only indices with millions of records (high velocity); indices with fewer records don’t encounter this exception.
Configuration:
My cluster has 5 nodes (1 master, 1 master-eligible & data, and 3 data nodes) running on AWS EC2 machines. Clients connect over HTTPS through an AWS ALB with a public certificate, which forwards traffic to target port 9200, secured with a self-signed certificate.
Relevant Logs or Screenshots:
{ "name": "iot-node-data-sink", "connector": { "state": "RUNNING", "worker_id": "ip-172-xx-xx-xx.eu-central-1.compute.internal:8083" }, "tasks": [ { "id": 0, "state": "FAILED", "worker_id": "ip-172-xx-xx-xx.eu-central-1.compute.internal:8083", "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Non-repeatable exception trown by bulk processing\n\tat io.aiven.kafka.connect.opensearch.RetryUtil.callWithRetry(RetryUtil.java:127)\n\tat io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.execute(BulkProcessor.java:398)\n\tat io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.call(BulkProcessor.java:370)\n\tat io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.call(BulkProcessor.java:351)\n\t... 
4 more\nCaused by: OpenSearchStatusException[OpenSearch exception [type=security_exception, reason=Unexpected exception indices:data/write/bulk]]\n\tat org.opensearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:209)\n\tat org.opensearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2235)\n\tat org.opensearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2212)\n\tat org.opensearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1931)\n\tat org.opensearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1884)\n\tat org.opensearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1852)\n\tat org.opensearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:371)\n\tat io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.lambda$execute$0(BulkProcessor.java:401)\n\tat io.aiven.kafka.connect.opensearch.RetryUtil.callWithRetry(RetryUtil.java:119)\n\t... 7 more\n\tSuppressed: org.opensearch.client.ResponseException: method [POST], host [https://opensearch-nodes.xxxxxx.de], URI [/_bulk?timeout=1m], status line [HTTP/1.1 500 Internal Server Error]\n{\"error\":{\"root_cause\":[{\"type\":\"security_exception\",\"reason\":\"Unexpected exception indices:data/write/bulk\"}],\"type\":\"security_exception\",\"reason\":\"Unexpected exception indices:data/write/bulk\"},\"status\":500}\n\t\tat org.opensearch.client.RestClient.convertResponse(RestClient.java:375)\n\t\tat org.opensearch.client.RestClient.performRequest(RestClient.java:345)\n\t\tat org.opensearch.client.RestClient.performRequest(RestClient.java:320)\n\t\tat org.opensearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1918)\n\t\t... 12 more\n" } ], "type": "sink" }
Error from another task:
{ "name": "mqtt-opensearch-sink", "connector": { "state": "RUNNING", "worker_id": "ip-172-xx-xx-xx.eu-central-1.compute.internal:8083" }, "tasks": [ { "id": 0, "state": "FAILED", "worker_id": "ip-172-xx-xx-xx.eu-central-1.compute.internal:8083", "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Non-repeatable exception trown by bulk processing\n\tat io.aiven.kafka.connect.opensearch.RetryUtil.callWithRetry(RetryUtil.java:127)\n\tat io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.execute(BulkProcessor.java:398)\n\tat io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.call(BulkProcessor.java:370)\n\tat io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.call(BulkProcessor.java:351)\n\t... 
4 more\nCaused by: OpenSearchStatusException[OpenSearch exception [type=security_exception, reason=Unexpected exception indices:data/write/bulk]]\n\tat org.opensearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:209)\n\tat org.opensearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2235)\n\tat org.opensearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2212)\n\tat org.opensearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1931)\n\tat org.opensearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1884)\n\tat org.opensearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1852)\n\tat org.opensearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:371)\n\tat io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.lambda$execute$0(BulkProcessor.java:401)\n\tat io.aiven.kafka.connect.opensearch.RetryUtil.callWithRetry(RetryUtil.java:119)\n\t... 7 more\n\tSuppressed: org.opensearch.client.ResponseException: method [POST], host [https://opensearch-nodes.xxxxxx.de], URI [/_bulk?timeout=1m], status line [HTTP/1.1 500 Internal Server Error]\n{\"error\":{\"root_cause\":[{\"type\":\"security_exception\",\"reason\":\"Unexpected exception indices:data/write/bulk\"}],\"type\":\"security_exception\",\"reason\":\"Unexpected exception indices:data/write/bulk\"},\"status\":500}\n\t\tat org.opensearch.client.RestClient.convertResponse(RestClient.java:375)\n\t\tat org.opensearch.client.RestClient.performRequest(RestClient.java:345)\n\t\tat org.opensearch.client.RestClient.performRequest(RestClient.java:320)\n\t\tat org.opensearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1918)\n\t\t... 12 more\n" } ], "type": "sink" }
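Since both status outputs above bury the actual cause deep inside the escaped `trace` string, here is a minimal Python sketch for pulling out the useful parts. It assumes the input is the JSON returned by the Kafka Connect status endpoint, as pasted above. The function name `failed_tasks` and the keys of the returned dicts are made up for illustration, and the sample trace is an abbreviated copy of the one in this post. The `restart_path` it builds is the standard Kafka Connect REST path for restarting a single task (sent as a `POST`), which may be a lighter workaround than stopping and resuming the whole connector.

```python
import json
import re


def failed_tasks(status: dict) -> list:
    """Summarise every FAILED task in a Kafka Connect status document."""
    summaries = []
    for task in status.get("tasks", []):
        if task.get("state") != "FAILED":
            continue
        trace = task.get("trace", "")
        # Every "Caused by:" line in the trace; the last one is the innermost cause.
        causes = re.findall(r"^Caused by: (.+)$", trace, flags=re.MULTILINE)
        # HTTP status line reported by the OpenSearch REST client, if present.
        http = re.search(r"status line \[(.+?)\]", trace)
        summaries.append({
            "connector": status.get("name"),
            "task_id": task.get("id"),
            "root_cause": causes[-1] if causes else None,
            "http_status": http.group(1) if http else None,
            # Path to POST to on the Connect worker to restart only this task.
            "restart_path": "/connectors/{}/tasks/{}/restart".format(
                status.get("name"), task.get("id")
            ),
        })
    return summaries


# Abbreviated sample based on the first status output in this post.
SAMPLE = json.loads(r'''
{
  "name": "iot-node-data-sink",
  "connector": {"state": "RUNNING"},
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\nCaused by: org.apache.kafka.connect.errors.ConnectException: Non-repeatable exception trown by bulk processing\nCaused by: OpenSearchStatusException[OpenSearch exception [type=security_exception, reason=Unexpected exception indices:data/write/bulk]]\n\tSuppressed: org.opensearch.client.ResponseException: method [POST], URI [/_bulk?timeout=1m], status line [HTTP/1.1 500 Internal Server Error]\n"
    }
  ],
  "type": "sink"
}
''')

if __name__ == "__main__":
    for summary in failed_tasks(SAMPLE):
        print(json.dumps(summary, indent=2))
```

The summary makes it easier to see that the innermost cause is a `security_exception` returned with HTTP 500 on `/_bulk`, which points at the server side (the security plugin on the OpenSearch nodes) rather than at the connector itself.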