向 flink 提交作业报“NebulaPoolConfig is not serializable”

  • nebula 版本:2.0.0

  • 部署方式:Docker

  • 是否为线上版本:N

  • 问题的具体描述
    在 flink 任务中用 nebula-java(2.0.0)写数据到 nebula,提交 flink 作业时,报
    com.vesoft.nebula.client.graph.NebulaPoolConfig is not serializable. The object probably contains or references non serializable fields.

./flink run ~/apps/xdr-graph-1.0-SNAPSHOT.jar
------------------------------------------------------------
 The program finished with the following exception:

com.vesoft.nebula.client.graph.NebulaPoolConfig@7d61eb55 is not serializable. The object probably contains or references non serializable fields.
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
        org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
        org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:577)
        com.xdr.XdrGraphJob.main(XdrGraphJob.java:65)

client 初始化参考的 nebula-java 的例子

NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
nebulaPoolConfig.setMaxConnSize(10);
List<HostAddress> addresses = Arrays.asList(new HostAddress("127.0.0.1", 9669),
        new HostAddress("127.0.0.1", 9670));
NebulaPool pool = new NebulaPool();
pool.init(addresses, nebulaPoolConfig);
Session session = pool.getSession("root", "nebula", false);

NebulaPoolConfig 没有实现Serializable方法,你只能在task内部做 pool的实例化和连接获取。

浙ICP备20010487号