I deployed version 3.0.2 and use nebula-flink-connector to consume Kafka data. After running for a few seconds, the storage service crashes.

The same code works fine against 2.6.2 and data is written normally. I don't know what is going wrong.

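The same code??? Are your versions perhaps misaligned? Please don't just describe the symptom: give us the environment details (the Nebula version and the Flink Connector version). Is there an error message? If so, please paste that too.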

The Nebula Graph version is 3.0.2.
The Maven configuration is as follows:


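I deployed two environments. One of them is 2.0.6: when I point the connection IP at the 2.0.6 environment, data is ingested normally.
When I switch to the 3.0.2 address, the storage service crashes after running for a few seconds.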
It happens every time. A small amount of data still gets written each time, and then the storage service crashes.

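Without touching the code, I only change these two addresses. The lower version accepts writes normally, while 3.0.2 crashes after a few seconds of writing.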

Did this happen right after deployment? After deploying 3.0.2, did you run ADD HOSTS? That step is what activates storage. Reference: https://docs.nebula-graph.com.cn/3.0.2/4.deployment-and-installation/manage-storage-host/

Yes, I added them. I can see that the storage status is normal, and part of the data does get written; it only crashes afterwards.

Are the 3.x and 2.x versions on the same machine? If so, did you give them different ports?

No, they are on different machines. Both services run normally; it is only the 3.x version that crashes as soon as the Flink job runs.

Is the flink connector used against this 3.x machine a 3.x version?

I run it locally. The dependencies are as follows:

The version is correct. :thinking: Hang on, I'll ask the relevant colleague to take a look.

Please also paste the relevant error messages.

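Let me describe my situation again. This environment had not been used for a while, since I have been using 2.6.2 all along. Today I started the 3.0.2 storage server, and data flowed in normally without any crash.
Let me also explain my program's logic: I read logs from Kafka, classify them into 3 types, process each type separately, and store them in Nebula. The code is as follows: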


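As described in the figure, when I write only one type of data, today's runs stay normal. When all 3 types are written at the same time, the storage service crashes as soon as the program starts. I also found that once the storage server has crashed, even if I go back to processing only one type as before, the program fails as soon as it runs and the storage service crashes too. The client reports the error below, and now nothing can be processed at all: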

[Source: Custom Source -> Process -> Flat Map -> (Flat Map -> Map -> Sink: Unnamed, Flat Map -> Map -> Sink: Unnamed) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Process -> Flat Map -> (Flat Map -> Map -> Sink: Unnamed, Flat Map -> Map -> Sink: Unnamed) (1/1)#0 (2ed28e41197dbfe479f0b53252f723e9).
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Process -> Flat Map -> (Flat Map -> Map -> Sink: Unnamed, Flat Map -> Map -> Sink: Unnamed) (1/1)#0 2ed28e41197dbfe479f0b53252f723e9.
[flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Process -> Flat Map -> (Flat Map -> Map -> Sink: Unnamed, Flat Map -> Map -> Sink: Unnamed) (1/1) (2ed28e41197dbfe479f0b53252f723e9) switched from RUNNING to FAILED on 82d374f3-0ad9-473b-939c-26cae09f81a5 @ activate.navicat.com (dataPort=-1).
org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {com.dbapp.topic.rawevent-4=17726826513} whose size is larger than the fetch size 1048576 and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or newer to avoid this issue. Alternately, increase the fetch size on the client (using max.partition.fetch.bytes)
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Clearing resource requirements of job 792b678dddbd6921267a94289ec07ae5
[flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
[flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0. 
[flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Write Nebula (792b678dddbd6921267a94289ec07ae5) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {com.dbapp.topic.rawevent-4=17726826513} whose size is larger than the fetch size 1048576 and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or newer to avoid this issue. Alternately, increase the fetch size on the client (using max.partition.fetch.bytes)
[flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Write Nebula (792b678dddbd6921267a94289ec07ae5) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {com.dbapp.topic.rawevent-4=17726826513} whose size is larger than the fetch size 1048576 and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or newer to avoid this issue. Alternately, increase the fetch size on the client (using max.partition.fetch.bytes)
[flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job 792b678dddbd6921267a94289ec07ae5.
[flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.minicluster.MiniCluster - Shutting down Flink Mini Cluster
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job 792b678dddbd6921267a94289ec07ae5 reached terminal state FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {com.dbapp.topic.rawevent-4=17726826513} whose size is larger than the fetch size 1048576 and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or newer to avoid this issue. Alternately, increase the fetch size on the client (using max.partition.fetch.bytes)
[flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shutting down rest endpoint.
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopping TaskExecutor akka://flink/user/rpc/taskmanager_0.
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close ResourceManager connection 1b20612d60a508da1d9ee6694c84dce6.
[flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Closing TaskExecutor connection 82d374f3-0ad9-473b-939c-26cae09f81a5 because: The TaskExecutor is shutting down.
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 792b678dddbd6921267a94289ec07ae5.
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job 'Write Nebula' (792b678dddbd6921267a94289ec07ae5).
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager - Shutting down TaskExecutorStateChangelogStoragesManager.
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Free slot TaskSlot(index:0, state:ALLOCATED, resource profile: ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes), managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864 bytes)}, allocationId: 9f7e2564eff6e92c7bd5ffbaaf5e32dd, jobId: 792b678dddbd6921267a94289ec07ae5).
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool - Releasing slot [9f7e2564eff6e92c7bd5ffbaaf5e32dd].
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 1b20612d60a508da1d9ee6694c84dce6: Stopping JobMaster for job 'Write Nebula' (792b678dddbd6921267a94289ec07ae5).
[flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Disconnect job manager a7e1a1de178f775bd18b05e857a743d9@akka://flink/user/rpc/jobmanager_3 for job 792b678dddbd6921267a94289ec07ae5 from the resource manager.
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Stop job leader service.
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
[ForkJoinPool.commonPool-worker-11] INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Removing cache directory C:\Users\ZOUWEN~1\AppData\Local\Temp\flink-web-ui
[ForkJoinPool.commonPool-worker-11] INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shut down complete.
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Shut down cluster because application is in CANCELED, diagnostics DispatcherResourceManagerComponent has been closed..
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl - FileChannelManager removed spill file directory C:\Users\ZOUWEN~1\AppData\Local\Temp\flink-io-dca532c6-a900-4c22-800b-5ccf4e8cb321
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.io.network.NettyShuffleEnvironment - Shutting down the network environment and its components.
[flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent - Closing components.
[flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess - Stopping SessionDispatcherLeaderProcess.
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopping dispatcher akka://flink/user/rpc/dispatcher_1.
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
	at akka.dispatch.OnComplete.internal(Future.scala:300)
	at akka.dispatch.OnComplete.internal(Future.scala:297)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	... 4 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {com.dbapp.topic.rawevent-4=17726826513} whose size is larger than the fetch size 1048576 and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or newer to avoid this issue. Alternately, increase the fetch size on the client (using max.partition.fetch.bytes)
[flink-akka.actor.default-dispatcher-8] INFO org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl - Stopping resource manager service.
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopping all currently running jobs of dispatcher akka://flink/user/rpc/dispatcher_1.
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl - FileChannelManager removed spill file directory C:\Users\ZOUWEN~1\AppData\Local\Temp\flink-netty-shuffle-dc56e36b-5ddc-4aae-8f69-872983cfcd5d
[flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Closing the slot manager.
[flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Suspending the slot manager.
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.taskexecutor.KvStateService - Shutting down the kvState service and its components.
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Stop job leader service.
[TransientBlobCache shutdown hook] INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
[PermanentBlobCache shutdown hook] INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
[flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopped dispatcher akka://flink/user/rpc/dispatcher_1.
[BlobServer shutdown hook] INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:52684
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.filecache.FileCache - removed file cache directory C:\Users\ZOUWEN~1\AppData\Local\Temp\flink-dist-cache-ce8bc111-a19d-40d1-ae0b-8102126c7a4e
[FileCache shutdown hook] INFO org.apache.flink.runtime.filecache.FileCache - removed file cache directory C:\Users\ZOUWEN~1\AppData\Local\Temp\flink-dist-cache-ce8bc111-a19d-40d1-ae0b-8102126c7a4e
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopped TaskExecutor akka://flink/user/rpc/taskmanager_0.
[AkkaRpcService-Supervisor-Termination-Future-Executor-thread-1] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.
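
The only exception in the client log above is Kafka's RecordTooLargeException, whose message suggests raising `max.partition.fetch.bytes` on the consumer (1048576 bytes, the limit it quotes, is that setting's default). A minimal sketch of building such consumer properties follows. It assumes the job passes a `java.util.Properties` object to its Kafka source; the class name, broker address, group id, and the 8 MiB value are hypothetical and are not taken from the original code. This only addresses the client-side failure shown in the log; it says nothing about why the storage service itself goes down.

```java
import java.util.Properties;

/** Sketch: raise the Kafka consumer's per-partition fetch limit (names are illustrative). */
final class KafkaFetchSizeSketch {

    /** Default of max.partition.fetch.bytes (1 MiB), the limit quoted in the exception. */
    static final int DEFAULT_FETCH_BYTES = 1_048_576;

    /**
     * Builds consumer properties with a larger per-partition fetch size.
     * Rejects values that would not lift the limit reported in the log.
     */
    static Properties consumerProps(String bootstrapServers, String groupId, int maxFetchBytes) {
        if (maxFetchBytes <= DEFAULT_FETCH_BYTES) {
            throw new IllegalArgumentException(
                "maxFetchBytes must exceed the default of " + DEFAULT_FETCH_BYTES);
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // The setting named in the RecordTooLargeException message.
        props.setProperty("max.partition.fetch.bytes", Integer.toString(maxFetchBytes));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and group id; 8 MiB is an arbitrary example value.
        Properties props = consumerProps("kafka-host:9092", "nebula-writer", 8 * 1024 * 1024);
        System.out.println(props.getProperty("max.partition.fetch.bytes"));
    }
}
```

The value has to be at least as large as the biggest record in the topic, so it is worth checking the actual message size at the reported offset before picking a number.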

The code that handles vertices and edges is as follows:


Service status:

I see your storage is in an abnormal state. :thinking: Could you try writing a small amount of data?

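After restarting storage, I switched to another topic and sent messages manually one at a time, and they were written to Nebula normally. But when I switch to the production topic, it still crashes as soon as it runs. Is this a performance problem? At the very beginning I was writing from the production environment and that worked fine too. Is storage really this fragile? The data volume is not large either, currently 50 records per second at most. Surely it can't fail to handle so little data.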

- -, so your data volume has already been reduced. Hang on, I'll ask a colleague from the storage team to take a look.

Could you send the storage log or the crash stack?