Question reference template:
- NebulaGraph version: v3.4.1
- Deployment: distributed
- Installation: RPM
- In production: N
- Hardware info
  - Disk (SSD recommended)
  - CPU and memory
- Detailed description of the problem
- Relevant meta / storage / graph info logs (plain text preferred, for searchability)
Is there a demo of running GraphX / nebula-algorithm graph computation via pyspark?
Fresh off the press, just got this working:
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import


class NebulaAlgoServices:
    session = None

    @classmethod
    def start(cls):
        cls.session = (
            SparkSession.builder.config(
                # both jars go on spark.jars; setting the key twice would overwrite it
                "spark.jars",
                "./nebula-spark-connector-3.6.0.jar,./nebula-algorithm-3.1.0.jar",
            )
            # the algorithm jar must also be on the driver classpath so py4j can load its classes
            .config("spark.driver.extraClassPath", "./nebula-algorithm-3.1.0.jar")
            .config("spark.sql.shuffle.partitions", 5)
            .config("spark.executor.memory", "8g")
            .config("spark.driver.memory", "4g")
            .config("spark.sql.broadcastTimeout", -1)
            .config("spark.sql.autoBroadcastJoinThreshold", -1)
            .appName("nebula-connector")
            .master("local")
            .getOrCreate()
        )
    @classmethod
    def scan_data(cls):
        # scan one edge type from NebulaGraph via the spark connector
        df = (
            cls.session.read.format("com.vesoft.nebula.connector.NebulaDataSource")
            .option("type", "edge")
            .option("spaceName", "xxx")
            .option("label", "xxx")
            .option("returnCols", "created_on")
            .option("metaAddress", "xxx:9559")
            .option("partitionNumber", 1)
            .option("operateType", "read")
            .load()
        )
        df.show(2)
        return df
    @classmethod
    def degree(cls):
        data = cls.scan_data()
        # import the nebula-algorithm Scala classes into the py4j gateway
        java_import(
            cls.session._jvm, "com.vesoft.nebula.algorithm.config.DegreeStaticConfig"
        )
        java_import(
            cls.session._jvm, "com.vesoft.nebula.algorithm.lib.DegreeStaticAlgo"
        )
        # the flag enables ID encoding: string vertex IDs are mapped to Long IDs
        # for GraphX (see the ID mapping table in the output below)
        config = cls.session._jvm.DegreeStaticConfig(True)
        result = cls.session._jvm.DegreeStaticAlgo.apply(
            cls.session._jsparkSession, data._jdf, config
        )
        result.show()
if __name__ == "__main__":
    NebulaAlgoServices.start()
    NebulaAlgoServices.degree()
23/11/01 17:55:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+--------------------+--------------------+-----+----+
| _srcId| _dstId|_rank|name|
+--------------------+--------------------+-----+----+
|d4057d41ef9591eed...|436d1f07b89934a69...| 0|null|
|d4057d41ef9591eed...|ebf5cca88fcb41cdb...| 0|null|
+--------------------+--------------------+-----+----+
only showing top 2 rows
+--------------------+
| id|
+--------------------+
|d4057d41ef9591eed...|
|436d1f07b89934a69...|
|c7faa5dc976c7ed2e...|
|ae98f626cca3eabbb...|
|ebf5cca88fcb41cdb...|
|2406dfda1a02aa82d...|
|5aa0b304f29d9874a...|
|c808bae3cff0d245b...|
|78aea3e4cc23f1df5...|
|62f5a573af8418f4b...|
|ef42a19cd01bbd047...|
| true|
|eb997db2407119db2...|
+--------------------+
+-----+----+-----------+-----------+
|_rank|name| _srcId| _dstId|
+-----+----+-----------+-----------+
| 0|null| 0| 1|
| 0|null| 8589934592| 2|
| 0|null|34359738368| 2|
| 0|null|17179869184| 8589934594|
| 0|null|25769803776| 8589934595|
| 0|null| 0| 8589934593|
| 0|null|17179869185|25769803778|
| 0|null|25769803777|25769803778|
+-----+----+-----------+-----------+
[_id: string, degree: int ... 2 more fields]
+--------------------+------+--------+---------+
| _id|degree|inDegree|outDegree|
+--------------------+------+--------+---------+
|d4057d41ef9591eed...| 2| 0| 2|
|5aa0b304f29d9874a...| 1| 1| 0|
|62f5a573af8418f4b...| 1| 0| 1|
|ef42a19cd01bbd047...| 1| 0| 1|
|c808bae3cff0d245b...| 1| 0| 1|
|c7faa5dc976c7ed2e...| 2| 2| 0|
|ae98f626cca3eabbb...| 1| 0| 1|
|2406dfda1a02aa82d...| 1| 1| 0|
|78aea3e4cc23f1df5...| 1| 0| 1|
| true| 2| 2| 0|
|436d1f07b89934a69...| 1| 1| 0|
|ebf5cca88fcb41cdb...| 1| 1| 0|
|eb997db2407119db2...| 1| 0| 1|
+--------------------+------+--------+---------+
Just remembered, I also have an example of this here:
I'm hitting the same issue. My graph is fairly large, and I haven't gotten this code working yet. The submit method needs to be changed a bit. Also, I see it only scans edges; doesn't it need to scan vertices too?
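For reference, the connector scans vertices the same way as edges; a minimal sketch, where the tag name player and the property name are hypothetical:

vertex_df = (
    session.read.format("com.vesoft.nebula.connector.NebulaDataSource")
    .option("type", "vertex")          # read vertices instead of edges
    .option("spaceName", "xxx")
    .option("label", "player")         # hypothetical tag name
    .option("returnCols", "name")      # hypothetical property list
    .option("metaAddress", "xxx:9559")
    .option("partitionNumber", 1)
    .option("operateType", "read")
    .load()
)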
One question:
def degree(self):
    data = self.scan_data()
    java_import(
        self.session._jvm, "com.vesoft.nebula.algorithm.config.DegreeStaticConfig"
    )
    java_import(
        self.session._jvm, "com.vesoft.nebula.algorithm.lib.DegreeStaticAlgo"
    )
    config = self.session._jvm.DegreeStaticConfig(True)
    result = self.session._jvm.DegreeStaticAlgo.apply(
        self.session._jsparkSession, data._jdf, config
    )
    result.show()
For the DegreeCentrality (DegreeStatic) algorithm, this Config computes the 1-hop in/out degrees by default, right?
Is the scan concurrency of the spark connector not enough for your case?
No, that's not it. Right now a single read can only load one edge type. Is there a way to read multiple edge types at once? Otherwise I have to write a separate read for each:
df = (
    self.session.read.format("com.vesoft.nebula.connector.NebulaDataSource")
    .option("type", "edge")
    .option("spaceName", SPACE)
    # one read targets exactly one edge type; repeating label/returnCols
    # options would just override the earlier values
    .option("label", self.label)
    .option("returnCols", self.cols)
    .option("metaAddress", metaHost[cluster])
    .option("partitionNumber", 100)
    .option("operateType", "read")
    .load()
)
In nebula-algorithm, you can pass multiple labels and they get unioned at the end.
With the spark connector, you can union the DataFrames yourself before running the algorithm, as sketched below.
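A minimal sketch of the union approach, assuming session is the SparkSession created above and two hypothetical edge types follow and serve in the same space:

def scan_edge(session, label, return_cols):
    # one connector read per edge type
    return (
        session.read.format("com.vesoft.nebula.connector.NebulaDataSource")
        .option("type", "edge")
        .option("spaceName", "xxx")
        .option("label", label)
        .option("returnCols", return_cols)
        .option("metaAddress", "xxx:9559")
        .option("partitionNumber", 100)
        .option("operateType", "read")
        .load()
    )

# union the per-type DataFrames on their common columns before running the algorithm
follow_df = scan_edge(session, "follow", "degree")
serve_df = scan_edge(session, "serve", "start_year")
edges = follow_df.select("_srcId", "_dstId").union(serve_df.select("_srcId", "_dstId"))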
OK, thanks. One more question: I tried it, and this result is not a pyspark DataFrame. How can I sort it by degree in descending order and print it?
result = self.session._jvm.DegreeStaticAlgo.apply(
    self.session._jsparkSession, data._jdf, config
)
print("result show:")
result.show(truncate=False)  # fails: result is a py4j JavaObject, not a pyspark DataFrame
print("after reverse show: ")
sorted_df = result.sort(F.desc("degree"))  # fails for the same reason
sorted_df.show(n=100)
# wrap the returned Java Dataset in a Python DataFrame first
jdf = result.toDF()
df = DataFrame(jdf, cls.session)
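Putting it together, a sketch of sorting the wrapped result by degree (column names taken from the output above):

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

# result is the JavaObject returned by DegreeStaticAlgo.apply
df = DataFrame(result.toDF(), cls.session)
df.sort(F.desc("degree")).show(n=100, truncate=False)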
Is there a way to get all the degrees (vertex counts) within a two-hop subgraph? The degree above looks like it only covers one hop.
You can write a custom algorithm: iterate twice, collecting neighbor nodes, and then aggregate.
Thanks. Could you be more specific? I already have a GraphFrame; how do I do the iteration?
# collect two-hop neighbors via motif finding and count them per source vertex
two_hop_degrees = graph.find("(v1)-[]->(v2); (v2)-[]->(v3)").groupBy("v1").count()
one_hop_degrees = graph.find("(v1)-[]->(v2)").groupBy("v1").count()
# merge the one-hop and two-hop counts
all_degrees = one_hop_degrees.union(two_hop_degrees).groupBy("v1").sum("count")
all_degrees.show(truncate=False)
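For context, graph above is a GraphFrames graph. A minimal sketch of building it from the connector's edge DataFrame; the column names come from the connector output above, while deriving the vertex set from the edges is an assumption:

from graphframes import GraphFrame

# GraphFrames expects edge columns named src/dst and a vertex column named id
edges = edge_df.selectExpr("_srcId as src", "_dstId as dst")
vertices = (
    edges.select("src").union(edges.select("dst"))
    .distinct()
    .withColumnRenamed("src", "id")
)
graph = GraphFrame(vertices, edges)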
This motif only captures the out-degree. How do I write the in-degree?
The pattern (v1)<-[]-(v2) is not recognized.
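GraphFrames motif syntax only supports the -> edge direction, so an incoming edge has to be written with the pattern reversed; a sketch:

# in-degree of v1: count edges arriving at v1 by reversing the pattern
in_degrees = graph.find("(v2)-[]->(v1)").groupBy("v1").count()
in_degrees.show(truncate=False)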