Session connection timeout when using java-client in Flink

  • Nebula version: 2.0.1
  • Deployment (distributed / standalone / Docker / DBaaS): distributed
  • Production environment: Y
  • java-client version: built from the latest master branch

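In the Flink job, the main function sets the parallelism of NebulaSink to 3, and NebulaSink overrides open() to create the connection pool. To avoid repeatedly creating and releasing connections, open() directly creates an array of sessions whose size equals the number of graphd nodes (because the pool uses RoundRobinLoadBalancer, this should create exactly one session per graphd node). After going live, I found that some nodes could not be connected: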
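The one-session-per-node assumption only holds if every host is healthy when the sessions are created and nothing else draws from the same round-robin counter. The sketch below is a hypothetical, simplified round-robin picker (`RoundRobinPicker` is not the java-client's `RoundRobinLoadBalancer`, whose exact behavior should be checked in its source). It shows how a single host marked unhealthy breaks the mapping: five picks no longer cover five distinct hosts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin picker: illustrates the idea, not nebula-java's implementation. */
public class RoundRobinPicker {
    private final List<String> hosts;
    private final AtomicInteger next = new AtomicInteger(0);

    public RoundRobinPicker(List<String> hosts) {
        this.hosts = new ArrayList<>(hosts);
    }

    /** Returns the next host not in {@code bad}, or null if every host is bad. */
    public String pick(Set<String> bad) {
        for (int tries = 0; tries < hosts.size(); tries++) {
            // floorMod keeps the index non-negative even after int overflow
            int idx = Math.floorMod(next.getAndIncrement(), hosts.size());
            String host = hosts.get(idx);
            if (!bad.contains(host)) {
                return host;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("g1", "g2", "g3", "g4", "g5");
        RoundRobinPicker picker = new RoundRobinPicker(hosts);
        // One "session" per host, but g4 is temporarily considered unhealthy.
        List<String> chosen = new ArrayList<>();
        for (int i = 0; i < hosts.size(); i++) {
            chosen.add(picker.pick(Collections.singleton("g4")));
        }
        System.out.println(chosen); // [g1, g2, g3, g5, g1]
    }
}
```

Because the counter is shared, any other caller of the same picker (for example, another subtask using the same pool) also shifts which host each later session lands on.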


.addSink(new NebulaSink()).setParallelism(3);

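import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class NebulaSink extends RichSinkFunction<List<HiveUsage>> {

    private static final Logger log = LoggerFactory.getLogger(NebulaSink.class);

    private static final String APP_MODE = "app.mode";
    private static final String NEBULA_SPACE = "nebula.space";

    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        log.info("NebulaSink NebulaGraphUtil open");
        ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

        String appMode = parameterTool.get(APP_MODE);
        if (StringUtils.isBlank(appMode)) {
            log.error("{} must not be empty", APP_MODE);
            return;
        }

        String space = parameterTool.get(NEBULA_SPACE);
        if (StringUtils.isBlank(space)) {
            log.error("{} must not be empty", NEBULA_SPACE);
            return;
        }
        // open() runs once per parallel subtask (3 here); subtasks in the same
        // TaskManager JVM all share NebulaGraphUtil's static fields.
        NebulaGraphUtil.create(appMode, space);
    }

    // The element type must match the generic parameter List<HiveUsage>;
    // invoke(String) does not override SinkFunction#invoke and would not compile with @Override.
    @Override
    public void invoke(List<HiveUsage> value, Context context) {
    ...
    }
}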



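import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.exception.AuthFailedException;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.exception.NotValidConnectionException;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class NebulaGraphUtil {
    private static final Logger log = LoggerFactory.getLogger(NebulaGraphUtil.class);

    // Static state: shared by every NebulaSink subtask running in the same JVM.
    private static NebulaPool nebulaPool;
    private static Session[] sessions;

    public static final String USER_NAME = "root";
    public static final String PASSWORD = "nebula";
    private static final Random random = new Random();
    private static List<HostAddress> addresses;

    public static void create(String appMode, String space) throws UnknownHostException, IOErrorException, AuthFailedException, NotValidConnectionException, UnsupportedEncodingException {
        log.info("NebulaGraphUtil create appMode={} space={}", appMode, space);
        NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
        nebulaPoolConfig.setTimeout(60 * 1000); // timeout in milliseconds
        nebulaPoolConfig.setMaxConnSize(10);
        nebulaPool = new NebulaPool();

        switch (appMode) {
            case "dev":
                addresses = Arrays.asList(
                        new HostAddress("10.82.232.121", 9669)
                        , new HostAddress("10.82.232.122", 9669)
                        , new HostAddress("10.82.232.123", 9669));
                break;
            case "prod":
                addresses = Arrays.asList(
                        new HostAddress("10.116.109.26", 9669)
                        , new HostAddress("10.116.109.27", 9669)
                        , new HostAddress("10.116.109.48", 9669)
                        , new HostAddress("10.116.109.49", 9669)
                        , new HostAddress("10.116.109.50", 9669));
                break;
            default:
                addresses = Arrays.asList(
                        new HostAddress("10.202.77.197", 3699)
                        , new HostAddress("10.202.77.198", 3699)
                        , new HostAddress("10.202.77.199", 3699));
        }
        try {
            // init() reports a failed initial server check by returning false, not by throwing
            if (!nebulaPool.init(addresses, nebulaPoolConfig)) {
                log.error("NebulaPool init returned false, addresses={}", addresses);
            }
        } catch (Exception e) {
            log.error("addresses={}, msg={} ", addresses, e.getMessage(), e);
            throw e;
        }
        sessions = new Session[addresses.size()];
        for (int i = 0; i < addresses.size(); i++) {
            try {
                // The pool's load balancer picks the graphd for each new connection,
                // so sessions[i] is not guaranteed to be connected to addresses.get(i).
                Session session = nebulaPool.getSession(USER_NAME, PASSWORD, true);
                ResultSet rs = session.execute("use " + space);
                if (!rs.isSucceeded()) {
                    session.release();
                    throw new IOErrorException(IOErrorException.E_UNKNOWN, "use " + space + " failed: " + rs.getErrorMessage());
                }
                sessions[i] = session;
            } catch (Exception e) {
                // addresses.get(i) is the host this iteration expected, not necessarily the one that failed
                log.error("address host={}, msg={}", addresses.get(i).getHost(), e.getMessage(), e);
                throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage() + " address host " + addresses.get(i).getHost());
            }
        }
    }
}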
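Because `create()` writes the static `nebulaPool` and `sessions` fields without synchronization, the three subtasks' `open()` calls in one TaskManager JVM can interleave: one thread may replace `nebulaPool` while another is still calling `init()` or `getSession()` on it. The thread does not confirm this is the cause, but it is worth ruling out. Below is a minimal stdlib-only sketch of a create-once, reference-counted holder; `SharedResource`, `acquire()` and `release()` are hypothetical names, not part of Flink or nebula-java.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical create-once, reference-counted holder for a per-JVM resource such as a pool. */
public final class SharedResource<T> {
    private final Supplier<T> factory;
    private final Consumer<T> closer;
    private T instance;   // guarded by this
    private int refCount; // guarded by this

    public SharedResource(Supplier<T> factory, Consumer<T> closer) {
        this.factory = factory;
        this.closer = closer;
    }

    /** The first caller creates the resource; later callers reuse the same instance. */
    public synchronized T acquire() {
        if (instance == null) {
            instance = factory.get(); // if this throws, state is left unchanged
        }
        refCount++;
        return instance;
    }

    /** The last caller to release closes the resource. */
    public synchronized void release() {
        if (refCount == 0) {
            throw new IllegalStateException("release() without acquire()");
        }
        if (--refCount == 0) {
            closer.accept(instance);
            instance = null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger created = new AtomicInteger();
        AtomicInteger closed = new AtomicInteger();
        SharedResource<String> pool = new SharedResource<>(
                () -> "pool-" + created.incrementAndGet(),
                p -> closed.incrementAndGet());

        // Simulate three sink subtasks calling open() at the same moment.
        int subtasks = 3;
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService exec = Executors.newFixedThreadPool(subtasks);
        for (int i = 0; i < subtasks; i++) {
            exec.submit(() -> {
                start.await();
                pool.acquire();
                return null;
            });
        }
        start.countDown();
        exec.shutdown();
        exec.awaitTermination(10, TimeUnit.SECONDS);

        for (int i = 0; i < subtasks; i++) {
            pool.release(); // each subtask's close()
        }
        System.out.println("created=" + created.get() + " closed=" + closed.get()); // created=1 closed=1
    }
}
```

In `NebulaSink`, a `static final SharedResource<NebulaPool>` could be acquired in `open()` and released in `close()`, with a closer that calls `NebulaPool.close()`. The factory would have to wrap `init()`'s checked `UnknownHostException`, since a `Supplier` cannot throw it.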

Hi, could you please confirm that the environment running Flink can reach 10.116.109.49?
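One way to check this from each TaskManager host is a plain TCP connect with a timeout to every graphd address. The hypothetical `GraphdProbe` below uses only `java.net.Socket`; it verifies that the port accepts connections but does not exercise the Nebula protocol or authentication, so a pass does not rule out client-side problems.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical TCP reachability probe for graphd addresses (no Nebula protocol involved). */
public class GraphdProbe {

    /** Returns true if a TCP connection to host:port is established within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or unresolvable
        }
    }

    public static void main(String[] args) {
        // Production graphd addresses from the post; run this on every TaskManager host.
        String[] hosts = {"10.116.109.26", "10.116.109.27", "10.116.109.48",
                          "10.116.109.49", "10.116.109.50"};
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String host : hosts) {
            result.put(host, canConnect(host, 9669, 3000));
        }
        System.out.println(result);
    }
}
```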

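I'm sure 10.116.109.49 is reachable; all 5 hosts are. When restarting the Flink service, a connection to 10.116.109.26 also failed once, while other threads connected to 10.116.109.26 successfully at the same time. So the problem is not a lack of network connectivity between the Flink cluster and the Nebula cluster.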
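If the failures are transient (one thread fails against 10.116.109.26 while another succeeds against the same host), a bounded retry around session creation can keep the job from failing at startup while the root cause is investigated. This is a mitigation, not a fix, and it assumes retrying session creation is safe. A stdlib-only sketch with a hypothetical `Retry.withBackoff` helper:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical bounded retry with exponential backoff. */
public final class Retry {
    private Retry() {}

    /**
     * Calls task up to maxAttempts times, sleeping initialDelayMs, then 2x, 4x, ... between attempts.
     * Rethrows the last failure if every attempt fails.
     */
    public static <T> T withBackoff(Callable<T> task, int maxAttempts, long initialDelayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2;
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Stand-in for session creation: fails twice, then succeeds.
        String session = withBackoff(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new java.io.IOException("connect timed out");
            }
            return "session";
        }, 5, 100);
        System.out.println(session + " after " + calls.get() + " attempts"); // session after 3 attempts
    }
}
```

In `create()`, the call would look like `Retry.withBackoff(() -> nebulaPool.getSession(USER_NAME, PASSWORD, true), 3, 500)`; retries hide failures from the logs, so it is worth logging each caught exception as well.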
In the end, testing showed that picking any 3 of the 5 graphd nodes works fine, so I suspect a problem in the java-client connection pool.

addresses = Arrays.asList(
        new HostAddress("10.116.109.26", 9669)
        , new HostAddress("10.116.109.27", 9669)
        , new HostAddress("10.116.109.48", 9669)
        , new HostAddress("10.116.109.49", 9669)
        , new HostAddress("10.116.109.50", 9669));
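One hypothesis consistent with "3 of 5 works", not confirmed in the thread: if the three subtasks' overlapping `create()` calls end up requesting sessions from the same static `nebulaPool`, the total demand is 3 × 5 = 15 sessions against `setMaxConnSize(10)`, while 3 × 3 = 9 fits. If each subtask really had its own pool, 5 sessions per pool would fit. The hypothetical `PoolBudget` below models a bounded pool whose borrow fails immediately instead of waiting (an assumption about the pool's wait behavior) to make the arithmetic concrete.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical model of a bounded connection pool whose borrow fails instead of waiting. */
public class PoolBudget {

    /**
     * Models `subtasks` callers each requesting `sessionsPerSubtask` sessions from one shared pool
     * of `maxConnSize` connections, with nothing released in between. Returns the failed requests.
     */
    public static int failedRequests(int subtasks, int sessionsPerSubtask, int maxConnSize) {
        Semaphore connections = new Semaphore(maxConnSize);
        int failed = 0;
        for (int s = 0; s < subtasks; s++) {
            for (int i = 0; i < sessionsPerSubtask; i++) {
                // tryAcquire() returns immediately: models a borrow with no wait time
                if (!connections.tryAcquire()) {
                    failed++;
                }
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Parallelism 3, setMaxConnSize(10), one session per configured graphd
        System.out.println("5 hosts: " + failedRequests(3, 5, 10) + " failed"); // 5 hosts: 5 failed
        System.out.println("3 hosts: " + failedRequests(3, 3, 10) + " failed"); // 3 hosts: 0 failed
    }
}
```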
