提问参考模版:
因为find shortest path 没有点属性过滤,想尝试用GraphX 中的ShortestPath, 想咨询这个方法可以实现点过滤的需求吗?
我看这个源码并没有相关的参数
object ShortestPathAlgo {
private val LOGGER = Logger.getLogger(this.getClass)
val ALGORITHM: String = "ShortestPath"
/**
* run the ShortestPath algorithm for nebula graph
*/
def apply(spark: SparkSession,
dataset: Dataset[Row],
shortestPathConfig: ShortestPathConfig,
hasWeight: Boolean): DataFrame = {
val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
val prResultRDD = execute(graph, shortestPathConfig.landmarks)
val schema = StructType(
List(
StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false),
StructField(AlgoConstants.SHORTPATH_RESULT_COL, StringType, nullable = true)
))
val algoResult = spark.sqlContext
.createDataFrame(prResultRDD, schema)
algoResult
}
def execute(graph: Graph[None.type, Double], landmarks: Seq[VertexId]): RDD[Row] = {
val spResultRDD: VertexRDD[SPMap] = ShortestPaths.run(graph, landmarks).vertices
spResultRDD.map(row => Row(row._1, row._2.toString()))
}
}
算法包本身不支持对点或边做过滤,但你可以通过API调用的形式做更多处理:
使用Spark-connector读取点边数据,读取后数据结构为DataFrame或RDD
根据点边数据构造Graph
可以对Graph做过滤
Graph调用算法
具体操作代码见readme中提到的各个算法的测试。
1 个赞
nicole:
可以对Graph做过滤
我用spark-connect 读取之后的点, 边数据如下:
点
--------------------+--------------------+--------+
| _vertexId| keyno|hasimage|
+--------------------+--------------------+--------+
| 8866845041130209280|p08c06e62d974cc39...| false|
|-8735697723442855936|prdfb3a510eff91df...| false|
| 6517961932060753920|pfc2780cb07b7160e...| false|
| 2416714699379834880|p3d4ccb8a8567aa74...| false|
| 4504632926608097280|pd8ee41b16c2bcf4b...| false|
|-8192784831704989696|pr83f168c1bd148c6...| false|
| 2429233097448161280|p2980d711d30bffe0...| false|
|-6993893904742875136|p7c895d3e7f7206b4...| false|
| 6701695764252917760|prd306d62a2e78c2b...| false|
|-7897252106186653696|p5c82c79679174123...| false|
|-4495195819111284736|p852795c47e80b451...| false|
边:
+--------------------+--------------------+-----+------+--------------------+--------------------
| _srcId| _dstId|_rank| type| startid| endid
+--------------------+--------------------+-----+------+--------------------+--------------------
| 6517961932060753920| 8489149949548669785| 0|invest|pfc2780cb07b7160e...|e561a111a39ea01c5...
| 4504632926608097280|-2543642747794210240| 0|invest|pd8ee41b16c2bcf4b...|c2af99fa7117531fa...
| 64745182402314240| 2911281672524321457| 0|invest|d0c133c00a216666d...|03f686df91a9ddf88...
|-8192784831704989696|-6130187347699157737| 0|invest|pr83f168c1bd148c6...|02a8c0948f99469ee...
|-6993893904742875136|-8721650610771848215| 0|invest|p7c895d3e7f7206b4...|56fbe611b52c6bb45...
如何构建graph? 目前的spark-utils 有没有相关demo
我是希望寻找最短路径的时候,根据点属性过滤一些不需要的数据,比如上面的点有个hasimage property,希望找出点A 到点B 的最短路径,但是中间的点要满足hasimage == true
官方推荐用API 的方式调用这些算法, 但是这个demo 是read csv, 有没有demo 怎么把nebula 的数据构建成csv 的,src, dst 形式?nebula 中的src,dst 有若干的属性怎么处理?边需要怎么处理成csv 格式中?不需要处理吗?
class ShortestPathAlgoSuite {
@Test
def shortestPathAlgoSuite(): Unit = {
val spark = SparkSession.builder().master("local").getOrCreate()
val data = spark.read.option("header", true).csv("src/test/resources/edge.csv")
val shortestPathConfig = new ShortestPathConfig(Seq(1, 2))
val result = ShortestPathAlgo.apply(spark, data, shortestPathConfig, false)
assert(result.count() == 4)
}
}
你已经拿到点和边的RDD,构建graph就直接 val graph = Graph(vertexRDD, edgeRDD)
就构造好了。
如果你想做点边过滤,就可以基于graph 来做。
关于怎么调用,你是要基于graph调用算法,这么调:
1. graph中点边属性的数据类型 处理为Graph[None.type, Double]
2. ShortestPathAlgo.execute(graph, landmarks)
abc
2021 年8 月 6 日 04:52
5
读出来以后点和属性是怎么对应到 vertexRDD[vd]的?多tag时候呢?
你可以先看下example中的读取示例:
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
package com.vesoft.nebula.examples.connector
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object NebulaSparkReaderExample {
private val LOG = LoggerFactory.getLogger(this.getClass)
def main(args: Array[String]): Unit = {
This file has been truncated. show original
你读取的属性集合是一个list结构存储于Graphx的VertexRDD的属性中。 多tag就多次读取然后union在一起。
感谢回答, 我整理下步骤,同时也有点疑惑,请帮忙看看.
用spark-connector 读取spark数据成RDD 或者DF 构建Graph如下:
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ProdRel")
.withLabel("Com")
.withNoColumn(false)
.withReturnCols(List())
.withLimit(10)
.withPartitionNum(10)
.build()
val vertexRDD = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToGraphx()
val edgeRDD = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToGraphx()
val graph = Graph(vertexRDD, edgeRDD)
对我的需求而言是求两个点之前的最短路径,但是我该怎么读取数据呢?nebula自带的算法是find shortest path from a to b upto 10 steps, 指定了a,b 的vid,但是1 中的方法,好像没办法指定,那是需要我load 多少数据合适呢,全部吗,那这样c端业务相应应该会很慢吧?
怎么处理出Graph[None.type, Double]的方法?
connector 读出的格式转为graph, 这个graph 是Graph[None.type, Double] 格式吗?
val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
看到这可以转化需要的格式,但是这个dateset 是什么格式又不知道怎么转化? 没写过scala 这个有点痛苦
点过滤,可以基于graph 来做, 这个怎么做呢?有具体demo吗?
查询类,因为nebula 的find shortest path 暂时不支持点过滤,就想看看nebula-algorithm里面有没有合适方案。
听起来似乎nebula-algorithm 不能解决这个需求?
上面不是说了 可以过滤么。
情况是:你不介意消耗多资源更多时间去做你用不到的计算,完全ok啊。 你要求单独两个点的最短路径,算法会把你不需要的其他节点的最短路径也求出来。
Reid00
2021 年8 月 10 日 01:55
11
val nebulaReadVertexConfig = ReadNebulaConfig
.builder()
.withSpace("ProdRelation")
.withLabel("Person")
.withNoColumn(false)
.withReturnCols(List("keyno", "hasimage"))
.withLimit(10)
.withPartitionNum(10)
.build()
我最近又做了一些尝试,目前connector load 数据的时候是用limit 的模式,没办法指定load 的点和边,有没有什么方式load 我需要的点和边?
steam
2021 年8 月 10 日 02:17
12
Connector 在这里就是个工具,你这个需求本身 Nebula Graph 的 scan 结果是随机返回指定的 n 条数据的。所以你如果是需要返回指定的点和边的话,目前来说是不支持的
system
关闭
2021 年9 月 9 日 02:50
14
此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。