Nebula Exchange 2.5.0 throws an error when importing Kafka data

  • Nebula version: 2.0
  • Deployment (distributed / standalone / Docker / DBaaS): standalone
  • Production environment: Y
  • Hardware
    • Disk (SSD recommended): SSD
    • CPU / memory: 32-core CPU, 64 GB RAM
  • Problem description:
    An exception is thrown when importing Kafka data with Nebula Exchange 2.5.0.
    The exception log is as follows:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
        at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
        at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:100)
        at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:67)
        at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:91)
        at org.apache.spark.sql.Dataset.persist(Dataset.scala:2962)
        at org.apache.spark.sql.Dataset.cache(Dataset.scala:2972)
        at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:149)
        at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:128)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:128)
        at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The exception is thrown at the data.get.cache() line in the main function of Exchange.scala. The data read from Kafka is a streaming source; it seems it cannot be consumed this way directly?

        val data = createDataSource(spark, tagConfig.dataSourceConfigEntry)
        if (data.isDefined && !c.dry) {
          data.get.cache()
          val count     = data.get.count()
          val startTime = System.currentTimeMillis()
          val batchSuccess =
            spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
          val batchFailure =
            spark.sparkContext.longAccumulator(s"batchFailure.${tagConfig.name}")
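
For context, a minimal sketch of the batch vs. streaming distinction that triggers this error (the broker address and topic name below are placeholders, not taken from the Exchange config):

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().master("local[*]").getOrCreate()

        // readStream yields an unbounded, streaming DataFrame
        val stream = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092") // placeholder
          .option("subscribe", "my-topic")                  // placeholder
          .load()

        stream.isStreaming // true
        // stream.cache()  // throws the AnalysisException above: batch
        //                 // operations such as cache()/count() are not
        //                 // allowed on streaming sources; a streaming query
        //                 // must go through stream.writeStream...start()

        // plain read (no "Stream") yields a bounded, batch DataFrame
        val batch = spark.read
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("subscribe", "my-topic")
          .load()
        batch.cache() // legal on a batch DataFrame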

Is your Nebula version 2.0 GA? If so, the versions are mismatched: Exchange 2.5.0 can only be used with Nebula v2.5.0.

We haven't upgraded to Nebula 2.5.0 yet. This statement looks like it just fetches the Kafka data and then errors out at cache(), before ever reaching process and write. Is that really related to the version too... :joy:

Because... our releases are version-aligned. I'll ask our R&D to take a look at whether it's a version issue.

Thanks for the heads-up. I compared the two versions, and the problem is exactly the data.get.cache() line; after deleting it, the job runs successfully. :rofl: Streaming data probably can't be consumed this way directly, but I'm not very familiar with this either; you may want to confirm with your R&D team.
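
For what it's worth, a minimal sketch of how the batch-only calls could be guarded (just an illustration, not the actual Exchange patch) would check Dataset.isStreaming before caching:

        val data = createDataSource(spark, tagConfig.dataSourceConfigEntry)
        if (data.isDefined && !c.dry) {
          val df = data.get
          // Kafka sources created with readStream report isStreaming == true;
          // cache() and count() are batch-only, so skip them in that case
          if (!df.isStreaming) {
            df.cache() // count() would likewise need to stay batch-only
          }
          // ... process and write as before
        }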

Yes, and thanks for the feedback. You can use Exchange 2.1.0: 2.1.0 does not have the cache() call, and it matches your Nebula version.
