flink中使用java-client 获取session 连接超时问题

  • nebula 版本:2.0.1
  • 部署方式(分布式 / 单机 / Docker / DBaaS):分布式
  • 是否为线上版本:Y
  • java-client版本:使用最新的master代码打包

在flink程序中,在main函数中设置NebulaSink的并行度为3,在NebulaSink中重写open函数,创建连接池,为避免频繁的创建连接,释放连接,所以直接创建数目为graphd节点数的session数组(因为采用的是RoundRobinLoadBalancer,正好针对每个graphd节点创建一个session),上线后发现有节点连接不上,


.addSink(new NebulaSink()).setParallelism(3);

public class NebulaSink extends RichSinkFunction<List<HiveUsage>> {

    private static final String APP_MODE = "app.mode";
    private static final String NEBULA_SPACE = "nebula.space";

    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        log.info("NebulaSink NebulaGraphUtil open");
        ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

        String appMode = parameterTool.get(APP_MODE);
        if (StringUtils.isBlank(appMode)) {
            log.error("{} 不能为空", APP_MODE);
            return;
        }

        String space = parameterTool.get(NEBULA_SPACE);
        if (StringUtils.isBlank(space)) {
            log.error("{} 不能为空", NEBULA_SPACE);
            return;
        }
        NebulaGraphUtil.create(appMode, space);
    }
    @Override
    public void invoke(String ss){
    ...
    }
}



public class NebulaGraphUtil {
    private static NebulaPool nebulaPool;
    private static Session[] sessions;

    public static final String USER_NAME = "root";
    public static final String PASSWORD = "nebula";
    private static final Random random = new Random();
    private static List<HostAddress> addresses;

    public static void create(String appMode, String space) throws UnknownHostException, IOErrorException, AuthFailedException, NotValidConnectionException, UnsupportedEncodingException {
        log.info("NebulaGraphUtil create appMode={} space={}", appMode, space);
        NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
        nebulaPoolConfig.setTimeout(60 * 1000);
        nebulaPoolConfig.setMaxConnSize(10);
        nebulaPool = new NebulaPool();

        switch (appMode) {
            case "dev":
                addresses = Arrays.asList(
                        new HostAddress("10.82.232.121", 9669)
                        , new HostAddress("10.82.232.122", 9669)
                        , new HostAddress("10.82.232.123", 9669));
                break;
            case "prod":
                addresses = Arrays.asList(
                        new HostAddress("10.116.109.26", 9669)
                        , new HostAddress("10.116.109.27", 9669)
                        , new HostAddress("10.116.109.48", 9669)
                        , new HostAddress("10.116.109.49", 9669)
                        , new HostAddress("10.116.109.50", 9669));
                break;
            default:
                addresses = Arrays.asList(
                        new HostAddress("10.202.77.197", 3699)
                        , new HostAddress("10.202.77.198", 3699)
                        , new HostAddress("10.202.77.199", 3699));
        }
        try {
            nebulaPool.init(addresses, nebulaPoolConfig);
        } catch (Exception e) {
            log.error("addresses={}, msg={} ", addresses, e.getMessage(), e);
            throw e;
        }
        sessions = new Session[addresses.size()];
        for (int i = 0; i < addresses.size(); i++) {
            try {
                Session session = nebulaPool.getSession(USER_NAME, PASSWORD, true);
                session.execute("use " + space);
                sessions[i] = session;
            } catch (Exception e) {
                log.error("address host={}, msg={}", addresses.get(i).getHost(), e.getMessage(), e);
                throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage() + " address host " + addresses.get(i).getHost());
            }
        }
    }
1 个赞

你好,麻烦你确认下你运行 flink的环境是否能正常访问 10.116.109.49

确定可以访问10.116.109.49,这5台都可以,重启flink服务的时候,也出现过连接10.116.109.26失败,同时其他线程节点连接10.116.109.26成功,所以问题不在于flink集群和 nebula服务集群访问不通问题。
最后经过测试发现如果任选5台 graphd中的3台就没问题,我估计是java-clients连接池问题

addresses = Arrays.asList( new HostAddress("10.116.109.26", 9669) , new HostAddress("10.116.109.27", 9669) , new HostAddress("10.116.109.48", 9669) , new HostAddress("10.116.109.49", 9669) , new HostAddress("10.116.109.50", 9669))

你看下是否有get_session的次数超过连接池的数量,这个报错看着是连接池里面没有可用的连接,所以报这个错。

1 个赞

addresses = Arrays.asList(
new HostAddress(“10.82.232.121”, 9669)
, new HostAddress(“10.82.232.122”, 9669)
, new HostAddress(“10.82.232.123”, 9669));
请教下,按照这种方式创建的3个session,长期持有,但是如果 10.82.232.121节点上graphd被kill了(模拟宕机),跟踪代码发现,再次获取session还是获取的10.82.232.121节点上的导致报错,此处有3个疑惑:
1、RoundRobinLoadBalancer.updateServersStatus中判断服务状态时,用的ping函数,就算10.82.232.121:9669挂掉了,也是返回成功的,导致获取session失败;
2、RoundRobinLoadBalancer.getAddress中获取了一个HostAddress,只是进行了缓存serversStatus查看状态,存在60s的延迟(因为定时任务是60s执行一次刷新缓存操作)
3、就算在retryConnect()期间,再次重启了10.82.232.121:9669节点graphd,执行语句返回报错 E_SESSION_INVALID Session `1’ has expired,所以这个重连代码 意义是?





image

这个重连代码需要服务端那边也要一起支持的 https://github.com/vesoft-inc/nebula-graph/pull/280 ,服务端端pr还没合进去,所以客户端在服务端挂掉后的重连现在是不能用的。

get_session 里面会对拿到的 connection 检测连通性的,RoundRobinLoadBalancer.updateServersStatus主要是用于服务恢复之后的可以用回那些之前异常的服务。但是 get_session 是会保证不会拿坏的 connection给用户的。现在应该是服务挂了,但是不是在执行过程中发现服务挂了,所以没有更新到服务状态,所以那个服务状态到更新需要优化下,感谢你到反馈。

1 个赞

所以说现在 我程序运行中, graphd 挂了一个节点, 无论这个节点是否会重启成功,我这个程序就会一直失败,因为retryConnect 或者 new session 目前都有问题

重连这个目前是的,目前服务端的pr没有进去,重连拿到的connection是连到其他graphd上面的,所以session id不能用,服务端的pr进去这样处理就没问题。

这个pr 大概什么时候发布咧

我催促下,尽快合入,合进去会在这里说明。

pr 已经合进去了,月底版本会发布

1 个赞

该主题在最后一个回复创建后7天后自动关闭。不再允许新的回复。

浙ICP备20010487号