package com.kzxx.flink.test;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.connector.nebula.connection.NebulaClientOptions;
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat;
import org.apache.flink.connector.nebula.sink.NebulaSinkFunction;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class NebulaTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the JSON source files from HDFS
        DataStreamSource<String> dataSource = env.readTextFile("hdfs://ns1/input/carfile");

        // Parse each JSON line and emit a 6-field Row only when all required fields are present
        SingleOutputStreamOperator<Row> row = dataSource.flatMap(new FlatMapFunction<String, Row>() {
            @Override
            public void flatMap(String s, Collector<Row> collector) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                String hphm = jsonObject.getString("car_num");
                String hpzl = jsonObject.getString("car_num_type");
                String hphmzl = jsonObject.getString("hphmzl");
                String csys = jsonObject.getString("csys");
                String hpys = jsonObject.getString("hpys");
                String clpp1 = jsonObject.getString("clpp1");
                if (null != hphm && null != hpzl && null != hphmzl && null != csys && null != hpys && null != clpp1) {
                    Row row = new Row(6);
                    row.setField(0, hphm);
                    row.setField(1, hpzl);
                    row.setField(2, csys);
                    row.setField(3, hpys);
                    row.setField(4, clpp1);
                    row.setField(5, hphmzl);
                    collector.collect(row);
                }
            }
        });
        // Connection options for graphd and metad
        NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
                .setGraphAddress("172.20.62.117:9669, 172.20.62.118:9669, 172.20.62.119:9669")
                .setMetaAddress("172.20.62.117:9559, 172.20.62.118:9559, 172.20.62.119:9559")
                .setUsername("root")
                .setPassword("nebula")
                .build();
        NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
        NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);
        ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                // Set the graph space
                .setGraphSpace("test_xunuo")
                // Set the tag
                .setTag("Car")
                // Use field 0 of the Row as the vertex id
                .setIdIndex(0)
                // Set the properties to insert
                .setFields(Arrays.asList("Hphm", "Hpzl", "Csys", "Hpys", "Clpp", "Hphmzl"))
                .setPositions(Arrays.asList(0, 1, 2, 3, 4, 5))
                .builder();
        NebulaBatchOutputFormat outPutFormat =
                new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                        .setExecutionOptions(executionOptions);
        NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
        row.addSink(nebulaSinkFunction);
        env.execute("write nebula");
    }
}
From your log, it is getSession that fails. What does that have to do with meta?
Caused by: com.vesoft.nebula.client.graph.exception.IOErrorException
    at com.vesoft.nebula.client.graph.net.NebulaPool.getSession(NebulaPool.java:115)
    at org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider.getSession(NebulaGraphConnectionProvider.java:53)
    at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.open(NebulaBatchOutputFormat.java:60)
    ... 9 more
That is the error visible from the log you posted. You can also check the nebula-graph service logs.
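To narrow down the getSession failure independently of Flink, one option is a minimal standalone check with the nebula-java client that the connector uses underneath. This is only a sketch: the class name GraphdConnectionCheck is made up for illustration, and the graphd addresses and credentials are copied from the code above.

import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;

import java.util.Arrays;
import java.util.List;

public class GraphdConnectionCheck {
    public static void main(String[] args) throws Exception {
        // Same graphd endpoints as in the Flink job above
        List<HostAddress> graphd = Arrays.asList(
                new HostAddress("172.20.62.117", 9669),
                new HostAddress("172.20.62.118", 9669),
                new HostAddress("172.20.62.119", 9669));
        NebulaPool pool = new NebulaPool();
        if (!pool.init(graphd, new NebulaPoolConfig())) {
            System.err.println("NebulaPool init failed: graphd not reachable");
            return;
        }
        // Same credentials as in the Flink job; getSession is where the IOErrorException came from
        Session session = pool.getSession("root", "nebula", false);
        ResultSet rs = session.execute("SHOW HOSTS;");
        System.out.println("SHOW HOSTS succeeded: " + rs.isSucceeded());
        session.release();
        pool.close();
    }
}

If this check also throws IOErrorException, the problem is between the client and graphd (network, firewall, or graphd itself) rather than anything in the Flink job, and the graphd logs should show why the connection is rejected.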