我先简单发给你步骤吧,回头我会发文章和更详细的东西
- 部署 nebulagraph, spark, nebulagraph algorithm
这里我用 nebula-up 的 all-in-one 工具帮助我们,一键做好一切准备,注意,它需要 9669 端口没被占用,如果有别的 nebula 集群,可以先关掉。
curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash -s -- v3 spark
然后,东西入口都在 ~/.nebula-up
下边
比如,我们加载篮球数据集(后边会用到,必须执行以下),其实还有别的命令在下边,可以看看。
~/.nebula-up/load-basketballplayer-dataset.sh
- 进入 pyspark shell
~/.nebula-up/nebula-pyspark.sh
- 在 pyspark shell 里调用 nebulagraph-algorithm
有点像 pyspark 版本的 nebula-algorithm/PageRankExample.scala at 02a18ceaa34ed6aa576ddb96297b79adbe99f37e · vesoft-inc/nebula-algorithm · GitHub 的例子
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PageRankExample").getOrCreate()
jspark = spark._jsparkSession
from py4j.java_gateway import java_import
# import "com.vesoft.nebula.algorithm.config.SparkConfig"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.config.SparkConfig")
# import "com.vesoft.nebula.algorithm.config.PRConfig"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.config.PRConfig")
# import "com.vesoft.nebula.algorithm.lib.PageRankAlgo"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.lib.PageRankAlgo")
df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "edge").option(
"spaceName", "basketballplayer").option(
"label", "follow").option(
"returnCols", "degree").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
# prConfig = PRConfig(5, 1.0)
prConfig = spark._jvm.PRConfig(5, 1.0)
# 注意,这个例子是 vid 是数字的情况,这个执行会报错,因为我们是在 basketballplayer follow 边上跑, VID 都是字符串
# prResult = PageRankAlgo.apply(spark, df, prConfig, False)
prResult = spark._jvm.PageRankAlgo.apply(jspark, df._jdf, prConfig, False)
# 下边是字符串的 VID 的处理例子
## convert string src and dst into int when it's not int VID
from pyspark.sql.functions import dense_rank, col
from pyspark.sql.window import Window
def convert_string_id_to_long_id(df):
src_id_df = df.select("_srcId").withColumnRenamed("_srcId", "id")
dst_id_df = df.select("_dstId").withColumnRenamed("_dstId", "id")
id_df = src_id_df.union(dst_id_df).distinct()
encode_id = id_df.withColumn("encodedId", dense_rank().over(Window.orderBy("id")))
encode_id.write.option("header", True).csv("file:///tmp/encodeId.csv")
src_join_df = df.join(encode_id, df._srcId == encode_id.id)\
.drop("_srcId")\
.drop("id")\
.withColumnRenamed("encodedId", "_srcId")
dst_join_df = src_join_df.join(encode_id, src_join_df._dstId == encode_id.id)\
.drop("_dstId")\
.drop("id")\
.drop("_rank")\
.drop("degree")\
.withColumnRenamed("encodedId", "_dstId")
return dst_join_df
df_int = convert_string_id_to_long_id(df)
prResult = spark._jvm.PageRankAlgo.apply(jspark, df_int._jdf, prConfig, False)
我们看看结果:
>>> # pageren result, but this is in vid type
... prResult.show()
+---+--------+
|_id|pagerank|
+---+--------+
|100| 1.0|
| 13| 1.0|
| 55| 1.0|
| 80| 1.0|
| 95| 1.0|
| 50| 1.0|
| 0| 1.0|
| -1| 1.0|
| 75| 1.0|
| 83| 1.0|
| 90| 1.0|
| 70| 1.0|
| 9| 1.0|
| 85| 1.0|
| 60| 1.0|
| 88| 1.0|
| 10| 1.0|
| 99| 1.0|
+---+--------+
>>>
>>> # this is the mapping of the vid and the encodedId
... mapping = spark.read.option("header", True).csv("file:///tmp/encodeId.csv")
>>>
>>> mapping.show()
+---------+---------+
| id|encodedId|
+---------+---------+
|player100| 1|
|player101| 2|
|player102| 3|
|player103| 4|
|player104| 5|
|player105| 6|
|player106| 7|
|player107| 8|
|player108| 9|
|player109| 10|
|player113| 11|
|player114| 12|
|player115| 13|
|player116| 14|
|player117| 15|
|player118| 16|
|player119| 17|
|player120| 18|
|player121| 19|
|player124| 20|
+---------+---------+
only showing top 20 rows