Nebula version: 2.6.1
Deployment: single host
Installation: RPM
Production environment: N
When using Nebula-Algorithm to write out algorithm results, the job fails, no matter whether the sink is nebula or csv.
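For reference, the job is submitted in the usual way for nebula-algorithm; the jar and config file names below reflect my local setup:

    spark-submit --master "local" \
        --class com.vesoft.nebula.algorithm.Main \
        nebula-algorithm-2.6.1.jar \
        -p application.conf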
Error log:
21/11/25 10:56:16 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
21/11/25 10:56:16 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NumberFormatException: For input string: "player111"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:589)
at java.lang.Long.parseLong(Long.java:631)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
at com.vesoft.nebula.algorithm.utils.NebulaUtil$$anonfun$1.apply(NebulaUtil.scala:30)
at com.vesoft.nebula.algorithm.utils.NebulaUtil$$anonfun$1.apply(NebulaUtil.scala:26)
at org.apache.spark.sql.execution.MapElementsExec$$anonfun$7$$anonfun$apply$1.apply(objects.scala:236)
at org.apache.spark.sql.execution.MapElementsExec$$anonfun$7$$anonfun$apply$1.apply(objects.scala:236)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:107)
at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:105)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1176)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1167)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1102)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1167)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:893)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/11/25 10:56:16 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
21/11/25 10:56:16 INFO reader.NebulaEdgePartitionReader: partition index: 2, scanParts: List(2)
21/11/25 10:56:16 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
21/11/25 10:56:16 INFO scheduler.TaskSchedulerImpl: Killing all running tasks in stage 0: Stage cancelled
21/11/25 10:56:16 INFO executor.Executor: Executor is trying to kill task 1.0 in stage 0.0 (TID 1), reason: Stage cancelled
21/11/25 10:56:16 INFO scheduler.TaskSchedulerImpl: Stage 0 was cancelled
21/11/25 10:56:16 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (mapPartitions at GraphImpl.scala:207) failed in 1.118 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NumberFormatException: For input string: "player111"
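Judging from the trace, the exception is thrown in NebulaUtil (NebulaUtil.scala:30), which converts each vertex ID to a Long via Scala's String.toLong before building the GraphX graph, so a string-typed VID such as "player111" cannot be parsed. A minimal reproduction of just that parse step (my own sketch, not the library source):

    // Reproduces the failing conversion: the algorithm works on numeric vertex
    // IDs, so a string VID like "player111" cannot be parsed as a Long.
    object VidParseRepro {
      def main(args: Array[String]): Unit = {
        val vid = "player111"
        // Throws java.lang.NumberFormatException: For input string: "player111"
        val id: Long = vid.toLong
        println(id)
      }
    }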
Configuration file:
{
  # Spark relation config
  spark: {
    app: {
      name: LPA
      # spark.app.partitionNum
      partitionNum: 100
    }
    master: local
  }

  data: {
    # data source, one of: nebula, csv, json
    source: nebula
    # data sink; the algorithm result will be written into this sink, one of: nebula, csv, text
    sink: nebula
    # whether the algorithm needs an edge weight
    hasWeight: false
  }

  # Nebula Graph relation config
  nebula: {
    # algorithm's data source from Nebula. Takes effect when data.source is nebula.
    read: {
      # Nebula metad server address; separate multiple addresses with commas
      metaAddress: "10.154.109.68:9559"
      # Nebula space
      space: basketball
      # Nebula edge types; with multiple labels, the data of those edge types is unioned together
      labels: ["serve"]
      # Edge property name for each edge type, used as the weight column for the algorithm.
      # Make sure weightCols correspond one-to-one with labels.
      weightCols: ["start_year"]
    }
    # algorithm result sink into Nebula. Takes effect when data.sink is nebula.
    write: {
      # Nebula graphd server address; separate multiple addresses with commas
      graphAddress: "xxxx:9669"
      # Nebula metad server address; separate multiple addresses with commas
      metaAddress: "xxxx:9559"
      user: root
      pswd: password
      # Nebula space name
      space: basketball
      # Nebula tag name; the algorithm result will be written into this tag
      tag: pagerank
    }
  }

  local: {
    # algorithm's data source from a local file. Takes effect when data.source is csv or json.
    read: {
      filePath: "file:///tmp/algo_edge.csv"
      # srcId column
      srcId: "_c0"
      # dstId column
      dstId: "_c1"
      # weight column
      #weight: "col3"
      # whether the csv file has a header
      header: false
      # csv file delimiter
      delimiter: ","
    }
    # algorithm result sink into a local file. Takes effect when data.sink is csv or text.
    write: {
      resultPath: /tmp/count
    }
  }
}
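Since the space uses string VIDs (e.g. "player111"), that looks like the root cause. One workaround I am considering is to re-encode the string IDs as Longs before running the algorithm and decode the results afterwards. A rough Spark sketch; the "_c0"/"_c1" column names follow the csv config above, while the object and helper names are my own assumptions:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._

    object EncodeVids {
      // Returns (edges with Long endpoints, vid <-> longId mapping).
      def encode(edges: DataFrame): (DataFrame, DataFrame) = {
        // Collect all distinct string IDs from both endpoints and assign
        // each one a unique Long code.
        val ids = edges.select(col("_c0").as("vid"))
          .union(edges.select(col("_c1").as("vid")))
          .distinct()
          .withColumn("longId", monotonically_increasing_id())

        // Swap the string endpoints for their Long codes.
        val encoded = edges
          .join(ids.withColumnRenamed("vid", "_c0").withColumnRenamed("longId", "srcId"), "_c0")
          .join(ids.withColumnRenamed("vid", "_c1").withColumnRenamed("longId", "dstId"), "_c1")
          .select("srcId", "dstId")

        (encoded, ids) // keep the mapping so results can be decoded back to string VIDs
      }
    }

Keeping the vid-to-longId mapping around lets the algorithm output, which is keyed by Long IDs, be joined back to the original string VIDs before writing to Nebula.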