- nebula 版本:2.6.1
- 部署方式:分布式
- 安装方式: RPM
- 是否为线上版本:Y
- 硬件信息
- 磁盘( 推荐使用 SSD)
两个10.9T磁盘
- CPU、内存信息
CPU 56 +32
内存
两个都为512G
- 问题的具体描述
问题:nebula集群版本在写入数据的时候经常有问题,nebula-storage无法启动,经常需要删除数据才能启动,基本上集群活不过一个星期。历史数据怎么办?
请问使用方法哪里不对?还是nebula集群不支持大数据量的写入?
- 相关的 meta / storage / graph info 日志信息
日志信息:
配置文件:
nebula-storaged.conf (4.3 KB)
nebula-graphd.conf (3.2 KB)
nebula-metad.conf (1.9 KB)
代码 / 终端输出 / 日志…
代码: 循环写入边千亿条在写入到几十亿的时候集群就不行了。
/**
 * Bulk-loads synthetic "peer" edges into the Nebula space "matrix" via Spark.
 * For every source id in [start, end] it generates 20 outgoing edges with a
 * random weight in [0, 1), buffers them, and flushes the buffer to Nebula
 * every `size` source ids.
 *
 * Fixes vs. the original:
 *  - the default config file name used typographic quotes (“…”), which does
 *    not compile in Scala; replaced with plain ASCII quotes
 *  - edges buffered after the last periodic `i % size == 0` flush were
 *    silently dropped; a final flush now writes the remainder
 *  - `size` could be 0 when part > (end - start + 1), crashing `i % size`
 *    with an ArithmeticException; clamped to at least 1
 *  - the mutable buffer was passed to makeRDD and then cleared; snapshot
 *    with `.toList` so the RDD cannot observe the cleared buffer
 */
def main(args: Array[String]): Unit = {
  val params = new CommandLineArgs(args)
  // Plain ASCII quotes — the original's “smart quotes” were a syntax error.
  val configFileName =
    if (StringUtils.isEmpty(params.configFileName)) "archive-test.properties"
    else params.configFileName
  val time1 = System.currentTimeMillis()

  val conf = new SparkConf()
    .setIfMissing("spark.master", "local[*]")
    .set("spark.io.compression.codec", "snappy")
    .set("spark.rpc.message.maxSize", "2046")
  val spark = SparkSession.builder()
    .config(conf)
    .appName(getClass.getName)
    .getOrCreate()
  spark.sparkContext.setLogLevel("INFO")

  val config = new Config(configFileName)
  val numPartitions = config.numPartitions.toInt
  val start = config.startNums.toLong
  val end = config.endNums.toLong
  val initNums = config.initNums.toLong
  val part = config.part.toInt
  // Source ids per flush; never 0, or `i % size` below divides by zero.
  val size = math.max(1L, (end - start + 1) / part)

  // Connection / write configuration for the nebula-spark-connector.
  val configNebula = NebulaConnectionConfig
    .builder()
    .withMetaAddress(config.meta)
    .withGraphAddress(config.graph)
    .withConenctionRetry(2)
    .build()
  val nebulaWriteEdgeConfig = WriteNebulaEdgeConfig
    .builder()
    .withSpace("matrix")
    .withEdge("peer")
    .withSrcIdField("source_aid")
    .withDstIdField("target_aid")
    .withSrcAsProperty(false)
    .withDstAsProperty(false)
    .withBatch(1000)
    .build()

  import spark.implicits._
  val playerList = ListBuffer[(String, String, Double)]()

  // Writes the buffered edges to Nebula and empties the buffer.
  // `.toList` snapshots the buffer so the lazy RDD never sees it cleared.
  def flush(): Unit = if (playerList.nonEmpty) {
    val edgeDF = spark.sparkContext
      .makeRDD(playerList.toList)
      .toDF("source_aid", "target_aid", "weight")
      .repartition(numPartitions)
    edgeDF.write.nebula(configNebula, nebulaWriteEdgeConfig).writeEdges()
    playerList.clear()
  }

  for (i <- start to end) {
    val source_aid = i + initNums
    // 20 edges per source vertex, targets mirrored around 2 * initNums.
    for (j <- 1 to 20) {
      val target_aid = initNums + initNums - i - j
      val weight = Random.nextInt(100) / 100.0
      playerList.append((source_aid.toString, target_aid.toString, weight))
    }
    if (i % size == 0) flush()
  }
  // The original dropped any edges buffered after the last periodic flush.
  flush()

  val time2 = System.currentTimeMillis()
  val timeDiff = time2 - time1
  println("耗时 :" + timeDiff + "毫秒")
  spark.close()
}