NebulaGraph Flink Connector 向nebula图库 Sink数据批处理问题

提问参考模版:

  • nebula 版本:3.8.0
  • nebula-flink-connector版本:3.5.0
  • 部署方式:单机
  • 安装方式: Docker
  • 是否上生产环境: N
  • 硬件信息
    • 磁盘( SSD)
  • 问题:
    对NebulaSinkFunction 的批处理机制 有疑问 看了下源码 默认是2000

那如果batch 未满2000的话 是不是就不会执行写到图库 直到 满阈值

我做过测试 从flink 从kafka 读数据
简单的贴下流程:

// kafka Source数据流 
FlinkKafkaConsumer<String> kafkaConsumer=new FlinkKafkaConsumer<>(topic,new SimpleStringSchema(),kafkaProperties);
 kafkaConsumer.setStartFromEarliest();    // 从头开始消费
SingleOutputStreamOperator<String> kafkaSourceStream = env.addSource(kafkaConsumer)
                .uid("test_kafka_consumer")
                .setParallelism(DEFAULT_PARALLELISM);
// 做预处理
SingleOutputStreamOperator<Row> logVertexStream = kafkaSourceStream.uid("kafka_stream_map_to_vertex_stream").setParallelism(DEFAULT_PARALLELISM).flatMap(new HandleRwaLogMapFunctionWithVertex());

// sinkVertexFunction
NebulaSinkFunction<Row> sinkVertexFunction= NebulaSinkUtil.sinkVertexFunction(graphSpace, tag, idIndex, vertexFields, vertexPositions);
// sink
logVertexStream.addSink(sinkVertexFunction).uid("sink_vertex_to_nebula").setParallelism(DEFAULT_PARALLELISM);

kafka topic中一开始 有 超过2000条数据
运行之后 再图库中可以匹配到点 是有数据的
当我向topic中插入一条数据(历史的已经消费完了,且是新id) 然后在图库中匹配新id,查不到数据
然后之后尝试多次 插入多条数据(<10条) 均没有在图库中查到
之后 尝试 在topic中增加超过2000条,就可以查到了

随后我将batch配置为2


重启flink 重复上面流程,当kafka消费完历史offset之后,一次更新3条数据到topic(id与历史的均不同)
随之查询 可以查到

所以我得出结论 当batch达到阈值 才会执行批量的插入是么
有配置最大等待时间么的机制么 例如 就算未到达batch阈值 等待多少时间 就自动执行

或者有其他方法么(除了配置batch=1)

已解决:
配batchIntervalMs就可以了
这个不配置的话默认是0 ,就会出现上面说的情况 得等待batch达到阈值
配置>0的值之后即使没有达到 batchSize,当指定的时间间隔 batchIntervalMs 到达时,也会触发批处理操作。

3 个赞

此话题已在最后回复的 7 天后被自动关闭。不再允许新回复。