提问参考模版:
- nebula 版本:3.0.0
- 部署方式:分布式
- 是否上生产环境:N
- 问题的具体描述
背景:使用flink任务,上游从kafka消费,下游sink到nebula,其中一个逻辑是更新点的属性,但是在这里存在点不在数据库中的情况
问题表象:flink任务oom报错
分析过程:使用jmap工具从flink运行任务的主机上将内存信息dump后,导入MAT工具进行分析,发现NebulaBatchOutputFormat中的errorBuffer占用了大量内存,其中存储的都是上面提到的update失败的语句,而且在源码中没有找到清空这个对象的操作,只有add操作。
问题:
- 因为我的业务逻辑决定了可能会更新一些不存在的点,想知道除了update语句我没有更好的语句可以使用?能够避免执行失败这样的问题?
- 源码中的errorBuffer有没有方法让他不记录或者自动清除?
1.flink流式写入不方便提前把数据处理掉
2.请问这里如何改成日志的形式呢?
errorBuffer.add 代码改成打日志 log.error(errorExec)
这个是源码的部分呀,我们测试环境也要使用正式发布的代码,侵入式修改源码是不被允许的。所以看看社区能不能在下个版本中对这个errorBuffer做一个优化,增加参数可以配置,或者增加自动清理机制?
你们的需求是什么样的,对于导入失败的数据是打印日志还是要写入文件? 自动清理应该不行,可以按批次进行处理,处理完毕清空buffer。
从需求上来说,其实我们并不关心导入失败的数据,但打印日志是可以接受的。写入文件的话涉及到写入的文件地址和文件的清理,因为flink任务会持久的运行,必然会一直产生文件,文件大小的持续增加也是需要考虑的问题。
另外,我没太理解按批次处理是什么含义?运行错误的语句难道后续还会有什么其他的处理?如果一直失败,那什么时候可以定义为“处理完毕”?
errorbuffer 当积累到一定的size后,写文件或者打日志,然后清空errorBuffer,这样不会导致errorBuffer持续增长。
嗯,我觉得这样处理是可以的。现在的版本是已经有这样的功能了么?没有的话,预计会在什么版本支持呢
system
关闭
11
此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。