自己封装的数据测试可以写入nebula数据库
接入了kafka使用了DataStream 打印了流,处理了流,最后sink到nebula,nebula没有任何数据也不报错,nebulaSinkFunction也没问题,数据流也没断,走不到addSink的方法里
代码如下,这里kafka接入处理数据,专DataStream没问题
sink点的方法
assetsSource.flatMap(new FlatMapFunction<List<AssetsDTO>, org.apache.flink.types.Row>() {
@Override
public void flatMap(List<AssetsDTO> row, Collector<org.apache.flink.types.Row> collector) throws Exception {
org.apache.flink.types.Row record = new org.apache.flink.types.Row(AssetsDTO.getFieldCount());
for (AssetsDTO assetsDTO : row) {
record.setField(0, assetsDTO.getIp());
record.setField(1, assetsDTO.getName());
record.setField(2, assetsDTO.getDeviceType());
record.setField(3, assetsDTO.getRegionId());
System.out.println(record);
collector.collect(record);
}
}
})
这里流都正常 我都打印了,就是.addSink(nebulaSinkFunction);一点效果也没有,nebula没有一点数据
nebula的数据库和表我都建立了,没问题,静态数据插入都正常
nebula-flink-connector3.3.0版本,核心插入代码都是官方的
我只不过接了kafka的数据源 请问是什么问题