Flink writing to Nebula: position 1 or field vid does not exist

  • Nebula version: 3.1.0
  • Deployment: single host
  • Flink version: 1.11.6

Flink throws an error when writing vertex data to Nebula. The write method follows the example given in the official docs:
private static NebulaClientOptions getClientOptions() {
        // SSL is not enabled for the graph connection
        NebulaClientOptions nebulaClientOptions =
                new NebulaClientOptions.NebulaClientOptionsBuilder()
                        .setGraphAddress(NebulaConfigPo.graphAddress)
                        .setMetaAddress(NebulaConfigPo.metaAddress)
                        .build();

        return nebulaClientOptions;
    }

    /**
     * sink Nebula Graph with default INSERT mode
     */
    public static void sinkVertexData(StreamExecutionEnvironment env,
                                      DataStream<String> dataSource) {

        LOG.info(" sink Nebula Graph" + NebulaConfigPo.spaceName );

        NebulaClientOptions nebulaClientOptions = getClientOptions();

        NebulaGraphConnectionProvider graphConnectionProvider =
                new NebulaGraphConnectionProvider(nebulaClientOptions);
        NebulaMetaConnectionProvider metaConnectionProvider =
                new NebulaMetaConnectionProvider(nebulaClientOptions);

        ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace(NebulaConfigPo.spaceName)
                .setTag(NebulaConfigPo.tagName)
                .setIdIndex(0)
                .setFields(NebulaConfigPo.fieldList)
                .setPositions(Arrays.asList(1, 2, 3))
                .setBatch(2)
                .builder();

        NebulaBatchOutputFormat outPutFormat =
                new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                        .setExecutionOptions(executionOptions);
        NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
        DataStream<Row> dataStream = dataSource.map(row -> {
            String[] words = row.split(",");
            org.apache.flink.types.Row record = new org.apache.flink.types.Row(words.length);
            for (int i = 0; i < words.length; i++) {
                record.setField(i, words[i]);
            }
            LOG.info("----------" + record.toString());
            return record;
        });
        dataStream.addSink(nebulaSinkFunction);
    }

The vertex properties are vid, 姓名 (name), 年龄 (age). The values printed from record all look normal, but the error still occurs.
Here is the error message:

java.lang.IllegalArgumentException: position 1 or field vid does not exist.
	at org.apache.flink.connector.nebula.sink.NebulaRowVertexOutputFormatConverter.createVertex(NebulaRowVertexOutputFormatConverter.java:63) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
	at org.apache.flink.connector.nebula.sink.NebulaVertexBatchExecutor.addToBatch(NebulaVertexBatchExecutor.java:42) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
	at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.writeRecord(NebulaBatchOutputFormat.java:135) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
	at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.invoke(NebulaSinkFunction.java:50) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:107) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:114) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:187) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:146) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:823) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.11.6.jar:1.11.6]

Could it be that the vid in the data you pass in doesn't match the vid type defined in the schema? Or that no vid data is being passed in at all?

Everything was created as string type when the schema was defined, and the data looks like this:
1,jack,20
4,joe,38
3,nace,39

The printed record shows that data is going in.

Your source data has only three columns, but four column indexes are configured here.

I don't quite understand.
Fields has only three entries: vid, 姓名, 年龄
Data: 1,jack,20
Both are three columns.

Your source data has only three columns, but in the code you configured indexes 0, 1, 2, 3 (setIdIndex(0) plus setPositions(1, 2, 3)), which is four columns. Flink stream column indexes start from 0, so a three-column row only has indexes 0 to 2.
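To illustrate with the sample line 1,jack,20 (a minimal sketch; Row is org.apache.flink.types.Row):

Row record = new Row(3);
record.setField(0, "1");     // index 0 -> used as the vertex ID (setIdIndex(0))
record.setField(1, "jack");  // index 1
record.setField(2, "20");    // index 2
// setPositions(Arrays.asList(1, 2, 3)) references index 3,
// which does not exist in this three-column row.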

The example on the official site is written the same way:
ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSink")
        .setTag("person")
        .setIdIndex(0)
        .setFields(Arrays.asList("col1", "col2", "col3", "col4", "col5", "col6", "col7",
                "col8", "col9", "col10", "col11", "col12", "col13", "col14"))
        .setPositions(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
        .setBatch(2)
        .builder();
How is this different from what I wrote?

I tried the following variant and got the same error:
.setIdIndex(0)
.setFields(Arrays.asList("vid", "姓名", "年龄"))
.setPositions(Arrays.asList(1, 2))

There is a difference. The example's source data has 15 columns, so any index from 0 to 14 can be used in the configuration. Your data has only three columns, so configuring an index beyond 2 throws an exception.

setFields configures the property names in Nebula; setPositions configures which columns of the Flink row are written into the corresponding Nebula properties (each entry in fields is paired with the entry at the same index in positions).
First, your fields list has three properties but positions only configures two, so they don't match.
Second, does your Nebula tag actually have a property named vid? If not, remove vid from setFields and you should basically be OK.
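Putting both points together, a corrected configuration for the three-column data could look like the sketch below. It assumes the tag defines only the properties 姓名 and 年龄, since the vertex ID itself is not a tag property:

// A sketch of the corrected options; the NebulaConfigPo fields come from the
// code above, and the two-property tag schema is an assumption.
ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace(NebulaConfigPo.spaceName)
        .setTag(NebulaConfigPo.tagName)
        .setIdIndex(0)                             // column 0 -> vertex ID
        .setFields(Arrays.asList("姓名", "年龄"))    // property names in the tag
        .setPositions(Arrays.asList(1, 2))         // columns 1 and 2 -> 姓名, 年龄
        .setBatch(2)
        .builder();

Note that fields and positions now have the same number of entries, and every index stays within the three-column row.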

Check whether the corresponding vertex tag or edge type has already been created in your space. If it has, check whether the fields match; if it hasn't, just create it.
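For example, you can inspect and, if necessary, create the schema from the Nebula console (a sketch; the tag name person is an assumption, and both properties are string per the data described above):

// nGQL statements run in the console, not Java
SHOW TAGS;
DESCRIBE TAG person;
CREATE TAG IF NOT EXISTS person(姓名 string, 年龄 string);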

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.