- nebula 版本:v2.0.0
- 部署方式:Docker
- 是否为线上版本:Y
- 问题的具体描述:flink导入数据插入顶点报ArrayIndexOutOfBoundsException
终端输出:
[2021-08-17T16:52:05.199+0800][ERROR][Legacy Source Thread - Source: Custom Source -> parse company info -> (company vertex insert -> Sink: Unnamed, person vertex insert -> Sink: Unnamed, employee edge insert -> Sink: Unnamed, investment edge insert -> Sink: Unnamed, partner edge insert -> Sink: Unnamed) (1/2)][com.cmft.transform.CompanyInfoFlattenMapSingle][|||][]org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at com.cmft.transform.CompanyInfoFlattenMapSingle.flatMap(CompanyInfoFlattenMapSingle.java:33)
at com.cmft.transform.CompanyInfoFlattenMapSingle.flatMap(CompanyInfoFlattenMapSingle.java:20)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at java.util.ArrayList.forEach(ArrayList.java:1249)
at com.cmft.transform.NebulaVertexCFlatMap.flatMap(NebulaVertexCFlatMap.java:21)
at com.cmft.transform.NebulaVertexCFlatMap.flatMap(NebulaVertexCFlatMap.java:15)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
... 20 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 4
at org.apache.flink.types.Row.getField(Row.java:123)
at org.apache.flink.connector.nebula.sink.NebulaRowVertexOutputFormatConverter.createValue(NebulaRowVertexOutputFormatConverter.java:66)
at org.apache.flink.connector.nebula.sink.NebulaRowVertexOutputFormatConverter.createValue(NebulaRowVertexOutputFormatConverter.java:21)
at org.apache.flink.connector.nebula.sink.NebulaBatchExecutor.addToBatch(NebulaBatchExecutor.java:52)
at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.writeRecord(NebulaBatchOutputFormat.java:92)
at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.invoke(NebulaSinkFunction.java:51)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
... 30 more
部分代码:
Row cnameRow = new Row(25);
cnameRow.setField(0, cid);
cnameRow.setField(1, cname);
cnameRow.setField(2, shortName);
cnameRow.setField(3, creditNo);
cnameRow.setField(4, regNo);
cnameRow.setField(5, orgNo);
cnameRow.setField(6, status);
cnameRow.setField(7, econKind);
cnameRow.setField(8, registCapi);
cnameRow.setField(9, scope);
cnameRow.setField(10, termStart);
cnameRow.setField(11, termEnd);
cnameRow.setField(12, belongOrg);
cnameRow.setField(13, operName);
cnameRow.setField(14, startDate);
cnameRow.setField(15, endDate);
cnameRow.setField(16, province);
cnameRow.setField(17, city);
cnameRow.setField(18, domains);
cnameRow.setField(19, telephone);
cnameRow.setField(20, email);
cnameRow.setField(21, address);
cnameRow.setField(22, websites);
cnameRow.setField(23, curTimestamp);
cnameRow.setField(24, curTimestamp);
vertexCList.add(cnameRow);
最后烦请删掉本模版和问题无关的信息之后,再提交提问,Thx