Exchange 2.0: java.lang.Integer cannot be cast to java.lang.String

Question: what in my config file doesn't conform to the expected format?

Question template:

  • nebula version: 2.0
  • exchange: 2.0
  • nGQL:
CREATE SPACE user_relate;
USE user_relate;
CREATE TAG ve(vid string);
CREATE EDGE ed();
  • CSV
Vertex file:
1
2
3
...
50000

Edge file:
rand(1,50000),rand(1,50000): 10 million rows of random pairs (a generator sketch follows below)
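
For reference, the edge file can be produced with a quick Scala sketch like the one below (the output file name is just an assumption; the bounds and row count match the description above):

import java.io.PrintWriter
import scala.util.Random

object GenEdges extends App {
  // Write 10 million random (src,dst) pairs, each in [1, 50000], as CSV rows.
  val out = new PrintWriter("edge.csv")
  val rnd = new Random()
  (1 to 10000000).foreach { _ =>
    out.println(s"${rnd.nextInt(50000) + 1},${rnd.nextInt(50000) + 1}")
  }
  out.close()
}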
  • conf file
{
  # Spark relation config
  spark: {
    app: {
      name: Nebula Exchange 2.0
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    executor: {
      memory: 1G
    }

    cores:{
      max: 16
    }
  }

  nebula: {
    address:{
      graph:["192.168.10.188:3699"]
      meta:["192.168.10.188:45500"]
    }
    user: root
    pswd: nebula
    space: user_relate

    connection {
      timeout: 3000
      retry: 3
    }

    execution {
      retry: 3
    }

    error: {
      max: 32
      output: /tmp/errors
    }

    rate: {
      limit: 1024
      timeout: 1000
    }
  }

  tags: [

    {
      name: ve
      type: {
        source: csv
        sink: client
      }
      path: "hdfs://ip:port/user/lsz/user_relate/vertex.csv"
      fields: [_c0]
      nebula.fields: [vid]
      vertex: _c0
      separator: ","
      header: false
      batch: 256
      partition: 32
    }

  ]

  edges: [
    {
      name: ed
      type: {
        source: csv
        sink: client
      }
      path: "hdfs://ip:port/user/lsz/user_relate/edge.csv"
      fields: [_c0,_c1]
      nebula.fields: []
      source: {
        field: _c1
      }
      target: {
        field: _c0
      }
      separator: ","
      header: false
      batch: 256
      partition: 32
    }
  ]
}
  • Error log
20/12/18 17:42:29 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
	at org.apache.spark.sql.Row$class.getString(Row.scala:257)
	at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:166)
	at com.vesoft.nebula.tools.importer.processor.Processor$class.extraValue(Processor.scala:50)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor.extraValue(VerticesProcessor.scala:42)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$1$$anonfun$3.apply(VerticesProcessor.scala:120)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$1$$anonfun$3.apply(VerticesProcessor.scala:119)
	at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$1.apply(VerticesProcessor.scala:119)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$1.apply(VerticesProcessor.scala:100)
	at org.apache.spark.sql.execution.MapElementsExec$$anonfun$7$$anonfun$apply$1.apply(objects.scala:237)
	at org.apache.spark.sql.execution.MapElementsExec$$anonfun$7$$anonfun$apply$1.apply(objects.scala:237)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1074)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor.com$vesoft$nebula$tools$importer$processor$VerticesProcessor$$processEachPartition(VerticesProcessor.scala:69)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$process$2.apply(VerticesProcessor.scala:137)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$process$2.apply(VerticesProcessor.scala:137)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
20/12/18 17:42:29 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID 4, localhost, executor driver, partition 1, ANY, 7767 bytes)
20/12/18 17:42:29 INFO Executor: Running task 1.0 in stage 3.0 (TID 4)
20/12/18 17:42:29 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks including 1 local blocks and 0 remote blocks
20/12/18 17:42:29 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
20/12/18 17:42:29 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, localhost, executor driver): java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
	... (stack trace identical to the one above)

20/12/18 17:42:29 ERROR TaskSetManager: Task 0 in stage 3.0 failed 1 times; aborting job
20/12/18 17:42:30 INFO NebulaPool: Get connection to 192.168.10.188:3699
20/12/18 17:42:30 INFO GraphProvider: switch space user_relate
20/12/18 17:42:30 INFO NebulaGraphClientWriter: Connection to List(192.168.10.188:45500)
20/12/18 17:42:30 INFO TaskSchedulerImpl: Cancelling stage 3
20/12/18 17:42:30 INFO TaskSchedulerImpl: Killing all running tasks in stage 3: Stage cancelled
20/12/18 17:42:30 INFO Executor: Executor is trying to kill task 1.0 in stage 3.0 (TID 4), reason: Stage cancelled
20/12/18 17:42:30 INFO TaskSchedulerImpl: Stage 3 was cancelled
20/12/18 17:42:30 INFO DAGScheduler: ResultStage 3 (foreachPartition at VerticesProcessor.scala:137) failed in 0.222 s due to Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost, executor driver): java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
	... (stack trace identical to the one above)

Driver stacktrace:
20/12/18 17:42:30 INFO Executor: Executor killed task 1.0 in stage 3.0 (TID 4), reason: Stage cancelled

In 2.0, vertex IDs currently don't support the int type; only strings are supported.
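
Since only string VIDs are accepted there, one workaround is to cast the CSV columns to string before import, so that Exchange's Row.getString call no longer sees an Integer. A minimal Spark sketch, assuming you preprocess the vertex file yourself (paths reuse the ip:port placeholders from the config above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object CastVidToString extends App {
  val spark = SparkSession.builder().appName("cast-vid-to-string").getOrCreate()

  // A bare numeric CSV column can be inferred as int, which is what makes
  // Row.getString in Exchange's extraValue throw the ClassCastException.
  val df = spark.read
    .option("header", "false")
    .option("inferSchema", "true")
    .csv("hdfs://ip:port/user/lsz/user_relate/vertex.csv")

  // Cast every column to string and write the result out for Exchange to read.
  val stringDf = df.columns.foldLeft(df)((d, c) => d.withColumn(c, col(c).cast("string")))
  stringDf.write.option("header", "false").csv("hdfs://ip:port/user/lsz/user_relate/vertex_str")
}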

Could you send me the Exchange 2.0 and Exchange 1.0 builds you compiled? Thanks :dizzy_face:

Pull the code from this PR and give it a try.
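
(No PR link is quoted above. Purely as a hypothetical illustration of the kind of change such a fix might make, an extraction helper could fall back to toString instead of assuming the column is already a string:)

import org.apache.spark.sql.Row

object SafeExtract {
  // Hypothetical helper, not the actual patch: read a field as a string
  // regardless of the Spark column type the CSV reader inferred.
  def extraValueAsString(row: Row, field: String): String = {
    val idx = row.schema.fieldIndex(field)
    if (row.isNullAt(idx)) null else row.get(idx).toString
  }
}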