flink使用nebula-flink-connector 3.0.0流处理时session过期导致数据同步失败问题

  • nebula 版本:3.4.1
  • 部署方式:分布式
  • 安装方式:RPM
  • 是否上生产环境:Y
  • 硬件信息
    • 磁盘( 推荐使用 SSD)400G
    • CPU、内存信息 8G
  • 问题的具体描述
    FlinkCDC使用nebula-flink-connector 版本3.0.0 实现mysql到nebula的流数据更新任务,但是在运行超过默认8小时(最高7天)未捕获到数据变更就导致了session过期,然后就导致报错,然后该流处理任务就再也不能同步数据了

具体报错(报错日志我进行了重写补充):

22:08:17.357 [nebula-write-output-format-thread-1] ERROR o.a.f.c.n.s.NebulaVertexBatchExecutor - [executeBatch,83] - [synchronization vertex failed]write data failed-GraphHost:[192.168.xx.13:9669],SpaceName:[],statement:[UPDATE VERTEX ON `Organization` "organization325_6" SET `organization_id`=325"],ErrorMessage:[Get sessionId[1680243054809802] failed: Session `1680243054809802' not found: Session not existed!]
22:08:34.361 [nebula-write-output-format-thread-1] ERROR o.a.f.c.n.s.NebulaVertexBatchExecutor - [executeBatch,83] - [synchronization vertex failed]write data failed-GraphHost:[192.168.xx.13:9669],SpaceName:[],statement:[UPDATE VERTEX ON `Organization` "organization325_6" SET `organization_id`=325"],ErrorMessage:[Get sessionId[1680243054809802] failed: Session `1680243054809802' not found: Session not existed!]

问题:
我在实时流处理任务中如何刷新session导致不过期,nebula-flink-connector有没有api可以调用(因为默认最高7天)

目前没有api暴露进行显示刷新session,该行为可加入我们的需求中 允许配置session自动刷新周期,在flink connector内部自动的进行session 更新。

2 个赞

:grinning:目前我是重写了nebula-flink-connector3.0.0里面源代码的部分类,如下
image




还有一个NebulaVertexBatchExecutor类跟上图同理改法