- nebula 版本:2.0.0
- 部署方式:单机
- 是否为线上版本:N
- 硬件信息
- 磁盘 512SSD
- CPU、内存信息 16核128G
- 问题的具体描述
- 相关的 meta / storage / graph info 日志信息
nebula中存了共(405283顶点,1737530边),如下算法只拉取其中的一部分边和点进行联通分量计算,实际运行过程中,发现数据加载过程缓慢,共用了2个多小时才将数据拉取完毕,该怎么优化解决?
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.{EdgeRank, NebulaDataFrameReader, Prop}
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.graphx.{Graph, VertexId, VertexRDD}
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object NebulaConnectedComponents {
private val LOG = LoggerFactory.getLogger(this.getClass)
def connectedComponents(): Unit = {
// 屏蔽日志
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val sparkConf = new SparkConf
sparkConf
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
val spark = SparkSession
.builder()
.master("local[*]")
.config(sparkConf)
.getOrCreate()
LOG.info("start to read nebula vertices")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("10.27.20.154:9559")
.withTimeout(600000)
.withConenctionRetry(2)
.build()
val ACline_dotReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("ACline_dot")
.withNoColumn(false)
.build()
val ACline_dotVertexRDD = spark.read.nebula(config, ACline_dotReadVertexConfig).loadVerticesToGraphx()
val BUSReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("BUS")
.withNoColumn(false)
.build()
val BUSVertexRDD = spark.read.nebula(config, BUSReadVertexConfig).loadVerticesToGraphx()
val C_PReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("C_P")
.withNoColumn(false)
.build()
val C_PVertexRDD = spark.read.nebula(config, C_PReadVertexConfig).loadVerticesToGraphx()
val C_SReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("C_S")
.withNoColumn(false)
.build()
val C_SVertexRDD = spark.read.nebula(config, C_SReadVertexConfig).loadVerticesToGraphx()
val ConvertorReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("Convertor")
.withNoColumn(false)
.build()
val ConvertorVertexRDD = spark.read.nebula(config, ConvertorReadVertexConfig).loadVerticesToGraphx()
val l_oadReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("l_oad")
.withNoColumn(false)
.build()
val l_oadVertexRDD = spark.read.nebula(config, l_oadReadVertexConfig).loadVerticesToGraphx()
val two_port_transformerReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("two_port_transformer")
.withNoColumn(false)
.build()
val two_port_transformerVertexRDD = spark.read.nebula(config, two_port_transformerReadVertexConfig).loadVerticesToGraphx()
val three_port_transformerReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("three_port_transformer")
.withNoColumn(false)
.build()
val three_port_transformerVertexRDD = spark.read.nebula(config, three_port_transformerReadVertexConfig).loadVerticesToGraphx()
val unitReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("unit")
.withNoColumn(false)
.build()
val unitVertexRDD = spark.read.nebula(config, unitReadVertexConfig).loadVerticesToGraphx()
val CNReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("CN")
.withNoColumn(false)
.build()
val CNVertexRDD = spark.read.nebula(config, CNReadVertexConfig).loadVerticesToGraphx()
val neutral_pointReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("neutral_point")
.withNoColumn(false)
.build()
val neutral_pointVertexRDD = spark.read.nebula(config, neutral_pointReadVertexConfig).loadVerticesToGraphx()
val vertexRDD = ACline_dotVertexRDD.union(BUSVertexRDD).union(C_PVertexRDD).union(C_SVertexRDD).union(ConvertorVertexRDD)
.union(l_oadVertexRDD).union(two_port_transformerVertexRDD).union(three_port_transformerVertexRDD).union(unitVertexRDD).union(CNVertexRDD)
.union(neutral_pointVertexRDD)
val aclinedot_aclinedotReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("aclinedot_aclinedot")
.withNoColumn(false)
.build()
val aclinedot_aclinedotEdgeRDD = spark.read.nebula(config, aclinedot_aclinedotReadEdgeConfig).loadEdgesToGraphx()
val aclinedot_cnReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("aclinedot_cn")
.withNoColumn(false)
.build()
val aclinedot_cnEdgeRDD = spark.read.nebula(config, aclinedot_cnReadEdgeConfig).loadEdgesToGraphx()
val txI_txJ_transformerlineReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("txI_txJ_transformerline")
.withNoColumn(false)
.build()
val txI_txJ_transformerlineEdgeRDD = spark.read.nebula(config, txI_txJ_transformerlineReadEdgeConfig).loadEdgesToGraphx()
val CN_tx_twoReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("CN_tx_two")
.withNoColumn(false)
.build()
val CN_tx_twoEdgeRDD = spark.read.nebula(config, CN_tx_twoReadEdgeConfig).loadEdgesToGraphx()
val CN_tx_threeReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("CN_tx_three")
.withNoColumn(false)
.build()
val CN_tx_threeEdgeRDD = spark.read.nebula(config, CN_tx_threeReadEdgeConfig).loadEdgesToGraphx()
val neutral_threeReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("neutral_three")
.withNoColumn(false)
.build()
val neutral_threeEdgeRDD = spark.read.nebula(config, neutral_threeReadEdgeConfig).loadEdgesToGraphx()
val connected_Compensator_S_CNReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("connected_Compensator_S_CN")
.withNoColumn(false)
.build()
val connected_Compensator_S_CNEdgeRDD = spark.read.nebula(config, connected_Compensator_S_CNReadEdgeConfig).loadEdgesToGraphx()
val connected_Bus_CNReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("connected_Bus_CN")
.withNoColumn(false)
.build()
val connected_Bus_CNEdgeRDD = spark.read.nebula(config, connected_Bus_CNReadEdgeConfig).loadEdgesToGraphx()
val connected_Compensator_P_CNReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("connected_Compensator_P_CN")
.withNoColumn(false)
.build()
val connected_Compensator_P_CNEdgeRDD = spark.read.nebula(config, connected_Compensator_P_CNReadEdgeConfig).loadEdgesToGraphx()
val connected_Load_CNReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("connected_Load_CN")
.withNoColumn(false)
.build()
val connected_Load_CNEdgeRDD = spark.read.nebula(config, connected_Load_CNReadEdgeConfig).loadEdgesToGraphx()
val connected_Unit_CNReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("connected_Unit_CN")
.withNoColumn(false)
.build()
val connected_Unit_CNEdgeRDD = spark.read.nebula(config, connected_Unit_CNReadEdgeConfig).loadEdgesToGraphx()
val CN_CNReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("ems")
.withLabel("CN_CN")
.withNoColumn(false)
.build()
val CN_CNEdgeRDD = spark.read.nebula(config, CN_CNReadEdgeConfig).loadEdgesToGraphx()
val edgeRDD = aclinedot_aclinedotEdgeRDD.union(aclinedot_cnEdgeRDD).union(txI_txJ_transformerlineEdgeRDD).union(CN_tx_twoEdgeRDD).union(CN_tx_threeEdgeRDD)
.union(neutral_threeEdgeRDD).union(connected_Compensator_S_CNEdgeRDD).union(connected_Bus_CNEdgeRDD).union(connected_Compensator_P_CNEdgeRDD).union(connected_Load_CNEdgeRDD)
.union(connected_Unit_CNEdgeRDD).union(CN_CNEdgeRDD)
val graph = Graph(vertexRDD, edgeRDD)
// 计算连通体
val components = graph.connectedComponents()
LOG.info("graph vertices record count: " + graph.vertices.count())
LOG.info("graph edges record count: " + graph.edges.count())
LOG.info("components triplets record count: " + components.triplets.count())
// LOG.info("components vertices count: " + components.vertices.map(v => (v._2, 1)).groupBy(_._1).mapValues(_.size).collect().toList.sortBy(_._2))
spark.close()
sys.exit()
}
}