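Question template:
- Nebula version: 3.2.0
- Spark connector version: 3.0.0
- Deployment: distributed, 5 nodes
- Installation method: RPM
- Production environment: Y
- Hardware:
  - Disk: 40 TB+ HDD per node
  - CPU: 40 cores per node
  - Memory: 256 GB per node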
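- Detailed problem description:
The source data is in Hive tables: about 100 million vertices, 80 million edge1 edges and 800 million edge2 edges. They are imported with the Spark connector. The vertex import and the edge1 import both succeeded, but the edge2 import stalled and then exited with a timeout exception.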
The YARN progress bar then stayed in that state for 1.8 hours, after which the job exited with the following error:
```
User class threw exception: org.apache.spark.SparkException: Writing job aborted.
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:136)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:160)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:157)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:280)
at com.vesoft.nebula.connector.connector.package$NebulaDataFrameWriter.writeEdges(package.scala:334)
at com.tech97.graph.sparkscala.NebulaSparkWriter$.execute(NebulaSparkWriter.scala:88)
at com.tech97.graph.sparkscala.NebulaSparkWriter$.main(NebulaSparkWriter.scala:27)
at com.tech97.graph.sparkscala.NebulaSparkWriter.main(NebulaSparkWriter.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:685)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 218 tasks (1026.3 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
… 24 more
```
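The "Caused by" line gives enough numbers for a quick size estimate. The sketch below is plain Scala (no Spark needed) and the object and method names are hypothetical; it computes the average serialized result per task and the total a given number of equally sized tasks would produce. As an assumption not verified against connector 3.0.0: in a DataSource V2 write each task's result is the commit message its writer sends back to the driver, and in nebula-spark-connector that message may carry the statements that failed to execute, so several MB per task could point to many failed batches rather than normal output.

```scala
// Hypothetical helper: estimates driver-side result size from the figures
// in the "Caused by" line (218 tasks, 1026.3 MB, limit 1024.0 MB).
object ResultSizeEstimate {
  val tasksReported = 218
  val totalResultMb = 1026.3
  val maxResultSizeMb = 1024.0

  /** Average serialized result per task, in MB. */
  def avgPerTaskMb(totalMb: Double, tasks: Int): Double = totalMb / tasks

  /** Total result size if `tasks` tasks each return `perTaskMb` (even-size assumption). */
  def projectedTotalMb(perTaskMb: Double, tasks: Int): Double = perTaskMb * tasks

  def main(args: Array[String]): Unit = {
    val avg = avgPerTaskMb(totalResultMb, tasksReported)
    println(f"average per task: $avg%.2f MB")
    println(f"over the limit by: ${totalResultMb - maxResultSizeMb}%.1f MB")
  }
}
```

This works out to roughly 4.71 MB per task. `spark.driver.maxResultSize` accepts size strings such as `2g` (and `0` for no limit), for example via `--conf spark.driver.maxResultSize=2g` on `spark-submit`; raising it only lets the driver accept the task results and does not by itself explain why the edge2 writes stalled.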
The relevant code is roughly as follows:
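```scala
import org.apache.spark.storage.StorageLevel
import com.vesoft.nebula.connector.WriteNebulaEdgeConfig
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter

// `spark`, `month` and the NebulaConnectionConfig `config` are defined elsewhere in the job
val df = spark.sql("select user1, user2, count, " + month + " as rank from graph.t_graph where day=20220901")
df.show() // this output does appear in the log

// repartition() returns a new DataFrame instead of modifying df, so keep the
// (persisted) result and write that one; a bare df.repartition(...) statement is a no-op for df
val edgeDf = df.repartition(200).persist(StorageLevel.MEMORY_AND_DISK_SER)

val nebulaWriteEdgeConfig = WriteNebulaEdgeConfig
  .builder()
  .withSpace("t_graph")
  .withEdge("tl")
  .withSrcIdField("user1")
  .withDstIdField("user2")
  .withRankField("rank")
  .withSrcAsProperty(false)
  .withDstAsProperty(false)
  .withRankAsProperty(false)
  .withUser("root")
  .withPasswd("nebula")
  .withBatch(10000) // rows per write batch
  .build()

edgeDf.write.nebula(config, nebulaWriteEdgeConfig).writeEdges()
```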
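To put the edge2 write in perspective, the sketch below (plain Scala; all names are hypothetical) spreads the roughly 800 million edge2 rows over the 200 partitions requested by `repartition(200)` and divides by the `withBatch(10000)` size. It assumes rows are distributed evenly across partitions and that each batch becomes one write request to the graph service; both are simplifying assumptions, not measured behavior.

```scala
// Hypothetical helper: per-task write load under an even-distribution assumption.
object WriteLoadEstimate {
  private def ceilDiv(a: Long, b: Long): Long = (a + b - 1) / b

  /** Rows each partition (one Spark task) has to write. */
  def rowsPerPartition(totalRows: Long, partitions: Int): Long =
    ceilDiv(totalRows, partitions.toLong)

  /** Batches each task sends, assuming one request per `batchSize` rows. */
  def batchesPerPartition(totalRows: Long, partitions: Int, batchSize: Int): Long =
    ceilDiv(rowsPerPartition(totalRows, partitions), batchSize.toLong)

  def main(args: Array[String]): Unit = {
    val edge2Rows = 800000000L // ~800 million edge2 rows, from the problem description
    println(s"rows per task:    ${rowsPerPartition(edge2Rows, 200)}")
    println(s"batches per task: ${batchesPerPartition(edge2Rows, 200, 10000)}")
  }
}
```

With these inputs each task writes about 4,000,000 rows in about 400 batches of 10,000 rows. Lowering `withBatch` makes each request smaller but multiplies the number of requests, so batch size and partition count trade off against each other.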