Exchange reports an error when importing data from Hive

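  • nebula version: 1.1.0
  • Deployment (distributed / standalone / Docker / DBaaS): distributed
  • Hardware
    • Disk (must be SSD; HDD is not supported): SSD
  • Problem description
    The data import fails with an error.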

Hive table schema

spark-sql> desc tmp.nebule_test
         > ;
imei	string	NULL
channel	string	NULL
Time taken: 0.136 seconds, Fetched 2 row(s)

Hive table data:

862591037761561a	a_V3
1b8b3ac8bf9f57f0a	a_V3
865166024592704a	a_V3
865166027775934a	a_V3
865579030855701a	a_V3

Create the tags and the edge

CREATE TAG imei_tag (imei string);
CREATE TAG channel_tag (channel string);
CREATE EDGE channel_tag_imei(channel string,imei string);

Config file

{
  # Spark relation config
  spark: {
    app: {
      name: Spark Writer
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    cores {
      max: 4
    }
  }

  # Nebula Graph relation config
  nebula: {
    address:{
      graph:["172.20.19.63:3699"]
      meta:["172.20.19.143:45500"]
    }
    user: root
    pswd: ad123graph
    space: push_covering_test

    connection {
      timeout: 3000
      retry: 3
    }

    execution {
      retry: 3
    }

    error: {
      max: 32
      output: /tmp/error
    }

    rate: {
      limit: 1024
      timeout: 1000
    }
  }

  # Processing tags
  tags: [
    # Loading from Hive
    {
      name: imei_tag
      type: {
        source: hive
        sink: client
      }
      exec: "select imei from tmp.nebule_test"
      fields: [imei]
      nebula.fields: [imei]
      vertex: {
        field: imei
        policy: "hash"
      }
      vertex: imei
      batch: 256
      partition: 32
    }
    {
      name: channel_tag
      type: {
        source: hive
        sink: client
      }
      exec: "select channel from tmp.nebule_test group by channel"
      fields: [channel]
      nebula.fields: [channel]
      vertex: {
        field: channel
        policy: "hash"
      }
      vertex: channel
      batch: 256
      partition: 32
    }
  ]

  # Processing edges
  edges: [
    # Loading from Hive
    {
      name: channel_tag_imei
      type: {
        source: hive
        sink: client
      }
      exec: "select channel, imei from tmp.nebule_test"
      fields: [ channel, imei]
      nebula.fields: [ channel, imei]
      source: {
        field: channel
        policy: "hash"
      }
      target: {
        field: imei
        policy: "hash"
      }
      batch: 256
      partition: 10
    }
  ]
}

Command executed

bin/spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master yarn --deploy-mode client /home/datasrv/chenyj/exchange-1.0.1.jar -c /home/datasrv/chenyj/test.conf -h

Error output

2020-12-10 17:47:37,705 [main] INFO  hive.metastore - Connected to metastore.
[Stage 1:>                                                         (0 + 3) / 32]2020-12-10 17:48:14,146 [task-result-getter-0] WARN  org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 1.0 (TID 4, bjdz-hadoop01-node17.jpushoa.com, executor 1): java.lang.AssertionError: assertion failed: Not support non-Numeric type for vertex id
	at scala.Predef$.assert(Predef.scala:170)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$6.apply(VerticesProcessor.scala:244)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$6.apply(VerticesProcessor.scala:235)
	at org.apache.spark.sql.execution.MapElementsExec$$anonfun$7$$anonfun$apply$1.apply(objects.scala:236)
	at org.apache.spark.sql.execution.MapElementsExec$$anonfun$7$$anonfun$apply$1.apply(objects.scala:236)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1074)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor.com$vesoft$nebula$tools$importer$processor$VerticesProcessor$$processEachPartition(VerticesProcessor.scala:84)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$process$4.apply(VerticesProcessor.scala:274)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$process$4.apply(VerticesProcessor.scala:274)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

[Stage 1:>                                                         (0 + 2) / 32]2020-12-10 17:48:19,767 [task-result-getter-1] WARN  org.apache.spark.scheduler.TaskSetManager - Lost task 3.3 in stage 1.0 (TID 14, bjdz-hadoop01-node11.jpushoa.com, executor 3): java.lang.NumberFormatException: For input string: "868852040549110868852040549128"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:592)
	at java.lang.Long.parseLong(Long.java:631)
	at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
	at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
	at com.vesoft.nebula.tools.importer.processor.Processor$class.extraValue(Processor.scala:104)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor.extraValue(VerticesProcessor.scala:56)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$6$$anonfun$9.apply(VerticesProcessor.scala:258)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$6$$anonfun$9.apply(VerticesProcessor.scala:257)
	at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$6.apply(VerticesProcessor.scala:257)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$6.apply(VerticesProcessor.scala:235)
	at org.apache.spark.sql.execution.MapElementsExec$$anonfun$7$$anonfun$apply$1.apply(objects.scala:236)
	at org.apache.spark.sql.execution.MapElementsExec$$anonfun$7$$anonfun$apply$1.apply(objects.scala:236)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1074)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor.com$vesoft$nebula$tools$importer$processor$VerticesProcessor$$processEachPartition(VerticesProcessor.scala:84)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$process$4.apply(VerticesProcessor.scala:274)
	at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$process$4.apply(VerticesProcessor.scala:274)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

All the fields are of type string. Could someone please take a look?

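Your configuration is wrong: you need to remove the vertex: imei line. The same applies to the vertex: channel line in channel_tag.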

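Reason: your vertex setting is specified twice.
There are two ways to configure vertex. When the vertex ID needs to be mapped through hash, use the following format: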

vertex:{
    field:imei
    policy:hash
}

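When the vertex does not need hash mapping, you can simply set vertex: imei. In that case the field has to hold a numeric value, which is what the "Not support non-Numeric type for vertex id" assertion in your log is complaining about.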
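
To illustrate why the plain vertex: imei form fails on the data in the question, here is a minimal sketch in Python. It is not Exchange's actual code; it only mimics the check that the stack trace points at, where the value is parsed as a 64-bit integer with Long.parseLong. The function name fits_int64 is hypothetical, the first two sample values are copied from the post, and the third is a made-up purely numeric value for contrast.

```python
import re

INT64_MIN = -(2 ** 63)
INT64_MAX = 2 ** 63 - 1

# Optional sign followed by ASCII digits: an approximation of the
# input that Java's Long.parseLong accepts.
_INTEGER = re.compile(r"[+-]?[0-9]+")


def fits_int64(value: str) -> bool:
    """Return True if value is a decimal integer within the signed 64-bit range."""
    if _INTEGER.fullmatch(value) is None:
        return False
    return INT64_MIN <= int(value) <= INT64_MAX


if __name__ == "__main__":
    samples = [
        "862591037761561a",                # from the Hive table: contains a letter
        "868852040549110868852040549128",  # from the log: too long for 64 bits
        "865166024592704",                 # hypothetical purely numeric value
    ]
    for sample in samples:
        print(sample, fits_int64(sample))
```

Values that fail this check need the hash policy form shown above, since they cannot be used directly as an integer vertex ID.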

OK, thanks. Your replies are really quick; I'll give it a try.

  source: {
    field: hive-field-0
    policy: "hash"
  }

Does policy: "hash" need the double quotes? The example on your GitHub has them.

Either way works; the quotes are optional.

Hello, niocle

partition: 32

Does this parameter refer to the partitions used when writing vertices? And how does its value affect performance?

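partition is the number of Spark partitions, and it does affect performance. You can think of it as the concurrency with which Spark runs the data import.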

The actual concurrency of a Spark job is min(number of partitions, total number of cores).
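
As a rough illustration of that rule, the sketch below applies it to the values in the config from the question (partition: 32 for the tags, partition: 10 for the edge, and cores.max: 4). The function name effective_concurrency is hypothetical, and the sketch assumes that each task uses one core and that the job really is granted 4 cores in total.

```python
def effective_concurrency(partitions: int, total_cores: int) -> int:
    """Upper bound on tasks running at once: min(partitions, total cores)."""
    if partitions < 1 or total_cores < 1:
        raise ValueError("partitions and total_cores must be positive")
    return min(partitions, total_cores)


if __name__ == "__main__":
    # Tag sections in the question: partition: 32, cores.max: 4
    print("tags:", effective_concurrency(32, 4))
    # Edge section in the question: partition: 10, same core limit
    print("edge:", effective_concurrency(10, 4))
```

Under those assumptions both sections are limited by the cores rather than by the partition count, so raising partition alone would not add parallelism.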
