- nebula 版本:2.0
- 部署方式(分布式 / 单机 / Docker / DBaaS):单机
- 是否为线上版本:Y
- 硬件信息
- 磁盘( 推荐使用 SSD):SSD
- CPU、内存信息:CPU 32核,内存:64G
- 问题的具体描述:
使用Nebula Exchange 2.5.0导入kafka数据时抛出异常。
抛出异常的日志如下所示:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:100)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:67)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:91)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2962)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2972)
at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:149)
at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:128)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:128)
at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
抛出异常的位置:Exchange.scala main函数里data.get.cache()这一句。kafka获得的是流式数据,似乎不能直接这么读取?
val data = createDataSource(spark, tagConfig.dataSourceConfigEntry)
if (data.isDefined && !c.dry) {
data.get.cache()
val count = data.get.count()
val startTime = System.currentTimeMillis()
val batchSuccess =
spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
val batchFailure =
spark.sparkContext.longAccumulator(s"batchFailure.${tagConfig.name}")