Hi, I'm trying to use Kafka Connect with Open Distro, using the es-sink-connector.
I get an error: java.lang.IllegalStateException: Not a JSON Object: "Unauthorized"
I'm using the admin user from the connector, but I still get the same error.
I saw on a few blogs that there is a problem connecting the kafka-connect es-sink to Open Distro; is that true?
Thanks!
I haven't heard of that, and from the documentation it seems like it should work. From the error message, it certainly looks like your credentials aren't being passed.
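As a quick sanity check (a guess, not a known fix): try hitting the cluster directly with the same user the connector is configured with, bypassing Kafka Connect entirely. Placeholders below, substitute your own host and credentials:

curl -u <user>:<password> http://<open-distro-host>:9200

If that comes back with the cluster info JSON, the credentials themselves are fine and the problem is in how the connector passes them; if it also returns "Unauthorized", the problem is on the Open Distro side.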
Yes, it's a bit weird. This is the connector I'm creating:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "simple-elasticsearch-connector2",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://od-master.internal:9200",
    "tasks.max": "1",
    "topics": "test",
    "key.ignore": "true",
    "name": "simple-elasticsearch-connector2",
    "connection.username": "admin",
    "connection.password": "admin",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "type.name": "_doc"
  }
}'
(my admin user is admin:admin)
"connection.url": "http://od-master.internal:9200",
Shouldn’t that be https?
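With the Open Distro security plugin installed (the demo setup in particular), the REST layer is usually served over TLS with self-signed certificates, so it's worth checking whether https works at all. A quick test from the Connect host, reusing the host and credentials from your config:

# -k skips certificate verification, which is fine for a quick test against self-signed demo certs
curl -k -u admin:admin https://od-master.internal:9200

If that returns the cluster info, change connection.url to https://od-master.internal:9200 in the connector config. Depending on the connector version, you may also need to point the connector at a truststore containing the cluster's certificate.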
@searchymcsearchface @pablo @idanl I am trying to do the same thing with kafka-connect-opensearch, but I have also enabled SASL_PLAINTEXT authentication, so when creating a new connector through the Kafka Connect REST API I get an authentication failure.
I need to understand which key-value pairs to pass in the JSON body for SASL_PLAINTEXT authentication. Currently my JSON body is:
{
  "name": "first-opensearch-connector",
  "config": {
    "name": "first-opensearch-connector",
    "connector.class": "com.dmathieu.kafka.opensearch.OpenSearchSinkConnector",
    "type.name": "_doc",
    "connection.password": "admin",
    "connection.username": "admin",
    "connection.url": "http://10.30.1.101:9200",
    "topics": "my_topic",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "1",
    "key.ignore": "true",
    "schema.ignore": "true",
    "drop.invalid.message": "true",
    "behavior.on.malformed.documents": "fail",
    "write.method": "INSERT",
    "read.timeout.ms": "10000",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.tolerance": "all",
    "transforms": "AddPrefix,TimestampRouter,InsertField",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": ".*",
    "transforms.AddPrefix.replacement": "acme_$0",
    "transforms.TimestampRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.TimestampRouter.topic.format": "foo-bar-${topic}-${timestamp}",
    "transforms.TimestampRouter.timestamp.format": "YYYYMMdd",
    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertField.static.field": "MessageSource",
    "transforms.InsertField.static.value": "Kafka Connect framework",
    "sasl.mechanism": "PLAIN",
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\""
  }
}
I'm not an expert on this part of OpenSearch. You might want to cross-post in the security category for more visibility.
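One thing that does stand out, though (take it with a grain of salt): the sasl.* / security.protocol keys in a sink connector's config are not automatically applied to the Kafka consumer the connector uses. Those client settings normally live in the worker properties (optionally with a consumer. prefix), or can be passed per connector with consumer.override.-prefixed keys if the worker allows client config overrides (connector.client.config.override.policy=All). Also note that a sasl.jaas.config value has to end with a semicolon. A sketch of the per-connector variant, reusing the placeholder credentials from your config:

    "consumer.override.security.protocol": "SASL_PLAINTEXT",
    "consumer.override.sasl.mechanism": "PLAIN",
    "consumer.override.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"

That covers the connector's consumer authenticating to the brokers; an authentication failure from the Connect REST API itself (the POST to :8083) is a separate thing, configured on the worker rather than in the connector JSON.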
Ok, thanks for the direction.