一 背景
1)多个业务使用同一套NebulaGraph集群,当其中某一个业务有大量耗时查询或者大量数据加载时,这时此集群的压力可能会急剧上升,导致所有的业务出现服务性能极度下降的情况。
2)NebulaGraph使用2.6.2版本,bug相对较多。
二 需求
1)为每个业务搭建一套属于自己的NebulaGraph集群;
2)将旧集群中的各个业务的数据迁移到自己业务对应的新集群中;
3)新集群需要使用3.8.0版本NebulaGraph;
4)旧的NebulaGraph集群下线。
三 数据迁移技术选型
通过官网查看有没有现成的工具之类的,发现:
1)社区版的NebulaGraph BR:限制是版本为3.x才能用;只能恢复到原集群,不可跨集群等等。这个就放弃了。
2)NebulaGraph Importer:导入数据使用。这个之前没怎么用过,应该是使用官方提供的工具执行相应的命令将数据发送到对应的集群。考虑到失败无法重试的缘故,就放弃了。
3)NebulaGraph Exchange:这个主要考虑到配置文件的繁琐性,且我们集群上的spark客户端一般不暴露出来直接使用,都是通过dolphinscheduler使用的。所以就没用。
4)NebulaGraph Spark Connector:连接器使用,通过官网的demo发现这个可以将Nebula数据写成DF格式,还可以将DF格式数据写入Nebula;且通过dolphinscheduler好控制。
总结:使用NebulaGraph Spark Connector进行数据迁移。
具体流程如下:
四 代码部分
1)NebulaGraph to HDFS
edge类型入HDFS为例:
package nebula
import com.vesoft.nebula.connector.connector.{NebulaDataFrameReader, NebulaDataFrameWriter}
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.sql.SparkSession
object Edge2HDFS {
def main(args: Array[String]): Unit = {
try {
if (args.length < 4) {
println("Usage: <space> <edge> <graph_host> <hdfs path>")
System.exit(-1)
}
val space = args(0)
val edge = args(1)
val graph_host = args(2)
val hdfsPath = args(3)
val spark:SparkSession =SparkSession.builder()
.appName(space + "_" + edge)
.getOrCreate()
val config = NebulaConnectionConfig
.builder()
.withMetaAddress(graph_host)
.withConenctionRetry(2)
.withExecuteRetry(2)
.withTimeout(36000000)
.build()
val edge_config = ReadNebulaConfig
.builder()
.withSpace(space)
.withLabel(edge)
.withNoColumn(false)
.withPartitionNum(400)
.build()
val edges = spark.read.nebula(config, edge_config).loadEdgesToDF()
edges.repartition(400).write.
format("json").
mode("overwrite").save(hdfsPath)
} catch {
case e: Exception => {
// 在这里捕获异常,并进行相应的处理
// 例如,记录异常信息、进行异常恢复操作等
e.printStackTrace() // 这里只是简单地打印异常信息,你可以根据需要进行适当的处理
}
}
}
}
2)HDFS to NebulaGraph
edge类型入NebulaGraph为例:
package com.nebula
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaEdgeConfig}
import org.apache.spark.sql.SparkSession
object Edge2nebula {
def main(args: Array[String]): Unit = {
try {
if (args.length < 5) {
println("Usage: <space> <edge> <graph_host> <hdfsPath> <meta_host>")
System.exit(-1)
}
val space = args(0)
val edge = args(1)
val graph_host = args(2)
val hdfsPath = args(3)
val meta_host = args(4)
val _SPARK_SESSION: SparkSession = SparkSession.builder()
.appName(edge)
.getOrCreate()
val config = NebulaConnectionConfig
.builder()
.withMetaAddress(meta_host)
.withGraphAddress(graph_host)
.withConenctionRetry(2)
.withTimeout(36000000)
.build()
// HDFS 目录路径
val edgeHdfsPath = s"hdfs://HACluster" + hdfsPath + "/*.json"
var edgeDF = _SPARK_SESSION.read
.format("json")
.load(edgeHdfsPath)
edgeDF = edgeDF.withColumnRenamed("_srcId", "_vertexId")
edgeDF = edgeDF.withColumnRenamed("_dstId", "dstId")
edgeDF = edgeDF.withColumnRenamed("_rank", "rank")
edgeDF.show(10)
val edgeCount = edgeDF.count()
// 打印记录总数
println(s"Total number of rows: $edgeCount")
val nebulaWriteContainConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
.builder()
.withSpace(space)
.withEdge(edge)
.withSrcIdField("_vertexId")
.withSrcPolicy(null)
.withDstIdField("dstId")
.withDstPolicy(null)
.withRankField("rank")
.withUser("root")
.withPasswd("nebula")
.withBatch(50)
.build()
edgeDF.write.nebula(config, nebulaWriteContainConfig).writeEdges()
} catch {
case e: Exception => {
// 在这里捕获异常,并进行相应的处理
// 例如,记录异常信息、进行异常恢复操作等
e.printStackTrace() // 这里只是简单地打印异常信息,你可以根据需要进行适当的处理
}
}
}
}
将代码写好编译成jar包上传到dolphinscheduler并通过dolphinscheduler创建对应的工作流就可以用了。
贴一下spark参数:
五 遇到的问题
1)读写时可能会遇到:com.vesoft.nebula.client.graph.exception.IOErrorException: java.net.SocketTimeoutException: Read timed out问题,这种问题是我们设置的超时时间设置的太短了。我们把withTimeout(36000000)设置的大一些就好了。
2)入数据时可能会遇到:RaftPart buffer is full问题,这种问题就是并发设置的太大了,调整spark参数,降低Executor数量,batch数量等,减小并发。
3)如果遇到NebulaGraph中创建tag和edge使用的属性类型和数据类型不一致的脏数据的话,就无法正常读取数据到HDFS,这时候需要下载官网的nebula-spark-connector源码调整相应的部分,编译打包上传到自己的maven库引用,然后将脏数据找到手动处理在进行下一步操作了。
如有错误欢迎大家指正,共同学习。
最后祝大家在新的一年里顺风顺水!