Nebula Flink Connector 2.0.0
Test Environment
- Nebula Graph 2.0.0-beta
- Nebula Graph Studio 2.0.0
- Graph service: 10.160.133.174:3699,10.160.133.181:3699,10.160.133.199:3699
- Meta service: 10.160.133.174:45500,10.160.133.181:45500,10.160.133.199:45500
- Flink 1.11.3, deployed as three containers on a single machine (standalone mode)
Building from Source
- First, build nebula-java 2.0.
- Packaging steps:
  - cd into …/connector/
  - In pom.xml, change nebula-version to 2.0.0-rc1 (or the latest version).
  - Run the package command:
mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true
- Then install the resulting jar into your local Maven repository (the artifactId must match the dependency below):
mvn install:install-file -DgroupId=com.vesoft -DartifactId=nebula-flink-connector -Dversion=2.0.0 -Dpackaging=jar -Dfile=<absolute path to the jar in the connector's target/ directory>
This puts your jar into the local Maven repository.
POM Dependency
After installing the flink-connector 2.0 artifact as above, add the following dependency to your project:
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>nebula-flink-connector</artifactId>
<version>2.0.0</version>
</dependency>
The project's complete pom.xml:
<build>
<plugins>
<!-- compiler -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>utf-8</encoding>
</configuration>
</plugin>
<!-- packaging -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<!-- location of the assembly descriptor -->
<!-- <descriptor>src/main/assembly/assembly.xml</descriptor> -->
<!-- MainClass in manifest makes an executable jar -->
<!--
<archive>
<manifest>
<mainClass>util.Microseer</mainClass>
</manifest>
</archive>
-->
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<!-- Properties -->
<properties>
<scala.version>2.11.12</scala.version>
<flink-scala.version>1.11.3</flink-scala.version>
<flink-streaming.version>1.11.3</flink-streaming.version>
<flink-streaming-java_.version>1.11.3</flink-streaming-java_.version>
<nebula-flink-connector.version>2.0.0</nebula-flink-connector.version>
</properties>
<dependencies>
<!-- Flink Nebula Connector -->
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>nebula-flink-connector</artifactId>
<version>${nebula-flink-connector.version}</version>
</dependency>
<!-- Flink Client & Scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink-scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink-streaming.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink-streaming-java_.version}</version>
<!--<scope>compile</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.3</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>SparkPackagesRepo</id>
<url>http://dl.bintray.com/spark-packages/maven</url>
</repository>
<repository>
<id>bintray-streamnative-maven</id>
<name>bintray</name>
<url>https://dl.bintray.com/streamnative/maven</url>
</repository>
</repositories>
Building the Project Directly from the Source
<properties>
<nebula.version>2.0.0-rc1</nebula.version>
<flink.version>1.11-SNAPSHOT</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<compiler.source.version>1.8</compiler.source.version>
<compiler.target.version>1.8</compiler.target.version>
<junit.version>4.13.1</junit.version>
</properties>
<dependencies>
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>client</artifactId>
<version>${nebula.version}</version>
</dependency>
<!-- flink dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- junit-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>utf-8</encoding>
</configuration>
</plugin>
<!-- packaging -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- skip tests and go straight to compile/package -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.4.2</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
</plugins>
</build>
Creating the Schema in Nebula Studio
CREATE SPACE flinkSink (partition_num=2, replica_factor=1, vid_type=fixed_string(16));
CREATE TAG player(name string, age int);
CREATE EDGE friend(degree int, start string);
// view the schema
SHOW TAGS;
SHOW EDGES;
Note:
- If the schema you create does not match what the code expects, the sink will fail after the client connects to Nebula: once the job runs env.execute("Write Nebula") and the map operator has wrapped the records, the sink throws.
- The error:
Caused by: java.lang.NullPointerException
    at org.apache.flink.connector.nebula.sink.NebulaRowEdgeOutputFormatConverter.createValue(NebulaRowEdgeOutputFormatConverter.java:62) ~[?:?]
    at org.apache.flink.connector.nebula.sink.NebulaRowEdgeOutputFormatConverter.createValue(NebulaRowEdgeOutputFormatConverter.java:21) ~[?:?]
    at org.apache.flink.connector.nebula.sink.NebulaBatchExecutor.addToBatch(NebulaBatchExecutor.java:56) ~[?:?]
    at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.writeRecord(NebulaBatchOutputFormat.java:92) ~[?:?]
    at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.invoke(NebulaSinkFunction.java:51) ~[?:?]
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:164) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run
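One way to fail fast on such a mismatch is to cross-check the field names handed to the execution options against the properties actually created in the schema. The guard below is our own sketch, not part of the connector:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical guard, not part of nebula-flink-connector: check that every
// field name configured on the sink exists among the schema properties, so a
// typo fails at startup instead of surfacing as the NullPointerException above.
public class SchemaFieldCheck {
    public static void requireFields(List<String> configuredFields, Set<String> schemaProperties) {
        for (String field : configuredFields) {
            if (!schemaProperties.contains(field)) {
                throw new IllegalStateException(
                        "field '" + field + "' is not a property of the tag/edge schema");
            }
        }
    }

    public static void main(String[] args) {
        // Matches CREATE TAG player(name string, age int) from the schema above.
        requireFields(Arrays.asList("name", "age"),
                new HashSet<>(Arrays.asList("name", "age")));
    }
}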
Test Code
import org.apache.flink.connector.nebula.connection.NebulaClientOptions;
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat;
import org.apache.flink.connector.nebula.sink.NebulaSinkFunction;
import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class FlinkConnectorExample {
private static final Logger LOG = LoggerFactory.getLogger(FlinkConnectorExample.class);
public static void main(String[] args) {
System.out.println("************* Getting Env**************");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<List<String>> playerSource = constructVertexSourceData(env);
System.out.println("************* Sinking Vertex**************");
sinkVertexData(env, playerSource);
System.out.println("************* Vertex Sunk**************");
DataStream<List<String>> friendSource = constructEdgeSourceData(env);
System.out.println("************* Sinking Edge**************");
sinkEdgeData(env, friendSource);
System.out.println("************* Edges Sunk**************");
}
/**
* construct flink data source
*/
public static DataStream<List<String>> constructVertexSourceData(StreamExecutionEnvironment env) {
List<List<String>> player = new ArrayList<>();
List<String> fields1 = Arrays.asList("15", "Bob", "18");
List<String> fields2 = Arrays.asList("16", "Tina", "19");
List<String> fields3 = Arrays.asList("17", "Jena", "20");
List<String> fields4 = Arrays.asList("18", "Tom", "20");
List<String> fields5 = Arrays.asList("19", "Viki", "55");
List<String> fields6 = Arrays.asList("20", "Jime", "32");
List<String> fields7 = Arrays.asList("21", "Jhon", "76");
List<String> fields8 = Arrays.asList("22", "Crea", "10");
player.add(fields1);
player.add(fields2);
player.add(fields3);
player.add(fields4);
player.add(fields5);
player.add(fields6);
player.add(fields7);
player.add(fields8);
DataStream<List<String>> playerSource = env.fromCollection(player);
return playerSource;
}
/**
* sink Nebula Graph
*/
public static void sinkVertexData(StreamExecutionEnvironment env, DataStream<List<String>> playerSource) {
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
.setGraphAddress("10.160.133.174:3699,10.160.133.181:3699,10.160.133.199:3699")
.setMetaAddress("10.160.133.174:45500,10.160.133.181:45500,10.160.133.199:45500")
.build();
NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);
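// Map each 3-element source row onto the "player" tag: index 0 supplies the
// vertex ID, and the "name" and "age" properties are read from indexes 1 and 2.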
ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSink")
.setTag("player")
.setIdIndex(0)
.setFields(Arrays.asList("name", "age"))
.setPositions(Arrays.asList(1, 2))
.setBatch(2)
.builder();
NebulaBatchOutputFormat outPutFormat =
new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
.setExecutionOptions(executionOptions);
System.out.println("************* Getting sinkers**************");
NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
System.out.println("************* Packaging Records**************");
DataStream<Row> dataStream = playerSource.map(row -> {
org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
for (int i = 0; i < row.size(); i++) {
record.setField(i, row.get(i));
}
return record;
});
System.out.println("************* Add Sinkers**************");
dataStream.addSink(nebulaSinkFunction);
System.out.println("************* Sink Event Executions**************");
try {
System.out.println("************* Executing**************");
env.execute("Write Nebula");
System.out.println("************* Executed**************");
} catch (Exception e) {
LOG.error("error when write Nebula Graph, ", e);
System.exit(-1);
}
System.out.println("************* Vertex Sunk**************");
}
/**
* construct flink data source
*/
public static DataStream<List<String>> constructEdgeSourceData(StreamExecutionEnvironment env) {
List<List<String>> friend = new ArrayList<>();
List<String> fields1 = Arrays.asList("nicole", "Tom", "15", "18.0", "2019-05-01");
List<String> fields2 = Arrays.asList("Tina", "John", "16", "19.0", "2018-03-08");
List<String> fields3 = Arrays.asList("Bob", "Lisa", "17", "20.0", "2015-04-01");
List<String> fields4 = Arrays.asList("Tom", "Lisa", "18", "20.0", "2016-04-01");
List<String> fields5 = Arrays.asList("Jime", "John", "19", "20.0", "2017-04-01");
List<String> fields6 = Arrays.asList("Tim", "Bob", "20", "20.0", "2020-04-01");
friend.add(fields1);
friend.add(fields2);
friend.add(fields3);
friend.add(fields4);
friend.add(fields5);
friend.add(fields6);
DataStream<List<String>> playerSource = env.fromCollection(friend);
return playerSource;
}
/**
* sink Nebula Graph
*/
public static void sinkEdgeData(StreamExecutionEnvironment env, DataStream<List<String>> playerSource) {
System.out.println("*********************** Client Options *********************");
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
.setGraphAddress("10.160.133.174:3699,10.160.133.181:3699,10.160.133.199:3699")
.setMetaAddress("10.160.133.174:45500,10.160.133.181:45500,10.160.133.199:45500")
.build();
System.out.println("*********************** Constructing Connection *********************");
NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
System.out.println("*********************** graph Connected *********************");
NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);
System.out.println("*********************** meta Connected *********************");
ExecutionOptions executionOptions = new EdgeExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSink")
.setEdge("friend")
.setSrcIndex(0)
.setDstIndex(1)
.setRankIndex(2)
.setFields(Arrays.asList("src", "dst", "degree", "start"))
.setPositions(Arrays.asList(0, 1, 3, 4))
.setBatch(2)
.builder();
System.out.println("*********************** Set output format *********************");
NebulaBatchOutputFormat outPutFormat =
new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
.setExecutionOptions(executionOptions);
NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
System.out.println("*********************** Edge Records Packaging *********************");
DataStream<Row> dataStream = playerSource.map(row -> {
org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
for (int i = 0; i < row.size(); i++) {
record.setField(i, row.get(i));
}
return record;
});
System.out.println("*********************** Edge Records Packaged *********************");
dataStream.addSink(nebulaSinkFunction);
System.out.println("*********************** Edge Sinking Job *********************");
try {
env.execute("Write Nebula");
System.out.println("*********************** Edge Sunk *********************");
} catch (Exception e) {
LOG.error("error when write Nebula Graph, ", e);
System.exit(-1);
}
System.out.println("*********************** job over *********************");
}
}
Standalone Submission
/opt/flink/flink-1.11.3/bin/flink run -c FlinkConnectorExample /opt/Code/flink_test/flink-connector-1.0-SNAPSHOT-jar-with-dependencies.jar
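To spot-check what the job wrote without opening Studio, a short query through the nebula-java 2.0 client is enough. A minimal sketch, assuming the default root/nebula credentials (adjust for your deployment) and reusing one graph address from the environment above:

import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import java.util.Collections;

public class VerifySink {
    public static void main(String[] args) throws Exception {
        NebulaPool pool = new NebulaPool();
        // One of the graph service addresses from the test environment.
        pool.init(Collections.singletonList(new HostAddress("10.160.133.174", 3699)),
                new NebulaPoolConfig());
        // Assumption: default credentials.
        Session session = pool.getSession("root", "nebula", false);
        try {
            // Fetch one of the vertices the vertex sink inserted.
            ResultSet result = session.execute(
                    "USE flinkSink; FETCH PROP ON player \"15\";");
            System.out.println(result.isSucceeded()
                    ? result.toString() : result.getErrorMessage());
        } finally {
            session.release();
            pool.close();
        }
    }
}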