Flink hits "Framed transport frame is too large" when writing a small amount of data

  • nebula version: 3.7.0
  • Deployment: Linux
  • Installation method: Docker
  • In production: No

I ran into this problem while testing the official flink-connector example.
Here is the error message:

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	... 4 more
Caused by: java.lang.RuntimeException: An error occurred in NebulaSink.
	at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.checkErrorAndRethrow(NebulaSinkFunction.java:68)
	at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.invoke(NebulaSinkFunction.java:50)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:164)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.io.IOException: get metaClient error, 
	at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.open(NebulaBatchOutputFormat.java:97)
	at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.open(NebulaSinkFunction.java:37)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.flink.connector.nebula.shaded.com.facebook.thrift.transport.TTransportException: Framed transport frame is too large
	at org.apache.flink.connector.nebula.shaded.com.facebook.thrift.transport.THeaderTransport.readFrame(THeaderTransport.java:300)
	at org.apache.flink.connector.nebula.shaded.com.facebook.thrift.transport.THeaderTransport._resetProtocol(THeaderTransport.java:267)
	at org.apache.flink.connector.nebula.shaded.com.facebook.thrift.protocol.THeaderProtocol.readMessageBegin(THeaderProtocol.java:217)
	at com.vesoft.nebula.meta.MetaService$Client.recv_verifyClientVersion(MetaService.java:4127)
	at com.vesoft.nebula.meta.MetaService$Client.verifyClientVersion(MetaService.java:4104)
	at com.vesoft.nebula.client.meta.MetaClient.getClient(MetaClient.java:159)
	at com.vesoft.nebula.client.meta.MetaClient.doConnect(MetaClient.java:130)
	at com.vesoft.nebula.client.meta.MetaClient.connect(MetaClient.java:119)
	at org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider.getMetaClient(NebulaMetaConnectionProvider.java:68)
	at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.open(NebulaBatchOutputFormat.java:94)
	... 12 more

The data is tiny, since this is only the test example. Is some setting wrong?

Could you post your flink-connector configuration? The screenshot below is an example I took.


This is the flow.

Please confirm that Flink can reach port 30324. Also, how exactly did you install NebulaGraph? (I went back and reread your original post, and it mentions both Linux and Docker.)
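A quick way to check reachability from the machine that runs the Flink job is a plain TCP connect. The following is a minimal sketch using only the JDK; the class name `PortCheck`, the default host `127.0.0.1`, and the 3-second timeout are assumptions for illustration, and only the port number 30324 comes from this thread.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder host; replace it with the address of the Docker host.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 30324;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

A successful connect only shows that something is listening on that port. It does not show that the listener is the meta service, so the port mapping is still worth checking separately.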

It is deployed to the server with Docker, and both ports are mapped directly, so that should be fine. I can see that the space list can be read.

Which flink connector version are you using?

Version 3.5.0.

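Does it reproduce reliably, or does it tend to show up under high concurrency?
This error occurs when the client accesses the meta service. We will try to reproduce it.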
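For background on the message itself: in length-prefixed framing generally, the reader takes the first four bytes of a response as a big-endian frame length and rejects values above a limit. The sketch below illustrates that idea only. It is not the shaded `THeaderTransport` code from the stack trace, `MAX_FRAME_SIZE` is a hypothetical limit, and nothing in this thread establishes that this is what happened here. It shows why such an error can appear even with tiny payloads: if the bytes received are not a length prefix (for example, text from a different protocol), they decode to a very large number.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FrameLengthSketch {
    // Hypothetical limit, for illustration only.
    static final int MAX_FRAME_SIZE = 16 * 1024 * 1024;

    /** Interprets the first four bytes as a big-endian 32-bit frame length. */
    static int readFrameLength(byte[] header) {
        return ByteBuffer.wrap(header, 0, 4).getInt(); // ByteBuffer is big-endian by default
    }

    /** A frame is accepted only if its declared length is within the limit. */
    static boolean isAcceptable(byte[] header) {
        int length = readFrameLength(header);
        return length >= 0 && length <= MAX_FRAME_SIZE;
    }

    public static void main(String[] args) {
        byte[] realPrefix = {0, 0, 0, 16}; // a 16-byte frame
        byte[] notAPrefix = "HTTP".getBytes(StandardCharsets.US_ASCII); // text, not a length

        System.out.println(readFrameLength(realPrefix) + " acceptable: " + isAcceptable(realPrefix));
        System.out.println(readFrameLength(notAPrefix) + " acceptable: " + isAcceptable(notAPrefix));
    }
}
```

The four ASCII bytes of `HTTP` decode to 1213486160, far above any reasonable frame limit, so in a scheme like this the size of the data being written does not matter.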
