使用spark-connector 读取Nebula3报错 no parts succeed, error message: Unable to activate object

  • nebula 版本:3.1.0
  • 部署方式:腾讯云单机部署
  • 安装方式: RPM
  • 是否为线上版本: N
  • 硬件信息
    • 磁盘 100G
    • CPU: 4核
    • 内存信息 16G
  • 问题的具体描述
    使用spark-connector 读取Nebula 中数据去构建graphx 图去做图计算,读取的时候报如下:
Caused by: com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
	at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:99)
	at com.vesoft.nebula.client.storage.scan.ScanVertexResultIterator.next(ScanVertexResultIterator.java:143)
	at com.vesoft.nebula.connector.reader.NebulaVertexPartitionReader.next(NebulaVertexPartitionReader.scala:66)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
22/08/10 16:01:38 ERROR ScanVertexResultIterator: get storage client error, 
java.util.NoSuchElementException: Unable to activate object
	at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:400)
	at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:277)
	at com.vesoft.nebula.client.storage.StorageConnPool.getStorageConnection(StorageConnPool.java:41)
	at com.vesoft.nebula.client.storage.scan.ScanVertexResultIterator.lambda$next$0(ScanVertexResultIterator.java:88)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.facebook.thrift.transport.TTransportException: java.net.ConnectException: Connection timed out: connect
	at com.facebook.thrift.transport.TSocket.open(TSocket.java:206)
	at com.vesoft.nebula.client.storage.GraphStorageConnection.open(GraphStorageConnection.java:66)
	at com.vesoft.nebula.client.storage.StorageConnPoolFactory.activateObject(StorageConnPoolFactory.java:58)
	at com.vesoft.nebula.client.storage.StorageConnPoolFactory.activateObject(StorageConnPoolFactory.java:15)
	at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:391)
	... 8 more
Caused by: java.net.ConnectException: Connection timed out: connect
	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:81)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
	at java.net.Socket.connect(Socket.java:606)
	at com.facebook.thrift.transport.TSocket.open(TSocket.java:201)
	... 12 more
22/08/10 16:01:38 ERROR ScanVertexResultIterator: get storage client error, 

相关读取的代码如下:

 private[this] def readVertexGraph() = {
    println("start to read graphx vertex")
    val space = configs.nebulaConfig.readConfigEntry.space
    println("space:" + space)
    val vertexLabels = configs.nebulaConfig.readConfigEntry.vertexLabels
    var vertexRDD: RDD[(VertexID, PropertyValues)] = null
    if(vertexLabels.nonEmpty) {
      for(i <- vertexLabels.indices) {
        val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
          .builder()
          .withSpace(space)
          .withLabel(vertexLabels(i))
          .withNoColumn(false)
          .withLimit(10)
          .withPartitionNum(10)
          .build()
        if (vertexRDD == null) {
          vertexRDD = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToGraphx()
        } else {
          val df = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToGraphx()
          vertexRDD = vertexRDD.union(df)
        }
      }
    }
    vertexRDD.collect().foreach(println(_))
    vertexRDD
  }

程序的配置地址如下:
image

show hosts 显示出来的地址如下

从日志内容上看,是没有获取到storage 客户端,也参考了论坛其他小伙伴遇到类似的问题进行排查,保证spark 可以访问到show hosts 的主机地址,目前看下来是可以正常访问的,我们部署方式是腾讯云部署的,相关的9559 9669 9779 端口已开

我又把spark-connector的代码拉下来,编译完,在源码中的example改了下读取nebula的配置信息换成我们在腾讯云上部署的Nebula,还是报一样的错误,如下:

在idea 读取部署在公网的Nebula 数据,相关的9559,9669,9779 已确认开启,读取配置的meta 的ip 地址填写的也是公网IP 地址

不知道是不是方便比如用 python 的 storage client 测试一下真的能通否

python 的没试过,但是java 客户端连接是正常的,我看到社区有人用java api 连接也遇到类似的错误,我们小伙伴使用java 是没有问题的,我使用spark 就有问题了

我想了解一下其他帖子回复的要确认saprk 环境是否能连接到show hosts 所在的机器地址这句话咋理解? spark 程序 local 模式执行的,也就是本地, Nebula 部署在腾讯云 上,响应的端口也开了, 程序需要的配置也就是一个mataAddress,我配置的是公网ip,然后拉了下spark-connector源码,改了下源码的meta配置,space等信息,读取也是报一模一样的问题,就是没理解其他帖子的 确认saprk 环境是否能连接到show hosts 所在的机器地址 这句话啥意思

  • show hosts 里列出来的是 meta 里维护的 storaged 服务发现的地址,对应的是 stroaged 自己配置的地址
  • storage client 是从 meta 里取这个列表去访问的。

这里蕴含的假设是,storage client 可以访问 meta 能访问的 storageD 地址,如果不能访问就会出问题。

我在这里也有这个服务发现的假设的详细一点的介绍

如上,这里需要你 spark 里能够访问 metad,storaged 里配置的网络 ip,所以我建议你在spark同样的网络上用 storage client 联通排除这个问题

show hosts 查出来的是内网的ip地址,metad,storage配置的网络IP 也是对应的内网ip,而我在spark 同样的网络用storage client联通,这句话也没明白,是需要我的本地程序打包到nebula 跟nebula服务网络通的ip机器上或者nebula 服务节点执行?

就是本地spark 程序不能远程读取Nebula的数据?是这个意思么?因为开发的时候肯定是local 模式,nebula 服务部署在服务端的

如何在spark同样的网络上用 storage client 联通排除这个问题? 具体需要我的本地程序做什么改动才能访问到云上的Nebula呢?

意思是

  1. 你先在 nebula console 中执行下show hosts命令,该命令显示的是nebula metad服务所暴露出来的 storaged服务的地址。
  2. 然后确认 你本地是否可以联通 这个metad服务所暴露出来的storaged地址。
1 个赞
  1. show hosts 命令显示出来的地址见上图,这里再贴一下:

因为我们服务部署在腾讯云上,Nebula show hosts 命令执行出来的是内网的ip地址,而程序访问的是公网的ip地址,响应的9559,9669,9779已开通公网访问
2. 确认本地联通meta服务所暴露出来的storaged的地址,测试如下:

结果如下:

这样看下来网络是通的,Nebula 服务我们单机部署,是可以连接的

这只是看了 meta 的地址,没有验证 x.206.9.12:9779 的 storage 地址

这个是腾讯云上对应的内网地址,肯定本地是不通的呀,如下:


按我的理解我程序肯定是访问的公网地址,我网络通的是对应的公网地址,而且这公网和内网ip 对应的其实就是一台Nebula 服务的机器,上面部署了Nebula 的storaged,metad,graphd服务,为啥一定要保证内网ip 通?还是需要把服务配置的内网ip 也改为公网ip,然后重启服务?我觉得这样就不是很合理了…

是这样的,我们的Nebula 服务在部署的时候,配置文件填写的是内网地址,因为部署在公有云,外网访问是通过公网ip 访问的,公有云将请求转发到内网服务上,这内外网两个ip本身也就是一一对应的关系,服务也就在这一台机器上,难道Nebula 服务部署的时候,必须都填公网ip,不能设置内网ip?

我把Nebula 服务部署的配置文件全部换成公网ip地址,发现Nebula 服务都起不起来了…

日志报错如下:

所以就是这里的问题,需要能够直连 meta 里维护的 storageD 的地址,所以如果有 spark 连接需求,需要 storage 配置的地址是外部可达的

现在服务是用 ip 识别实例的哈,修改 ip 会比较麻烦,不知道有没有条件清理干净重新拉起来重新导数据?

嗯嗯,这个ip必须是一个网卡的地址,如果是 floating ip 这种 nat 的,还真不行,能不能把 spark 运行在 公有云的 vpc 内呢?

我的理解有问题么? 程序肯定访问的是公网ip,公有云不就应该这么部署么?服务配置的是内网ip,然后通过公网去访问,重新清理不太现实了,我们比较多的场景都在用,不是很好弄