基于nebula-exchange(V2.5.1)工具批量导入CSV文件踩坑之旅

1 背景

公司内部有使用图数据库的场景,内部通过技术选型确定了 Nebula Graph 图数据库,首先需要验证 Nebula Graph 数据库在实际业务场景下批量导入性能并验证。在这个过程中发现通过 Exchange 工具通过把 csv 文件放在 hdfs 上,分享下踩过的坑, 希望能让后人少弯路。

2 环境信息

  • Nebula Graph 版本:nebula:2.5.0
  • 部署方式(分布式 / 单机 / Docker / DBaaS):分布式
  • 硬件信息
    • 磁盘(SSD / HDD):SSD
    • CPU、内存信息:
      • 1*.200.66 128G (graph and meta)
      • 1*.200.67 128G (graph and meta)
      • 1*.200.68 128G (graph and meta)
      • 1*.120.10 128G (graph)
      • 1*.120.11 128G (graph)
      • 1*.120.12 128G (graph)
  • 数仓环境:Hadoop 3.2.1
  • 编译后生成 jar 包:nebula-exchange-2.5.1.jar
  • spark 环境:spark-2.4.7
  • maven环境:apache-maven-3.8.4

3 编译 Exchange

  1. 下载 pulsar-spark-connector_2.11,解压到本地 Maven 库的目录 io/streamnative/connectors
  2. 本地环境下克隆:git clone -b v2.5.1 https://github.com/vesoft-inc/nebula-spark-utils.git
  3. 切换到目录 cd nebula-spark-utils/nebula-exchange
  4. 打包 Nebula Exchange:mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true
  5. 压缩上传到测试环境下 /data/sysdir/nebula-spark-utils-2.5.1

4 更改迁移数据配置

见文件 /data/sysdir/nebula-spark-utils-2.5.1/nebula-spark-utils/nebula-exchange/target/classes/csv_application_2.conf,关键信息如下:

 # Nebula Graph相关配置
  nebula: {
    address:{
      # 指定Graph服务和所有Meta服务的IP地址和端口。
      # 如果有多台服务器,地址之间用英文逗号(,)分隔。
      # 格式: "ip1:port","ip2:port","ip3:port"
graph:["**.**.110.25:9669","**.**.110.26:9669","**.**.110.27:9669","**.**.120.10:9669","**.**.120.11:9669","**.**.120.12:9669"]
meta:["**.**.110.25:9559","**.**.110.26:9559","**.**.110.27:9559"]
 }

    # 指定拥有Nebula Graph写权限的用户名和密码。
    user: root
    pswd: ***

    # 指定图空间名称。
    space: test

5 创建 schema

create space test(vid_type = FIXED_STRING(64));
create tag entity(name string);
create edge relation(name string);

6 传到 hdfs

将测试数据切割后放入到hdfs上(因为文件太大,选取切割方式上传)

hadoop fs -put  /data/sysdir/nebula-spark-utils-2.5.1/dataset/*.csv  hdfs://hadoopCluster***/user/hive/warehouse/***.db/dataset/ 

7 数据集信息

  • edge.csv 139,951,301 计约:1.4亿条,6.6G
  • vertex.csv 74,314,635 计约:7千万,4.6G

合计条数: 214265936 计约:2.14亿,11.2G

8 执行导入命令

/data/sysdir/spark/spark-2.4.7/bin/spark-submit --master yarn --num-executors 50 --class com.vesoft.nebula.exchange.Exchange /data/***/nebula-spark-utils-2.5.1/nebula-exchange/target/nebula-exchange-2.5.1.jar  -c /data/***/nebula-spark-utils-2.5.1/nebula-spark-utils/nebula-exchange/target/classes/csv_application_2.conf

9 性能监控

10 结果验证

SUBMIT JOB STATS;
SHOW STATS;

11 踩坑阻力点

  1. 上传csv文件太大,我们采取的时候切割分段上传
  2. 要搞清楚nebula的存储节点,跟原数据节点的位置,配置文件里会有用到(我就写错了总在过程中报错)
  3. Nebula Graph 中 tagName 是大小写敏感的,tags 的配置中需要注意好大小写

12 感谢

vesoft 提供了宇宙性能最强的 Nebula Graph 图数据库,我们也在努力尽快让 Nebula Graph 这么好的产品运用起来

疑问

最后还有一些其他疑问?

  1. 本次数据量级是11.2G,耗时10分钟,然后我们又准备了36G的真实数据集(tag(1:8)、edge(5:18)),同样的方式耗时4.5h,为啥时间拉长了这么久?
  2. 下一步我们准备调研 Nebula 对事务的控制,有啥好的参考文献么?

验证由养乐多,七分饱先生撰写

Nebula 暂时事务支持功能,存储这边的老大也回复过类似的回复,如下:

nebula 2.6.0 的边事务是怎么实现的呢?

先说下边事务的背景,背景是上面提到的 Nebula 是存了两份边 2 个 kv,这 2 个 kv 可能会存在不同的节点上,这会导致如果有台机器挂了,其中有一条边可能是没有成功写入。所谓边事务或者叫 TOSS,它主要解决的问题就是当我们遇到其中有一台机器宕机时,存储层能够保证这两个边(出边和入边)的最终一致。这个一致性级别是最终一致,没有选择强一致是因为研发过程中碰到一些报错信息以及数据处理流程上的问题,最后选择了最终一致性。

再来说下 TOSS 处理的整体流程,先往第一个要写入数据的机器发正向边信息,在机器上写个标记,看标记有没有写成功,如果成功了进入到下一步,如果失败直接报错。第二步的话,把反向边信息从第一台机器发给第二台机器,能让存正向边的机器向第二台机器发送反向边信息的原因是,Nebula 中正反向边只有起点和终点调换了一个位置,所以存正向边的机器是完全可以拼出反向边。存反向边的机器收到之后,会直接写入边,并将它的写入结果成功与否告诉第一台机器。第一台机器收到这个写入结果之后,假设它是成功的,它就会把之前第一步写的标记删掉,同时换成正常的边,这时整个边的正常写入流程就完成了,这是一个链式的同步机制。

简单说下失败的流程,一开始第一台机器写失败了直接就报错;第一台机器成功之后,第二台机器写失败了,这种情况下机器一会有背景线程,会一直不断尝试修复第二台机器的边,保证和第一台机器一样。当中比较复杂的是,第一台机器会根据第二台机器返回的错误码进行处理。目前来说,所有的流程都会直接把标记删掉,直接换成正常的正向边,同时写些更额外的标记来表示现在需要恢复的失败边,让它们最终保持一致。

追问:点没有事务吗?

是这样,因为点是只存了一份,所以它是不需要事务的。一般来说,问这个问题的人是想强调点和边之间的事务,像插入边时看点是否存在,或者删除点时删除对应边。目前 Nebula 的悬挂点的设计是出于性能上的考虑。如果要解决上面的问题的话,会引入完整的事务,但这样性能会有个数量级的递减。顺便提下,刚说到 TOSS 是链式形式同步信息,上面也提到能这样做的原因是因为第一个节点能完整拼出第二个节点的数据。但链式的话对完整的事务而言,性能下降会更严重,所以未来事务这块的设计不会采纳这种方式。

内容摘录自 nebula-storage 直播,见文字稿 https://nebula-graph.com.cn/posts/nebula-storage-on-nlive/#nebula-的事务

问下有试过csv导入亿级数据么?

同问