flink[The pool has closed. Couldn't use again.]

java环境

1.8

flink版本

1.12.3

scala版本

2.11

问题:

运行一段时间后就会报该错误 :sob:大佬们SOS

  • 部分代码
....
// 
 resultDS.countWindowAll(10000).apply(new NebulaFunction).addSink(new GraphSink).setParallelism(1)

-----------------------------sink----------------------------------
class GraphSink extends RichSinkFunction[Iterable[NebulaBean]] with NebulaSinkTrait {
  private val logger: Logger = LoggerFactory.getLogger(this.getClass)

  override def open(parameters: Configuration): Unit = {
    NebulaUtil.initPool()
    NebulaUtil.initSession()
  }

  override def invoke(elements: Iterable[NebulaBean], context: SinkFunction.Context): Unit = {
    for (value <- elements) {
        updateGraph(value.ip, value.username, value.protocol, value.ts, value.hostname, value.ip2local, value.login_type, value.is_employee, logger)
    }
  }

  override def close(): Unit = {
    NebulaUtil.releaseSession()
  }
}

你在使用的时候把graph连接池关闭了,然后又用到池子了。
你的使用过程应该有问题,放出来的代码看不到咋用的

代码其他部分都是execute操作。
请问下flink用javaclient是否只能并行度为1?

运行一个任务是可以正常执行,但消费速率很慢,两个任务又会报错。

batchsize 数据量 size/mb duration 平均消费/s 消费速率/kb
1000 50w 1945.6 14小时49分钟(53340s) 9.37 37.35
1000 50.5w 1955.84 15h(54000s) 9.35 37.08
1000 364w 14131.2 1440m(86400s) 42.23 167.4
10000 120w 4689.92 1800min(10800s) 111.11 444.67

估计你是多个graphsink公用了一个池子,其中一个并发在使用完池子后关闭了,其他并发还在使用池子,就有错误了。
你贴出来的代码都是调NebulaUtil 工具的方法,都不知道你工具里面怎么做的初始化池子和关闭池子。

你可以直接用我们提供的flink connector 来做数据插入 Nebula Flink Connector - Nebula Graph Database 手册

版本不兼容 :sob:

按批写入很慢,新增节点可以快多少?

按批写入不应该慢的,就看你的并发和batch size多大了。 具体增加一个节点可以快多少 要看你给的参数配置和网络环境。

有配置建议吗?

看你的点边数据 属性多少,如果属性不多可以batch size设置2000. 并发根据你自己的环境来配

我设置了3个并行度,(500 ~ 2000)一批读数据,之后一直报这个问题。

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(60000 * 30, CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(60000 * 3)
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

这是flink checkpoint的错误,不知道你自己的程序怎么实现的, 你在flink repo中查一下有没有相关问题。

浙ICP备20010487号