Star

Nebula Flink Connector 2.0 导入(Source 是mock数据)

Nebula Flink Connector 2.0.0

实验环境

nebula graph 2.0.0-beta

nebula graph studio 2.0.0

graph服务:10.160.133.174:3699,10.160.133.181:3699,10.160.133.199:3699

meta服务:10.160.133.174:45500,10.160.133.181:45500,10.160.133.199:45500

flink环境:version1.11.3 ,部署一台机器中的三个容器(standalone启动)

源码打包

官方源码打包参考

  • 先打包nebula-java 2.0

  • 打包指令

    • cd到…/connector/下
    • 修改pom.xml中的nebula-version为2.0.0-rc1(或者最新版
    • 运行打包指令
    mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true
    
  • 然后建立到maven仓库看里面

    mvn install:install-file -DgroupId=com.vesoft -DartifactId=nebula-flink -Dversion=2.0.0 -Dpackaging=jar -Dfile=你connector打包完后的target中的jar的绝对路径
    

    然后将你的jar包,扔到maven仓库中

pom依赖

对上面的flink-connector 2.0建完仓库后,添加下面的依赖

<dependency>
    <groupId>com.vesoft</groupId>
    <artifactId>nebula-flink-connector</artifactId>
    <version>2.0.0</version>
</dependency>

整个项目的pom.xml

<build>
        <plugins>
            <!-- 编译器 -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
            <!-- 打包 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <!--   这个是assembly 所在位置  -->
                    <!-- <descriptor>src/main/assembly/assembly.xml</descriptor> -->
                    <!-- MainClass in mainfest make a executable jar -->
                    <!--
                    <archive>
                        <manifest>
                            <mainClass>util.Microseer</mainClass>
                        </manifest>
                    </archive>
                    -->
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <!-- Properties -->
    <properties>
        <scala.version>2.11.12</scala.version>
        <flink-scala.version>1.11.3</flink-scala.version>
        <flink-streaming.version>1.11.3</flink-streaming.version>
        <flink-streaming-java_.version>1.11.3</flink-streaming-java_.version>
        <nebula-flink-connector.version>2.0.0</nebula-flink-connector.version>
    </properties>

    <dependencies>
        <!-- Flink Nebula Connector -->
        <dependency>
            <groupId>com.vesoft</groupId>
            <artifactId>nebula-flink-connector</artifactId>
            <version>${nebula-flink-connector.version}</version>
        </dependency>
        <!-- Flink Client & Scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink-scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink-streaming.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink-streaming-java_.version}</version>
            <!--<scope>compile</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.3</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>SparkPackagesRepo</id>
            <url>http://dl.bintray.com/spark-packages/maven</url>
        </repository>
        <repository>
            <id>bintray-streamnative-maven</id>
            <name>bintray</name>
            <url>https://dl.bintray.com/streamnative/maven</url>
        </repository>
    </repositories>

直接根据源码建立项目

<properties>
        <nebula.version>2.0.0-rc1</nebula.version>
        <flink.version>1.11-SNAPSHOT</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <compiler.source.version>1.8</compiler.source.version>
        <compiler.target.version>1.8</compiler.target.version>
        <junit.version>4.13.1</junit.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.vesoft</groupId>
            <artifactId>client</artifactId>
            <version>${nebula.version}</version>
        </dependency>

        <!-- flink dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- junit-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <!-- 编译器 -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
            <!-- 打包 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 跳过测试,直接编译打包 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.4.2</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
        </plugins>
    </build>

在nebula studio中创建schema

CREATE SPACE flinkSink (partition_num=2, replica_factor=1,vid_type=fixed_string(16));

CREATE TAG player(name String,age int);

CREATE EDGE friend (degree int,start String);

//查看schema
SHOW TAGS;
SHOW EDGES;

注意:

  • 如果创建shcema与代码中不符合,那么client在于nebula建立连接以后,执行job时候env.execute(“Write Nebula”)在代码中map封装Event Records之后sink就会失败。

  • 报错:

    Caused by: java.lang.NullPointerException
            at org.apache.flink.connector.nebula.sink.NebulaRowEdgeOutputFormatConverter.createValue(NebulaRowEdgeOutputFormatConverter.java:62) ~[?:?]
            at org.apache.flink.connector.nebula.sink.NebulaRowEdgeOutputFormatConverter.createValue(NebulaRowEdgeOutputFormatConverter.java:21) ~[?:?]
            at org.apache.flink.connector.nebula.sink.NebulaBatchExecutor.addToBatch(NebulaBatchExecutor.java:56) ~[?:?]
            at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.writeRecord(NebulaBatchOutputFormat.java:92) ~[?:?]
            at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.invoke(NebulaSinkFunction.java:51) ~[?:?]
            at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:164) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.3.jar:1.11.3]
            at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run
    
    

测试代码

import org.apache.flink.connector.nebula.connection.NebulaClientOptions;
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat;
import org.apache.flink.connector.nebula.sink.NebulaSinkFunction;
import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlinkConnectorExample {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkConnectorExample.class);

    public static void main(String[] args) {
        System.out.println("************* Getting Env**************");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<List<String>> playerSource = constructVertexSourceData(env);
        System.out.println("************* Sinking Vertex**************");
        sinkVertexData(env, playerSource);
        System.out.println("************* Vertex Sunk**************");
        DataStream<List<String>> friendSource = constructEdgeSourceData(env);
        System.out.println("************* Sinking Edge**************");
        sinkEdgeData(env, friendSource);
        System.out.println("************* Edges Sunk**************");
    }

    /**
     * construct flink data source
     */
    public static DataStream<List<String>> constructVertexSourceData(StreamExecutionEnvironment env) {
        List<List<String>> player = new ArrayList<>();
        List<String> fields1 = Arrays.asList("15", "Bob", "18");
        List<String> fields2 = Arrays.asList("16", "Tina", "19");
        List<String> fields3 = Arrays.asList("17", "Jena", "20");
        List<String> fields4 = Arrays.asList("18", "Tom", "20");
        List<String> fields5 = Arrays.asList("19", "Viki", "55");
        List<String> fields6 = Arrays.asList("20", "Jime", "32");
        List<String> fields7 = Arrays.asList("21", "Jhon", "76");
        List<String> fields8 = Arrays.asList("22", "Crea", "10");
        player.add(fields1);
        player.add(fields2);
        player.add(fields3);
        player.add(fields4);
        player.add(fields5);
        player.add(fields6);
        player.add(fields7);
        player.add(fields8);
        DataStream<List<String>> playerSource = env.fromCollection(player);
        return playerSource;
    }

    /**
     * sink Nebula Graph
     */
    public static void sinkVertexData(StreamExecutionEnvironment env, DataStream<List<String>> playerSource) {
        NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
                .setGraphAddress("10.160.133.174:3699,10.160.133.181:3699,10.160.133.199:3699")
                .setMetaAddress("10.160.133.174:45500,10.160.133.181:45500,10.160.133.199:45500")
                .build();
        NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
        NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);

        ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSink")
                .setTag("player")
                .setIdIndex(0)
                .setFields(Arrays.asList("name", "age"))
                .setPositions(Arrays.asList(1, 2))
                .setBatch(2)
                .builder();

        NebulaBatchOutputFormat outPutFormat =
                new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                        .setExecutionOptions(executionOptions);
        System.out.println("************* Getting sinkers**************");
        NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
        System.out.println("************* Packaging Records**************");
        DataStream<Row> dataStream = playerSource.map(row -> {
            org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
            for (int i = 0; i < row.size(); i++) {
                record.setField(i, row.get(i));
            }
            return record;
        });
        System.out.println("************* Add Sinkers**************");
        dataStream.addSink(nebulaSinkFunction);
        System.out.println("************* Sink Event Executions**************");
        try {
            System.out.println("************* Executing**************");
            env.execute("Write Nebula");
            System.out.println("************* Executed**************");
        } catch (Exception e) {
            LOG.error("error when write Nebula Graph, ", e);
            System.exit(-1);
        }
        System.out.println("************* Vertex Sunk**************");
    }


    /**
     * construct flink data source
     */
    public static DataStream<List<String>> constructEdgeSourceData(StreamExecutionEnvironment env) {
        List<List<String>> friend = new ArrayList<>();
        List<String> fields1 = Arrays.asList("nicole", "Tom", "15", "18.0", "2019-05-01");
        List<String> fields2 = Arrays.asList("Tina", "John", "16", "19.0", "2018-03-08");
        List<String> fields3 = Arrays.asList("Bob", "Lisa", "17", "20.0", "2015-04-01");
        List<String> fields4 = Arrays.asList("Tom", "Lisa", "18", "20.0", "2016-04-01");
        List<String> fields5 = Arrays.asList("Jime", "John", "19", "20.0", "2017-04-01");
        List<String> fields6 = Arrays.asList("Tim", "Bob", "20", "20.0", "2020-04-01");
        friend.add(fields1);
        friend.add(fields2);
        friend.add(fields3);
        friend.add(fields4);
        friend.add(fields5);
        friend.add(fields6);
        DataStream<List<String>> playerSource = env.fromCollection(friend);
        return playerSource;
    }

    /**
     * sink Nebula Graph
     */
    public static void sinkEdgeData(StreamExecutionEnvironment env, DataStream<List<String>> playerSource) {
        System.out.println("***********************  Client Options  *********************");
        NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
                .setGraphAddress("10.160.133.174:3699,10.160.133.181:3699,10.160.133.199:3699")
                .setMetaAddress("10.160.133.174:45500,10.160.133.181:45500,10.160.133.199:45500")
                .build();
        System.out.println("***********************  Constructing Connection  *********************");
        NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
        System.out.println("***********************  graph Connected  *********************");
        NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);
        System.out.println("***********************  meta Connected  *********************");

        ExecutionOptions executionOptions = new EdgeExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSink")
                .setEdge("friend")
                .setSrcIndex(0)
                .setDstIndex(1)
                .setRankIndex(2)
                .setFields(Arrays.asList("src", "dst", "degree", "start"))
                .setPositions(Arrays.asList(0, 1, 3, 4))
                .setBatch(2)
                .builder();
        System.out.println("***********************  Set output format  *********************");
        NebulaBatchOutputFormat outPutFormat =
                new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                        .setExecutionOptions(executionOptions);
        NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
        System.out.println("***********************  Edge Records Packaging  *********************");
        DataStream<Row> dataStream = playerSource.map(row -> {
            org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
            for (int i = 0; i < row.size(); i++) {
                record.setField(i, row.get(i));
            }
            return record;
        });
        System.out.println("***********************  Edge Records Packaged  *********************");
        dataStream.addSink(nebulaSinkFunction);
        System.out.println("***********************  Edge Sinking Job *********************");
        try {
            env.execute("Write Nebula");
            System.out.println("***********************  Edge Sunk *********************");
        } catch (Exception e) {
            LOG.error("error when write Nebula Graph, ", e);
            System.exit(-1);
        }
        System.out.println("***********************  job over *********************");
    }
}

standalone提交

/opt/flink/flink-1.11.3/bin/flink run -c FlinkConnectorExample /opt/Code/flink_test/flink-connector-1.0-SNAPSHOT-jar-with-dependencies.jar  
2赞

浙ICP备20010487号