nebula spark connector timeout

  • nebula 版本:2.5.1

  • nebula-spark-connector 版本:2.5.1

  • spark版本:2.4.4

  • 部署方式: 单机

  • 安装方式:RPM

  • 是否为线上版本:Y

  • 硬件信息

    • 磁盘 SSD
  • 问题的具体描述

connector 分多次读取不同类型的点和边的信息,
在已经有部分信息读取成功的情况下,
会出现后续信息读取失败的情况

NebulaConnectionConfig withTimeout 已设置为Int.MaxValue
  • 日志信息
  • 情况1
2022-04-08 11:06:56 ERROR [Executor task launch worker for task 107] - Get Parts Error: java.net.SocketTimeoutException: Read timed out
2022-04-08 11:06:56 ERROR [Executor task launch worker for task 107] - Exception in task 3.0 in stage 29.0 (TID 107)
com.facebook.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
	at com.facebook.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:116)
	at com.facebook.thrift.transport.TTransport.readAll(TTransport.java:75)
	at com.facebook.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:621)
	at com.facebook.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:479)
	at com.vesoft.nebula.meta.MetaService$Client.recv_getPartsAlloc(MetaService.java:1124)
	at com.vesoft.nebula.meta.MetaService$Client.getPartsAlloc(MetaService.java:1101)
	at com.vesoft.nebula.client.meta.MetaClient.getPartsAlloc(MetaClient.java:355)
	at com.vesoft.nebula.connector.nebula.MetaProvider.getPartitionNumber(MetaProvider.scala:25)
	at com.vesoft.nebula.connector.reader.NebulaPartitionReader.<init>(NebulaPartitionReader.scala:61)
	at com.vesoft.nebula.connector.reader.NebulaVertexPartitionReader.<init>(NebulaVertexPartitionReader.scala:17)
	at com.vesoft.nebula.connector.reader.NebulaVertexPartition.createPartitionReader(NebulaPartition.scala:17)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:171)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at com.facebook.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:114)
	... 31 more
2022-04-08 11:06:56 ERROR [task-result-getter-0] - Task 3 in stage 29.0 failed 1 times; aborting job
  • 情况2

Exception in thread "main" com.facebook.thrift.transport.TTransportException: java.net.SocketTimeoutException: connect timed out
	at com.facebook.thrift.transport.TSocket.open(TSocket.java:175)
	at com.vesoft.nebula.client.meta.MetaClient.getClient(MetaClient.java:104)
	at com.vesoft.nebula.client.meta.MetaClient.doConnect(MetaClient.java:99)
	at com.vesoft.nebula.client.meta.MetaClient.connect(MetaClient.java:89)
	at com.vesoft.nebula.connector.nebula.MetaProvider.<init>(MetaProvider.scala:22)
	at com.vesoft.nebula.connector.reader.NebulaSourceReader.getSchema(NebulaSourceReader.scala:45)
	at com.vesoft.nebula.connector.reader.NebulaSourceReader.readSchema(NebulaSourceReader.scala:31)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:175)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:204)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
	at com.vesoft.nebula.connector.connector.package$NebulaDataFrameReader.loadEdgesToDF(package.scala:146)
	at NebulaReader2$.createNebulaEdgeDF(NebulaReader2.scala:128)
	at NebulaReader2$.main(NebulaReader2.scala:57)
	at NebulaReader2.main(NebulaReader2.scala)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at com.facebook.thrift.transport.TSocket.open(TSocket.java:170)
	... 13 more


设置成 0 试试呢?

还是不行 会报超时


2022-04-08 13:45:58 ERROR [main] - Get Edge Error: java.net.SocketTimeoutException: Read timed out
Exception in thread "main" com.facebook.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
	at com.facebook.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:116)
	at com.facebook.thrift.transport.TTransport.readAll(TTransport.java:75)
	at com.facebook.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:621)
	at com.facebook.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:479)
	at com.vesoft.nebula.meta.MetaService$Client.recv_getEdge(MetaService.java:989)
	at com.vesoft.nebula.meta.MetaService$Client.getEdge(MetaService.java:966)
	at com.vesoft.nebula.client.meta.MetaClient.getEdge(MetaClient.java:317)
	at com.vesoft.nebula.connector.nebula.MetaProvider.getEdge(MetaProvider.scala:41)
	at com.vesoft.nebula.connector.reader.NebulaSourceReader.getSchema(NebulaSourceReader.scala:70)
	at com.vesoft.nebula.connector.reader.NebulaSourceReader.getSchema(NebulaSourceReader.scala:36)
	at com.vesoft.nebula.connector.reader.NebulaDataSourceEdgeReader$$anonfun$2.apply(NebulaSourceReader.scala:121)
	at com.vesoft.nebula.connector.reader.NebulaDataSourceEdgeReader$$anonfun$2.apply(NebulaSourceReader.scala:120)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at com.vesoft.nebula.connector.reader.NebulaDataSourceEdgeReader.planInputPartitions(NebulaSourceReader.scala:120)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:149)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:148)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:148)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:312)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:304)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:281)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:281)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:278)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:278)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:278)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:304)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:37)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
	at scala.collection.immutable.List.foldLeft(List.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:753)
	at NebulaReader2$.createNebulaEdgeDF(NebulaReader2.scala:132)
	at NebulaReader2$.main(NebulaReader2.scala:56)
	at NebulaReader2.main(NebulaReader2.scala)
Caused by: java.net.SocketTimeoutException: Read timed out
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:171)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at com.facebook.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:114)
	... 58 more


你看下 meta 和 storage 那边有没有日志,服务是不是正常的

通过今日日期查询日志 :metad 有日志 storaged 没有日志

./nebula.service status all
[INFO] nebula-metad(5b83e5cb): Running as 25624, Listening on 9559
[INFO] nebula-graphd(ca585878): Running as 24871, Listening on 9669
[INFO] nebula-storaged(5b83e5cb): Running as 29697, Listening on 9779

图数据库服务是正常的,
spark程序通过spark connector多次读取数据,
比方说一共需要分十次读取十个类型的边/点 数据,
但是可能读取到第六个类型就会失败,报timeout异常

异常的出现不是每次跑spark程序的时候都会触发

withBatch 试试把这个参数调小点看看会不会还报错呢?

这个版本只找到了withLimit 设置成了100

第二次读取的时候报了异常

+------------------+------------------+--------------+------+-------+
|_srcId            |_dstId            |_rank         |type  |is_show|
+------------------+------------------+--------------+------+-------+
|m_-158684912384925|m_646053138570445 |-8522836984831|STRING|false  |
|m_-158684912384925|m_-222716971653284|-8294635190954|STRING|true   |
+------------------+------------------+--------------+------+-------+
only showing top 2 rows

Exception in thread "main" com.facebook.thrift.transport.TTransportException: java.net.SocketTimeoutException: connect timed out
	at com.facebook.thrift.transport.TSocket.open(TSocket.java:175)
	at com.vesoft.nebula.client.meta.MetaClient.getClient(MetaClient.java:104)
	at com.vesoft.nebula.client.meta.MetaClient.doConnect(MetaClient.java:99)
	at com.vesoft.nebula.client.meta.MetaClient.connect(MetaClient.java:89)
	at com.vesoft.nebula.connector.nebula.MetaProvider.<init>(MetaProvider.scala:22)
	at com.vesoft.nebula.connector.reader.NebulaSourceReader.getSchema(NebulaSourceReader.scala:45)
	at com.vesoft.nebula.connector.reader.NebulaSourceReader.readSchema(NebulaSourceReader.scala:31)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:175)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:204)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
	at com.vesoft.nebula.connector.connector.package$NebulaDataFrameReader.loadEdgesToDF(package.scala:146)
	at NebulaReader2$.createNebulaEdgeDF(NebulaReader2.scala:130)
	at NebulaReader2$.main(NebulaReader2.scala:57)
	at NebulaReader2.main(NebulaReader2.scala)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at com.facebook.thrift.transport.TSocket.open(TSocket.java:170)
	... 13 more

Process finished with exit code 1

贴一下spark参数吧

val masterCores = "8"
val connectionRetry = 8
val executeRetry = 8
val partitionNum = 5
val timeout = Int.MaxValue
 val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))

val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .master(master)
      .config(sparkConf)
      .config("spark.sql.shuffle.partitions", "10")
      .getOrCreate()

nebulaWriteVertexConfig 你全局找下这个呢,:thinking: 我看文档里面是有 withBatch 的,我特意去看了 2.5.1 的文档

啊 ,我这个只是在读取数据的阶段,使用的是ReadNebulaConfig

Read 好像是没有,:thinking: 我给研发同学看看哈。稍等

辛苦了!

看着像是网络不稳定。
每次都是meta服务超时了,连接超时和read 超时。 你可以看下你的spark集群每台机器是否可以正常连接nebula的每个 meta服务。

这是本地开发遇到的问题,我换个网络环境试一下

遇到了一个新的问题,在做不同图空间数据迁移的时候,
数据写入过程出现了npe

    val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
      .builder()
      .withSpace(space)
      .withTag(tag)
      .withVidField("_vertexId")
      .withVidPolicy("hash")
//    .withVidAsProp(true)
      .withUser("root")
      .withPasswd("nebula")
//    .withBatch(1000)
      .withWriteMode(writeMode)
      .build()
    df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()

Exception in thread "main" java.lang.NullPointerException
	at java.util.Hashtable.put(Hashtable.java:460)
	at java.util.Properties.setProperty(Properties.java:166)
	at com.vesoft.nebula.connector.NebulaOptions$$anonfun$13.apply(NebulaOptions.scala:51)
	at com.vesoft.nebula.connector.NebulaOptions$$anonfun$13.apply(NebulaOptions.scala:51)
	at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
	at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
	at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
	at com.vesoft.nebula.connector.NebulaOptions.<init>(NebulaOptions.scala:51)
	at com.vesoft.nebula.connector.NebulaDataSource.getNebulaOptions(NebulaDataSource.scala:143)
	at com.vesoft.nebula.connector.NebulaDataSource.createWriter(NebulaDataSource.scala:63)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:255)
	at com.vesoft.nebula.connector.connector.package$NebulaDataFrameWriter.writeVertices(package.scala:248)
	at com.spark.MigrateData$.writeNebulaVerticesDF(MigrateData.scala:87)
	at com.spark.MigrateData$$anonfun$main$1.apply(MigrateData.scala:66)
	at com.spark.MigrateData$$anonfun$main$1.apply(MigrateData.scala:66)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at com.spark.MigrateData$.main(MigrateData.scala:66)
	at com.spark.MigrateData.main(MigrateData.scala)

@steam @nicole 辛苦有空帮忙看下

抱歉,早上巡版时看到了你的帖子,- -,忘记看了,我现在去瞅瞅

:thinking: 这个报错大概是什么对象没有初始化,稍等我问问解决方法哈

试试这个帖子的方法呢,我看了下你们的报错信息是类似的

检查下里面的参数变量是不是传的为null