- nebula 版本:3.0.0
- nebula-java 版本是3.0.0
- 部署方式:分布式
- 安装方式:RPM
- 是否为线上版本:Y
- 硬件信息
- 磁盘SATA
- CPU、内存信息 三台虚拟机:都是4核、8g内存
- 问题的具体描述
这个是报错行,语句是没问题的,只是隔了一个晚上就会显示管道破裂的异常,无法读取数据
val resp: ResultSet = session.execute(query)
- 相关的 meta / storage / graph info 日志信息(尽量使用文本形式方便检索)
在服务器下 logs 日志中并没有出现error 和 warning日志
代码
工具类代码:
private val log: Logger = LoggerFactory.getLogger(NebulaUtil.getClass)
private var pool: NebulaPool = null
private var session: Session = null
def getPool(): NebulaPool = {
pool
}
//初始化失败 session 为空
def init(host: String, port: Int, maxConnSize: Int, user: String, pass: String): Boolean = {
try{
var isSuccess = false
pool = new NebulaPool
// 配置
val nebulaPoolConfig = new NebulaPoolConfig
nebulaPoolConfig.setMaxConnSize(maxConnSize)
// 初始化ip和端口
val addresses = util.Arrays.asList(new HostAddress(host, port))
val initResult = pool.init(addresses, nebulaPoolConfig)
//session赋值
if (!initResult){
log.error("pool init failed.")
}else{
session = pool.getSession(user, pass, false)
isSuccess = true
}
isSuccess
} catch {
case e : Exception => log.error("pool init failed.")
false
}
}
//TODO query: 创建空间、进入空间、创建新的点和边的类型、插入点、插入边、执行查询
def executeResultSet(query: String): ResultSet = {
val resp: ResultSet = session.execute(query)
if (!resp.isSucceeded){
log.error(String.format("Execute: `%s', failed: %s", query, resp.getErrorMessage))
System.exit(1)
}
resp
}
//获取ResultSet中的各个列名及数据
//_1 列名组成的列表
//_2 多row组成的列表嵌套 单个row的列表 包含本行每一列的数据
def getInfoForResult(resultSet: ResultSet): (util.List[String], util.List[util.List[Object]]) = {
//拿到列名
val colNames: util.List[String] = resultSet.keys
//拿数据
val data: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]]
//循环获取每行数据
for (i <- 0 until resultSet.rowsSize) {
val curData = new util.ArrayList[Object]
//拿到第i行数据的容器
val record = resultSet.rowValues(i)
import scala.collection.JavaConversions._
//获取容器中数据
for (value <- record.values) {
if (value.isString) curData.add(value.asString)
else if (value.isLong) curData.add(value.asLong.toString)
else if (value.isBoolean) curData.add(value.asBoolean.toString)
else if (value.isDouble) curData.add(value.asDouble.toString)
else if (value.isTime) curData.add(value.asTime.toString)
else if (value.isDate) curData.add(value.asDate.toString)
else if (value.isDateTime) curData.add(value.asDateTime.toString)
else if (value.isVertex) curData.add(value.asNode.toString)
else if (value.isEdge) curData.add(value.asRelationship.toString)
else if (value.isPath) curData.add(value.asPath.toString)
else if (value.isList) curData.add(value.asList.toString)
else if (value.isSet) curData.add(value.asSet.toString)
else if (value.isMap) curData.add(value.asMap.toString)
}
//合并数据
data.add(curData)
}
(colNames, data)
}
逻辑代码:
private val logger = Logger.getLogger(classOf[NebulaQueryServiceImpl])
override def queryFieldRely(fieldName: String): util.Map[String, Object] = {
logger.info("根据当前字段查找上下游血缘")
var resultData = new util.HashMap[String, Object]()
try {
var isSuccess = false
if(NebulaUtil.getPool() == null){
isSuccess = NebulaUtil.init("10.88.100.88", 9669, 100, "root", "root")
} else {
//已经初始化连接池
isSuccess = true
}
if (isSuccess){
NebulaUtil.executeResultSet("use field_blood_relation_space")
//查询当前字段的上下游所有字段
//1
// resultData.put("data", getFieldRelyAll(fieldName))
//2
// resultData.put("down", getDownSubgraph(fieldName))
// resultData.put("up", getUpSubgraph(fieldName))
//3 先获取下游 bean
val down = true
val up = false
resultData.put("down_data", getSubgraph(fieldName, down).toString())
resultData.put("up_data", getSubgraph(fieldName, up).toString())
} else{
logger.error("Nebula初始化失败")
}
resultData
}
catch {
case e: Exception => e.printStackTrace()
resultData
}
}
//bean next 指针为可变数组
//获取子图
//field_name 起始节点, direct 子图方向(true 下游, false 上游)
def getSubgraph(field_name: String, direct: Boolean): FieldRely = {
// field_name 所在节点
val relyResult = new FieldRely(field_name, new mutable.ArrayBuffer[FieldRely])
// out 为下游, in 为上游
var downOrUp = "out"
// 获取当前查询的方向
if (direct){
downOrUp = "out"
} else {
downOrUp = "in"
}
//1 查询语句 查询下游所有子图
val query =
s"""
| get subgraph 100 steps from "$field_name" $downOrUp field_rely yield edges as field_rely;
|""".stripMargin
val resultSet = NebulaUtil.executeResultSet(query)
//[[:field_rely "dws.dws_order+ds_code"->"dws.dws_order_day+ds_code" @0 {}], [:field_rely "dws.dws_order+ds_code"->"tujia_qlibra.dws_order+p_ds_code" @0 {}], [:field_rely "dws.dws_order+ds_code"->"tujia_tmp.dws_order_execution+ds_code" @0 {}]]
//非空则获取数据
if (!resultSet.isEmpty) {
//非空,则拿数据,解析数据
val data = NebulaUtil.getInfoForResult(resultSet)
val curData: util.List[util.List[Object]] = data._2
//正则匹配引号中数据
val pattern = Pattern.compile("\"([^\"]*)\"")
// 上一步长的所有节点数组
// 判断节点的父节点, 方便存储
var parentNode = new mutable.ArrayBuffer[FieldRely]()
//2 首先获取步长为 1 的边
curData.get(0).get(0).toString.split(",").foreach(curEdge =>{
//拿到边的起始和目的点
val matcher = pattern.matcher(curEdge)
var startPoint = ""
var endPoint = ""
//将两点赋值
while (matcher.find()){
val curValue = matcher.group().replaceAll("\"", "")
if ("".equals(startPoint)){
startPoint = curValue
}else{
endPoint = curValue
}
}
// logger.info(s"1 startPoint: $startPoint")
// logger.info(s"1 endPoint: $endPoint")
//合并到起点 bean 中
relyResult.children.append(new FieldRely(endPoint, new ArrayBuffer[FieldRely]()))
// logger.info(s"relyResult.children.length: ${relyResult.children.length}")
// logger.info(s"relyResult.children.size: ${relyResult.children.size}")
// logger.info(s"relyResult.children: ${relyResult.children}")
})
//3 并初始化父节点数组
parentNode = relyResult.children
//4 得到其余所有边
for (i <- 1 until curData.size - 1){
//储存下个步长的父节点集合
val nextParentNode = new mutable.ArrayBuffer[FieldRely]()
val curEdges = curData.get(i).get(0).toString
//3 多个边循环解析, 拿到目的点
curEdges.split(",").foreach(curEdge => {
//拿到边的起始和目的点
val matcher = pattern.matcher(curEdge)
var startPoint = ""
val endNode = new FieldRely()
//将两点赋值
while (matcher.find()){
val curValue = matcher.group().replaceAll("\"", "")
// logger.info(s"not 1 curValue: $curValue")
if ("".equals(startPoint)){
startPoint = curValue
}else{
endNode.name = curValue
endNode.children = new mutable.ArrayBuffer[FieldRely]()
nextParentNode.append(endNode)
}
}
// logger.info(s"startPoint: $startPoint")
// logger.info(s"endNode: ${endNode.name}")
//通过 startPoint 找到父节点, 将 endPoint 加入到本父节点的 children 中
var flag = true
//至此, 一条边插入成功
for (curFieldRely <- parentNode if flag){
if (curFieldRely.name.equals(startPoint)){
curFieldRely.children.append(endNode)
flag = false
}
}
})
//更新父节点
parentNode = nextParentNode
}
}
// logger.info(s"relyResult.toString: ${relyResult.toString}")
relyResult
}
错误日志
2022-03-18 10:02:21.032 INFO 221655 --- [io-8888-exec-13] c.t.w.s.impl.NebulaQueryServiceImpl : 根据当前字段查找上下游血缘
com.vesoft.nebula.client.graph.exception.IOErrorException: java.net.SocketException: Broken pipe (Write failed)
at com.vesoft.nebula.client.graph.net.SyncConnection.executeWithParameter(SyncConnection.java:192)
at com.vesoft.nebula.client.graph.net.Session.executeWithParameter(Session.java:113)
at com.vesoft.nebula.client.graph.net.Session.execute(Session.java:78)
at com.tujia.whale.utils.NebulaUtil$.executeResultSet(NebulaUtil.scala:57)
at com.tujia.whale.service.impl.NebulaQueryServiceImpl.queryFieldRely(NebulaQueryServiceImpl.scala:43)
at com.tujia.whale.api.NebulaQueryApi.queryFieldRely(NebulaQueryApi.scala:42)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:967)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:901)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:635)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:317)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:127)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:91)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:170)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.jasig.cas.client.session.SingleSignOutFilter.doFilter(SingleSignOutFilter.java:97)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.web.filter.CorsFilter.doFilterInternal(CorsFilter.java:96)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:64)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:214)
at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:177)
at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:347)
at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:263)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:108)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:478)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)