Nebula Flink import issue

  • Nebula version: 2.0.0

  • Deployment: Docker

  • Production environment: Y

  • Problem description: importing data into Nebula with Flink fails

Terminal error:
[2021-08-22T18:56:47.751+0800][INFO ][Source: Custom Source -> parse company info -> (company vertex insert -> Sink: Unnamed, person vertex insert -> Sink: Unnamed, employee edge insert -> Sink: Unnamed, investment edge insert -> Sink: Unnamed, partner edge insert -> Sink: Unnamed) (1/2)][com.vesoft.nebula.client.graph.net.NebulaPool][|||][]Get connection to 127.0.0.1:9669
[2021-08-22T18:56:47.762+0800][ERROR][Source: Custom Source -> parse company info -> (company vertex insert -> Sink: Unnamed, person vertex insert -> Sink: Unnamed, employee edge insert -> Sink: Unnamed, investment edge insert -> Sink: Unnamed, partner edge insert -> Sink: Unnamed) (2/2)][org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat][|||][]failed to get meta client, 
com.facebook.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)
	at com.facebook.thrift.transport.TSocket.open(TSocket.java:204)
	at com.vesoft.nebula.client.meta.MetaClient.doConnect(MetaClient.java:97)
	at com.vesoft.nebula.client.meta.MetaClient.connect(MetaClient.java:86)
	at org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider.getMetaClient(NebulaMetaConnectionProvider.java:40)
	at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.open(NebulaBatchOutputFormat.java:66)
	at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.open(NebulaSinkFunction.java:37)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
	at java.net.Socket.connect(Socket.java:606)
	at com.facebook.thrift.transport.TSocket.open(TSocket.java:199)
	... 16 more
Part of the metad2 log:
I0722 09:55:20.769979     1 MetaDaemon.cpp:246] identify myself as "metad2":9559
I0722 09:55:20.802237     1 NebulaStore.cpp:47] Start the raft service...
I0722 09:55:20.829149     1 RaftexService.cpp:65] Init thrift server for raft service, port: 9560
I0722 09:55:20.834873    52 RaftexService.cpp:98] Starting the Raftex Service
I0722 09:55:20.898576    52 RaftexService.cpp:86] Starting the Raftex Service on 9560
I0722 09:55:20.903292    52 RaftexService.cpp:110] Start the Raftex Service successfully
I0722 09:55:20.903381     1 NebulaStore.cpp:75] Scan the local path, and init the spaces_
E0722 09:55:20.913609     1 FileUtils.cpp:384] Failed to read the directory "/data/meta/nebula" (2): No such file or directory
I0722 09:55:20.935791     1 NebulaStore.cpp:201] Init data from partManager for "metad2":9559
I0722 09:55:20.939049     1 NebulaStore.cpp:294] Create data space 0
I0722 09:55:21.338618     1 RocksEngine.cpp:119] open rocksdb on /data/meta/nebula/0/data
I0722 09:55:21.390635     1 Part.cpp:50] [Port: 9560, Space: 0, Part: 0] Cannot fetch the last committed log id from the storage engine
I0722 09:55:21.393970     1 RaftPart.cpp:295] [Port: 9560, Space: 0, Part: 0] There are 2 peer hosts, and total 3 copies. The quorum is 2, as learner 0, lastLogId 0, lastLogTerm 0, committedLogId 0, term 0
I0722 09:55:21.401788     1 RaftPart.cpp:308] [Port: 9560, Space: 0, Part: 0] Add peer "metad0":9560
I0722 09:55:21.408763     1 RaftPart.cpp:308] [Port: 9560, Space: 0, Part: 0] Add peer "metad1":9560
I0722 09:55:21.415215     1 NebulaStore.cpp:357] Space 0, part 0 has been added, asLearner 0
I0722 09:55:21.421247     1 NebulaStore.cpp:68] Register handler...
I0722 09:55:21.440676     1 MetaDaemon.cpp:99] Waiting for the leader elected...
I0722 09:55:21.445786     1 MetaDaemon.cpp:112] Leader has not been elected, sleep 1s
I0722 09:55:21.578783    17 RaftPart.cpp:1361] [Port: 9560, Space: 0, Part: 0] Recieved a VOTING request: space = 0, partition = 0, candidateAddr = metad1:9560, term = 1, lastLogId = 0, lastLogTerm = 0
I0722 09:55:21.581702    17 RaftPart.cpp:1394] [Port: 9560, Space: 0, Part: 0] The partition currently is a Follower, lastLogId 0, lastLogTerm 0, committedLogId 0, term 0
I0722 09:55:21.584405    17 RaftPart.cpp:1463] [Port: 9560, Space: 0, Part: 0] The partition will vote for the candidate "metad1":9560
I0722 09:55:21.595649    17 RaftPart.cpp:1361] [Port: 9560, Space: 0, Part: 0] Recieved a VOTING request: space = 0, partition = 0, candidateAddr = metad0:9560, term = 1, lastLogId = 0, lastLogTerm = 0
I0722 09:55:21.603291    17 RaftPart.cpp:1394] [Port: 9560, Space: 0, Part: 0] The partition currently is a Follower, lastLogId 0, lastLogTerm 0, committedLogId 0, term 0
I0722 09:55:21.615418    17 RaftPart.cpp:1448] [Port: 9560, Space: 0, Part: 0] We have voted "metad1":9560 on term 1, so we should reject the candidate "metad0":9560 request on term 1
I0722 09:55:21.682157    17 RaftPart.cpp:1761] [Port: 9560, Space: 0, Part: 0] The current role is Follower. Will follow the new leader metad1:9560 [Term: 1]
I0722 09:55:21.713387    50 Part.cpp:191] [Port: 9560, Space: 0, Part: 0] Find the new leader "metad1":9560
I0722 09:55:22.457420     1 MetaDaemon.cpp:141] Get meta version is 2
I0722 09:55:22.460155     1 MetaDaemon.cpp:168] Nebula store init succeeded, clusterId 5515666330599404307
I0722 09:55:22.462729     1 MetaDaemon.cpp:259] Start http service
I0722 09:55:22.470742     1 MetaDaemon.cpp:176] Starting Meta HTTP Service
I0722 09:55:22.484591    94 WebService.cpp:131] Web service started on HTTP[19559], HTTP2[19560]
I0722 09:55:22.487969     1 JobManager.cpp:52] JobManager initialized
I0722 09:55:22.491299     1 MetaDaemon.cpp:316] The meta deamon start on "metad2":9559
I0722 09:55:22.488009   100 JobManager.cpp:75] JobManager::runJobBackground() enter
I0722 09:55:23.010701   110 HBProcessor.cpp:47] Set clusterId for new host "storaged0":9779!
E0722 09:55:23.013015   110 RaftPart.cpp:367] [Port: 9560, Space: 0, Part: 0] The partition is not a leader
E0722 09:55:23.016186   110 RaftPart.cpp:687] [Port: 9560, Space: 0, Part: 0] Cannot append logs, clean the buffer
I0722 09:55:24.495188   110 HBProcessor.cpp:47] Set clusterId for new host "storaged1":9779!
I0722 10:06:56.846766    48 RaftPart.cpp:1043] [Port: 9560, Space: 0, Part: 0] Start leader election, reason: lastMsgDur 41883, term 1
I0722 10:06:57.395700    48 RaftPart.cpp:1193] [Port: 9560, Space: 0, Part: 0] Sending out an election request (space = 0, part = 0, term = 2, lastLogId = 562, lastLogTerm = 1, candidateIP = metad2, candidatePort = 9560)
I0722 10:06:57.416297    53 ThriftClientManager.inl:62] resolve "metad0":9560 as "172.18.0.4":9560
I0722 10:06:57.437726    17 RaftPart.cpp:1739] [Port: 9560, Space: 0, Part: 0] I dont know who is leader for current term 1, so accept the candidate "metad1":9560
I0722 10:06:57.466219    17 RaftPart.cpp:1761] [Port: 9560, Space: 0, Part: 0] The current role is Candidate. Will follow the new leader metad1:9560 [Term: 1]
I0722 10:06:57.486914    49 Part.cpp:191] [Port: 9560, Space: 0, Part: 0] Find the new leader "metad1":9560
I0722 10:06:57.438256    53 ThriftClientManager.inl:62] resolve "metad1":9560 as "172.18.0.2":9560
I0722 10:06:57.439091    35 RaftPart.cpp:1361] [Port: 9560, Space: 0, Part: 0] Recieved a VOTING request: space = 0, partition = 0, candidateAddr = metad0:9560, term = 2, lastLogId = 562, lastLogTerm = 1
I0722 10:06:57.580021    35 RaftPart.cpp:1394] [Port: 9560, Space: 0, Part: 0] The partition currently is a Follower, lastLogId 563, lastLogTerm 1, committedLogId 562, term 1
I0722 10:06:57.586963    35 RaftPart.cpp:1434] [Port: 9560, Space: 0, Part: 0] The partition's last log id is 563. The candidate's last log id 562 is smaller, so it will be rejected, candidate is "metad0":9560
I0722 10:06:58.034876    48 RaftPart.cpp:1127] [Port: 9560, Space: 0, Part: 0] Partition's role has changed to Follower during the election, so discard the results
I0722 10:06:58.038975    48 RaftPart.cpp:1270] [Port: 9560, Space: 0, Part: 0] Someone else was elected
I0722 10:07:02.980528    35 RaftPart.cpp:1361] [Port: 9560, Space: 0, Part: 0] Recieved a VOTING request: space = 0, partition = 0, candidateAddr = metad0:9560, term = 2, lastLogId = 563, lastLogTerm = 1
I0722 10:07:02.981890    35 RaftPart.cpp:1394] [Port: 9560, Space: 0, Part: 0] The partition currently is a Follower, lastLogId 563, lastLogTerm 1, committedLogId 562, term 1
I0722 10:07:02.983572    35 RaftPart.cpp:1463] [Port: 9560, Space: 0, Part: 0] The partition will vote for the candidate "metad0":9560
I0722 10:07:02.995791    35 RaftPart.cpp:1761] [Port: 9560, Space: 0, Part: 0] The current role is Follower. Will follow the new leader metad0:9560 [Term: 2]
I0722 10:07:02.997313    49 Part.cpp:191] [Port: 9560, Space: 0, Part: 0] Find the new leader "metad0":9560
I0722 10:07:24.261433    49 RaftPart.cpp:1043] [Port: 9560, Space: 0, Part: 0] Start leader election, reason: lastMsgDur 3286574, term 2
I0722 10:07:24.278234    49 RaftPart.cpp:1193] [Port: 9560, Space: 0, Part: 0] Sending out an election request (space = 0, part = 0, term = 3, lastLogId = 573, lastLogTerm = 2, candidateIP = metad2, candidatePort = 9560)
I0722 10:07:24.309274    54 ThriftClientManager.inl:62] resolve "metad0":9560 as "172.18.0.4":9560
I0722 10:07:24.315572    54 ThriftClientManager.inl:62] resolve "metad1":9560 as "172.18.0.2":9560
I0722 10:07:24.319607    35 RaftPart.cpp:1761] [Port: 9560, Space: 0, Part: 0] The current role is Candidate. Will follow the new leader metad0:9560 [Term: 2]
I0722 10:07:24.325042    50 Part.cpp:191] [Port: 9560, Space: 0, Part: 0] Find the new leader "metad0":9560
I0722 10:07:24.331564    35 RaftPart.cpp:1361] [Port: 9560, Space: 0, Part: 0] Recieved a VOTING request: space = 0, partition = 0, candidateAddr = metad1:9560, term = 3, lastLogId = 573, lastLogTerm = 2
I0722 10:07:24.334724    35 RaftPart.cpp:1394] [Port: 9560, Space: 0, Part: 0] The partition currently is a Follower, lastLogId 574, lastLogTerm 2, committedLogId 573, term 2
I0722 10:07:24.337750    35 RaftPart.cpp:1434] [Port: 9560, Space: 0, Part: 0] The partition's last log id is 574. The candidate's last log id 573 is smaller, so it will be rejected, candidate is "metad1":9560
I0722 10:07:24.351521    49 RaftPart.cpp:1127] [Port: 9560, Space: 0, Part: 0] Partition's role has changed to Follower during the election, so discard the results
I0722 10:07:24.353813    49 RaftPart.cpp:1270] [Port: 9560, Space: 0, Part: 0] Someone else was elected
I0722 11:06:19.218623    51 RaftPart.cpp:1043] [Port: 9560, Space: 0, Part: 0] Start leader election, reason: lastMsgDur 5052, term 2
I0722 11:06:19.323668    51 RaftPart.cpp:1193] [Port: 9560, Space: 0, Part: 0] Sending out an election request (space = 0, part = 0, term = 3, lastLogId = 574, lastLogTerm = 2, candidateIP = metad2, candidatePort = 9560)
I0722 11:06:19.473646    55 ThriftClientManager.inl:62] resolve "metad0":9560 as "172.18.0.4":9560
I0722 11:06:19.687419    55 ThriftClientManager.inl:62] resolve "metad1":9560 as "172.18.0.2":9560
I0722 11:06:20.087462    51 RaftPart.cpp:1152] [Port: 9560, Space: 0, Part: 0] Partition is elected as the new leader for term 3
I0722 11:06:20.209844    51 RaftPart.cpp:1248] [Port: 9560, Space: 0, Part: 0] The partition is elected as the leader
I0722 11:06:20.859760    56 Host.cpp:149] [Port: 9560, Space: 0, Part: 0] [Host: metad0:9560] This is the first time to send the logs to this host, lastLogIdSent = 574, lastLogTermSent = 2
I0722 11:06:20.988782    56 ThriftClientManager.inl:62] resolve "metad0":9560 as "172.18.0.4":9560
I0722 11:06:21.160014    56 Host.cpp:149] [Port: 9560, Space: 0, Part: 0] [Host: metad1:9560] This is the first time to send the logs to this host, lastLogIdSent = 574, lastLogTermSent = 2
I0722 11:06:21.301219    56 ThriftClientManager.inl:62] resolve "metad1":9560 as "172.18.0.2":9560
I0722 11:06:22.389010    35 SlowOpTracker.h:33] [Port: 9560, Space: 0, Part: 0] , total time:1972ms, Total send logs: 2
W0722 11:06:22.789868    35 RaftPart.cpp:1015] [Port: 9560, Space: 0, Part: 0] Only 0 hosts succeeded, Need to try again

The meta address is wrong. Since you deployed with Docker, you need to configure the IP and port numbers that Docker exposes.

Do I need to change it to the real IP in the nebula-metad.conf file?

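Are your Flink job and Nebula running on the same machine? If so, you don't need to change the IP; if not, you need to use the real IP. By the way, how did you deploy Nebula?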
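To tell which case applies, a small standard-library check can report whether a given address belongs to the machine running the Flink job. This is a minimal sketch, not part of nebula-flink; the class name `LocalAddressCheck` and the default address are hypothetical.

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;

class LocalAddressCheck {

    // Returns true if `host` resolves to an address of this machine: either a
    // loopback address or one assigned to a local network interface.
    // Returns false if the host cannot be resolved or interfaces cannot be read.
    static boolean isLocalAddress(String host) {
        try {
            InetAddress addr = InetAddress.getByName(host);
            if (addr.isLoopbackAddress()) {
                return true;
            }
            return NetworkInterface.getByInetAddress(addr) != null;
        } catch (UnknownHostException | SocketException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical: pass the address of the machine running Nebula as the first argument.
        String nebulaHost = args.length > 0 ? args[0] : "127.0.0.1";
        if (isLocalAddress(nebulaHost)) {
            System.out.println(nebulaHost + " is on this machine: 127.0.0.1 can be used");
        } else {
            System.out.println(nebulaHost + " is another machine: use its real IP");
        }
    }
}
```

Run it on the Flink machine with the Nebula host's address as the argument. Even when Nebula is local, the port in the Flink configuration still has to match the port that Docker actually published.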

Both the services and the program run on this machine; Nebula was deployed with docker-compose.

Experts, please help!

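Your problem is simple: the meta client cannot connect to the Nebula metad service.
This is because the metaAddress configured in your code is wrong. Check the metad service address you configured, especially the port number. If you haven't changed the docker-compose configuration, the usable metad port is not 9559.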
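Before rerunning the Flink job, it can help to confirm from the same machine that the configured meta address accepts TCP connections at all; the `Connection refused` in the trace above is raised by `java.net.Socket.connect` inside `TSocket.open`. This is a minimal standard-library sketch; the class name `MetaPortProbe` and the address list are hypothetical placeholders for whatever the job configures. A successful connect only shows that something is listening on that port, not that it is metad.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class MetaPortProbe {

    // Tries to open a plain TCP connection to host:port within timeoutMs.
    // Returns false on any I/O failure (connection refused, timeout, unknown host).
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical list of metad addresses as configured in the Flink job.
        String[] metaAddresses = {"127.0.0.1:9559"};
        for (String address : metaAddresses) {
            String[] parts = address.split(":");
            boolean ok = isReachable(parts[0], Integer.parseInt(parts[1]), 2000);
            System.out.println(address + (ok ? " accepts connections" : " is not reachable"));
        }
    }
}
```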

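I didn't change the configuration file (there's a screenshot of metad.conf above). In the code I switched to my machine's real IP, but I still get failed to get meta client.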

Also, Nebula Graph Studio can access it.

The thing is, my other demo, which doesn't use nebula-flink, can connect:

NebulaPool pool = new NebulaPool();
NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
nebulaPoolConfig.setMaxConnSize(100);
// 9669 is the graphd port; 9559 is the metad port
List<HostAddress> addresses = Arrays.asList(
        new HostAddress("127.0.0.1", 9669),
        new HostAddress("127.0.0.1", 9559));
pool.init(addresses, nebulaPoolConfig);
Session session = pool.getSession("root", "nebula", false);

There are a few problems with what you posted:

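  1. You deployed with docker-compose, so why change metad.conf? The docker-compose configuration file is docker-compose.yaml.
  2. Nebula Studio does not access the meta service; it only accesses the graph service. In the docker-compose deployment, the graph service is exposed on port 9669.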

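Your demo is flawed: you connect to the graph service but pass in two addresses (one graph service address and one meta service address). It only connects right now because just the first address is being used.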

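Look at my reply above: it's the meta service you can't connect to. Since you deployed Nebula with docker-compose, the usable address is definitely not 127.0.0.1:9559. Run docker-compose ps and paste the output, and I'll tell you the service addresses.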
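With docker-compose, a container port listed without a host port is published on a host port chosen by Docker, and `docker-compose ps` shows each mapping in its Ports column as `host-ip:host-port->container-port/tcp`. The sketch below pulls the host port for metad's 9559 out of that column. The class name and the sample Ports string (including its port numbers) are hypothetical; IPv6 entries such as `:::49153->9559/tcp` are deliberately not matched.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ComposePortLookup {

    // Matches one published-port entry such as "0.0.0.0:49153->9559/tcp".
    private static final Pattern MAPPING =
            Pattern.compile("([0-9.]+):(\\d+)->(\\d+)/tcp");

    // Returns the host port published for containerPort,
    // or empty if that container port is not published in this Ports column.
    static OptionalInt hostPortFor(String portsColumn, int containerPort) {
        Matcher m = MAPPING.matcher(portsColumn);
        while (m.find()) {
            if (Integer.parseInt(m.group(3)) == containerPort) {
                return OptionalInt.of(Integer.parseInt(m.group(2)));
            }
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Hypothetical Ports column for one metad container; copy yours from `docker-compose ps`.
        String ports = "0.0.0.0:49153->9559/tcp, 0.0.0.0:49152->19559/tcp, 19560/tcp";
        OptionalInt metaPort = hostPortFor(ports, 9559);
        if (metaPort.isPresent()) {
            System.out.println("use 127.0.0.1:" + metaPort.getAsInt() + " as a meta address");
        } else {
            System.out.println("container port 9559 is not published");
        }
    }
}
```

Running the lookup once per metad container gives the host ports to use in the Flink job's meta address.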


These are the three ports you can use.
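Assuming the three ports are the host ports published for port 9559 of metad0, metad1 and metad2, they can be combined into a single meta address string. This sketch assumes the connector's meta address option takes a comma-separated `host:port` list; check that against the nebula-flink documentation for your version. The class name and port numbers are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class MetaAddressBuilder {

    // Joins a host and its published ports into "host:port,host:port,..." form,
    // rejecting values outside the valid TCP port range.
    static String buildMetaAddress(String host, int... publishedPorts) {
        List<String> parts = new ArrayList<>();
        for (int port : publishedPorts) {
            if (port <= 0 || port > 65535) {
                throw new IllegalArgumentException("invalid port: " + port);
            }
            parts.add(host + ":" + port);
        }
        return String.join(",", parts);
    }

    public static void main(String[] args) {
        // Hypothetical host ports published for metad0, metad1 and metad2;
        // substitute the three ports shown by `docker-compose ps`.
        System.out.println(buildMetaAddress("127.0.0.1", 49153, 49157, 49161));
    }
}
```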

Thanks a lot!