Nebula FlinkConnector 使用问题

package com.kzxx.flink.test;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat;
import org.apache.flink.connector.nebula.sink.NebulaSinkFunction;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.nebula.connection.NebulaClientOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Flink streaming job that reads JSON lines from an HDFS text file, converts each
 * complete record into a six-field {@link Row} and writes the rows to Nebula Graph
 * as vertices of tag {@code Car} in space {@code test_xunuo}.
 */
public class NebulaTest {

    /**
     * Builds the pipeline (text source -> JSON-to-Row flat map -> Nebula sink) and runs it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> dataSource = env.readTextFile("hdfs://ns1/input/carfile");

        SingleOutputStreamOperator<Row> carRows = dataSource.flatMap(new CarRowMapper());

        // The address lists are written WITHOUT whitespace after the commas.
        // NOTE(review): the connector appears to split these strings on ',' and ':' without
        // trimming, in which case " 172.20.62.118" would be used as a host name and the
        // connection (and therefore getSession) would fail - confirm against the connector version.
        // NOTE(review): credentials are hard-coded; consider reading them from job parameters.
        NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
                .setGraphAddress("172.20.62.117:9669,172.20.62.118:9669,172.20.62.119:9669")
                .setMetaAddress("172.20.62.117:9559,172.20.62.118:9559,172.20.62.119:9559")
                .setUsername("root")
                .setPassword("nebula")
                .build();
        NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
        NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);

        ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                // Graph space to write into
                .setGraphSpace("test_xunuo")
                // Tag of the inserted vertices
                .setTag("Car")
                // Row position 0 (car_num) is passed as the id index
                .setIdIndex(0)
                // Tag properties to insert, paired by order with the Row positions below
                .setFields(Arrays.asList("Hphm", "Hpzl", "Csys", "Hpys", "Clpp", "Hphmzl"))
                .setPositions(Arrays.asList(0, 1, 2, 3, 4, 5))
                .builder();

        NebulaBatchOutputFormat outPutFormat =
                new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                        .setExecutionOptions(executionOptions);

        NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);

        carRows.addSink(nebulaSinkFunction);
        env.execute("write nebula");
    }

    /**
     * Parses one JSON text line and emits a {@link Row} laid out as
     * (car_num, car_num_type, csys, hpys, clpp1, hphmzl).
     * Blank lines and records missing any of the six keys are dropped.
     */
    private static final class CarRowMapper implements FlatMapFunction<String, Row> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String line, Collector<Row> out) throws Exception {
            // Blank lines carry no record; skip them instead of failing the job.
            if (line == null || line.trim().isEmpty()) {
                return;
            }
            JSONObject record = JSON.parseObject(line);
            // parseObject yields null for input such as "null"; treat it as "no record".
            if (record == null) {
                return;
            }

            String hphm = record.getString("car_num");
            String hpzl = record.getString("car_num_type");
            String hphmzl = record.getString("hphmzl");
            String csys = record.getString("csys");
            String hpys = record.getString("hpys");
            String clpp1 = record.getString("clpp1");

            // Only complete records are forwarded to the sink.
            if (hphm == null || hpzl == null || hphmzl == null
                    || csys == null || hpys == null || clpp1 == null) {
                return;
            }

            Row carRow = new Row(6);
            carRow.setField(0, hphm);
            carRow.setField(1, hpzl);
            carRow.setField(2, csys);
            carRow.setField(3, hpys);
            carRow.setField(4, clpp1);
            carRow.setField(5, hphmzl);
            out.collect(carRow);
        }
    }
}

从你的日志来看，是 getSession 失败，和 meta 没有关系。

Caused by: com.vesoft.nebula.client.graph.exception.IOErrorException
        at com.vesoft.nebula.client.graph.net.NebulaPool.getSession(NebulaPool.java:115)
        at org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider.getSession(NebulaGraphConnectionProvider.java:53)
        at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.open(NebulaBatchOutputFormat.java:60)
        ... 9 more

请问查看哪里的日志或者配置可以分析出具体错误?

这个报错是从你发的日志里看出来的。具体原因可以查看 nebula-graph 的日志。

此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。

浙ICP备20010487号