spark connector 写入边失败,断言异常

提问参考模板:

  • nebula 版本:3.0.0
  • spark connector 版本使用:

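    <dependency>
        <groupId>com.vesoft</groupId>
        <artifactId>nebula-spark-connector_3.0</artifactId>
        <version>3.6.0</version>
        <exclusions>
            <exclusion>
                <artifactId>commons-pool2</artifactId>
                <groupId>org.apache.commons</groupId>
            </exclusion>
            <exclusion>
                <artifactId>commons-codec</artifactId>
                <groupId>commons-codec</groupId>
            </exclusion>
        </exclusions>
    </dependency>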

  • spark 版本:spark 3.1.1
  • 部署方式:分布式
  • 安装方式:RPM
  • 是否上生产环境:Y
  • 硬件信息
    • 磁盘( 推荐使用 SSD)
    • CPU、内存信息
  • 问题的具体描述
    spark connector 写入边数据时异常,实际写入点数据是可以的。
    关键代码我将标注具体报错行

关键代码:

    package com.tujia.spark.pmo.ttj_31625

import com.tujia.spark.utils.GetSparkSession.session
import com.tujia.spark.utils.SetConf.setConf
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteMode, WriteNebulaEdgeConfig, WriteNebulaVertexConfig}
import org.apache.log4j.{LogManager, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

/**
 *
 * @author shixingr 2024/8/28
 */
/**
 * Loads tree-node hierarchy rows from Hive and writes them to NebulaGraph:
 * a `tree_node` vertex per distinct `tree_node_id` and a `tree_node_edge`
 * edge per (parent id, child id) pair.
 *
 * @author shixingr 2024/8/28
 */
object TreeNodeRelation {

  val logger: Logger = LogManager.getLogger(this.getClass)

  var spaceName = ""
  var tagName = "tree_node"
  var edgeName = "tree_node_edge"

  // NOTE(review): credentials are hard-coded in source; they should come from a
  // secret store or the job config. Kept in one place so both writers agree.
  private val NebulaUser = "root"
  private val NebulaPasswd = "7d^w@!,q)sd%u0nebula$$"

  /**
   * Job entry point.
   *
   * Arguments are parsed by `setConf`: `dt`, `tableName` and `spaceName` are
   * required; `sqlEnd` is an optional SQL fragment appended verbatim after the
   * source query's WHERE condition.
   */
  def main(args: Array[String]): Unit = {

    // 1. Parse and validate arguments.
    val confMap = setConf(args, Array("dt", "tableName", "spaceName", "sqlEnd"))
    if (!confMap.contains("dt")) {
      throw new Exception("dt 配置错误")
    } else if (!confMap.contains("tableName")) {
      throw new Exception("tableName 配置错误")
    } else if (!confMap.contains("spaceName")) {
      throw new Exception("spaceName 配置错误")
    }
    // NOTE(review): `dt` and `tableName` are validated but never used; the query
    // below reads a hard-coded table for `date_sub(current_date, 1)`. Confirm
    // whether it is meant to use these arguments instead.
    spaceName = confMap("spaceName")
    // `sqlEnd` is spliced into the SQL text unescaped, so it must be trusted input.
    val sqlEnd = confMap.getOrElse("sqlEnd", "")

    // 2. Expand each node's parent list into (tree_node_id, tree_node_parent_id) rows.
    // tree_node_parent_id_list is presumably a string such as "[1, 2, 3]" — TODO
    // confirm against the table schema. translate() deletes '[', ']' and ' ' with no
    // escaping involved. The previous regexp_replace(..., ' |\\[|\\]', '') lost one
    // backslash to the Scala s-interpolator and another to Spark's SQL literal
    // unescaping, so Spark actually ran the regex " |[|]": the brackets were never
    // stripped and parent ids such as "[1" and "3]" reached the edge writer.
    session.sql(
      s"""
        |with base_data as (
        |    select
        |        tree_node_id,
        |        split(translate(tree_node_parent_id_list, '[] ', ''), ',') as tree_node_parent_id_list
        |    from
        |      dw_algorithm.cq_tree_node_detail
        |    where
        |      dt = date_sub(current_date, 1)
        |    ${sqlEnd}
        |)
        |
        |select
        |    tree_node_id,
        |    c as tree_node_parent_id
        |from base_data
        |lateral view explode(tree_node_parent_id_list) b as c
        |""".stripMargin).createOrReplaceTempView("base_data")

    // 3. Prepare and import data.
    // 3.1 Vertices: one per distinct tree_node_id. No filter on the parent column,
    // so nodes without parents (roots) are still written.
    val treeNodeDF: DataFrame = session.sql(
      """
        |select distinct
        |   tree_node_id
        |from base_data
        |""".stripMargin)
    // count() aggregates on the executors instead of pulling every row to the driver.
    logger.info(s"----- enter insert vertex, num: ${treeNodeDF.count()} -----")
    writeVertex(
      treeNodeDF,
      spaceName,
      tagName,
      "tree_node_id"
    )

    // 3.2 Edges (parent -> child). A node with an empty parent list still yields one
    // row whose tree_node_parent_id is "" (split of "" returns [""]). That is not an
    // edge and "" is not a valid vid, so such rows (and nulls) are dropped here.
    val treeNodeEdgeDF: DataFrame = session.sql(
      """
        |select distinct
        |    tree_node_parent_id,
        |    tree_node_id
        |from base_data
        |where tree_node_parent_id is not null
        |  and tree_node_parent_id != ''
        |""".stripMargin)
    logger.info(s"----- enter insert edge, num: ${treeNodeEdgeDF.count()} -----")
    writeEdge(
      treeNodeEdgeDF,
      spaceName,
      edgeName,
      "tree_node_parent_id",
      "tree_node_id"
    )

    // 4. Read data back — not implemented yet.

    // 5. Clean and persist to a table — not implemented yet.
  }

  /**
   * Writes every row of `vertexDF` as a vertex of tag `tagName` in `spaceName`.
   *
   * @param vertexDF     rows to write
   * @param spaceName    target graph space
   * @param tagName      tag to insert
   * @param vidFieldName column of `vertexDF` used as the vertex id
   * @param writeMode    connector write mode, `WriteMode.INSERT` by default
   * @param batchSize    number of rows sent per write batch
   */
  def writeVertex(
      vertexDF: DataFrame,
      spaceName: String,
      tagName: String,
      vidFieldName: String,
      writeMode: WriteMode.Value = WriteMode.INSERT,
      batchSize: Int = 1000): Unit = {
    val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
      .builder()
      .withSpace(spaceName)
      .withTag(tagName)
      .withVidField(vidFieldName)
      // The vid column is not also stored as a tag property.
      .withVidAsProp(false)
      .withBatch(batchSize)
      .withUser(NebulaUser)
      .withPasswd(NebulaPasswd)
      .withWriteMode(writeMode)
      .build()
    vertexDF.write.nebula(getNebulaConnectionConfig, nebulaWriteVertexConfig).writeVertices()
  }

  /**
   * Writes every row of `edgeDF` as an edge of type `edgeName` in `spaceName`,
   * from the vertex id in `srcVidFieldName` to the vertex id in `dstVidFieldName`.
   * Source, destination and rank are not stored as edge properties, and no rank
   * column is configured.
   *
   * NOTE(review): the `java.lang.AssertionError` thrown from
   * `NebulaExecutor.extraID` when this job failed appears to be the connector
   * rejecting a non-numeric vid for an INT64-vid space. Make sure both id
   * columns hold numeric values.
   *
   * @param edgeDF          rows to write
   * @param spaceName       target graph space
   * @param edgeName        edge type to insert
   * @param srcVidFieldName column of `edgeDF` holding the source vertex id
   * @param dstVidFieldName column of `edgeDF` holding the destination vertex id
   * @param writeMode       connector write mode, `WriteMode.INSERT` by default
   * @param batchSize       number of rows sent per write batch
   */
  def writeEdge(
      edgeDF: DataFrame,
      spaceName: String,
      edgeName: String,
      srcVidFieldName: String,
      dstVidFieldName: String,
      writeMode: WriteMode.Value = WriteMode.INSERT,
      batchSize: Int = 1000): Unit = {
    val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
      .builder()
      .withSpace(spaceName)
      .withEdge(edgeName)
      .withSrcIdField(srcVidFieldName)
      .withDstIdField(dstVidFieldName)
      //      .withRankField("degree")
      .withSrcAsProperty(false)
      .withDstAsProperty(false)
      .withRankAsProperty(false)
      .withBatch(batchSize)
      .withUser(NebulaUser)
      .withPasswd(NebulaPasswd)
      .withWriteMode(writeMode)
      .build()
    edgeDF.write.nebula(getNebulaConnectionConfig, nebulaWriteEdgeConfig).writeEdges()
  }

  /** Connection settings (meta + graph address, no SSL) shared by both writers. */
  def getNebulaConnectionConfig: NebulaConnectionConfig =
    NebulaConnectionConfig
      .builder()
      .withMetaAddress("10.66.140.34:9559")
      .withGraphAddress("10.66.140.34:9669")
      // `withConenctionRetry` is spelled this way in the connector's builder API.
      .withConenctionRetry(2)
      .build()
}

报错堆栈:

[INFO] 2024-08-29 12:14:58.003 cn.escheduler.server.worker.log.TaskLogger:[178] - [taskAppId=TASK_24164_12466166_31889936]  -> 	 client token: N/A
		 diagnostics: User class threw exception: org.apache.spark.SparkException: Writing job aborted.
		at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:388)
		at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
		at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.writeWithV2(WriteToDataSourceV2Exec.scala:241)
		at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.run(WriteToDataSourceV2Exec.scala:255)
		at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
		at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
		at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
		at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
		at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
		at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
		at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
		at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
		at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
		at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
		at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
		at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
		at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
		at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
		at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
		at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:377)
		at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
		at com.vesoft.nebula.connector.connector.package$NebulaDataFrameWriter.writeEdges(package.scala:280)
		at com.tujia.spark.pmo.ttj_31625.TreeNodeRelation$.writeEdge(TreeNodeRelation.scala:151)
		at com.tujia.spark.pmo.ttj_31625.TreeNodeRelation$.main(TreeNodeRelation.scala:84)
		at com.tujia.spark.pmo.ttj_31625.TreeNodeRelation.main(TreeNodeRelation.scala)
		at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
		at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
		at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
		at java.lang.reflect.Method.invoke(Method.java:498)
		at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
	Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 30 in stage 7.0 failed 4 times, most recent failure: Lost task 30.3 in stage 7.0 (TID 659) (l-hdps3103.data.cn2 executor 1): java.lang.AssertionError: assertion failed
		at scala.Predef$.assert(Predef.scala:208)
		at com.vesoft.nebula.connector.writer.NebulaExecutor$.extraID(NebulaExecutor.scala:35)
		at com.vesoft.nebula.connector.writer.NebulaEdgeWriter.write(NebulaEdgeWriter.scala:56)
		at com.vesoft.nebula.connector.writer.NebulaEdgeWriter.write(NebulaEdgeWriter.scala:17)
		at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
		at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
		at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
		at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
		at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
		at org.apache.spark.scheduler.Task.run(Task.scala:131)
		at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
		at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
		at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
		at java.lang.Thread.run(Thread.java:748)
	
	Driver stacktrace:
		at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
		at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
		at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
		at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
		at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
		at scala.Option.foreach(Option.scala:407)
		at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
[INFO] 2024-08-29 12:14:58.009 cn.escheduler.server.worker.log.TaskLogger:[178] - [taskAppId=TASK_24164_12466166_31889936]  -> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
		at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
		at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
		at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
		at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)
		... 31 more
	Caused by: java.lang.AssertionError: assertion failed
		at scala.Predef$.assert(Predef.scala:208)
		at com.vesoft.nebula.connector.writer.NebulaExecutor$.extraID(NebulaExecutor.scala:35)
		at com.vesoft.nebula.connector.writer.NebulaEdgeWriter.write(NebulaEdgeWriter.scala:56)
		at com.vesoft.nebula.connector.writer.NebulaEdgeWriter.write(NebulaEdgeWriter.scala:17)
		at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
		at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
		at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
		at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
		at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
		at org.apache.spark.scheduler.Task.run(Task.scala:131)
		at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
		at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
		at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
		at java.lang.Thread.run(Thread.java:748)
	
		 ApplicationMaster host: l-hdps1447.data.cn2
		 ApplicationMaster RPC port: 33361
		 queue: root.tujiadev
		 start time: 1724904066176
		 final status: FAILED
		 tracking URL: http://l-hdpy43.data.cn2.qunar.com:9981/proxy/application_1708412500409_9108275/
		 user: tujiadev
	24/08/29 12:14:58 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.spark.SparkException: Writing job aborted.
		at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:388)
		at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
		at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.writeWithV2(WriteToDataSourceV2Exec.scala:241)
		at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.run(WriteToDataSourceV2Exec.scala:255)
		at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
		at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
		at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
		at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
		at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
		at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
		at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
		at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
		at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
		at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
		at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
		at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
		at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
		at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
		at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
		at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:377)
		at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
		at com.vesoft.nebula.connector.connector.package$NebulaDataFrameWriter.writeEdges(package.scala:280)
		at com.tujia.spark.pmo.ttj_31625.TreeNodeRelation$.writeEdge(TreeNodeRelation.scala:151)
		at com.tujia.spark.pmo.ttj_31625.TreeNodeRelation$.main(TreeNodeRelation.scala:84)
		at com.tujia.spark.pmo.ttj_31625.TreeNodeRelation.main(TreeNodeRelation.scala)
		at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
		at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
		at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[INFO] 2024-08-29 12:14:58.009 cn.escheduler.server.worker.log.TaskLogger:[178] - [taskAppId=TASK_24164_12466166_31889936]  -> 	at java.lang.reflect.Method.invoke(Method.java:498)
		at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
	Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 30 in stage 7.0 failed 4 times, most recent failure: Lost task 30.3 in stage 7.0 (TID 659) (l-hdps3103.data.cn2 executor 1): java.lang.AssertionError: assertion failed
		at scala.Predef$.assert(Predef.scala:208)
		at com.vesoft.nebula.connector.writer.NebulaExecutor$.extraID(NebulaExecutor.scala:35)
		at com.vesoft.nebula.connector.writer.NebulaEdgeWriter.write(NebulaEdgeWriter.scala:56)
		at com.vesoft.nebula.connector.writer.NebulaEdgeWriter.write(NebulaEdgeWriter.scala:17)
		at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
		at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
		at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
		at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
		at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
		at org.apache.spark.scheduler.Task.run(Task.scala:131)
		at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
		at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
		at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
		at java.lang.Thread.run(Thread.java:748)
	
	Driver stacktrace:
		at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
		at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
		at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
		at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
		at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
		at scala.Option.foreach(Option.scala:407)
		at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
		at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
		at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
		at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
		at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)
		... 31 more
	Caused by: java.lang.AssertionError: assertion failed
		at scala.Predef$.assert(Predef.scala:208)
		at com.vesoft.nebula.connector.writer.NebulaExecutor$.extraID(NebulaExecutor.scala:35)
		at com.vesoft.nebula.connector.writer.NebulaEdgeWriter.write(NebulaEdgeWriter.scala:56)
		at com.vesoft.nebula.connector.writer.NebulaEdgeWriter.write(NebulaEdgeWriter.scala:17)
		at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
		at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
		at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
		at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
		at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
		at org.apache.spark.scheduler.Task.run(Task.scala:131)
		at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
		at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
		at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
		at java.lang.Thread.run(Thread.java:748)
	
	Exception in thread "main" org.apache.spark.SparkException: Application application_1708412500409_9108275 finished with failed status
		at org.apache.spark.deploy.yarn.Client.run(Client.scala:1242)
		at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1634)
		at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
		at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
		at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
[INFO] 2024-08-29 12:14:58.125 cn.escheduler.server.worker.log.TaskLogger:[178] - [taskAppId=TASK_24164_12466166_31889936]  -> 	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
		at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
		at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
		at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
	24/08/29 12:14:58 INFO util.ShutdownHookManager: Shutdown hook called
	24/08/29 12:14:58 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-dd93ddb6-3595-45f5-9383-7e122c829274
	24/08/29 12:14:58 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-252aaf34-056f-4aa9-87e8-12a83c9bbb10

executor 中日志堆栈

24/08/29 12:14:55 ERROR util.Utils: Aborting task
java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:208)
	at com.vesoft.nebula.connector.writer.NebulaExecutor$.extraID(NebulaExecutor.scala:35)
	at com.vesoft.nebula.connector.writer.NebulaEdgeWriter.write(NebulaEdgeWriter.scala:56)
	at com.vesoft.nebula.connector.writer.NebulaEdgeWriter.write(NebulaEdgeWriter.scala:17)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
24/08/29 12:14:55 ERROR util.Utils: Aborting task
java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:208)
	at com.vesoft.nebula.connector.writer.NebulaExecutor$.extraID(NebulaExecutor.scala:35)
	at com.vesoft.nebula.connector.writer.NebulaEdgeWriter.write(NebulaEdgeWriter.scala:56)
	at com.vesoft.nebula.connector.writer.NebulaEdgeWriter.write(NebulaEdgeWriter.scala:17)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
24/08/29 12:14:55 ERROR v2.DataWritingSparkTask: Aborting commit for partition 29 


具体错误在这一行(NebulaExecutor.extraID 中的断言),推测是 id 的问题。

1 个赞

已经找到问题!!!开发可以忽略了,我回头写一下原因。感谢nebula

1 个赞

优秀如你!

此话题已在最后回复的 7 天后被自动关闭。不再允许新回复。