nebula2.0.1,spark-connector报NullPointerException

nebula 版本:2.0.1
用的nebula-spark-connector 2.0的版本:nebula-spark-utils/README_CN.md at v2.0.0 · vesoft-inc/nebula-spark-utils · GitHub
部署方式(分布式 / 单机 / Docker / DBaaS):单机
是否为线上版本:N
问题的具体描述:使用spark-connector将hive数据导入到nebula2.0.1中,报错。
建立边的语句:

CREATE EDGE APPLYER2CONTACTOR(ID_NO_APPLYER string,ID_NO_CONTACTOR string,CREATED_DATETIME timestamp,LAST_MODIFIED_DATETIME timestamp); 

外部参数:

# External parameters handed to the Spark job.
#
# edge_sql: joins APPLY_INFO and CONTACT on app_no for partition ${data_date}, drops rows whose
# applyer or contactor id_no is NULL, and keeps one row per (applyer id_no, contactor id_no)
# pair: the one with the latest contact last_modified_datetime (rn = 1).
# Missing datetimes fall back to the sentinel 9999-12-31 23:59:59, converted to epoch seconds.
# The sentinel used to read 23:59:99; 99 is not a valid seconds value, so the fallback did not
# yield the intended instant (Spark either rejects it as NULL or rolls it over, depending on
# the parser in use).
# NOTE(review): confirm that the epoch value of year 9999 fits the range accepted by the
# Nebula `timestamp` property type; pick a smaller sentinel if it does not.
edge_sql="select id_no_applyer_tag_id,
        id_no_contactor_tag_id,
        ID_NO_APPLYER,
        ID_NO_CONTACTOR,
        CREATED_DATETIME,
        LAST_MODIFIED_DATETIME
from(
        select applyer.id_no as id_no_applyer_tag_id,
                contack.id_no as id_no_contactor_tag_id,
                applyer.id_no ID_NO_APPLYER,
                contack.id_no ID_NO_CONTACTOR,
                unix_timestamp(nvl(contack.created_datetime,to_timestamp('9999-12-31 23:59:59','yyyy-MM-dd HH:mm:ss'))) as CREATED_DATETIME,
                unix_timestamp(nvl(contack.last_modified_datetime,to_timestamp('9999-12-31 23:59:59','yyyy-MM-dd HH:mm:ss'))) as LAST_MODIFIED_DATETIME,
                row_number() over(partition by applyer.id_no,contack.id_no order by contack.last_modified_datetime desc) as rn
        from APPLY_INFO as applyer
        inner join CONTACT as contack
        on applyer.app_no=contack.app_no
        where applyer.ds='${data_date}'
                and contack.ds='${data_date}'
                and applyer.id_no is not null
                and contack.id_no is not null
)t where rn=1"

# Edge type to write, and the result columns used as source / destination vertex ids.
edge="APPLYER2CONTACTOR"
source_vertex="id_no_applyer_tag_id"
target_vertex="id_no_contactor_tag_id"

spark-connector的代码:

/* Copyright (c) 2020 vesoft inc. All rights reserved.
 *
 * This source code is licensed under Apache 2.0 License,
 * attached with Common Clause Condition 1.0, found in the LICENSES directory.
 */

package com.vesoft.nebula.examples.connector


import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaEdgeConfig, WriteNebulaVertexConfig}
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.slf4j.LoggerFactory

/**
 * Spark job that runs a Hive SQL query and writes the result into a Nebula Graph edge type
 * through nebula-spark-connector.
 *
 * Positional arguments (each one is trimmed before use):
 *  - 0 space, 1 appName, 2 dataBase, 3 dataDate (accepted but not used by this job),
 *    4 userName, 5 passwd, 6 metaAddress, 7 graphAddress, 8 edgeSql, 9 edgeName,
 *    10 srcIdField, 11 dstIdField, 12 batch
 *  - 13 rankField (optional): column of the query result used as the edge rank
 */
object NebulaSparkEdgeWriter {
  private val LOG = LoggerFactory.getLogger(this.getClass)

  /** Number of mandatory positional arguments. */
  private val RequiredArgCount = 13

  /**
   * Parses the command line, runs `edgeSql` against the given Hive database and writes the
   * resulting rows as edges.
   *
   * @param args positional arguments as described on the object
   * @throws IllegalArgumentException if fewer than 13 arguments are given or `batch` is not an
   *                                  integer
   */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException further down.
    require(
      args.length >= RequiredArgCount,
      s"Expected at least $RequiredArgCount arguments (space appName dataBase dataDate userName " +
        s"passwd metaAddress graphAddress edgeSql edgeName srcIdField dstIdField batch " +
        s"[rankField]) but got ${args.length}"
    )

    println("----------neubla的配置信息如下所示:----------")
    val space: String = args(0).trim
    val appName: String = args(1).trim
    val dataBase: String = args(2).trim
    val userName: String = args(4).trim
    val passwd: String = args(5).trim
    val metaAddress: String = args(6).trim
    val graphAddress: String = args(7).trim
    val edgeSql: String = args(8).trim
    val edgeName: String = args(9).trim
    val srcIdField: String = args(10).trim
    val dstIdField: String = args(11).trim
    val batch: Int =
      try args(12).trim.toInt
      catch {
        case e: NumberFormatException =>
          throw new IllegalArgumentException(
            s"batch (argument 12) must be an integer, got '${args(12)}'",
            e
          )
      }
    // Optional 14th argument; a missing or blank value means "no rank field configured".
    val rankField: Option[String] = args.lift(13).map(_.trim).filter(_.nonEmpty)

    println("edgeSql:"+edgeSql)
    println("edgeName:"+edgeName)
    println("srcIdField:"+srcIdField)
    println("dstIdField:"+dstIdField)
    println("batch:"+batch)
    println("-------------------------------------------")

    val spark =
      SparkSession.builder().master("yarn").appName(appName).enableHiveSupport().getOrCreate()
    try {
      spark.sql("use " + dataBase)
      // spark.sql already returns a DataFrame, so no further conversion is needed.
      val edgeFrame: DataFrame = spark.sql(edgeSql)
      writeEdge(
        edgeFrame,
        metaAddress,
        graphAddress,
        userName,
        passwd,
        space,
        edgeName,
        srcIdField,
        dstIdField,
        batch,
        rankField
      )
    } finally {
      // Release the session even when the query or the write fails.
      spark.close()
    }
    sys.exit()
  }

  /**
   * Prints a 10-row preview of `dataFrame` and writes all of its rows as edges of `edgeName`
   * into `space`.
   *
   * @param dataFrame    rows to write; must contain the columns named by `srcIdField`,
   *                     `dstIdField` and, when given, `rankField`
   * @param metaAddress  Nebula meta service address passed to the connection config
   * @param graphAddress Nebula graph service address passed to the connection config
   * @param userName     Nebula user
   * @param passwd       Nebula password
   * @param space        target graph space
   * @param edgeName     target edge type
   * @param srcIdField   column holding the source vertex id
   * @param dstIdField   column holding the destination vertex id
   * @param batch        batch size passed to the connector
   * @param rankField    optional column holding the edge rank. Connector 2.0.0 was reported to
   *                     throw a NullPointerException from `properties.setProperty` when no rank
   *                     field is configured, so pass one when running against that version.
   */
  def writeEdge(dataFrame: DataFrame,
                metaAddress: String,
                graphAddress: String,
                userName: String,
                passwd: String,
                space: String,
                edgeName: String,
                srcIdField: String,
                dstIdField: String,
                batch: Int,
                rankField: Option[String] = None): Unit = {

    // Persist before the first action so that the preview and the write can share cached data.
    dataFrame.persist(StorageLevel.MEMORY_AND_DISK_SER)
    try {
      dataFrame.show(10, truncate = false)

      val config =
        NebulaConnectionConfig
          .builder()
          .withMetaAddress(metaAddress)
          .withGraphAddress(graphAddress)
          .build

      val baseBuilder = WriteNebulaEdgeConfig
        .builder()
        .withUser(userName)
        .withPasswd(passwd)
        .withSpace(space)
        .withEdge(edgeName)
        .withSrcIdField(srcIdField)
        .withDstIdField(dstIdField)
        .withSrcAsProperty(false)
        .withDstAsProperty(false)
        .withBatch(batch)

      val edgeConfigBuilder = rankField match {
        case Some(field) => baseBuilder.withRankField(field)
        case None =>
          LOG.warn(
            "No rankField configured; some nebula-spark-connector versions fail with a " +
              "NullPointerException in this case"
          )
          baseBuilder
      }

      val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = edgeConfigBuilder.build()
      dataFrame.write.nebula(config, nebulaWriteEdgeConfig).writeEdges()
    } finally {
      // Drop the cached blocks once the write has finished or failed.
      dataFrame.unpersist()
    }
  }
}

程序中打印出来的日志:

麻烦看下:边的 SQL 已经把 id_no 为 null 的数据过滤掉了,但写入时仍然报 NullPointerException,不太理解原因。

这个问题和数据没有关系,你可以看一下堆栈:NPE 是 properties.setProperty 抛出的。
原因是 rankField 没有配置,值为 null,而 connector 没有对 rankField 做默认值和空值的处理;你把 rankField 配置为数据中存在的字段即可。

配置了,现在日志显示成功了,但是查询不到相关数据。
日志信息:
nebula-info.txt (274.0 KB)
查询的信息:

你配置的 rankField 是哪个字段,对应的值是多少?如果 rank 的值不是 0,这个语句查不出来是正常的:查询时需要在语句里带上对应的 rank。
使用示例:
插入语句 INSERT EDGE e1(p1) VALUES 1->2@1:(10);
查询时需要 FETCH PROP ON e1 1->2@1;

可以了,谢谢

浙ICP备20010487号