Nebula Algorithm 算法 -- 最短路径

提问参考模版:

  • nebula 版本:2.0.1

  • 部署方式(分布式 / 单机 / Docker / DBaaS):分布式

  • 是否为线上版本:Y

  • 问题的具体描述

因为find shortest path 没有点属性过滤,想尝试用GraphX 中的ShortestPath, 想咨询这个方法可以实现点过滤的需求吗?

我看这个源码并没有相关的参数

object ShortestPathAlgo {
  private val LOGGER = Logger.getLogger(this.getClass)

  val ALGORITHM: String = "ShortestPath"

  /**
    * run the ShortestPath algorithm for nebula graph
    */
  def apply(spark: SparkSession,
            dataset: Dataset[Row],
            shortestPathConfig: ShortestPathConfig,
            hasWeight: Boolean): DataFrame = {

    val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)

    val prResultRDD = execute(graph, shortestPathConfig.landmarks)

    val schema = StructType(
      List(
        StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false),
        StructField(AlgoConstants.SHORTPATH_RESULT_COL, StringType, nullable = true)
      ))
    val algoResult = spark.sqlContext
      .createDataFrame(prResultRDD, schema)

    algoResult
  }

  def execute(graph: Graph[None.type, Double], landmarks: Seq[VertexId]): RDD[Row] = {
    val spResultRDD: VertexRDD[SPMap] = ShortestPaths.run(graph, landmarks).vertices
    spResultRDD.map(row => Row(row._1, row._2.toString()))
  }
}

算法包本身不支持对点或边做过滤,但你可以通过API调用的形式做更多处理:

  1. 使用Spark-connector读取点边数据,读取后数据结构为DataFrame或RDD
  2. 根据点边数据构造Graph
  3. 可以对Graph做过滤
  4. Graph调用算法
    具体操作代码见readme中提到的各个算法的测试。
1 个赞
  1. 我用spark-connect 读取之后的点, 边数据如下:
--------------------+--------------------+--------+
|           _vertexId|               keyno|hasimage|
+--------------------+--------------------+--------+
| 8866845041130209280|p08c06e62d974cc39...|   false|
|-8735697723442855936|prdfb3a510eff91df...|   false|
| 6517961932060753920|pfc2780cb07b7160e...|   false|
| 2416714699379834880|p3d4ccb8a8567aa74...|   false|
| 4504632926608097280|pd8ee41b16c2bcf4b...|   false|
|-8192784831704989696|pr83f168c1bd148c6...|   false|
| 2429233097448161280|p2980d711d30bffe0...|   false|
|-6993893904742875136|p7c895d3e7f7206b4...|   false|
| 6701695764252917760|prd306d62a2e78c2b...|   false|
|-7897252106186653696|p5c82c79679174123...|   false|
|-4495195819111284736|p852795c47e80b451...|   false|

边:

+--------------------+--------------------+-----+------+--------------------+--------------------
|              _srcId|              _dstId|_rank|  type|             startid|               endid
+--------------------+--------------------+-----+------+--------------------+--------------------
| 6517961932060753920| 8489149949548669785|    0|invest|pfc2780cb07b7160e...|e561a111a39ea01c5...
| 4504632926608097280|-2543642747794210240|    0|invest|pd8ee41b16c2bcf4b...|c2af99fa7117531fa...
|   64745182402314240| 2911281672524321457|    0|invest|d0c133c00a216666d...|03f686df91a9ddf88...
|-8192784831704989696|-6130187347699157737|    0|invest|pr83f168c1bd148c6...|02a8c0948f99469ee...
|-6993893904742875136|-8721650610771848215|    0|invest|p7c895d3e7f7206b4...|56fbe611b52c6bb45...
  1. 如何构建graph? 目前的spark-utils 有没有相关demo

  2. 我是希望寻找最短路径的时候,根据点属性过滤一些不需要的数据,比如上面的点有个hasimage property,希望找出点A 到点B 的最短路径,但是中间的点要满足hasimage == true

  3. 官方推荐用API 的方式调用这些算法, 但是这个demo 是read csv, 有没有demo 怎么把nebula 的数据构建成csv 的,src, dst 形式?nebula 中的src,dst 有若干的属性怎么处理?边需要怎么处理成csv 格式中?不需要处理吗?

class ShortestPathAlgoSuite {
  @Test
  def shortestPathAlgoSuite(): Unit = {
    val spark              = SparkSession.builder().master("local").getOrCreate()
    val data               = spark.read.option("header", true).csv("src/test/resources/edge.csv")
    val shortestPathConfig = new ShortestPathConfig(Seq(1, 2))
    val result             = ShortestPathAlgo.apply(spark, data, shortestPathConfig, false)
    assert(result.count() == 4)
  }
}

你已经拿到点和边的RDD,构建graph就直接 val graph = Graph(vertexRDD, edgeRDD) 就构造好了。
如果你想做点边过滤,就可以基于graph 来做。

关于怎么调用,你是要基于graph调用算法,这么调:

1. graph中点边属性的数据类型 处理为Graph[None.type, Double]
2. ShortestPathAlgo.execute(graph, landmarks)

读出来以后点和属性是怎么对应到 vertexRDD[vd]的?多tag时候呢?

你可以先看下example中的读取示例:

你读取的属性集合是一个list结构存储于Graphx的VertexRDD的属性中。 多tag就多次读取然后union在一起。

感谢回答, 我整理下步骤,同时也有点疑惑,请帮忙看看.

  1. 用spark-connector 读取spark数据成RDD 或者DF 构建Graph如下:
   val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("ProdRel")
      .withLabel("Com")
      .withNoColumn(false)
      .withReturnCols(List())
      .withLimit(10)
      .withPartitionNum(10)
      .build()  

val vertexRDD = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToGraphx()
val edgeRDD = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToGraphx()
val graph = Graph(vertexRDD, edgeRDD)
  • 疑惑:

对我的需求而言是求两个点之前的最短路径,但是我该怎么读取数据呢?nebula自带的算法是find shortest path from a to b upto 10 steps, 指定了a,b 的vid,但是1 中的方法,好像没办法指定,那是需要我load 多少数据合适呢,全部吗,那这样c端业务相应应该会很慢吧?

  1. 怎么处理出Graph[None.type, Double]的方法?
    connector 读出的格式转为graph, 这个graph 是Graph[None.type, Double] 格式吗?
val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)

看到这可以转化需要的格式,但是这个dateset 是什么格式又不知道怎么转化? 没写过scala 这个有点痛苦

  1. 点过滤,可以基于graph 来做, 这个怎么做呢?有具体demo吗?
  1. 算法是指定一个参数集合,计算的是该集合中的点与图中其他节点之间的最短路径。 你要明确你的需求是查询类还是图计算类,图计算是涉及到全图的迭代式的计算。 如果你只想对明确的两个点求最短路径,那就用nebula的find shortest path。
  2. 可能需要你去了解下scala语法了
  3. 如何做过滤,你可以去看下spark的graphx的API文档,有针对graph的filter操作。

查询类,因为nebula 的find shortest path 暂时不支持点过滤,就想看看nebula-algorithm里面有没有合适方案。
听起来似乎nebula-algorithm 不能解决这个需求?

上面不是说了 可以过滤么。
情况是:你不介意消耗多资源更多时间去做你用不到的计算,完全ok啊。 你要求单独两个点的最短路径,算法会把你不需要的其他节点的最短路径也求出来。

    val nebulaReadVertexConfig = ReadNebulaConfig
      .builder()
      .withSpace("ProdRelation")
      .withLabel("Person")
      .withNoColumn(false)
      .withReturnCols(List("keyno", "hasimage"))
      .withLimit(10)
      .withPartitionNum(10)
      .build()

我最近又做了一些尝试,目前connector load 数据的时候是用limit 的模式,没办法指定load 的点和边,有没有什么方式load 我需要的点和边?

Connector 在这里就是个工具,你这个需求本身 Nebula Graph 的 scan 结果是随机返回指定的 n 条数据的。所以你如果是需要返回指定的点和边的话,目前来说是不支持的

好的,明白了,谢谢。

此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。