使用Exchange导入Hive数据,报错 java.io.EOFException: Cannot seek after EOF

  • nebula 版本:2.6.1
  • nebula-exchange版本:2.6.0
  • 部署方式:分布式
  • 安装方式:源码编译
  • 是否为线上版本: N
  • 问题的具体描述
    使用Nebula-Exchange从Hive导入数据一直报错

此前用Nebula-Exchange从Hive导数据是成功的(Nebula和Exchange版本均为2.5.0),最近版本升级到2.6.1后,导入数据一直报错,报错日志如下

========================================
Start ScribeAppender thread.
21/11/15 23:52:39 WARN SparkConf: The configuration key 'spark.scheduler.executorTaskBlacklistTime' has been deprecated as of Spark 2.1.0 and may be removed in the future. Please use the new blacklisting options, spark.blacklist.*
[Stage 2:===============================================>   (1757 + 138) / 1879]21/11/15 23:59:47 ERROR TaskSetManager: Task 1876 in stage 2.0 failed 10 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1876 in stage 2.0 failed 10 times, most recent failure: Lost task 1876.9 in stage 2.0 (TID 2885, xyz7850.hadoop.data.test.com.cn, executor 24): java.io.EOFException: Cannot seek after EOF
        at org.apache.hadoop.hdfs.DFSInputStream.seek(DFSInputStream.java:1524)
        at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:62)
        at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:107)
        at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:65)
        at org.apache.spark.sql.execution.datasources.text.TextFileFormat$$anonfun$readToUnsafeMem$1.apply(TextFileFormat.scala:119)
        at org.apache.spark.sql.execution.datasources.text.TextFileFormat$$anonfun$readToUnsafeMem$1.apply(TextFileFormat.scala:116)
        at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
        at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at com.vesoft.nebula.exchange.processor.ReloadProcessor.com$vesoft$nebula$exchange$processor$ReloadProcessor$$processEachPartition(ReloadProcessor.scala:48)
        at com.vesoft.nebula.exchange.processor.ReloadProcessor$$anonfun$process$1.apply(ReloadProcessor.scala:28)
        at com.vesoft.nebula.exchange.processor.ReloadProcessor$$anonfun$process$1.apply(ReloadProcessor.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2107)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2107)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:410)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:416)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1910)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1898)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1897)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1897)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2131)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2080)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2069)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2088)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2107)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2132)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:980)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:978)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:978)
        at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2743)
        at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2743)
        at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2743)
        at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId$1.apply(Dataset.scala:3357)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:81)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:128)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3353)
        at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2742)
        at com.vesoft.nebula.exchange.processor.ReloadProcessor.process(ReloadProcessor.scala:28)
        at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:215)
        at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.EOFException: Cannot seek after EOF
        at org.apache.hadoop.hdfs.DFSInputStream.seek(DFSInputStream.java:1524)
        at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:62)
        at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:107)
        at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:65)
        at org.apache.spark.sql.execution.datasources.text.TextFileFormat$$anonfun$readToUnsafeMem$1.apply(TextFileFormat.scala:119)
        at org.apache.spark.sql.execution.datasources.text.TextFileFormat$$anonfun$readToUnsafeMem$1.apply(TextFileFormat.scala:116)
        at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
        at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at com.vesoft.nebula.exchange.processor.ReloadProcessor.com$vesoft$nebula$exchange$processor$ReloadProcessor$$processEachPartition(ReloadProcessor.scala:48)
        at com.vesoft.nebula.exchange.processor.ReloadProcessor$$anonfun$process$1.apply(ReloadProcessor.scala:28)
        at com.vesoft.nebula.exchange.processor.ReloadProcessor$$anonfun$process$1.apply(ReloadProcessor.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2107)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2107)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:410)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:416)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
21/11/15 23:59:47 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
        at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
        at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:140)
        at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:655)
        at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:274)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:105)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
21/11/15 23:59:47 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
        at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
        at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:140)
        at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:655)
        at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:274)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:105)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
21/11/15 23:59:47 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
        at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
        at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:140)
        at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:655)
        at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:274)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:105)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

conf文件如下:

{
  # Spark相关配置
  spark: {
    app: {
      name: Nebula Exchange 2.6.0
    }
    driver: {
      cores: 24
      # maxResultSize: 1G
    }
    cores {
      # max: 16
    }
  }

  # 如果Spark和Hive部署在不同集群,才需要配置连接Hive的参数,否则请忽略这些配置。
  #hive: {
  #  waredir: "hdfs://NAMENODE_IP:9000/apps/svr/hive-xxx/warehouse/"
  #  connectionURL: "jdbc:mysql://your_ip:3306/hive_spark?characterEncoding=UTF-8"
  #  connectionDriverName: "com.mysql.jdbc.Driver"
  #  connectionUserName: "user"
  #  connectionPassword: "password"
  #}

  # Nebula Graph相关配置
  nebula: {
    address:{
      # 以下为Nebula Graph的Graph服务和所有Meta服务所在机器的IP地址及端口。
      # 如果有多个地址,格式为 "ip1:port","ip2:port","ip3:port"。
      # 不同地址之间以英文逗号 (,) 隔开。
      graph:["10.30.113.10:9669",]
      meta:["10.30.113.10:9559"]
    }
    # 填写的账号必须拥有Nebula Graph相应图空间的写数据权限。
    user: root
    pswd: nebula
    # 填写Nebula Graph中需要写入数据的图空间名称。
    space: funding_link
    connection {
      timeout: 1200000
      retry: 10
    }
    execution {
      retry: 10
    }
    error: {
      max: 32
      output: "viewfs://c9/user_ext/tmp/errors"
    }
    rate: {
      limit: 1024
      timeout: 1200000
    }
  }
  # 处理点
  tags: [
    # 设置Tag player相关信息。
    {
      # Nebula Graph中对应的Tag名称。
      name: uid
      type: {
        # 指定数据源文件格式,设置为hive。
        source: hive
        # 指定如何将点数据导入Nebula Graph:Client或SST。
        sink: client
      }

      # 设置读取数据库basketball中player表数据的SQL语句
      exec: "select uid,trim(nick) as nick,trim(user_desc) as user_desc,trim(cert_info) as cert_info,trim(reg_time) as reg_time from temp_table_user_desc"

      # 在fields里指定player表中的列名称,其对应的value会作为Nebula Graph中指定属性。
      # fields和nebula.fields里的配置必须一一对应。
      # 如果需要指定多个列名称,用英文逗号(,)隔开。
      fields: [nick,user_desc,cert_info,reg_tim]
      nebula.fields: [nick,user_desc,cert_info,reg_time]

      # 指定表中某一列数据为Nebula Graph中点VID的来源。
      vertex:{
        field:uid
      }

      # 单批次写入 Nebula Graph 的最大数据条数。
      batch: 100000

      # Spark 分区数量
      partition: 15
    }
    
  ]

  # 处理边数据
  edges: [
 
  ]
}

这是hadoop的问题,参考 https://cwiki.apache.org/confluence/display/HADOOP2/EOFException

1 个赞

虽然表面看上去是这个问题,但是,如果只导入10万行数据可以完成。

可能是
(1)字段有乱码此类问题导致的,但是,nebula-exchange-2.5.0此前就不会报这样的错。
(2)版本不兼容。看到这个包有好多预设的依赖,都是spark2.4.4版本,这边公司预装的是spark2.4.5,使用限制 - Nebula Graph Database 手册 不是也说2.4.x就行吗,为什么这么多2.4.4依赖

问题已解决。解决方法:调低了conf文件中的batch。

1 个赞

如果问题解决了,你可以勾选自己的回复为解决方案哈

get

1 个赞

此话题已在最后回复的 7 天后被自动关闭。不再允许新回复。

浙ICP备20010487号