Nebula 2.0.1: spark-connector error

Statement to create the tag:

CREATE TAG APPLYINFO(APP_NO string,PRODUCT_CD string,ECIF_NO string,GPS_L string,GPS_B string,NETWORK_TYPE string,CREATED_DATETIME timestamp,LAST_MODIFIED_DATETIME timestamp);

Statement to create the edge:

CREATE EDGE PERSON2CARD(ID_NO string,CARD_NO string,CREATED_DATETIME timestamp,LAST_MODIFIED_DATETIME timestamp); 

I am using spark-connector to write Hive data into Nebula 2.0.1; the code is below.

The job fails, and the spark-connector error log is as follows:

  • Related meta / storage / graph info logs
    graphd error: E0602 15:30:18.195520 84304 HeaderServerChannel.cpp:114] Received invalid request from client: N6apache6thrift9transport19TTransportExceptionE: Header transport frame is too large: 1583156490 (hex 0x5e5d0d0a) (transport N6apache6thrift5async12TAsyncSocketE, address *******, port 24569)
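
Incidentally, the rejected "frame length" is suspicious: decoded big-endian, the four bytes 0x5e 0x5d 0x0d 0x0a are the printable ASCII text "^]\r\n", i.e. plain text (for example from a manual telnet test) reached the Thrift port rather than a real Thrift frame. A minimal Scala sketch of that decoding:

// Decode the bogus frame length from the log above (plain Scala, runnable as-is)
object FrameLengthDecode {
  def main(args: Array[String]): Unit = {
    val frameLen = 0x5e5d0d0aL // 1583156490, the "frame size" graphd rejected
    // Split into four big-endian bytes and render them as (escaped) ASCII
    val ascii = (3 to 0 by -1)
      .map(i => ((frameLen >> (i * 8)) & 0xff).toChar)
      .map {
        case '\r' => "\\r"
        case '\n' => "\\n"
        case c    => c.toString
      }
      .mkString
    println(ascii) // prints ^]\r\n -- plain text, not a Thrift frame header
  }
}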
// Code that builds the DataFrame
    val spark = SparkSession.builder().master("yarn").appName(appName).enableHiveSupport().getOrCreate()
    spark.sql("use " + dataBase)
    val dataSet1: Dataset[Row] = spark.sql("select app_no_tag_id,APP_NO,PRODUCT_CD,ECIF_NO,GPS_L,GPS_B,NETWORK_TYPE,CREATED_DATETIME,LAST_MODIFIED_DATETIME from tmp.table1")

    val dataFrame: DataFrame = dataSet1.toDF()


// Code that writes vertices
  def writeVertex(dataFrame: DataFrame,metaAddress: String,graphAddress: String,userName:String,passwd:String,space: String,tagName: String,vidField: String,batch: Int): Unit = {

    dataFrame.show()
    dataFrame.persist(StorageLevel.MEMORY_AND_DISK_SER)
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress(metaAddress)
        .withGraphAddress(graphAddress)
        .withConenctionRetry(3)
        .build()
    val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
      .builder()
      .withUser(userName)
      .withPasswd(passwd)
      .withSpace(space)
      .withTag(tagName)
      .withVidField(vidField)
      .withVidAsProp(false)
      .withBatch(batch)
      .withVidPolicy("hash")
      .build()
    dataFrame.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
  }

Please add the Spark Connector version number and paste your related configuration. Judging from the error message, this looks like an access-denied problem.

This is the nebula-spark-connector 2.0 version: nebula-spark-utils/README_CN.md at v2.0.0 · vesoft-inc/nebula-spark-utils · GitHub

The code above is also for the 2.0 version; the 1.2 code is quite different from the 2.0 code.

Right, the 1.x and 2.x code differs quite a bit because the underlying data structures changed. Please paste your meta, graph, and storage configuration files.

The company does not allow posting the configuration files online. Our setup is very simple anyway: just a single machine.

Please confirm whether the client side has multiple threads concurrently sending data over the same connection. If so, change it to a single thread per connection.

The client side is just your official component. I only read the Hive data and wrap it into a DataFrame; the rest of the write code is identical to your official example.

How many machines does your Spark cluster have? On each machine, run telnet metad_ip metad_port to check whether the metad service is reachable.
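
If logging in to every worker node is inconvenient, you can also run the same check from inside Spark so the socket probe executes on the executors themselves. A minimal sketch, with placeholder metad host/port (replace with your real ones):

// Probe metad reachability from every Spark worker node
import java.net.{InetSocketAddress, Socket}
import org.apache.spark.sql.SparkSession

object MetadReachabilityProbe {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("yarn").appName("metad-probe").getOrCreate()
    val metadHost = "127.0.0.1" // placeholder: your metad_ip
    val metadPort = 9559        // placeholder: your metad_port

    // Many small tasks so the probe lands on every executor / node
    val results = spark.sparkContext
      .parallelize(1 to 200, 200)
      .map { _ =>
        val node = java.net.InetAddress.getLocalHost.getHostName
        val reachable =
          try {
            val socket = new Socket()
            socket.connect(new InetSocketAddress(metadHost, metadPort), 3000)
            socket.close()
            true
          } catch { case _: Exception => false }
        (node, reachable)
      }
      .distinct()
      .collect()

    results.sortBy(_._1).foreach { case (node, ok) =>
      println(s"$node -> ${if (ok) "reachable" else "UNREACHABLE"}")
    }
    spark.stop()
  }
}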

I suspect it is that too. Let me ask our company's big-data platform team.

Could you paste your spark-connector configuration?

Statement to create the tag:

CREATE TAG APPLYINFO(APP_NO string,PRODUCT_CD string,ECIF_NO string,GPS_L string,GPS_B string,NETWORK_TYPE string,CREATED_DATETIME timestamp,LAST_MODIFIED_DATETIME timestamp);

And the corresponding tag SQL:

tagSQL="select app_no_tag_id,APP_NO,PRODUCT_CD,ECIF_NO,GPS_L,GPS_B,NETWORK_TYPE,CREATED_DATETIME,LAST_MODIFIED_DATETIME from tmp.table1"
// spark-connector code
   
// Imports as in the nebula-spark-connector 2.0 examples
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaEdgeConfig, WriteNebulaVertexConfig}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.slf4j.LoggerFactory

object NebulaSparkVertexWriter {
  private val LOG = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {

    println("----------neubla的配置信息如下所示:----------")
    val space: String = args(0).trim
    val appName: String = args(1).trim
    val dataBase : String= args(2).trim
    val dataDate : String= args(3).trim
    val userName: String = args(4).trim
    val passwd:String = args(5).trim
    val metaAddress: String = args(6).trim
    val graphAddress: String = args(7).trim
    val tagSQL: String = args(8).trim
    println("tagSQL:"+tagSQL)
    val tagName: String = args(9).trim
    val vidField: String = args(10).trim
    val batch:Int = args(11).trim.toInt
    println("-------------------------------------------")
    val spark = SparkSession.builder().master("yarn").appName(appName).enableHiveSupport().getOrCreate()
    spark.sql("use " +dataBase)
    val dataSet1:  Dataset[Row] = spark.sql(tagSQL)
    val dataFrame1: DataFrame = dataSet1.toDF()


    writeVertex(dataFrame1,metaAddress,graphAddress,userName,passwd,space,tagName,vidField,batch )
      //    writeEdge(dataFrame1)

    spark.close()
    sys.exit()
  }

  def writeVertex(dataFrame: DataFrame,metaAddress: String,graphAddress: String,userName:String,passwd:String,space: String,tagName: String,vidField: String,batch: Int): Unit = {

    dataFrame.show()
    dataFrame.persist(StorageLevel.MEMORY_AND_DISK_SER)
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress(metaAddress)
        .withGraphAddress(graphAddress)
        .withConenctionRetry(3)
        .build()
    val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
      .builder()
      .withUser(userName)
      .withPasswd(passwd)
      .withSpace(space)
      .withTag(tagName)
      .withVidField(vidField)
      .withVidAsProp(false)
      .withBatch(batch)
      .withVidPolicy("hash")
      .build()
    dataFrame.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
  }


  def writeEdge(dataFrame: DataFrame,metaAddress: String,graphAddress: String,userName:String,passwd:String,space: String,edgeName: String,srcIdField: String,dstIdField: String,batch:Int): Unit = {

    dataFrame.show()
    dataFrame.persist(StorageLevel.MEMORY_AND_DISK_SER)

    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress(metaAddress)
        .withGraphAddress(graphAddress)
        .build()
    val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
      .builder()
      .withUser(userName)
      .withPasswd(passwd)
      .withSpace(space)
      .withEdge(edgeName)
      .withSrcIdField(srcIdField)
      .withDstIdField(dstIdField)
      .withSrcAsProperty(false)
      .withDstAsProperty(false)
      .withBatch(batch)
      .withSrcPolicy("hash")
      .withDstPolicy("hash")
      .build()
    dataFrame.write.nebula(config, nebulaWriteEdgeConfig).writeEdges()
  }
}
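
For completeness, a sketch of how this job could be submitted. The arguments are positional and must follow the order read in main; every value below is a placeholder (the 9559/9669 ports are the Nebula 2.x defaults for metad/graphd):

spark-submit \
  --class NebulaSparkVertexWriter \
  --master yarn \
  nebula-vertex-writer.jar \
  "test_space" "nebula-vertex-writer" "tmp" "20210602" \
  "root" "nebula" "127.0.0.1:9559" "127.0.0.1:9669" \
  "select app_no_tag_id,APP_NO,PRODUCT_CD,ECIF_NO,GPS_L,GPS_B,NETWORK_TYPE,CREATED_DATETIME,LAST_MODIFIED_DATETIME from tmp.table1" \
  "APPLYINFO" "app_no_tag_id" "512"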
 

The error message:

So the "no route to host" problem is gone now? Every machine in the Spark cluster can reach the NebulaGraph metad service?

If the network connectivity is fine, check the status of the metad service, and set a larger timeout in the spark-connector configuration.
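
For reference, a minimal sketch of raising the timeout on the connection config used earlier, assuming the builder exposes withTimeout (in milliseconds) as in the 2.0 connector examples; the value is arbitrary, tune it to your environment:

// Same connection config as above, with a larger timeout
val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress(metaAddress)
  .withGraphAddress(graphAddress)
  .withTimeout(60000)     // timeout in milliseconds; 60000 is just an example value
  .withConenctionRetry(3) // note: the method name is spelled this way in the connector API
  .build()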

Let me post all of the error logs.

error.txt (15.6 KB)

Your log still shows a connection problem.

Can you confirm the network issue now? You can log in to the servers and verify with telnet. Whether it is reachable or not, please give a definite conclusion first.

Solved; it was a network connectivity issue.

Thank you very much.