- nebula 版本:NebulaGraph Algorithm v3.2.0
- 部署方式:分布式
- 安装方式:源码编译
- 是否上生产环境:Y
- 问题的具体描述
使用最新版的开源代码:https://github.com/vesoft-inc/nebula-algorithm/blob/master/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LouvainAlgo.scala 跑Louvain算法,原始图有上亿节点,结果dataframe只有几万个节点?看论坛之前也有人问,在2.5版本,说是更新版本解决了,但是我已经使用的是3.2版本了。
经过排查应该是在step2中对图做了压缩,但是又没有保留原始节点的社区信息,导致step2的输出里丢失了很多原始节点。
在Step2代码部分
val edges = G.triplets
.filter(trip => trip.srcAttr.cId != trip.dstAttr.cId)
.map(trip => {
val cid1 = trip.srcAttr.cId
val cid2 = trip.dstAttr.cId
val weight = trip.attr
((math.min(cid1, cid2), math.max(cid1, cid2)), weight)
})
.reduceByKey(_ + _)
.map(x => Edge(x._1._1, x._1._2, x._2)) //sum the edge weights between communities
已经对图进行了过滤,一直到结束的outerJoinVertices,无论如何最终的louvainGraph中节点的innverVertices都不可能包含全部的原始节点,自然也没法还原原始节点和社区的映射关系。
不知道这么理解是否正确?
mark关注下
def execute(spark: SparkSession,
graph: Graph[None.type, Double],
maxIter: Int,
internalIter: Int,
tol: Double): RDD[Row] = {
val sc = spark.sparkContext
// convert origin graph to Louvain Graph, Louvain Graph records vertex's community、innerVertices and innerDegrees
var louvainG: Graph[VertexData, Double] = LouvainGraphUtil.createLouvainGraph(graph)
// compute and broadcast the sum of all edges
val m = sc.broadcast(louvainG.edges.map(e => e.attr).sum())
var curIter = 0
var res = step1(internalIter, louvainG, m.value, tol)
while (res._2 != 0 && curIter < maxIter) {
louvainG = res._1
louvainG = step2(louvainG)
res = step1(internalIter, louvainG, m.value, tol)
curIter += 1
}
CommUtil.getCommunities(louvainG)
}
这段代码似乎也有问题?最后一次迭代的结果res没有赋值给louvanG并通过step2压缩,因为已经达到了while的停止条件,最后一次的step1完全是没有意义的,虽然可能对结果的影响不大,但是在图的规模比较庞大时还是会浪费计算资源。改成如下判断会更好一些:
if (curIter + 1 < maxIter) {
res = step1(internalIter, louvainG, m.value, tol)
}