Unable to parse response body for Response - flink connector

OpenSearch 2.12.0 (or any version greater than 1.3.x) - Flink 1.18.1

Describe the issue:
I'm trying to use the flink-sql-opensearch connector to sink streaming data from Flink to OpenSearch.
After the job is submitted to the Flink cluster successfully, it runs normally for about 30 seconds and creates the index with data, then it fails with the following message:
org.apache.flink.util.FlinkRuntimeException: Complete bulk has failed…
Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://172.20.0.6:9200, response=HTTP/1.1 200 OK}
at org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1942)
at org.opensearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:662)
at org.opensearch.client.RestClient$1.completed(RestClient.java:396)
at org.opensearch.client.RestClient$1.completed(RestClient.java:390)
at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:182)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:87)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:40)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
… 1 more
Caused by: java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Unknown Source)
at org.opensearch.action.DocWriteResponse.<init>(DocWriteResponse.java:140)
at org.opensearch.action.index.IndexResponse.<init>(IndexResponse.java:67) …

This error seems to be common, but I couldn't find any solution.
Although the Flink connector was built for OpenSearch 1.3, it still manages to send data and create the index on OpenSearch 2.12.0; yet this error persists with every OpenSearch version greater than 1.3.x.
Any advice please?
Thanks.

Configuration:
Docker Compose:
Flink 1.18.1 (Java 11)
OpenSearch 2.12.0
flink-sql-opensearch connector (Flink 1.18.1 → OpenSearch 1.3); the sink table is sketched below
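For reference, the sink is declared roughly like the sketch below; the table schema, index name, and host are illustrative placeholders, not the actual job definition:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OpensearchSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical sink table using the flink-sql-opensearch connector;
        // fields, index name, and host are placeholders.
        tEnv.executeSql(
                "CREATE TABLE os_sink (\n"
                        + "  id STRING,\n"
                        + "  payload STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'opensearch',\n"
                        + "  'hosts' = 'http://172.20.0.6:9200',\n"
                        + "  'index' = 'my_index'\n"
                        + ")");

        // Rows inserted into os_sink go through the connector's bulk writer,
        // which is where the failure below is raised.
    }
}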
Relevant Logs or Screenshots:
(Same stack trace as in the description above.)

@wael this is unexpected. Could you please create an issue here [1]? The issue is caused by the _type property, which has been removed in OpenSearch 2.x. Thank you.

[1] https://issues.apache.org/jira/projects/FLINK/issues
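To spell out the chain of causes: the 1.3.x high-level client parses every bulk item expecting an _type field, which a 2.x cluster no longer returns, so the parsed value is null and the response constructor's null check fails. A rough illustration follows; it is a sketch, not the actual client source:

import java.util.Objects;

public class TypeFieldNpeSketch {
    public static void main(String[] args) {
        // Values as the 1.3.x client would parse them from a 2.x bulk response item:
        String index = "my_index"; // from "_index"
        String id = "1";           // from "_id"
        String type = null;        // "_type" is absent in OpenSearch 2.x responses

        System.out.println("parsed item: " + index + "/" + id);

        // The DocWriteResponse constructor null-checks the type, which is why the
        // "Unable to parse response body" IOException wraps a NullPointerException.
        Objects.requireNonNull(type, "type");
    }
}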


Flink CDC to OpenSearch (flink-sql-connector-postgres-cdc-3.0.1.jar, flink-sql-connector-opensearch-2.0-SNAPSHOT.jar)

Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://xx.xx.xx.xx:9200, response=HTTP/1.1 200 OK}
at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1942)
at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:662)
at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$1.completed(RestClient.java:396)
at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$1.completed(RestClient.java:390)
at org.apache.flink.opensearch.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)

1. Analyzing the Issue

  • The error indicates an inability to parse the response body while using the OpenSearch connector.

  • Upon inspecting the project, it’s found that the OpenSearch client version is 1.3.0, but the documentation suggests using version 2.x or 3.x, requiring JDK 11 or higher.
    Analysis details:
    Open pom.xml under flink-connector-opensearch in IDEA and import it as a project.
    Searching the project for "org.opensearch.client" shows that the version used in the sub-module flink-connector-opensearch/pom.xml is 1.3.0 (${opensearch.version}):

    <dependency>
        <groupId>org.opensearch.client</groupId>
        <artifactId>opensearch-rest-high-level-client</artifactId>
        <version>${opensearch.version}</version>
    </dependency>

    which resolves to:

    <opensearch.version>1.3.0</opensearch.version>

    In docs/content/docs/connectors/datastream/opensearch.md it is stated:
    By default, Apache Flink Opensearch Connector uses 1.3.x client libraries. You could switch to use 2.x (or upcoming 3.x) clients noting that those require JDK-11 or above, for example.
    So modify:

    <opensearch.version>2.5.0</opensearch.version>

  • Attempt to upgrade the OpenSearch client version to 2.5.0 and modify the corresponding code to adapt to the new client version.

  • During Maven build, formatting issues are detected, necessitating the use of the Spotless plugin.

  • Some test cases fail to compile during testing, requiring modifications to the test code to address these issues.

2. Clone the Project Code

git clone https://github.com/apache/flink-connector-opensearch.git

3. Modify pom.xml

In flink-connector-opensearch/flink-connector-opensearch/pom.xml, update the OpenSearch client version to 2.5.0:

<properties>
    <opensearch.version>2.5.0</opensearch.version>
</properties>

4. Modify Test Code

e.g.: OpensearchSinkTest.java

// Modify the process method in OpensearchSinkTest.java.
// The 2.x client removed the mapping-type API, so the old .type("type") call is dropped.
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    indexer.add(Requests.indexRequest().index("index").id("id").source(json));
}

And more …

e.g.: DefaultBulkResponseInspectorTest.java


// Modify the testThrowsChainedFailure method in DefaultBulkResponseInspectorTest.java
@Test
void testThrowsChainedFailure() {
    final IOException failureCause0 = new IOException("A");
    final IOException failureCause1 = new IOException("B");
    final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector();
    Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
            .isThrownBy(
                    () -> {
                        final BulkRequest request = new BulkRequest();
                        request.add(
                                new IndexRequest(), new DeleteRequest(), new DeleteRequest());

                        inspector.inspect(
                                request,
                                new BulkResponse(
                                        new BulkItemResponse[] {
                                            new BulkItemResponse(
                                                    0, OpType.CREATE, (DocWriteResponse) null),
                                            new BulkItemResponse(
                                                    1,
                                                    OpType.DELETE,
                                                    new Failure(
                                                            "index",
                                                            // "type",
                                                            "id",
                                                            failureCause0)),

And more …

5. Compile

mvn spotless:apply
mvn clean package -DskipTests -U

NOTE: mvn spotless:apply resolves the formatting violations reported by the Spotless plugin.

Following these steps should resolve the issues encountered with the OpenSearch connector and successfully compile the project.

Dear Kenny,
Please accept my sincere appreciation, and thank you so much for all of your brilliant effort and support; showing the steps was very helpful as well.

I followed the steps exactly as you showed them, but after modifying the testThrowsChainedFailure method by commenting out "type" (// "type", as shown in your explanation), I got this error:
"Cannot resolve constructor 'Failure(String, String, IOException)'"

Candidates for new Failure() are:
Failure(String index, String type, String id, Exception cause)
Failure(String index, String type, String id, Exception cause, boolean aborted)
Failure(String index, String type, String id, Exception cause, RestStatus status)
Failure(String index, String type, String id, Exception cause, long seqNo, long term)
Failure(String index, String type, String id, Exception cause, RestStatus status, long seqNo, long term, …)
Failure(StreamInput in)


As you can see, the Failure constructor still requires "type" as a parameter.

Would you please kindly explain what else I can do to apply the modification to testThrowsChainedFailure in DefaultBulkResponseInspectorTest.java (commenting out // "type") as you explained?

all the best regards
wael

Hi Wael,
Please note that this is a TEST class; it only needs to be adjusted to match the client library's API.
In your case you don’t have to comment out this line: “type”,
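In other words, the constructor call in the test keeps the type argument. Below is a minimal sketch of the resulting call shape, based on the constructor candidates listed above rather than the exact file contents:

import java.io.IOException;

import org.opensearch.action.DocWriteRequest.OpType;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkItemResponse.Failure;

class FailureConstructionSketch {

    static BulkItemResponse failedDelete() {
        final IOException failureCause1 = new IOException("B");
        // Keep the "type" argument; the Failure constructor in this build still requires it.
        return new BulkItemResponse(
                1, OpType.DELETE, new Failure("index", "type", "id", failureCause1));
    }
}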

Dear Kenny,
Thank you for the clarification. I proceeded as per your guidance, and now it works fine.
Thanks a million.

all the best regards
wael