Spark读取图数据库内容遇到NullPointerException.

  • nebula 版本:Nebula v1.1

  • 部署方式(分布式 / 单机 / Docker / DBaaS):Docker

  • 硬件信息

    • 磁盘( 必须为 SSD ,不支持 HDD):SSD
  • 问题的具体描述
    使用Spark连接metad来读取存在Docker数据库里面的数据,代码如下,
    前面执行都正常,df.show()也能够看到数据,在执行df.count()时遇到报错:NullPointerException

  //代码段
  private def nebula: NebulaDataFrameReader = {
    spark.read.nebula(
      dbConfig.get("spark.graphdb.metad_address"),
      dbConfig.get("spark.graphdb.space"),
      graphDBClient.descSpace(dbConfig.get("spark.graphdb.space")).partitionNumber.toString
    )
  }
  val df = nebula.loadEdgesToDF(name, property)
  print(df.count())

完整报错内容:

  Caused by: java.lang.NullPointerException
	at com.vesoft.nebula.client.storage.StorageClientImpl.close(StorageClientImpl.java:971)
	at com.vesoft.nebula.tools.connector.reader.NebulaEdgeIterator.hasNext(NebulaEdgeIterator.scala:72)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:127)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1374)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

这个异常出现是因为scan过程异常了,在NPE之前你看下应该有个 Exception scanning vertex的异常日志,可以查看到scan异常的堆栈

很奇怪,并没有找到你说的Scanning Vertex的异常。。找到的第一个报错就是这个NPE…
很奇怪,似乎已经读取到了df,使用df.show()是能够看到数据的,但是做全量统计就有问题了。。。
我去翻metad / graphd / storaged的日志,似乎没有发现有什么报错迹象啊。。

你是编译打包后使用的吧,需要在com.vesoft.nebula.tools.connector.reader.NebulaEdgeIterator中把72行的关闭client 去掉,然后重新打包。
你试下看是不是可以。

我不是编译打包使用的呃。。我是直接用Maven引入的依赖……
能判断出来问题出在哪里么。。。是那个StorageClient判空逻辑有问题?

不是storageClient判空的逻辑,应该是内部的transport关闭时没有判空直接关闭了。

您好,请问你的spark跟docker部署的nebula是处于同一network么

主要想请教下这个

呃,我是处于同一个Network的……
有点奇怪,为啥你一个IP是192.168的网段地址,另一个是172.28网段地址??这两个都是私有地址不能直接互相访问的吧。。?不知道我理解有没有错。。
我这边都是非私有IP地址,都是全球唯一IP地址来的。。。

一个是宿主机的网络,一个是docker划分出的子网

你好,我发现了一个现象,看看对定位问题有没有帮助……

我这里StorageClient开了8个容器,使用show hosts可以看到8个节点。
启动Spark的时候,我习惯性的开了200个executor来处理,然后就遇到NPE,当executor降为10个的时候,好像偶尔就是正常的,我目前改为了开8个,跑了快俩小时了,暂时没出现问题。我猜是不是其他executor释放了StorageClient以后,然后另外executor再释放就出现NPE?

跟executor执行的任务有关,如果executor内 scan的最后一个part没有数据时,就会导致同时关闭metaClient和storageClient。这两个client共用一个transport,所以就出现了你最开始的错误。
我们客户端需要修改下

啊,定位到问题是不是修复就很快了。。~~
我八个executor跑了一上午了,连1/20都没跑完……啥时候能体验到200个executor并发的快感。。
PS:我是用Maven依赖的呃。。。

最快速的方式是 你把com.vesoft.nebula.tools.connector.reader.NebulaEdgeIterator中72行的storageClient.close去掉,然后走一下编译spark-connector的流程打包,就可以使用了

……关键是我不会。。
另外,如果storageClient没有执行close(),会有什么影响么?
另外,我看到代码里面关闭storageClient也阔以用这个:NebulaUtils.closeStorageClient(storageClient)
这个能不能解决那个NPE的问题。。?

可以根据这个文档走。需要变动是:
在执行第三步的mvn命令之前,先找到/nebula-java/tools/nebula-spark/src/main/scala/com/vesoft/nebula/tools/connector/reader/NebulaEdgeIterator.scala 这个文件,然后把第72行的if (storageClient != null) storageClient.close() 去掉。

之后再执行 mvn clean 。。。

2 个赞

看到了,非常感谢。。。
想了解一下为啥这个Spark Connector不支持不含属性的点和边的读取啊。。
我好像很多点和边都是。。不含属性的呃,大概什么时候会支持么?

2.0会支持 三种读取: 不含属性、含指定属性、含所有属性

3 个赞