GraphX 图计算实践之模式匹配抽取特定子图

前言

Nebula Graph 本身提供了高性能的 OLTP 查询可以较好地实现各种实时的查询场景,同时它也提供了基于 Spark GraphX 的 nebula-algorithm 库以便支持实时的图算法,这里给 Nebula 点个赞,很不错!

但,实践过程中,我发现部分 OLAP 场景中,想实现模式匹配分析,Nebula 的支撑就显得不那么完善。

这里我对模式匹配的解释是:在一张大图中,根据特定的规则抽取出对应的子图。

举一个简单的例子,比如想要对每个点都进行二度扩散,并按照一定逻辑过滤,最终保留符合要求的二度扩散的子图,这样的任务用 nebula-algorithm 不太好实现了。

当然,上面这个例子我们可以通过编写 nGQL 语句——查询出对应的数据,但 Nebula 的优势在 OLTP 场景,针对特定点进行查询。对于全图数据的计算,无论是计算架构还是内存大小都不是特别适合的。所以,为了补充该部分(模式匹配)的功能,这里使用 Spark GraphX 来满足 OLAP 的计算需求。

GraphX 介绍

GraphX 是 Spark 生态的一个分布式图计算引擎,提供了许多的图计算接口,方便进行图的各项操作。关于 GraphX 的基础知识我这里不进行过多的介绍了,主要是介绍一下实现模式匹配的思路。

实现模式匹配主要是依赖于一个重要的 API:PregelAPI,它是一种 BSP 计算模型(BSP:Bulk Synchronous Parallel,即整体同步并行),一次计算是由一系列超步实现的。

只看定义不是特别好理解,所以直接介绍它在 GraphX 中的实现,了解它是如何使用的。

Pregel 运行原理

源码定义如下:

  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  }
 

相关参数含义如下:

  • initialMsg: 节点的初始化信息,调用 vprog 函数处理 initialMsg;
  • maxIrerations:最大迭代次数;
  • activeDiraction:控制 sendMsg 发送的方向,只有满足方向要求的三元组才会进入下一次迭代;
  • vprog:更新节点信息的函数。节点收到消息后,执行相关逻辑更新节点信息;
  • sendMsg:节点和节点之间发送消息,参数为一个三元组,并且满足 activeDiraction 的方向条件,把消息 Msg 发送给 VertexID,VertexID 可以是 src 点也可以是 dst 点;
  • mergeMsg:当同一个 VertexID 接收到多条消息时,合并多条消息为一条,便于 vprog 处理。

只看定义和逻辑同样不太清楚,所以下边再介绍一下 Pregel 的迭代流程:

  1. 对于一个 graph 对象,只有激活态的点才会参与下一次迭代,激活态的条件是完成了一次发送/收到消息 A 的动作;
  2. 首先初始化所有节点,也就是每个点都调用一次 vprog 方法,参数为 initialMsg,这样使所有节点都在激活态;
  3. 然后是将图划分为若干三元组 Triplet,三元组的组成是:src点,edge,dst 点,只保留激活点 activeDirection 方向的三元组;
  4. 执行 sendMsg 方法,将消息 A 发送给一个 VertexID 的点,由于返回值是一个 Iterator,也就是可以同时给 src、dst 发送消息,若发送 Iterator.empty 则认为没有发送消息;
  5. 由于一个 VertexID 的点会收到多条消息,所以调用 mergeMsg 方法合并消息,合并为一个 A;
  6. 合并之后调用 vprog 更新节点的消息,这样就完成了一次迭代;
  7. 重复 3-6 的步骤,执行 maxIterations 次迭代或者所有的点都不是激活态则退出,完成 Pregel 的所有计算。

模式匹配的思路

知道 Pregel 的计算原理之后,那么怎么实现模式匹配呢,主要就是根据迭代的思想,不停地将边信息聚合到点上,在迭代的过程中控制发送消息的逻辑来实现特定模式的路径

我们可以定义消息为多条路径的集合,发送消息时就是对发送点的路径集合中,每条路径都增加一个边 e,这样就实现了路径的遍历,其实对于一个点来说,本质就是一个广度优先遍历的过程。

还是以二度查询为例,看如下例子:

image-20220427163507770

首先,对每个点都执行一次初始化,每个点的属性为一个空的路径集合,路径集合使用二维数组表示,使所有点成为激活态。

然后,进行第一次迭代,可以看到会有两个三元组 A-E1->BB-E2->C,那么很容易可以得到这次迭代的结果:A:[]B:[[E1]]C:[[E2]]

再进行第二次迭代,这里要做限制,已经发送过的路径不再发送,也就是判断 E 是否已被接收了,防止重复发送的情况。所以第二次迭代的结果就只有 B-E2->C 这个三元组有效,也就是把 B 的集合中的每条路径分别增加一个 E2,并发给 C,C 将路径合并即可,那么结果就是:A:[]B:[[E1]]C:[[E2][E1,E2]]

此时 C 节点上的集合中就有了 E1,E2 两条边,刚好是 A 节点 2 度遍历的结果。

这里举的是简单例子,只是说明这样的一个思路,核心逻辑就是传递边来实现路径遍历,实际上每个节点会收到许多点的信息,那么可以将点的结果进行过滤,按照头结点分组即可。实现看如下例子:

image-20220427165454170

在这个例子中根据要求,能得到的结果就是 A 和 G 的2度路径子图,迭代的结果我不再赘述,直接列出C,F节点的属性:C:[[E2],[E6],[E1,E2],[E5,E6]]F:[[E4],[E3,E4]],当然点 H,B,D 也有路径,但其实可以清楚的看到想要的结果是在 C,F 节点上的。

那么,结果有了但它是分散的,怎么合并起来呢?我们可以将每个点路径的第一个边的起始点拿出来作为 key,因为迭代时每条路径是有序的,其实这个 key 就是目标点,比如 E1,E3 的起始点都是 A,E5 的起始点是 G,我们将每条路径都增加一个key,变更为key:path,过滤掉小于 2 条边的路径,再按照key分组,就得到了目标点对应的子图路径了,这样是不是就拿到了 A 和 G 各自的2度点边了呢!

思路延伸

2 度扩散这个例子还是比较简单的,实际业务中,会有很多的情况,当然图的结构也会比较复杂,比如:

  1. 不同标签的点如何遍历
  2. 不同类型的边如何遍历
  3. 出现环路如何解决
  4. 边的方向是有向还是无向
  5. 多条边如何处理

等等的这些问题,但是核心点不变,就是基于 Pregel 实现广度优先遍历,累积边形成路径信息,主要的逻辑基本都在于 sendMsg 这个方法,来控制发或者不发,来决定路径的走向,以满足模式匹配的业务要求。一次迭代就是积累一层的路径信息,所以迭代次数与图的深度一致。在迭代完成后,每个点上都有一些结果,他们可能是中间结果,也可能是最终结果,一般按照指定 key(一般是头结点)分组再进行一些业务逻辑的过滤(比如路径长度),即可得到指定结构的子图,接下来就可以用于业务的分析操作了。

此外,还可以借助 GraphFrames 来实现诸如:二度扩散,这种简单的模式匹配。通过使用类似 Spark SQL 的算子,十分容易的得到计算结果,大大减少代码的难度。但是由于文档较少,又不如 GraphX 多种算子的灵活,对于复杂的模式还是不太推荐的,感兴趣的可以去了解一下。

总结

利用 GraphX 的 Pregel API 进行广度优先遍历来实现模式匹配的好处:

  1. GraphX 有多种图算子可以灵活处理图数据
  2. 基于 Pregel,使用路径当做消息可以灵活控制模式子图的结构,理论上可以实现任何结构的模式提取
  3. 能够支持较大数据量的全图模式匹配,弥补 Nebula 图库 OLAP 的不足
  4. 无缝集成到大数据生态圈,方便结果的分析使用。

使用这种方式虽然能够实现模式匹配,但是也有很多缺点,比如说:

  1. 每次迭代的消息都是路径集合,越往后消息会越大,导致 JOIN 的数据量很大,内存占用较高。可以通过优化过滤掉不必要发送的信息来解决;
  2. 迭代的次数有限,太多了则会出现内存爆炸,不过一般业务中超过 10 层以上的情况也很少。
  3. 由于节点 ID 通常是 String,需要提前做映射表,计算完又要转换回来,导致计算过程中 shuffle 的次数很多。

针对上面问题,如果你有更好的实现方案,或者通过其他计算引擎能够更好的实现,请务必与我交流指导!

最后,虽然 GraphX 使用起来上手有一定难度,计算也高度依赖内存,但瑕不掩瑜它仍然是一款优秀的图计算框架,尤其是分布式的特性能够进行大量数据的计算,同时 Spark 又能较好地与大数据生态集成,又有官方提供的 nebula-spark-connector 方便读写 Nebula 数据,使用起来还是非常不错的。

我的分享就到这里了,欢迎大家交流更好想法!

我是繁凡,一名大数据开发工程师,目前从事图谱产品开发,致力于大规模图数据在业务中的使用。最近使用GraphX实践了一些业务要求的模式匹配开发,在这里分享一些使用的思路。

本文正在参加首届 Nebula 征文活动,如果你觉得本文对你有所帮助可以给我点个 :heart: 以示鼓励~ 谢谢

6 个赞

非常非常棒的文章:+1:t2:

:handshake: :handshake:

1 个赞

你好,老哥,请教一个问题,使用spark Graphx 进行抽取子图的时候,首先第一步去构建Graphx 的图,现在假设我们有个图有比如账户点,客户,员工等这些点,还有很多边比如员工转账客户,员工关联等各个关系,很多个点边文件,我想知道,在Graphx 中构建图的话,不是通过 Graph(vertexRDD,edgeRDD) 去构建的么,但是我这张图里有很多个点,边文件,这个对象也没有办法去接受多个点边RDD,那怎么构建出我的spark graphx图出来呢?
单个点边文件,直接2个RDD就完了,比如
val graph: Graph[(String, Int), Int] = Graph(users,relationships)
多个文件怎么构建出这个Graph 对象呢?我需要跑pageRank算法,还有然后按照规则抽取不同子图,也要用到Pregel 函数,但发现图中多个点边,连Graphx 的图都构建不出来,请大佬指点一下,谢谢

多个RDD直接union起来就行了,还是变成一个点RDD 一个边RDD,构图就行了。每种属性不一致的情况,一般你跑pageRank其实用不到属性,只保留点ID就行了吧。

1 个赞

这个我也试过,因为我需要每个点边属性的,这样多个RDD union或者unionAll 的时候,列数不一样,就报错了,合并也合并不了,又不能单独处理,其实我的需求也是根据规则去构建担保关系的各个子图出来,比如找出担保链,两两担保,循环担保,这其实就是通过边传递,调用Pregel 自己去控制发送消息,合并消息逻辑,问题点在于单个类型的点,比如就是人和人之间关系,构建图很方便,现在如果多种类型的点,边,需要构建spark graphx 的图,就构建不出来了,就上上面的例子,点有多种,账户,客户,员工,去发现这样多个点的关系,图否没构建出来…

这个也很好处理啊,你把RDD处理一下,列数弄成一样的就行了,属性放到一个model类或者Map都可以的。

Pregel API和aggregateMessages的区别是什么呢?
比如说我自定义了一个group by统计出度的算法
先通过subgraph过滤出目标时间段内的子图,然后通过aggregateMessages进行计算

/* Copyright (c) 2021 vesoft inc. All rights reserved.
 *
 * This source code is licensed under Apache 2.0 License.
 */

package com.vesoft.nebula.algorithm.lib

import com.vesoft.nebula.algorithm.config.{AlgoConstants, ConfigServerStaticConfig, DegreeStaticConfig}
import com.vesoft.nebula.algorithm.utils.{DecodeUtil, NebulaUtil}
import org.apache.log4j.Logger
import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ConfigServerStaticAlgo {

  private val LOGGER = Logger.getLogger(this.getClass)
  /**
   * run the configServerStatic algorithm for nebula graph
   */
  def apply(spark: SparkSession,
            dataset: Dataset[Row],
            configServerConfig:ConfigServerStaticConfig): DataFrame = {
    var encodeIdDf: DataFrame = null

    val graph: Graph[None.type, Long] = if (configServerConfig.encodeId) {
      val (data, encodeId) = DecodeUtil.convertStringId2LongId(dataset, false)
      encodeIdDf = encodeId
      NebulaUtil.loadInitLongGraph(data, true)
    } else {
      NebulaUtil.loadInitLongGraph(dataset, true)
    }
    val filteredGraph = graph.subgraph(edge => edge.attr >= configServerConfig.startTime && edge.attr <= configServerConfig.endTime)
    val configServerResultRDD = execute(spark,filteredGraph)

    val schema = StructType(
      List(
        StructField("src_id", LongType, nullable = false),
        StructField("request_unique_server_count", IntegerType, nullable = false)
      ))
    val algoResult = spark.sqlContext.createDataFrame(configServerResultRDD, schema)

    if (configServerConfig.encodeId) {
      DecodeUtil.convertAlgoId2StringId(algoResult, encodeIdDf)
    } else {
      algoResult
    }
  }

  def execute(spark: SparkSession,graph: Graph[None.type, Long]): RDD[Row] = {
    val result = graph.aggregateMessages[Set[Any]](
      triplet => {
        triplet.sendToSrc(Set(triplet.dstId))
      },
      (a, b) => a.union(b)
    ).collectAsMap()
    val rdd = spark.sparkContext.parallelize(result.toList.map { case (vertex, set) =>
      Row(vertex, set.size)
    })
    rdd
  }

}

这种方法和直接使用Pregel API哪个效率更高呢?

我的理解是:从结果上来看没有区别的,甚至aggregateMessages要比Pregel效率更高的,Pregel就是提供了这样的一个计算模型,能够让你迭代的进行计算而已。从需求上来讲其实都可以,只不过在不知道结束条件的情况下,用Pregel实现的代码更少而已。

1 个赞