Nebula Exchange 2.0 从HDFS以spark各种模式提交,导入Nebula(以Json为例)

Extracting Json from HDFS into Nebula Graph

参考:

Nebula Exchange 2.0 使用简介

Nebula Exchange 1.0 使用详细步骤

论坛

一,公司集群(YARN)导入实验

0)打包Nebula Exchange 2.0

Nebula Exchange 2.0 使用简介

1)环境准备

​ hadoop-2.7.0 hdp-safe-cloud账号

​ spark-2.4.5 yarn-client模式

   Nebula v2.0.0-Beta

2)schema准备
-- 创建图空间
CREATE SPACE json (partition_num=1, replica_factor=1,vid_type=fixed_string(16));

-- 选择图空间 json
USE json;

-- 创建标签 source
CREATE TAG source (srcId int);

-- 创建标签 target
CREATE TAG target (dstId int);

-- 创建边类型 like
CREATE EDGE like (likeness double);

注意:

  • Nebula Graph V2.0,默认是以String为VID

    • 顶点的VID务必为String类型
    • 如果VID已经设置为String类型,但是还是没有录入,原因是,String的长度不够,
      • 在CREATE SPACE 语句中指定vid_type=fixed_string(字节数)
  • 因VID问题容易出现的问题:

    • spark运行exchange报出错误
    21/01/21 16:02:05 ERROR NebulaGraphClientWriter: write vertex failed for Request to storage failed, without failedCodes.
    21/01/21 16:02:06 ERROR NebulaGraphClientWriter: write edge failed for Request to storage failed, without failedCodes.
    
3)配置文件:

vi一个配置文件application.conf,放在jar包同目录下

{
  # Spark 相关配置
  spark: {
    app: {
      name: Spark Writer
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    cores {
      max: 16
    }
  }

  # Nebula Graph 相关配置
  nebula: {
    address:{
    #####此处确保你集群开启的graphd与这里的列表对应
    #####通过配置这里,可以借由spark异地(不在nebula集群的任何节点上)提交任务,到集群
	graph:["10.160.133.174:3699","10.160.133.181:3699","10.160.133.199:3699"]
	meta:["10.160.133.174:45500","10.160.133.181:45500","10.160.133.199:45500"]
    
    }
    # 填写的账号必须拥有 Nebula Graph 相应图空间的写数据权限
    user: user
    pswd: password

    space: json

    connection {
      timeout: 3000
      retry: 3
    }

    execution {
      retry: 3
    }

    error: {
      max: 32
      output: /test1 #当spark job提交后,从hdfs中拉数据并转换为sst的过程中会在控制台报错,而向Nebula write数据时候,会在hdfs中生产错误文档,因此,spark-submit的用户一定要具有这个error output path目录的hdfs写权限
    }

    rate: {
      limit: 1024
      timeout: 1000
    }
  }

  # 处理标签
  tags: [
    # 设置顶一个顶点
    {
      name: source
      type: {
        source: json
        sink: client
      }
      #  hdfs://namenode:9000/路径
      path: "hdfs://namenode:9000/home/zy/test1/test.json"
# 取数据中的列
      fields: ["source"] #如果数据中列标签有source header则写header,如果没有就用  _c0,_c1代替header
# 映射为schema的属性
      nebula.fields: ["srcId"] #这个是nebula schema中定义的顶点或边属性,必须与fields对应! 
      
      vertex: source

      batch: 256
      partition: 32

      isImplicit: true
    }
    # 第二个顶点,中间无逗号
    {
      name: target
      type: {
        source: json
        sink: client
      }
      path: "hdfs://namenode:9000/home/zy/test1/test.json"
      fields: ["target"]
      nebula.fields: ["dstId"]
      
      vertex: target
      
      batch: 256
      partition: 32
      
      isImplicit: true
    }
  # 如果还有其他标签,参考以上配置添加,确保在这个tags:[]里面
  ]

  # 处理边数据
  edges: [
    {
      name: like
      type: {
        source: json
        sink: client
      }
      path: "hdfs://namenode:9000/home/zy/test1/test.json"

      fields: ["likeness"]        #
      nebula.fields: ["likeness"]

      source: "source"      #如果数据无header,则以_c0代表第1列;因为是Json格式,所以一定有Header
      target: "target"

      batch: 256
      partition: 32
      
      isImplicit: true
    }
    # 如果还有其他边类型,参考以上配置添加
  ]
}

注意点:

  • fileds属性:是从源数据表中要拉取的列ID或者name

    不管json还是csv,

    • 如果有header一定要在fields中写出需要拉取的header,
    • 如果无header一定要在fields中以_cn,代表你要选择的第n列
      • _c0选择第1列, _c1选择第2列,……
  • nebula.fields属性:是之前创建schema的顶点或者边的属性名

    • fields必须与nebula.fields对应
  • 边的source和target

    • 一定不在边的fields中
    • 在边源数据文件的某些列,如果有header,写header,如果没有header,写_cn
  • path: 是HDFS数据文件的URL

  • hdfs://namenode:9000/路径

  • graph和metad列表

    • graph列表和metad列表一定不可以使用中文逗号
    • 不可以使用域名
    • 列表中设计的机器,在现实集群环境中,务必确保对应的服务是开启的
      • 否则spark submit报错,connection refused
4)数据准备

​ test.json,运行:hadoop fs -put 到数据目录

{"source":53802643,"target":87847387,"likeness":0.34}
{"source":29509860,"target":57501950,"likeness":0.40}
{"source":97319348,"target":50240344,"likeness":0.77}
{"source":94295709,"target":8189720,"likeness":0.82}
{"source":78707720,"target":53874070,"likeness":0.98}
{"source":23399562,"target":20136097,"likeness":0.47}
5)提交spark job
//正确启动
/usr/bin/hadoop/software/spark2.4/bin/spark-submit \
--master yarn-client \
--class com.vesoft.nebula.exchange.Exchange \
nebula-exchange-2.0.0.jar  \
-c ./json_app.conf

注意:

  • -c是jar包中的参数,一定是本地文件,当我们spark-submit时候需要driver拉取本地配置文件,所以,spark运行模式只能使用
    • local
    • standalone
    • yarn-client:yarn-client以提交job的机器为driver
  • 如果以yarn-cluster提交作业,则会随机选择集群中的一台机器作为driver
    • 此时必须将配置文件复制到全集群
    • 或者利用**–files动态获取外部文件**
      • 需要–files将配置文件提交到 hdfs://namenode:9000/home/yarn/spark/cache/zy/.sparkStaging/application_1608801054019_1336053/application.conf
      • 在源码com.vesoft.nebula.exchange.Exchange中以 System.getenv(“SPARK_YARN_STAGING_DIR”) 动态获取字符串hdfs://namenode:9000/home/yarn/spark/cache/zy/.sparkStaging/application_1608801054019_1336053/
      • 然后System.getenv(“SPARK_YARN_STAGING_DIR”)+“/application.json”,将配置文件读取为RDD,调用RDD中的一列key作为配置

二,测试环境(Standalone)导入

参考:

Nebula Exchange 2.0 使用简介

Nebula Exchange 1.0 使用详细步骤

实验准备

hadoop-2.9.2

spark-2.4.0 standalone

scala-2.11.11

  • 首先将nebula-java和nebula-exchange打包

    参考文首提供的打包教程:

    注意:如果你是由于官方提供的打包,默认是最新版的2.0 rc1,而如果你使用的是2.0 beta,那么就需要在nebula-spark-utils/nebula-exchange的pom.xml中修改nebula的依赖为2.0-beta

  • 在 Nebula-Console / Nebula-Studio 的 web Console 中创建一个图空间,并定义schema

    -- 创建图空间
    CREATE SPACE json (partition_num=10, replica_factor=1);
    
    -- 选择图空间 json
    USE json;
    
    -- 创建标签 source
    CREATE TAG source (srcId int);
    
    -- 创建标签 target
    CREATE TAG target (dstId int);
    
    -- 创建边类型 like
    CREATE EDGE like (likeness double);
    
  • 将数据上传到hdfs

    • test.json(由于本例特殊,所有数据可以通过绑定同一个json文件,所以不需要区分)

      {"source":53802643,"target":87847387,"likeness":0.34}
      {"source":29509860,"target":57501950,"likeness":0.40}
      {"source":97319348,"target":50240344,"likeness":0.77}
      {"source":94295709,"target":8189720,"likeness":0.82}
      {"source":78707720,"target":53874070,"likeness":0.98}
      {"source":23399562,"target":20136097,"likeness":0.47}
      
  • 编写spark作业配置文件application.conf

    • application.conf(放在本地,也可以放在hdfs)

      {
        # Spark 相关配置
        spark: {
          app: {
            name: Spark Writer
          }
      
          driver: {
            cores: 1
            maxResultSize: 1G
          }
      
          cores {
            max: 16
          }
        }
      
        # Nebula Graph 相关配置
        nebula: {
          address:{
      		#####此处确保你集群开启的graphd与这里的列表对应
              #####通过配置这里,可以借由spark异地(不在nebula集群的任何节点上)提交任务,到集群
            graph:["172.17.0.11:3699","172.17.0.16:3699","172.17.0.18:3699"]
            meta:["172.17.0.11:45500","172.17.0.16:45500","172.17.0.18:45500"]
          }
          # 填写的账号必须拥有 Nebula Graph 相应图空间的写数据权限
          user: user
          pswd: password
      
          # 填写 Nebula Graph 中需要写入数据的图空间名称
          space: json
      
          connection {
            timeout: 3000
            retry: 3
          }
      
          execution {
            retry: 3
          }
      
          error: {
            max: 32
            output: /home/hdd-safe-cloud/zhaoyang10/test1
          }
      
          rate: {
            limit: 1024
            timeout: 1000
          }
        }
      
        # 处理标签
        tags: [
          # 设置标签 source 相关信息
          {
            # 设置为 Nebula Graph 中对应的标签名称
            name: source
            type: {
              # 指定数据源文件格式,设置为 json。
              source: json
      
              # 指定标签数据导入 Nebula Graph 的方式,
              # 可以设置为:client(以客户端形式导入)和 sst(以 SST 文件格式导入)。
              # 关于 SST 文件导入配置,参考文档:导入 SST 文件。
              sink: client
            }
      
            # JSON 文件所在的 HDFS 路径,String 类型,必须以 hdfs:// 开头。
            path: "hdfs://master:9000/nebula/test1/test.json" # spark集群入口
      
            # 在 fields 里指定 JSON 文件中 key 名称,其对应的 value
            # 会作为 Nebula Graph 中指定属性 srcId 的数据源
            # 如果需要指定多个值,用英文逗号(,)隔开
            fields: ["source"]
            nebula.fields: ["srcId"]
      
            # 将 JSON 文件中某个 key 对应的值作为 Nebula Graph 中点 VID 的来源
            # 如果 VID 源数据不是 int 类型,则使用以下内容来代替 vertex 的设置,在其中指定 VID 映射策略,建议设置为 "hash"。
            # vertex: {
            #   field: key_name_in_json
            #   policy: "hash"
            # }
            vertex: source
      
            batch: 256
            partition: 32
      
            # isImplicit 设置说明,详见 https://github.com/vesoft-inc/
            # nebula-java/blob/v1.0/tools/exchange/src/main/resources/
            # application.conf
            isImplicit: true
          }
          # 设置标签 target 相关信息
          {
            name: target
            type: {
              source: json
              sink: client
            }
            path: "hdfs://master:9000/nebula/test1/test.json"
            fields: ["target"]
            nebula.fields: ["dstId"]
            vertex: "target"
            batch: 256
            partition: 32
            isImplicit: true
          }
        # 如果还有其他标签,参考以上配置添加
        ]
      
        # 处理边数据
        edges: [
          # 设置边类型 like 相关信息
          {
            # Nebula Graph 中对应的边类型名称。
            name: like
            type: {
              # 指定数据源文件格式,设置为 json。
              source: json
      
              # 指定边数据导入 Nebula Graph 的方式,
              # 可以设置为:client(以客户端形式导入)和 sst(以 SST 文件格式导入)。
              # 关于 SST 文件导入配置,参考文档:导入 SST 文件(https://
              # docs.nebula-graph.com.cn/nebula-exchange/
              # use-exchange/ex-ug-import-sst/)。
              sink: client
            }
      
            # 指定 JSON 文件所在的 HDFS 路径,String 类型,必须以 hdfs:// 开头。
            path: "hdfs://master:9000/nebula/test1/test.json"
      
            # 在 fields 里指定 JSON 文件中 key 名称,其对应的 value
            # 会作为 Nebula Graph 中指定属性 likeness 的数据源
            # 如果需要指定多个值,用英文逗号(,)隔开
            fields: ["likeness"]
            nebula.fields: ["likeness"]
      
            # 将 JSON 文件中某两个 key 对应的值作为 Nebula Graph 中边起点和边终点 VID 的来源
            # 如果 VID 源数据不是 int 类型,则使用以下内容来代替 source 
            # 和/或 target 的设置,在其中指定 VID 映射策略,建议设置为 "hash"。
            # source: {
            #   field: key_name_in_json
            #   policy: "hash"
            # }
            # target: {
            #   field: key_name_in_json
            #   policy: "hash"
            # }
            source: "source"
            target: "target"
      
            batch: 256
            partition: 32
            isImplicit: true
          }
          # 如果还有其他边类型,参考以上配置添加
        ]
      }
      

    注意点:

    • graph和meta哪里一定不能是中文逗号
    • 而且graph和meta的启动清空一定要与显示机器的启动情况一致,不大于现实情况
    • error.output是hadoop中的目录,一定要有write权限
  • spark-submit运行(standalone

    $SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.exchange.Exchange --master local nebula-exchange-2.0.0.jar -c ./conf/application.conf
    

若reader过程中遇到问题请参照

5 个赞

感谢大佬总结的踩坑经验~~ exchange2.0在不断更新和完善导入功能,在大佬的帖子中有几项可能和最新的exchange相比会有些delay,可能需要大佬更新下哈:

  1. NebulaGraphV2 默认的vid_type是String。 但导入时 设定为vid的源数据可以是任何类型,现在不局限于String类型的。
  1. 可以把 “如果VID已经设置为xx” 与 “因VID问题容易出现的问题” 合为一条,前者是说明后者中错误出现的原因。

  2. 关于配置文件,如果导入的数据源时json,无需配置header的,json不需要header。 只有csv会需要header的配置项。

  1. 目前exchange支持 源数据中同一个字段 既可以作为Nebula中点的vid(或边的source、target、rank),同时又可作为Nebula 的属性导入
  1. 无法访问yarn集群的所有机器的时的解决方法赞赞赞, 大佬可以描述下-files对应的地址不,里面涉及到applicationId,其他用户如果需要上传,上传的目录是什么,如何知道Spark提交应用程序时使用的暂存目录是什么呢。
1 个赞

原来我踩过的坑都被修复了QAQ,,,动态读取配置,–files会传到一个缓存地址里面,就是您试一下–files,会在info中提示,上传到用户目录的一个.sparkStaging/job_id中,通过SPARK_YARN_STAGING_DIR参数获得这个job_id缓存的路径。如果觉得烦或者出现什么问题,直接把配置文件上传到HDFS指定目录,在程序读取到RDD,然后封装到哪个options里面(我就是这么干的)。

1 个赞

不过我发现System.getenv(“SPARK_YARN_STAGING_DIR”)得到的居然是null
然后我使用如下逻辑,


因此基本上是解决了从–files读取数据的问题

2 个赞

请教一下,batch和partition参数分别是什么含义,控制是是什么内容呢?

@GuangFeiZHU 文档中有对这两个参数含义的说明
https://docs.nebula-graph.com.cn/1.2.0/nebula-exchange/use-exchange/ex-ug-import-from-csv/

已经看到,如果

,配置了很多tag,那么这些tags列表中的任务是并发执行的吗?

单个tag的数据是并发导入,并发数取决于Spark的partition数和你提交任务时起的task数,多个tag之间是顺序执行。

比如我提交的executor_cores乘以num_executors=20, partition配置了32,那么我并发的任务数就是 20*32吗?executor_cores乘以num_executors乘以partition

不是的,Spark的并发是这样的:
有m个task数可以用来处理数据,数据有n个partition数, 一个task一次处理一个partition数据。这里的概念和多线程的概念有些不同,可以做下转换。
你可以把task当成线程,把partition当成线程要处理的任务,这样并发就可以根据线程池的原理来理解了。
当m>n时,就会有线程空闲,此时并发数最多是partition数。
当m=n时,整好一个线程处理一个partition数,此时并发数就是task数,也是partition数。
当m<n时,m个线程处理完m个partition后还会有剩余的partition,此时线程会复用,当某些线程处理完一个partition后会继续处理剩余的partition,此时并发数最多是task数。

感谢,明白了