Error when writing Nebula-Algorithm results

nebula version: 2.6.1
Deployment: single host
Installation: RPM
Production deployment: N
Hardware info:
Disk (SSD recommended):
CPU and memory:

When writing algorithm results with Nebula-Algorithm, the job fails regardless of whether the sink is nebula or csv.
Error log:

21/11/25 10:56:16 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
21/11/25 10:56:16 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NumberFormatException: For input string: "player111"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Long.parseLong(Long.java:589)
        at java.lang.Long.parseLong(Long.java:631)
        at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
        at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
        at com.vesoft.nebula.algorithm.utils.NebulaUtil$$anonfun$1.apply(NebulaUtil.scala:30)
        at com.vesoft.nebula.algorithm.utils.NebulaUtil$$anonfun$1.apply(NebulaUtil.scala:26)
        at org.apache.spark.sql.execution.MapElementsExec$$anonfun$7$$anonfun$apply$1.apply(objects.scala:236)
        at org.apache.spark.sql.execution.MapElementsExec$$anonfun$7$$anonfun$apply$1.apply(objects.scala:236)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:107)
        at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:105)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1176)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1167)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1102)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1167)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:893)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

21/11/25 10:56:16 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
21/11/25 10:56:16 INFO reader.NebulaEdgePartitionReader: partition index: 2, scanParts: List(2)
21/11/25 10:56:16 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
21/11/25 10:56:16 INFO scheduler.TaskSchedulerImpl: Killing all running tasks in stage 0: Stage cancelled
21/11/25 10:56:16 INFO executor.Executor: Executor is trying to kill task 1.0 in stage 0.0 (TID 1), reason: Stage cancelled
21/11/25 10:56:16 INFO scheduler.TaskSchedulerImpl: Stage 0 was cancelled
21/11/25 10:56:16 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (mapPartitions at GraphImpl.scala:207) failed in 1.118 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NumberFormatException: For input string: "player111"
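
The exception is easy to reproduce in isolation: nebula-algorithm converts every vertex ID to a Long before building the GraphX graph (see NebulaUtil.scala:30 in the trace above), so any non-numeric vid fails. A minimal illustration in Scala:

```scala
// The same conversion nebula-algorithm applies to each vid; a string vid
// such as "player111" cannot be parsed as a Long.
"player111".toLong  // throws java.lang.NumberFormatException: For input string: "player111"
```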

Configuration file:

 {
   # Spark related config
   spark: {
     app: {
         name: LPA
         # spark.app.partitionNum
         partitionNum:100
     }
     master:local
   }
 
   data: {
     # data source. one of: nebula, csv, json
     source: nebula
     # data sink; the algorithm result will be written into this sink. one of: nebula, csv, text
     sink: nebula
     # whether your algorithm needs edge weights
     hasWeight: false
   }
 
   # NebulaGraph related config
   nebula: {
     # algo's data source from Nebula. If data.source is nebula, this nebula.read config takes effect.
     read: {
         # Nebula metad server address; multiple addresses are separated by commas
         metaAddress: "10.154.109.68:9559"
         # Nebula space
         space: basketball
         # Nebula edge types; multiple labels mean that data from multiple edge types will be unioned together
         labels: ["serve"]
         # Nebula edge property name for each edge type; this property will be used as the weight column for the algorithm.
         # Make sure the weightCols correspond one-to-one with the labels.
         weightCols: ["start_year"]
     }
 
     # algo result sink into Nebula. If data.sink is nebula, this nebula.write config takes effect.
     write: {
         # Nebula graphd server address; multiple addresses are separated by commas
         graphAddress: "xxxx:9669"
         # Nebula metad server address; multiple addresses are separated by commas
         metaAddress: "xxxx:9559"
         user: root
         pswd: password
         # Nebula space name
         space: basketball
         # Nebula tag name; the algorithm result will be written into this tag
         tag: pagerank
     }
   }
 
   local: {
     # algo's data source from a local file. If data.source is csv or json, this local.read config takes effect.
     read: {
         filePath: "file:///tmp/algo_edge.csv"
         # srcId column
         srcId: "_c0"
         # dstId column
         dstId: "_c1"
         # weight column
         #weight: "col3"
         # whether the csv file has a header
         header: false
         # csv file's delimiter
         delimiter: ","
     }

     # algo result sink into a local file. If data.sink is csv or text, this local.write config takes effect.
     write: {
         resultPath: /tmp/count
     }
   }
 }
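
For reference, a config like this is what the packaged jar consumes via spark-submit, along the lines of the command in the 2.6.1 docs (the jar file name below is whatever you built or downloaded):

    spark-submit --master "local" --class com.vesoft.nebula.algorithm.Main nebula-algorithm-2.6.1.jar -p application.conf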

Sorry, in nebula-algorithm the vid must be of int type.

https://docs.nebula-graph.com.cn/2.6.1/nebula-algorithm/

@nicole will string vids be supported later?

We recommend running algorithms by calling the algorithm library through its API; using the packaged tool directly is less flexible, because the packaged jar does not do any preprocessing on the data. A sketch of the API approach follows.
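
For example, a minimal sketch of that approach for the csv case, assuming nebula-algorithm 2.6.1 is on the classpath. LPAConfig and LabelPropagationAlgo are the library's LPA entry points; treat the exact signatures and column conventions as assumptions to verify against your version:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import com.vesoft.nebula.algorithm.config.LPAConfig
import com.vesoft.nebula.algorithm.lib.LabelPropagationAlgo

object LpaStringVidExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("LPA").getOrCreate()

    // Read the raw edge file; src and dst are string vids such as "player111".
    val raw = spark.read
      .option("header", "false")
      .option("delimiter", ",")
      .csv("file:///tmp/algo_edge.csv")
      .toDF("src", "dst")

    // The preprocessing the packaged jar does not do:
    // build a string-vid -> long-id mapping table.
    val ids = raw.select(col("src").as("vid"))
      .union(raw.select(col("dst").as("vid")))
      .distinct()
      .withColumn("id", monotonically_increasing_id())

    // Re-encode the edge list with the long ids.
    val edges = raw
      .join(ids.withColumnRenamed("vid", "src").withColumnRenamed("id", "srcId"), "src")
      .join(ids.withColumnRenamed("vid", "dst").withColumnRenamed("id", "dstId"), "dst")
      .select("srcId", "dstId")

    // Run LPA through the library API on the numeric edge list.
    val result = LabelPropagationAlgo.apply(spark, edges, LPAConfig(10), hasWeight = false)
    result.show()
  }
}
```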

Your problem has nothing to do with writing results; the error is thrown during computation. As the docs note, the source and destination IDs of the edges fed into the algorithm must be numeric.

You can refer to this thread: Converting string id to long in Algorithm


The algorithm itself does not support string vids; we can later add an example to the repo showing how to support string vids through API calls.
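
Until that lands, a hedged sketch of the remaining piece: join the algorithm output back to the mapping table to recover the original string vids, continuing from the snippet above (the output id column name "_id" follows the library's convention but should be verified):

```scala
// result and ids come from the snippet above; the algorithm output's
// vertex-id column is assumed to be named "_id".
val decoded = result
  .join(ids, result("_id") === ids("id"))
  .drop("id", "_id") // keep the original string vid plus the algorithm's result columns
decoded.show()
```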


I see. Yes, an example would be great 👍

OK, thanks!

If you feel Nicole's reply solved your problem, you can mark her reply as the [Solution]. Thanks! _Humz
