flink 写入nebula时出现空指针异常

2022-07-04 02:06:50,522 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: flinkCommonTransactionSource -> Map -> Sink: Unnamed (1/1)#0 (791fc3057dff292cb7bda5d786ecb97d) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
        at org.apache.flink.connector.nebula.sink.NebulaRowEdgeOutputFormatConverter.createEdge(NebulaRowEdgeOutputFormatConverter.java:64) ~[blob_p-3618d79267783b93229151d67f77934b24954876-dc8467ca21c26e6819ae2429e34bbcbd:?]
        at org.apache.flink.connector.nebula.sink.NebulaEdgeBatchExecutor.addToBatch(NebulaEdgeBatchExecutor.java:40) ~[blob_p-3618d79267783b93229151d67f77934b24954876-dc8467ca21c26e6819ae2429e34bbcbd:?]
        at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.writeRecord(NebulaBatchOutputFormat.java:135) ~[blob_p-3618d79267783b93229151d67f77934b24954876-dc8467ca21c26e6819ae2429e34bbcbd:?]
        at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.invoke(NebulaSinkFunction.java:50) ~[blob_p-3618d79267783b93229151d67f77934b24954876-dc8467ca21c26e6819ae2429e34bbcbd:?]
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) ~[blob_p-3618d79267783b93229151d67f77934b24954876-dc8467ca21c26e6819ae2429e34bbcbd:?]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) ~[blob_p-3618d79267783b93229151d67f77934b24954876-dc8467ca21c26e6819ae2429e34bbcbd:?]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) ~[blob_p-3618d79267783b93229151d67f77934b24954876-dc8467ca21c26e6819ae2429e34bbcbd:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) ~[blob_p-3618d79267783b93229151d67f77934b24954876-dc8467ca21c26e6819ae2429e34bbcbd:?]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267) ~[flink-dist_2.11-1.12.7.jar:1.12.7]

代码贴一下 可能属性fields 和 position 参数不合适

1 个赞

感谢,我检查了一下,确实字段有点问题。
然后我还有一个异常,就是flink任务执行到第二天的时候,就会出现session找不到的异常,提示写入数据失败。

2022-07-05 00:32:15,166 ERROR org.apache.flink.connector.nebula.sink.NebulaVertexBatchExecutor [] - write data failed: Get sessionId[1656871870739645] failed: Session `1656871870739645' not found: Session not existed!

此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。