一 背景
由于多个业务现部署在同一套环境上,当查询或加载压力上升的时候,所有的业务都会收到影响,很难定位到具体是哪各业务导致的,所以准备将集群的业务分开,各自使用对应的集群。
二 需求
1)将业务分开,分别部署对应的集群,这块选择使用新的机器部署新的集群,数据迁移完毕后,归还老的机器,防止迁移失败导致的一系列问题
2)版本需要升级,老集群2.6.2,新集群需要使用3.8.0
三 技术选型
通过官网查看有没有现成的工具之类的,发现:
1)社区版的NebulaGraph BR:限制是版本为3.x才能用;只能恢复到原集群,不可跨集群等等。这个就放弃了。
2)NebulaGraph Importer:导入数据使用。这个之前没怎么用过,应该是使用官方提供的工具执行相应的命令将数据发送到对应的集群。考虑到失败无法重试的缘故,就放弃了。
3)NebulaGraph Exchange:这个主要考虑到配置文件的繁琐性,且我们集群上的spark客户端一般不暴露出来直接使用,都是通过dolphinscheduler使用的。所以就没用。
4)NebulaGraph Spark Connector:连接器使用,通过官网的demo发现这个可以将Nebula数据写成DF格式,还可以将DF格式数据写入Nebula;且通过dolphinscheduler好控制,就用了这个。
具体流程如下:
四 代码部分
1)NebulaGraph to HDFS
edge类型入HDFS为例:
package nebula
import com.vesoft.nebula.connector.connector.{NebulaDataFrameReader, NebulaDataFrameWriter}
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.sql.SparkSession
object Edge2HDFS {
def main(args: Array[String]): Unit = {
try {
if (args.length < 4) {
println("Usage: <space> <edge> <graph_host> <hdfs path>")
System.exit(-1)
}
val space = args(0)
val edge = args(1)
val graph_host = args(2)
val hdfsPath = args(3)
val spark:SparkSession =SparkSession.builder()
.appName(space + "_" + edge)
.getOrCreate()
val config = NebulaConnectionConfig
.builder()
.withMetaAddress(graph_host)
.withConenctionRetry(2)
.withExecuteRetry(2)
.withTimeout(36000000)
.build()
val edge_config = ReadNebulaConfig
.builder()
.withSpace(space)
.withLabel(edge)
.withNoColumn(false)
.withPartitionNum(400)
.build()
val edges = spark.read.nebula(config, edge_config).loadEdgesToDF()
edges.repartition(400).write.
format("json").
mode("overwrite").save(hdfsPath)
} catch {
case e: Exception => {
// 在这里捕获异常,并进行相应的处理
// 例如,记录异常信息、进行异常恢复操作等
e.printStackTrace() // 这里只是简单地打印异常信息,你可以根据需要进行适当的处理
}
}
}
}
2)HDFS to NebulaGraph
edge类型入NebulaGraph为例:
package com.nebula
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaEdgeConfig}
import org.apache.spark.sql.SparkSession
object Edge2nebula {
def main(args: Array[String]): Unit = {
try {
if (args.length < 5) {
println("Usage: <space> <edge> <graph_host> <hdfsPath> <meta_host>")
System.exit(-1)
}
val space = args(0)
val edge = args(1)
val graph_host = args(2)
val hdfsPath = args(3)
val meta_host = args(4)
val _SPARK_SESSION: SparkSession = SparkSession.builder()
.appName(edge)
.getOrCreate()
val config = NebulaConnectionConfig
.builder()
.withMetaAddress(meta_host)
.withGraphAddress(graph_host)
.withConenctionRetry(2)
.withTimeout(36000000)
.build()
// HDFS 目录路径
val edgeHdfsPath = s"hdfs://HACluster" + hdfsPath + "/*.json"
var edgeDF = _SPARK_SESSION.read
.format("json")
.load(edgeHdfsPath)
edgeDF = edgeDF.withColumnRenamed("_srcId", "_vertexId")
edgeDF = edgeDF.withColumnRenamed("_dstId", "dstId")
edgeDF = edgeDF.withColumnRenamed("_rank", "rank")
edgeDF.show(10)
val edgeCount = edgeDF.count()
// 打印记录总数
println(s"Total number of rows: $edgeCount")
val nebulaWriteContainConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
.builder()
.withSpace(space)
.withEdge(edge)
.withSrcIdField("_vertexId")
.withSrcPolicy(null)
.withDstIdField("dstId")
.withDstPolicy(null)
.withRankField("rank")
.withUser("root")
.withPasswd("nebula")
.withBatch(50)
.build()
edgeDF.write.nebula(config, nebulaWriteContainConfig).writeEdges()
} catch {
case e: Exception => {
// 在这里捕获异常,并进行相应的处理
// 例如,记录异常信息、进行异常恢复操作等
e.printStackTrace() // 这里只是简单地打印异常信息,你可以根据需要进行适当的处理
}
}
}
}
将代码写好编译成jar包上传到ds并通过ds创建对应的工作流就可以用了。
贴一下spark参数:
五 遇到的问题
1)读写时可能会遇到:com.vesoft.nebula.client.graph.exception.IOErrorException: java.net.SocketTimeoutException: Read timed out问题,这种问题是我们设置的超时时间设置的太短了。我们把withTimeout(36000000)设置的大一些就好了。
2)入数据时可能会遇到:RaftPart buffer is full问题,这种问题就是并发设置的太大了,调整spark参数,降低Executor数量,batch数量等,减小并发。
3)如果遇到NebulaGraph中创建tag和edge使用的属性类型和数据类型不一致的脏数据的话,就无法正常读取数据到HDFS,这时候需要下载官网的nebula-spark-connector源码调整相应的部分,编译打包上传到自己的maven库引用,然后将脏数据找到手动处理在进行下一步操作了。
如有错误欢迎大家指正,共同学习。
最后祝大家在新的一年里顺风顺水!