nebula-flink-connector使用问题

自己封装的数据测试可以写入nebula数据库

接入了kafka使用了DataStream 打印了流,处理了流,最后sink到nebula,nebula没有任何数据也不报错,nebulaSinkFunction也没问题,数据流也没断,走不到addSink的方法里

代码如下,这里kafka接入处理数据,专DataStream没问题

sink点的方法

 assetsSource.flatMap(new FlatMapFunction<List<AssetsDTO>, org.apache.flink.types.Row>() {
        @Override
        public void flatMap(List<AssetsDTO> row, Collector<org.apache.flink.types.Row> collector) throws Exception {
            org.apache.flink.types.Row record = new org.apache.flink.types.Row(AssetsDTO.getFieldCount());

            for (AssetsDTO assetsDTO : row) {
                record.setField(0, assetsDTO.getIp());
                record.setField(1, assetsDTO.getName());
                record.setField(2, assetsDTO.getDeviceType());
                record.setField(3, assetsDTO.getRegionId());
                System.out.println(record);
                collector.collect(record);
            }
        }
    })

这里流都正常 我都打印了,就是.addSink(nebulaSinkFunction);一点效果也没有,nebula没有一点数据

nebula的数据库和表我都建立了,没问题,静态数据插入都正常
nebula-flink-connector3.3.0版本,核心插入代码都是官方的
我只不过接了kafka的数据源 请问是什么问题

你kafka有多少数据? 看下有没有走 到 org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat#commit 这个里面来。

此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。