PySpark + GraphX 图算法尝鲜

背景

最近业务需要,需要对图进行一些计算,由于对 scala 不熟悉,对官方 nebula-algorithm 的 Algo 用 Python 包装之后想要自定义一些遍历需求发现无从下手,于是翻阅了一些资料,用 Python 实现这些。

环境准备

  1. NebulaGraph Spark Connector 是一个 Spark 连接器,提供通过 Spark 标准形式读写 NebulaGraph 数据的能力。NebulaGraph Spark Connector 由 Reader 和 Writer 两部分组成。
  • Reader

提供一个 Spark SQL 接口,用户可以使用该接口编程读取 NebulaGraph 图数据,单次读取一个点或 Edge type 的数据,并将读取的结果组装成 Spark 的 DataFrame。

  • Writer

提供一个 Spark SQL 接口,用户可以使用该接口编程将 DataFrame 格式的数据逐条或批量写入 NebulaGraph。

详情参见此处

  1. GraphFrames 是 ApacheSpark 的一个包,它提供了基于 DataFrame 的 Graphs。它在 Scala、 Java 和 Python 中提供高级 API。它旨在提供 GraphX 的功能和利用 Spark DataFrames 的扩展功能。此扩展功能包括主题查找、基于 DataFrame 的序列化和高度表达式的图形查询。

GraphFrames GitHub

GraphFrames 如何提交以及版本对应参考此处

  1. PySpark 是 spark 在外层封装了 python 接口,主要是借助 py4j 实现 python 和 java 的交互。这样 python 使用者就不用多学一门 java,轻松使用 python 进行大数据开发。

安装方法pip install pyspark==<version>, pyspark 在 Spark 2.x 和 3.x 之间并不兼容,所以需要根据你的 Spark 版本安装

如何提交

主要参数如下:


spark2-submit \

--master yarn \

--num-executors 20 \

--executor-cores 5 \

--executor-memory 20g \

--driver-cores 1 \

--driver-memory 5g \

--archives /bussiness_jars/bigdata/spark-jar/path/graphx-pyspark.zip#python \

--conf spark.executorEnv.ELKLOG_LEVEL=INFO \

--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python/graphx/bin/python \

--conf spark.jars.packages=graphframes:graphframes:0.8.2-spark2.4-s_2.11 \

--conf spark.executorEnv.ELKLOG_BUSINESS_NAME=nebula340_graphx_degree_graphx_degree_1hop \

/bussiness_jars/bigdata/spark-jar/path/nebula340/nebula340_algo_degree_centrality_v2.py main 1 Relation \

以度统计为例的代码展示

第一步,读取 nebula 数据构建 GraphFrame。如果需要读取多种 Tag 和 Edge 则需要遍历多次读取之后 union 到一起。

    def readNebula(self) -> GraphFrame:
        if self.space in ["Relation", "Shareholding"]:
            vertexs = ["Company", "Person"]
            edges = ["Invest", "Employ", "Legal"]
        else:
            vertexs = ["Company", "GroupTag"]
            edges = ["HasCompany", "Controlled"]

        vertex = self.spark.createDataFrame([("", "", "")], ["id", "keyno", "name"])
        edge = self.spark.createDataFrame([("", "", "", "")], ["src", "dst", "_rank", "role"])

        for v in vertexs:
            df = (
                spark.read.format("com.vesoft.nebula.connector.NebulaDataSource")
                .option("type", "vertex")
                .option("spaceName", space)
                .option("label", v)
                .option("returnCols", "keyno,name")
                .option("metaAddress", metaHost[cluster])
                .option("partitionNumber", 1000)
                .option("operateType", "read")
                .load()
            )
            print(f"read nebula {v} type done, {df.show(n=2)}")
            vertex = vertex.union(df)

        print(f"read nebula vertex done, {vertex.show(n=3)}")

        for v in edges:
            df = (
                spark.read.format("com.vesoft.nebula.connector.NebulaDataSource")
                .option("type", "edge")
                .option("spaceName", space)
                .option("label", v)
                .option("returnCols", "role")
                .option("metaAddress", metaHost[cluster])
                .option("partitionNumber", 1000)
                .option("operateType", "read")
                .load()
            )
            print(f"read nebula {v} type done, {df.show(n=2)}")
            edge = edge.union(df)

        print(f"read nebula edge done, {edge.show(n=3, truncate=False)}")

        g = GraphFrame(vertex, edge)
        return g

第二步,获取度数:GraphFrames 支持 vertexid 为 string 类型,如果 NebulaGraph 中 Vertex 是 Fixed-String 不需要做 map。

def degree_of_hop(self, graph: GraphFrame):
if self.hop==1:
    print("one-hop-degree======")
    ######################### 1-hop degreee #########################
    indegree = g.inDegrees
    # print(indegree.show(truncate=False))
    outdegree = g.outDegrees
    # print(outdegree.show(truncate=False))
    degree = g.degrees
    # filter specific id degree
    degree.filter("id='4a788c2a83870742bb1a35074efc33f3'").show(truncate=False)
    total = degree.join(indegree, on="id", how="left")

    total = total.join(outdegree, on="id", how="left").fillna(0)
    total.write.mode("overwrite").saveAsTable("hive_tmp.nebula_degree_centrality")
    sorted_df = total.orderBy("degree", ascending=False)
    print(sorted_df.show(truncate=False))
    ######################### 1-hop degreee #########################

第三步,获取二跳度数。上一步(step2)中只是获取了某个实体一跳的子图来预估实体的大小,实际业务中需要用两跳子图来评估其大小。

由于没有现成的 2-hop degree 方法,查看了GraphFrames 方法后,决定用 motif-finding 的方式用 path 的数量来近似表示其数量。

elif self.hop ==2:
    print("two-hop-one-direction-path-cnt=======")
    data = graph.find("(a)-[e]->(b); (b)-[e2]->(c)")
    data.show(truncate=False)
    # 此处计算路径的条数,而不是点的个数
    two_hop = data.groupBy("a.id").agg(count("*")).union(data.groupBy("c.id")
    .agg(count("*"))).groupBy("id").agg({"count(1)": "sum"})

    two_hop = two_hop.withColumnRenamed("sum(count(1))", "cnt")
    two_hop.show(truncate=False)

    print("two-hop-both-direction-path-cnt=======")
    # c<-a->b->f
    # data = graph.find("(a)-[e]->(b); (b)-[]->(); (a)-[e3]->(f)").filter("b.id='A' and f.id != 'A' ")
    data = graph.find("(a)-[e]->(b); (b)-[]->(); (a)-[e2]->(c)").filter("b.id != c.id").distinct()
    # data.show(truncate=False)
    two_hop_2 = data.groupBy("b.id").agg(count("*"))
    two_hop_2 = two_hop_2.withColumnRenamed("count(1)", "cnt")
    two_hop_2.show(truncate=False)
    # two-hop total
    total_two_hop = two_hop.union(two_hop_2).groupBy("id").agg({"cnt":"sum"})
    total_two_hop = total_two_hop.withColumnRenamed("sum(cnt)", "cnt")
    print("two_hop path union")
    total_two_hop.show(truncate=False)

    total_two_hop.write.mode("overwrite").saveAsTable("hive_tmp.nebula_degree_two_hop")

在业务中,如果两跳子图太大 nebula 查询超时,可以通过对子图太大的数据通过图计算预先计算用 T+1 的数据来给业务呈现。

其他基础图算法

广度优先遍历 BFS

需要注意的是此处的 BFS 并不是给定指定的点,向外遍历 maxPathLength,而是查找从一个顶点(或一组顶点)到另一个顶点(或一组顶点)的最短路径。

from graphframes.examples import Graphs
g = Graphs.friends()  # Get example graph

# Search from "Esther" for users of age < 32.
paths = g.bfs("name = 'Esther'", "age < 32")
paths.show()

# Specify edge filters or max path lengths.
g.bfs("name = 'Esther'", "age < 32",\
  edgeFilter="relationship != 'friend'", maxPathLength=3)

连通分量 Connected components

from graphframes.examples import Graphs
g = Graphs.friends()  # Get example graph

result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()

最短路径 Shortest paths

计算从每个顶点到给定 landmarks 顶点集的最短路径,其中 landmarks 由顶点 ID 指定。请注意,这需要考虑边缘方向。

from graphframes.examples import Graphs
g = Graphs.friends()  # Get example graph

results = g.shortestPaths(landmarks=["a"])
results.select("id", "distances").show()

# OUTPUT:
# +---+-----+---+---------+
# | id| name|age|distances|
# +---+-----+---+---------+
# |  d|David| 29| {a -> 1}|
# |  a|Alice| 34| {a -> 0}|
# +---+-----+---+---------+

其他更多参考 graphframe-graph-algorithm

总结

整体上 NebulaGraph 官方提供好用的的 spark-connector,在和其他图计算框架结合使用时提供了便利,用户可以很方便的根据自己的需求跑相应的图算法或者图遍历。

本文正在参加 NebulaGraph 技术社区年度征文活动,征文详情:2023 年 NebulaGraph 技术社区年度征文

如果你觉得本文对你有所启发,记得给我点个 :heart: ,谢谢你的鼓励

3 个赞

太赞了太赞了

1 个赞

意识到感觉这个可以作为 nebulagraph-ai 项目中的 spark-backed 场景下 UDA 的实现方式呢~~

https://github.com/wey-gu/nebulagraph-ai/issues/8

尴尬:thinking:,我没太看明白 spark-backed 场景下 UDA 的实现方式 是指的做什么?两跳度数的近似表示这个实现吗

sorry,我写的太没头脑了,nebulagraph-ai (支持多种后端,已经支持 spark 和 networkx)留出来方便自 py 定义算法的接口,当 spark 的时候,直接用你的这个方案很适合,之前我就感觉 spark 场景下很难,没想到你搞出来啦

收到,后续方便的时候我仔细看看和你沟通下哈~

嗯嗯不急哈,长期来说想把这个 ng_ai 做成几行代码开箱即用的 ng 上各种图算法(包括图算法、gnn 比如 graph sage 做链路预测)、分布式或单机(spark 或者 networkx,未来可以加更多后端)调用机器,我还给了通过 ngql function 调用的例子(基于 udf),会很方便开发者最小代价充分利用图的能力

1 个赞