好的!谢谢 您要是有时间可不可以写个demo 用pyspark 去调图算法的示例,如果我在用pyspark去调用之前服务器上除了安装nebula-graph 和nebula-studio 还需要安装那些呢 ?是(Nebula algorithm)吗?
是的,我会出一个教程
好的谢谢nebula图专家
调通了,骚等~~~
好的
我先简单发给你步骤吧,回头我会发文章和更详细的东西
- 部署 nebulagraph, spark, nebulagraph algorithm
这里我用 nebula-up 的 all-in-one 工具帮助我们,一键做好一切准备,注意,它需要 9669 端口没被占用,如果有别的 nebula 集群,可以先关掉。
curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash -s -- v3 spark
然后,东西入口都在 ~/.nebula-up
下边
比如,我们加载篮球数据集(后边会用到,必须执行以下),其实还有别的命令在下边,可以看看。
~/.nebula-up/load-basketballplayer-dataset.sh
- 进入 pyspark shell
~/.nebula-up/nebula-pyspark.sh
- 在 pyspark shell 里调用 nebulagraph-algorithm
有点像 pyspark 版本的 nebula-algorithm/PageRankExample.scala at 02a18ceaa34ed6aa576ddb96297b79adbe99f37e · vesoft-inc/nebula-algorithm · GitHub 的例子
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PageRankExample").getOrCreate()
jspark = spark._jsparkSession
from py4j.java_gateway import java_import
# import "com.vesoft.nebula.algorithm.config.SparkConfig"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.config.SparkConfig")
# import "com.vesoft.nebula.algorithm.config.PRConfig"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.config.PRConfig")
# import "com.vesoft.nebula.algorithm.lib.PageRankAlgo"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.lib.PageRankAlgo")
df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "edge").option(
"spaceName", "basketballplayer").option(
"label", "follow").option(
"returnCols", "degree").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
# prConfig = PRConfig(5, 1.0)
prConfig = spark._jvm.PRConfig(5, 1.0)
# 注意,这个例子是 vid 是数字的情况,这个执行会报错,因为我们是在 basketballplayer follow 边上跑, VID 都是字符串
# prResult = PageRankAlgo.apply(spark, df, prConfig, False)
prResult = spark._jvm.PageRankAlgo.apply(jspark, df._jdf, prConfig, False)
# 下边是字符串的 VID 的处理例子
## convert string src and dst into int when it's not int VID
from pyspark.sql.functions import dense_rank, col
from pyspark.sql.window import Window
def convert_string_id_to_long_id(df):
src_id_df = df.select("_srcId").withColumnRenamed("_srcId", "id")
dst_id_df = df.select("_dstId").withColumnRenamed("_dstId", "id")
id_df = src_id_df.union(dst_id_df).distinct()
encode_id = id_df.withColumn("encodedId", dense_rank().over(Window.orderBy("id")))
encode_id.write.option("header", True).csv("file:///tmp/encodeId.csv")
src_join_df = df.join(encode_id, df._srcId == encode_id.id)\
.drop("_srcId")\
.drop("id")\
.withColumnRenamed("encodedId", "_srcId")
dst_join_df = src_join_df.join(encode_id, src_join_df._dstId == encode_id.id)\
.drop("_dstId")\
.drop("id")\
.drop("_rank")\
.drop("degree")\
.withColumnRenamed("encodedId", "_dstId")
return dst_join_df
df_int = convert_string_id_to_long_id(df)
prResult = spark._jvm.PageRankAlgo.apply(jspark, df_int._jdf, prConfig, False)
我们看看结果:
>>> # pageren result, but this is in vid type
... prResult.show()
+---+--------+
|_id|pagerank|
+---+--------+
|100| 1.0|
| 13| 1.0|
| 55| 1.0|
| 80| 1.0|
| 95| 1.0|
| 50| 1.0|
| 0| 1.0|
| -1| 1.0|
| 75| 1.0|
| 83| 1.0|
| 90| 1.0|
| 70| 1.0|
| 9| 1.0|
| 85| 1.0|
| 60| 1.0|
| 88| 1.0|
| 10| 1.0|
| 99| 1.0|
+---+--------+
>>>
>>> # this is the mapping of the vid and the encodedId
... mapping = spark.read.option("header", True).csv("file:///tmp/encodeId.csv")
>>>
>>> mapping.show()
+---------+---------+
| id|encodedId|
+---------+---------+
|player100| 1|
|player101| 2|
|player102| 3|
|player103| 4|
|player104| 5|
|player105| 6|
|player106| 7|
|player107| 8|
|player108| 9|
|player109| 10|
|player113| 11|
|player114| 12|
|player115| 13|
|player116| 14|
|player117| 15|
|player118| 16|
|player119| 17|
|player120| 18|
|player121| 19|
|player124| 20|
+---------+---------+
only showing top 20 rows
啊,果然!
>>> prConfig = spark._jvm.PRConfig(3, 0.85)
>>> prResult = spark._jvm.PageRankAlgo.apply(jspark, df_int._jdf, prConfig, False)
…
>>> prResult.show()
+---+------------------+
|_id| pagerank|
+---+------------------+
|100| 0.995408631772268|
| 13|0.9935720844811753|
| 55|0.9935720844811753|
| 80| 1.017447199265381|
| 95|1.0009182736455464|
| 50|0.9935720844811753|
| 0|0.9917355371900826|
| -1| 0.995408631772268|
| 75| 0.995408631772268|
| 83|0.9935720844811753|
| 90| 1.046831955922865|
| 70|0.9972451790633607|
| 9|0.9935720844811753|
| 85| 0.995408631772268|
| 60|0.9935720844811753|
| 88|0.9935720844811753|
| 10|0.9935720844811753|
| 99|1.0156106519742885|
+---+------------------+
请问您的一键部署方式是用docker部署吗?
是,按理说即使你的环境没有 docker 它也会试着安装 docker,不过最好还是提前装好。
我现在服务器上有nebu-graph 和nebula-studio 都是用docker按照部署的 如果我把这两个服务停了的话,按照你的一键安装部署方式可以执行成功吗?或者可以这样操作吗
spark 部署可以参考网上的材料,在 spark 上运行 algorihtm 就参考这个文档是可以的哈。
如果我想通过编译器去调用算法也是这样吗?如果是的话,这种写法调用不需要用ip链接吗?
Spark 是一个分布式(并行)运行的环境、编程范式。
按照 spark 的范式(类似 map reduce)写好代码之后,可以提交给 spark cluster 去执行。
Nebula algorithm 是一个可以跑在 spark 上的 软件包。
PySpark 是一个 spark 支持的语言环境(python)。
没太明白你的问题,不过上边的信息应该能帮助你理解哈。
spark 和nebula-algorithm 分开单独安装就可以吧?
Spark 单独安装,不过要能访问到 nebulagraph 的 graph, meta, storage 服务。
Nebula-algorithm 就是要保证 jar 包被提交、执行的时候被 pyspark 引用 到
这个过程是会拉取 docker 镜像,还会拉取一个数据集(live journal)和 nebula algorithm 的 jar 包,一次要个几个 GiB 的下载量哈,跑多久了?
另外,我昨天更新了 nebula-up 你这么执行之后,会自动给你启动一个 jupyter notebook,里边一切都准备好了,里边有一个 pagerank 的 notebook 可以打开按行执行哈
notebook 就是你这个服务器的 8888 端口,默认 token 是 nebula