nebula-exchange: exception when using Spark to read Kafka data

  • NebulaGraph version: v3.5.0
  • Deployment: distributed
  • Installation method: Docker
  • In production: Y
  • Hardware
    • Disk: SATA
    • 32 cores, 128 GB RAM
  • Problem description
    spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange nebula-exchange_spark_2.4-3.5.0.jar -c nebula-kafka-tag.conf
    Running this command fails with the error below, even though the project does not use Hive at all.
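For context, the tag section of nebula-kafka-tag.conf presumably looks roughly like the sketch below. The field names are taken from the "field keys" line in the log; the topic and broker list are taken from the "Loading from Kafka" line; the vertex-id field, batch, and partition values are assumptions, not the poster's actual config:

```hocon
# Sketch of the relevant part of nebula-kafka-tag.conf (assumed layout).
tags: [
  {
    name: ca
    type: {
      source: kafka
      sink: client
    }
    service: "192.168.200.15:19092,192.168.200.12:19092,192.168.200.19:19092"
    topic: virtual_currency_tags_new02
    fields: [addr, pId, pName, certificationCode, certificationType, bussinessCenterType, tags]
    nebula.fields: [addr, pId, pName, certificationCode, certificationType, bussinessCenterType, tags]
    vertex: {
      field: pId   # assumed vertex-id field
    }
    batch: 256      # illustrative value
    partition: 10   # illustrative value
  }
]
```

Every name listed in nebula.fields must already exist as a property of the tag in NebulaGraph; Exchange validates this before writing, which is what fails below.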
23/07/18 18:51:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
log4j:WARN No appenders could be found for logger (com.vesoft.exchange.common.config.Configs$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
23/07/18 18:51:15 INFO SparkContext: Running Spark version 2.4.7
23/07/18 18:51:15 INFO SparkContext: Submitted application: com.vesoft.nebula.exchange.Exchange
23/07/18 18:51:15 INFO SecurityManager: Changing view acls to: root
23/07/18 18:51:15 INFO SecurityManager: Changing modify acls to: root
23/07/18 18:51:15 INFO SecurityManager: Changing view acls groups to: 
23/07/18 18:51:15 INFO SecurityManager: Changing modify acls groups to: 
23/07/18 18:51:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
23/07/18 18:51:15 INFO Utils: Successfully started service 'sparkDriver' on port 34359.
23/07/18 18:51:15 INFO SparkEnv: Registering MapOutputTracker
23/07/18 18:51:15 INFO SparkEnv: Registering BlockManagerMaster
23/07/18 18:51:15 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/07/18 18:51:15 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/07/18 18:51:15 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-611daaf9-4f28-409e-84b3-51b5b19b01a9
23/07/18 18:51:15 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
23/07/18 18:51:15 INFO SparkEnv: Registering OutputCommitCoordinator
23/07/18 18:51:15 INFO Utils: Successfully started service 'SparkUI' on port 4040.
23/07/18 18:51:15 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://0bda1ba804d9:4040
23/07/18 18:51:15 INFO SparkContext: Added JAR file:/opt/nebula-exchange_spark_2.4-3.5.0.jar at spark://0bda1ba804d9:34359/jars/nebula-exchange_spark_2.4-3.5.0.jar with timestamp 1689677475981
23/07/18 18:51:16 INFO Executor: Starting executor ID driver on host localhost
23/07/18 18:51:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37502.
23/07/18 18:51:16 INFO NettyBlockTransferService: Server created on 0bda1ba804d9:37502
23/07/18 18:51:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/07/18 18:51:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 0bda1ba804d9, 37502, None)
23/07/18 18:51:16 INFO BlockManagerMasterEndpoint: Registering block manager 0bda1ba804d9:37502 with 366.3 MB RAM, BlockManagerId(driver, 0bda1ba804d9, 37502, None)
23/07/18 18:51:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 0bda1ba804d9, 37502, None)
23/07/18 18:51:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 0bda1ba804d9, 37502, None)
23/07/18 18:51:16 INFO Exchange$: Processing Tag ca
23/07/18 18:51:16 INFO Exchange$: field keys: addr, pId, pName, certificationCode, certificationType, bussinessCenterType, tags
23/07/18 18:51:16 INFO Exchange$: nebula keys: addr, pId, pName, certificationCode, certificationType, bussinessCenterType, tags
23/07/18 18:51:16 INFO Exchange$: Loading from Kafka 192.168.200.15:19092,192.168.200.12:19092,192.168.200.19:19092 and subscribe virtual_currency_tags_new02
23/07/18 18:51:16 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/opt/spark-warehouse/').
23/07/18 18:51:16 INFO SharedState: Warehouse path is 'file:/opt/spark-warehouse/'.
23/07/18 18:51:17 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
Exception in thread "main" java.lang.IllegalArgumentException: property name certificationType is not defined in NebulaGraph
	at com.vesoft.exchange.common.utils.NebulaUtils$$anonfun$getDataSourceFieldType$1.apply(NebulaUtils.scala:50)
	at com.vesoft.exchange.common.utils.NebulaUtils$$anonfun$getDataSourceFieldType$1.apply(NebulaUtils.scala:47)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at com.vesoft.exchange.common.utils.NebulaUtils$.getDataSourceFieldType(NebulaUtils.scala:47)
	at com.vesoft.nebula.exchange.processor.VerticesProcessor.process(VerticesProcessor.scala:113)
	at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:171)
	at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:138)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:138)
	at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
23/07/18 18:51:18 INFO SparkContext: Invoking stop() from shutdown hook
23/07/18 18:51:18 INFO SparkUI: Stopped Spark web UI at http://0bda1ba804d9:4040
23/07/18 18:51:18 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/07/18 18:51:18 INFO MemoryStore: MemoryStore cleared
23/07/18 18:51:18 INFO BlockManager: BlockManager stopped
23/07/18 18:51:18 INFO BlockManagerMaster: BlockManagerMaster stopped
23/07/18 18:51:18 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/07/18 18:51:18 INFO SparkContext: Successfully stopped SparkContext
23/07/18 18:51:18 INFO ShutdownHookManager: Shutdown hook called
23/07/18 18:51:18 INFO ShutdownHookManager: Deleting directory /tmp/temporaryReader-aed75ba2-e7ea-492c-8a3c-1b57fe31e142
23/07/18 18:51:18 INFO ShutdownHookManager: Deleting directory /tmp/spark-78e572d1-f9e2-4021-a5f3-5c9fd42292be
23/07/18 18:51:18 INFO ShutdownHookManager: Deleting directory /tmp/spark-9045dd43-52eb-4eb0-b77a-05f4d9d916a5

The error message says it clearly: the property `certificationType` is not defined on the tag in NebulaGraph. This has nothing to do with Hive.
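Concretely, that means the tag `ca` in the target graph space is missing a `certificationType` property (or its name differs, e.g. in case). A sketch of how to check and fix the schema from nebula-console — the space name and the `string` property type here are assumptions; use whatever matches your data:

```ngql
-- Check which properties tag `ca` currently has in the target space
USE your_space;          -- replace with the actual graph space name
DESCRIBE TAG ca;

-- If certificationType is missing, add it (the property type is assumed here)
ALTER TAG ca ADD (certificationType string);

-- Schema changes take effect after the next heartbeat cycle; verify, then rerun Exchange
DESCRIBE TAG ca;
```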


I see it now, thanks.

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.