nebula_flink_connector使用过程中oom问题

提问参考模版:

  • nebula 版本:3.0.0
  • 部署方式:分布式
  • 是否上生产环境:N
  • 问题的具体描述

背景:使用flink任务,上游从kafka消费,下游sink到nebula,其中一个逻辑是更新点的属性,但是在这里存在点不在数据库中的情况
问题表象:flink任务oom报错
分析过程:使用jmap工具从flink运行任务的主机上将内存信息dump后,导入MAT工具进行分析,发现NebulaBatchOutputFormat中的errorBuffer占用了大量内存,其中存储的都是上面提到的update失败的语句,而且在源码中没有找到清空这个对象的操作,只有add操作。


问题:

  1. 因为我的业务逻辑决定了可能会更新一些不存在的点,想知道除了update语句我没有更好的语句可以使用?能够避免执行失败这样的问题?
  2. 源码中的errorBuffer有没有方法让他不记录或者自动清除?
  1. 如果你的业务中要求后续的这些点不能写到库中,那只能是udpate了,要想避免执行失败只能在写入前在库中查并把不存在的数据处理掉。
  2. 当存在大量执行失败的数据时是会容易导致oom的,这里可以改成日志的形式

1.flink流式写入不方便提前把数据处理掉
2.请问这里如何改成日志的形式呢?

errorBuffer.add 代码改成打日志 log.error(errorExec)

这个是源码的部分呀,我们测试环境也要使用正式发布的代码,侵入式修改源码是不被允许的。所以看看社区能不能在下个版本中对这个errorBuffer做一个优化,增加参数可以配置,或者增加自动清理机制?

你们的需求是什么样的,对于导入失败的数据是打印日志还是要写入文件? 自动清理应该不行,可以按批次进行处理,处理完毕清空buffer。

从需求上来说,其实我们并不关心导入失败的数据,但打印日志是可以接受的。写入文件的话涉及到写入的文件地址和文件的清理,因为flink任务会持久的运行,必然会一直产生文件,文件大小的持续增加也是需要考虑的问题。
另外,我没太理解按批次处理是什么含义?运行错误的语句难道后续还会有什么其他的处理?如果一直失败,那什么时候可以定义为“处理完毕”?

errorbuffer 当积累到一定的size后,写文件或者打日志,然后清空errorBuffer,这样不会导致errorBuffer持续增长。

嗯,我觉得这样处理是可以的。现在的版本是已经有这样的功能了么?没有的话,预计会在什么版本支持呢

现在没有,下一个版本可以加上,4月底发布

此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。