- Nebula version: 3.1.0
- Deployment: standalone
- Flink version: 1.11.6

Flink reports an error when writing vertex data to Nebula. The write path follows the example given in the official docs:

```java
import java.util.Arrays;

import org.apache.flink.connector.nebula.connection.NebulaClientOptions;
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat;
import org.apache.flink.connector.nebula.sink.NebulaSinkFunction;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

private static NebulaClientOptions getClientOptions() {
    // SSL is not enabled for the graph service
    NebulaClientOptions nebulaClientOptions =
            new NebulaClientOptions.NebulaClientOptionsBuilder()
                    .setGraphAddress(NebulaConfigPo.graphAddress)
                    .setMetaAddress(NebulaConfigPo.metaAddress)
                    .build();
    return nebulaClientOptions;
}

/**
 * Sink to Nebula Graph with the default INSERT mode.
 */
public static void sinkVertexData(StreamExecutionEnvironment env,
                                  DataStream<String> dataSource) {
    LOG.info("sink Nebula Graph " + NebulaConfigPo.spaceName);
    NebulaClientOptions nebulaClientOptions = getClientOptions();
    NebulaGraphConnectionProvider graphConnectionProvider =
            new NebulaGraphConnectionProvider(nebulaClientOptions);
    NebulaMetaConnectionProvider metaConnectionProvider =
            new NebulaMetaConnectionProvider(nebulaClientOptions);
    // map the tag's properties (fieldList) to positions in the Row,
    // using the Row field at index 0 as the vertex id
    ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
            .setGraphSpace(NebulaConfigPo.spaceName)
            .setTag(NebulaConfigPo.tagName)
            .setIdIndex(0)
            .setFields(NebulaConfigPo.fieldList)
            .setPositions(Arrays.asList(1, 2, 3))
            .setBatch(2)
            .builder();
    NebulaBatchOutputFormat outPutFormat =
            new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                    .setExecutionOptions(executionOptions);
    NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
    DataStream<Row> dataStream = dataSource.map(row -> {
        // split the comma-separated line and copy every value into a Flink Row
        String[] words = row.split(",");
        Row record = new Row(words.length);
        for (int i = 0; i < words.length; i++) {
            record.setField(i, words[i]);
        }
        LOG.info(" ---------- " + record);
        return record;
    });
    dataStream.addSink(nebulaSinkFunction);
}
```
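NebulaConfigPo is not shown in the post. For context, it is assumed to be a plain constants holder along the lines of the hypothetical sketch below; every address and name in it is a placeholder, except that, judging by the exception message, fieldList appears to include "vid":

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the NebulaConfigPo referenced above;
// the real values come from the job's environment.
public class NebulaConfigPo {
    public static final String graphAddress = "127.0.0.1:9669"; // graphd address, placeholder
    public static final String metaAddress = "127.0.0.1:9559";  // metad address, placeholder
    public static final String spaceName = "flink_test";        // placeholder space name
    public static final String tagName = "person";              // placeholder tag name
    // Tag properties as described in the post: vid, 姓名 (name), 年龄 (age)
    public static final List<String> fieldList = Arrays.asList("vid", "姓名", "年龄");
}
```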
The vertex's properties are "vid, 姓名, 年龄" (vid, name, age). The values printed from each record all look correct, yet the write still fails.
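To make the mapping concrete, here is a minimal, runnable sketch of what the map above produces for a hypothetical input line "1,Tom,20" (not taken from the actual Kafka topic), annotated with how setIdIndex and setPositions resolve against the Row's 0-based indexes:

```java
import org.apache.flink.types.Row;

public class RowLayoutExample {
    public static void main(String[] args) {
        // Hypothetical input line carrying the three values vid, 姓名, 年龄:
        String line = "1,Tom,20";
        String[] words = line.split(",");
        Row record = new Row(words.length); // 3 fields, 0-based indexes 0..2
        for (int i = 0; i < words.length; i++) {
            record.setField(i, words[i]);
        }
        // index 0 -> "1"   (consumed as the vertex id via setIdIndex(0))
        // index 1 -> "Tom"
        // index 2 -> "20"
        // Note: setPositions(Arrays.asList(1, 2, 3)) also references index 3,
        // which lies outside this Row's 0..2 range.
        System.out.println(record); // prints: 1,Tom,20
    }
}
```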
The error output is as follows:

```
java.lang.IllegalArgumentException: position 1 or field vid does not exist.
at org.apache.flink.connector.nebula.sink.NebulaRowVertexOutputFormatConverter.createVertex(NebulaRowVertexOutputFormatConverter.java:63) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
at org.apache.flink.connector.nebula.sink.NebulaVertexBatchExecutor.addToBatch(NebulaVertexBatchExecutor.java:42) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.writeRecord(NebulaBatchOutputFormat.java:135) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.invoke(NebulaSinkFunction.java:50) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:107) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:114) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:187) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:146) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:823) ~[blob_p-3224ec47d028d4012c890cd0383eded0e37cd04e-74de38bc7b09f4a7e7b05e94c280ed1f:?]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
```