Running the nebula-algorithm graph computation example fails

I get a failure when running the example.
Example link: nebula-up/pagerank_example.ipynb at 7ab5cfd0e5d2a0d9ecd621d5dc1f51c83fc0aed6 · wey-gu/nebula-up · GitHub

Error:

23/04/13 20:34:54 WARN BlockManager: Putting block rdd_9_0 failed due to exception java.lang.NumberFormatException: For input string: "player105".
23/04/13 20:34:54 WARN BlockManager: Block rdd_9_0 could not be removed as it was not found on disk or in memory
23/04/13 20:34:54 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NumberFormatException: For input string: "player105"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:589)
	at java.lang.Long.parseLong(Long.java:631)
	at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
	at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
	at com.vesoft.nebula.algorithm.utils.NebulaUtil$$anonfun$1.apply(NebulaUtil.scala:29)
	at com.vesoft.nebula.algorithm.utils.NebulaUtil$$anonfun$1.apply(NebulaUtil.scala:25)
	at org.apache.spark.sql.execution.MapElementsExec$$anonfun$7$$anonfun$apply$1.apply(objects.scala:236)
	at org.apache.spark.sql.execution.MapElementsExec$$anonfun$7$$anonfun$apply$1.apply(objects.scala:236)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:107)
	at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:105)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
23/04/13 20:34:54 WARN BlockManager: Putting block rdd_9_0 failed due to exception java.lang.NumberFormatException: For input string: "player105".
23/04/13 20:34:54 WARN BlockManager: Block rdd_9_0 could not be removed as it was not found on disk or in memory
23/04/13 20:34:54 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.NumberFormatException: For input string: "player105"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:589)
	...
23/04/13 20:34:54 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): java.lang.NumberFormatException: For input string: "player105"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:589)
...

Adding the relevant version numbers:

Spark-2.4.3
NebulaGraph-3.2.1
NebulaGraph Spark Connector-3.0.0
NebulaGraph Algorithm-3.0.0

The algorithm version you're using doesn't do the vertex ID encoding inside the algorithm itself, so you need to call an extra data-encoding interface before running the algorithm. In 3.0.1 the encoding is done inside the algorithm, and users don't need to be aware of it.

There are two options:

  1. Keep the current algorithm 3.0.0 and convert the IDs of the DataFrame before calling the algorithm, as below (a sketch of the conversion follows this list):
    df_int = convert_string_id_to_long_id(df)
    prResult = spark._jvm.PageRankAlgo.apply(jspark, df_int._jdf, prConfig, False)
  2. Switch to algorithm 3.0.1.
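
For reference, a minimal PySpark sketch of what such an ID-encoding helper could look like. The name convert_string_id_to_long_id comes from the example notebook; this re-implementation is an assumption about the approach (map each distinct string vertex ID to a unique long, and keep the mapping if you need to translate results back), not the notebook's actual code:

    from pyspark.sql import functions as F

    def convert_string_id_to_long_id(df):
        # Collect every distinct string vertex ID from both endpoint
        # columns and assign each one a unique long ID.
        id_map = (df.select(F.col("_srcId").alias("id"))
                    .union(df.select(F.col("_dstId").alias("id")))
                    .distinct()
                    .withColumn("long_id", F.monotonically_increasing_id()))
        src_map = (id_map.withColumnRenamed("id", "_srcId")
                         .withColumnRenamed("long_id", "src_long"))
        dst_map = (id_map.withColumnRenamed("id", "_dstId")
                         .withColumnRenamed("long_id", "dst_long"))
        # Replace the string endpoints with their long encodings.
        return (df.join(src_map, "_srcId")
                  .join(dst_map, "_dstId")
                  .select(F.col("src_long").alias("_srcId"),
                          F.col("dst_long").alias("_dstId")))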

OK, thanks for the guidance. I just read the source code, and 3.0.1 indeed adds the ID conversion on top of 3.0.0; it's mentioned in the docs as well. I'll try again. Thanks again!


df = (spark.read
    .format("com.vesoft.nebula.connector.NebulaDataSource")
    .option("type", "edge")
    .option("spaceName", "basketballplayer")
    .option("label", "follow")
    .option("returnCols", "degree")
    .option("metaAddress", "metad0:9559")
    .option("partitionNumber", 3)
    .load())
1. A given spaceName can contain multiple labels. How do I use the NebulaGraph Spark Connector to load all of those labels at once, i.e. read multiple labels in a single pass?
2. For the data read back, I see that _rank is NebulaGraph's rank. Is it essentially weight information between the two vertices?

+------------------+------------------+------+
|            _srcId|            _dstId| _rank|
+------------------+------------------+------+
|508317857643536384|508563381382586368|106204|
|508567792813121536|508317857643536384|106205|
+------------------+------------------+------+

Any guidance would be appreciated, thanks!

  1. To pull out all the labels, you need to call your spark.read.format(...).load() once per label, get multiple DataFrames, and union them into one full dataset (a sketch follows this list).
  2. For the meaning of rank, see the description in our docs: Data Model - NebulaGraph Database Manual.
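
A minimal sketch of that per-label read-and-union pattern, assuming a second edge type serve with a start_year property, as in the basketballplayer example space; since the labels return different property columns, select a shared set of columns before the union:

    def read_edge_df(label, return_cols):
        # One connector read per edge type (label).
        return (spark.read
            .format("com.vesoft.nebula.connector.NebulaDataSource")
            .option("type", "edge")
            .option("spaceName", "basketballplayer")
            .option("label", label)
            .option("returnCols", return_cols)
            .option("metaAddress", "metad0:9559")
            .option("partitionNumber", 3)
            .load())

    df_follow = read_edge_df("follow", "degree")
    df_serve = read_edge_df("serve", "start_year")

    # union() requires matching schemas, so keep only the common columns.
    df_all = (df_follow.select("_srcId", "_dstId", "_rank")
              .union(df_serve.select("_srcId", "_dstId", "_rank")))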

Then how do I get the labels under a specific spaceName? Can I only get them by querying through nebula-python, or is there another way?
Also, does this rank value act as the edge weight during graph computation?
I'm just getting started, so my questions may be a bit naive :joy:

You're using the Python client; the Python client has no meta client, so you can only run SHOW TAGS and SHOW EDGES and parse the query ResultSet yourself.
Whether rank should serve as the edge weight depends on your business. If rank had no particular meaning when you inserted the data, then the reasonable choice is to use some numeric property as the weight.
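
A minimal sketch of that SHOW EDGES route with nebula3-python, assuming a graphd reachable at 127.0.0.1:9669 and the default root/nebula credentials:

    from nebula3.Config import Config
    from nebula3.gclient.net import ConnectionPool

    pool = ConnectionPool()
    pool.init([("127.0.0.1", 9669)], Config())
    session = pool.get_session("root", "nebula")
    try:
        session.execute("USE basketballplayer")
        # SHOW EDGES returns one row per edge type, with a Name column.
        result = session.execute("SHOW EDGES")
        labels = [v.as_string() for v in result.column_values("Name")]
        print(labels)  # e.g. ['follow', 'serve']
    finally:
        session.release()
        pool.close()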

1. No, that's not it. Currently each read can only fetch a single label, while a given spaceName corresponds to multiple labels, so how do I get those labels in the first place? That's what I meant:

df = (spark.read
    .format("com.vesoft.nebula.connector.NebulaDataSource")
    .option("type", "edge")
    .option("spaceName", "basketballplayer")
    .option("label", "follow")
    .option("returnCols", "degree")
    .option("metaAddress", "metad0:9559")
    .option("partitionNumber", 3)
    .load())

Does NebulaGraph Algorithm not support custom edge weights for graph computation?

The data currently read from NebulaGraph includes a _rank column, and I wanted to use _rank as my weight data, but I found that the org.apache.spark.graphx.lib.PageRank source code doesn't use this rank value at all when initializing PageRank.

Is my understanding off somewhere? Please advise :joy:

  1. To fetch multiple labels, run df = spark.read.format(...).load() once per label, then union the per-label DataFrames.
  2. algorithm does support a custom edge property as the weight; it doesn't have to be rank, and in the vast majority of cases the weight is a property rather than rank (see the sketch below).
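
A minimal sketch of passing a property as the weight, reusing the call pattern from earlier in the thread (jspark and prConfig set up as in the example notebook). Judging from the NebulaUtil frames in the stack trace above, when the trailing hasWeight flag is True the third DataFrame column is parsed as the edge weight, so select a numeric property such as follow's degree into that position:

    # Keep src, dst, and the numeric property to act as the weight; with
    # hasWeight=True the third column is read as a Double edge weight.
    df_weighted = df.select("_srcId", "_dstId", "degree")

    # Same call as before, with the hasWeight flag flipped to True. On
    # algorithm 3.0.0, string vertex IDs still need the long-ID conversion first.
    prResult = spark._jvm.PageRankAlgo.apply(jspark, df_weighted._jdf, prConfig, True)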