Error running the nebula-flink-connector example

  • Nebula version: v2.2.1
  • Deployment (distributed / standalone / Docker / DBaaS): Docker
  • Production release: Y
  • Problem description: I created a new Java project, pulled in nebula-flink-connector via Maven, and ran FlinkConnectorExample, but it failed with the error below.
Log:
[2021-08-11T15:32:11.534+0800][INFO ][main][org.apache.flink.runtime.rest.RestServerEndpoint][|||][]Shutting down rest endpoint.
[2021-08-11T15:32:11.536+0800][ERROR][main][com.cmft.FlinkConnectorExample1][|||][]error when write Nebula Graph, 
org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
	at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:256)
	at org.apache.flink.runtime.minicluster.MiniCluster.createDispatcherResourceManagerComponents(MiniCluster.java:412)
	at org.apache.flink.runtime.minicluster.MiniCluster.setupDispatcherResourceManagerComponents(MiniCluster.java:378)
	at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:332)
	at org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:87)
	at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1818)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1714)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
	at com.cmft.FlinkConnectorExample1.sinkVertexData(FlinkConnectorExample1.java:93)
	at com.cmft.FlinkConnectorExample1.main(FlinkConnectorExample1.java:25)
Caused by: org.apache.flink.util.FlinkRuntimeException: REST handler registration overlaps with another registration for: version=V1, method=GET, url=/jars/:jarid/plan.
	at org.apache.flink.runtime.rest.RestServerEndpoint.checkAllEndpointsAndHandlersAreUnique(RestServerEndpoint.java:511)
	at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:158)
	at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:163)
	... 11 more
[2021-08-11T15:32:11.539+0800][INFO ][TransientBlobCache shutdown hook][org.apache.flink.runtime.blob.AbstractBlobCache][|||][]Shutting down BLOB cache
[2021-08-11T15:32:11.539+0800][INFO ][PermanentBlobCache shutdown hook][org.apache.flink.runtime.blob.AbstractBlobCache][|||][]Shutting down BLOB cache
[2021-08-11T15:32:11.539+0800][INFO ][TaskExecutorLocalStateStoresManager shutdown hook][org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager][|||][]Shutting down TaskExecutorLocalStateStoresManager.
[2021-08-11T15:32:11.545+0800][INFO ][FileCache shutdown hook][org.apache.flink.runtime.filecache.FileCache][|||][]removed file cache directory D:\Users\XIAOSY~1\AppData\Local\Temp\flink-dist-cache-653cfe13-02b1-4332-b632-9fe0e3ccec3e
[2021-08-11T15:32:11.546+0800][INFO ][BlobServer shutdown hook][org.apache.flink.runtime.blob.BlobServer][|||][]Stopped BLOB server at 0.0.0.0:64977
[2021-08-11T15:32:11.546+0800][INFO ][FileChannelManagerImpl-io shutdown hook][org.apache.flink.runtime.io.disk.FileChannelManagerImpl][|||][]FileChannelManager removed spill file directory D:\Users\XIAOSY~1\AppData\Local\Temp\flink-io-450794f3-df45-4a04-bbb0-e07ea1828c58
[2021-08-11T15:32:11.547+0800][INFO ][FileChannelManagerImpl-netty-shuffle shutdown hook][org.apache.flink.runtime.io.disk.FileChannelManagerImpl][|||][]FileChannelManager removed spill file directory D:\Users\XIAOSY~1\AppData\Local\Temp\flink-netty-shuffle-deffd7f6-cdf1-4949-956f-00fc02b89925

Process finished with exit code -1

Any help appreciated.

I found the cause yesterday: the Flink version in my project was too old.
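For anyone hitting the same thing, a hypothetical pom.xml fragment for aligning versions. The coordinates and versions here are illustrative assumptions, not taken from this thread; check the connector's own pom for the Flink version it is built against, since a mismatched Flink on the classpath can produce exactly the "REST handler registration overlaps" failure in the log above.

```xml
<!-- Illustrative sketch only: pin your Flink dependencies to the same
     version the connector was compiled against (see the connector's pom). -->
<dependencies>
    <dependency>
        <groupId>com.vesoft</groupId>
        <artifactId>nebula-flink-connector</artifactId>
        <!-- illustrative; pick the release matching your Nebula server -->
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <!-- set to the version declared in the connector's own pom -->
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```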

But now I've found that with the official demo, vertices get written successfully while edges do not; writing edges fails with an NPE.

java.lang.NullPointerException
	at org.apache.flink.connector.nebula.sink.NebulaRowEdgeOutputFormatConverter.createValue(NebulaRowEdgeOutputFormatConverter.java:62)
	at org.apache.flink.connector.nebula.sink.NebulaRowEdgeOutputFormatConverter.createValue(NebulaRowEdgeOutputFormatConverter.java:21)
	at org.apache.flink.connector.nebula.sink.NebulaBatchExecutor.addToBatch(NebulaBatchExecutor.java:52)
	at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.writeRecord(NebulaBatchOutputFormat.java:92)
	at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.invoke(NebulaSinkFunction.java:51)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:185)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:745)

Looking into it, the edge schema is missing the key-value pairs for src and dst, so this.schema.get("src") throws the NPE. How should I fix this? This is the official demo.
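A self-contained sketch of the failure mode described above (the names are illustrative, not the connector's actual code): when the configured fields list contains a name that is absent from the edge schema map, the lookup returns null, and unboxing that null throws a NullPointerException, which is the same shape of failure as the stack trace.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EdgeSchemaNpeDemo {
    // Mimics the converter's per-field schema lookup; returns true if it NPEs.
    static boolean npeOnLookup(Map<String, Integer> schema, List<String> fields) {
        try {
            for (String field : fields) {
                int typeId = schema.get(field); // unboxing null throws NPE
            }
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Edge schema as fetched from the server: property name -> type id.
        // A "friend" edge that defines only a "degree" property; src and dst
        // are not properties, so they are not in the map.
        Map<String, Integer> schema = new HashMap<>();
        schema.put("degree", 1);

        // Misconfigured: "src"/"dst" listed as properties -> NPE
        System.out.println(npeOnLookup(schema, Arrays.asList("src", "dst", "degree")));
        // Correct: only real schema properties listed -> no NPE
        System.out.println(npeOnLookup(schema, Arrays.asList("degree")));
    }
}
```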

@nicole could you help?

Have you defined your schema?


It should be like this, right?


I've attached a screenshot below; could you take a look? Does it also need to be defined in the code?

What does your code look like? You've most likely configured a property, src, that doesn't exist in the edge schema. Please paste your code.

The configuration is wrong: setFields sets the properties, and your friend edge has no src or dst properties, so don't configure them.

So the columns at index 0 and 1 are src and dst by default, and they don't need to be configured?

There are no defaults. setSrcIndex tells the connector which column of the Flink data to use as the source vertex id, and setDstIndex sets which column is the destination vertex id. setFields is for properties; an edge's src and dst are part of the edge quadruple (src, dst, rank, properties), not properties themselves.
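To illustrate the point above with a self-contained sketch (again illustrative, not the connector's actual code): src and dst are taken from the row by their column indexes and become part of the edge quadruple, while the fields list names only real schema properties.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class EdgeStatementSketch {
    // Builds an nGQL-style INSERT EDGE statement from one row:
    // src/dst are picked by column index, properties by the fields list.
    static String insertEdge(String edgeName, List<String> row,
                             int srcIndex, int dstIndex,
                             List<String> fields, List<Integer> positions) {
        StringJoiner names = new StringJoiner(", ");
        StringJoiner values = new StringJoiner(", ");
        for (int i = 0; i < fields.size(); i++) {
            names.add(fields.get(i));
            values.add(row.get(positions.get(i)));
        }
        return String.format("INSERT EDGE %s(%s) VALUES \"%s\"->\"%s\":(%s)",
                edgeName, names, row.get(srcIndex), row.get(dstIndex), values);
    }

    public static void main(String[] args) {
        // Row layout: [src, dst, degree] -- only "degree" is a property.
        List<String> row = Arrays.asList("61", "62", "45");
        String stmt = insertEdge("friend", row, 0, 1,
                Arrays.asList("degree"), Arrays.asList(2));
        System.out.println(stmt);
        // INSERT EDGE friend(degree) VALUES "61"->"62":(45)
    }
}
```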


Oh, I get it now! Just ran it and it works. Thanks!

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.