有pyspark 使用 nebula-algorithm 调用图计算比如DegreeCentrality的demo

提问参考模版:

  • nebula 版本:v3.4.1
  • 部署方式:分布式
  • 安装方式: RPM
  • 是否上生产环境: N
  • 硬件信息
    • 磁盘( 推荐使用 SSD)
    • CPU、内存信息
  • 问题的具体描述
  • 相关的 meta / storage / graph info 日志信息(尽量使用文本形式方便检索)

有没有通过pyspark 调用Graphx/ nebula-algorithm 的图计算demo?

参考这里? https://github.com/wey-gu/nebulagraph-ai/blob/main/ng_ai/nebula_algo.py

新鲜的,刚调通

from pyspark.sql import SparkSession
from py4j.java_gateway import java_import


class NebulaAlgoServices:
    session = None

    @classmethod
    def start(cls):
        cls.session = (
            SparkSession.builder.config(
                "spark.jars",
                "./nebula-spark-connector-3.6.0.jar",
            )
            .config("spark.driver-class-path" "./nebula-algorithm-3.1.0.jar")
            .config("spark.jars", "./nebula-algorithm-3.1.0.jar")
            .config("spark.sql.shuffle.partitions", 5)
            .config("spark.executor.memory", "8g")
            .config("spark.driver.memory", "4g")
            .config("spark.sql.broadcastTimeout", -1)
            .config("spark.sql.autoBroadcastJoinThreshold", -1)
            .appName("nebula-connector")
            .master("local")
            .getOrCreate()
        )

    @classmethod
    def scan_data(cls):
        df = (
            cls.session.read.format("com.vesoft.nebula.connector.NebulaDataSource")
            .option("type", "edge")
            .option("spaceName", "xxx")
            .option("label", "xxx")
            .option("returnCols", "created_on")
            .option(
                "metaAddress",
                "xxx:9559",
            )
            .option("partitionNumber", 1)
            .option("operateType", "read")
            .load()
        )
        df.show(2)
        return df

    @classmethod
    def degree(cls):
        data = cls.scan_data()
        java_import(
            cls.session._jvm, "com.vesoft.nebula.algorithm.config.DegreeStaticConfig"
        )
        java_import(
            cls.session._jvm, "com.vesoft.nebula.algorithm.lib.DegreeStaticAlgo"
        )
        config = cls.session._jvm.DegreeStaticConfig(True)
        result = cls.session._jvm.DegreeStaticAlgo.apply(
            cls.session._jsparkSession, data._jdf, config
        )
        result.show()


if __name__ == "__main__":
    NebulaAlgoServices.start()
    NebulaAlgoServices.degree()

23/11/01 17:55:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+--------------------+--------------------+-----+----+                          
|              _srcId|              _dstId|_rank|name|
+--------------------+--------------------+-----+----+
|d4057d41ef9591eed...|436d1f07b89934a69...|    0|null|
|d4057d41ef9591eed...|ebf5cca88fcb41cdb...|    0|null|
+--------------------+--------------------+-----+----+
only showing top 2 rows

+--------------------+                                                          
|                  id|
+--------------------+
|d4057d41ef9591eed...|
|436d1f07b89934a69...|
|c7faa5dc976c7ed2e...|
|ae98f626cca3eabbb...|
|ebf5cca88fcb41cdb...|
|2406dfda1a02aa82d...|
|5aa0b304f29d9874a...|
|c808bae3cff0d245b...|
|78aea3e4cc23f1df5...|
|62f5a573af8418f4b...|
|ef42a19cd01bbd047...|
|                true|
|eb997db2407119db2...|
+--------------------+

+-----+----+-----------+-----------+                                            
|_rank|name|     _srcId|     _dstId|
+-----+----+-----------+-----------+
|    0|null|          0|          1|
|    0|null| 8589934592|          2|
|    0|null|34359738368|          2|
|    0|null|17179869184| 8589934594|
|    0|null|25769803776| 8589934595|
|    0|null|          0| 8589934593|
|    0|null|17179869185|25769803778|
|    0|null|25769803777|25769803778|
+-----+----+-----------+-----------+

[_id: string, degree: int ... 2 more fields]
+--------------------+------+--------+---------+                                
|                 _id|degree|inDegree|outDegree|
+--------------------+------+--------+---------+
|d4057d41ef9591eed...|     2|       0|        2|
|5aa0b304f29d9874a...|     1|       1|        0|
|62f5a573af8418f4b...|     1|       0|        1|
|ef42a19cd01bbd047...|     1|       0|        1|
|c808bae3cff0d245b...|     1|       0|        1|
|c7faa5dc976c7ed2e...|     2|       2|        0|
|ae98f626cca3eabbb...|     1|       0|        1|
|2406dfda1a02aa82d...|     1|       1|        0|
|78aea3e4cc23f1df5...|     1|       0|        1|
|                true|     2|       2|        0|
|436d1f07b89934a69...|     1|       1|        0|
|ebf5cca88fcb41cdb...|     1|       1|        0|
|eb997db2407119db2...|     1|       0|        1|
+--------------------+------+--------+---------+
3 个赞

想起来了,这里我也有一个例子:

我有几亿的点都想算 degree,有什么快速的方式推荐吗
@wey

我目前也是这个问题,图比较大,这个代码我还没调通。提交方式要改一下,我看只是扫描了边,这个不用扫面点吗?

@Reid00 @sevenold 要不要考虑我们的 nebulagraph analytics @nicole 我记得性能是 10-100 倍?

有个问题


    def degree(self):
        data = self.scan_data()
        java_import(
            self.session._jvm, "com.vesoft.nebula.algorithm.config.DegreeStaticConfig"
        )
        java_import(
            self.session._jvm, "com.vesoft.nebula.algorithm.lib.DegreeStaticAlgo"
        )
        config = self.session._jvm.DegreeStaticConfig(True)
        result = self.session._jvm.DegreeStaticAlgo.apply(
            self.session._jsparkSession, data._jdf, config
        )
        result.show()

DegreeCentrality 算法这个Config 默认是计算1hop 的出入度吧?

  1. 怎么计算多跳比如3hop的呢?
  2. 如果想计算所有的边 .option(“label”, self.label) 这个地方是是传入逗号分隔的多个边名称吗?比如"Edge A,EdgeB,EdgeC"

@wey 想要同时scan 所有的边,目前有没有什么方式

现在 spark connector 的 scan 的并发程度满足不了么?

不是。现在一次read 只能读取一种边,想同时读取多种边有没有方式?
不然只能写多个read

        df = (
            self.session.read.format("com.vesoft.nebula.connector.NebulaDataSource")
            .option("type", "edge")
            .option("spaceName", SPACE)
            .option("label", self.label)
            .option("returnCols", self.cols)
            .option("label", "Employ")
            .option("returnCols", "role")
            .option("metaAddress", metaHost[cluster])
            .option("partitionNumber", 100)
            .option("operateType", "read")
            .load()
        )

algo 里,labels 是可以给多个的,最后会 union

spark connector 的话,可以自己 union 再算

好的,感谢。还有一个问题:
这个result 我试了下,不是dataframe, 如何可以根据degree 逆序排序,打印出来?

        result = self.session._jvm.DegreeStaticAlgo.apply(
            self.session._jsparkSession, data._jdf, config
        )
        print("result show:")
        result.show(truncate=False)
        print("after reverse show: ")
        sorted_df = result.sort(F.desc("degree"))
        sorted_df.show(n=100)
    jdf = result.toDF()
    df = DataFrame(jdf, cls.session)
3 个赞

有没有方法可以获取两跳内子图的所有度数(点的个数)吗? 目前这个度数看了下只是一跳的

cc @nicole

可以自定义算法, 迭代两次收集邻居节点做统计。

感谢。可以具体点吗?目前有GraphFrame 怎么进行迭代


    # 迭代两次收集邻居节点并进行统计
    two_hop_degrees = graph.find("(v1)-[]->(v2); (v2)-[]->(v3)").groupBy("v1").count()
    one_hop_degrees = graph.find("(v1)-[]->(v2)").groupBy("v1").count()

    # 合并一跳和两跳的度数统计
    all_degrees = one_hop_degrees.union(two_hop_degrees).groupBy("v1").sum("count")
    all_degrees.show(truncate=False)

这个写法只能获取出度,入度怎么写呢?
(v1)<-[]-(v2) 这个写法 不被识别

此话题已在最后回复的 7 天后被自动关闭。不再允许新回复。