- nebula 版本:3.2.0
- 部署方式:分布式
- 安装方式:源码编译
- 是否为线上版本:Y
sparkconnector代码提交集群运行,报如下错误:
23/02/07 16:51:37 ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: com.vesoft.nebula.connector.NebulaDataSource is not a valid Spark SQL Data Source.
org.apache.spark.sql.AnalysisException: com.vesoft.nebula.connector.NebulaDataSource is not a valid Spark SQL Data Source.
at org.apache.spark.sql.errors.QueryCompilationErrors$.invalidDataSourceError(QueryCompilationErrors.scala:1011)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:424)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:336)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:249)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:249)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:177)
at com.vesoft.nebula.connector.connector.package$NebulaDataFrameReader.loadVerticesToDF(package.scala:135)
at com.vivo.lineage.nebula.connector.testtest$.readVertex(testtest.scala:52)
at com.vivo.lineage.nebula.connector.testtest$.main(testtest.scala:26)
at com.vivo.lineage.nebula.connector.testtest.main(testtest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:744)
23/02/07 16:51:38 WARN SparkContext: Successfully stopped SparkContext
代码如下:
package com.vivo.lineage.nebula.connector
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.ssl.SSLSignType
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object testtest {
private val LOG = LoggerFactory.getLogger(this.getClass)
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf
sparkConf
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
val spark = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
readVertex(spark)
spark.close()
sys.exit()
}
def readVertex(spark: SparkSession): Unit = {
LOG.info("start to read nebula vertices")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress(Constant.NEBULA_META_ADDRESS)
.withConenctionRetry(2)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("`lineate_test`")
.withLabel("`User`")
.withNoColumn(false)
.withReturnCols(List("user_vid", "user_name", "position"))
.withLimit(10)
.withPartitionNum(10)
.build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
vertex.printSchema()
vertex.show(20)
println("vertex count: " + vertex.count())
}
}
依赖如下:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>nebula-spark-connector</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.2.0</version>
</dependency>