Importing from a Hive table into NebulaGraph with spark-connector hangs and errors out; the job fails

Question template:

  • Nebula version: 3.2.0
  • Spark Connector version: 3.0.0
  • Deployment: distributed, 5 nodes
  • Installation method: RPM
  • Production environment: Y
  • Hardware
    • Disk: 40 TB+ HDD per node
    • CPU: 40 cores per node
    • Memory: 256 GB per node
  • Problem description
    The Hive table has roughly 100 million vertices, 80 million edge1 records, and 800 million edge2 records. Running the import with Spark Connector, the vertices and edge1 imported successfully, but edge2 hung during import and then exited with a timeout exception.
    The YARN progress bar looked like this:


The above state persisted for about 1.8 hours, then the job errored out and exited with the following message:

User class threw exception: org.apache.spark.SparkException: Writing job aborted.
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:136)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:160)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:157)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:280)
at com.vesoft.nebula.connector.connector.package$NebulaDataFrameWriter.writeEdges(package.scala:334)
at com.tech97.graph.sparkscala.NebulaSparkWriter$.execute(NebulaSparkWriter.scala:88)
at com.tech97.graph.sparkscala.NebulaSparkWriter$.main(NebulaSparkWriter.scala:27)
at com.tech97.graph.sparkscala.NebulaSparkWriter.main(NebulaSparkWriter.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:685)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 218 tasks (1026.3 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
… 24 more

The relevant code is roughly as follows:
var df = spark.sql("select user1,user2,count," + month + " as rank from graph.t_graph where day=20220901")
df.show() // this line already appears in the log output
// repartition returns a new DataFrame, so reassign it for the 200 partitions to take effect
df = df.repartition(200).persist(StorageLevel.MEMORY_AND_DISK_SER)

val nebulaWriteEdgeConfig = WriteNebulaEdgeConfig
  .builder()
  .withSpace("t_graph")
  .withEdge("tl")
  .withSrcIdField("user1")
  .withDstIdField("user2")
  .withRankField("rank")
  .withSrcAsProperty(false)
  .withDstAsProperty(false)
  .withRankAsProperty(false)
  .withUser("root")
  .withPasswd("nebula")
  .withBatch(10000)
  .build()
// config is the NebulaConnectionConfig built earlier (not shown in this snippet)
df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges()

Marking this thread; I run into the same problem frequently, and the import speed is not as fast as expected.

After increasing spark.driver.maxResultSize, the job can run, but it took nearly 3 hours for under 30 GB of data (800 million edges). Is there any way to tune this for better speed? This doesn't really seem adequate for a production environment.
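As a reference, a minimal sketch of raising this limit when building the SparkSession; the 4g value is only an assumed placeholder, and the same setting can also be passed with --conf spark.driver.maxResultSize=4g on spark-submit:

import org.apache.spark.sql.SparkSession

// Sketch only: raise spark.driver.maxResultSize above the 1g default so the
// driver can collect the serialized task results of the write job.
// 4g is an assumed placeholder, not a tuned recommendation.
val spark = SparkSession.builder()
  .appName("NebulaSparkWriter")
  .config("spark.driver.maxResultSize", "4g")
  .getOrCreate()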

Add the withPartition setting to the writeNebulaEdgeConfig in your code and increase the partition count; if it is not configured, the default is 100 partitions.

OK, I'll give it a try.

I tried it: ReadNebulaConfig does have the withPartitionNum method, but WriteNebulaVertexConfig does not have withPartitionNum.

Sorry, I see your code already calls repartition. How many total cores were allocated to your job when you submitted it?

150 cores in total; the submit options are:

--deploy-mode cluster
--executor-memory 16G
--driver-memory 40G
--num-executors 25
--executor-cores 6

You can set the repartition value to 150 or 300.
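For illustration, a small sketch of what that looks like under the submit options above (25 executors × 6 cores = 150 total cores), reusing the df from the earlier snippet; 300 is just one of the two suggested values:

import org.apache.spark.storage.StorageLevel

// 25 executors * 6 cores = 150 total cores, so 150 or 300 partitions
// gives every core one or two partitions to work on.
val repartitioned = df.repartition(300).persist(StorageLevel.MEMORY_AND_DISK_SER)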

Have you configured all of the graphd addresses in your config? If NebulaGraph is deployed as a cluster, configure all of the addresses so that the load is balanced across all machines.
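For illustration, a minimal sketch of a connection config that lists every metad and graphd instance, assuming comma-separated address lists; the hostnames and ports here are placeholders for the actual cluster addresses:

import com.vesoft.nebula.connector.NebulaConnectionConfig

// Placeholder addresses: list every metad and every graphd in the cluster so
// the writes are spread across all graphd nodes instead of a single one.
val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress("meta1:9559,meta2:9559,meta3:9559")
  .withGraphAddress("graph1:9669,graph2:9669,graph3:9669,graph4:9669,graph5:9669")
  .build()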


Both the meta addresses and the graphd addresses are configured with all of them. I'll try changing the partition count to 300.

I tried it, but the result wasn't great: 260 GB of data ran for 27 hours and still failed in the end.

I haven't used Exchange yet. Would it work better for bulk data like this, or is there another, faster way to sync the data?

What caused the final failure? For write performance into an empty space, see the reference at https://github.com/vesoft-inc/nebula-spark-connector#performance

For 260 GB of data you can try importing with Exchange's SST mode. For our import of 15 billion records (on HDD): SST generation took 11.5 hours, download 1.25 hours, and ingest 4.67 hours.


I saw error messages like this in some of the failed tasks: 22/09/19 14:56:44 ERROR writer.NebulaVertexWriter: failed to write INSERT vertex person(num) VALUES “XXXXXX”:(“XXXXX”) Storage Error: part: 155, error: E_RPC_FAILURE(-3)

The server side is under too much pressure and the RPC calls failed. You can reduce the batch size and the partition count a bit.

Reducing the batch size is clear to me, but by partition do you mean the number of partitions of the DataFrame before writing, or the graph space's partition_num? I currently set the space's partition_num to 600 (I chose that because the official docs suggest the number of disks multiplied by 20; I have 5 nodes with 6 disks each, 5.5 TB per disk, so 30 disks in total and 30 * 20 = 600). Is that set too high?

After reducing the batch size, the data was imported successfully; I changed batch(10000) to batch(2000).
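For completeness, a sketch of the adjusted write config; everything else stays as in the original snippet, only withBatch is lowered:

import com.vesoft.nebula.connector.WriteNebulaEdgeConfig

// Same builder as in the original snippet, with the batch size lowered from
// 10000 to 2000 so each request puts less pressure on storaged.
val nebulaWriteEdgeConfig = WriteNebulaEdgeConfig
  .builder()
  .withSpace("t_graph")
  .withEdge("tl")
  .withSrcIdField("user1")
  .withDstIdField("user2")
  .withRankField("rank")
  .withUser("root")
  .withPasswd("nebula")
  .withBatch(2000)
  .build()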
