How to find the connected component containing a supernode in the graph database, and how efficient is querying it?

I've run into a problem: my data contains many supernodes, where a single hub vertex is linked to hundreds of thousands of other vertices. How does Nebula Graph handle this scenario, and how efficient is retrieval on such vertices? Also, if I want to compute the connected component containing such a vertex, what should I do? I tried Spark GraphX and the job simply died; I'm wondering whether Nebula Graph has a better solution for this.

First run the connected-components algorithm, then update the results back into NebulaGraph, writing each vertex's connected-component ID into the source tag as a new property.
After that, query the vertex's connected-component property directly in NebulaGraph.
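For the write-back step, here is a minimal sketch using the nebula-spark-connector (the addresses, space/tag names, and batch size are placeholders; ccDF stands for a DataFrame of (id, cc_id) rows produced by your connected-components job; WriteMode.UPDATE assumes a connector version with update support):

import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaVertexConfig, WriteMode}

// ccDF: one row per vertex, columns "id" (vertex id) and "cc_id"
// (connected-component id) from the algorithm run.
val config = NebulaConnectionConfig.builder()
  .withMetaAddress("127.0.0.1:9559")  // placeholder metad address
  .withGraphAddress("127.0.0.1:9669") // placeholder graphd address
  .build()

val writeConfig = WriteNebulaVertexConfig.builder()
  .withSpace("my_space")              // placeholder space name
  .withTag("my_tag")                  // source tag that receives the new cc_id property
  .withVidField("id")
  .withWriteMode(WriteMode.UPDATE)    // update existing vertices instead of inserting
  .withBatch(512)
  .build()

ccDF.write.nebula(config, writeConfig).writeVertices()

Once cc_id is a property on the tag, looking up the component of any vertex, supernode or not, is a single-vertex property read, so its cost does not depend on the vertex's degree.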

Could you paste the log from the GraphX run that died?

23/04/19 17:26:18 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:113)
at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:112)
at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:548)
at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$1(BlockManagerMasterEndpoint.scala:547)
at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$register(BlockManagerMasterEndpoint.scala:585)
at org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:119)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:103)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.rpc.RpcEndpointNotFoundException: Cannot find endpoint: spark://CoarseGrainedScheduler@node1:38564
at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$asyncSetupEndpointRefByURI$1(NettyRpcEnv.scala:148)
at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$asyncSetupEndpointRefByURI$1$adapted(NettyRpcEnv.scala:144)
at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.spark.util.ThreadUtils$$anon$1.execute(ThreadUtils.scala:99)
at scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
at scala.concurrent.Promise.complete(Promise.scala:53)
at scala.concurrent.Promise.complete$(Promise.scala:52)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.spark.util.ThreadUtils$$anon$1.execute(ThreadUtils.scala:99)
at scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
at scala.concurrent.Promise.complete(Promise.scala:53)
at scala.concurrent.Promise.complete$(Promise.scala:52)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at scala.concurrent.BatchingExecutor$Batch.processBatch$1(BatchingExecutor.scala:67)
at scala.concurrent.BatchingExecutor$Batch.$anonfun$run$1(BatchingExecutor.scala:82)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:59)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:875)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:110)
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:873)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
at scala.concurrent.Promise.trySuccess(Promise.scala:94)
at scala.concurrent.Promise.trySuccess$(Promise.scala:94)
at scala.concurrent.impl.Promise$DefaultPromise.trySuccess(Promise.scala:187)
at org.apache.spark.rpc.netty.NettyRpcEnv.onSuccess$1(NettyRpcEnv.scala:225)
at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$5(NettyRpcEnv.scala:239)
at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$5$adapted(NettyRpcEnv.scala:238)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.spark.util.ThreadUtils$$anon$1.execute(ThreadUtils.scala:99)
at scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
at scala.concurrent.Promise.complete(Promise.scala:53)
at scala.concurrent.Promise.complete$(Promise.scala:52)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
at scala.concurrent.Promise.success(Promise.scala:86)
at scala.concurrent.Promise.success$(Promise.scala:86)
at scala.concurrent.impl.Promise$DefaultPromise.success(Promise.scala:187)
at org.apache.spark.rpc.netty.LocalNettyRpcCallContext.send(NettyRpcCallContext.scala:50)
at org.apache.spark.rpc.netty.NettyRpcCallContext.reply(NettyRpcCallContext.scala:32)
at org.apache.spark.rpc.netty.RpcEndpointVerifier$$anonfun$receiveAndReply$1.applyOrElse(RpcEndpointVerifier.scala:31)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:103)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
… 3 more
23/04/19 17:26:18 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:87)
at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:79)
at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:636)
at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1009)
at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:212)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2048)
at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:113)
at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:112)
at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:548)
at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$1(BlockManagerMasterEndpoint.scala:547)
at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$register(BlockManagerMasterEndpoint.scala:585)
at org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:119)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:103)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
… 3 more

I've pasted it above, please take a look. Same code and same environment configuration: if the data contains no supernodes the job runs fine, but with supernodes it fails with this error. This is from running the Spark GraphX code.

That's caused by data skew. Check the Spark executor logs to see whether the large data volume triggered an OOM or a timeout.

The data skew during graph construction doesn't seem avoidable, though:
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.rdd.RDD

// Label every vertex with the id of its connected component.
val graph: Graph[Long, String] = Graph(vertices, edges)
val connectedVertices = graph.connectedComponents().vertices
// Regroup by component id to materialize each component as a set of vertex ids.
val value1: RDD[Set[VertexId]] = connectedVertices
  .map { case (vertexId, componentId) => (componentId, vertexId) }
  .groupByKey()
  .values
  .map(_.toSet)
value1.foreach(println)
What I need is the connected components, so both the connected-components computation and the groupBy are required, and because of the supernodes some components and some groups will inevitably be extremely large. I don't see any way around that.

I tried again: the failure already happens while computing the connected components (graph.connectedComponents()), and I don't see how that step can be changed.

Try optimizing the groupByKey operator: use reduceByKey instead, which does map-side combining (a kind of minor compaction) before the shuffle. With supernodes present there will always be some data skew; if the supernodes themselves aren't important, see whether you can filter them out.
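A sketch of both suggestions against the snippet above (the degree cutoff is a hypothetical value you would tune for your data):

import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.rdd.RDD

// 1. reduceByKey with map-side combining: each partition merges its
//    (componentId, Set(vertexId)) pairs locally before the shuffle,
//    so less data moves for heavily skewed component ids.
val components: RDD[Set[VertexId]] = connectedVertices
  .map { case (vertexId, componentId) => (componentId, Set(vertexId)) }
  .reduceByKey(_ ++ _)
  .values

// 2. If the supernodes can be dropped: attach each vertex's degree,
//    then keep only vertices below a cutoff before running
//    connectedComponents (100000 here is illustrative).
val maxDegree = 100000
val withDegree = graph.outerJoinVertices(graph.degrees) {
  (_, attr, deg) => (attr, deg.getOrElse(0))
}
val trimmed = withDegree.subgraph(vpred = (_, v) => v._2 < maxDegree)
val cc = trimmed.connectedComponents().vertices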
