nebula-java使用问题,项目开启一段时间后会管道破裂,无法查到数据,只能重启恢复

  • nebula 版本:3.0.0
  • nebula-java 版本是3.0.0
  • 部署方式:分布式
  • 安装方式:RPM
  • 是否为线上版本:Y
  • 硬件信息
    • 磁盘SATA
    • CPU、内存信息 三台虚拟机:都是4核、8g内存
  • 问题的具体描述

这个是报错行,语句是没问题的,只是隔了一个晚上就会显示管道破裂的异常,无法读取数据
val resp: ResultSet = session.execute(query)

  • 相关的 meta / storage / graph info 日志信息(尽量使用文本形式方便检索)
    在服务器下 logs 日志中并没有出现error 和 warning日志

代码
工具类代码:

  // Logger for this utility object.
  private val log: Logger = LoggerFactory.getLogger(NebulaUtil.getClass)

  // Connection pool; stays null until `init` assigns it.
  private var pool: NebulaPool = null
  // Session that `executeResultSet` runs every statement on; assigned by `init`.
  private var session: Session = null

  /** Returns the connection pool held by this object, or null when none has been assigned. */
  def getPool(): NebulaPool = pool

  //初始化失败 session 为空
  /**
   * Creates the connection pool and the shared session.
   *
   * The pool and session fields are only published after both the pool initialisation
   * and the session creation succeeded, so a failed attempt leaves `getPool()` returning
   * null and callers can retry instead of running into a null session.
   *
   * The session is requested with the reconnect flag enabled (third argument of
   * `getSession`), so the client may re-establish a connection that was closed while
   * idle instead of failing every later statement with "Broken pipe".
   * NOTE(review): confirm the reconnect semantics against the nebula-java version in use.
   *
   * @param host        graph service host
   * @param port        graph service port
   * @param maxConnSize maximum number of connections kept by the pool
   * @param user        user name used to open the session
   * @param pass        password used to open the session
   * @return true when the pool and the session are ready, false otherwise
   */
  def init(host: String, port: Int, maxConnSize: Int, user: String, pass: String): Boolean = {
    val candidate = new NebulaPool

    // Best-effort cleanup of a pool that will not be used; a half-initialised pool may
    // itself fail on close, which must not mask the original problem.
    def discard(): Unit =
      try candidate.close()
      catch {
        case e: Exception => log.warn("closing unused pool failed.", e)
      }

    try {
      val nebulaPoolConfig = new NebulaPoolConfig
      nebulaPoolConfig.setMaxConnSize(maxConnSize)

      val addresses = util.Arrays.asList(new HostAddress(host, port))

      if (!candidate.init(addresses, nebulaPoolConfig)) {
        log.error("pool init failed.")
        discard()
        false
      } else {
        // Open the session first; only publish both fields once it succeeded.
        val opened = candidate.getSession(user, pass, true)
        session = opened
        pool = candidate
        true
      }
    } catch {
      case e: Exception =>
        // Keep the cause in the log; the original message alone hid why init failed.
        log.error("pool init failed.", e)
        discard()
        false
    }
  }

  //TODO query: 创建空间、进入空间、创建新的点和边的类型、插入点、插入边、执行查询
  /**
   * Runs one nGQL statement on the shared session and returns its result set.
   *
   * A failed statement is logged and reported with an exception. It no longer calls
   * `System.exit`, which would terminate the whole JVM (including the hosting web
   * server) because of a single bad query.
   *
   * @param query the statement to execute
   * @return the result set of a successfully executed statement
   * @throws IllegalStateException if `init` has not produced a session yet, or if the
   *                               server reports the statement as failed
   */
  def executeResultSet(query: String): ResultSet = {
    if (session == null) {
      throw new IllegalStateException("Nebula session is not initialized; call init first.")
    }

    val resp: ResultSet = session.execute(query)
    if (!resp.isSucceeded) {
      val message = String.format("Execute: `%s', failed: %s", query, resp.getErrorMessage)
      log.error(message)
      throw new IllegalStateException(message)
    }

    resp
  }



  //获取ResultSet中的各个列名及数据
  //_1 列名组成的列表
  //_2 多row组成的列表嵌套    单个row的列表 包含本行每一列的数据
  /**
   * Flattens a result set into its column names and its rows rendered as strings.
   *
   * Every value of a row produces exactly one entry, so each row has as many entries
   * as the record has values. Values whose type is not covered by the explicit checks
   * below fall back to the wrapper's own `toString` instead of being dropped, which
   * previously made rows shorter than the column list.
   *
   * @param resultSet the result set to read
   * @return `_1` the column names; `_2` one list per row holding the string form of
   *         each value of that row
   */
  def getInfoForResult(resultSet: ResultSet): (util.List[String], util.List[util.List[Object]]) = {
    // Explicit converters instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._

    val colNames: util.List[String] = resultSet.keys
    val data: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]]

    for (i <- 0 until resultSet.rowsSize) {
      val curData = new util.ArrayList[Object]

      for (value <- resultSet.rowValues(i).values.asScala) {
        val text: String =
          if (value.isString) value.asString
          else if (value.isLong) value.asLong.toString
          else if (value.isBoolean) value.asBoolean.toString
          else if (value.isDouble) value.asDouble.toString
          else if (value.isTime) value.asTime.toString
          else if (value.isDate) value.asDate.toString
          else if (value.isDateTime) value.asDateTime.toString
          else if (value.isVertex) value.asNode.toString
          else if (value.isEdge) value.asRelationship.toString
          else if (value.isPath) value.asPath.toString
          else if (value.isList) value.asList.toString
          else if (value.isSet) value.asSet.toString
          else if (value.isMap) value.asMap.toString
          else value.toString // keeps the row aligned with colNames for any other type
        curData.add(text)
      }

      data.add(curData)
    }

    (colNames, data)
  }

逻辑代码:

 // Logger for NebulaQueryServiceImpl.
 private val logger = Logger.getLogger(classOf[NebulaQueryServiceImpl])

  /**
   * Looks up the downstream and upstream lineage of a field.
   *
   * Initialises the Nebula pool on first use, switches to the lineage space and stores
   * the string form of both subgraphs in the returned map under "down_data" and
   * "up_data". On initialisation failure or on any exception the map is returned with
   * whatever was stored so far (possibly empty); the failure is written to the logger
   * instead of being printed to stderr.
   *
   * @param fieldName the field whose lineage is requested
   * @return a map with the keys "down_data" and "up_data" when the lookup succeeded
   */
  override def queryFieldRely(fieldName: String): util.Map[String, Object] = {
    logger.info("根据当前字段查找上下游血缘")
    val resultData = new util.HashMap[String, Object]()
    try {
      // Reuse the pool when it already exists, otherwise initialise it now.
      val isSuccess =
        NebulaUtil.getPool() != null ||
          NebulaUtil.init("10.88.100.88", 9669, 100, "root", "root")

      if (isSuccess) {
        NebulaUtil.executeResultSet("use field_blood_relation_space")

        // Earlier variants, kept for reference:
        // 1
//        resultData.put("data", getFieldRelyAll(fieldName))
        // 2
//        resultData.put("down", getDownSubgraph(fieldName))
//        resultData.put("up", getUpSubgraph(fieldName))
        // 3 current variant: one tree per direction
        resultData.put("down_data", getSubgraph(fieldName, direct = true).toString())
        resultData.put("up_data", getSubgraph(fieldName, direct = false).toString())
      } else {
        logger.error("Nebula初始化失败")
      }
    } catch {
      case e: Exception =>
        // Route the failure through the logger so it lands in the application log.
        logger.error(s"queryFieldRely failed for field: $fieldName", e)
    }
    resultData
  }

  //bean next 指针为可变数组
  //获取子图
  //field_name 起始节点, direct 子图方向(true 下游, false 上游)
  /**
   * Builds the lineage tree that starts at `field_name`.
   *
   * Runs a GET SUBGRAPH query over `field_rely` edges in the requested direction and
   * turns the returned rows (one list of edges per step) into a tree of `FieldRely`
   * nodes: the edges of the first row become children of the root, the edges of each
   * following row are attached to the matching node of the previous step.
   *
   * Fixes compared with the previous implementation:
   *  - a segment that does not contain two quoted vertex ids (for example an empty
   *    edge list) no longer adds a child with an empty name;
   *  - for the upstream direction the node nearer to the root is the edge's
   *    destination, so parent and child are taken in reverse order.
   *    NOTE(review): this assumes edges are always rendered source -> destination;
   *    confirm against real upstream output;
   *  - a row without any value is treated as having no edges instead of throwing.
   *
   * NOTE(review): `field_name` is interpolated into the statement unescaped; confirm
   * callers never pass values containing double quotes.
   *
   * @param field_name id of the start vertex
   * @param direct     true for the downstream ("out") subgraph, false for upstream ("in")
   * @return the root node for `field_name` with the discovered descendants attached
   */
  def getSubgraph(field_name: String, direct: Boolean): FieldRely = {

    // Root of the tree: the node of field_name itself.
    val relyResult = new FieldRely(field_name, new mutable.ArrayBuffer[FieldRely])

    // "out" follows edges downstream, "in" follows them upstream.
    val downOrUp = if (direct) "out" else "in"

    val query =
      s"""
         | get subgraph 100 steps from "$field_name" $downOrUp field_rely yield edges as field_rely;
         |""".stripMargin

    val resultSet = NebulaUtil.executeResultSet(query)

    // Example of one row's edge list:
    //[[:field_rely "dws.dws_order+ds_code"->"dws.dws_order_day+ds_code" @0 {}], [:field_rely "dws.dws_order+ds_code"->"tujia_qlibra.dws_order+p_ds_code" @0 {}], [:field_rely "dws.dws_order+ds_code"->"tujia_tmp.dws_order_execution+ds_code" @0 {}]]
    if (!resultSet.isEmpty) {
      val curData: util.List[util.List[Object]] = NebulaUtil.getInfoForResult(resultSet)._2

      // Matches the text between a pair of double quotes (the vertex ids of an edge).
      val pattern = Pattern.compile("\"([^\"]*)\"")

      // Comma-separated edge segments of one row; a row without values has no edges.
      def edgeSegments(rowIndex: Int): Array[String] = {
        val row = curData.get(rowIndex)
        if (row.isEmpty) Array.empty[String] else row.get(0).toString.split(",")
      }

      // (parentName, childName) of one edge segment, or None when the segment does not
      // contain two quoted ids. The first quoted value is the source, the last one the
      // destination; which of them is the parent depends on the traversal direction.
      def endpoints(curEdge: String): Option[(String, String)] = {
        val matcher = pattern.matcher(curEdge)
        val quoted = new mutable.ArrayBuffer[String]()
        while (matcher.find()) {
          quoted.append(matcher.group(1))
        }
        if (quoted.length < 2) None
        else if (direct) Some((quoted.head, quoted.last))
        else Some((quoted.last, quoted.head))
      }

      // Step 1: every edge of the first row contributes a direct child of the root.
      edgeSegments(0).foreach { curEdge =>
        endpoints(curEdge).foreach { case (_, childName) =>
          relyResult.children.append(new FieldRely(childName, new mutable.ArrayBuffer[FieldRely]()))
        }
      }

      // Nodes of the previous step; used to find the parent of each new edge.
      var parentNode: mutable.ArrayBuffer[FieldRely] = relyResult.children

      // Remaining steps.
      // NOTE(review): the upper bound `curData.size - 1` leaves the last row
      // unprocessed, exactly as before; confirm whether that is intentional.
      for (i <- 1 until curData.size - 1) {
        // Nodes discovered in this step become the parents of the next one.
        val nextParentNode = new mutable.ArrayBuffer[FieldRely]()

        edgeSegments(i).foreach { curEdge =>
          endpoints(curEdge).foreach { case (parentName, childName) =>
            val endNode = new FieldRely(childName, new mutable.ArrayBuffer[FieldRely]())
            nextParentNode.append(endNode)
            // Attach to the first node of the previous step with the matching name.
            parentNode.find(_.name.equals(parentName)).foreach(_.children.append(endNode))
          }
        }

        parentNode = nextParentNode
      }
    }

    relyResult
  }

错误日志

2022-03-18 10:02:21.032  INFO 221655 --- [io-8888-exec-13] c.t.w.s.impl.NebulaQueryServiceImpl      : 根据当前字段查找上下游血缘
com.vesoft.nebula.client.graph.exception.IOErrorException: java.net.SocketException: Broken pipe (Write failed)
        at com.vesoft.nebula.client.graph.net.SyncConnection.executeWithParameter(SyncConnection.java:192)
        at com.vesoft.nebula.client.graph.net.Session.executeWithParameter(Session.java:113)
        at com.vesoft.nebula.client.graph.net.Session.execute(Session.java:78)
        at com.tujia.whale.utils.NebulaUtil$.executeResultSet(NebulaUtil.scala:57)
        at com.tujia.whale.service.impl.NebulaQueryServiceImpl.queryFieldRely(NebulaQueryServiceImpl.scala:43)
        at com.tujia.whale.api.NebulaQueryApi.queryFieldRely(NebulaQueryApi.scala:42)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
        at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:967)
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:901)
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)
        at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:635)
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:742)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:317)
        at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:127)
        at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:91)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:170)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.jasig.cas.client.session.SingleSignOutFilter.doFilter(SingleSignOutFilter.java:97)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.web.filter.CorsFilter.doFilterInternal(CorsFilter.java:96)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:64)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
        at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:214)
        at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:177)
        at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:347)
        at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:263)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:108)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199)
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:478)
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)
        at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803)
        at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
        at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459)
        at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
        at java.lang.Thread.run(Thread.java:748)

你能连到数据库吗?上去看下服务是不是挂了,SHOW HOSTS 看下。顺便补充下 nebula-java 的版本号

可以连接到,三台都是 ONLINE
版本是3.0.0

Maven 依赖坐标:com.vesoft:client:3.0.0

一般来说,报这个错误是客户端和服务端的版本号不对齐,:thinking: 你可以确认下版本是否不对齐。

还有一种可能是下面这个原因,你可以看下帖子。

好的, 我去改一下session的问题,那个最大连接数就是session的最大数量是吧, 我看session好像没有close方法
我确定那个nebula 和 client 都是3.0.0 这俩版本兼容吗

嗯,这两个版本是兼容的,不过之前遇到过有小伙伴部署了 2 个版本的 Nebula,然后用同一个 Java 版本的客户端,就版本不对齐了。

好 感谢 我去改一下session的代码,如果明天没问题 那就是解决了 -、-

嗯嗯,如果问题解决了的话,可以回来把某条回复勾选为解决方案哈~ 谢谢 shixingr 了

好的, 再请教一哈 那个上边我提到的那个问题
怎么关闭session的问题 ,还有最大连接数的含义

我。。对 nebula-java 使用不熟悉。你稍等,我找个大佬给你解决下问题哈

好的 谢谢

这种连接池的一般不会去直接关闭连接,只会释放连接归还到池子,我看你代码中也没有调用release去释放连接,会有问题的。
另外你这个是 Scala 语言吧?如果是要用 Spark 运行的话,Nebula 有 Spark 的读写工具

1 个赞

对,用的 Scala,但是不涉及 Spark。那我是不是每次查询完一次后,就把 session release 释放一下就可以了?

是的

好的 谢谢