nbuela-spark-connetor使用问题

下载了nebula-spark-utils(master分支)准备编译,但是如下依赖找不到:

io.streamnative.connectors
pulsar-spark-connector_2.11
${pulsar.version}

com.vesoft client ${nebula.version}

更改client的版本为2.0.0-rc1依然报错某些类没有。

我的目的:使用nebula-spark-utils,用spark读取hive中的数据写入nebula2.0的nebula-graph中。

求大佬解答,nebula-spark-utils的正确使用姿势应该是怎样的?

这个 client 版本略低啊,可以升到 2.0.1 呀。依赖的话论坛有相似问题的,我去搜下,回来改下回复

但是这个client版本是nebula-spark-utils中自定义的……所以官方能不能一起改一下

你的问题是不是 Archie 遇到的问题一样呀

改了maven配置的settings.xml依旧不行

你编译不过和client没关系的,你是pulsar-spark-connector这个依赖下载不下来。 是因为这个包所在的maven库关闭了=== 你可以在这里下载,然后解压到你的本地maven库 io/streamnative/connectors目录中。https://oss-cdn.nebula-graph.com.cn/jar-packages/pulsar-spark-connector_2.11.zip

ps: 如果你想用snapshot版本,可以在你的pom中引用的:

        <dependency>
            <groupId>com.vesoft</groupId>
            <artifactId>nebula-spark-connector</artifactId>
            <version>2.0-SNAPSHOT</version>
        </dependency>
1 个赞

按照你给的链接下载了pulsar-spark-connector_2.11,这个依赖已经没问题了。但是client 2.0-SNAPSHOT(你刚才给的版本)和2.0.0-SNAPSHOT(nebula-spark-utils 上master分支指定的版本)都无法下载到,请问这个是从哪个仓库下载的呢?

你在自己的pom中加一下下面这个配置就好:

<project>
xxxxxxx

   <repository>
            <id>snapshots</id>
            <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
        </repository>
    </repositories>
</project>
1 个赞

其实你要是只用编译spark-connector的话,进入到nebula-spark-connector目录下编译应该没问题的。 上面那个pulsar的依赖是exchange中用到的。

2 个赞

嗯,我按照你说的直接依赖nebula-spark-connector就可以了,谢谢。

再请教一个使用问题,因为我是用java写nebula,假设我已经拿到了DataSet,我该怎么用nebula-spark-connector去写入nebula呢?

  Dataset<Row> dataFrame = hiveContext.createDataFrame(new ArrayList<>(), Map.class);
  NebulaConnectionConfig config = NebulaConnectionConfig
                .builder()
                .withMetaAddress("127.0.0.1:9559")
                .withGraphAddress("127.0.0.1:9669")
                .withConenctionRetry(2)
                .build();

  WriteNebulaVertexConfig nebulaWriteVertexConfig = WriteNebulaVertexConfig
                .builder()
                .withSpace("test")
                .withTag("person")
                .withVidField("id")
                .withVidPolicy("hash")
                .withVidAsProp(true)
                .withUser("root")
                .withPasswd("nebula")
                .withBatch(1000)
                .build();
//将顶点DataSet写入nebula

nebula-spark-connector是要用spark读取其他数据,读取的格式会是DataFrame, 你的数据是DataSet没法写入的。

DataFrameWriter write = hiveContext.createDataFrame(new ArrayList<>(), Map.class).write();
我这也是用了spark的dataframe呢,只是spark和java中的写法看起来不一样

···
val config = NebulaConnectionConfig
.builder()
.withMetaAddress(“127.0.0.1:9559”)
.withGraphAddress(“127.0.0.1:9669”)
.withConenctionRetry(2)
.build()

val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
.builder()
.withSpace(“test”)
.withTag(“person”)
.withVidField(“id”)
.withVidPolicy(“hash”)
.withVidAsProp(true)
.withUser(“root”)
.withPasswd(“nebula”)
.withBatch(1000)
.build()
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
···
这个是官方文档写的,里面的df是什么对象呢?

你看example啊,里面有读和写的示例的

df就是数据集呀
image

好嘞,多谢

你好,还有一个版本疑问,在nebula-spark-connector 2.0.1中没有 .withWriteMode方法,那请问默认的写模式是什么呢?会让nebula中已经存在的数据或属性消失吗?

默认的写入模式是insert。 2.0.1 只有一个insert模式,所以没有模式的配置。
snapshot版本中支持insert和update两种模式,默认是insert。

关于这两种模式,参考文档
https://docs.nebula-graph.com.cn/2.0.1/3.ngql-guide/12.vertex-statements/1.insert-vertex/

https://docs.nebula-graph.com.cn/2.0.1/3.ngql-guide/12.vertex-statements/2.update-vertex/

没有upsert模式吗?其实我想要的是upsert

没有

1 个赞