Flink runtime error

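  • Nebula version: 1.2.0
  • Deployment: single machine
  • Installation: compiled from source
  • Production environment: N
  • Problem description:
    I deployed everything following the tutorial, and the build completed successfully.
    Next I wanted to run FlinkDemo, and I tried two approaches. First, I used Build Artifact in IDEA to package a jar and ran it in Flink with bin/flink run, which failed with the following error: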
org.apache.flink.client.program.ProgramInvocationException: The program's entry point class 'org.apache.flink.FlinkDemo' was not found in the jar file.
        at org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:475)
        at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:152)
        at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:64)
        at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:685)
        at org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:851)
        at org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:271)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.FlinkDemo
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:473)
        ... 10 more

After that, I debugged FlinkDemo's main function directly in IDEA, which failed with the following errors:

ERROR [TAsyncClientManager#SelectorThread 8065] - onError: java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
ERROR [TAsyncClientManager#SelectorThread 8068] - onError: java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
ERROR [TAsyncClientManager#SelectorThread 8069] - onError: java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
ERROR [TAsyncClientManager#SelectorThread 8067] - onError: java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
ERROR [TAsyncClientManager#SelectorThread 8066] - onError: java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
ERROR [TAsyncClientManager#SelectorThread 8072] - onError: java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
ERROR [TAsyncClientManager#SelectorThread 8071] - onError: java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
ERROR [TAsyncClientManager#SelectorThread 8070] - onError: java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.timeoutMethods(TAsyncClientManager.java:157)
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.run(TAsyncClientManager.java:114)
java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.timeoutMethods(TAsyncClientManager.java:157)
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.run(TAsyncClientManager.java:114)
java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.timeoutMethods(TAsyncClientManager.java:157)
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.run(TAsyncClientManager.java:114)
java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.timeoutMethods(TAsyncClientManager.java:157)
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.run(TAsyncClientManager.java:114)
java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.timeoutMethods(TAsyncClientManager.java:157)
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.run(TAsyncClientManager.java:114)
java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.timeoutMethods(TAsyncClientManager.java:157)
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.run(TAsyncClientManager.java:114)
java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.timeoutMethods(TAsyncClientManager.java:157)
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.run(TAsyncClientManager.java:114)
java.util.concurrent.TimeoutException: Operation class com.vesoft.nebula.graph.GraphService$AsyncClient$authenticate_call timed out after 1008 ms.
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.timeoutMethods(TAsyncClientManager.java:157)
	at com.facebook.thrift.async.TAsyncClientManager$SelectThread.run(TAsyncClientManager.java:114)
 INFO [Map -> Sink: Unnamed (2/8)] - Auth not founded
 INFO [Map -> Sink: Unnamed (4/8)] - Auth not founded
 INFO [Map -> Sink: Unnamed (3/8)] - Auth not founded
 INFO [Map -> Sink: Unnamed (1/8)] - Auth not founded
 INFO [Map -> Sink: Unnamed (8/8)] - Auth not founded
 INFO [Map -> Sink: Unnamed (6/8)] - Auth not founded
 INFO [Map -> Sink: Unnamed (5/8)] - Auth not founded
 INFO [Map -> Sink: Unnamed (7/8)] - Auth not founded
ERROR [Source: Custom Source (1/1)] - Error during disposal of stream operator.
java.lang.NullPointerException
	at org.apache.flink.connector.nebula.connection.NebulaConnectionProvider.close(NebulaConnectionProvider.java:34)
	at org.apache.flink.connector.nebula.source.NebulaSourceFunction.close(NebulaSourceFunction.java:59)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:773)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:688)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:179)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:586)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
	at java.lang.Thread.run(Thread.java:748)
 WARN [Source: Custom Source (1/1)] - Source: Custom Source (1/1) (76298ce1f9e678c2b5541cfc688ee021) switched from RUNNING to FAILED.
com.facebook.thrift.transport.TTransportException: java.net.SocketTimeoutException: connect timed out
	at com.facebook.thrift.transport.TSocket.open(TSocket.java:204)
	at com.vesoft.nebula.client.meta.MetaClientImpl.getClient(MetaClientImpl.java:122)
	at com.vesoft.nebula.client.meta.MetaClientImpl.doConnect(MetaClientImpl.java:90)
	at com.vesoft.nebula.AbstractClient.connect(AbstractClient.java:93)
	at org.apache.flink.connector.nebula.connection.client.NebulaMetaClient.connectClient(NebulaMetaClient.java:18)
	at org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider.getClient(NebulaMetaConnectionProvider.java:36)
	at org.apache.flink.connector.nebula.source.NebulaSourceFunction.open(NebulaSourceFunction.java:50)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:309)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:545)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:503)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:566)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at com.facebook.thrift.transport.TSocket.open(TSocket.java:199)
	... 16 more
ERROR [Source: Custom Source (1/1)] - cancel exception:{}
java.lang.NullPointerException
	at org.apache.flink.connector.nebula.connection.NebulaConnectionProvider.close(NebulaConnectionProvider.java:34)
	at org.apache.flink.connector.nebula.source.NebulaSourceFunction.cancel(NebulaSourceFunction.java:93)
	at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:160)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:203)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:184)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:721)
	at org.apache.flink.runtime.taskmanager.Task.cancelInvokable(Task.java:1395)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:828)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
	at java.lang.Thread.run(Thread.java:748)
 INFO [Source: Custom Source (1/1)] - Freeing task resources for Source: Custom Source (1/1) (76298ce1f9e678c2b5541cfc688ee021).
 INFO [flink-akka.actor.default-dispatcher-3] - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source (1/1) 76298ce1f9e678c2b5541cfc688ee021.
 INFO [flink-akka.actor.default-dispatcher-6] - Source: Custom Source (1/1) (76298ce1f9e678c2b5541cfc688ee021) switched from RUNNING to FAILED on 17b2229f-39f3-473f-9086-a91f497bfca0 @ kubernetes.docker.internal (dataPort=-1).
com.facebook.thrift.transport.TTransportException: java.net.SocketTimeoutException: connect timed out
	at com.facebook.thrift.transport.TSocket.open(TSocket.java:204)
	at com.vesoft.nebula.client.meta.MetaClientImpl.getClient(MetaClientImpl.java:122)
	at com.vesoft.nebula.client.meta.MetaClientImpl.doConnect(MetaClientImpl.java:90)
	at com.vesoft.nebula.AbstractClient.connect(AbstractClient.java:93)
	at org.apache.flink.connector.nebula.connection.client.NebulaMetaClient.connectClient(NebulaMetaClient.java:18)
	at org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider.getClient(NebulaMetaConnectionProvider.java:36)
	at org.apache.flink.connector.nebula.source.NebulaSourceFunction.open(NebulaSourceFunction.java:50)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:309)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:545)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:503)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:566)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at com.facebook.thrift.transport.TSocket.open(TSocket.java:199)
	... 16 more

What should I do?

That version is rather old...
The error just says it cannot connect to Nebula; check your network connection.

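You're using a client built for Nebula 2.5 or later, right? Around Nebula 2.5 or 2.6 a verification function was added, so clients from 2.5 onward cannot connect to Nebula versions earlier than 2.5 :thinking: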

I can connect with the Python client, but the Flink one just won't connect.

Ah, no, the client I'm using is 1.2.0.

show me the code :slight_smile:

/* Copyright (c) 2020 vesoft inc. All rights reserved.
 *
 * This source code is licensed under Apache 2.0 License,
 * attached with Common Clause Condition 1.0, found in the LICENSES directory.
 */

package org.apache.flink;

import com.vesoft.nebula.client.meta.MetaClientImpl;
import com.vesoft.nebula.client.storage.StorageClientImpl;
import com.vesoft.nebula.client.storage.processor.ScanVertexProcessor;
import com.vesoft.nebula.data.Property;
import com.vesoft.nebula.data.Result;
import com.vesoft.nebula.data.Row;
import com.vesoft.nebula.shaded.google.common.net.HostAndPort;
import com.vesoft.nebula.storage.ScanVertexResponse;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.connector.nebula.connection.NebulaClientOptions;
import org.apache.flink.connector.nebula.connection.NebulaConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.sink.AbstractNebulaOutPutFormat;
import org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat;
import org.apache.flink.connector.nebula.sink.NebulaSinkFunction;
import org.apache.flink.connector.nebula.source.NebulaInputFormat;
import org.apache.flink.connector.nebula.source.NebulaSourceFunction;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkDemo {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkDemo.class);

    private static final String ADDRESS = "9.135.95.249:13708";
    private static final String USERNAME = "root";
    private static final String PASSWORD = "nebula";
    private static final String NAMESPACE = "nb";
    private static final String LABEL = "player";
    private static final ExecutionOptions sourceExecutionOptions;
    private static final ExecutionOptions sinkExecutionOptions;
    private static final NebulaConnectionProvider graphConnectionProvider;
    private static final NebulaConnectionProvider metaConnectionProvider;

    static {
        NebulaClientOptions nebulaClientOptions = new NebulaClientOptions
                .NebulaClientOptionsBuilder()
                .setAddress("9.135.95.249:13708")
                .setUsername(USERNAME)
                .setPassword(PASSWORD)
                .build();
        graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);

        NebulaClientOptions nebulaClientOptions1 = new NebulaClientOptions
                .NebulaClientOptionsBuilder()
                .setAddress("9.135.95.249:22343")
                .setUsername(USERNAME)
                .setPassword(PASSWORD)
                .build();
        metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions1);

        List<String> cols = Arrays.asList("name", "age");
        sourceExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSource")
                .setTag(LABEL)
                .setFields(cols)
                .setLimit(100)
                .builder();
        sinkExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSink")
                .setTag(LABEL)
                .setFields(cols)
                .setIdIndex(0)
                .setBatch(2)
                .builder();

    }

    public static void main(String[] args) throws Exception {
        testSourceSink();
    }

    public static void addNebulaSource() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // customize datasource
        SourceFunction<List<String>> source = new SourceFunction<List<String>>() {
            private static final long serialVersionUID = -7958462911936661287L;
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<List<String>> ctx) throws Exception {
                List<HostAndPort> hostAddress = new ArrayList<>();
                hostAddress.add(HostAndPort.fromString(ADDRESS));
                MetaClientImpl metaClient = new MetaClientImpl(hostAddress);
                metaClient.connect();
                StorageClientImpl storageClient = new StorageClientImpl(metaClient);

                Map<String, List<String>> returnCols = new HashMap<>();
                List<String> cols = new ArrayList<>();
                cols.add("name");
                returnCols.put("player", cols);
                Iterator<ScanVertexResponse> scanVertexResponseIterator =
                        storageClient.scanVertex(NAMESPACE, returnCols);
                if (!scanVertexResponseIterator.hasNext()) {
                    LOG.error("**** empty vertexScan result");
                }
                ScanVertexProcessor processor = new ScanVertexProcessor(metaClient);

                // stop scanning once cancel() has been called
                while (isRunning && scanVertexResponseIterator.hasNext()) {
                    LOG.info("**** start to process nebula vertex");
                    Result result = processor.process(NAMESPACE, scanVertexResponseIterator.next());
                    List<Row> rows = result.getRows("player");

                    for (Row row : rows) {
                        Property[] properties = row.getProperties();
                        // log the property values rather than the array's identity hash
                        LOG.info("**** flink read nebula player:{}", Arrays.toString(properties));
                        List<String> values = new ArrayList<String>();
                        for (Property prop : properties) {
                            values.add(prop.getValue().toString());
                        }
                        ctx.collect(values);
                    }
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        };
        DataStream<List<String>> dataStream = env.addSource(source);
        dataStream.map(Object::toString).print();
        env.execute("scan nebula nb.player");
    }


    public static void testNebulaSource() throws Exception {

        NebulaInputFormat inputFormat = new NebulaInputFormat(metaConnectionProvider)
                .setExecutionOptions(sourceExecutionOptions);

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<org.apache.flink.types.Row> dataSource = env.createInput(inputFormat);
        LOG.info("data source size={}", dataSource.count());
        dataSource.print();
    }


    public static void testNebulaSinkFunction() throws Exception {
        // source
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(10000)
                .getCheckpointConfig()
                .setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

        env.getCheckpointConfig()
                .setCheckpointTimeout(1 * 60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);
        env.setStateBackend(new FsStateBackend("file:///Users/nicole/tmp"));

        // construct source

        List<String> fields1 = new ArrayList<>();
        fields1.add("15");
        fields1.add("nicole");
        fields1.add("18");
        List<List<String>> player = new ArrayList<>();
        player.add(fields1);

        List<String> fields2 = new ArrayList<>();
        fields2.add("16");
        fields2.add("nicole");
        fields2.add("19");
        player.add(fields2);

        List<String> fields3 = new ArrayList<>();
        fields3.add("17");
        fields3.add("nicole");
        fields3.add("20");
        player.add(fields3);


        DataStream<List<String>> playerSource = env.fromCollection(player);
        playerSource.print();
        playerSource.countWindowAll(1);

        // sink
        AbstractNebulaOutPutFormat outPutFormat =
                new NebulaBatchOutputFormat(graphConnectionProvider)
                        .setExecutionOptions(sinkExecutionOptions);
        NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);

        playerSource.map(row -> {
            org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
            for (int i = 0; i < row.size(); i++) {
                record.setField(i, row.get(i));
            }
            LOG.info("record={}", record);
            return record;
        }).addSink(nebulaSinkFunction);
        env.execute("nebula read and write");
    }

    /**
     * read from nebula and then write into nebula
     */
    public static void testSourceSink() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000)
                .getCheckpointConfig()
                .setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

        env.getCheckpointConfig()
                .setCheckpointTimeout(20 * 3600);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // env.setStateBackend(new FsStateBackend("hdfs://127.0.0.1:9000/flink/checkpoints"));

        // source
        NebulaSourceFunction sourceFunction = new NebulaSourceFunction(metaConnectionProvider)
                .setExecutionOptions(sourceExecutionOptions);
        DataStreamSource<Row> dataSource = env.addSource(sourceFunction);

        // sink
        AbstractNebulaOutPutFormat outPutFormat =
                new NebulaBatchOutputFormat(graphConnectionProvider)
                        .setExecutionOptions(sinkExecutionOptions);
        NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);

        dataSource.print();
        dataSource.map(row -> {
            org.apache.flink.types.Row record = new org.apache.flink.types.Row(3);
            record.setField(0, row.getDefaultProperties()[0].getValue().toString());
            record.setField(1, row.getProperties()[0].getValue().toString());
            record.setField(2, row.getProperties()[1].getValue().toString());
            return record;
        }).addSink(nebulaSinkFunction);

        env.execute("NebulaSourceSink");
    }
}

It's just the example code. I changed the ports and left everything else untouched.

Hi, sorry, maybe I didn't hit reply to you properly. Could you take a look at the code I posted above?

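Ah, I've been rather busy lately. As you said, you only changed the addresses, so I suspect your debug environment cannot reach Nebula. The main thing to look into is why Flink cannot find your entry class in the jar when you run it directly.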
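
For the first error (entry point class not found in the jar), one way to narrow it down is to check whether the jar built by IDEA actually contains the compiled class. The following is a minimal sketch that relies only on the JDK's java.util.jar API; JarClassCheck and its methods are hypothetical helper names, not part of Flink or the Nebula connector:

```java
import java.io.IOException;
import java.util.jar.JarFile;

final class JarClassCheck {
    /** Converts a fully qualified class name to its entry path inside a jar. */
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar at jarPath has an entry for the given class. */
    static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: JarClassCheck <jar> [class]");
            return;
        }
        // Default to the entry class named in the error message.
        String className = args.length > 1 ? args[1] : "org.apache.flink.FlinkDemo";
        boolean found = containsClass(args[0], className);
        System.out.println(entryName(className) + (found ? " found" : " NOT found") + " in " + args[0]);
    }
}
```

If the entry is reported as not found, the artifact configuration probably did not include the module's compiled output, which would be consistent with the ClassNotFoundException above.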

I packaged it following this guide. Flink's word-count example runs, but the Nebula one does not.

Your debug environment most likely cannot reach the Nebula services. Run telnet against the Nebula meta service address from the debug environment to verify.
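
If telnet is not installed in the debug environment, the same reachability check can be sketched in plain Java. This is a minimal sketch using only java.net.Socket; PortCheck and canConnect are hypothetical names, and the default targets are the addresses from the posted FlinkDemo code (port 22343 for meta, 13708 for graph):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Addresses taken from the posted FlinkDemo; pass host:port arguments to override.
        String[] targets = args.length > 0
                ? args
                : new String[] {"9.135.95.249:22343", "9.135.95.249:13708"};
        for (String target : targets) {
            int sep = target.lastIndexOf(':');
            String host = target.substring(0, sep);
            int port = Integer.parseInt(target.substring(sep + 1));
            System.out.println(target + " reachable: " + canConnect(host, port, 3000));
        }
    }
}
```

A false result for the meta address from the machine running IDEA would match the "connect timed out" in the stack trace and point at routing or firewall settings rather than at the connector code.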

OK. Also, if telnet fails, how should I adjust the debug environment?
