Spark import into NebulaGraph fails with com.vesoft.nebula.client.graph.exception.IOErrorException: java.net.SocketTimeoutException: Read timed out

  • NebulaGraph version: 3.4.0
  • Deployment: distributed
  • Installation method: RPM
  • In production: Yes
  • Hardware
    • Disk: SSD
    • CPU / memory: three machines, each with 24 CPU cores and 251 GB of RAM
  • Problem description
  • I am using the nebula-spark-connector to import CSV data into a NebulaGraph cluster.
    Small files import without any problem.
    Large files (around 10 GB) fail with the error below.

I have tried lowering batch from 1000 to 200 and raising storage_client_timeout_ms from 6000 to 6000000, but the import still fails.
Which other parameters do I need to adjust?
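For context, the write path described above looks roughly like the following sketch. It assumes nebula-spark-connector 3.x builder APIs; the addresses, space, tag, and field names are made up for illustration.

```scala
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaVertexConfig}
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("csv-to-nebula").getOrCreate()
val df = spark.read.option("header", "true").csv("/path/to/data.csv") // hypothetical path

// Connection-level settings; withTimeout is the graph-client socket timeout in ms.
val connConfig = NebulaConnectionConfig.builder()
  .withMetaAddress("meta0:9559")    // hypothetical address
  .withGraphAddress("graphd0:9669") // hypothetical address
  .withTimeout(60000)
  .build()

// Write-level settings; withBatch controls how many rows go into one INSERT.
val writeConfig = WriteNebulaVertexConfig.builder()
  .withSpace("my_space")
  .withTag("my_tag")
  .withVidField("id")
  .withBatch(200)
  .build()

df.write.nebula(connConfig, writeConfig).writeVertices()
```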

2023-06-09 13:46:15 [dag-scheduler-event-loop] INFO  org.apache.spark.scheduler.DAGScheduler - ResultStage 4 (save at package.scala:231) failed in 239.734 s due to Job aborted due to stage failure: Task 33 in stage 4.0 failed 4 times, most recent failure: Lost task 33.3 in stage 4.0 (TID 188) (10.27.107.32 executor 2): com.vesoft.nebula.client.graph.exception.IOErrorException: java.net.SocketTimeoutException: Read timed out
        at com.vesoft.nebula.client.graph.net.SyncConnection.executeWithParameter(SyncConnection.java:191)
        at com.vesoft.nebula.client.graph.net.Session.executeWithParameter(Session.java:128)
        at com.vesoft.nebula.client.graph.net.Session.execute(Session.java:93)
        at com.vesoft.nebula.connector.nebula.GraphProvider.submit(GraphProvider.scala:115)
        at com.vesoft.nebula.connector.writer.NebulaWriter.submit(NebulaWriter.scala:49)
        at com.vesoft.nebula.connector.writer.NebulaVertexWriter.execute(NebulaVertexWriter.scala:82)
        at com.vesoft.nebula.connector.writer.NebulaVertexWriter.write(NebulaVertexWriter.scala:60)
        at com.vesoft.nebula.connector.writer.NebulaVertexWriter.write(NebulaVertexWriter.scala:22)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:442)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
 2023-06-09 13:46:15 [main] INFO  org.apache.spark.scheduler.DAGScheduler - Job 4 failed: save at package.scala:231, took 239.758864 s
 2023-06-09 13:46:15 [main] ERROR org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec - Data source write support com.vesoft.nebula.connector.writer.NebulaDataSourceVertexWriter@6f06d068 is aborting.
 2023-06-09 13:46:15 [main] ERROR com.vesoft.nebula.connector.writer.NebulaDataSourceVertexWriter - NebulaDataSourceVertexWriter abort
 2023-06-09 13:46:15 [main] ERROR org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec - Data source write support com.vesoft.nebula.connector.writer.NebulaDataSourceVertexWriter@6f06d068 aborted.
org.apache.spark.SparkException: Writing job aborted
        at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:767)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:409)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353)
        at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.writeWithV2(WriteToDataSourceV2Exec.scala:262)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:332)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:331)
        at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.run(WriteToDataSourceV2Exec.scala:262)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:318)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
        at com.vesoft.nebula.connector.connector.package$NebulaDataFrameWriter.writeVertices(package.scala:231)
        at net.cnki.ekr.nebula.service.impl.NebulaDataService.insertCSVToNebula(NebulaDataService.java:184)
        at net.cnki.ekr.nebula.EkrNebulaSparkApplication.run(EkrNebulaSparkApplication.java:35)
        at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:760)
        at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:750)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:317)
        at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:150)
        at net.cnki.ekr.nebula.EkrNebulaSparkApplication.main(EkrNebulaSparkApplication.java:25)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 33 in stage 4.0 failed 4 times, most recent failure: Lost task 33.3 in stage 4.0 (TID 188) (10.27.107.32 executor 2): com.vesoft.nebula.client.graph.exception.IOErrorException: java.net.SocketTimeoutException: Read timed out
        at com.vesoft.nebula.client.graph.net.SyncConnection.executeWithParameter(SyncConnection.java:191)
        at com.vesoft.nebula.client.graph.net.Session.executeWithParameter(Session.java:128)
        at com.vesoft.nebula.client.graph.net.Session.execute(Session.java:93)
        at com.vesoft.nebula.connector.nebula.GraphProvider.submit(GraphProvider.scala:115)
        at com.vesoft.nebula.connector.writer.NebulaWriter.submit(NebulaWriter.scala:49)
        at com.vesoft.nebula.connector.writer.NebulaVertexWriter.execute(NebulaVertexWriter.scala:82)
        at com.vesoft.nebula.connector.writer.NebulaVertexWriter.write(NebulaVertexWriter.scala:60)
        at com.vesoft.nebula.connector.writer.NebulaVertexWriter.write(NebulaVertexWriter.scala:22)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:442)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:377)
        ... 52 more
Caused by: com.vesoft.nebula.client.graph.exception.IOErrorException: java.net.SocketTimeoutException: Read timed out
        at com.vesoft.nebula.client.graph.net.SyncConnection.executeWithParameter(SyncConnection.java:191)
        at com.vesoft.nebula.client.graph.net.Session.executeWithParameter(Session.java:128)
        at com.vesoft.nebula.client.graph.net.Session.execute(Session.java:93)
        at com.vesoft.nebula.connector.nebula.GraphProvider.submit(GraphProvider.scala:115)
        at com.vesoft.nebula.connector.writer.NebulaWriter.submit(NebulaWriter.scala:49)
        at com.vesoft.nebula.connector.writer.NebulaVertexWriter.execute(NebulaVertexWriter.scala:82)
        at com.vesoft.nebula.connector.writer.NebulaVertexWriter.write(NebulaVertexWriter.scala:60)
        at com.vesoft.nebula.connector.writer.NebulaVertexWriter.write(NebulaVertexWriter.scala:22)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:442)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

You need to increase graph_client_timeout_ms; the storaged timeout is irrelevant here.
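In the spark-connector, the graph-client read timeout maps to the withTimeout option on NebulaConnectionConfig, since the SocketTimeoutException is raised by the client's socket read toward graphd, not by graphd's calls to storaged. A sketch, with an illustrative 600000 ms value and hypothetical addresses:

```scala
import com.vesoft.nebula.connector.NebulaConnectionConfig

// storage_client_timeout_ms governs graphd -> storaged; the "Read timed out"
// here happens on the client -> graphd socket, so raise the client timeout.
val connConfig = NebulaConnectionConfig.builder()
  .withMetaAddress("meta0:9559")    // hypothetical address
  .withGraphAddress("graphd0:9669") // hypothetical address
  .withTimeout(600000)              // graph-client socket timeout in ms
  .build()
```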
