flink connector nebula

想问下,有没有用flink sql操作nebula的例子。我看nebula-flink 里面有connector实现了DynamicFactory,DynamicTable的。谢谢。

很抱歉,现在没有提供这样的例子,我们后续可以加上。

你意思是现在没有例子还是现在代码没有这种功能?

现在的代码还没有fink sql 操作nebula的功能,后续会逐渐增加 sql connector,但优先级比较低,我们希望开放出来让社区内有兴趣的同学可以一起参与。

没事,我已经实现了。2.0的sst生成和ingest现在功能稳定完善了吗?

赞,期待大佬的pr, repo: https://github.com/vesoft-inc/nebula-flink-connector

ps: sst & download 功能可用,将在下一版本发出

1 个赞

你好, 请问下, 这个连接器对flink的版本有要求吗?
image

nebula-flink-connector用的flink 的clients是1.11.3版本的,只要和这个版本兼容的flink服务都可以,具体flink的客户端与服务端的对应信息参考flink官网

把 ScanTableSource 功能补起来了
目前效果:

String sourceSql = "create table nebulaperson(id bigint, name string) with " +
“(‘connector’= ‘nebula’,” +
“‘meta-address’=‘xxx’,” +
“‘graph-address’=‘xxx’,” +
“‘entity-type’=‘0’,” +
“‘username’=‘root’,‘password’=‘nebula’,” +
“‘graph-space’=‘flink2’, ‘label-name’=‘person’)”;

String sinkSql = "create table nebulaman(id bigint, name string) with " +
“(‘connector’= ‘nebula’,” +
“‘meta-address’=‘xxx’,” +
“‘graph-address’=‘xxx’,” +
“‘entity-type’=‘0’,” +
“‘username’=‘root’,‘password’=‘nebula’,” +
“‘graph-space’=‘flink2’, ‘label-name’=‘man’)”;

TableResult sourceTable = tableEnvironment.executeSql(sourceSql);
TableResult sinkTable = tableEnvironment.executeSql(sinkSql);

String insertSql = “insert into nebulaman select id+1000, name from nebulaperson”;
TableResult result = tableEnvironment.executeSql(insertSql);

库里面:

fetch prop on person 1
±---------------------------+
| vertices_ |
±---------------------------+
| (1 :person{name: “name1”}) |
±---------------------------+

fetch prop on man 1001
±---------------------------+
| vertices_ |
±---------------------------+
| (1001 :man{name: “name1”}) |
±---------------------------+

2 个赞