GraphX loads Nebula data slowly

  • Nebula version: 2.0.0
  • Deployment: single host
  • Production environment: N
  • Hardware
    • Disk: 512 GB SSD
    • CPU / memory: 16 cores, 128 GB RAM
  • Problem description: see below
  • Relevant meta / storage / graph info logs

Nebula holds 405,283 vertices and 1,737,530 edges in total. The code below pulls only a subset of those vertices and edges to run a connected-components computation. In practice the data-loading phase is very slow: it took more than 2 hours to finish pulling the data. How can this be optimized?

import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.{EdgeRank, NebulaDataFrameReader, Prop}
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.graphx.{Graph, VertexId, VertexRDD}
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object NebulaConnectedComponents {

  private val LOG = LoggerFactory.getLogger(this.getClass)

  def connectedComponents(): Unit = {
    // Suppress Spark logging noise
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val sparkConf = new SparkConf
    sparkConf
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config(sparkConf)
      .getOrCreate()

    LOG.info("start to read nebula vertices")

    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("10.27.20.154:9559")
        .withTimeout(600000)
        .withConenctionRetry(2)
        .build()

    // Load each vertex tag with the same read settings and union the results.
    val vertexTags = Seq(
      "ACline_dot", "BUS", "C_P", "C_S", "Convertor", "l_oad",
      "two_port_transformer", "three_port_transformer", "unit", "CN", "neutral_point")

    val vertexRDD = vertexTags
      .map { tag =>
        val readVertexConfig: ReadNebulaConfig = ReadNebulaConfig
          .builder()
          .withSpace("ems")
          .withLabel(tag)
          .withNoColumn(false)
          .build()
        spark.read.nebula(config, readVertexConfig).loadVerticesToGraphx()
      }
      .reduce(_ union _)

    // Load each edge type with the same read settings and union the results.
    val edgeTypes = Seq(
      "aclinedot_aclinedot", "aclinedot_cn", "txI_txJ_transformerline",
      "CN_tx_two", "CN_tx_three", "neutral_three",
      "connected_Compensator_S_CN", "connected_Bus_CN", "connected_Compensator_P_CN",
      "connected_Load_CN", "connected_Unit_CN", "CN_CN")

    val edgeRDD = edgeTypes
      .map { edgeType =>
        val readEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
          .builder()
          .withSpace("ems")
          .withLabel(edgeType)
          .withNoColumn(false)
          .build()
        spark.read.nebula(config, readEdgeConfig).loadEdgesToGraphx()
      }
      .reduce(_ union _)

    val graph = Graph(vertexRDD, edgeRDD)

    // Compute connected components
    val components = graph.connectedComponents()

    LOG.info("graph vertices record count: " + graph.vertices.count())
    LOG.info("graph edges record count: " + graph.edges.count())
    LOG.info("components triplets record count: " + components.triplets.count())
    // LOG.info("components vertices count: " + components.vertices.map(v => (v._2, 1)).groupBy(_._1).mapValues(_.size).collect().toList.sortBy(_._2))

    spark.close()
    sys.exit()
  }
}

cc @nicole

You are running in local mode, so the parallelism cannot go up. If you do not set the Spark partition number it defaults to 100, and in local mode the maximum parallelism is the number of cores on the machine that submits the job.

  1. Switch to cluster mode to increase parallelism, and give the SparkSession a larger executor.memory;
  2. Set the partition number on ReadNebulaConfig, e.g. 2-3x the total number of cores. (A sketch of both changes follows below.)
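
A minimal sketch of both suggestions, reusing the connector API from the question. The master URL, memory size, and partition count are placeholder assumptions to adapt to your own cluster:

import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.ReadNebulaConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// 1. Cluster mode instead of local[*]; the master URL and memory size
//    below are placeholders, not values from the original post.
val sparkConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
val spark = SparkSession
  .builder()
  .master("spark://master-host:7077")     // or yarn, typically set via spark-submit
  .config("spark.executor.memory", "16g") // larger executor memory
  .config(sparkConf)
  .getOrCreate()

// 2. Raise the read parallelism: withPartitionNum splits the Nebula scan
//    into that many Spark partitions (the default is 100).
val readVertexConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("ems")
  .withLabel("CN")
  .withNoColumn(false)
  .withPartitionNum(96) // placeholder: roughly 2-3x the total executor cores
  .build()

Note that spark.executor.memory only takes effect on a real cluster manager; in local mode everything runs inside the driver JVM, so raising it there does not help.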