Extracting Json from HDFS into Nebula Graph
参考:
一,公司集群(YARN)导入实验
0)打包Nebula Exchange 2.0
1)环境准备:
hadoop-2.7.0 hdp-safe-cloud账号
spark-2.4.5 yarn-client模式
Nebula v2.0.0-Beta
2)schema准备
-- 创建图空间
CREATE SPACE json (partition_num=1, replica_factor=1,vid_type=fixed_string(16));
-- 选择图空间 json
USE json;
-- 创建标签 source
CREATE TAG source (srcId int);
-- 创建标签 target
CREATE TAG target (dstId int);
-- 创建边类型 like
CREATE EDGE like (likeness double);
注意:
-
Nebula Graph V2.0,默认是以String为VID
- 顶点的VID务必为String类型
- 如果VID已经设置为String类型,但是还是没有录入,原因是,String的长度不够,
- 在CREATE SPACE 语句中指定vid_type=fixed_string(字节数)
-
因VID问题容易出现的问题:
- spark运行exchange报出错误
21/01/21 16:02:05 ERROR NebulaGraphClientWriter: write vertex failed for Request to storage failed, without failedCodes. 21/01/21 16:02:06 ERROR NebulaGraphClientWriter: write edge failed for Request to storage failed, without failedCodes.
3)配置文件:
vi一个配置文件application.conf,放在jar包同目录下
{
# Spark 相关配置
spark: {
app: {
name: Spark Writer
}
driver: {
cores: 1
maxResultSize: 1G
}
cores {
max: 16
}
}
# Nebula Graph 相关配置
nebula: {
address:{
#####此处确保你集群开启的graphd与这里的列表对应
#####通过配置这里,可以借由spark异地(不在nebula集群的任何节点上)提交任务,到集群
graph:["10.160.133.174:3699","10.160.133.181:3699","10.160.133.199:3699"]
meta:["10.160.133.174:45500","10.160.133.181:45500","10.160.133.199:45500"]
}
# 填写的账号必须拥有 Nebula Graph 相应图空间的写数据权限
user: user
pswd: password
space: json
connection {
timeout: 3000
retry: 3
}
execution {
retry: 3
}
error: {
max: 32
output: /test1 #当spark job提交后,从hdfs中拉数据并转换为sst的过程中会在控制台报错,而向Nebula write数据时候,会在hdfs中生产错误文档,因此,spark-submit的用户一定要具有这个error output path目录的hdfs写权限
}
rate: {
limit: 1024
timeout: 1000
}
}
# 处理标签
tags: [
# 设置顶一个顶点
{
name: source
type: {
source: json
sink: client
}
# hdfs://namenode:9000/路径
path: "hdfs://namenode:9000/home/zy/test1/test.json"
# 取数据中的列
fields: ["source"] #如果数据中列标签有source header则写header,如果没有就用 _c0,_c1代替header
# 映射为schema的属性
nebula.fields: ["srcId"] #这个是nebula schema中定义的顶点或边属性,必须与fields对应!
vertex: source
batch: 256
partition: 32
isImplicit: true
}
# 第二个顶点,中间无逗号
{
name: target
type: {
source: json
sink: client
}
path: "hdfs://namenode:9000/home/zy/test1/test.json"
fields: ["target"]
nebula.fields: ["dstId"]
vertex: target
batch: 256
partition: 32
isImplicit: true
}
# 如果还有其他标签,参考以上配置添加,确保在这个tags:[]里面
]
# 处理边数据
edges: [
{
name: like
type: {
source: json
sink: client
}
path: "hdfs://namenode:9000/home/zy/test1/test.json"
fields: ["likeness"] #
nebula.fields: ["likeness"]
source: "source" #如果数据无header,则以_c0代表第1列;因为是Json格式,所以一定有Header
target: "target"
batch: 256
partition: 32
isImplicit: true
}
# 如果还有其他边类型,参考以上配置添加
]
}
注意点:
-
fileds属性:是从源数据表中要拉取的列ID或者name
不管json还是csv,
- 如果有header一定要在fields中写出需要拉取的header,
- 如果无header一定要在fields中以_cn,代表你要选择的第n列
- _c0选择第1列, _c1选择第2列,……
-
nebula.fields属性:是之前创建schema的顶点或者边的属性名
- fields必须与nebula.fields对应
-
边的source和target
- 一定不在边的fields中
- 在边源数据文件的某些列,如果有header,写header,如果没有header,写_cn
-
path: 是HDFS数据文件的URL
-
hdfs://namenode:9000/路径
-
graph和metad列表
- graph列表和metad列表一定不可以使用中文逗号
- 不可以使用域名
- 列表中设计的机器,在现实集群环境中,务必确保对应的服务是开启的
- 否则spark submit报错,connection refused
4)数据准备
test.json,运行:hadoop fs -put 到数据目录
{"source":53802643,"target":87847387,"likeness":0.34}
{"source":29509860,"target":57501950,"likeness":0.40}
{"source":97319348,"target":50240344,"likeness":0.77}
{"source":94295709,"target":8189720,"likeness":0.82}
{"source":78707720,"target":53874070,"likeness":0.98}
{"source":23399562,"target":20136097,"likeness":0.47}
5)提交spark job
//正确启动
/usr/bin/hadoop/software/spark2.4/bin/spark-submit \
--master yarn-client \
--class com.vesoft.nebula.exchange.Exchange \
nebula-exchange-2.0.0.jar \
-c ./json_app.conf
注意:
- -c是jar包中的参数,一定是本地文件,当我们spark-submit时候需要driver拉取本地配置文件,所以,spark运行模式只能使用:
- local
- standalone
- yarn-client:yarn-client以提交job的机器为driver
- 如果以yarn-cluster提交作业,则会随机选择集群中的一台机器作为driver
- 此时必须将配置文件复制到全集群
- 或者利用**–files动态获取外部文件**
- 需要–files将配置文件提交到 hdfs://namenode:9000/home/yarn/spark/cache/zy/.sparkStaging/application_1608801054019_1336053/application.conf
- 在源码com.vesoft.nebula.exchange.Exchange中以 System.getenv(“SPARK_YARN_STAGING_DIR”) 动态获取字符串hdfs://namenode:9000/home/yarn/spark/cache/zy/.sparkStaging/application_1608801054019_1336053/
- 然后System.getenv(“SPARK_YARN_STAGING_DIR”)+“/application.json”,将配置文件读取为RDD,调用RDD中的一列key作为配置
二,测试环境(Standalone)导入
参考:
实验准备:
hadoop-2.9.2
spark-2.4.0 standalone
scala-2.11.11
-
首先将nebula-java和nebula-exchange打包
参考文首提供的打包教程:
注意:如果你是由于官方提供的打包,默认是最新版的2.0 rc1,而如果你使用的是2.0 beta,那么就需要在nebula-spark-utils/nebula-exchange的pom.xml中修改nebula的依赖为2.0-beta
-
在 Nebula-Console / Nebula-Studio 的 web Console 中创建一个图空间,并定义schema
-- 创建图空间 CREATE SPACE json (partition_num=10, replica_factor=1); -- 选择图空间 json USE json; -- 创建标签 source CREATE TAG source (srcId int); -- 创建标签 target CREATE TAG target (dstId int); -- 创建边类型 like CREATE EDGE like (likeness double);
-
将数据上传到hdfs
-
test.json(由于本例特殊,所有数据可以通过绑定同一个json文件,所以不需要区分)
{"source":53802643,"target":87847387,"likeness":0.34} {"source":29509860,"target":57501950,"likeness":0.40} {"source":97319348,"target":50240344,"likeness":0.77} {"source":94295709,"target":8189720,"likeness":0.82} {"source":78707720,"target":53874070,"likeness":0.98} {"source":23399562,"target":20136097,"likeness":0.47}
-
-
编写spark作业配置文件application.conf
-
application.conf(放在本地,也可以放在hdfs)
{ # Spark 相关配置 spark: { app: { name: Spark Writer } driver: { cores: 1 maxResultSize: 1G } cores { max: 16 } } # Nebula Graph 相关配置 nebula: { address:{ #####此处确保你集群开启的graphd与这里的列表对应 #####通过配置这里,可以借由spark异地(不在nebula集群的任何节点上)提交任务,到集群 graph:["172.17.0.11:3699","172.17.0.16:3699","172.17.0.18:3699"] meta:["172.17.0.11:45500","172.17.0.16:45500","172.17.0.18:45500"] } # 填写的账号必须拥有 Nebula Graph 相应图空间的写数据权限 user: user pswd: password # 填写 Nebula Graph 中需要写入数据的图空间名称 space: json connection { timeout: 3000 retry: 3 } execution { retry: 3 } error: { max: 32 output: /home/hdd-safe-cloud/zhaoyang10/test1 } rate: { limit: 1024 timeout: 1000 } } # 处理标签 tags: [ # 设置标签 source 相关信息 { # 设置为 Nebula Graph 中对应的标签名称 name: source type: { # 指定数据源文件格式,设置为 json。 source: json # 指定标签数据导入 Nebula Graph 的方式, # 可以设置为:client(以客户端形式导入)和 sst(以 SST 文件格式导入)。 # 关于 SST 文件导入配置,参考文档:导入 SST 文件。 sink: client } # JSON 文件所在的 HDFS 路径,String 类型,必须以 hdfs:// 开头。 path: "hdfs://master:9000/nebula/test1/test.json" # spark集群入口 # 在 fields 里指定 JSON 文件中 key 名称,其对应的 value # 会作为 Nebula Graph 中指定属性 srcId 的数据源 # 如果需要指定多个值,用英文逗号(,)隔开 fields: ["source"] nebula.fields: ["srcId"] # 将 JSON 文件中某个 key 对应的值作为 Nebula Graph 中点 VID 的来源 # 如果 VID 源数据不是 int 类型,则使用以下内容来代替 vertex 的设置,在其中指定 VID 映射策略,建议设置为 "hash"。 # vertex: { # field: key_name_in_json # policy: "hash" # } vertex: source batch: 256 partition: 32 # isImplicit 设置说明,详见 https://github.com/vesoft-inc/ # nebula-java/blob/v1.0/tools/exchange/src/main/resources/ # application.conf isImplicit: true } # 设置标签 target 相关信息 { name: target type: { source: json sink: client } path: "hdfs://master:9000/nebula/test1/test.json" fields: ["target"] nebula.fields: ["dstId"] vertex: "target" batch: 256 partition: 32 isImplicit: true } # 如果还有其他标签,参考以上配置添加 ] # 处理边数据 edges: [ # 设置边类型 like 相关信息 { # Nebula Graph 中对应的边类型名称。 name: like type: { # 指定数据源文件格式,设置为 json。 source: json # 指定边数据导入 Nebula Graph 的方式, # 可以设置为:client(以客户端形式导入)和 sst(以 SST 文件格式导入)。 # 关于 SST 文件导入配置,参考文档:导入 SST 文件(https:// # docs.nebula-graph.com.cn/nebula-exchange/ # use-exchange/ex-ug-import-sst/)。 sink: client } # 指定 JSON 文件所在的 HDFS 路径,String 类型,必须以 hdfs:// 开头。 path: "hdfs://master:9000/nebula/test1/test.json" # 在 fields 里指定 JSON 文件中 key 名称,其对应的 value # 会作为 Nebula Graph 中指定属性 likeness 的数据源 # 如果需要指定多个值,用英文逗号(,)隔开 fields: ["likeness"] nebula.fields: ["likeness"] # 将 JSON 文件中某两个 key 对应的值作为 Nebula Graph 中边起点和边终点 VID 的来源 # 如果 VID 源数据不是 int 类型,则使用以下内容来代替 source # 和/或 target 的设置,在其中指定 VID 映射策略,建议设置为 "hash"。 # source: { # field: key_name_in_json # policy: "hash" # } # target: { # field: key_name_in_json # policy: "hash" # } source: "source" target: "target" batch: 256 partition: 32 isImplicit: true } # 如果还有其他边类型,参考以上配置添加 ] }
注意点:
- graph和meta哪里一定不能是中文逗号
- 而且graph和meta的启动清空一定要与显示机器的启动情况一致,不大于现实情况
- error.output是hadoop中的目录,一定要有write权限
-
-
spark-submit运行(standalone)
$SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.exchange.Exchange --master local nebula-exchange-2.0.0.jar -c ./conf/application.conf
若reader过程中遇到问题请参照: