Using spark-connector, I can never read data from Nebula Graph; it always throws ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object

  • Nebula version: 2.5.1
  • Deployment: single host
  • Installation: RPM
  • Production environment: no
  • Hardware
    • Disk: SSD
    • CPU / memory: 2 cores / 60 GB
  • Spark Connector version: 2.5.1
  • Spark version: 2.4.4
  • scala-sdk: 2.12.5
  • Ports: 9559, 9669, and 9779 are all open
  • open files: (-n) 65535
  • I am using the template code from the Git repo; writing data into Nebula works, but reading data from Nebula always fails. The reader code:
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object NebulaSparkReaderExample {

  private val LOG = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf
    sparkConf
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .master("local")
      .config(sparkConf)
      .getOrCreate()

    readVertex(spark)
//    readEdges(spark)
//    readVertexGraph(spark)
//    readEdgeGraph(spark)

    spark.close()
    sys.exit()
  }

  def readVertex(spark: SparkSession): Unit = {
    LOG.info("start to read nebula vertices")
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("<real IP>:9559")
        .withConenctionRetry(2)
        .build()
    val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("demo")
      .withLabel("person")
      .withNoColumn(false)
      .withReturnCols(List("name"))
      .withLimit(10)
      .withPartitionNum(10)
      .build()
    val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
    vertex.printSchema()
    vertex.show(20)
    println("vertex count: " + vertex.count())
  }
}

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
	at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:96)
	at com.vesoft.nebula.client.storage.scan.ScanVertexResultIterator.next(ScanVertexResultIterator.java:142)
	at com.vesoft.nebula.connector.reader.NebulaVertexPartitionReader.next(NebulaVertexPartitionReader.scala:67)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:751)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:710)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
	at com.vesoft.nebula.examples.connector.NebulaSparkReaderExample$.readVertex(NebulaSparkReaderExample.scala:60)
	at com.vesoft.nebula.examples.connector.NebulaSparkReaderExample$.main(NebulaSparkReaderExample.scala:32)
	at com.vesoft.nebula.examples.connector.NebulaSparkReaderExample.main(NebulaSparkReaderExample.scala)
Caused by: com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
	at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:96)
	at com.vesoft.nebula.client.storage.scan.ScanVertexResultIterator.next(ScanVertexResultIterator.java:142)
	at com.vesoft.nebula.connector.reader.NebulaVertexPartitionReader.next(NebulaVertexPartitionReader.scala:67)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Additional information!!!
After starting Nebula, I noticed that the number of open files keeps growing.

  1. The first question has come up on the forum before; most likely your Spark environment cannot reach the addresses that show hosts returns inside Nebula.
  2. Please open a separate post for the open-files question.

1. Yes, there are similar questions on the forum, but I could not find an answer in those threads (e.g. https://discuss.nebula-graph.com.cn/t/topic/5324/15), and that thread is already closed, so I had to open a new one.
Back to the problem itself: if the Spark environment cannot reach the address, why can I write data to Nebula through Spark, but not read data through it?

The write code is as follows:

import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaVertexConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object NebulaSparkWriterExample {

  private val LOG = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf
    sparkConf
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .master("local")
      .config(sparkConf)
      .getOrCreate()

    writeVertex(spark)
//    writeEdge(spark)

//    updateVertex(spark)
//    updateEdge(spark)

//    deleteVertex(spark)
//    deleteEdge(spark)

    spark.close()
  }

  /**
    * for this example, your nebula tag schema should have property names: name, age, born
    * if your withVidAsProp is true, then tag schema also should have property name: id
    */
  def writeVertex(spark: SparkSession): Unit = {
    LOG.info("start to write nebula vertices")
    val df = spark.read.json("example/src/main/resources/vertex")
    df.show()

    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("<real IP>:9559")
        .withGraphAddress("<real IP>:9669")
        .withConenctionRetry(2)
        .build()
    val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
      .builder()
      .withSpace("demo")
      .withTag("person")
      .withVidField("id")
      .withVidAsProp(false)
      .withBatch(1000)
      .build()
    df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
  }
}

 INFO [main] - Starting job: show at NebulaSparkWriterExample.scala:56
 INFO [dag-scheduler-event-loop] - Got job 1 (show at NebulaSparkWriterExample.scala:56) with 1 output partitions
 INFO [dag-scheduler-event-loop] - Final stage: ResultStage 1 (show at NebulaSparkWriterExample.scala:56)
 INFO [dag-scheduler-event-loop] - Parents of final stage: List()
 INFO [dag-scheduler-event-loop] - Missing parents: List()
 INFO [dag-scheduler-event-loop] - Submitting ResultStage 1 (MapPartitionsRDD[6] at show at NebulaSparkWriterExample.scala:56), which has no missing parents
 INFO [dag-scheduler-event-loop] - Block broadcast_3 stored as values in memory (estimated size 11.5 KB, free 893.8 MB)
 INFO [dag-scheduler-event-loop] - Block broadcast_3_piece0 stored as bytes in memory (estimated size 6.0 KB, free 893.8 MB)
 INFO [dispatcher-event-loop-1] - Added broadcast_3_piece0 in memory on DESKTOP-8V852AQ:60740 (size: 6.0 KB, free: 894.3 MB)
 INFO [dag-scheduler-event-loop] - Created broadcast 3 from broadcast at DAGScheduler.scala:1161
 INFO [dag-scheduler-event-loop] - Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at show at NebulaSparkWriterExample.scala:56) (first 15 tasks are for partitions Vector(0))
 INFO [dag-scheduler-event-loop] - Adding task set 1.0 with 1 tasks
 INFO [dispatcher-event-loop-0] - Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, PROCESS_LOCAL, 8292 bytes)
 INFO [Executor task launch worker for task 1] - Running task 0.0 in stage 1.0 (TID 1)
 INFO [Executor task launch worker for task 1] - Code generated in 7.3107 ms
 INFO [Executor task launch worker for task 1] - Finished task 0.0 in stage 1.0 (TID 1). 1469 bytes result sent to driver
 INFO [task-result-getter-1] - Finished task 0.0 in stage 1.0 (TID 1) in 39 ms on localhost (executor driver) (1/1)
 INFO [task-result-getter-1] - Removed TaskSet 1.0, whose tasks have all completed, from pool 
 INFO [dag-scheduler-event-loop] - ResultStage 1 (show at NebulaSparkWriterExample.scala:56) finished in 0.054 s
 INFO [main] - Job 1 finished: show at NebulaSparkWriterExample.scala:56, took 0.056048 s
+---+----------+---+-----+
|age|      born| id| name|
+---+----------+---+-----+
| 20|2000-01-01| 12|  Tom|
| 21|1999-01-02| 13|  Bob|
| 22|1998-01-03| 14| Jane|
| 23|1997-01-04| 15| Jena|
| 24|1996-01-05| 16|  Nic|
| 25|1995-01-06| 17|  Mei|
| 26|1994-01-07| 18|   HH|
| 27|1993-01-08| 19|Tyler|
| 28|1992-01-09| 20|  Ber|
| 29|1991-01-10| 21|Mercy|
+---+----------+---+-----+

 INFO [main] - NebulaWriteVertexConfig={space=demo,tagName=person,vidField=id,vidPolicy=null,batch=1000,writeMode=insert}
 INFO [main] - create writer
 INFO [main] - Pruning directories with: 
 INFO [main] - Post-Scan Filters: 
 INFO [main] - Output Data Schema: struct<age: bigint, born: string, id: bigint, name: string ... 2 more fields>
 INFO [main] - Pushed Filters: 
 INFO [main] - Block broadcast_4 stored as values in memory (estimated size 220.6 KB, free 893.6 MB)
 INFO [main] - Block broadcast_4_piece0 stored as bytes in memory (estimated size 20.7 KB, free 893.6 MB)
 INFO [dispatcher-event-loop-1] - Added broadcast_4_piece0 in memory on DESKTOP-8V852AQ:60740 (size: 20.7 KB, free: 894.2 MB)
 INFO [main] - Created broadcast 4 from save at package.scala:248
 INFO [main] - Planning scan with bin packing, max size: 4194847 bytes, open cost is considered as scanning 4194304 bytes.
 INFO [main] - Start processing data source writer: com.vesoft.nebula.connector.writer.NebulaDataSourceVertexWriter@661d49d1. The input RDD has 1 partitions.
 INFO [main] - Starting job: save at package.scala:248
 INFO [dag-scheduler-event-loop] - Got job 2 (save at package.scala:248) with 1 output partitions
 INFO [dag-scheduler-event-loop] - Final stage: ResultStage 2 (save at package.scala:248)
 INFO [dag-scheduler-event-loop] - Parents of final stage: List()
 INFO [dag-scheduler-event-loop] - Missing parents: List()
 INFO [dag-scheduler-event-loop] - Submitting ResultStage 2 (MapPartitionsRDD[8] at save at package.scala:248), which has no missing parents
 INFO [dag-scheduler-event-loop] - Block broadcast_5 stored as values in memory (estimated size 11.6 KB, free 893.6 MB)
 INFO [dag-scheduler-event-loop] - Block broadcast_5_piece0 stored as bytes in memory (estimated size 6.7 KB, free 893.6 MB)
 INFO [dispatcher-event-loop-0] - Added broadcast_5_piece0 in memory on DESKTOP-8V852AQ:60740 (size: 6.7 KB, free: 894.2 MB)
 INFO [dag-scheduler-event-loop] - Created broadcast 5 from broadcast at DAGScheduler.scala:1161
 INFO [dag-scheduler-event-loop] - Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[8] at save at package.scala:248) (first 15 tasks are for partitions Vector(0))
 INFO [dag-scheduler-event-loop] - Adding task set 2.0 with 1 tasks
 INFO [dispatcher-event-loop-1] - Starting task 0.0 in stage 2.0 (TID 2, localhost, executor driver, partition 0, PROCESS_LOCAL, 8292 bytes)
 INFO [Executor task launch worker for task 2] - Running task 0.0 in stage 2.0 (TID 2)
 INFO [Executor task launch worker for task 2] - switch space demo
 INFO [Executor task launch worker for task 2] - Commit authorized for partition 0 (task 2, attempt 0, stage 2.0)
 INFO [Executor task launch worker for task 2] - batch write succeed
 INFO [Executor task launch worker for task 2] - Committed partition 0 (task 2, attempt 0, stage 2.0)
 INFO [Executor task launch worker for task 2] - Finished task 0.0 in stage 2.0 (TID 2). 1124 bytes result sent to driver
 INFO [task-result-getter-2] - Finished task 0.0 in stage 2.0 (TID 2) in 464 ms on localhost (executor driver) (1/1)
 INFO [task-result-getter-2] - Removed TaskSet 2.0, whose tasks have all completed, from pool 
 INFO [dag-scheduler-event-loop] - ResultStage 2 (save at package.scala:248) finished in 0.477 s
 INFO [main] - Job 2 finished: save at package.scala:248, took 0.478719 s
 INFO [main] - Data source writer com.vesoft.nebula.connector.writer.NebulaDataSourceVertexWriter@661d49d1 is committing.
 INFO [main] - failed execs:
 List()
 INFO [main] - Data source writer com.vesoft.nebula.connector.writer.NebulaDataSourceVertexWriter@661d49d1 committed.
 INFO [main] - Stopped Spark@47a7a101{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
 INFO [main] - Stopped Spark web UI at http://DESKTOP-8V852AQ:4040
 INFO [dispatcher-event-loop-1] - MapOutputTrackerMasterEndpoint stopped!
 INFO [main] - MemoryStore cleared
 INFO [main] - BlockManager stopped
 INFO [main] - BlockManagerMaster stopped
 INFO [dispatcher-event-loop-0] - OutputCommitCoordinator stopped!
 INFO [main] - Successfully stopped SparkContext

The show hosts output is as follows (screenshot, not reproduced here):

2. As for the open-files issue, I suspect it may well be related to this problem, so I am posting it here to help you troubleshoot. If there is no causal relationship, please ignore it. ^-^

I ran into a similar problem: when reading data, df.show never returns a result, and it reports ERROR ScanVertexResultIterator: get storage client error

  1. Regarding the question already covered on the forum: "Single-host deployment with the default configuration, spark connector example code reading Nebula reports ERROR ScanVertexResultIterator: get storage client error - #5", from nicole. The root cause is the same: confirm whether your Spark environment can reach the addresses that show hosts returns inside Nebula.

Reads and writes go through different Nebula services: writing data goes through the graph service (default port 9669), while reading data goes through the storage service (default port 9779).
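
To make that contrast concrete, here is a minimal sketch based on the reader/writer examples earlier in this thread (the placeholder addresses are assumptions to be replaced with your deployment's real ones). The write path only needs graphd on 9669 to be reachable from Spark; the read path is only given the meta address, and the connector then scans the storaged addresses it learns from meta.

import com.vesoft.nebula.connector.NebulaConnectionConfig

// Write path: rows go through graphd, so only <real IP>:9669 must be reachable from Spark.
val writeConnConfig = NebulaConnectionConfig
  .builder()
  .withMetaAddress("<real IP>:9559")
  .withGraphAddress("<real IP>:9669")
  .withConenctionRetry(2) // method name spelled as in the example code above
  .build()

// Read path: only the meta address is configured. The connector asks metad for the storaged
// addresses (the same host:port values that `show hosts` prints) and opens scan connections
// to those hosts on port 9779 directly, so those addresses must be reachable from Spark.
val readConnConfig = NebulaConnectionConfig
  .builder()
  .withMetaAddress("<real IP>:9559")
  .withConenctionRetry(2)
  .build()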

The storage service itself is healthy, as shown here: (screenshot)

The port is also reachable: (screenshot)

On my side this feels like a docker-compose deployment problem; the port specification is probably wrong.


But I have already set up all the port forwarding :joy: and I don't know what to change.

One more note: using the docker-compose.yaml that ships with spark-connector, a local deployment can be connected to and read from just fine. But when I deploy to a server the same way, reading data still fails with get storage client error; the reported error is a timeout, even though the data volume is small (the official basketball dataset). Is docker-compose not recommended for server deployments?

Even though your port 9779 is mapped out, meta returns the address inside the container, and the storage address the connector uses internally is the one shown by show hosts. Please paste your show hosts output.

PS: if you need the storage service, Docker deployment is not recommended.

OK, understood. The docker-compose yaml apparently needs to specify the storage IP explicitly; the IP I see when debugging is indeed wrong. I'll change it, thanks!

Hello, I also ran into the get storage client error problem.
I start Nebula locally with docker-compose and explicitly specified the port mappings in docker-compose.yaml.
telnet 127.0.0.1 9559/9669/9779 all succeed.
The content of show hosts is (for some reason I cannot upload an image):


| HOST | Port | Status | Leader count | Leader distribution | Partition distribution |
| "storage0" | 9779 | "ONLINE" | 5 | "importer_test:5" | "importer_test:5" |
|"Total" |  |  |  5 | "importer_test:5" | "importer_test:5" |

How can I solve this?

@viper It works like this: the code obtains the storage addresses from metad, and the storage addresses in metad are whatever each storaged reports from its own configuration. So what you need to guarantee is that the address in the storaged configuration is identical to the address Docker exposes for Spark to access.

As for my case, my storage service checks out fine. To avoid copy-pasting again, please see my previous reply for the details. Thanks.

Haha, sorry, I assumed the original poster's issue had already been solved; I see the thread has drifted.
The answer is actually the same as what I replied to the other user above:

  • The storaged address is not specified by the client (Spark); it is read from meta, and that information is equivalent to the host column of show hosts. In my example here the hosts are domain names:
(root@nebula) [basketballplayer]> show hosts storage
+-------------+------+----------+-----------+--------------+----------------------+
| Host        | Port | Status   | Role      | Git Info Sha | Version              |
+-------------+------+----------+-----------+--------------+----------------------+
| "storaged0" | 9779 | "ONLINE" | "STORAGE" | "07ccfde"    | "2021.11.03-nightly" |
| "storaged1" | 9779 | "ONLINE" | "STORAGE" | "07ccfde"    | "2021.11.03-nightly" |
| "storaged2" | 9779 | "ONLINE" | "STORAGE" | "07ccfde"    | "2021.11.03-nightly" |
+-------------+------+----------+-----------+--------------+----------------------+
Got 3 rows (time spent 2060/4074 us)
  • If I want to read data with the storage client from my code, the environment the code runs in must be able to reach the host plus port listed in show hosts. That requires the host to be DNS-resolvable (if it is not an IP), the IP to be reachable, and the port at that IP to really be the one storaged listens on. Can you guarantee all of the above? (A small reachability sketch follows below.)
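
For reference, a minimal, hypothetical checker (plain JVM DNS/socket APIs, nothing Nebula-specific; the host/port list is an assumption to be replaced with whatever your own show hosts prints) that verifies those three conditions from the machine where the Spark code runs:

import java.net.{InetAddress, InetSocketAddress, Socket}
import scala.util.Try

object ShowHostsReachabilityCheck {
  def main(args: Array[String]): Unit = {
    // Replace with the host/port pairs your own `show hosts` returns.
    val storageHosts = Seq("storaged0" -> 9779, "storaged1" -> 9779, "storaged2" -> 9779)

    storageHosts.foreach { case (host, port) =>
      // 1) the host must be DNS-resolvable (or already be an IP literal)
      val resolved = Try(InetAddress.getByName(host).getHostAddress).getOrElse("UNRESOLVED")
      // 2) + 3) the IP must be reachable and the port must actually be listened on by storaged
      val reachable = Try {
        val socket = new Socket()
        try socket.connect(new InetSocketAddress(host, port), 3000)
        finally socket.close()
      }.isSuccess
      println(s"$host:$port -> resolves to $resolved, connectable: $reachable")
    }
  }
}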

Hello, sorry, I still don't quite understand. In the example you gave, the host is "storaged0"; how do I find out which IP storaged0 corresponds to? Also, in the Spark script the only address the reader lets me configure is withMetaAddress(); which address should go there?

My setup is a bit special: it was brought up with docker compose, so storaged0 here is actually the domain name of the container that storaged0 runs in. Inside the container network that Nebula lives in it is DNS-resolvable, and it resolves to an address on that container network.

That is why nicole does not recommend Docker deployment for scenarios that need the storage client: it requires extra work from us (for example forcing Spark to run inside the Nebula container network, or adding DNS entries manually so that the name resolves to the real storaged address).

As for "in the Spark script the only address the reader lets me configure is withMetaAddress(); which address should go there?":

This is exactly what I mentioned: normally the storage addresses are obtained through meta service discovery, and that information comes precisely from each storaged's conf. From the client's point of view the meta address is fixed, while the storage hosts may change (scaling out, adding nodes, replacing failed ones), so the usage pattern is that the Spark client only needs to know the meta address and obtains the storage host list through meta.

To answer your question directly, "which address should go there?": you fill in the literal meta address (not the storage address).

For Spark to be able to reach the storage addresses, you must make sure that the storage addresses stored in meta are reachable from the outside (from Spark).


You still have not pasted your show hosts result.
Your storage service itself is fine; it is your application that cannot reach the storage address shown by show hosts. Judging from the service status info you pasted earlier, this is not a Docker deployment, so the question is whether the IP in your configuration is 127.0.0.1. Paste the show hosts result and the cause will be clear.

Let me describe my requirements and the problem I am facing in full; could you advise on how to deploy or configure this properly?
1. Import data into NebulaGraph, using the Docker image of nebula-importer;
2. Move data from NebulaGraph into Spark, by loading nebula-spark-connector and writing a Scala program;
3. Analyze the data in Spark with GraphX or other modules.
Current status:
1. I pulled up NebulaGraph with docker-compose and explicitly specified the port mappings in docker-compose.yaml, e.g. "9669:9669";
2. I deployed Spark with Docker;
3. When using nebula-spark-connector I hit the get storage client error problem.
Many thanks for your help.

The problem has been identified and solved. Here is a recap of what happened.
1. With spark-connector, the template code could write but not read. Writing works because the address used for writing is the one the client (Spark) specifies, i.e. the one in our code, so that path runs successfully. Reading then fails, because the storaged address is not specified by the client (Spark): the information read from meta is equivalent to the host column of show hosts (see screenshot).
The read path was building its connection to storage at 127.0.0.1:9779. That is fine if everything runs on the same server, but if the code (running locally) and the deployed Nebula services are on different servers, it cannot be reached (as in my case).
My solution: 1) change the IP-related settings (--meta_server_addrs, --local_ip) in the configuration files of the three deployed Nebula services (a sketch of the changed lines follows at the end of this post); 2) restart the services.

2. At that point I thought it was done, but running the code again still failed. After investigation it turned out that once the IPs are changed, all data written before the change becomes invalid. So re-import the data, read again, and it works!
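
For reference, a hedged sketch of the kind of lines that were changed. The file location and the placeholder IP are assumptions; adjust them to your own deployment (for an RPM install the configs are typically under /usr/local/nebula/etc/):

# nebula-metad.conf / nebula-graphd.conf / nebula-storaged.conf (assumed RPM default path)
--meta_server_addrs=<real server IP>:9559   # instead of 127.0.0.1:9559
--local_ip=<real server IP>                 # instead of 127.0.0.1

After the restart, show hosts should list the real server IP instead of 127.0.0.1, which is the address the spark-connector read path actually connects to.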

1 个赞