No extra connection: All servers are broken.

The client web page works fine, but importing data with the Flink connector keeps failing. The error is as follows:

java.lang.RuntimeException: An error occurred in NebulaSink.
	at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.checkErrorAndRethrow(NebulaSinkFunction.java:68)
	at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.invoke(NebulaSinkFunction.java:50)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: get graph session error, 
	at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.open(NebulaBatchOutputFormat.java:70)
	at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.open(NebulaSinkFunction.java:37)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
	... 3 more
Caused by: com.vesoft.nebula.client.graph.exception.NotValidConnectionException: No extra connection: All servers are broken.
	at com.vesoft.nebula.client.graph.net.NebulaPool.getConnection(NebulaPool.java:216)
	at com.vesoft.nebula.client.graph.net.NebulaPool.getSession(NebulaPool.java:138)
	at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.open(NebulaBatchOutputFormat.java:65)
	... 12 more

I checked and the service processes are all running normally. Does anyone know what could be causing this?

Please add your Nebula version and Flink version, plus your IP and port configuration.

Nebula version 2.6.0, Flink version 1.10.0
Nebula port configuration: graphAddress port 9669, metaAddress port 9559

I meant the IP, bro. - - Check whether the IPs and ports are actually reachable (e.g. with ping).


Then check that your versions are aligned, and that your Java version is Java 8 or later.
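Note that ping only tests ICMP reachability of the host; it says nothing about whether graphd (9669) and metad (9559) accept TCP connections. A minimal sketch of a port-level check, assuming you run it on the Flink TaskManager hosts (where the sink actually opens its connections), not only on the machine where the web client works. The class name `PortCheck` and the default host `127.0.0.1` are placeholders, not part of the connector.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // Returns true if a TCP connection to host:port can be opened within timeoutMs.
    // Unlike ping (ICMP), this verifies that the service port itself accepts connections.
    public static boolean isPortReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder: pass the host configured in graphAddress/metaAddress as the first argument.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        System.out.println("graphd 9669 reachable: " + isPortReachable(host, 9669, 3000));
        System.out.println("metad  9559 reachable: " + isPortReachable(host, 9559, 3000));
    }
}
```

If this prints `false` on a TaskManager while the same check passes elsewhere, the problem is network/firewall placement rather than the connector.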

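They are definitely reachable. After the job is submitted, it runs for a while and then reports get graph session error.
Monitoring shows the number of connections on the machine growing a lot. Doesn't the Nebula Flink connector reuse sessions?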

No pool is used, because the client doesn't implement serialization. Sessions are reused within a single worker.
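Why serialization matters here: Flink serializes the sink function and ships a copy to each worker, so a non-serializable client cannot be created once up front and shared. The usual pattern is to mark such a field `transient` and create it in `open()` on the worker, then reuse it for every record until `close()`. The sketch below shows that pattern with plain Java serialization; `FakeSession`, `SketchSink`, and `BrokenSink` are hypothetical stand-ins, and `open()/invoke()/close()` only mirror the Flink lifecycle names. It is not the connector's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class TransientSessionDemo {
    // Hypothetical stand-in for a client object that does NOT implement Serializable.
    static class FakeSession {
        final List<String> executed = new ArrayList<>();
        void execute(String stmt) { executed.add(stmt); }
    }

    // Hypothetical sink: only plain config is serialized; the session is transient,
    // created once per instance in open(), then reused for every record.
    static class SketchSink implements Serializable {
        private static final long serialVersionUID = 1L;
        final String graphAddress;
        transient FakeSession session;

        SketchSink(String graphAddress) { this.graphAddress = graphAddress; }

        void open() { session = new FakeSession(); }        // once per parallel instance
        void invoke(String stmt) { session.execute(stmt); } // same session for every record
        void close() { session = null; }
    }

    // Same idea without `transient`: serialization fails because FakeSession is not Serializable.
    static class BrokenSink implements Serializable {
        private static final long serialVersionUID = 1L;
        FakeSession session = new FakeSession();
    }

    // Simulates shipping a function object to a worker: serialize, then deserialize a copy.
    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SketchSink onWorker = roundTrip(new SketchSink("127.0.0.1:9669"));
        System.out.println("session after shipping: " + onWorker.session); // null
        onWorker.open();
        onWorker.invoke("stmt-1");
        onWorker.invoke("stmt-2");
        System.out.println("statements on one session: " + onWorker.session.executed.size()); // 2
        onWorker.close();
    }
}
```

One consequence of this design: each parallel sink instance opens its own connection, so the total connection count grows with the job's parallelism.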

I'm quoting our developer here; she's on leave.

What is the cause of the first error I posted?
Running Flink locally also fails immediately, and the environment is definitely reachable.

Sorry for the late reply. :thinking: The required Flink version is v1.11.x, so you may need to upgrade your Flink.

Our Flink cluster only has 1.10.x and 1.12.x.
Can I use 1.12.x?

Try v1.12 first. If you still run into problems, update this thread.

Still getting the error with Flink 1.12.x.

As soon as the code runs, it throws:

Caused by: java.lang.RuntimeException: An error occurred in NebulaSink.
	at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.checkErrorAndRethrow(NebulaSinkFunction.java:68)
	at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.invoke(NebulaSinkFunction.java:50)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: get graph session error, 
	at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.open(NebulaBatchOutputFormat.java:70)
	at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.open(NebulaSinkFunction.java:37)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
	... 3 more
Caused by: com.vesoft.nebula.client.graph.exception.NotValidConnectionException: No extra connection: All servers are broken.
	at com.vesoft.nebula.client.graph.net.NebulaPool.getConnection(NebulaPool.java:216)
	at com.vesoft.nebula.client.graph.net.NebulaPool.getSession(NebulaPool.java:138)
	at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.open(NebulaBatchOutputFormat.java:65)
	... 12 more

I see a large number of WAL files on the server.

What is causing this?

WAL files are perfectly normal; they are produced whenever data is written. They are deleted automatically after a while (4 hours by default).

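There are many WAL files, but the data was not written successfully.
As soon as the job runs, it reports the error above: No extra connection: All servers are broken.
After that, the WAL files on the server keep growing,
and SUBMIT JOB STATS shows that the vertex and edge counts have not increased.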

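Right, data does not have to be written successfully for it to show up in the WAL. (Historically in databases, the whole point of a WAL is to have something to retry from when a write does not succeed.)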

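My question is why Flink reports No extra connection: All servers are broken. or get session error as soon as it starts.
If no session was obtained, how did data get written to the WAL?
And why did data that made it into the WAL not get written into the database?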

I understand WAL itself: incoming data is first persisted to the log, and if the write fails, it is read back from the log and retried.
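The point made above, that a growing log does not mean the data was applied, can be illustrated with a minimal in-memory write-ahead log. This is a generic sketch, not Nebula's storage implementation: the class `WalSketch` and the `storeAvailable` flag are hypothetical, and real systems also truncate old log segments after a retention period, which is omitted here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WalSketch {
    final List<String[]> log = new ArrayList<>();      // append-only log of {key, value}
    final Map<String, String> store = new HashMap<>(); // data actually visible to queries
    int appliedUpTo = 0;                                // log entries [0, appliedUpTo) are applied
    boolean storeAvailable = true;                      // simulates whether applying can succeed

    // Write path: record the entry in the log first, then try to apply it.
    void put(String key, String value) {
        log.add(new String[] {key, value});
        replay();
    }

    // Applies pending log entries in order; on failure they stay in the log for a later retry.
    void replay() {
        while (appliedUpTo < log.size()) {
            if (!storeAvailable) {
                return;
            }
            String[] entry = log.get(appliedUpTo);
            store.put(entry[0], entry[1]);
            appliedUpTo++;
        }
    }

    public static void main(String[] args) {
        WalSketch db = new WalSketch();
        db.storeAvailable = false;
        db.put("v1", "a");
        db.put("v2", "b");
        System.out.println("log=" + db.log.size() + " store=" + db.store.size()); // log=2 store=0
        db.storeAvailable = true;
        db.replay();
        System.out.println("log=" + db.log.size() + " store=" + db.store.size()); // log=2 store=2
    }
}
```

In this model the log grows on every write regardless of whether the apply step succeeds, which is why WAL growth alone cannot confirm that vertices or edges were stored.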

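What parallelism did you set for Flink? Try lowering it.
From the logs, it looks like the concurrency is too high, which caused the network connection between client and server to be interrupted.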
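Besides lowering parallelism, one common mitigation for transient connection failures is to retry session acquisition with exponential backoff instead of failing the task on the first error. The stack traces show the session is obtained in `NebulaBatchOutputFormat.open` via `NebulaPool.getSession`; whether you can wrap that call depends on your own code, since this retry helper is a hypothetical sketch and not a connector feature. If the root cause really is overload, retries add more load, so treat this as a complement to reducing parallelism, not a replacement.

```java
import java.util.concurrent.Callable;

public class RetryWithBackoff {
    // Runs action up to maxAttempts times. After each failure except the last it sleeps,
    // starting at initialDelayMs and doubling each time; the last failure is rethrown.
    public static <T> T retry(Callable<T> action, int maxAttempts, long initialDelayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delayMs = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMs);
                    delayMs *= 2;
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        // Hypothetical action: fails twice (simulated broken connection), then succeeds.
        String result = retry(() -> {
            attempts[0]++;
            if (attempts[0] < 3) {
                throw new IllegalStateException("connection refused (simulated)");
            }
            return "session";
        }, 5, 100);
        System.out.println(result + " after " + attempts[0] + " attempts"); // session after 3 attempts
    }
}
```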

Flink parallelism is set to 10; this is a test environment.

How many servers is Nebula Graph deployed on?
Use dstat -cmnd to check the IO and CPU load.