nebula-flink-connector throws an error when importing data

  • Nebula version: v2.0.0
  • Deployment: Docker
  • Production deployment: Y
  • Problem description: the Flink import job throws ArrayIndexOutOfBoundsException when inserting vertices
Terminal output:
[2021-08-17T16:52:05.199+0800][ERROR][Legacy Source Thread - Source: Custom Source -> parse company info -> (company vertex insert -> Sink: Unnamed, person vertex insert -> Sink: Unnamed, employee edge insert -> Sink: Unnamed, investment edge insert -> Sink: Unnamed, partner edge insert -> Sink: Unnamed) (1/2)][com.cmft.transform.CompanyInfoFlattenMapSingle][|||][]org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
	at com.cmft.transform.CompanyInfoFlattenMapSingle.flatMap(CompanyInfoFlattenMapSingle.java:33)
	at com.cmft.transform.CompanyInfoFlattenMapSingle.flatMap(CompanyInfoFlattenMapSingle.java:20)
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
	at java.util.ArrayList.forEach(ArrayList.java:1249)
	at com.cmft.transform.NebulaVertexCFlatMap.flatMap(NebulaVertexCFlatMap.java:21)
	at com.cmft.transform.NebulaVertexCFlatMap.flatMap(NebulaVertexCFlatMap.java:15)
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	... 20 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 4
	at org.apache.flink.types.Row.getField(Row.java:123)
	at org.apache.flink.connector.nebula.sink.NebulaRowVertexOutputFormatConverter.createValue(NebulaRowVertexOutputFormatConverter.java:66)
	at org.apache.flink.connector.nebula.sink.NebulaRowVertexOutputFormatConverter.createValue(NebulaRowVertexOutputFormatConverter.java:21)
	at org.apache.flink.connector.nebula.sink.NebulaBatchExecutor.addToBatch(NebulaBatchExecutor.java:52)
	at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.writeRecord(NebulaBatchOutputFormat.java:92)
	at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.invoke(NebulaSinkFunction.java:51)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	... 30 more
Relevant code:
// Build the 25-field Row (positions 0-24) for the company vertex;
// these positions have to line up with the property positions
// configured for the Nebula vertex sink.
Row cnameRow = new Row(25);
cnameRow.setField(0, cid);
cnameRow.setField(1, cname);
cnameRow.setField(2, shortName);
cnameRow.setField(3, creditNo);
cnameRow.setField(4, regNo);
cnameRow.setField(5, orgNo);
cnameRow.setField(6, status);
cnameRow.setField(7, econKind);
cnameRow.setField(8, registCapi);
cnameRow.setField(9, scope);
cnameRow.setField(10, termStart);
cnameRow.setField(11, termEnd);
cnameRow.setField(12, belongOrg);
cnameRow.setField(13, operName);
cnameRow.setField(14, startDate);
cnameRow.setField(15, endDate);
cnameRow.setField(16, province);
cnameRow.setField(17, city);
cnameRow.setField(18, domains);
cnameRow.setField(19, telephone);
cnameRow.setField(20, email);
cnameRow.setField(21, address);
cnameRow.setField(22, websites);
cnameRow.setField(23, curTimestamp); // create_time
cnameRow.setField(24, curTimestamp); // update_time
vertexCList.add(cnameRow);

Check how your dataStream is produced. Does every Row actually contain 25 elements?
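
For reference, you can catch this before it reaches the sink by validating the arity of each Row. A minimal sketch (only the field count 25 comes from your snippet; the class name and everything else here is illustrative):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Hypothetical guard: forward only Rows whose arity matches the
// 25 positions the Nebula vertex sink reads from, and log the rest.
public class ArityCheckFlatMap implements FlatMapFunction<Row, Row> {
    private static final int EXPECTED_ARITY = 25;

    @Override
    public void flatMap(Row row, Collector<Row> out) {
        if (row.getArity() == EXPECTED_ARITY) {
            out.collect(row);
        } else {
            // A short Row here is what later surfaces as
            // ArrayIndexOutOfBoundsException in Row.getField().
            System.err.println("Dropping row with arity " + row.getArity());
        }
    }
}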

There are 25 elements. I have fixed the array out-of-bounds problem, but now a new error is reported.

[2021-08-18T15:30:10.504+0800][ERROR][Legacy Source Thread - Source: Custom Source -> parse company info -> (company vertex insert -> Sink: Unnamed, person vertex insert -> Sink: Unnamed, employee edge insert -> Sink: Unnamed, investment edge insert -> Sink: Unnamed, partner edge insert -> Sink: Unnamed) (2/2)][org.apache.flink.connector.nebula.sink.NebulaBatchExecutor][|||][]insert failed: Storage Error: Out of range value.

@nicole Please help!!

The Flink job that is running now reports nothing except this insert failed: Storage Error: Out of range value. It keeps running and keeps printing insert statements.

This is a problem with the data itself: some of your numeric values exceed the range of the type you defined.

Isn't string variable-length? Or should I use fixed_string with a fairly large size?

This error is not about the string type; it is about int values. Look at your data: there are two int-typed properties, and the values exceed the maximum that the defined int type can represent.

But my tag properties are all defined as string and timestamp; none of them is int.

I think I know where the problem is now. The two long digit strings you saw in the data are long timestamps, and my schema defines two timestamp properties, create_time and update_time. That is probably the issue. So how should I insert timestamps?

How are your timestamps generated? Do they exceed int64?
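
If they come from System.currentTimeMillis(), that is likely the problem: the Java value is in milliseconds, while Nebula's timestamp property stores seconds since the epoch, so a millisecond value lands far outside the supported timestamp range and storage rejects it with Out of range value. A minimal sketch of the fix (assuming curTimestamp was produced by System.currentTimeMillis()):

// curTimestamp is assumed to be System.currentTimeMillis(), i.e. milliseconds;
// Nebula's timestamp type expects seconds since the epoch.
long nebulaTimestamp = curTimestamp / 1000; // ms -> s

cnameRow.setField(23, nebulaTimestamp); // create_time
cnameRow.setField(24, nebulaTimestamp); // update_time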

Just normal long timestamps generated in Java. I have changed the property type to string and the data inserts now, but my machine is overwhelmed: querying the data in Studio just returns a 504.

The data volume is probably too large; the disk may be full.

Ah, that is indeed possible; it is only a pretty weak virtual machine after all... I will try a different machine.