Writing to OpenSearch using the OpenSearch connector

Versions (relevant - OpenSearch/Dashboard/Server OS/Browser):
2.3

Describe the issue:
I am trying to write to AOSS 2.3 from PySpark/Scala using the OpenSearch connector.

However, I keep receiving the following exception:

An error was encountered: An error occurred while calling o278.save.
org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: org.opensearch.spark.sql. Please find packages at https://spark.apache.org/third-party-projects.html.
at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:863)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:257)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: org.opensearch.spark.sql.DefaultSource
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
at scala.util.Failure.orElse(Try.scala:224)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
... 15 more

Configuration:

Steps to Reproduce:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json

spark = SparkSession.builder \
    .appName("OpenSearchExample") \
    .config("spark.jars", "home/opensearch-spark-2.13-1.2.0-SNAPSHOT.jar") \
    .getOrCreate()

df = spark.createDataFrame([(1, "value1"), (2, "value2")], ["id", "value"])
df.show()

df.write \
    .format("org.opensearch.spark.sql") \
    .option("inferSchema", "true") \
    .option("opensearch.nodes", "https://xxxxxxx.us-east-1.aoss.amazon.com") \
    .option("opensearch.port", "9200") \
    .option("opensearch.net.http.auth.user", "admin") \
    .option("opensearch.net.http.auth.pass", "admin") \
    .option("opensearch.net.ssl", "true") \
    .option("opensearch.net.ssl.cert.allow.self.signed", "true") \
    .option("opensearch.batch.write.retry.count", "9") \
    .option("opensearch.http.retries", "9") \
    .option("opensearch.http.timeout", "18000") \
    .mode("append") \
    .save("test-index")

Relevant Logs or Screenshots:

@gsharma2907 I’ve got your example working. Just double-check that your Spark and Scala versions match the jar file.
The .config("spark.jars", ...) setting didn't work for me. Instead, I pointed to the jar file on the command line.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json

# Initialize SparkSession with the appropriate configurations
spark = SparkSession.builder.appName("OpenSearchExample").getOrCreate()

# Create a DataFrame
data = [(1, "value1"), (2, "value2")]
df = spark.createDataFrame(data, ["id", "value"])
df.show()

# Write DataFrame to OpenSearch
df.write \
    .format("org.opensearch.spark.sql") \
    .option("inferSchema", "true") \
    .option("opensearch.nodes", "https://docker1.pablo.local") \
    .option("opensearch.port", "9200") \
    .option("opensearch.net.http.auth.user", "admin") \
    .option("opensearch.net.http.auth.pass", "Eliatra123") \
    .option("opensearch.net.ssl", "true") \
    .option("opensearch.net.ssl.cert.allow.self.signed", "true") \
    .option("opensearch.batch.write.retry.count", "9") \
    .option("opensearch.http.retries", "9") \
    .option("opensearch.http.timeout", "18000") \
    .mode("append") \
    .save("test-index")
Run it by passing the jar on the command line:

spark-submit --jars /home/pablo/jars/opensearch-spark-30_2.12-1.2.0.jar test.py

Querying test-index afterwards returns both documents:
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "test-index",
        "_id" : "avOxKJEB2cQss1fML_lx",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "value" : "value1"
        }
      },
      {
        "_index" : "test-index",
        "_id" : "bPOxKJEB2cQss1fML_l_",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "value" : "value2"
        }
      }
    ]
  }
}
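The search result above can be reproduced by querying the index directly. A sketch, with placeholders for the host and password, and --insecure only appropriate for self-signed test setups:

```
curl --insecure -u admin:<password> "https://<OpenSearch_host>:9200/test-index/_search?pretty"
```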

What were your Spark and Scala versions for this run? @pablo

@gsharma2907

root@docker1:/home/pablo# spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/

Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 18.0.2-ea
Branch HEAD
Compiled by user heartsavior on 2024-02-15T11:24:58Z
Revision fd86f85e181fc2dc0f50a096855acf83a6cc5d9c

@gsharma2907 This is where I got the jar file.

https://repo1.maven.org/maven2/org/opensearch/client/opensearch-spark-30_2.12/1.2.0/
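As a quick sanity check of the version-matching advice above: the artifact name itself encodes the Spark line and Scala version, e.g. opensearch-spark-30_2.12-1.2.0.jar targets the Spark 3.x line built for Scala 2.12. A small sketch (parse_connector_artifact is a hypothetical helper, not part of the connector):

```python
import re

def parse_connector_artifact(jar_name):
    """Split a connector jar name like 'opensearch-spark-30_2.12-1.2.0.jar'
    into (spark_line, scala_version, connector_version)."""
    m = re.match(r"opensearch-spark-(\d+)_(\d+\.\d+)-(\d+(?:\.\d+)*)", jar_name)
    if not m:
        # e.g. a locally built snapshot that doesn't follow the convention
        raise ValueError("unrecognized artifact name: " + jar_name)
    return m.groups()

# '30' means the Spark 3.x line; this matches the spark-shell
# output below (Spark 3.5.1 / Scala 2.12.18).
print(parse_connector_artifact("opensearch-spark-30_2.12-1.2.0.jar"))
# → ('30', '2.12', '1.2.0')
```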

@pablo the repo from Maven worked like a charm; it looks like the jar I was building after cloning from GitHub was the problem. I'm facing some other connection issues, but the jar problem is solved. Thank you.

Now I'm getting the following exception; I'd appreciate any pointers on fixing this one.

Caused by: org.opensearch.hadoop.rest.OpenSearchHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]]
at org.opensearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:170)
at org.opensearch.hadoop.rest.RestClient.execute(RestClient.java:443)
at org.opensearch.hadoop.rest.RestClient.execute(RestClient.java:439)
at org.opensearch.hadoop.rest.RestClient.execute(RestClient.java:407)
at org.opensearch.hadoop.rest.RestClient.mainInfo(RestClient.java:720)
at org.opensearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:413)

I am running OpenSearch locally in a Docker container.
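One common cause of this trace with Docker: the connector discovers the cluster's nodes and then tries the container-internal address (here 172.17.0.2), which is unreachable from outside the Docker network. Disabling node discovery may help. A sketch, assuming opensearch-hadoop mirrors the upstream es.nodes.wan.only setting:

```python
# Not verified against this setup: with wan.only enabled the connector
# talks only to the address you configure instead of discovered node IPs.
df.write \
    .format("org.opensearch.spark.sql") \
    .option("opensearch.nodes", "https://localhost") \
    .option("opensearch.port", "9200") \
    .option("opensearch.nodes.wan.only", "true") \
    .mode("append") \
    .save("test-index")
```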

@gsharma2907 I had no issues like that. I’m using local docker on Ubuntu 22.
Your original post contains an AWS link. Did you check your connectivity to the AWS OpenSearch cluster?
Try the command below from the same place where you executed the code.

curl --insecure -u admin:admin https://<OpenSearch_FQDN_or_IP>:9200
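The same reachability check can be scripted from the client side. A minimal sketch using only the standard library; the host and port are placeholders to substitute with your endpoint:

```python
import socket

def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Placeholders: substitute your OpenSearch endpoint and port.
print(can_connect("127.0.0.1", 9200))
```

Note this only verifies TCP reachability; TLS and authentication problems still need the curl check above.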