spark connector 读取Nebula多种类型点边数据构建spark Graphx 图报错

  • nebula 版本:3.0
  • 部署方式:单机
  • 安装方式:RPM
  • 是否为线上版本:N
  • 硬件信息
    • 磁盘 40G SSD
    • CPU、内存信息 : 4核 8G内存
  • 问题的具体描述
    使用Spark connector 读取Nebula中的数据去构建Graphx 的图,参照spark-connector 源码例子发现有2种方式,一种是基于RDD,一种是基于DataFrame,基于RDD的,封装类型如下:
  • 读取得到DataFrame 类型:
  def readVertex(spark: SparkSession): Unit = {
    LOG.info("start to read nebula vertices")
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("127.0.0.1:9559")
        .withConenctionRetry(2)
        .build()
    val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("test")
      .withLabel("person")
      .withNoColumn(false)
      .withReturnCols(List("birthday"))
      .withLimit(10)
      .withPartitionNum(10)
      .build()
    val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
    vertex.printSchema()
    vertex.show(20)
    println("vertex count: " + vertex.count())
  }

  def readEdges(spark: SparkSession): Unit = {
    LOG.info("start to read nebula edges")
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("127.0.0.1:9559")
        .withTimeout(6000)
        .withConenctionRetry(2)
        .build()
    val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("test")
      .withLabel("knows")
      .withNoColumn(false)
      .withReturnCols(List("degree"))
      .withLimit(10)
      .withPartitionNum(10)
      .build()
    val edge: DataFrame = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
    edge.printSchema()
    edge.show(20)
    println("edge count: " + edge.count())
  }
  • 基于RDD 封装类型:
def readVertexGraph(spark: SparkSession): Unit = {
    LOG.info("start to read graphx vertex")
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("127.0.0.1:9559")
        .withTimeout(6000)
        .withConenctionRetry(2)
        .build()
    val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("test")
      .withLabel("person")
      .withNoColumn(false)
      .withReturnCols(List("birthday"))
      .withLimit(10)
      .withPartitionNum(10)
      .build()

    val vertexRDD: RDD[(VertexID, PropertyValues)] = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToGraphx()
    LOG.info("vertex rdd first record: " + vertexRDD.first())
    LOG.info("vertex rdd count: {}", vertexRDD.count())
  }

  def readEdgeGraph(spark: SparkSession): Unit = {
    LOG.info("start to read graphx edge")
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("127.0.0.1:9559")
        .withTimeout(6000)
        .withConenctionRetry(2)
        .build()
    val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("test")
      .withLabel("knows")
      .withNoColumn(false)
      .withReturnCols(List("timep"))
      .withLimit(10)
      .withPartitionNum(10)
      .build()
    val edgeRDD: RDD[NebulaGraphxEdge] = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToGraphx()
    LOG.info("edge rdd first record:" + edgeRDD.first())
    LOG.info("edge rdd count: {}", edgeRDD.count())
  }

那么问题来了,如果使用以上两种方式需要读取多种类型的点边的话,能否得到对应的vertexRDD 和edgeRDD以及vertexDataFrame,edgeDataFrame? 又参照了Nebula-Algrithm 源码,发现进行如下封装:

class NebulaReader(spark: SparkSession, configs: Configs, partitionNum: String)
    extends DataReader(spark, configs) {
  override def read(): DataFrame = {
    val metaAddress = configs.nebulaConfig.readConfigEntry.address
    val space       = configs.nebulaConfig.readConfigEntry.space
    val labels      = configs.nebulaConfig.readConfigEntry.labels
    val weights     = configs.nebulaConfig.readConfigEntry.weightCols
    val partition   = partitionNum.toInt

    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress(metaAddress)
        .withConenctionRetry(2)
        .build()

    val noColumn = weights.isEmpty

    var dataset: DataFrame = null
    for (i <- labels.indices) {
      val returnCols: ListBuffer[String] = new ListBuffer[String]
      if (configs.dataSourceSinkEntry.hasWeight && weights.nonEmpty) {
        returnCols.append(weights(i))
      }
      val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
        .builder()
        .withSpace(space)
        .withLabel(labels(i))
        .withNoColumn(noColumn)
        .withReturnCols(returnCols.toList)
        .withPartitionNum(partition)
        .build()
      if (dataset == null) {
        dataset = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()

        if (weights.nonEmpty) {
          dataset = dataset.select("_srcId", "_dstId", weights(i))
        }
      } else {
        var df = spark.read
          .nebula(config, nebulaReadEdgeConfig)
          .loadEdgesToDF()
        if (weights.nonEmpty) {
          df = df.select("_srcId", "_dstId", weights(i))
        }
        dataset = dataset.union(df)
      }
    }
    dataset
  }
}

也就是在配置文件中配置多个labels,然后每个label的数据对应一个DataSet,然后将不同类型的dataset 合并成一个edgeDataFrame,而这里能合并成一个DataFrame的前提是根据配置文件是否有weight参数,封装Edge对象,如下:

而我们场景是希望使用spark-connector 读取Nebula 的数据进行图规则计算,而希望读取数据的时候数据类型统一都是按照DataFrame,就像源码中这样:

 private[this] def createDataSource(spark: SparkSession,
                                     configs: Configs,
                                     partitionNum: String): DataFrame = {
    val dataSource = configs.dataSourceSinkEntry.source
    val dataSet: Dataset[Row] = dataSource.toLowerCase match {
      case "nebula" => {
        val reader = new NebulaReader(spark, configs, partitionNum)
        reader.read()
      }
      case "csv" => {
        val reader = new CsvReader(spark, configs, partitionNum)
        reader.read()
      }
      case "json" => {
        val reader = new JsonReader(spark, configs, partitionNum)
        reader.read()
      }
    }
    dataSet
  }

而目前遇到的问题如下:

  1. 想基于Dataframe读取多类型的点边,分别构建点边DataFrame,多种类型的点dataframe或者边类型的dataframe无法union 到一起,因为每个类型的点边上的属性个数是不一样的,无法union 成一个dataset,无法union成一个点边dataframe,也就无法构建Graphx 的图,请问这如何解决?看了下Spark DataFrame类型,还没有ListType,如果这么做是否只能自定义udf函数?
  2. 使用RDD方式去加载不同类型点边数据,应该是可以union成一个vertexRDD,一个edgeRDD,这样就可以去构建spark Graphx 的图,然后进行图计算,但这种方式,包括以后读文件,比如csv,json 就都要基于RDD,这种感觉不是很好,研究了Nebula-spark-connector,Nebula-algrithm,Nebula-exchange相关源码,发现也都是倾向于DataFrame去进行操作,但如果解决多种类型df union的问题?
  3. 查看了相关algrithm源码,发现Nebula-algrithm 模块去构建spark graphx 都是基于edgeDatarame去构建的,我想知道为什么这么做? 调用如下方式:
def loadInitGraph(dataSet: Dataset[Row], hasWeight: Boolean): Graph[None.type, Double]dataSet: Dataset[Row], hasWeight: Boolean): Graph[None.type, Double = {
    implicit val encoder: Encoder[Edge[Double]] = org.apache.spark.sql.Encoders=7.kryo[Edge[Double]]
    val edges: RDD[Edge[Double]] = dataSet
      .map(row => {
        if (hasWeight) {
          Edge(row.get(0).toString.toLong, row.get(1).toString.toLong, row.get(2).toString.toDouble)
        } else {
          Edge(row.get(0).toString.toLong, row.get(1).toString.toLong, 1.0)
        }
      })(encoder)
      .rdd

    Graph.fromEdges(edges, None)
  }

为啥采用只读取edge 的dataframe去构建 graphx 的图,而不是分别获得点dataFrame,边dataFrame,然后转换成Rdd去构建?

第一个问题和第三个问题可以一起回答哈,因为目前大多数通用型图算法在计算过程中是不会和点属性进行交互的,和边属性的交互最多也是和权重进行计算。所以我们在构建Graphx时 不考虑点的属性,只考虑边和边上的权重。
所以针对您的第一个问题,如果只关注边和边上权重,忽略其他无关的属性,那么多种类型的边数据就是同构的,完全可以进行union。(前提是都有权重列或都没有权重列)
针对第三个问题 我们认为没有边的点在图中相对孤立,意义不大,所以在给出来的jar包使用示例中 只是读取了边数据,从边数据中抽取源点和目标点作为点数据。

第二个问题:你可以认为在使用Graphx时,我们提供出来的RDD方式比DataFrame 离 Graphx的Graph更近一步,相当于帮用户进行了DF->VertexRDD的类型转换。 具体要使用哪一种,看个人需求,两者都可以

你好,针对第二个问题,我感觉理解了其中一部分,Graphx 本身是基于RDD的,而不是DF,如果使用DF 的话,还需要自己做一次到RDD的转换,这点确实是离Graphx Graph更进一步,但我有点不明白的是,转换之后的的类型如下:

vertexRDD: RDD[(VertexID, PropertyValues)]
edgeRDD: RDD[NebulaGraphxEdge]

实际是将属性值添加到了一个List里面了,然后基于vertexRDD, edgeRDD 去构建Graphx 的Graph,然后自己去定义图计算的规则,也就四自定义Pregel 函数,去实现图模式匹配,计算完成之后,得到的Graph对象数据还是基于RDD的,而其实更需要的是以二维表的形式,也就是DF格式方便写出去,比如一个,点的属性A,B,C作为不同的列,发现这种就是df 的格式,也就是说基于RDD计算完成之后转换不到df的形式,因为List里是各个属性的值,schema

VertexRDD的类型其实是这样的RDD[(VertexId, VD)], 由于从nebula中读取出来的点数据可能存在多个属性,多个属性其实是VertexRDD中的VD,故我们用list来存储。
如果你自定义的Pregel 与点属性无关,可以在执行Pregel之前对点数据进行update,将属性置空,这样会大大提升计算过程的效率。

计算完成之后结果RDD一般是(VertexId,algoResult), 至于algoResult是什么类型要看算法本身了,有可能是id,可能是一个double值,可能是一个map结构,这就需要根据结果类型自行进行RDD->DF的转换。

此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。