exchange 写sst的性能问题

背景

目前在使用exchange的2.0版本,使用的spark2.4的模块进行导数据,发现在导一个边200亿数据的时候速度比较慢。
版本:exchange 2.0,spark2.4
spark参数:100executor节点,单节点1core,堆内内存10g,堆外2g

打印代码执行耗时

分析发现主要集中在这个代码的140行
https://github.com/vesoft-inc/nebula-exchange/blob/master/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
打印全部的执行逻辑:generateSstFile.writeSstFiles(iterator,
fileBaseConfig,
partitionNum,
namenode,
batchFailure)
也就是https://github.com/vesoft-inc/nebula-exchange/blob/master/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/FileBaseWriter.scala的63行

代码分析

  def writeSstFiles(iterator: Iterator[Row],
                    fileBaseConfig: FileBaseSinkConfigEntry,
                    partitionNum: Int,
                    namenode: String,
                    batchFailure: LongAccumulator): Unit = {
    val taskID                  = TaskContext.get().taskAttemptId()
    var writer: NebulaSSTWriter = null
    var currentPart             = -1
    val localPath               = fileBaseConfig.localPath
    val remotePath              = fileBaseConfig.remotePath
    try {
      iterator.foreach { vertex =>
        val key   = vertex.getAs[Array[Byte]](0)
        val value = vertex.getAs[Array[Byte]](1)
        var part = ByteBuffer
          .wrap(key, 0, 4)
          .order(ByteOrder.nativeOrder)
          .getInt >> 8
        if (part <= 0) {
          part = part + partitionNum
        }

        if (part != currentPart) {
          if (writer != null) {
            writer.close()
            val localFile = s"$localPath/$currentPart-$taskID.sst"
            HDFSUtils.upload(localFile,
                             s"$remotePath/${currentPart}/$currentPart-$taskID.sst",
                             namenode)
            Files.delete(Paths.get(localFile))
          }
          currentPart = part
          val tmp = s"$localPath/$currentPart-$taskID.sst"
          writer = new NebulaSSTWriter(tmp)
          writer.prepare()
        }
        writer.write(key, value)
      }
    } catch {
      case e: Throwable => {
        LOG.error("sst file write error,", e)
        batchFailure.add(1)
      }
    } finally {
      if (writer != null) {
        writer.close()
        val localFile = s"$localPath/$currentPart-$taskID.sst"
        HDFSUtils.upload(localFile,
                         s"$remotePath/${currentPart}/$currentPart-$taskID.sst",
                         namenode)
        Files.delete(Paths.get(localFile))
      }
    }
  }

第一次执行foreach用了1个小时左右,观察的日志执行是
开始执行:val remotePath = fileBaseConfig.remotePath
执行结束:writer = new NebulaSSTWriter(tmp)

后续foreach执行写sst逻辑,我在foreach内部外部都加上了耗时统计
内部foreach:所有的foreach单次耗时之和(单次结束时间-单次开始时间=605秒
开始时间:val key = vertex.getAsArray[Byte]
结束时间: writer.write(key, value)
外部foreach:结束时间-开始时间=4327秒
开始时间: val remotePath = fileBaseConfig.remotePath
结束时间: Files.delete(Paths.get(localFile))

问题

(1)第一次执行foreach为什么会耗时一个小时才开始执行?
个人测试这块的耗时代码为:.sortWithinPartitions(“key”)
(2)foreach的执行耗时为什么内部执行逻辑并没有耗时太久,但是整体耗时太多,多出的这部分费在哪里了?foreach需要读远程数据导致的耗时吗?
这里远程读文件过程耗时比较多

现场情况

所以executor的堆内堆外内存,设置多大,有没有最佳实践

是的,整个生成sst 比较耗时的部分一个是去重,一个是排序。
因为sst文件是要求严格升序的,这就要求先对key进行去重,再进行分区内部排序。

图中看到的shuffle read是foreach前进行的distinct和order中的过程,foreach是action操作,会触发前面所有的transform操作,所以第一次foreach时会耗时很久。 foreach内部没有远程读数据的操作,但在去重时由于该操作是shuffle操作 所以会有executor的写和读的过程。

3 个赞

对于spark普通任务来说这种大数据的任务可以拆分成小task来执行。
但是exchange的最后一步的task,开启repartitionWithNebula: true后,是按space的partition来划分的就限制了大数据量下的任务大小,无法提升更高的并行度,成为性能瓶颈:

总结:这种writesst的逻辑,因为需要有序的写单个sst文件,导致无法拆分小的task执行,使spark无法提高并行度,所以对大数据量的task无法起到性能提升作用,从而成为性能瓶颈,这种情况下怎么解决呢?

你可以不用repartitionWithNebula这个配置,最后写sst文件的并发数就是你所配置的shuffle partitions的值。 但这样会生成很多个sst文件,写文件性能上去了,但后面还有download和ingest的操作,这两个操作会因为sst文件数多而速度慢。

1 个赞

目前测试不加repartitionWithNebula存在丢数据的问题

是的,这是个已知问题,后面要发的版本没有这个问题了

大佬有fix这个问题的链接吗,我学习一下更改的版本是多少,我这边拉的代码可能还没更新

remove tagless for sst file by Nicole00 · Pull Request #98 · vesoft-inc/nebula-exchange · GitHub 这个pr已经合入

你可以用这个链接下面的exchange包 remove tagless for sst file (#98) · vesoft-inc/nebula-exchange@775f7e7 · GitHub

2 个赞

这个链接的改动我看是取消了https://github.com/vesoft-inc/nebula-exchange/pull/63 新增的代码
我看了一下issue 63的代码改动注释:add vertex without tag for sst
这两次改动的原因主要是,针对要不要增加orphanVertexKey
val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)

可以帮忙解释一下为什么需要增加orphanVertexKey,增加后为什么会丢数据以及取消这个key后为什么不影响sst的生成吗?感谢 :clap:

1 个赞

增加和取消 orphanVertexKey 是为了与图数据的数据存储保持同步

fix tagless by nevermore3 · Pull Request #4652 · vesoft-inc/nebula · GitHub

增加后丢数据是因为重分区后在同一分区内部数据写入时key被覆盖 Whether to forcibly enable repartitioning when the number of nebula space partitions is greater than 1 · Issue #71 · vesoft-inc/nebula-exchange · GitHub

2 个赞

此话题已在最后回复的 7 天后被自动关闭。不再允许新回复。