背景
最近业务需要,需要对图进行一些计算,由于对 scala
不熟悉,对官方 nebula-algorithm 的 Algo 用 Python 包装之后想要自定义一些遍历需求发现无从下手,于是翻阅了一些资料,用 Python 实现这些。
环境准备
- NebulaGraph Spark Connector 是一个 Spark 连接器,提供通过 Spark 标准形式读写 NebulaGraph 数据的能力。NebulaGraph Spark Connector 由 Reader 和 Writer 两部分组成。
- Reader
提供一个 Spark SQL 接口,用户可以使用该接口编程读取 NebulaGraph 图数据,单次读取一个点或 Edge type 的数据,并将读取的结果组装成 Spark 的 DataFrame。
- Writer
提供一个 Spark SQL 接口,用户可以使用该接口编程将 DataFrame 格式的数据逐条或批量写入 NebulaGraph。
详情参见此处
- GraphFrames 是 ApacheSpark 的一个包,它提供了基于 DataFrame 的 Graphs。它在 Scala、 Java 和 Python 中提供高级 API。它旨在提供 GraphX 的功能和利用 Spark DataFrames 的扩展功能。此扩展功能包括主题查找、基于 DataFrame 的序列化和高度表达式的图形查询。
GraphFrames GitHub
GraphFrames 如何提交以及版本对应参考此处
- PySpark 是 spark 在外层封装了 python 接口,主要是借助 py4j 实现 python 和 java 的交互。这样 python 使用者就不用多学一门 java,轻松使用 python 进行大数据开发。
安装方法pip install pyspark==<version>
, pyspark 在 Spark 2.x 和 3.x 之间并不兼容,所以需要根据你的 Spark 版本安装
如何提交
主要参数如下:
spark2-submit \
--master yarn \
--num-executors 20 \
--executor-cores 5 \
--executor-memory 20g \
--driver-cores 1 \
--driver-memory 5g \
--archives /bussiness_jars/bigdata/spark-jar/path/graphx-pyspark.zip#python \
--conf spark.executorEnv.ELKLOG_LEVEL=INFO \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python/graphx/bin/python \
--conf spark.jars.packages=graphframes:graphframes:0.8.2-spark2.4-s_2.11 \
--conf spark.executorEnv.ELKLOG_BUSINESS_NAME=nebula340_graphx_degree_graphx_degree_1hop \
/bussiness_jars/bigdata/spark-jar/path/nebula340/nebula340_algo_degree_centrality_v2.py main 1 Relation \
以度统计为例的代码展示
第一步,读取 nebula 数据构建 GraphFrame。如果需要读取多种 Tag 和 Edge 则需要遍历多次读取之后 union
到一起。
def readNebula(self) -> GraphFrame:
if self.space in ["Relation", "Shareholding"]:
vertexs = ["Company", "Person"]
edges = ["Invest", "Employ", "Legal"]
else:
vertexs = ["Company", "GroupTag"]
edges = ["HasCompany", "Controlled"]
vertex = self.spark.createDataFrame([("", "", "")], ["id", "keyno", "name"])
edge = self.spark.createDataFrame([("", "", "", "")], ["src", "dst", "_rank", "role"])
for v in vertexs:
df = (
spark.read.format("com.vesoft.nebula.connector.NebulaDataSource")
.option("type", "vertex")
.option("spaceName", space)
.option("label", v)
.option("returnCols", "keyno,name")
.option("metaAddress", metaHost[cluster])
.option("partitionNumber", 1000)
.option("operateType", "read")
.load()
)
print(f"read nebula {v} type done, {df.show(n=2)}")
vertex = vertex.union(df)
print(f"read nebula vertex done, {vertex.show(n=3)}")
for v in edges:
df = (
spark.read.format("com.vesoft.nebula.connector.NebulaDataSource")
.option("type", "edge")
.option("spaceName", space)
.option("label", v)
.option("returnCols", "role")
.option("metaAddress", metaHost[cluster])
.option("partitionNumber", 1000)
.option("operateType", "read")
.load()
)
print(f"read nebula {v} type done, {df.show(n=2)}")
edge = edge.union(df)
print(f"read nebula edge done, {edge.show(n=3, truncate=False)}")
g = GraphFrame(vertex, edge)
return g
第二步,获取度数:GraphFrames
支持 vertexid 为 string 类型,如果 NebulaGraph 中 Vertex 是 Fixed-String 不需要做 map。
def degree_of_hop(self, graph: GraphFrame):
if self.hop==1:
print("one-hop-degree======")
######################### 1-hop degreee #########################
indegree = g.inDegrees
# print(indegree.show(truncate=False))
outdegree = g.outDegrees
# print(outdegree.show(truncate=False))
degree = g.degrees
# filter specific id degree
degree.filter("id='4a788c2a83870742bb1a35074efc33f3'").show(truncate=False)
total = degree.join(indegree, on="id", how="left")
total = total.join(outdegree, on="id", how="left").fillna(0)
total.write.mode("overwrite").saveAsTable("hive_tmp.nebula_degree_centrality")
sorted_df = total.orderBy("degree", ascending=False)
print(sorted_df.show(truncate=False))
######################### 1-hop degreee #########################
第三步,获取二跳度数。上一步(step2)中只是获取了某个实体一跳的子图来预估实体的大小,实际业务中需要用两跳子图来评估其大小。
由于没有现成的 2-hop degree 方法,查看了GraphFrames
方法后,决定用 motif-finding
的方式用 path 的数量来近似表示其数量。
elif self.hop ==2:
print("two-hop-one-direction-path-cnt=======")
data = graph.find("(a)-[e]->(b); (b)-[e2]->(c)")
data.show(truncate=False)
# 此处计算路径的条数,而不是点的个数
two_hop = data.groupBy("a.id").agg(count("*")).union(data.groupBy("c.id")
.agg(count("*"))).groupBy("id").agg({"count(1)": "sum"})
two_hop = two_hop.withColumnRenamed("sum(count(1))", "cnt")
two_hop.show(truncate=False)
print("two-hop-both-direction-path-cnt=======")
# c<-a->b->f
# data = graph.find("(a)-[e]->(b); (b)-[]->(); (a)-[e3]->(f)").filter("b.id='A' and f.id != 'A' ")
data = graph.find("(a)-[e]->(b); (b)-[]->(); (a)-[e2]->(c)").filter("b.id != c.id").distinct()
# data.show(truncate=False)
two_hop_2 = data.groupBy("b.id").agg(count("*"))
two_hop_2 = two_hop_2.withColumnRenamed("count(1)", "cnt")
two_hop_2.show(truncate=False)
# two-hop total
total_two_hop = two_hop.union(two_hop_2).groupBy("id").agg({"cnt":"sum"})
total_two_hop = total_two_hop.withColumnRenamed("sum(cnt)", "cnt")
print("two_hop path union")
total_two_hop.show(truncate=False)
total_two_hop.write.mode("overwrite").saveAsTable("hive_tmp.nebula_degree_two_hop")
在业务中,如果两跳子图太大 nebula 查询超时,可以通过对子图太大的数据通过图计算预先计算用 T+1 的数据来给业务呈现。
其他基础图算法
广度优先遍历 BFS
需要注意的是此处的 BFS 并不是给定指定的点,向外遍历 maxPathLength
,而是查找从一个顶点(或一组顶点)到另一个顶点(或一组顶点)的最短路径。
from graphframes.examples import Graphs
g = Graphs.friends() # Get example graph
# Search from "Esther" for users of age < 32.
paths = g.bfs("name = 'Esther'", "age < 32")
paths.show()
# Specify edge filters or max path lengths.
g.bfs("name = 'Esther'", "age < 32",\
edgeFilter="relationship != 'friend'", maxPathLength=3)
连通分量 Connected components
from graphframes.examples import Graphs
g = Graphs.friends() # Get example graph
result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()
最短路径 Shortest paths
计算从每个顶点到给定 landmarks 顶点集的最短路径,其中 landmarks 由顶点 ID 指定。请注意,这需要考虑边缘方向。
from graphframes.examples import Graphs
g = Graphs.friends() # Get example graph
results = g.shortestPaths(landmarks=["a"])
results.select("id", "distances").show()
# OUTPUT:
# +---+-----+---+---------+
# | id| name|age|distances|
# +---+-----+---+---------+
# | d|David| 29| {a -> 1}|
# | a|Alice| 34| {a -> 0}|
# +---+-----+---+---------+
其他更多参考 graphframe-graph-algorithm
总结
整体上 NebulaGraph 官方提供好用的的 spark-connector,在和其他图计算框架结合使用时提供了便利,用户可以很方便的根据自己的需求跑相应的图算法或者图遍历。
本文正在参加 NebulaGraph 技术社区年度征文活动,征文详情:2023 年 NebulaGraph 技术社区年度征文
如果你觉得本文对你有所启发,记得给我点个 ,谢谢你的鼓励