Python和nebula结合的开源项目

好的!谢谢 :smiley: 您要是有时间可不可以写个demo 用pyspark 去调图算法的示例,如果我在用pyspark去调用之前服务器上除了安装nebula-graph 和nebula-studio 还需要安装那些呢 ?是(Nebula algorithm)吗?

是的,我会出一个教程

好的谢谢nebula图专家

调通了,骚等~~~

好的

我先简单发给你步骤吧,回头我会发文章和更详细的东西

  1. 部署 nebulagraph, spark, nebulagraph algorithm
    这里我用 nebula-up 的 all-in-one 工具帮助我们,一键做好一切准备,注意,它需要 9669 端口没被占用,如果有别的 nebula 集群,可以先关掉。
curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash -s -- v3 spark

然后,东西入口都在 ~/.nebula-up 下边

比如,我们加载篮球数据集(后边会用到,必须执行以下),其实还有别的命令在下边,可以看看。

~/.nebula-up/load-basketballplayer-dataset.sh
  1. 进入 pyspark shell
~/.nebula-up/nebula-pyspark.sh
  1. 在 pyspark shell 里调用 nebulagraph-algorithm

有点像 pyspark 版本的 nebula-algorithm/PageRankExample.scala at 02a18ceaa34ed6aa576ddb96297b79adbe99f37e · vesoft-inc/nebula-algorithm · GitHub 的例子

from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("PageRankExample").getOrCreate()
jspark = spark._jsparkSession

from py4j.java_gateway import java_import
# import "com.vesoft.nebula.algorithm.config.SparkConfig"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.config.SparkConfig")

# import "com.vesoft.nebula.algorithm.config.PRConfig"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.config.PRConfig")

# import "com.vesoft.nebula.algorithm.lib.PageRankAlgo"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.lib.PageRankAlgo")

df = spark.read.format(
  "com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "edge").option(
    "spaceName", "basketballplayer").option(
    "label", "follow").option(
    "returnCols", "degree").option(
    "metaAddress", "metad0:9559").option(
    "partitionNumber", 1).load()
# prConfig = PRConfig(5, 1.0)
prConfig = spark._jvm.PRConfig(5, 1.0)

# 注意,这个例子是 vid 是数字的情况,这个执行会报错,因为我们是在 basketballplayer follow 边上跑, VID 都是字符串
# prResult = PageRankAlgo.apply(spark, df, prConfig, False)
prResult = spark._jvm.PageRankAlgo.apply(jspark, df._jdf, prConfig, False)

# 下边是字符串的 VID 的处理例子

## convert string src and dst into int when it's not int VID

from pyspark.sql.functions import dense_rank, col
from pyspark.sql.window import Window


def convert_string_id_to_long_id(df):
    src_id_df = df.select("_srcId").withColumnRenamed("_srcId", "id")
    dst_id_df = df.select("_dstId").withColumnRenamed("_dstId", "id")
    id_df = src_id_df.union(dst_id_df).distinct()
    encode_id = id_df.withColumn("encodedId", dense_rank().over(Window.orderBy("id")))
    encode_id.write.option("header", True).csv("file:///tmp/encodeId.csv")
    src_join_df = df.join(encode_id, df._srcId == encode_id.id)\
        .drop("_srcId")\
        .drop("id")\
        .withColumnRenamed("encodedId", "_srcId")
    dst_join_df = src_join_df.join(encode_id, src_join_df._dstId == encode_id.id)\
        .drop("_dstId")\
        .drop("id")\
        .drop("_rank")\
        .drop("degree")\
        .withColumnRenamed("encodedId", "_dstId")
    
    return dst_join_df

df_int = convert_string_id_to_long_id(df)
prResult = spark._jvm.PageRankAlgo.apply(jspark, df_int._jdf, prConfig, False)

我们看看结果:

>>> # pageren result, but this is in vid type
... prResult.show()
+---+--------+
|_id|pagerank|
+---+--------+
|100|     1.0|
| 13|     1.0|
| 55|     1.0|
| 80|     1.0|
| 95|     1.0|
| 50|     1.0|
|  0|     1.0|
| -1|     1.0|
| 75|     1.0|
| 83|     1.0|
| 90|     1.0|
| 70|     1.0|
|  9|     1.0|
| 85|     1.0|
| 60|     1.0|
| 88|     1.0|
| 10|     1.0|
| 99|     1.0|
+---+--------+

>>>
>>> # this is the mapping of the vid and the encodedId
... mapping = spark.read.option("header", True).csv("file:///tmp/encodeId.csv")
>>>
>>> mapping.show()
+---------+---------+
|       id|encodedId|
+---------+---------+
|player100|        1|
|player101|        2|
|player102|        3|
|player103|        4|
|player104|        5|
|player105|        6|
|player106|        7|
|player107|        8|
|player108|        9|
|player109|       10|
|player113|       11|
|player114|       12|
|player115|       13|
|player116|       14|
|player117|       15|
|player118|       16|
|player119|       17|
|player120|       18|
|player121|       19|
|player124|       20|
+---------+---------+
only showing top 20 rows
1 个赞

@nicole 这个 basketballplayer.follow 上的 pagerank 值全是 1.0 正常哈?是我给的 5, 1.0 造成的?

啊,果然!

>>> prConfig = spark._jvm.PRConfig(3, 0.85)
>>> prResult = spark._jvm.PageRankAlgo.apply(jspark, df_int._jdf, prConfig, False)

…

>>> prResult.show()
+---+------------------+
|_id|          pagerank|
+---+------------------+
|100| 0.995408631772268|
| 13|0.9935720844811753|
| 55|0.9935720844811753|
| 80| 1.017447199265381|
| 95|1.0009182736455464|
| 50|0.9935720844811753|
|  0|0.9917355371900826|
| -1| 0.995408631772268|
| 75| 0.995408631772268|
| 83|0.9935720844811753|
| 90| 1.046831955922865|
| 70|0.9972451790633607|
|  9|0.9935720844811753|
| 85| 0.995408631772268|
| 60|0.9935720844811753|
| 88|0.9935720844811753|
| 10|0.9935720844811753|
| 99|1.0156106519742885|
+---+------------------+
1 个赞

请问您的一键部署方式是用docker部署吗?

是,按理说即使你的环境没有 docker 它也会试着安装 docker,不过最好还是提前装好。

如果我要部署spark 和nebula-algorithm 可以按照


文档这个步骤部署吗?

我现在服务器上有nebu-graph 和nebula-studio 都是用docker按照部署的 如果我把这两个服务停了的话,按照你的一键安装部署方式可以执行成功吗?或者可以这样操作吗

spark 部署可以参考网上的材料,在 spark 上运行 algorihtm 就参考这个文档是可以的哈。

1 个赞

如果我想通过编译器去调用算法也是这样吗?如果是的话,这种写法调用不需要用ip链接吗?

Spark 是一个分布式(并行)运行的环境、编程范式。

按照 spark 的范式(类似 map reduce)写好代码之后,可以提交给 spark cluster 去执行。

Nebula algorithm 是一个可以跑在 spark 上的 软件包。

PySpark 是一个 spark 支持的语言环境(python)。

没太明白你的问题,不过上边的信息应该能帮助你理解哈。

spark 和nebula-algorithm 分开单独安装就可以吧?

Spark 单独安装,不过要能访问到 nebulagraph 的 graph, meta, storage 服务。

Nebula-algorithm 就是要保证 jar 包被提交、执行的时候被 pyspark 引用 到

请问在服务器下执行你的一键安装命令 (


拉取卡住了是怪网络问题吗?

这个过程是会拉取 docker 镜像,还会拉取一个数据集(live journal)和 nebula algorithm 的 jar 包,一次要个几个 GiB 的下载量哈,跑多久了?

另外,我昨天更新了 nebula-up 你这么执行之后,会自动给你启动一个 jupyter notebook,里边一切都准备好了,里边有一个 pagerank 的 notebook 可以打开按行执行哈

notebook 就是你这个服务器的 8888 端口,默认 token 是 nebula