Python和nebula结合的开源项目

我先简单发给你步骤吧,回头我会发文章和更详细的东西

  1. 部署 nebulagraph, spark, nebulagraph algorithm
    这里我用 nebula-up 的 all-in-one 工具帮助我们,一键做好一切准备,注意,它需要 9669 端口没被占用,如果有别的 nebula 集群,可以先关掉。
curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash -s -- v3 spark

然后,东西入口都在 ~/.nebula-up 下边

比如,我们加载篮球数据集(后边会用到,必须执行以下),其实还有别的命令在下边,可以看看。

~/.nebula-up/load-basketballplayer-dataset.sh
  1. 进入 pyspark shell
~/.nebula-up/nebula-pyspark.sh
  1. 在 pyspark shell 里调用 nebulagraph-algorithm

有点像 pyspark 版本的 nebula-algorithm/PageRankExample.scala at 02a18ceaa34ed6aa576ddb96297b79adbe99f37e · vesoft-inc/nebula-algorithm · GitHub 的例子

from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("PageRankExample").getOrCreate()
jspark = spark._jsparkSession

from py4j.java_gateway import java_import
# import "com.vesoft.nebula.algorithm.config.SparkConfig"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.config.SparkConfig")

# import "com.vesoft.nebula.algorithm.config.PRConfig"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.config.PRConfig")

# import "com.vesoft.nebula.algorithm.lib.PageRankAlgo"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.lib.PageRankAlgo")

df = spark.read.format(
  "com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "edge").option(
    "spaceName", "basketballplayer").option(
    "label", "follow").option(
    "returnCols", "degree").option(
    "metaAddress", "metad0:9559").option(
    "partitionNumber", 1).load()
# prConfig = PRConfig(5, 1.0)
prConfig = spark._jvm.PRConfig(5, 1.0)

# 注意,这个例子是 vid 是数字的情况,这个执行会报错,因为我们是在 basketballplayer follow 边上跑, VID 都是字符串
# prResult = PageRankAlgo.apply(spark, df, prConfig, False)
prResult = spark._jvm.PageRankAlgo.apply(jspark, df._jdf, prConfig, False)

# 下边是字符串的 VID 的处理例子

## convert string src and dst into int when it's not int VID

from pyspark.sql.functions import dense_rank, col
from pyspark.sql.window import Window


def convert_string_id_to_long_id(df):
    src_id_df = df.select("_srcId").withColumnRenamed("_srcId", "id")
    dst_id_df = df.select("_dstId").withColumnRenamed("_dstId", "id")
    id_df = src_id_df.union(dst_id_df).distinct()
    encode_id = id_df.withColumn("encodedId", dense_rank().over(Window.orderBy("id")))
    encode_id.write.option("header", True).csv("file:///tmp/encodeId.csv")
    src_join_df = df.join(encode_id, df._srcId == encode_id.id)\
        .drop("_srcId")\
        .drop("id")\
        .withColumnRenamed("encodedId", "_srcId")
    dst_join_df = src_join_df.join(encode_id, src_join_df._dstId == encode_id.id)\
        .drop("_dstId")\
        .drop("id")\
        .drop("_rank")\
        .drop("degree")\
        .withColumnRenamed("encodedId", "_dstId")
    
    return dst_join_df

df_int = convert_string_id_to_long_id(df)
prResult = spark._jvm.PageRankAlgo.apply(jspark, df_int._jdf, prConfig, False)

我们看看结果:

>>> # pageren result, but this is in vid type
... prResult.show()
+---+--------+
|_id|pagerank|
+---+--------+
|100|     1.0|
| 13|     1.0|
| 55|     1.0|
| 80|     1.0|
| 95|     1.0|
| 50|     1.0|
|  0|     1.0|
| -1|     1.0|
| 75|     1.0|
| 83|     1.0|
| 90|     1.0|
| 70|     1.0|
|  9|     1.0|
| 85|     1.0|
| 60|     1.0|
| 88|     1.0|
| 10|     1.0|
| 99|     1.0|
+---+--------+

>>>
>>> # this is the mapping of the vid and the encodedId
... mapping = spark.read.option("header", True).csv("file:///tmp/encodeId.csv")
>>>
>>> mapping.show()
+---------+---------+
|       id|encodedId|
+---------+---------+
|player100|        1|
|player101|        2|
|player102|        3|
|player103|        4|
|player104|        5|
|player105|        6|
|player106|        7|
|player107|        8|
|player108|        9|
|player109|       10|
|player113|       11|
|player114|       12|
|player115|       13|
|player116|       14|
|player117|       15|
|player118|       16|
|player119|       17|
|player120|       18|
|player121|       19|
|player124|       20|
+---------+---------+
only showing top 20 rows
1 个赞