提问参考模版:
- nebula 版本:3.8.0
- nebula-flink-connector版本:3.5.0
- 部署方式:单机
- 安装方式: Docker
- 是否上生产环境: N
- 硬件信息
- 磁盘( SSD)
- 问题:
对NebulaSinkFunction 的批处理机制 有疑问 看了下源码 默认是2000
那如果batch 未满2000的话 是不是就不会执行写到图库 直到 满阈值
我做过测试 从flink 从kafka 读数据
简单的贴下流程:
// kafka Source数据流
FlinkKafkaConsumer<String> kafkaConsumer=new FlinkKafkaConsumer<>(topic,new SimpleStringSchema(),kafkaProperties);
kafkaConsumer.setStartFromEarliest(); // 从头开始消费
SingleOutputStreamOperator<String> kafkaSourceStream = env.addSource(kafkaConsumer)
.uid("test_kafka_consumer")
.setParallelism(DEFAULT_PARALLELISM);
// 做预处理
SingleOutputStreamOperator<Row> logVertexStream = kafkaSourceStream.uid("kafka_stream_map_to_vertex_stream").setParallelism(DEFAULT_PARALLELISM).flatMap(new HandleRwaLogMapFunctionWithVertex());
// sinkVertexFunction
NebulaSinkFunction<Row> sinkVertexFunction= NebulaSinkUtil.sinkVertexFunction(graphSpace, tag, idIndex, vertexFields, vertexPositions);
// sink
logVertexStream.addSink(sinkVertexFunction).uid("sink_vertex_to_nebula").setParallelism(DEFAULT_PARALLELISM);
kafka topic中一开始 有 超过2000条数据
运行之后 再图库中可以匹配到点 是有数据的
当我向topic中插入一条数据(历史的已经消费完了,且是新id) 然后在图库中匹配新id,查不到数据
然后之后尝试多次 插入多条数据(<10条) 均没有在图库中查到
之后 尝试 在topic中增加超过2000条,就可以查到了
随后我将batch配置为2
重启flink 重复上面流程,当kafka消费完历史offset之后,一次更新3条数据到topic(id与历史的均不同)
随之查询 可以查到
所以我得出结论 当batch达到阈值 才会执行批量的插入是么
有配置最大等待时间么的机制么 例如 就算未到达batch阈值 等待多少时间 就自动执行
或者有其他方法么(除了配置batch=1)