SparkConnector connection issue

  • Nebula version: 3.2.0
  • Deployment: distributed
  • Installation: built from source
  • Production environment: Y

The Spark Connector code, submitted to run on the cluster, throws the following error:
23/02/07 16:51:37 ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: com.vesoft.nebula.connector.NebulaDataSource is not a valid Spark SQL Data Source.
org.apache.spark.sql.AnalysisException: com.vesoft.nebula.connector.NebulaDataSource is not a valid Spark SQL Data Source.
	at org.apache.spark.sql.errors.QueryCompilationErrors$.invalidDataSourceError(QueryCompilationErrors.scala:1011)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:424)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:336)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:249)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:249)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:177)
	at com.vesoft.nebula.connector.connector.package$NebulaDataFrameReader.loadVerticesToDF(package.scala:135)
	at com.vivo.lineage.nebula.connector.testtest$.readVertex(testtest.scala:52)
	at com.vivo.lineage.nebula.connector.testtest$.main(testtest.scala:26)
	at com.vivo.lineage.nebula.connector.testtest.main(testtest.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:744)
23/02/07 16:51:38 WARN SparkContext: Successfully stopped SparkContext

The code is as follows:

package com.vivo.lineage.nebula.connector
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.ssl.SSLSignType
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory


object testtest {
  private val LOG = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf
    sparkConf
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    readVertex(spark)

    spark.close()
    sys.exit()
  }

  def readVertex(spark: SparkSession): Unit = {
    LOG.info("start to read nebula vertices")
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress(Constant.NEBULA_META_ADDRESS)
        .withConenctionRetry(2)
        .build()
    val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("`lineate_test`")
      .withLabel("`User`")
      .withNoColumn(false)
      .withReturnCols(List("user_vid", "user_name", "position"))
      .withLimit(10)
      .withPartitionNum(10)
      .build()
    val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
    vertex.printSchema()
    vertex.show(20)
    println("vertex count: " + vertex.count())
  }
}

The dependencies are as follows:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.vesoft</groupId>
            <artifactId>nebula-spark-connector</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.2.0</version>
        </dependency>

Don't put backticks around these two (the space name and the tag name).
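For reference, a sketch of the same reader config from the code above with the backticks dropped (nothing else changed):

val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("lineate_test")   // plain space name, no backticks
  .withLabel("User")           // plain tag name, no backticks
  .withNoColumn(false)
  .withReturnCols(List("user_vid", "user_name", "position"))
  .withLimit(10)
  .withPartitionNum(10)
  .build()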

I removed the backticks, but I still get the same error.

Can you try running it locally in IDEA? If it works locally, then the problem is that the dependencies weren't bundled into the jar when packaging.
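If it runs locally but fails on the cluster, the usual fix is to build a fat jar so the connector classes ship with the application. A minimal maven-shade-plugin sketch (the plugin version here is illustrative; adjust to your build):

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <!-- bind the shade goal to the package phase so `mvn package` produces a fat jar -->
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

Typically you would also mark the spark-core/spark-sql/spark-hive dependencies as provided, so that the cluster's own Spark is used and only nebula-spark-connector and its transitive dependencies are shaded into the jar.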

It throws the same error locally as well. What could be the cause?
The error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: com.vesoft.nebula.connector.NebulaDataSource is not a valid Spark SQL Data Source.
	at org.apache.spark.sql.errors.QueryCompilationErrors$.invalidDataSourceError(QueryCompilationErrors.scala:1011)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:424)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:274)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:245)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:245)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
	at com.vesoft.nebula.connector.connector.package$NebulaDataFrameReader.loadVerticesToDF(package.scala:135)
	at com.vivo.lineage.nebula.connector.testtest$.readVertex(testtest.scala:47)
	at com.vivo.lineage.nebula.connector.testtest$.main(testtest.scala:24)
	at com.vivo.lineage.nebula.connector.testtest.main(testtest.scala)

I see it now: your Spark is 3.2.0. Please check the docs or the GitHub README for the connector's Spark version requirements.
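For reference, if the README pins the connector to an older Spark line, the pom has to use matching artifacts. A sketch assuming the Spark 2.4 line (the patch version is illustrative, and whether the connector artifact expects Scala 2.11 or 2.12 should be checked against its README; 2.11 is assumed here):

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <!-- illustrative 2.4.x patch version -->
            <version>2.4.8</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>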

Our platform only has three Spark versions available:
(screenshot: the three available Spark versions)
From the README it looks like only Spark 2.2 and 2.4 are supported.


So unless we deploy a new Spark version, there's no way to use the Spark Connector?

If you want Spark 3 support, you can build the connector yourself from this PR: support spark3.x for connector by Nicole00 · Pull Request #71 · vesoft-inc/nebula-spark-connector · GitHub

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.