java环境
1.8
flink版本
1.12.3
scala版本
2.11
问题:
运行一段时间后就会报该错误 大佬们SOS
....
//
resultDS.countWindowAll(10000).apply(new NebulaFunction).addSink(new GraphSink).setParallelism(1)
-----------------------------sink----------------------------------
class GraphSink extends RichSinkFunction[Iterable[NebulaBean]] with NebulaSinkTrait {
private val logger: Logger = LoggerFactory.getLogger(this.getClass)
override def open(parameters: Configuration): Unit = {
NebulaUtil.initPool()
NebulaUtil.initSession()
}
override def invoke(elements: Iterable[NebulaBean], context: SinkFunction.Context): Unit = {
for (value <- elements) {
updateGraph(value.ip, value.username, value.protocol, value.ts, value.hostname, value.ip2local, value.login_type, value.is_employee, logger)
}
}
override def close(): Unit = {
NebulaUtil.releaseSession()
}
}
nicole
2
你在使用的时候把graph连接池关闭了,然后又用到池子了。
你的使用过程应该有问题,放出来的代码看不到咋用的
代码其他部分都是execute操作。
请问下flink用javaclient是否只能并行度为1?
运行一个任务是可以正常执行,但消费速率很慢,两个任务又会报错。
batchsize |
数据量 |
size/mb |
duration |
平均消费/s |
消费速率/kb |
1000 |
50w |
1945.6 |
14小时49分钟(53340s) |
9.37 |
37.35 |
1000 |
50.5w |
1955.84 |
15h(54000s) |
9.35 |
37.08 |
1000 |
364w |
14131.2 |
1440m(86400s) |
42.23 |
167.4 |
10000 |
120w |
4689.92 |
1800min(10800s) |
111.11 |
444.67 |
nicole
5
估计你是多个graphsink公用了一个池子,其中一个并发在使用完池子后关闭了,其他并发还在使用池子,就有错误了。
你贴出来的代码都是调NebulaUtil 工具的方法,都不知道你工具里面怎么做的初始化池子和关闭池子。
nicole
6
nicole
8
按批写入不应该慢的,就看你的并发和batch size多大了。 具体增加一个节点可以快多少 要看你给的参数配置和网络环境。
nicole
10
看你的点边数据 属性多少,如果属性不多可以batch size设置2000. 并发根据你自己的环境来配
我设置了3个并行度,(500 ~ 2000)一批读数据,之后一直报这个问题。
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(60000 * 30, CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(60000 * 3)
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
nicole
12
这是flink checkpoint的错误,不知道你自己的程序怎么实现的, 你在flink repo中查一下有没有相关问题。
system
关闭
13
此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。