Error importing Kafka data into Nebula

Question template:

  • Nebula version: 3.4
  • Deployment: distributed
  • Installation: Docker
  • In production: Y
  • Hardware
    • Disk: SATA
    • CPU: 32 cores, memory: 128 GB
  • Description of the problem
    Submitting the job with spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange nebula-exchange_spark_2.4-3.4.0.jar -c nebula-kafka.conf fails with the following error:
2023-05-18 14:51:12,489 INFO config.Configs$: Tag Config: Tag name: ca, source: Kafka source server: 192.168.200.15:19092,192.168.200.12:19092,192.168.200.19:19092 topic:virtual_currency_tags startingOffsets:latest maxOffsetsPerTrigger:None, sink: Nebula sink addresses: [192.168.200.15:9669, 192.168.200.19:9669, 192.168.200.12:9669], vertex field: addr, vertex policy: None, batch: 1000, partition: 10, repartitionWithNebula: true, enableTagless: false.
Exception in thread "main" java.lang.IllegalArgumentException: Can not define any other configs when kafka exists
	at com.vesoft.exchange.common.config.Configs$$anonfun$parse$3.apply(Configs.scala:446)
	at com.vesoft.exchange.common.config.Configs$$anonfun$parse$3.apply(Configs.scala:444)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at com.vesoft.exchange.common.config.Configs$.parse(Configs.scala:444)
	at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:74)
	at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2023-05-18 14:51:12,496 INFO util.ShutdownHookManager: Shutdown hook called
2023-05-18 14:51:12,497 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-47d2900f-4628-4151-8c55-8f8ccb8de742
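
For context on this first error: when any tag or edge in the file uses the Kafka source, Exchange allows only that single Kafka config in the file; mixing it with other tag/edge configs trips the check at Configs.scala:446, so the usual fix is one Kafka tag (or edge) per config file. A minimal sketch of such a tags section, with the names (ca, virtual_currency_tags, addr, the broker list) taken from the log above and the remaining fields assumed:

tags: [
  {
    # One Kafka tag per config file; no other tag/edge configs alongside it.
    name: ca
    type: {
      source: kafka
      sink: client
    }
    service: "192.168.200.15:19092,192.168.200.12:19092,192.168.200.19:19092"
    topic: virtual_currency_tags
    fields: [addr]            # keys read from the Kafka message value (assumed)
    nebula.fields: [addr]
    vertex: {
      field: addr
    }
    batch: 1000
    partition: 10
  }
]

If you have several topics to import, splitting them into one such file per tag/edge and submitting them separately is the commonly suggested workaround.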

The problem above is solved; now there is a new one:

2023-05-18 16:26:28,882 INFO internal.SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/opt/spark-warehouse').
2023-05-18 16:26:28,883 INFO internal.SharedState: Warehouse path is 'file:/opt/spark-warehouse'.
2023-05-18 16:26:28,890 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@71dfcf21{/SQL,null,AVAILABLE,@Spark}
2023-05-18 16:26:28,891 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@b965857{/SQL/json,null,AVAILABLE,@Spark}
2023-05-18 16:26:28,891 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@11e33bac{/SQL/execution,null,AVAILABLE,@Spark}
2023-05-18 16:26:28,892 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@30501e60{/SQL/execution/json,null,AVAILABLE,@Spark}
2023-05-18 16:26:28,912 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5ff2e84b{/static/sql,null,AVAILABLE,@Spark}
2023-05-18 16:26:29,420 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
	at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:52)
	at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:63)
	at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:61)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:67)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:67)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:73)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:69)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:78)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:78)
	at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:100)
	at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:67)
	at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:91)
	at org.apache.spark.sql.Dataset.persist(Dataset.scala:2968)
	at org.apache.spark.sql.Dataset.cache(Dataset.scala:2978)
	at com.vesoft.nebula.exchange.Exchange$$anonfun$main$3.apply(Exchange.scala:197)
	at com.vesoft.nebula.exchange.Exchange$$anonfun$main$3.apply(Exchange.scala:182)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:182)
	at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Exchange 3.4.0 computes statistics on the dataset and caches it, and streaming datasets do not support those operations. Use the 3.5.0 tool; that version removed the statistics.
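This matches the stack trace above: Exchange.scala:197 calls Dataset.cache(), and Spark's UnsupportedOperationChecker refuses to plan a streaming source as a batch query. A minimal sketch reproducing the same failure, with a placeholder broker address and topic:

import org.apache.spark.sql.SparkSession

object StreamingCacheRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("repro").getOrCreate()
    // A streaming Dataset backed by Kafka (broker and topic are placeholders;
    // requires the spark-sql-kafka-0-10 package on the classpath).
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("subscribe", "some_topic")
      .load()
    // cache() forces Spark to plan the query as a batch job, which throws:
    // AnalysisException: Queries with streaming sources must be executed
    // with writeStream.start()
    df.cache()
  }
}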

I used the 3.5.0 jar and it still fails:

2023-05-19 15:49:06,832 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.net.HostAndPort.getHostText()Ljava/lang/String;
	at com.vesoft.exchange.common.MetaProvider$$anonfun$1.apply(MetaProvider.scala:38)
	at com.vesoft.exchange.common.MetaProvider$$anonfun$1.apply(MetaProvider.scala:37)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at com.vesoft.exchange.common.MetaProvider.<init>(MetaProvider.scala:37)
	at com.vesoft.nebula.exchange.processor.VerticesProcessor.process(VerticesProcessor.scala:112)
	at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:171)
	at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:138)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:138)
	at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Then I tried a method from a blog post (Nebula Exchange与集群guava版本不一致问题 - 十点 - 博客园) and got the following error:

spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange --driver-class-path /opt/guava-14.0.1.jar --driver-library-path /opt/guava-14.0.1.jar --conf spark.executor.extraClassPath=/opt/guava-14.0.1.jar --conf spark.executor.extraLibraryPath=/opt/guava-14.0.1.jar /opt/nebula-exchange_spark_2.4-3.5.0.jar -c /opt/nebula-kafka-tag.conf
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
	at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)
	at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338)
	at org.apache.spark.deploy.SparkHadoopUtil$.org$apache$spark$deploy$SparkHadoopUtil$$appendS3AndSparkHadoopConfigurations(SparkHadoopUtil.scala:464)
	at org.apache.spark.deploy.SparkHadoopUtil$.newConfiguration(SparkHadoopUtil.scala:436)
	at org.apache.spark.deploy.SparkSubmit$$anonfun$2.apply(SparkSubmit.scala:323)
	at org.apache.spark.deploy.SparkSubmit$$anonfun$2.apply(SparkSubmit.scala:323)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:323)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:774)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

There are other versions of the guava package in your local environment; the versions conflict.
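Some context on why forcing one jar did not help: HostAndPort.getHostText() was removed from newer Guava releases (the Nebula client expects the old Guava 14 API), while the three-argument checkArgument overload in this second stack trace appears only in newer Guava, so a single guava jar prepended to the whole classpath fixes one side and breaks the other. A quick diagnostic sketch, e.g. pasted into spark-shell on the same machine, to see which jar each of the two classes is actually loaded from:

// Print the jar that each conflicting Guava class is loaded from.
println(classOf[com.google.common.net.HostAndPort]
  .getProtectionDomain.getCodeSource.getLocation)
println(classOf[com.google.common.base.Preconditions]
  .getProtectionDomain.getCodeSource.getLocation)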

I'm on Spark 2.4.6. Which Guava version do you recommend?

@nicole

14.0

I tried guava-14.0.1.jar and it still doesn't work.

You need to unify the guava package version across your Spark environment and your Kafka environment. If multiple versions exist at the same time, you will get NoSuchMethodError.
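To find the duplicates to unify, here is a small sketch that lists every guava jar visible to the running JVM; run it in spark-shell in the same environment. Note that jars pulled in via --jars may not show up in java.class.path, so also check $SPARK_HOME/jars and the Kafka client libs shipped with the job:

// List every guava jar on the JVM classpath; duplicates here are the
// candidates to remove or align to one version.
System.getProperty("java.class.path")
  .split(java.io.File.pathSeparator)
  .filter(_.toLowerCase.contains("guava"))
  .foreach(println)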